Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions samples/agent/adk/restaurant_finder/a2ui_validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import json
import os
from typing import List, Dict, Any
from jsonschema import validate, ValidationError

# Path to the A2UI server-to-client schema. Overridable via the
# A2UI_SCHEMA_FILE environment variable so this module is not tied to one
# developer's machine (the default below is a local absolute path).
SCHEMA_FILE = os.environ.get(
    "A2UI_SCHEMA_FILE",
    "/Users/jsimionato/development/a2ui_repos/jewel_case/A2UI/specification/0.8/json/server_to_client_with_standard_catalog.json",
)

# Cache mapping schema name -> schema object, populated lazily by load_schemas().
SCHEMAS = {}

def load_schemas(schema_file=None):
    """Load the A2UI component schemas into the module-level SCHEMAS cache.

    Merges the top-level ``properties`` (message envelopes) and ``$defs``
    (component definitions) of the schema file. Idempotent: returns
    immediately once SCHEMAS is populated.

    Args:
        schema_file: Optional path overriding the module-level SCHEMA_FILE.

    Raises:
        ValueError: If the schema file is missing, is not valid JSON, or
            contains neither ``properties`` nor ``$defs``.
    """
    if SCHEMAS:
        return  # Already loaded.
    path = schema_file or SCHEMA_FILE
    try:
        with open(path, 'r') as f:
            main_schema = json.load(f)
    except FileNotFoundError as e:
        raise ValueError(f"Schema file not found at {path}") from e
    except json.JSONDecodeError as e:
        raise ValueError(f"Failed to decode JSON from {path}") from e
    # Message envelopes live under 'properties'; the actual component
    # schemas live under '$defs'. Merge both, then verify we got something
    # (the original checked before merging $defs, wrongly rejecting schemas
    # that only define $defs).
    SCHEMAS.update(main_schema.get('properties', {}))
    SCHEMAS.update(main_schema.get('$defs', {}))
    if not SCHEMAS:
        raise ValueError("No properties found in main schema")

def get_schema(name: str):
    """Return the schema registered under *name*, loading schemas on demand.

    Raises:
        ValueError: If no schema with that name exists.
    """
    load_schemas()
    try:
        return SCHEMAS[name]
    except KeyError:
        raise ValueError(f"Schema {name} not found in {SCHEMA_FILE}")

def validate_a2ui_messages(messages: List[Dict[str, Any]]):
    """Validate each message against the schema for its A2UI message type.

    Each message must carry exactly one known A2UI top-level key
    (beginRendering, surfaceUpdate, dataModelUpdate, deleteSurface) and
    conform to that key's schema. Errors are logged to stdout and re-raised.

    Args:
        messages: The list of A2UI message dicts to validate.

    Raises:
        ValidationError: If a message fails schema validation or has no
            known A2UI message-type key.
        ValueError: If the schema file cannot be loaded or a schema is
            missing from it.
    """
    known_types = ("beginRendering", "surfaceUpdate", "dataModelUpdate", "deleteSurface")
    for i, msg in enumerate(messages):
        # Single lookup instead of an elif chain; None means unknown type.
        msg_type = next((k for k in known_types if k in msg), None)
        try:
            if msg_type is None:
                raise ValidationError(f"Message {i} has no known A2UI message type key")
            validate(instance=msg, schema=get_schema(msg_type))
        except ValidationError as e:
            # Label with the matched type when we have one. The original
            # used list(msg.keys())[0], which raised IndexError on an empty
            # dict and masked the real validation error.
            label = msg_type if msg_type is not None else list(msg.keys())
            print(f"A2UI Validation Error in message {i} ({label}): {e.message}")
            raise
        except ValueError as e:
            print(f"Schema loading/lookup error: {e}")
            raise
232 changes: 88 additions & 144 deletions samples/agent/adk/restaurant_finder/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
import json
import logging
import os
import time
from collections.abc import AsyncIterable
from typing import Any

