Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/actions/spelling/allow.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ otherurl
postgres
POSTGRES
postgresql
mariadb
protoc
pyi
pypistats
Expand Down
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,20 @@
## [0.3.11](https://github.com/a2aproject/a2a-python/compare/v0.3.10...v0.3.11) (2025-11-07)


### Bug Fixes

* add metadata to send message request ([12b4a1d](https://github.com/a2aproject/a2a-python/commit/12b4a1d565a53794f5b55c8bd1728221c906ed41))

## [0.3.12](https://github.com/a2aproject/a2a-python/compare/v0.3.11...v0.3.12) (2025-11-12)


### Bug Fixes

* **grpc:** Add `extensions` to `Artifact` converters. ([#523](https://github.com/a2aproject/a2a-python/issues/523)) ([c03129b](https://github.com/a2aproject/a2a-python/commit/c03129b99a663ae1f1ae72f20e4ead7807ede941))

## [0.3.11](https://github.com/a2aproject/a2a-python/compare/v0.3.10...v0.3.11) (2025-11-07)


### Bug Fixes

* add metadata to send message request ([12b4a1d](https://github.com/a2aproject/a2a-python/commit/12b4a1d565a53794f5b55c8bd1728221c906ed41))
Expand Down
11 changes: 11 additions & 0 deletions src/a2a/client/base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from a2a.types import (
AgentCard,
GetTaskPushNotificationConfigParams,
ListTasksParams,
ListTasksResult,
Message,
MessageSendConfiguration,
MessageSendParams,
Expand Down Expand Up @@ -157,6 +159,15 @@ async def get_task(
request, context=context, extensions=extensions
)

async def list_tasks(
self,
request: ListTasksParams,
*,
context: ClientCallContext | None = None,
) -> ListTasksResult:
"""Retrieves tasks for an agent."""
return await self._transport.list_tasks(request, context=context)

async def cancel_task(
self,
request: TaskIdParams,
Expand Down
11 changes: 11 additions & 0 deletions src/a2a/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
from a2a.types import (
AgentCard,
GetTaskPushNotificationConfigParams,
ListTasksParams,
ListTasksResult,
Message,
PushNotificationConfig,
Task,
Expand Down Expand Up @@ -124,57 +126,66 @@
pairs, or a `Message`. Client will also send these values to any
configured `Consumer`s in the client.
"""
return
yield

@abstractmethod
async def get_task(
self,
request: TaskQueryParams,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> Task:
"""Retrieves the current state and history of a specific task."""

@abstractmethod
async def list_tasks(
self,
request: ListTasksParams,
*,
context: ClientCallContext | None = None,
) -> ListTasksResult:
"""Retrieves tasks for an agent."""

@abstractmethod
async def cancel_task(
self,
request: TaskIdParams,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> Task:
"""Requests the agent to cancel a specific task."""

@abstractmethod
async def set_task_callback(
self,
request: TaskPushNotificationConfig,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> TaskPushNotificationConfig:
"""Sets or updates the push notification configuration for a specific task."""

@abstractmethod
async def get_task_callback(
self,
request: GetTaskPushNotificationConfigParams,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> TaskPushNotificationConfig:
"""Retrieves the push notification configuration for a specific task."""

@abstractmethod
async def resubscribe(
self,
request: TaskIdParams,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> AsyncIterator[ClientEvent]:

Check notice on line 188 in src/a2a/client/client.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/client/transports/base.py (45-104)
"""Resubscribes to a task's event stream."""
return
yield
Expand Down
11 changes: 11 additions & 0 deletions src/a2a/client/transports/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from a2a.types import (
AgentCard,
GetTaskPushNotificationConfigParams,
ListTasksParams,
ListTasksResult,
Message,
MessageSendParams,
Task,
Expand Down Expand Up @@ -40,57 +42,66 @@
Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent
]:
"""Sends a streaming message request to the agent and yields responses as they arrive."""
return
yield

@abstractmethod
async def get_task(
self,
request: TaskQueryParams,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> Task:
"""Retrieves the current state and history of a specific task."""

@abstractmethod
async def list_tasks(
self,
request: ListTasksParams,
*,
context: ClientCallContext | None = None,
) -> ListTasksResult:
"""Retrieves tasks for an agent."""

@abstractmethod
async def cancel_task(
self,
request: TaskIdParams,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> Task:
"""Requests the agent to cancel a specific task."""

@abstractmethod
async def set_task_callback(
self,
request: TaskPushNotificationConfig,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> TaskPushNotificationConfig:
"""Sets or updates the push notification configuration for a specific task."""

@abstractmethod
async def get_task_callback(
self,
request: GetTaskPushNotificationConfigParams,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> TaskPushNotificationConfig:
"""Retrieves the push notification configuration for a specific task."""

@abstractmethod
async def resubscribe(
self,
request: TaskIdParams,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> AsyncGenerator[

Check notice on line 104 in src/a2a/client/transports/base.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/client/client.py (129-188)
Task | Message | TaskStatusUpdateEvent | TaskArtifactUpdateEvent
]:
"""Reconnects to get task updates."""
Expand Down
17 changes: 17 additions & 0 deletions src/a2a/client/transports/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from collections.abc import AsyncGenerator, Callable

from a2a.utils.constants import DEFAULT_LIST_TASKS_PAGE_SIZE


try:
import grpc
Expand All @@ -22,6 +24,8 @@
from a2a.types import (
AgentCard,
GetTaskPushNotificationConfigParams,
ListTasksParams,
ListTasksResult,
Message,
MessageSendParams,
Task,
Expand Down Expand Up @@ -168,15 +172,28 @@
)
return proto_utils.FromProto.task(task)

async def list_tasks(
self,
request: ListTasksParams,
*,
context: ClientCallContext | None = None,
) -> ListTasksResult:
"""Retrieves tasks for an agent."""
response = await self.stub.ListTasks(
proto_utils.ToProto.list_tasks_request(request)
)
page_size = request.page_size or DEFAULT_LIST_TASKS_PAGE_SIZE
return proto_utils.FromProto.list_tasks_result(response, page_size)

async def cancel_task(
self,
request: TaskIdParams,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> Task:
"""Requests the agent to cancel a specific task."""
task = await self.stub.CancelTask(

Check notice on line 196 in src/a2a/client/transports/grpc.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/client/transports/rest.py (267-279)
a2a_pb2.CancelTaskRequest(name=f'tasks/{request.id}'),
metadata=self._get_grpc_metadata(extensions),
)
Expand Down
24 changes: 24 additions & 0 deletions src/a2a/client/transports/jsonrpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
GetTaskRequest,
GetTaskResponse,
JSONRPCErrorResponse,
ListTasksParams,
ListTasksRequest,
ListTasksResponse,
ListTasksResult,
Message,
MessageSendParams,
SendMessageRequest,
Expand Down Expand Up @@ -242,6 +246,26 @@ async def get_task(
raise A2AClientJSONRPCError(response.root)
return response.root.result

async def list_tasks(
self,
request: ListTasksParams,
*,
context: ClientCallContext | None = None,
) -> ListTasksResult:
"""Retrieves tasks for an agent."""
rpc_request = ListTasksRequest(params=request, id=str(uuid4()))
payload, modified_kwargs = await self._apply_interceptors(
'tasks/list',
rpc_request.model_dump(mode='json', exclude_none=True),
self._get_http_args(context),
context,
)
response_data = await self._send_request(payload, modified_kwargs)
response = ListTasksResponse.model_validate(response_data)
if isinstance(response.root, JSONRPCErrorResponse):
raise A2AClientJSONRPCError(response.root)
return response.root.result

async def cancel_task(
self,
request: TaskIdParams,
Expand Down
44 changes: 44 additions & 0 deletions src/a2a/client/transports/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from google.protobuf.json_format import MessageToDict, Parse, ParseDict
from httpx_sse import SSEError, aconnect_sse
from pydantic import BaseModel

from a2a.client.card_resolver import A2ACardResolver
from a2a.client.errors import A2AClientHTTPError, A2AClientJSONError
Expand All @@ -18,6 +19,8 @@
from a2a.types import (
AgentCard,
GetTaskPushNotificationConfigParams,
ListTasksParams,
ListTasksResult,
Message,
MessageSendParams,
Task,
Expand All @@ -28,6 +31,7 @@
TaskStatusUpdateEvent,
)
from a2a.utils import proto_utils
from a2a.utils.constants import DEFAULT_LIST_TASKS_PAGE_SIZE
from a2a.utils.telemetry import SpanKind, trace_class


Expand Down Expand Up @@ -159,98 +163,120 @@
yield proto_utils.FromProto.stream_response(event)
except httpx.HTTPStatusError as e:
raise A2AClientHTTPError(e.response.status_code, str(e)) from e
except SSEError as e:
raise A2AClientHTTPError(
400, f'Invalid SSE response or protocol error: {e}'
) from e
except json.JSONDecodeError as e:
raise A2AClientJSONError(str(e)) from e
except httpx.RequestError as e:
raise A2AClientHTTPError(
503, f'Network communication error: {e}'
) from e

async def _send_request(self, request: httpx.Request) -> dict[str, Any]:
try:
response = await self.httpx_client.send(request)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
raise A2AClientHTTPError(e.response.status_code, str(e)) from e
except json.JSONDecodeError as e:
raise A2AClientJSONError(str(e)) from e
except httpx.RequestError as e:
raise A2AClientHTTPError(
503, f'Network communication error: {e}'
) from e

async def _send_post_request(
self,
target: str,
rpc_request_payload: dict[str, Any],
http_kwargs: dict[str, Any] | None = None,
) -> dict[str, Any]:
return await self._send_request(
self.httpx_client.build_request(
'POST',
f'{self.url}{target}',
json=rpc_request_payload,
**(http_kwargs or {}),
)
)

async def _send_get_request(
self,
target: str,
query_params: dict[str, str],
http_kwargs: dict[str, Any] | None = None,
) -> dict[str, Any]:
return await self._send_request(
self.httpx_client.build_request(
'GET',
f'{self.url}{target}',
params=query_params,
**(http_kwargs or {}),
)
)

async def get_task(
self,
request: TaskQueryParams,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> Task:
"""Retrieves the current state and history of a specific task."""
modified_kwargs = update_extension_header(
self._get_http_args(context),
extensions if extensions is not None else self.extensions,
)
_payload, modified_kwargs = await self._apply_interceptors(
request.model_dump(mode='json', exclude_none=True),
modified_kwargs,
context,
)
response_data = await self._send_get_request(
f'/v1/tasks/{request.id}',
{'historyLength': str(request.history_length)}
if request.history_length is not None
else {},
modified_kwargs,
)
task = a2a_pb2.Task()
ParseDict(response_data, task)
return proto_utils.FromProto.task(task)

async def list_tasks(
self,
request: ListTasksParams,
*,
context: ClientCallContext | None = None,
) -> ListTasksResult:
"""Retrieves tasks for an agent."""
_, modified_kwargs = await self._apply_interceptors(
request.model_dump(mode='json', exclude_none=True),
self._get_http_args(context),
context,
)
response_data = await self._send_get_request(
'/v1/tasks',
_model_to_query_params(request),
modified_kwargs,
)
response = a2a_pb2.ListTasksResponse()
ParseDict(response_data, response)
page_size = request.page_size or DEFAULT_LIST_TASKS_PAGE_SIZE
return proto_utils.FromProto.list_tasks_result(response, page_size)

async def cancel_task(
self,
request: TaskIdParams,
*,
context: ClientCallContext | None = None,
extensions: list[str] | None = None,
) -> Task:
"""Requests the agent to cancel a specific task."""
pb = a2a_pb2.CancelTaskRequest(name=f'tasks/{request.id}')

Check notice on line 279 in src/a2a/client/transports/rest.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/client/transports/grpc.py (184-196)
payload = MessageToDict(pb)
modified_kwargs = update_extension_header(
self._get_http_args(context),
Expand Down Expand Up @@ -413,3 +439,21 @@
async def close(self) -> None:
"""Closes the httpx client."""
await self.httpx_client.aclose()


def _model_to_query_params(instance: BaseModel) -> dict[str, str]:
data = instance.model_dump(mode='json', exclude_none=True)
return _json_to_query_params(data)


def _json_to_query_params(data: dict[str, Any]) -> dict[str, str]:
query_dict = {}
for key, value in data.items():
if isinstance(value, list):
query_dict[key] = ','.join(map(str, value))
elif isinstance(value, bool):
query_dict[key] = str(value).lower()
else:
query_dict[key] = str(value)

return query_dict
Loading
Loading