Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.cache
__pycache__
.coverage*
4 changes: 2 additions & 2 deletions fasta2a/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from .applications import FastA2A
from .broker import Broker
from .schema import Skill
from .schema import Skill, StreamEvent
from .storage import Storage
from .worker import Worker

__all__ = ['FastA2A', 'Skill', 'Storage', 'Broker', 'Worker']
__all__ = ['FastA2A', 'Skill', 'Storage', 'Broker', 'Worker', 'StreamEvent']
22 changes: 21 additions & 1 deletion fasta2a/applications.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pathlib import Path
from typing import Any

from sse_starlette import EventSourceResponse
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.requests import Request
Expand All @@ -18,9 +19,12 @@
AgentCard,
AgentProvider,
Skill,
StreamMessageResponse,
a2a_request_ta,
a2a_response_ta,
agent_card_ta,
stream_message_request_ta,
stream_message_response_ta,
)
from .storage import Storage
from .task_manager import TaskManager
Expand All @@ -42,6 +46,7 @@ def __init__(
provider: AgentProvider | None = None,
skills: list[Skill] | None = None,
docs_url: str | None = '/docs',
streaming: bool = True,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Default streaming=True may break backward compatibility for non-streaming setups

The FastA2A.__init__ now defaults streaming=True (fasta2a/applications.py:49), which means the agent card will advertise streaming capability. If a user upgrades without wrapping their storage in StreamingStorageWrapper, the agent card will claim streaming support but message/stream requests will fail because no streaming events are published. This is a potential compatibility concern — previously streaming was hardcoded to False (fasta2a/applications.py:103). The default being True assumes all users are streaming-ready.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

# Starlette
debug: bool = False,
routes: Sequence[Route] | None = None,
Expand All @@ -67,6 +72,7 @@ def __init__(
self.provider = provider
self.skills = skills or []
self.docs_url = docs_url
self.streaming = streaming
# NOTE: For now, I don't think there's any reason to support any other input/output modes.
self.default_input_modes = ['application/json']
self.default_output_modes = ['application/json']
Expand Down Expand Up @@ -100,7 +106,7 @@ async def _agent_card_endpoint(self, request: Request) -> Response:
default_input_modes=self.default_input_modes,
default_output_modes=self.default_output_modes,
capabilities=AgentCapabilities(
streaming=False, push_notifications=False, state_transition_history=False
streaming=self.streaming, push_notifications=False, state_transition_history=False
),
)
if self.provider is not None:
Expand Down Expand Up @@ -131,6 +137,20 @@ async def _agent_run_endpoint(self, request: Request) -> Response:

if a2a_request['method'] == 'message/send':
jsonrpc_response = await self.task_manager.send_message(a2a_request)
elif a2a_request['method'] == 'message/stream':
stream_request = stream_message_request_ta.validate_json(data)

async def sse_generator():
request_id = stream_request.get('id')
async for event in self.task_manager.stream_message(stream_request):
jsonrpc_response = StreamMessageResponse(
jsonrpc='2.0',
id=request_id,
result=event,
)
yield stream_message_response_ta.dump_json(jsonrpc_response, by_alias=True).decode()

return EventSourceResponse(sse_generator())
elif a2a_request['method'] == 'tasks/get':
jsonrpc_response = await self.task_manager.get_task(a2a_request)
elif a2a_request['method'] == 'tasks/cancel':
Expand Down
91 changes: 90 additions & 1 deletion fasta2a/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
from typing import Annotated, Any, Generic, Literal, TypeVar

import anyio
from anyio.streams.memory import MemoryObjectSendStream
from opentelemetry.trace import Span, get_current_span, get_tracer
from pydantic import Discriminator
from typing_extensions import Self, TypedDict

from .schema import TaskIdParams, TaskSendParams
from .schema import StreamEvent, TaskIdParams, TaskSendParams

tracer = get_tracer(__name__)

Expand All @@ -37,6 +38,26 @@ async def cancel_task(self, params: TaskIdParams) -> None:
"""Cancel a task."""
raise NotImplementedError('send_cancel_task is not implemented yet.')

@abstractmethod
async def send_stream_event(self, task_id: str, event: StreamEvent) -> None:
"""Send a streaming event from worker to subscribers.

This is used by workers to publish status updates, messages, and artifacts
during task execution. Events are forwarded to all active subscribers of
the given task_id.
"""
...

@abstractmethod
def subscribe_to_stream(self, task_id: str) -> AsyncIterator[StreamEvent]:
"""Subscribe to streaming events for a specific task.

Returns an async iterator that yields events published by workers for the
given task_id. The iterator completes when a TaskStatusUpdateEvent with
final=True is received or the subscription is cancelled.
"""
...

@abstractmethod
async def __aenter__(self) -> Self: ...

Expand Down Expand Up @@ -73,6 +94,10 @@ class _TaskOperation(TypedDict, Generic[OperationT, ParamsT]):
class InMemoryBroker(Broker):
"""A broker that schedules tasks in memory."""

def __init__(self) -> None:
self._event_subscribers: dict[str, list[MemoryObjectSendStream[StreamEvent]]] = {}
self._subscriber_lock: anyio.Lock | None = None

async def __aenter__(self):
self.aexit_stack = AsyncExitStack()
await self.aexit_stack.__aenter__()
Expand All @@ -81,6 +106,8 @@ async def __aenter__(self):
await self.aexit_stack.enter_async_context(self._read_stream)
await self.aexit_stack.enter_async_context(self._write_stream)

self._subscriber_lock = anyio.Lock()

return self

async def __aexit__(self, exc_type: Any, exc_value: Any, traceback: Any):
Expand All @@ -96,3 +123,65 @@ async def receive_task_operations(self) -> AsyncIterator[TaskOperation]:
"""Receive task operations from the broker."""
async for task_operation in self._read_stream:
yield task_operation

async def send_stream_event(self, task_id: str, event: StreamEvent) -> None:
"""Send a streaming event from worker to subscribers."""
assert self._subscriber_lock is not None, 'Broker not initialized'

async with self._subscriber_lock:
subscribers = self._event_subscribers.get(task_id, [])
if not subscribers:
return

# Send event to all subscribers, removing closed streams
active_subscribers: list[MemoryObjectSendStream[StreamEvent]] = []
for stream in subscribers:
try:
await stream.send(event)
active_subscribers.append(stream)
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
# Subscriber disconnected, remove from list
pass

# Update subscriber list with only active ones
if active_subscribers:
self._event_subscribers[task_id] = active_subscribers
elif task_id in self._event_subscribers:
# No active subscribers left, clean up
del self._event_subscribers[task_id]
Comment on lines +131 to +151

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Potential lock contention in send_stream_event while holding lock during await

In fasta2a/broker.py:131-151, send_stream_event holds _subscriber_lock for the entire duration of sending events to ALL subscribers. Each await stream.send(event) at line 140 could block if a subscriber's buffer is full (max_buffer_size=100). While holding the lock, no new subscriptions or unsubscriptions can proceed. I analyzed whether this could deadlock with subscribe_to_stream's finally block (line 176), which also needs the lock. The scenario where subscriber A's buffer is full while subscriber B tries to clean up is NOT a deadlock because: (1) A's consumer doesn't need the lock to read from its receive_stream, so it can drain the buffer, and (2) if A's receive_stream is already closed, the send raises BrokenResourceError which is caught at line 142. However, this pattern could cause significant latency spikes under load with many slow subscribers, as a single slow subscriber blocks event delivery to all others for that task_id.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


async def subscribe_to_stream(self, task_id: str) -> AsyncIterator[StreamEvent]:
"""Subscribe to streaming events for a specific task."""
assert self._subscriber_lock is not None, 'Broker not initialized'

# Create a new stream for this subscriber
send_stream, receive_stream = anyio.create_memory_object_stream[StreamEvent](max_buffer_size=100)

# Register the subscriber
async with self._subscriber_lock:
if task_id not in self._event_subscribers:
self._event_subscribers[task_id] = []
self._event_subscribers[task_id].append(send_stream)

try:
async with receive_stream:
async for event in receive_stream:
yield event

# Check if this is a final status update
if isinstance(event, dict) and event.get('kind') == 'status-update' and event.get('final', False):
break
finally:
# Clean up subscription on exit
async with self._subscriber_lock:
if task_id in self._event_subscribers:
try:
self._event_subscribers[task_id].remove(send_stream)
if not self._event_subscribers[task_id]:
del self._event_subscribers[task_id]
except ValueError:
# Already removed
pass

# Close the send stream
await send_stream.aclose()
6 changes: 6 additions & 0 deletions fasta2a/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,3 +808,9 @@ class JSONRPCResponse(JSONRPCMessage, Generic[ResultT, ErrorT]):
send_message_response_ta: TypeAdapter[SendMessageResponse] = TypeAdapter(SendMessageResponse)
stream_message_request_ta: TypeAdapter[StreamMessageRequest] = TypeAdapter(StreamMessageRequest)
stream_message_response_ta: TypeAdapter[StreamMessageResponse] = TypeAdapter(StreamMessageResponse)

# Type for streaming events (used by broker and task manager)
StreamEvent = Union[Task, Message, TaskStatusUpdateEvent, TaskArtifactUpdateEvent]
"""A streaming event that can be sent during message/stream requests."""

stream_event_ta: TypeAdapter[StreamEvent] = TypeAdapter(StreamEvent)
8 changes: 7 additions & 1 deletion fasta2a/storage.py

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 InMemoryStorage.load_task mutates the stored task's history in place

Pre-existing issue: InMemoryStorage.load_task at fasta2a/storage.py:85-86 does task['history'] = task['history'][-history_length:], which mutates the stored task dict in place, permanently truncating the history. This means subsequent calls to load_task without history_length will still see the truncated history. This is a pre-existing bug, not introduced by this PR, but relevant context since streaming relies on storage state.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@

from typing_extensions import TypeVar

from .schema import Artifact, Message, Task, TaskState, TaskStatus
from .schema import (
Artifact,
Message,
Task,
TaskState,
TaskStatus,
)

ContextT = TypeVar('ContextT', default=Any)

Expand Down
45 changes: 41 additions & 4 deletions fasta2a/task_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@

from __future__ import annotations as _annotations

import asyncio
import uuid
from collections.abc import AsyncGenerator
from contextlib import AsyncExitStack
from dataclasses import dataclass, field
from typing import Any
Expand All @@ -78,8 +80,8 @@
SendMessageResponse,
SetTaskPushNotificationRequest,
SetTaskPushNotificationResponse,
StreamEvent,
StreamMessageRequest,
StreamMessageResponse,
TaskNotFoundError,
TaskSendParams,
)
Expand Down Expand Up @@ -156,9 +158,44 @@ async def cancel_task(self, request: CancelTaskRequest) -> CancelTaskResponse:
)
return CancelTaskResponse(jsonrpc='2.0', id=request['id'], result=task)

