Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions ROADMAP.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,16 @@
- `AgentCardResolver` actor with TTL-based caching for multi-agent discovery
- Updated A2AServer sample to use `A2AVapor`

## Short Term

### A2AHummingbird Integration
- Add `A2AHummingbird` target for Hummingbird 2.0+ integration
- Same pattern as A2AVapor — separate product, no forced dependency
### v0.4.0 — Streaming Resilience
- SSE reconnection with configurable retry, exponential backoff, and jitter (`SSEConfiguration`)
- `SSELineParser` for proper SSE field parsing (`data:`, `id:`, `retry:`, `event:`)
- Server-side SSE `id:` and `retry:` field emission for reconnection support
- `Last-Event-ID` header on reconnect with event deduplication
- `ConnectionState` enum and `StreamingSession` type for connection health monitoring
- `sendStreamingMessageWithSession` / `subscribeToTaskWithSession` — rich streaming APIs
- Existing streaming methods unchanged (non-breaking)

### Client Enhancements
- SSE reconnection with automatic retry and last-event-id
- Connection health monitoring
## Short Term

## Medium Term

Expand Down
3 changes: 3 additions & 0 deletions Sources/A2A/A2A.docc/A2A.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ let router = A2ARouter(handler: handler)
- ``EventQueueManager``
- ``EventSubscription``
- ``StreamResponseSequence``
- ``StreamingSession``
- ``ConnectionState``
- ``SSEConfiguration``

### Request & Response Types

Expand Down
259 changes: 199 additions & 60 deletions Sources/A2A/Client/A2AClient.swift

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions Sources/A2A/Client/ConnectionState.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import Foundation

/// Represents the state of an SSE streaming connection.
///
/// Use with ``StreamingSession/connectionState`` to monitor connection health
/// during streaming operations.
public enum ConnectionState: Sendable {
/// The connection is active and receiving events.
case connected

/// The connection dropped and is being re-established.
case reconnecting(attempt: Int, maxAttempts: Int)

/// The connection has been permanently lost.
case disconnected(any Error)
}
65 changes: 65 additions & 0 deletions Sources/A2A/Client/SSEConfiguration.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import Foundation

/// Configuration for SSE streaming reconnection behavior.
///
/// When a streaming connection drops unexpectedly, the client can automatically
/// retry with exponential backoff. Use ``default`` for sensible defaults or
/// ``disabled`` to opt out of reconnection.
///
/// ```swift
/// // Default reconnection (3 retries with exponential backoff)
/// let client = A2AClient(baseURL: url)
///
/// // Custom configuration
/// let client = A2AClient(
/// baseURL: url,
/// sseConfiguration: SSEConfiguration(maxRetries: 5, initialRetryInterval: 2.0)
/// )
///
/// // Disable reconnection
/// let client = A2AClient(baseURL: url, sseConfiguration: .disabled)
/// ```
public struct SSEConfiguration: Sendable, Equatable {
/// Maximum number of reconnection attempts before giving up.
public var maxRetries: Int

/// Initial delay between reconnection attempts in seconds.
public var initialRetryInterval: TimeInterval

/// Maximum delay between reconnection attempts in seconds.
public var maxRetryInterval: TimeInterval

/// Multiplier applied to the retry interval after each failed attempt.
public var backoffMultiplier: Double

/// Fraction of the retry interval to use as random jitter (0.0–1.0).
public var jitterFraction: Double

public init(
maxRetries: Int = 3,
initialRetryInterval: TimeInterval = 1.0,
maxRetryInterval: TimeInterval = 30.0,
backoffMultiplier: Double = 2.0,
jitterFraction: Double = 0.1
) {
self.maxRetries = maxRetries
self.initialRetryInterval = initialRetryInterval
self.maxRetryInterval = maxRetryInterval
self.backoffMultiplier = backoffMultiplier
self.jitterFraction = jitterFraction
}

/// Default configuration with 3 retries and exponential backoff.
public static let `default` = SSEConfiguration()

/// Disabled reconnection — errors are thrown immediately.
public static let disabled = SSEConfiguration(maxRetries: 0)

/// Calculates the delay for a given retry attempt, incorporating backoff and jitter.
internal func delay(forAttempt attempt: Int) -> TimeInterval {
let base = initialRetryInterval * pow(backoffMultiplier, Double(attempt))
let clamped = min(base, maxRetryInterval)
let jitter = clamped * jitterFraction * Double.random(in: -1...1)
return max(0, clamped + jitter)
}
}
62 changes: 62 additions & 0 deletions Sources/A2A/Client/SSELineParser.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import Foundation

/// Internal parser for Server-Sent Events (SSE) lines.
///
/// Handles the SSE protocol fields: `data:`, `id:`, `retry:`, and `event:`.
/// Tracks the last event ID and server-suggested retry interval for reconnection.
struct SSELineParser: Sendable {
/// The last received event ID, used for `Last-Event-ID` header on reconnect.
private(set) var lastEventId: String?

/// Server-suggested retry interval in seconds, if received.
private(set) var serverRetryInterval: TimeInterval?

enum Field: Sendable, Equatable {
case data(String)
case id(String)
case retry(Int)
case event(String)
case comment
case empty
}

/// Parses a single SSE line and returns the field type.
mutating func parse(line: String) -> Field {
let trimmed = line.trimmingCharacters(in: .whitespaces)

if trimmed.isEmpty {
return .empty
}

if trimmed.hasPrefix(":") {
return .comment
}

if trimmed.hasPrefix("data:") {
let value = String(trimmed.dropFirst(5)).trimmingCharacters(in: .whitespaces)
return .data(value)
}

if trimmed.hasPrefix("id:") {
let value = String(trimmed.dropFirst(3)).trimmingCharacters(in: .whitespaces)
lastEventId = value
return .id(value)
}

if trimmed.hasPrefix("retry:") {
let value = String(trimmed.dropFirst(6)).trimmingCharacters(in: .whitespaces)
if let ms = Int(value) {
serverRetryInterval = TimeInterval(ms) / 1000.0
return .retry(ms)
}
return .comment
}

if trimmed.hasPrefix("event:") {
let value = String(trimmed.dropFirst(6)).trimmingCharacters(in: .whitespaces)
return .event(value)
}

return .comment
}
}
37 changes: 37 additions & 0 deletions Sources/A2A/Client/StreamingSession.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import Foundation

