feat: extensions #44

Open
echarles wants to merge 13 commits into pydantic:main from datalayer-externals:feat/extensions

Conversation

@echarles commented Mar 7, 2026

@echarles echarles marked this pull request as ready for review March 7, 2026 17:54
Copilot AI review requested due to automatic review settings March 7, 2026 17:54
@devin-ai-integration bot left a comment

Devin Review found 3 potential issues.


@@ -56,6 +57,66 @@ async def _handle_task_operation(self, task_operation: TaskOperation) -> None:
except Exception:
    await self.storage.update_task(task_operation['params']['id'], state='failed')


🔴 Exception handler bypasses new update_task method, leaving stream subscribers hanging forever

When run_task or cancel_task raises an exception in _handle_task_operation, the except block at line 58 calls self.storage.update_task(...) directly instead of the new self.update_task(...) method introduced in this PR. The new self.update_task() method (fasta2a/worker.py:60-118) is documented as "the primary method workers should use to update task state" because it both persists the update to storage AND publishes streaming events to the broker via send_stream_event. By calling self.storage.update_task() directly, no TaskStatusUpdateEvent(final=True) is ever published to subscribers. Any active subscribe_to_stream iterator for that task will block indefinitely waiting for a final event, causing the SSE connection to hang.

Suggested change:
- await self.storage.update_task(task_operation['params']['id'], state='failed')
+ await self.update_task(task_operation['params']['id'], state='failed')
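To make the failure path concrete, here is a minimal, self-contained sketch of a worker that routes the exception handler through `update_task` so subscribers receive a final event. The `SketchWorker` class, its constructor arguments, and the event dict shape are illustrative stand-ins, not fasta2a's actual API:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class SketchWorker:
    """Illustrative only: storage and broker are hypothetical stand-ins."""

    def __init__(self, storage, broker, run_task):
        self.storage = storage
        self.broker = broker
        self.run_task = run_task

    async def update_task(self, task_id: str, state: str) -> None:
        # Persist the state change AND publish a stream event, so active
        # subscribe_to_stream iterators see a final event instead of hanging.
        await self.storage.update_task(task_id, state=state)
        await self.broker.send_stream_event(
            task_id, {'kind': 'status-update', 'state': state, 'final': True}
        )

    async def handle_task_operation(self, task_operation) -> None:
        try:
            await self.run_task(task_operation['params'])
        except Exception:
            logger.exception('task operation failed')
            # Go through update_task, not storage directly, so the final
            # failed status reaches stream subscribers.
            await self.update_task(task_operation['params']['id'], state='failed')
```

The key point is that every terminal state transition, including failure, flows through the one method that both persists and publishes.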

broker_params['metadata'] = metadata

# Start task execution in background
asyncio.create_task(self.broker.run_task(broker_params))


🔴 Fire-and-forget asyncio.create_task loses exceptions and task reference

At fasta2a/task_manager.py:194, asyncio.create_task(self.broker.run_task(broker_params)) creates a background task but discards the returned Task object. If run_task raises an exception, Python emits a "Task exception was never retrieved" warning and the error is silently lost — the streaming client will never know the task failed to even start. The task object is also subject to garbage collection since no reference is held, which could cause the task to be cancelled unexpectedly (per Python docs: "the event loop only keeps weak references to tasks").

Prompt for agents
In fasta2a/task_manager.py at line 194 in the stream_message method, the asyncio.create_task result is discarded. Store the task reference and add error handling. One approach: store the task in a local variable, and after the subscribe_to_stream loop completes, check if the background task raised an exception (e.g. via task.result() in a try/except). Alternatively, restructure to use a task group (e.g. anyio.create_task_group) to properly manage the background task's lifecycle and propagate exceptions.

Comment on lines +144 to +150
# Parse activated extensions from the A2A-Extensions header
extensions_header = request.headers.get('a2a-extensions', '')
activated_extensions: list[str] = (
[uri.strip() for uri in extensions_header.split(',') if uri.strip()] if extensions_header else []
)
# Stash on the request state so workers / handlers can inspect them
request.state.activated_extensions = activated_extensions
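For illustration, the same splitting logic extracted as a standalone helper (the name `parse_activated_extensions` is hypothetical, not part of fasta2a): it returns an empty list for a missing or whitespace-only header and strips whitespace around each URI.

```python
def parse_activated_extensions(header: str) -> list[str]:
    """Split a comma-separated A2A-Extensions header into a list of URIs."""
    return [uri.strip() for uri in header.split(',') if uri.strip()] if header else []
```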


🚩 activated_extensions parsed but never consumed by downstream code

At fasta2a/applications.py:144-150, the A2A-Extensions header is parsed and stored on request.state.activated_extensions with a comment saying "so workers / handlers can inspect them." However, this state is never passed to TaskManager, Broker, or Worker — those components receive only TaskSendParams which doesn't include activated extensions information. The request.state is scoped to the HTTP request handler and is not accessible from workers. This appears to be scaffolding for future use rather than a bug, but it means extensions have no functional effect currently.


Copilot AI left a comment


Pull request overview

Adds A2A extensions support and introduces an SSE-based message/stream implementation, including broker pub/sub and worker-driven stream event publishing.

Changes:

  • Extend agent-card capabilities to advertise supported extensions and default streaming capability.
  • Implement message/stream end-to-end: TaskManager streaming generator, broker stream pub/sub, worker event publishing, and SSE HTTP response.
  • Add/lock new dependencies (sse-starlette, plus dev deps) and introduce streaming-focused test coverage.

Reviewed changes

Copilot reviewed 10 out of 12 changed files in this pull request and generated 6 comments.

Summary per file:

fasta2a/schema.py: Adds extensions to agent capabilities and defines StreamEvent union/type adapter.
fasta2a/applications.py: Adds extension header parsing, streaming toggle, and SSE message/stream endpoint.
fasta2a/task_manager.py: Implements stream_message() as an async generator yielding stream events.
fasta2a/broker.py: Adds streaming event send/subscribe APIs and an in-memory pub/sub implementation.
fasta2a/worker.py: Adds Worker.update_task() helper to persist updates and publish stream events.
fasta2a/__init__.py: Exports AgentExtension and StreamEvent.
fasta2a/storage.py: Import formatting only (no behavioral change).
tests/test_streaming.py: New test suite covering broker streaming, worker event publishing, TaskManager streaming, and SSE endpoint.
tests/test_applications.py: Updates agent-card snapshot to expect streaming enabled.
pyproject.toml: Adds runtime sse-starlette and dev deps used by streaming/tests.
uv.lock: Locks newly added dependencies/markers.
.gitignore: Ignores coverage artifacts (.coverage*).


Comment on lines +193 to +195
# Start task execution in background
asyncio.create_task(self.broker.run_task(broker_params))


Copilot AI Mar 7, 2026


The background asyncio.create_task(self.broker.run_task(...)) isn't tracked or awaited. If it raises, you'll get "Task exception was never retrieved" and the SSE generator may keep streaming indefinitely. Consider capturing the task and handling exceptions (e.g., add a done callback that logs and/or publishes a failed final status) and cancel it if the client disconnects.

Comment on lines 152 to +167
if a2a_request['method'] == 'message/send':
    jsonrpc_response = await self.task_manager.send_message(a2a_request)
elif a2a_request['method'] == 'message/stream':
    stream_request = stream_message_request_ta.validate_json(data)

    async def sse_generator():
        request_id = stream_request.get('id')
        async for event in self.task_manager.stream_message(stream_request):
            jsonrpc_response = StreamMessageResponse(
                jsonrpc='2.0',
                id=request_id,
                result=event,
            )
            yield stream_message_response_ta.dump_json(jsonrpc_response, by_alias=True).decode()

    return EventSourceResponse(sse_generator())

Copilot AI Mar 7, 2026


FastA2A advertises streaming capability via AgentCapabilities.streaming, but _agent_run_endpoint will still accept and serve message/stream even when self.streaming is False. Add a guard that returns an appropriate JSON-RPC error (e.g., MethodNotFound/UnsupportedOperation) when streaming is disabled so behavior matches the advertised capabilities.
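A hedged sketch of what such a guard's error payload could look like. The helper name and message text are assumptions, not fasta2a's actual API; only the -32601 code comes from the JSON-RPC 2.0 spec ("Method not found"):

```python
def method_not_found_error(request_id):
    """Hypothetical helper: JSON-RPC 2.0 error response for an unsupported method."""
    return {
        'jsonrpc': '2.0',
        'id': request_id,
        'error': {'code': -32601, 'message': 'message/stream is not supported by this agent'},
    }


# Sketch of the guard in _agent_run_endpoint, before dispatching (assumed shape):
# if a2a_request['method'] == 'message/stream' and not self.streaming:
#     return JSONResponse(method_not_found_error(a2a_request.get('id')))
```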

Comment on lines 40 to 42
  'capabilities': {
-     'streaming': False,
+     'streaming': True,
      'pushNotifications': False,

Copilot AI Mar 7, 2026


The PR metadata focuses on "extensions", but this change also flips the default advertised streaming capability to True (and adds a new message/stream SSE endpoint). If this broader behavior change is intentional, it should be reflected in the PR title/description (or split into a separate PR) to avoid surprising consumers.

Comment on lines 57 to 58
except Exception:
    await self.storage.update_task(task_operation['params']['id'], state='failed')

Copilot AI Mar 7, 2026


In the exception handler, the worker updates storage directly, which bypasses the new Worker.update_task() logic and therefore won't publish a final status-update event to stream subscribers on failures. Use Worker.update_task(..., state='failed') here (and consider capturing/logging the exception) so streaming clients reliably see task termination.

Comment on lines +131 to +150
async with self._subscriber_lock:
    subscribers = self._event_subscribers.get(task_id, [])
    if not subscribers:
        return

    # Send event to all subscribers, removing closed streams
    active_subscribers: list[MemoryObjectSendStream[StreamEvent]] = []
    for stream in subscribers:
        try:
            await stream.send(event)
            active_subscribers.append(stream)
        except (anyio.ClosedResourceError, anyio.BrokenResourceError):
            # Subscriber disconnected, remove from list
            pass

    # Update subscriber list with only active ones
    if active_subscribers:
        self._event_subscribers[task_id] = active_subscribers
    elif task_id in self._event_subscribers:
        # No active subscribers left, clean up

Copilot AI Mar 7, 2026


send_stream_event() holds _subscriber_lock while awaiting stream.send(...). If a subscriber is slow (buffer full) this blocks all subscription changes and event sends across all tasks. Consider copying the current subscriber list under the lock, releasing it before awaiting sends, then reacquiring to prune/commit active subscribers.

Suggested change (snapshot under the lock, send without it, then reconcile):

# Take a snapshot of current subscribers under the lock.
async with self._subscriber_lock:
    subscribers = list(self._event_subscribers.get(task_id, []))
    if not subscribers:
        return

# Send event to all subscribers without holding the lock, tracking active ones.
active_subscribers: list[MemoryObjectSendStream[StreamEvent]] = []
for stream in subscribers:
    try:
        await stream.send(event)
        active_subscribers.append(stream)
    except (anyio.ClosedResourceError, anyio.BrokenResourceError):
        # Subscriber disconnected, remove from list on reconciliation.
        pass

# Reacquire the lock to reconcile the active subscribers with the current list.
async with self._subscriber_lock:
    current = self._event_subscribers.get(task_id)
    if current is None:
        # Task already cleaned up or no subscribers remain.
        return
    if not active_subscribers:
        # No active subscribers left, clean up if still present.
        if task_id in self._event_subscribers:
            del self._event_subscribers[task_id]
        return
    # Preserve only streams that are still active and still present in the current list.
    active_set = set(active_subscribers)
    new_current = [stream for stream in current if stream in active_set]
    if new_current:
        self._event_subscribers[task_id] = new_current
    elif task_id in self._event_subscribers:
        # No active subscribers left, clean up.

Comment on lines +193 to +197
# Start task execution in background
asyncio.create_task(self.broker.run_task(broker_params))

# Stream events from broker
async for event in self.broker.subscribe_to_stream(task['id']):

Copilot AI Mar 7, 2026


stream_message() can miss early stream events: it yields the initial task, then schedules broker.run_task(), and only afterwards starts iterating subscribe_to_stream(). If the worker publishes status updates quickly, they can be sent before the subscription is registered and will be dropped. Restructure so the subscription is registered before starting task execution (and ideally before yielding the first event), or change the broker API so subscription registration happens eagerly when subscribe_to_stream() is called.

Suggested change:

# Register subscription to the broker's event stream before starting task execution
stream = self.broker.subscribe_to_stream(task['id'])

# Start task execution in background
asyncio.create_task(self.broker.run_task(broker_params))

# Stream events from broker
async for event in stream:
