
Add streaming support#43

Open
eleonorecharles wants to merge 12 commits into pydantic:main from datalayer-externals:feature/sse-streaming-2

Conversation

echarles commented Jan 9, 2026

CI is green - we are using this branch for various projects.


echarles commented Jan 13, 2026

@amitgelber

@echarles can this be merged? how can we push it? thank you

@echarles

> @echarles can this be merged? how can we push it? thank you

I hope this PR can be reviewed soon.

Comment on lines +148 to +155
# Serialize event to ensure proper camelCase conversion
event_dict = stream_event_ta.dump_python(event, mode='json', by_alias=True)

# Wrap in JSON-RPC response
jsonrpc_response = {'jsonrpc': '2.0', 'id': request_id, 'result': event_dict}

# Convert to JSON string
yield json.dumps(jsonrpc_response)
Do we need to do this serialization and then deserialization? Seems a bit weird. I'll check.

But for sure we are not going to use json.dumps. There's a function in pydantic_core for it.

@echarles Mar 7, 2026

In d16abca

The 3-step serialization pattern in the SSE generator:

  1. stream_event_ta.dump_python → intermediate Python dict
  2. wrap in a plain dict {'jsonrpc': '2.0', ...}
  3. json.dumps(...) → JSON string

is replaced by a single call. This skips the intermediate dict materialization and uses pydantic_core's JSON serializer instead of json.dumps. The StreamMessageResponse TypeAdapter handles camelCase aliasing for the entire envelope plus the nested event in one pass.
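A minimal sketch of what that single call can look like, assuming a StreamMessageResponse shaped like the JSON-RPC envelope above (the names and field types here are illustrative, not the PR's exact definitions):

```python
from typing import Any, Literal, Optional

from pydantic import TypeAdapter
from typing_extensions import TypedDict


# Hypothetical envelope; the real StreamMessageResponse lives in fasta2a.
class StreamMessageResponse(TypedDict):
    jsonrpc: Literal['2.0']
    id: Optional[int]
    result: dict[str, Any]


stream_message_response_ta = TypeAdapter(StreamMessageResponse)


def encode_sse_data(request_id: Optional[int], event: dict[str, Any]) -> bytes:
    # One pass: pydantic-core serializes the whole envelope straight to JSON
    # bytes (by_alias=True applies camelCase aliases where fields define them),
    # with no intermediate dump_python dict and no json.dumps.
    return stream_message_response_ta.dump_json(
        {'jsonrpc': '2.0', 'id': request_id, 'result': event},
        by_alias=True,
    )
```

The returned bytes can be written directly into the SSE `data:` frame without a decode/re-encode round trip.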

provider: AgentProvider | None = None,
skills: list[Skill] | None = None,
docs_url: str | None = '/docs',
streaming: bool = False,
Do you want it to be False by default?

set to True in 23907cf

# Parse the streaming request
stream_request = stream_message_request_ta.validate_json(data)

# Create an async generator wrapper that formats events as JSON-RPC responses

There's no need for a comment explaining each line.

Suggested change
- # Create an async generator wrapper that formats events as JSON-RPC responses

fixed in 33ee9c5

if a2a_request['method'] == 'message/send':
jsonrpc_response = await self.task_manager.send_message(a2a_request)
elif a2a_request['method'] == 'message/stream':
# Parse the streaming request

Suggested change
- # Parse the streaming request

fixed in 33ee9c5

Comment on lines +33 to +34
raise NotImplementedError('send_run_task is not implemented yet.')
...
Why did you replace this?

reverted in 1709673

Comment on lines +38 to +39
raise NotImplementedError('send_cancel_task is not implemented yet.')
...
Same question.

reverted in 1709673

return self.contexts.get(context_id)


class StreamingStorageWrapper(Storage[ContextT]):
Why do we need this instead of using Storage?

Storage is used in 2db81b8

Comment on lines +404 to +405
class TestFastA2AStreaming:
"""Tests for the FastA2A message/stream SSE endpoint."""
No test classes please.

fixed in ab97e62


@devin-ai-integration bot left a comment


Devin Review found 4 potential issues.

View 3 additional findings in Devin Review.


Comment on lines +194 to +198
asyncio.create_task(self.broker.run_task(broker_params))

# Stream events from broker
async for event in self.broker.subscribe_to_stream(task['id']):
yield event


🔴 Race condition: task execution starts before stream subscription is registered

In stream_message, asyncio.create_task(self.broker.run_task(broker_params)) at line 194 schedules the task for execution BEFORE the subscription is registered at line 197. Since subscribe_to_stream is an async generator, the subscriber registration (at fasta2a/broker.py:159-164) only happens when iteration begins — i.e., when __anext__ is called by the async for at line 197. By that time, the background task is already scheduled. The first checkpoint inside subscribe_to_stream (the anyio.Lock acquisition at fasta2a/broker.py:161, which always checkpoints even if uncontested) can allow the scheduled run_task to start executing. If the worker processes the task fast enough (e.g., with a near-zero delay or a remote broker), events emitted via send_stream_event will find no subscribers and be silently dropped, causing the client to hang forever waiting for events that were already lost.

Prompt for agents
In fasta2a/task_manager.py, the stream_message method (lines 161-198) must subscribe to the broker's event stream BEFORE starting task execution. However, since subscribe_to_stream is an async generator, simply reordering the lines won't work — the subscription registration only happens when iteration begins.

The fix requires restructuring so that subscription registration is decoupled from iteration. One approach:
1. In broker.py's subscribe_to_stream (or a new method), separate the registration step from the iteration step. For example, add a method like `register_subscriber(task_id)` that returns a (send_stream, receive_stream) pair and registers the subscriber, then have subscribe_to_stream accept the pre-registered streams.
2. Alternatively, refactor stream_message to manually register the subscriber before starting the task, then iterate the receive stream directly.

The key invariant: the subscriber MUST be registered in self._event_subscribers BEFORE self.broker.run_task(broker_params) is called.

broker_params['metadata'] = metadata

# Start task execution in background
asyncio.create_task(self.broker.run_task(broker_params))


🔴 Fire-and-forget asyncio.create_task loses exceptions and breaks anyio/trio compatibility

At line 194, asyncio.create_task(self.broker.run_task(broker_params)) has two problems: (1) The returned task reference is not stored, so if run_task raises an exception, it is silently lost (Python will only emit a warning about a never-retrieved exception on GC). The streaming client will hang forever waiting for events that will never arrive. (2) The codebase consistently uses anyio for async primitives (fasta2a/broker.py uses anyio.Lock, anyio.create_memory_object_stream; fasta2a/worker.py uses anyio.create_task_group). Using asyncio.create_task directly breaks compatibility with trio (which anyio supports) and violates the project's convention.

Prompt for agents
In fasta2a/task_manager.py line 194, replace the bare `asyncio.create_task(self.broker.run_task(broker_params))` with an anyio-based approach that properly handles errors. For example, use an anyio TaskGroup to manage the background task's lifecycle. The stream_message method needs restructuring: instead of fire-and-forget, use a task group that runs both the task execution and the event streaming concurrently, so that if run_task raises an exception, it properly propagates or is handled (e.g., by sending a 'failed' status event). Also remove the `import asyncio` at line 63 if no longer needed.

Comment on lines +131 to +151
async with self._subscriber_lock:
subscribers = self._event_subscribers.get(task_id, [])
if not subscribers:
return

# Send event to all subscribers, removing closed streams
active_subscribers: list[MemoryObjectSendStream[StreamEvent]] = []
for stream in subscribers:
try:
await stream.send(event)
active_subscribers.append(stream)
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
# Subscriber disconnected, remove from list
pass

# Update subscriber list with only active ones
if active_subscribers:
self._event_subscribers[task_id] = active_subscribers
elif task_id in self._event_subscribers:
# No active subscribers left, clean up
del self._event_subscribers[task_id]


🚩 Potential lock contention in send_stream_event while holding lock during await

In fasta2a/broker.py:131-151, send_stream_event holds _subscriber_lock for the entire duration of sending events to ALL subscribers. Each await stream.send(event) at line 140 could block if a subscriber's buffer is full (max_buffer_size=100). While holding the lock, no new subscriptions or unsubscriptions can proceed. I analyzed whether this could deadlock with subscribe_to_stream's finally block (line 176), which also needs the lock. The scenario where subscriber A's buffer is full while subscriber B tries to clean up is NOT a deadlock because: (1) A's consumer doesn't need the lock to read from its receive_stream, so it can drain the buffer, and (2) if A's receive_stream is already closed, the send raises BrokenResourceError which is caught at line 142. However, this pattern could cause significant latency spikes under load with many slow subscribers, as a single slow subscriber blocks event delivery to all others for that task_id.


task = await self._storage.update_task(task_id, state, new_artifacts, new_messages)

# Determine if this is a final state
final = state in ('completed', 'failed', 'canceled')

@devin-ai-integration bot Mar 7, 2026


🚩 TaskState completeness: 'rejected', 'auth-required', 'unknown' and 'input-required' not treated as final

In fasta2a/worker.py:74, final is determined by state in ('completed', 'failed', 'canceled'). Looking at the TaskState type alias at fasta2a/schema.py:436-438, there are additional terminal-like states: 'rejected', 'auth-required', and 'unknown'. If a worker implementation ever sets the state to 'rejected', no final event would be sent to stream subscribers, causing them to hang. The current EchoWorker in tests only uses 'working', 'completed', and 'canceled', so this isn't triggered now, but could be a problem for future worker implementations.
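A hedged sketch of centralizing this decision (the state list is assumed to mirror the TaskState alias described above; whether 'auth-required' or 'unknown' should count as final is a design call, so only the unambiguous terminal states are included here):

```python
from typing import Literal

# Assumed to mirror fasta2a/schema.py's TaskState alias.
TaskState = Literal[
    'submitted', 'working', 'input-required', 'auth-required',
    'completed', 'canceled', 'failed', 'rejected', 'unknown',
]

# Single source of truth for "no further events will follow".
FINAL_STATES: frozenset = frozenset({'completed', 'failed', 'canceled', 'rejected'})


def is_final(state: str) -> bool:
    return state in FINAL_STATES
```

With this, both the worker and the storage layer would compute finality from one constant instead of duplicating (and diverging on) the tuple.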



@devin-ai-integration bot left a comment


Devin Review found 3 new potential issues.

View 6 additional findings in Devin Review.


task = await self._storage.update_task(task_id, state, new_artifacts, new_messages)

# Determine if this is a final state
final = state in ('completed', 'failed', 'canceled')

@devin-ai-integration bot Mar 7, 2026


🔴 Missing 'rejected' terminal state causes streaming subscribers to hang forever

At fasta2a/storage.py:175, the final flag is computed as state in ('completed', 'failed', 'canceled'), but TaskState (defined at fasta2a/schema.py:436-438) also includes 'rejected' as a terminal state. If a task transitions to 'rejected', no final=True status event will ever be published — instead, a non-final status update (final=False) is sent. This means subscribers in InMemoryBroker.subscribe_to_stream (fasta2a/broker.py:170-173) will never see a final=True event and will block indefinitely, causing the SSE connection to hang.


Comment on lines +194 to +198
asyncio.create_task(self.broker.run_task(broker_params))

# Stream events from broker
async for event in self.broker.subscribe_to_stream(task['id']):
yield event


🚩 Subscription registration vs task execution ordering relies on subtle asyncio scheduling guarantees

In fasta2a/task_manager.py:194-197, the code first calls asyncio.create_task(self.broker.run_task(broker_params)) (line 194) then subscribes via async for event in self.broker.subscribe_to_stream(task['id']) (line 197). This appears to be a race condition at first glance, but it actually works because asyncio.create_task only schedules the coroutine — it doesn't execute until the current coroutine yields. The subscribe_to_stream async generator executes synchronously through the subscriber registration code (fasta2a/broker.py:159-164) before hitting its first real await (the async for event in receive_stream at line 168). However, this ordering guarantee is fragile: if self._subscriber_lock at fasta2a/broker.py:161 is contended (held by a concurrent send_stream_event call), the subscription registration will yield to the event loop, potentially allowing the background task to start and emit events before the subscriber is registered. In the InMemoryBroker with a single task this is very unlikely, but worth noting for robustness.


provider: AgentProvider | None = None,
skills: list[Skill] | None = None,
docs_url: str | None = '/docs',
streaming: bool = True,


🚩 Default streaming=True may break backward compatibility for non-streaming setups

The FastA2A.__init__ now defaults streaming=True (fasta2a/applications.py:49), which means the agent card will advertise streaming capability. If a user upgrades without wrapping their storage in StreamingStorageWrapper, the agent card will claim streaming support but message/stream requests will fail because no streaming events are published. This is a potential compatibility concern — previously streaming was hardcoded to False (fasta2a/applications.py:103). The default being True assumes all users are streaming-ready.



@devin-ai-integration bot left a comment


Devin Review found 3 new potential issues.

View 5 additional findings in Devin Review.



🚩 InMemoryStorage.load_task mutates the stored task's history in place

Pre-existing issue: InMemoryStorage.load_task at fasta2a/storage.py:85-86 does task['history'] = task['history'][-history_length:], which mutates the stored task dict in place, permanently truncating the history. This means subsequent calls to load_task without history_length will still see the truncated history. This is a pre-existing bug, not introduced by this PR, but relevant context since streaming relies on storage state.
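A sketch of the defensive fix (hypothetical free function; the real method lives on InMemoryStorage): return a shallow copy with a sliced history so the stored task is never mutated.

```python
from typing import Any, Optional


def load_task_snapshot(stored_task: dict[str, Any], history_length: Optional[int] = None) -> dict[str, Any]:
    # Copy before slicing: the caller gets a truncated view while the stored
    # dict keeps its full history for later loads.
    task = {**stored_task}
    if history_length is not None:
        task['history'] = list(task['history'][-history_length:])
    return task
```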


@@ -56,6 +57,66 @@ async def _handle_task_operation(self, task_operation: TaskOperation) -> None:
except Exception:
await self.storage.update_task(task_operation['params']['id'], state='failed')


🔴 Exception handler in _handle_task_operation bypasses streaming event publishing

When a task raises an exception, _handle_task_operation (fasta2a/worker.py:57-58) catches it and calls self.storage.update_task(...) directly instead of self.update_task(...). The new self.update_task() method (line 60) is responsible for both updating storage AND publishing streaming events (including the crucial final=True status event). By calling self.storage.update_task() directly, no final=True TaskStatusUpdateEvent is ever published to the broker. Any SSE subscriber waiting on subscribe_to_stream will hang indefinitely, never receiving the termination signal.

Suggested change
- await self.storage.update_task(task_operation['params']['id'], state='failed')
+ await self.update_task(task_operation['params']['id'], state='failed')

Comment on lines +189 to +191
metadata = params.get('metadata')
if metadata is not None:
broker_params['metadata'] = metadata


🚩 Inconsistent metadata forwarding between send_message and stream_message

The stream_message method (lines 189-191) forwards metadata from params to broker_params, but the pre-existing send_message method (fasta2a/task_manager.py:117-132) does not. This is an inconsistency between the two code paths. While not a bug per se (since send_message was pre-existing), it means the two methods handle the same MessageSendParams differently. This should be harmonized.



echarles commented Mar 7, 2026

@Kludex Thanks for the review, I have addressed the comments and CI is green.


Successfully merging this pull request may close these issues.

Does FastA2AClient support subscribing to the SSE stream via tasks/sendSubscribe?

4 participants