From 4e6100b2a3ab2bad26a818a2ef97ae22e28d55cc Mon Sep 17 00:00:00 2001 From: Mark Date: Sat, 6 Dec 2025 12:46:59 -0800 Subject: [PATCH 1/2] Restore consolidated final responses for non-streaming --- .../python/src/ag_ui_adk/event_translator.py | 51 ++++++---------- .../test_event_translator_comprehensive.py | 59 ++++++++++++++++++- 2 files changed, 73 insertions(+), 37 deletions(-) diff --git a/integrations/adk-middleware/python/src/ag_ui_adk/event_translator.py b/integrations/adk-middleware/python/src/ag_ui_adk/event_translator.py index d0b390868..16e75de96 100644 --- a/integrations/adk-middleware/python/src/ag_ui_adk/event_translator.py +++ b/integrations/adk-middleware/python/src/ag_ui_adk/event_translator.py @@ -289,7 +289,7 @@ async def _translate_text_content( # Case 1: A stream is actively running. We must close it. if self._is_streaming and self._streaming_message_id: logger.info("⏭️ Final response event received. Closing active stream.") - + if self._current_stream_text: # Save the complete streamed text for de-duplication self._last_streamed_text = self._current_stream_text @@ -307,10 +307,7 @@ async def _translate_text_content( logger.info("🏁 Streaming completed via final response") return # We are done. - # Case 2: No stream is active. - # This event contains the *entire* message. - # We must send it, *unless* it's a duplicate of a stream that *just* finished. - + # Case 2: No stream is active. # Check for duplicates from a *previous* stream in this *same run*. is_duplicate = ( self._last_streamed_run_id == run_id and @@ -322,35 +319,21 @@ async def _translate_text_content( logger.info( "⏭️ Skipping final response event (duplicate content detected from finished stream)" ) - else: - # Not a duplicate, or no previous stream. Send the full message. - logger.info( - f"⏩ Delivering complete non-streamed message or final content event_id={adk_event.id}" - ) - message_events = [ - TextMessageStartEvent( - type=EventType.TEXT_MESSAGE_START, - message_id=adk_event.id, # Use event ID for non-streamed - role="assistant", - ), - TextMessageContentEvent( - type=EventType.TEXT_MESSAGE_CONTENT, - message_id=adk_event.id, - delta=combined_text, - ), - TextMessageEndEvent( - type=EventType.TEXT_MESSAGE_END, - message_id=adk_event.id, - ), - ] - for msg in message_events: - yield msg - - # Clean up state regardless, as this is the end of the line for text. - self._current_stream_text = "" - self._last_streamed_text = None - self._last_streamed_run_id = None - return + # Clean up state as this is still the terminal signal for text. + self._current_stream_text = "" + self._last_streamed_text = None + self._last_streamed_run_id = None + return + + if not combined_text: + logger.info("⏭️ Final response contained no text; nothing to emit") + self._current_stream_text = "" + self._last_streamed_text = None + self._last_streamed_run_id = None + return + + # Fall through to the normal emission path to send the consolidated + # START/CONTENT/END trio for non-streaming final responses. # Early return for empty text (non-final responses only). # Final responses with empty text are handled above to close active streams. diff --git a/integrations/adk-middleware/python/tests/test_event_translator_comprehensive.py b/integrations/adk-middleware/python/tests/test_event_translator_comprehensive.py index 8813e00df..84d05f5c2 100644 --- a/integrations/adk-middleware/python/tests/test_event_translator_comprehensive.py +++ b/integrations/adk-middleware/python/tests/test_event_translator_comprehensive.py @@ -391,11 +391,16 @@ async def test_translate_text_content_final_response_no_streaming(self, translat async for event in translator.translate(mock_adk_event_with_content, "thread_1", "run_1"): events.append(event) - assert len(events) == 3 # START, CONTENT, END for first final payload + assert len(events) == 3 # START, CONTENT, END assert isinstance(events[0], TextMessageStartEvent) assert isinstance(events[1], TextMessageContentEvent) assert isinstance(events[2], TextMessageEndEvent) + # Final response without streaming should capture the last streamed text for de-dupe + assert translator._current_stream_text == "" + assert translator._last_streamed_text == "Test content" + assert translator._last_streamed_run_id == "run_1" + @pytest.mark.asyncio async def test_translate_text_content_final_response_from_agent_callback(self, translator, mock_adk_event_with_content): """Test final response when it was received from an agent callback function.""" @@ -409,10 +414,10 @@ async def test_translate_text_content_final_response_from_agent_callback(self, t async for event in translator.translate(mock_adk_event_with_content, "thread_1", "run_1"): events.append(event) - assert len(events) == 3 # START, CONTENT , END + assert len(events) == 3 # START, CONTENT, END assert isinstance(events[0], TextMessageStartEvent) assert isinstance(events[1], TextMessageContentEvent) - assert events[1].delta == mock_adk_event_with_content.content.parts[0].text + assert events[1].delta == "Test content" assert isinstance(events[2], TextMessageEndEvent) @pytest.mark.asyncio @@ -476,6 +481,54 @@ async def test_translate_text_content_final_response_after_stream_duplicate_supp assert events == [] # duplicate suppressed + @pytest.mark.asyncio + async def test_translate_text_content_final_response_closes_stream_without_consolidated_text(self, translator): + """Final response with consolidated text should only close the open stream.""" + + # Stream some content first + stream_event = MagicMock(spec=ADKEvent) + stream_event.id = "event-1" + stream_event.author = "model" + stream_event.content = MagicMock() + stream_part = MagicMock() + stream_part.text = "Streaming chunk" + stream_event.content.parts = [stream_part] + stream_event.partial = False + stream_event.turn_complete = False + stream_event.is_final_response = False + stream_event.usage_metadata = {"tokens": 1} + + async for _ in translator.translate(stream_event, "thread_1", "run_1"): + pass + + streaming_message_id = translator._streaming_message_id + + # Now receive a final response that includes the consolidated text + final_event = MagicMock(spec=ADKEvent) + final_event.id = "event-2" + final_event.author = "model" + final_event.content = MagicMock() + final_part = MagicMock() + final_part.text = "Streaming chunk" + final_event.content.parts = [final_part] + final_event.partial = False + final_event.turn_complete = True + final_event.is_final_response = True + final_event.usage_metadata = {"tokens": 2} + + events = [] + async for event in translator.translate(final_event, "thread_1", "run_1"): + events.append(event) + + # Only the END event should be emitted to close the active stream + assert events == [ + TextMessageEndEvent(type=EventType.TEXT_MESSAGE_END, message_id=streaming_message_id) + ] + assert translator._is_streaming is False + assert translator._current_stream_text == "" + assert translator._last_streamed_text == "Streaming chunk" + assert translator._last_streamed_run_id == "run_1" + @pytest.mark.asyncio async def test_translate_text_content_final_response_after_stream_new_content(self, translator): """Final LLM payload with new content should be emitted.""" From fa16b5396ef90be6a1d1b3a6851a8a39c3161e1c Mon Sep 17 00:00:00 2001 From: Mark Fogle Date: Sat, 6 Dec 2025 14:22:49 -0800 Subject: [PATCH 2/2] Bumping version number. --- integrations/adk-middleware/python/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/adk-middleware/python/pyproject.toml b/integrations/adk-middleware/python/pyproject.toml index 88a25dc40..8a0d7de9f 100644 --- a/integrations/adk-middleware/python/pyproject.toml +++ b/integrations/adk-middleware/python/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "ag_ui_adk" -version = "0.3.5" +version = "0.3.6" readme = "README.md" authors = [ { name = "Mark Fogle", email = "mark@contextable.com" }