/// A streaming session that provides both A2A events and connection state updates.
///
/// Use this type to monitor connection health during streaming operations.
/// Obtain a `StreamingSession` via ``A2AClient/sendStreamingMessageWithSession(_:)``
/// or ``A2AClient/subscribeToTaskWithSession(_:)``.
///
/// ```swift
/// let session = try await client.sendStreamingMessageWithSession(request)
///
/// // Monitor connection state in a separate task
/// Task {
/// for await state in session.connectionState {
/// switch state {
/// case .connected:
/// print("Connected")
/// case .reconnecting(let attempt, let max):
/// print("Reconnecting (\(attempt)/\(max))...")
/// case .disconnected(let error):
/// print("Disconnected: \(error)")
/// }
/// }
/// }
///
/// // Consume events
/// for try await event in session.events {
/// // handle event
/// }
/// ```
public struct StreamingSession: Sendable {
/// The stream of A2A events.
public let events: AsyncThrowingStream<StreamResponse, Error>

/// Connection state changes during the streaming session.
public let connectionState: AsyncStream<ConnectionState>
}
17 changes: 14 additions & 3 deletions Sources/A2A/Server/A2AServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -232,14 +232,24 @@ public struct A2ARouter: Sendable {

return AsyncThrowingStream { continuation in
let task = Task {
var eventCounter = 0
do {
for try await event in stream {
guard !Task.isCancelled else { break }
eventCounter += 1
let response = JSONRPCResponse(id: resolvedId, result: event)
let jsonData = try encoder.encode(response)
guard let jsonString = String(data: jsonData, encoding: .utf8) else { continue }
let sseData = "data: \(jsonString)\n\n".data(using: .utf8)!
continuation.yield(sseData)

var sse = ""
if eventCounter == 1 {
sse += "retry: 3000\n"
}
sse += "id: \(eventCounter)\n"
sse += "data: \(jsonString)\n"
sse += "\n"

continuation.yield(sse.data(using: .utf8)!)
}
continuation.finish()
} catch let error as A2AError {
Expand All @@ -249,7 +259,8 @@ public struct A2ARouter: Sendable {
)
if let errorData = try? encoder.encode(errorResponse),
let errorString = String(data: errorData, encoding: .utf8) {
let sseData = "data: \(errorString)\n\n".data(using: .utf8)!
eventCounter += 1
let sseData = "id: \(eventCounter)\ndata: \(errorString)\n\n".data(using: .utf8)!
continuation.yield(sseData)
}
continuation.finish(throwing: error)
Expand Down
72 changes: 72 additions & 0 deletions Tests/A2ATests/SSEConfigurationTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import Testing
import Foundation
@testable import A2A

@Suite("SSEConfiguration")
struct SSEConfigurationTests {

@Test func defaultConfiguration() {
let config = SSEConfiguration.default
#expect(config.maxRetries == 3)
#expect(config.initialRetryInterval == 1.0)
#expect(config.maxRetryInterval == 30.0)
#expect(config.backoffMultiplier == 2.0)
#expect(config.jitterFraction == 0.1)
}

@Test func disabledConfiguration() {
let config = SSEConfiguration.disabled
#expect(config.maxRetries == 0)
}

@Test func delayExponentialBackoff() {
let config = SSEConfiguration(
initialRetryInterval: 1.0,
maxRetryInterval: 30.0,
backoffMultiplier: 2.0,
jitterFraction: 0.0 // No jitter for deterministic testing
)

let delay0 = config.delay(forAttempt: 0)
#expect(delay0 == 1.0) // 1.0 * 2^0 = 1.0

let delay1 = config.delay(forAttempt: 1)
#expect(delay1 == 2.0) // 1.0 * 2^1 = 2.0

let delay2 = config.delay(forAttempt: 2)
#expect(delay2 == 4.0) // 1.0 * 2^2 = 4.0
}

@Test func delayClampsToMax() {
let config = SSEConfiguration(
initialRetryInterval: 1.0,
maxRetryInterval: 5.0,
backoffMultiplier: 2.0,
jitterFraction: 0.0
)

let delay10 = config.delay(forAttempt: 10)
#expect(delay10 == 5.0) // Clamped to maxRetryInterval
}

@Test func delayWithJitterInRange() {
let config = SSEConfiguration(
initialRetryInterval: 10.0,
maxRetryInterval: 30.0,
backoffMultiplier: 1.0,
jitterFraction: 0.1
)

// With jitter=0.1, delay should be in range [9.0, 11.0]
for _ in 0..<100 {
let delay = config.delay(forAttempt: 0)
#expect(delay >= 9.0)
#expect(delay <= 11.0)
}
}

@Test func equatable() {
#expect(SSEConfiguration.default == SSEConfiguration())
#expect(SSEConfiguration.default != SSEConfiguration.disabled)
}
}
Loading
Loading