Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.cache
__pycache__
.coverage*
12 changes: 10 additions & 2 deletions fasta2a/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
"""Public API of the `fasta2a` package."""

from .applications import FastA2A
from .broker import Broker
from .schema import AgentExtension, Skill, StreamEvent
from .storage import Storage
from .worker import Worker

# Kept alphabetical so additions produce minimal diffs.
__all__ = [
    'AgentExtension',
    'Broker',
    'FastA2A',
    'Skill',
    'Storage',
    'StreamEvent',
    'Worker',
]
40 changes: 37 additions & 3 deletions fasta2a/applications.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pathlib import Path
from typing import Any

from sse_starlette import EventSourceResponse
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.requests import Request
Expand All @@ -16,11 +17,15 @@
from .schema import (
AgentCapabilities,
AgentCard,
AgentExtension,
AgentProvider,
Skill,
StreamMessageResponse,
a2a_request_ta,
a2a_response_ta,
agent_card_ta,
stream_message_request_ta,
stream_message_response_ta,
)
from .storage import Storage
from .task_manager import TaskManager
Expand All @@ -41,7 +46,9 @@ def __init__(
description: str | None = None,
provider: AgentProvider | None = None,
skills: list[Skill] | None = None,
extensions: list[AgentExtension] | None = None,
docs_url: str | None = '/docs',
streaming: bool = True,
# Starlette
debug: bool = False,
routes: Sequence[Route] | None = None,
Expand All @@ -66,7 +73,9 @@ def __init__(
self.description = description
self.provider = provider
self.skills = skills or []
self.extensions = extensions or []
self.docs_url = docs_url
self.streaming = streaming
# NOTE: For now, I don't think there's any reason to support any other input/output modes.
self.default_input_modes = ['application/json']
self.default_output_modes = ['application/json']
Expand All @@ -90,6 +99,11 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:

async def _agent_card_endpoint(self, request: Request) -> Response:
if self._agent_card_json_schema is None:
capabilities = AgentCapabilities(
streaming=self.streaming, push_notifications=False, state_transition_history=False
)
if self.extensions:
capabilities['extensions'] = self.extensions
agent_card = AgentCard(
name=self.name,
description=self.description or 'An AI agent exposed as an A2A agent.',
Expand All @@ -99,9 +113,7 @@ async def _agent_card_endpoint(self, request: Request) -> Response:
skills=self.skills,
default_input_modes=self.default_input_modes,
default_output_modes=self.default_output_modes,
capabilities=AgentCapabilities(
streaming=False, push_notifications=False, state_transition_history=False
),
capabilities=capabilities,
)
if self.provider is not None:
agent_card['provider'] = self.provider
Expand Down Expand Up @@ -129,8 +141,30 @@ async def _agent_run_endpoint(self, request: Request) -> Response:
data = await request.body()
a2a_request = a2a_request_ta.validate_json(data)

# Parse activated extensions from the A2A-Extensions header
extensions_header = request.headers.get('a2a-extensions', '')
activated_extensions: list[str] = (
[uri.strip() for uri in extensions_header.split(',') if uri.strip()] if extensions_header else []
)
# Stash on the request state so workers / handlers can inspect them
request.state.activated_extensions = activated_extensions
Comment on lines +144 to +150

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 activated_extensions parsed but never consumed by downstream code

At fasta2a/applications.py:144-150, the A2A-Extensions header is parsed and stored on request.state.activated_extensions with a comment saying "so workers / handlers can inspect them." However, this state is never passed to TaskManager, Broker, or Worker — those components receive only TaskSendParams which doesn't include activated extensions information. The request.state is scoped to the HTTP request handler and is not accessible from workers. This appears to be scaffolding for future use rather than a bug, but it means extensions have no functional effect currently.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


if a2a_request['method'] == 'message/send':
jsonrpc_response = await self.task_manager.send_message(a2a_request)
elif a2a_request['method'] == 'message/stream':
stream_request = stream_message_request_ta.validate_json(data)

async def sse_generator():
request_id = stream_request.get('id')
async for event in self.task_manager.stream_message(stream_request):
jsonrpc_response = StreamMessageResponse(
jsonrpc='2.0',
id=request_id,
result=event,
)
yield stream_message_response_ta.dump_json(jsonrpc_response, by_alias=True).decode()

return EventSourceResponse(sse_generator())
Comment on lines 152 to +167
Copy link

Copilot AI Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FastA2A advertises streaming capability via AgentCapabilities.streaming, but _agent_run_endpoint will still accept and serve message/stream even when self.streaming is False. Add a guard that returns an appropriate JSON-RPC error (e.g., MethodNotFound/UnsupportedOperation) when streaming is disabled so behavior matches the advertised capabilities.

Copilot uses AI. Check for mistakes.
elif a2a_request['method'] == 'tasks/get':
jsonrpc_response = await self.task_manager.get_task(a2a_request)
elif a2a_request['method'] == 'tasks/cancel':
Expand Down
91 changes: 90 additions & 1 deletion fasta2a/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
from typing import Annotated, Any, Generic, Literal, TypeVar

import anyio
from anyio.streams.memory import MemoryObjectSendStream
from opentelemetry.trace import Span, get_current_span, get_tracer
from pydantic import Discriminator
from typing_extensions import Self, TypedDict

from .schema import TaskIdParams, TaskSendParams
from .schema import StreamEvent, TaskIdParams, TaskSendParams

tracer = get_tracer(__name__)

Expand All @@ -37,6 +38,26 @@ async def cancel_task(self, params: TaskIdParams) -> None:
"""Cancel a task."""
raise NotImplementedError('send_cancel_task is not implemented yet.')

    @abstractmethod
    async def send_stream_event(self, task_id: str, event: StreamEvent) -> None:
        """Send a streaming event from worker to subscribers.

        This is used by workers to publish status updates, messages, and artifacts
        during task execution. Events are forwarded to all active subscribers of
        the given task_id.

        Args:
            task_id: Identifier of the task the event belongs to.
            event: The `StreamEvent` payload to deliver to subscribers.
        """
        ...

    @abstractmethod
    def subscribe_to_stream(self, task_id: str) -> AsyncIterator[StreamEvent]:
        """Subscribe to streaming events for a specific task.

        Returns an async iterator that yields events published by workers for the
        given task_id. The iterator completes when a TaskStatusUpdateEvent with
        final=True is received or the subscription is cancelled.

        Args:
            task_id: Identifier of the task whose events to receive.

        Returns:
            An async iterator of `StreamEvent`s for the task.
        """
        ...

@abstractmethod
async def __aenter__(self) -> Self: ...

Expand Down Expand Up @@ -73,6 +94,10 @@ class _TaskOperation(TypedDict, Generic[OperationT, ParamsT]):
class InMemoryBroker(Broker):
"""A broker that schedules tasks in memory."""

    def __init__(self) -> None:
        # Per-task registry of subscriber send-streams: events published for a
        # task_id are fanned out to every stream registered under that key.
        self._event_subscribers: dict[str, list[MemoryObjectSendStream[StreamEvent]]] = {}
        # Created in __aenter__; None signals the broker context has not been
        # entered yet (guarded by asserts in the stream methods).
        self._subscriber_lock: anyio.Lock | None = None

async def __aenter__(self):
self.aexit_stack = AsyncExitStack()
await self.aexit_stack.__aenter__()
Expand All @@ -81,6 +106,8 @@ async def __aenter__(self):
await self.aexit_stack.enter_async_context(self._read_stream)
await self.aexit_stack.enter_async_context(self._write_stream)

self._subscriber_lock = anyio.Lock()

return self

async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any):
Expand All @@ -96,3 +123,65 @@ async def receive_task_operations(self) -> AsyncIterator[TaskOperation]:
"""Receive task operations from the broker."""
async for task_operation in self._read_stream:
yield task_operation

async def send_stream_event(self, task_id: str, event: StreamEvent) -> None:
"""Send a streaming event from worker to subscribers."""
assert self._subscriber_lock is not None, 'Broker not initialized'

async with self._subscriber_lock:
subscribers = self._event_subscribers.get(task_id, [])
if not subscribers:
return

# Send event to all subscribers, removing closed streams
active_subscribers: list[MemoryObjectSendStream[StreamEvent]] = []
for stream in subscribers:
try:
await stream.send(event)
active_subscribers.append(stream)
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
# Subscriber disconnected, remove from list
pass

# Update subscriber list with only active ones
if active_subscribers:
self._event_subscribers[task_id] = active_subscribers
elif task_id in self._event_subscribers:
# No active subscribers left, clean up
Comment on lines +131 to +150
Copy link

Copilot AI Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

send_stream_event() holds _subscriber_lock while awaiting stream.send(...). If a subscriber is slow (buffer full) this blocks all subscription changes and event sends across all tasks. Consider copying the current subscriber list under the lock, releasing it before awaiting sends, then reacquiring to prune/commit active subscribers.

Suggested change
async with self._subscriber_lock:
subscribers = self._event_subscribers.get(task_id, [])
if not subscribers:
return
# Send event to all subscribers, removing closed streams
active_subscribers: list[MemoryObjectSendStream[StreamEvent]] = []
for stream in subscribers:
try:
await stream.send(event)
active_subscribers.append(stream)
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
# Subscriber disconnected, remove from list
pass
# Update subscriber list with only active ones
if active_subscribers:
self._event_subscribers[task_id] = active_subscribers
elif task_id in self._event_subscribers:
# No active subscribers left, clean up
# Take a snapshot of current subscribers under the lock.
async with self._subscriber_lock:
subscribers = list(self._event_subscribers.get(task_id, []))
if not subscribers:
return
# Send event to all subscribers without holding the lock, tracking active ones.
active_subscribers: list[MemoryObjectSendStream[StreamEvent]] = []
for stream in subscribers:
try:
await stream.send(event)
active_subscribers.append(stream)
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
# Subscriber disconnected, remove from list on reconciliation.
pass
# Reacquire the lock to reconcile the active subscribers with the current list.
async with self._subscriber_lock:
current = self._event_subscribers.get(task_id)
if current is None:
# Task already cleaned up or no subscribers remain.
return
if not active_subscribers:
# No active subscribers left, clean up if still present.
if task_id in self._event_subscribers:
del self._event_subscribers[task_id]
return
# Preserve only streams that are still active and still present in the current list.
active_set = set(active_subscribers)
new_current = [stream for stream in current if stream in active_set]
if new_current:
self._event_subscribers[task_id] = new_current
elif task_id in self._event_subscribers:
# No active subscribers left, clean up.

Copilot uses AI. Check for mistakes.
del self._event_subscribers[task_id]

    async def subscribe_to_stream(self, task_id: str) -> AsyncIterator[StreamEvent]:
        """Subscribe to streaming events for a specific task.

        Yields events published via `send_stream_event` for `task_id` until a
        final status-update event is seen or the consumer stops iterating.

        NOTE(review): as an async generator, registration happens lazily on
        the first iteration, not when this method is called — callers that
        need the subscription active before scheduling work should iterate
        (or be restructured) accordingly.
        """
        assert self._subscriber_lock is not None, 'Broker not initialized'

        # Bounded buffer: a slow consumer can hold at most 100 undelivered
        # events before senders block on `send`.
        send_stream, receive_stream = anyio.create_memory_object_stream[StreamEvent](max_buffer_size=100)

        # Register this subscriber so workers can reach it.
        async with self._subscriber_lock:
            if task_id not in self._event_subscribers:
                self._event_subscribers[task_id] = []
            self._event_subscribers[task_id].append(send_stream)

        try:
            async with receive_stream:
                async for event in receive_stream:
                    yield event

                    # A status-update event with final=True terminates the
                    # stream. StreamEvent members are TypedDicts, hence the
                    # plain-dict check at runtime.
                    if isinstance(event, dict) and event.get('kind') == 'status-update' and event.get('final', False):
                        break
        finally:
            # Deregister on any exit path (normal completion, consumer break,
            # cancellation) so `send_stream_event` stops targeting this stream.
            async with self._subscriber_lock:
                if task_id in self._event_subscribers:
                    try:
                        self._event_subscribers[task_id].remove(send_stream)
                        if not self._event_subscribers[task_id]:
                            del self._event_subscribers[task_id]
                    except ValueError:
                        # Already removed (e.g. pruned by send_stream_event).
                        pass

            # Close our end so any in-flight sender observes a closed stream.
            await send_stream.aclose()
15 changes: 15 additions & 0 deletions fasta2a/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ class AgentCapabilities(TypedDict):
state_transition_history: NotRequired[bool]
"""Whether the agent exposes status change history for tasks."""

extensions: NotRequired[list[AgentExtension]]
"""A2A extensions supported by this agent.

Each extension is declared as an ``AgentExtension`` object with a
unique ``uri``, optional ``description``, ``required`` flag, and
``params`` configuration. Clients activate extensions by sending
the selected URIs in the ``A2A-Extensions`` HTTP header.
"""


@pydantic.with_config({'alias_generator': to_camel})
class HttpSecurityScheme(TypedDict):
Expand Down Expand Up @@ -808,3 +817,9 @@ class JSONRPCResponse(JSONRPCMessage, Generic[ResultT, ErrorT]):
send_message_response_ta: TypeAdapter[SendMessageResponse] = TypeAdapter(SendMessageResponse)
stream_message_request_ta: TypeAdapter[StreamMessageRequest] = TypeAdapter(StreamMessageRequest)
stream_message_response_ta: TypeAdapter[StreamMessageResponse] = TypeAdapter(StreamMessageResponse)

# Union of everything a message/stream response can carry: the initial task
# snapshot, agent messages, and incremental status/artifact update events.
# Used by both the broker (pub/sub payload) and the task manager (SSE results).
StreamEvent = Union[Task, Message, TaskStatusUpdateEvent, TaskArtifactUpdateEvent]
"""A streaming event that can be sent during message/stream requests."""

stream_event_ta: TypeAdapter[StreamEvent] = TypeAdapter(StreamEvent)
8 changes: 7 additions & 1 deletion fasta2a/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@

from typing_extensions import TypeVar

from .schema import Artifact, Message, Task, TaskState, TaskStatus
from .schema import (
Artifact,
Message,
Task,
TaskState,
TaskStatus,
)

ContextT = TypeVar('ContextT', default=Any)

Expand Down
45 changes: 41 additions & 4 deletions fasta2a/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@

from __future__ import annotations as _annotations

import asyncio
import uuid
from collections.abc import AsyncGenerator
from contextlib import AsyncExitStack
from dataclasses import dataclass, field
from typing import Any
Expand All @@ -78,8 +80,8 @@
SendMessageResponse,
SetTaskPushNotificationRequest,
SetTaskPushNotificationResponse,
StreamEvent,
StreamMessageRequest,
StreamMessageResponse,
TaskNotFoundError,
TaskSendParams,
)
Expand Down Expand Up @@ -156,9 +158,44 @@ async def cancel_task(self, request: CancelTaskRequest) -> CancelTaskResponse:
)
return CancelTaskResponse(jsonrpc='2.0', id=request['id'], result=task)

