Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion integrations/adk-middleware/python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "ag_ui_adk"
version = "0.3.5"
version = "0.3.6"
readme = "README.md"
authors = [
{ name = "Mark Fogle", email = "mark@contextable.com" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ async def _translate_text_content(
# Case 1: A stream is actively running. We must close it.
if self._is_streaming and self._streaming_message_id:
logger.info("⏭️ Final response event received. Closing active stream.")

if self._current_stream_text:
# Save the complete streamed text for de-duplication
self._last_streamed_text = self._current_stream_text
Expand All @@ -307,10 +307,7 @@ async def _translate_text_content(
logger.info("🏁 Streaming completed via final response")
return # We are done.

# Case 2: No stream is active.
# This event contains the *entire* message.
# We must send it, *unless* it's a duplicate of a stream that *just* finished.

# Case 2: No stream is active.
# Check for duplicates from a *previous* stream in this *same run*.
is_duplicate = (
self._last_streamed_run_id == run_id and
Expand All @@ -322,35 +319,21 @@ async def _translate_text_content(
logger.info(
"⏭️ Skipping final response event (duplicate content detected from finished stream)"
)
else:
# Not a duplicate, or no previous stream. Send the full message.
logger.info(
f"⏩ Delivering complete non-streamed message or final content event_id={adk_event.id}"
)
message_events = [
TextMessageStartEvent(
type=EventType.TEXT_MESSAGE_START,
message_id=adk_event.id, # Use event ID for non-streamed
role="assistant",
),
TextMessageContentEvent(
type=EventType.TEXT_MESSAGE_CONTENT,
message_id=adk_event.id,
delta=combined_text,
),
TextMessageEndEvent(
type=EventType.TEXT_MESSAGE_END,
message_id=adk_event.id,
),
]
for msg in message_events:
yield msg

# Clean up state regardless, as this is the end of the line for text.
self._current_stream_text = ""
self._last_streamed_text = None
self._last_streamed_run_id = None
return
# Clean up state as this is still the terminal signal for text.
self._current_stream_text = ""
self._last_streamed_text = None
self._last_streamed_run_id = None
return

if not combined_text:
logger.info("⏭️ Final response contained no text; nothing to emit")
self._current_stream_text = ""
self._last_streamed_text = None
self._last_streamed_run_id = None
return

# Fall through to the normal emission path to send the consolidated
# START/CONTENT/END trio for non-streaming final responses.

# Early return for empty text (non-final responses only).
# Final responses with empty text are handled above to close active streams.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,11 +391,16 @@ async def test_translate_text_content_final_response_no_streaming(self, translat
async for event in translator.translate(mock_adk_event_with_content, "thread_1", "run_1"):
events.append(event)

assert len(events) == 3 # START, CONTENT, END for first final payload
assert len(events) == 3 # START, CONTENT, END
assert isinstance(events[0], TextMessageStartEvent)
assert isinstance(events[1], TextMessageContentEvent)
assert isinstance(events[2], TextMessageEndEvent)

# Final response without streaming should capture the last streamed text for de-dupe
assert translator._current_stream_text == ""
assert translator._last_streamed_text == "Test content"
assert translator._last_streamed_run_id == "run_1"

@pytest.mark.asyncio
async def test_translate_text_content_final_response_from_agent_callback(self, translator, mock_adk_event_with_content):
"""Test final response when it was received from an agent callback function."""
Expand All @@ -409,10 +414,10 @@ async def test_translate_text_content_final_response_from_agent_callback(self, t
async for event in translator.translate(mock_adk_event_with_content, "thread_1", "run_1"):
events.append(event)

assert len(events) == 3 # START, CONTENT , END
assert len(events) == 3 # START, CONTENT, END
assert isinstance(events[0], TextMessageStartEvent)
assert isinstance(events[1], TextMessageContentEvent)
assert events[1].delta == mock_adk_event_with_content.content.parts[0].text
assert events[1].delta == "Test content"
assert isinstance(events[2], TextMessageEndEvent)

@pytest.mark.asyncio
Expand Down Expand Up @@ -476,6 +481,54 @@ async def test_translate_text_content_final_response_after_stream_duplicate_supp

assert events == [] # duplicate suppressed

@pytest.mark.asyncio
async def test_translate_text_content_final_response_closes_stream_without_consolidated_text(self, translator):
"""Final response with consolidated text should only close the open stream."""

# Stream some content first
stream_event = MagicMock(spec=ADKEvent)
stream_event.id = "event-1"
stream_event.author = "model"
stream_event.content = MagicMock()
stream_part = MagicMock()
stream_part.text = "Streaming chunk"
stream_event.content.parts = [stream_part]
stream_event.partial = False
stream_event.turn_complete = False
stream_event.is_final_response = False
stream_event.usage_metadata = {"tokens": 1}

async for _ in translator.translate(stream_event, "thread_1", "run_1"):
pass

streaming_message_id = translator._streaming_message_id

# Now receive a final response that includes the consolidated text
final_event = MagicMock(spec=ADKEvent)
final_event.id = "event-2"
final_event.author = "model"
final_event.content = MagicMock()
final_part = MagicMock()
final_part.text = "Streaming chunk"
final_event.content.parts = [final_part]
final_event.partial = False
final_event.turn_complete = True
final_event.is_final_response = True
final_event.usage_metadata = {"tokens": 2}

events = []
async for event in translator.translate(final_event, "thread_1", "run_1"):
events.append(event)

# Only the END event should be emitted to close the active stream
assert events == [
TextMessageEndEvent(type=EventType.TEXT_MESSAGE_END, message_id=streaming_message_id)
]
assert translator._is_streaming is False
assert translator._current_stream_text == ""
assert translator._last_streamed_text == "Streaming chunk"
assert translator._last_streamed_run_id == "run_1"

@pytest.mark.asyncio
async def test_translate_text_content_final_response_after_stream_new_content(self, translator):
"""Final LLM payload with new content should be emitted."""
Expand Down
Loading