import instrumentation

import jsonschema
from google.adk.agents.llm_agent import LlmAgent
from google.adk.artifacts import InMemoryArtifactService
Expand All @@ -27,13 +30,13 @@
from google.adk.sessions import InMemorySessionService
from google.genai import types
from prompt_builder import (
A2UI_SCHEMA,
RESTAURANT_UI_EXAMPLES,
get_text_prompt,
get_ui_prompt,
)
from tools import get_restaurants

import ui_schema
import template_renderer
from pydantic import ValidationError
logger = logging.getLogger(__name__)

AGENT_INSTRUCTION = """
Expand All @@ -53,6 +56,33 @@
"""


class InstrumentedLiteLlm(LiteLlm):
    """LiteLlm wrapper that reports inference latency via `instrumentation`.

    Both the streaming and synchronous entry points time the call
    (wall clock, milliseconds) and report it with
    instrumentation.track_inference, even when the underlying call raises.
    """

    async def generate_content_async(self, *args, **kwargs):
        # Streaming path: forward chunks from the wrapped model while timing
        # the whole stream. The `finally` runs when the generator finishes,
        # raises, or is closed by the consumer.
        logger.info("InstrumentedLiteLlm.generate_content_async called")
        start_time = time.time()
        try:
            try:
                async for chunk in super().generate_content_async(*args, **kwargs):
                    yield chunk
            except ValueError as e:
                # LiteLlm can raise "No message in response" on a trailing
                # empty stop chunk; treat that as a normal end of stream and
                # swallow it. Any other ValueError is re-raised.
                if "No message in response" in str(e):
                    logger.warning(f"Ignored ValueError from LiteLlm (likely empty stop chunk): {e}")
                    return
                raise e
        finally:
            # Duration in milliseconds, recorded even on error or early close.
            duration = (time.time() - start_time) * 1000
            instrumentation.track_inference(duration)

    def generate_content(self, *args, **kwargs):
        # Non-streaming path: same timing wrapper around the synchronous call.
        logger.info("InstrumentedLiteLlm.generate_content (sync) called")
        start_time = time.time()
        try:
            return super().generate_content(*args, **kwargs)
        finally:
            duration = (time.time() - start_time) * 1000
            instrumentation.track_inference(duration)


class RestaurantAgent:
"""An agent that finds restaurants based on user criteria."""

Expand All @@ -71,44 +101,25 @@ def __init__(self, base_url: str, use_ui: bool = False):
memory_service=InMemoryMemoryService(),
)

# --- MODIFICATION: Wrap the schema ---
# Load the A2UI_SCHEMA string into a Python object for validation
try:
# First, load the schema for a *single message*
single_message_schema = json.loads(A2UI_SCHEMA)

# The prompt instructs the LLM to return a *list* of messages.
# Therefore, our validation schema must be an *array* of the single message schema.
self.a2ui_schema_object = {"type": "array", "items": single_message_schema}
logger.info(
"A2UI_SCHEMA successfully loaded and wrapped in an array validator."
)
except json.JSONDecodeError as e:
logger.error(f"CRITICAL: Failed to parse A2UI_SCHEMA: {e}")
self.a2ui_schema_object = None
# --- END MODIFICATION ---

def get_processing_message(self) -> str:
return "Finding restaurants that match your criteria..."

def _build_agent(self, use_ui: bool) -> LlmAgent:
"""Builds the LLM agent for the restaurant agent."""
LITELLM_MODEL = os.getenv("LITELLM_MODEL", "gemini/gemini-2.5-flash")
LITELLM_MODEL = os.getenv("LITELLM_MODEL", "gemini/gemini-2.5-flash-lite")

if use_ui:
# Construct the full prompt with UI instructions, examples, and schema
instruction = AGENT_INSTRUCTION + get_ui_prompt(
self.base_url, RESTAURANT_UI_EXAMPLES
)
instruction = AGENT_INSTRUCTION + get_ui_prompt()
else:
instruction = get_text_prompt()