async def stream_message(self, request: StreamMessageRequest) -> StreamMessageResponse:
"""Stream messages using Server-Sent Events."""
raise NotImplementedError('message/stream method is not implemented yet.')
async def stream_message(self, request: StreamMessageRequest) -> AsyncGenerator[StreamEvent, None]:
"""Handle a streaming message request.
This method:
1. Creates and submits a new task
2. Yields the initial task object
3. Subscribes to the broker's event stream
4. Starts task execution asynchronously
5. Streams all events until completion
"""
# Extract parameters
params = request['params']
message = params['message']
context_id = message.get('context_id', str(uuid.uuid4()))

# Create and submit the task
task = await self.storage.submit_task(context_id, message)

# Yield the initial task
yield task

# Prepare broker params
broker_params: TaskSendParams = {'id': task['id'], 'context_id': context_id, 'message': message}
config = params.get('configuration', {})
history_length = config.get('history_length')
if history_length is not None:
broker_params['history_length'] = history_length

metadata = params.get('metadata')
if metadata is not None:
broker_params['metadata'] = metadata
Comment on lines +189 to +191

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Inconsistent metadata forwarding between send_message and stream_message

The stream_message method (lines 189-191) forwards metadata from params to broker_params, but the pre-existing send_message method (fasta2a/task_manager.py:117-132) does not. This is an inconsistency between the two code paths. While not a bug per se (since send_message was pre-existing), it means the two methods handle the same MessageSendParams differently. This should be harmonized.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


