Description
Background
I have a @function_tool that runs a sub-agent and returns structured Feedback.

    @function_tool
    async def feedback_formatter_tool(ctx: RunContextWrapper[TownHallContext], conversation: str) -> Feedback:
        result = await Runner.run(
            starting_agent=feedback_formatter_agent,
            input=conversation,
            context=ctx.context,
        )
        feedback = result.final_output
        # Persist directly: session_id was available via ctx.context
        db_feedback = await save_feedback(feedback, session_id=ctx.context.session_id)
        return feedback

This worked cleanly because session_id was passed into context before the run, and the tool had everything it needed.
The Problem with Moving to ChatKit
Before ChatKit, I used the SDK's built-in SQLAlchemySession for conversation history:

    session = SQLAlchemySession.from_url(
        session_id,
        url=DATABASE_URL,
        create_tables=True,
    )
    result = Runner.run_streamed(
        current_agent,
        user_input,
        session=session,
        context=context,
    )

With ChatKit, conversation history is now managed by ChatKitStore (which I've implemented using SQLAlchemy). This means I can no longer pass a session to Runner.run_streamed and, more importantly, the session_id (i.e., thread.id) is only available inside ChatKitServer.respond(), not inside the function tool.
My Current Workaround
I moved DB persistence out of the function tool and into the server layer, after the agent run completes:

    class TownHallChatKitServer(ChatKitServer):
        async def respond(self, thread, input_user_message, context):
            town_hall_context = TownHallContext(session_id=thread.id)
            result = Runner.run_streamed(dialogue_agent, input_items, context=town_hall_context)
            async for event in stream_agent_response(agent_context, result):
                yield event
            # Persist structured outputs after the full agent run
            await self._persist_structured_outputs(town_hall_context, thread.id)

        async def _persist_structured_outputs(self, ctx, session_id):
            if ctx.feedback_processed and ctx.feedback is not None:
                await save_feedback(ctx.feedback, session_id=session_id)

The function tool now only stores the output in context:

    @function_tool
    async def feedback_formatter_tool(ctx: RunContextWrapper[TownHallContext], conversation: str) -> Feedback:
        result = await Runner.run(
            starting_agent=feedback_formatter_agent,
            input=conversation,
            context=ctx.context,
        )
        feedback = result.final_output
        # Store in context instead
        ctx.context.feedback = feedback
        ctx.context.feedback_processed = True
        return feedback

Question
Is this the intended/recommended pattern for persisting structured agent outputs when using ChatKit? Specifically:
- Is deferring DB writes to ChatKitServer.respond() (after stream_agent_response completes) the right approach, or is there a better hook for this?
- My concern with this approach is that if multiple structured outputs need to be persisted, the ChatKitServer could start to look bloated with persistence logic. Is there a recommended pattern for organizing this responsibility (for example a persistence layer, service, or store abstraction)?
Any guidance on the recommended architecture here would be appreciated.