return LlmAgent(
model=LiteLlm(model=LITELLM_MODEL),
model=InstrumentedLiteLlm(model=LITELLM_MODEL),
name="restaurant_agent",
description="An agent that finds restaurants and helps book tables.",
instruction=instruction,
tools=[get_restaurants],
)

async def stream(self, query, session_id) -> AsyncIterable[dict[str, Any]]:
Expand All @@ -129,26 +140,11 @@ async def stream(self, query, session_id) -> AsyncIterable[dict[str, Any]]:
elif "base_url" not in session.state:
session.state["base_url"] = self.base_url

# --- Begin: UI Validation and Retry Logic ---
# --- Begin: NEW UI Processing Logic ---
max_retries = 1 # Total 2 attempts
attempt = 0
current_query_text = query

# Ensure schema was loaded
if self.use_ui and self.a2ui_schema_object is None:
logger.error(
"--- RestaurantAgent.stream: A2UI_SCHEMA is not loaded. "
"Cannot perform UI validation. ---"
)
yield {
"is_task_complete": True,
"content": (
"I'm sorry, I'm facing an internal configuration error with my UI components. "
"Please contact support."
),
}
return

while attempt <= max_retries:
attempt += 1
logger.info(
Expand All @@ -166,7 +162,6 @@ async def stream(self, query, session_id) -> AsyncIterable[dict[str, Any]]:
session_id=session.id,
new_message=current_message,
):
logger.info(f"Event from runner: {event}")
if event.is_final_response():
if (
event.content
Expand All @@ -176,128 +171,77 @@ async def stream(self, query, session_id) -> AsyncIterable[dict[str, Any]]:
final_response_content = "\n".join(
[p.text for p in event.content.parts if p.text]
)
break # Got the final response, stop consuming events
break
else:
logger.info(f"Intermediate event: {event}")
# Yield intermediate updates on every attempt
yield {
"is_task_complete": False,
"updates": self.get_processing_message(),
}

if final_response_content is None:
logger.warning(
f"--- RestaurantAgent.stream: Received no final response content from runner "
f"(Attempt {attempt}). ---"
f"--- RestaurantAgent.stream: No final response (Attempt {attempt}). ---"
)
if attempt <= max_retries:
current_query_text = (
"I received no response. Please try again."
f"Please retry the original request: '{query}'"
)
continue # Go to next retry
current_query_text = f"No response. Please retry: '{query}'"
continue
else:
# Retries exhausted on no-response
final_response_content = "I'm sorry, I encountered an error and couldn't process your request."
# Fall through to send this as a text-only error
final_response_content = "I'm sorry, I encountered an error."

is_valid = False
a2ui_messages = None
error_message = ""
text_part = final_response_content

if self.use_ui:
logger.info(
f"--- RestaurantAgent.stream: Validating UI response (Attempt {attempt})... ---"
)
logger.info(f"--- Validating UI response (Attempt {attempt})... ---")
try:
if "---a2ui_JSON---" not in final_response_content:
raise ValueError("Delimiter '---a2ui_JSON---' not found.")
if "```a2ui" not in final_response_content:
raise ValueError("A2UI block not found.")

text_part, json_string = final_response_content.split(
"---a2ui_JSON---", 1
)
parts = final_response_content.split("```a2ui", 1)
text_part = parts[0]
json_string = parts[1].split("```", 1)[0]

if not json_string.strip():
raise ValueError("JSON part is empty.")

json_string_cleaned = (
json_string.strip().lstrip("```json").rstrip("```").strip()
)

if not json_string_cleaned:
raise ValueError("Cleaned JSON string is empty.")

# --- New Validation Steps ---
# 1. Check if it's parsable JSON
parsed_json_data = json.loads(json_string_cleaned)

# 2. Check if it validates against the A2UI_SCHEMA
# This will raise jsonschema.exceptions.ValidationError if it fails
logger.info(
"--- RestaurantAgent.stream: Validating against A2UI_SCHEMA... ---"
)
jsonschema.validate(
instance=parsed_json_data, schema=self.a2ui_schema_object
)
# --- End New Validation Steps ---

logger.info(
f"--- RestaurantAgent.stream: UI JSON successfully parsed AND validated against schema. "
f"Validation OK (Attempt {attempt}). ---"
)
is_valid = True

except (
ValueError,
json.JSONDecodeError,
jsonschema.exceptions.ValidationError,
) as e:
logger.warning(
f"--- RestaurantAgent.stream: A2UI validation failed: {e} (Attempt {attempt}) ---"
)
logger.warning(
f"--- Failed response content: {final_response_content[:500]}... ---"
)
error_message = f"Validation failed: {e}."

else: # Not using UI, so text is always "valid"
is_valid = True

if is_valid:
logger.info(
f"--- RestaurantAgent.stream: Response is valid. Sending final response (Attempt {attempt}). ---"
raise ValueError("A2UI JSON part is empty.")

parsed_llm_output = json.loads(json_string)
llm_output = ui_schema.LLMOutput(**parsed_llm_output)

a2ui_messages = template_renderer.render_ui(llm_output, self.base_url)
logger.info(f"--- UI content generated successfully (Attempt {attempt}). ---")

except (ValueError, json.JSONDecodeError, ValidationError) as e:
logger.warning(f"--- A2UI output processing failed: {e} (Attempt {attempt}) ---")
logger.warning(f"--- Failed content: {final_response_content[:500]}... ---")
error_message = f"Output format error: {e}."
a2ui_messages = None

if a2ui_messages is not None:
# Combine text part and A2UI messages for the final response
a2ui_json_string = json.dumps(a2ui_messages)
final_output = f"{text_part.strip()}\n---a2ui_JSON---{a2ui_json_string}"
logger.info(f"--- Sending final response with UI (Attempt {attempt}). ---")
yield {"is_task_complete": True, "content": final_output}
return
elif not self.use_ui:
logger.info(f"--- Sending text only response (Attempt {attempt}). ---")
yield {"is_task_complete": True, "content": text_part}
return

# --- If we're here, UI generation failed ---
if attempt <= max_retries:
logger.warning(f"--- Retrying... ({attempt}/{max_retries + 1}) ---")
current_query_text = (
f"Your previous response had an issue: {error_message} "
"You MUST produce a JSON block between ```a2ui and ``` "
f"that conforms to the LLMOutput schema. Retry for original query: '{query}'"
)
logger.info(f"Final response: {final_response_content}")
else:
logger.error("--- Max retries exhausted. Sending text-only error. ---")
yield {
"is_task_complete": True,
"content": final_response_content,
"content": text_part + "\n\nI'm having trouble generating the interface right now.",
}
return # We're done, exit the generator

# --- If we're here, it means validation failed ---

if attempt <= max_retries:
logger.warning(
f"--- RestaurantAgent.stream: Retrying... ({attempt}/{max_retries + 1}) ---"
)
# Prepare the query for the retry
current_query_text = (
f"Your previous response was invalid. {error_message} "
"You MUST generate a valid response that strictly follows the A2UI JSON SCHEMA. "
"The response MUST be a JSON list of A2UI messages. "
"Ensure the response is split by '---a2ui_JSON---' and the JSON part is well-formed. "
f"Please retry the original request: '{query}'"
)
# Loop continues...

# --- If we're here, it means we've exhausted retries ---
logger.error(
"--- RestaurantAgent.stream: Max retries exhausted. Sending text-only error. ---"
)
yield {
"is_task_complete": True,
"content": (
"I'm sorry, I'm having trouble generating the interface for that request right now. "
"Please try again in a moment."
),
}
# --- End: UI Validation and Retry Logic ---
return
# --- End: NEW UI Processing Logic ---
Loading