# Start task execution in background
asyncio.create_task(self.broker.run_task(broker_params))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Fire-and-forget asyncio.create_task loses exceptions and breaks anyio/trio compatibility

At line 194, asyncio.create_task(self.broker.run_task(broker_params)) has two problems: (1) The returned task reference is not stored, so if run_task raises an exception, it is silently lost (Python will only emit a warning about a never-retrieved exception on GC). The streaming client will hang forever waiting for events that will never arrive. (2) The codebase consistently uses anyio for async primitives (fasta2a/broker.py uses anyio.Lock, anyio.create_memory_object_stream; fasta2a/worker.py uses anyio.create_task_group). Using asyncio.create_task directly breaks compatibility with trio (which anyio supports) and violates the project's convention.

Prompt for agents
In fasta2a/task_manager.py line 194, replace the bare `asyncio.create_task(self.broker.run_task(broker_params))` with an anyio-based approach that properly handles errors. For example, use an anyio TaskGroup to manage the background task's lifecycle. The stream_message method needs restructuring: instead of fire-and-forget, use a task group that runs both the task execution and the event streaming concurrently, so that if run_task raises an exception, it properly propagates or is handled (e.g., by sending a 'failed' status event). Also remove the `import asyncio` at line 63 if no longer needed.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


# Stream events from broker
async for event in self.broker.subscribe_to_stream(task['id']):
yield event
Comment on lines +194 to +198

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Race condition: task execution starts before stream subscription is registered

