diff --git a/src/a2a/server/tasks/base_push_notification_sender.py b/src/a2a/server/tasks/base_push_notification_sender.py index 087d2973..73f313ee 100644 --- a/src/a2a/server/tasks/base_push_notification_sender.py +++ b/src/a2a/server/tasks/base_push_notification_sender.py @@ -52,13 +52,26 @@ async def _dispatch_notification( ) -> bool: url = push_info.url try: - headers = None + headers = {} if push_info.token: - headers = {'X-A2A-Notification-Token': push_info.token} + headers['X-A2A-Notification-Token'] = push_info.token + + # Add authentication header if configured + if push_info.authentication and push_info.authentication.schemes: + for scheme in push_info.authentication.schemes: + if ( + scheme.lower() == 'bearer' + and push_info.authentication.credentials + ): + headers['Authorization'] = ( + f'Bearer {push_info.authentication.credentials}' + ) + break + response = await self._client.post( url, json=task.model_dump(mode='json', exclude_none=True), - headers=headers, + headers=headers if headers else None, ) response.raise_for_status() logger.info( diff --git a/tests/server/tasks/test_push_notification_sender.py b/tests/server/tasks/test_push_notification_sender.py index a3272c2c..f4179f48 100644 --- a/tests/server/tasks/test_push_notification_sender.py +++ b/tests/server/tasks/test_push_notification_sender.py @@ -8,6 +8,7 @@ BasePushNotificationSender, ) from a2a.types import ( + PushNotificationAuthenticationInfo, PushNotificationConfig, Task, TaskState, @@ -29,8 +30,11 @@ def create_sample_push_config( url: str = 'http://example.com/callback', config_id: str = 'cfg1', token: str | None = None, + authentication: PushNotificationAuthenticationInfo | None = None, ) -> PushNotificationConfig: - return PushNotificationConfig(id=config_id, url=url, token=token) + return PushNotificationConfig( + id=config_id, url=url, token=token, authentication=authentication + ) class TestBasePushNotificationSender(unittest.IsolatedAsyncioTestCase): @@ -92,6 +96,90 @@ async def test_send_notification_with_token_success(self) -> None: ) mock_response.raise_for_status.assert_called_once() + async def test_send_notification_with_bearer_authentication(self) -> None: + task_id = 'task_send_bearer_auth' + task_data = create_sample_task(task_id=task_id) + auth_info = PushNotificationAuthenticationInfo( + schemes=['Bearer'], credentials='test-jwt-token' + ) + config = create_sample_push_config( + url='http://notify.me/here', + token='unique_token', + authentication=auth_info, + ) + self.mock_config_store.get_info.return_value = [config] + + mock_response = AsyncMock(spec=httpx.Response) + mock_response.status_code = 200 + self.mock_httpx_client.post.return_value = mock_response + + await self.sender.send_notification(task_data) + + self.mock_config_store.get_info.assert_awaited_once_with(task_id) + + # assert httpx_client post method got invoked with right parameters + self.mock_httpx_client.post.assert_awaited_once_with( + config.url, + json=task_data.model_dump(mode='json', exclude_none=True), + headers={ + 'X-A2A-Notification-Token': 'unique_token', + 'Authorization': 'Bearer test-jwt-token', + }, + ) + mock_response.raise_for_status.assert_called_once() + + async def test_send_notification_with_bearer_authentication_no_credentials( + self, + ) -> None: + task_id = 'task_send_bearer_no_creds' + task_data = create_sample_task(task_id=task_id) + auth_info = PushNotificationAuthenticationInfo( + schemes=['Bearer'], credentials=None + ) + config = create_sample_push_config( + url='http://notify.me/here', authentication=auth_info + ) + self.mock_config_store.get_info.return_value = [config] + + mock_response = AsyncMock(spec=httpx.Response) + mock_response.status_code = 200 + self.mock_httpx_client.post.return_value = mock_response + + await self.sender.send_notification(task_data) + + # Should not add Authorization header when credentials are missing + self.mock_httpx_client.post.assert_awaited_once_with( + config.url, + json=task_data.model_dump(mode='json', exclude_none=True), + headers=None, + ) + + async def test_send_notification_with_non_bearer_authentication( + self, + ) -> None: + task_id = 'task_send_non_bearer' + task_data = create_sample_task(task_id=task_id) + auth_info = PushNotificationAuthenticationInfo( + schemes=['Basic'], credentials='user:pass' + ) + config = create_sample_push_config( + url='http://notify.me/here', authentication=auth_info + ) + self.mock_config_store.get_info.return_value = [config] + + mock_response = AsyncMock(spec=httpx.Response) + mock_response.status_code = 200 + self.mock_httpx_client.post.return_value = mock_response + + await self.sender.send_notification(task_data) + + # Should not add Authorization header for non-Bearer schemes + self.mock_httpx_client.post.assert_awaited_once_with( + config.url, + json=task_data.model_dump(mode='json', exclude_none=True), + headers=None, + ) + async def test_send_notification_no_config(self) -> None: task_id = 'task_send_no_config' task_data = create_sample_task(task_id=task_id)