"""Real-time streaming research agent with LangGraph."""
from typing import TypedDict
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.config import get_stream_writer
from langgraph.graph import END, START, StateGraph
from langsmith import traceable, tracing_context
# Shared Anthropic chat model used by both graph nodes.
# temperature=0 for deterministic, reproducible outputs.
llm = ChatAnthropic(model="claude-sonnet-4-5-20250929", temperature=0)
class State(TypedDict):
    """Shared graph state flowing through the research pipeline."""

    # The user's original question (set when the graph is invoked).
    question: str
    # Research summary written by the `research` node.
    research: str
    # Final synthesized answer written by the `synthesize` node.
    answer: str
@traceable(name="research", run_type="chain")
def research(state: State) -> dict:
    """Gather background information for the user's question.

    Emits custom-stream status events before and after the LLM call so
    clients streaming with ``stream_mode="custom"`` see progress updates.

    Returns:
        A partial state update containing the ``research`` summary text.
    """
    stream = get_stream_writer()
    stream({"step": "research", "status": "starting", "message": "Searching knowledge base..."})

    # Build the prompt as named messages rather than an inline list.
    instructions = SystemMessage(
        content="You are a research assistant. Search for relevant information "
        "about the user's question. Return a concise summary of findings."
    )
    question = HumanMessage(content=state["question"])
    result = llm.invoke([instructions, question])

    stream({"step": "research", "status": "complete", "message": "Research complete."})
    return {"research": result.content}
@traceable(name="synthesize", run_type="chain")
def synthesize(state: State) -> dict:
    """Turn accumulated research into the final answer.

    Reads ``question`` and ``research`` from state, emits custom-stream
    status events around the LLM call, and returns the ``answer`` text.

    Returns:
        A partial state update containing the final ``answer``.
    """
    stream = get_stream_writer()
    stream({"step": "synthesize", "status": "starting", "message": "Synthesizing answer..."})

    instructions = SystemMessage(
        content="Synthesize the research into a clear, actionable answer. "
        "Be concise but thorough."
    )
    # Combine the original question with the research node's findings.
    context = HumanMessage(
        content=f"Question: {state['question']}\n\nResearch:\n{state['research']}"
    )
    result = llm.invoke([instructions, context])

    stream({"step": "synthesize", "status": "complete", "message": "Done."})
    return {"answer": result.content}
# Build the graph: a two-node linear pipeline,
# START -> research -> synthesize -> END.
builder = StateGraph(State)
builder.add_node("research", research)
builder.add_node("synthesize", synthesize)
builder.add_edge(START, "research")
builder.add_edge("research", "synthesize")
builder.add_edge("synthesize", END)
# Compiled, runnable graph used by `main` (and importable by other modules).
graph = builder.compile()
def main():
    """Run one demo question through the graph, printing multi-mode stream output.

    Streams three modes simultaneously: node-completion updates, custom
    status events from the nodes, and token-level message chunks.
    """
    demo_question = "What are the key differences between REST and GraphQL for mobile APIs?"
    with tracing_context(
        metadata={"stream_mode": "multi", "client_type": "cli"},
        tags=["development", "streaming", "v1"],
    ):
        for mode, chunk in graph.stream(
            {"question": demo_question},
            stream_mode=["updates", "custom", "messages"],
        ):
            if mode == "updates":
                # Each update chunk is keyed by the node that just finished.
                finished_node = next(iter(chunk))
                print(f"\n[node] {finished_node} completed")
            elif mode == "custom":
                print(f"[status] {chunk.get('message', chunk)}")
            elif mode == "messages":
                token, _metadata = chunk
                # Print only non-empty token content, without buffering.
                if getattr(token, "content", None):
                    print(token.content, end="", flush=True)
        print("\n\nStream complete.")


if __name__ == "__main__":
    main()