In stream_message, asyncio.create_task(self.broker.run_task(broker_params)) at line 194 schedules the task for execution BEFORE the subscription is registered at line 197. Since subscribe_to_stream is an async generator, the subscriber registration (at fasta2a/broker.py:159-164) only happens when iteration begins — i.e., when __anext__ is called by the async for at line 197. By that time, the background task is already scheduled. The first checkpoint inside subscribe_to_stream (the anyio.Lock acquisition at fasta2a/broker.py:161, which always checkpoints even if uncontested) can allow the scheduled run_task to start executing. If the worker processes the task fast enough (e.g., with a near-zero delay or a remote broker), events emitted via send_stream_event will find no subscribers and be silently dropped, causing the client to hang forever waiting for events that were already lost.

Prompt for agents
In fasta2a/task_manager.py, the stream_message method (lines 161-198) must subscribe to the broker's event stream BEFORE starting task execution. However, since subscribe_to_stream is an async generator, simply reordering the lines won't work — the subscription registration only happens when iteration begins.

The fix requires restructuring so that subscription registration is decoupled from iteration. One approach:
1. In broker.py's subscribe_to_stream (or a new method), separate the registration step from the iteration step. For example, add a method like `register_subscriber(task_id)` that returns a (send_stream, receive_stream) pair and registers the subscriber, then have subscribe_to_stream accept the pre-registered streams.
2. Alternatively, refactor stream_message to manually register the subscriber before starting the task, then iterate the receive stream directly.

The key invariant: the subscriber MUST be registered in self._event_subscribers BEFORE self.broker.run_task(broker_params) is called.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Comment on lines +194 to +198

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Subscription registration vs task execution ordering relies on subtle asyncio scheduling guarantees