async def stream_message(self, request: StreamMessageRequest) -> StreamMessageResponse:
"""Stream messages using Server-Sent Events."""
raise NotImplementedError('message/stream method is not implemented yet.')
async def stream_message(self, request: StreamMessageRequest) -> AsyncGenerator[StreamEvent, None]:
"""Handle a streaming message request.
This method:
1. Creates and submits a new task
2. Yields the initial task object
3. Subscribes to the broker's event stream
4. Starts task execution asynchronously
5. Streams all events until completion
"""
# Extract parameters
params = request['params']
message = params['message']
context_id = message.get('context_id', str(uuid.uuid4()))

# Create and submit the task
task = await self.storage.submit_task(context_id, message)

# Yield the initial task
yield task

# Prepare broker params
broker_params: TaskSendParams = {'id': task['id'], 'context_id': context_id, 'message': message}
config = params.get('configuration', {})
history_length = config.get('history_length')
if history_length is not None:
broker_params['history_length'] = history_length

metadata = params.get('metadata')
if metadata is not None:
broker_params['metadata'] = metadata

# Start task execution in background
asyncio.create_task(self.broker.run_task(broker_params))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Fire-and-forget asyncio.create_task loses exceptions and task reference

At fasta2a/task_manager.py:194, asyncio.create_task(self.broker.run_task(broker_params)) creates a background task but discards the returned Task object. If run_task raises an exception, Python emits a "Task exception was never retrieved" warning and the error is silently lost — the streaming client will never know the task failed to even start. The task object is also subject to garbage collection since no reference is held, which could cause the task to be cancelled unexpectedly (per Python docs: "the event loop only keeps weak references to tasks").

