-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparallel_agents.py
More file actions
101 lines (78 loc) · 3.37 KB
/
parallel_agents.py
File metadata and controls
101 lines (78 loc) · 3.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
"""Parallel sub-agents in LangGraph for concurrent research.
Fans out to three specialist agents (KB, Web, Policy), aggregates results,
and synthesizes a single customer-facing response.
"""
import operator
from typing import Annotated, TypedDict
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import END, START, StateGraph
from langgraph.types import RetryPolicy
from langsmith import traceable, tracing_context
llm = ChatAnthropic(model="claude-sonnet-4-5-20250929", temperature=0)
class State(TypedDict):
    """Shared graph state passed through every node."""
    # The customer's question, supplied at invoke() time.
    question: str
    # Fan-in accumulator: the operator.add reducer concatenates the
    # single-item lists written by the parallel agents instead of letting
    # concurrent writes clobber each other.
    research_results: Annotated[list[dict], operator.add]
    # Written by the synthesize node; the final customer-facing answer.
    final_response: str
def make_agent(name: str, focus: str):
    """Build a traceable LangGraph node for one specialist researcher.

    Args:
        name: Agent identifier; used as the LangSmith run name and as the
            ``source`` label on the result the node emits.
        focus: One-line description of the agent's specialty, injected into
            its system prompt.

    Returns:
        A node function that queries the shared LLM and contributes one
        ``{"source", "content"}`` dict to ``research_results``.
    """
    @traceable(name=name, run_type="chain")
    def node(state: State) -> dict:
        # Frame the shared question through this agent's specialty.
        prompt = [
            SystemMessage(
                content=f"You are the {name} agent. Focus on {focus} "
                "Return a concise summary. Cite your source type."
            ),
            HumanMessage(content=f"Research query: {state['question']}"),
        ]
        reply = llm.invoke(prompt)
        # Single-item list so the operator.add reducer can merge the
        # fan-in results from all parallel branches.
        return {"research_results": [{"source": name, "content": reply.content}]}
    return node
# Three specialists built from the shared factory; each runs in parallel
# from START and appends its finding to research_results.
kb_agent = make_agent("knowledge_base", "internal knowledge base searches.")
web_agent = make_agent("web_search", "recent news and industry trends.")
policy_agent = make_agent("policy", "compliance, legal, and regulatory frameworks.")
@traceable(name="Synthesizer", run_type="chain")
def synthesize(state: State) -> dict:
    """Merge all sub-agent findings into one customer-facing answer.

    Policy findings override conflicting guidance from the other agents,
    per the system prompt below.
    """
    # Label each finding with its originating agent before merging.
    labeled = []
    for finding in state["research_results"]:
        labeled.append(f"[{finding['source']}]: {finding['content']}")
    context = "\n\n".join(labeled)

    messages = [
        SystemMessage(
            content="Synthesize the following research into a clear, actionable "
            "response. When policy information conflicts with or constrains "
            "other responses, the policy statement takes precedence. "
            "Never soften or omit policy restrictions."
        ),
        HumanMessage(
            content=f"Customer question: {state['question']}\n\n"
            f"Research findings:\n{context}"
        ),
    ]
    answer = llm.invoke(messages)
    return {"final_response": answer.content}
# --- Graph assembly ---
# Fan out from START to the three researchers, join on synthesize, then END.
builder = StateGraph(State)

# One shared policy: retry flaky LLM calls up to three times per node.
retry_llm = RetryPolicy(max_attempts=3)
for node_name, agent_fn in (
    ("kb", kb_agent),
    ("web", web_agent),
    ("policy", policy_agent),
):
    builder.add_node(node_name, agent_fn, retry=retry_llm)
    builder.add_edge(START, node_name)

builder.add_node("synthesize", synthesize)
# A list-valued source makes synthesize wait for ALL three branches.
builder.add_edge(["kb", "web", "policy"], "synthesize")
builder.add_edge("synthesize", END)
graph = builder.compile()
if __name__ == "__main__":
    # Tag the demo run so it is easy to locate in LangSmith.
    with tracing_context(
        metadata={"example": True},
        tags=["article-01", "parallel-agents"],
    ):
        result = graph.invoke(
            {"question": "How does GDPR affect our data pipeline architecture?"}
        )

    divider = "=" * 60
    print(divider)
    print("QUESTION:", result["question"])
    print(divider)
    print(f"\nSources: {len(result['research_results'])}")
    for finding in result["research_results"]:
        print(f" - {finding['source']}: {finding['content'][:100]}...")
    print(f"\nFINAL RESPONSE:\n{result['final_response']}")