In fasta2a/task_manager.py:194-197, the code first calls asyncio.create_task(self.broker.run_task(broker_params)) (line 194) then subscribes via async for event in self.broker.subscribe_to_stream(task['id']) (line 197). This appears to be a race condition at first glance, but it actually works because asyncio.create_task only schedules the coroutine — it doesn't execute until the current coroutine yields. The subscribe_to_stream async generator executes synchronously through the subscriber registration code (fasta2a/broker.py:159-164) before hitting its first real await (the async for event in receive_stream at line 168). However, this ordering guarantee is fragile: if self._subscriber_lock at fasta2a/broker.py:161 is contended (held by a concurrent send_stream_event call), the subscription registration will yield to the event loop, potentially allowing the background task to start and emit events before the subscriber is registered. In the InMemoryBroker with a single task this is very unlikely, but worth noting for robustness.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


async def set_task_push_notification(
self, request: SetTaskPushNotificationRequest
Expand Down
63 changes: 62 additions & 1 deletion fasta2a/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@
from opentelemetry.trace import get_tracer, use_span
from typing_extensions import assert_never

from .schema import TaskArtifactUpdateEvent, TaskStatusUpdateEvent
from .storage import ContextT, Storage

if TYPE_CHECKING:
from .broker import Broker, TaskOperation
from .schema import Artifact, Message, TaskIdParams, TaskSendParams
from .schema import Artifact, Message, TaskIdParams, TaskSendParams, TaskState

tracer = get_tracer(__name__)

Expand Down Expand Up @@ -56,6 +57,66 @@ async def _handle_task_operation(self, task_operation: TaskOperation) -> None:
except Exception:
await self.storage.update_task(task_operation['params']['id'], state='failed')

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Exception handler in _handle_task_operation bypasses streaming event publishing

When a task raises an exception, _handle_task_operation (fasta2a/worker.py:57-58) catches it and calls self.storage.update_task(...) directly instead of self.update_task(...). The new self.update_task() method (line 60) is responsible for both updating storage AND publishing streaming events (including the crucial final=True status event). By calling self.storage.update_task() directly, no final=True TaskStatusUpdateEvent is ever published to the broker. Any SSE subscriber waiting on subscribe_to_stream will hang indefinitely, never receiving the termination signal.

Suggested change
await self.storage.update_task(task_operation['params']['id'], state='failed')
await self.update_task(task_operation['params']['id'], state='failed')
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


async def update_task(
self,
task_id: str,
state: TaskState,
new_artifacts: list[Artifact] | None = None,
new_messages: list[Message] | None = None,
) -> None:
"""Update a task's state in storage and publish streaming events to the broker.
This is the primary method workers should use to update task state. It handles
both persisting the update and notifying any stream subscribers.
"""
task = await self.storage.update_task(task_id, state, new_artifacts, new_messages)

final = state in ('completed', 'failed', 'canceled')

# For non-final updates, publish status first
if not final:
await self.broker.send_stream_event(
task_id,
TaskStatusUpdateEvent(
kind='status-update',
task_id=task_id,
context_id=task['context_id'],
status=task['status'],
final=False,
),
)

# Publish message events before final status so subscribers receive them
if new_messages:
for message in new_messages:
await self.broker.send_stream_event(task_id, message)

# Publish artifact events
if new_artifacts:
for artifact in new_artifacts:
await self.broker.send_stream_event(
task_id,
TaskArtifactUpdateEvent(
kind='artifact-update',
task_id=task_id,
context_id=task['context_id'],
artifact=artifact,
),
)

# For final updates, publish status last (after messages and artifacts)
if final:
await self.broker.send_stream_event(
task_id,
TaskStatusUpdateEvent(
kind='status-update',
task_id=task_id,
context_id=task['context_id'],
status=task['status'],
final=True,
),
)

@abstractmethod
async def run_task(self, params: TaskSendParams) -> None: ...

Expand Down
Loading
Loading