Prompt for agents
In fasta2a/task_manager.py at line 194 in the stream_message method, the asyncio.create_task result is discarded. Store the task reference and add error handling. One approach: store the task in a local variable, and after the subscribe_to_stream loop completes, check if the background task raised an exception (e.g. via task.result() in a try/except). Alternatively, restructure to use a task group (e.g. anyio.create_task_group) to properly manage the background task's lifecycle and propagate exceptions.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


Comment on lines +193 to +195
Copy link

Copilot AI Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The background asyncio.create_task(self.broker.run_task(...)) isn't tracked or awaited. If it raises, you'll get "Task exception was never retrieved" and the SSE generator may keep streaming indefinitely. Consider capturing the task and handling exceptions (e.g., add a done callback that logs and/or publishes a failed final status) and cancel it if the client disconnects.

Copilot uses AI. Check for mistakes.
# Stream events from broker
async for event in self.broker.subscribe_to_stream(task['id']):
Comment on lines +193 to +197
Copy link

Copilot AI Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stream_message() can miss early stream events: it yields the initial task, then schedules broker.run_task(), and only afterwards starts iterating subscribe_to_stream(). If the worker publishes status updates quickly, they can be sent before the subscription is registered and will be dropped. Restructure so the subscription is registered before starting task execution (and ideally before yielding the first event), or change the broker API so subscription registration happens eagerly when subscribe_to_stream() is called.

Suggested change
# Start task execution in background
asyncio.create_task(self.broker.run_task(broker_params))
# Stream events from broker
async for event in self.broker.subscribe_to_stream(task['id']):
# Register subscription to the broker's event stream before starting task execution
stream = self.broker.subscribe_to_stream(task['id'])
# Start task execution in background
asyncio.create_task(self.broker.run_task(broker_params))
# Stream events from broker
async for event in stream:

Copilot uses AI. Check for mistakes.
yield event

async def set_task_push_notification(
self, request: SetTaskPushNotificationRequest
Expand Down
Loading
Loading