diff --git a/README.md b/README.md
index 87c78f2..af841e7 100644
--- a/README.md
+++ b/README.md
@@ -28,7 +28,49 @@ Real-world agents you can clone and run. Most recent first.
 
 ### 2026
 
-#### [A2A Real Estate Multi-Agent — AgentCore Edition](examples/a2a-realestate-agentcore/) `NEW`
+#### [Deep Research Agent — LangGraph + BMasterAI Telemetry](examples/deep-research-agent/) `NEW`
+*March 2026*
+
+A multi-step web research agent built with **LangGraph** and fully instrumented with **BMasterAI** logging and telemetry. Inspired by [langchain-ai/deepagents](https://github.com/langchain-ai/deepagents). Give it any research question and it plans, searches, analyzes, reflects on quality, and synthesizes a structured report — automatically looping back for more research if gaps are found.
+
+**Stack:** LangGraph, Claude (Anthropic), Tavily, BMasterAI
+
+**What it demonstrates:**
+- Multi-node LangGraph pipeline with a conditional reflection loop (planner → search → analyze → reflect → synthesize)
+- Quality-gated research: reflector scores findings 1–10, loops back for follow-up searches when score < 7 (max 2 loops)
+- BMasterAI on every step: `track_agent_start/stop`, `track_llm_call`, `track_task_duration`, `log_event(TOOL_USE)`, `log_reasoning_chain`, `log_event(DECISION_POINT)`
+- Structured JSONL telemetry at `logs/research.jsonl` — pipe to any analytics tool
+
+```bash
+pip install -r requirements.txt
+cp .env.example .env  # add ANTHROPIC_API_KEY + TAVILY_API_KEY
+python main.py "What is the current state of multi-agent AI systems in 2026?"
+```
+
+---
+
+#### [Viral YouTube Short Generator — LangGraph](examples/langgraph-viral-youtube/)
+*March 2026*
+
+A four-agent LangGraph pipeline that researches trending topics and generates complete viral YouTube Short production packages — title, hook, 45–60 second script, tags, and thumbnail concept — with a quality gate that retries automatically if the output doesn't meet the bar.
+
+**Stack:** LangGraph, Claude (Anthropic), Tavily, BMasterAI
+
+**What it demonstrates:**
+- Four specialist agents in sequence: Trend Researcher → Hook Writer → Script Writer → Title & Tags
+- Quality gate node with automatic retry (max 2 loops) using LangGraph conditional edges
+- BMasterAI structured logging on every agent call: `configure_logging`, `track_agent_start/stop`, `track_llm_call`, `log_event(EventType.*)`
+- Shared `VideoState` TypedDict flowing through all nodes — clean state handoff pattern
+
+```bash
+pip install -r requirements.txt
+cp .env.example .env  # add ANTHROPIC_API_KEY + TAVILY_API_KEY
+python main.py "AI agents taking over software engineering"
+```
+
+---
+
+#### [A2A Real Estate Multi-Agent — AgentCore Edition](examples/a2a-realestate-agentcore/)
 *March 2026*
 
 A BMasterAI adaptation of the [AWS Labs A2A Real Estate sample](https://github.com/awslabs/amazon-bedrock-agentcore-samples/tree/main/02-use-cases/A2A-realestate-agentcore-multiagents). Three Strands agents — Property Search, Property Booking, and a Coordinator — communicate over the A2A (Agent-to-Agent) protocol, with every tool call and A2A hop instrumented via BMasterAI structured telemetry.
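Both entries above advertise the structured JSONL telemetry as something you can "pipe to any analytics tool". As a minimal sketch of what consuming that stream looks like with only the standard library — assuming each line is a JSON object carrying `event_type`, `agent_id`, and a `data.tokens_used` field, which is the shape this example logs but which may differ in other BMasterAI setups:

```python
import json
from collections import Counter

def summarize_telemetry(lines):
    """Count LLM calls and sum tokens per agent from BMasterAI-style JSONL events."""
    calls, tokens = Counter(), Counter()
    for line in lines:
        event = json.loads(line)
        if event.get("event_type") == "llm_call":
            agent = event.get("agent_id", "unknown")
            calls[agent] += 1
            tokens[agent] += event.get("data", {}).get("tokens_used", 0)
    return calls, tokens

# Demo on two synthetic events shaped like the ones agents.py logs;
# in practice you would pass an open file: summarize_telemetry(open("logs/research.jsonl"))
demo = [
    '{"event_type": "llm_call", "agent_id": "planner", "data": {"tokens_used": 342}}',
    '{"event_type": "tool_use", "agent_id": "web_searcher", "data": {"tool": "tavily_search"}}',
]
calls, tokens = summarize_telemetry(demo)
```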
diff --git a/examples/deep-research-agent/.env.example b/examples/deep-research-agent/.env.example
new file mode 100644
index 0000000..26bc3c7
--- /dev/null
+++ b/examples/deep-research-agent/.env.example
@@ -0,0 +1,6 @@
+# Required
+ANTHROPIC_API_KEY=your_anthropic_api_key_here
+TAVILY_API_KEY=your_tavily_api_key_here
+
+# Optional — override default model (claude-3-5-haiku-20241022)
+# ANTHROPIC_MODEL=claude-3-5-sonnet-20241022
diff --git a/examples/deep-research-agent/.gitignore b/examples/deep-research-agent/.gitignore
new file mode 100644
index 0000000..2a978c3
--- /dev/null
+++ b/examples/deep-research-agent/.gitignore
@@ -0,0 +1,4 @@
+__pycache__/
+*.pyc
+logs/
+.env
diff --git a/examples/deep-research-agent/README.md b/examples/deep-research-agent/README.md
new file mode 100644
index 0000000..91c500a
--- /dev/null
+++ b/examples/deep-research-agent/README.md
@@ -0,0 +1,131 @@
+# Deep Research Agent
+
+A multi-step web research agent built with **LangGraph** and instrumented with **BMasterAI** logging and telemetry. Inspired by [langchain-ai/deepagents](https://github.com/langchain-ai/deepagents).
+
+## What It Does
+
+1. **Planner** — breaks your research question into 3–5 focused sub-questions, logs the reasoning chain
+2. **Web Searcher** — runs Tavily searches for each sub-question in sequence
+3. **Analyzer** — synthesizes search results into one clear finding per question, logs each LLM call
+4. **Reflector** — evaluates completeness (quality score 1–10); if gaps exist, loops back for more searches (max 2 loops)
+5. **Synthesizer** — combines all findings into a structured report with Executive Summary, Key Findings, Analysis, and Limitations
+
+BMasterAI instruments every step: LLM calls, tool use, agent lifecycle events, decision points, and reasoning chains. Telemetry is written to `logs/research.jsonl` for downstream analysis.
+
+## Architecture
+
+```
+planner → web_searcher → analyzer → reflector
+                                        ↓
+                           needs_more_research?
+                              /            \
+                         yes (≤2x)          no
+                             ↓               ↓
+                       web_searcher     synthesizer → END
+                       (follow-ups)
+```
+
+## BMasterAI Integration
+
+| BMasterAI feature | Where used |
+|---|---|
+| `configure_logging` | Once at startup |
+| `monitor.track_agent_start/stop` | Every node entry/exit |
+| `monitor.track_llm_call` | Every Claude API call |
+| `monitor.track_task_duration` | Per-node timing |
+| `bm.log_event(EventType.LLM_CALL)` | Before each LLM call |
+| `bm.log_event(EventType.TOOL_USE)` | Each Tavily search |
+| `bm.log_event(EventType.DECISION_POINT)` | Planner decomposition + reflector routing |
+| `bm.log_event(EventType.LLM_REASONING)` | Analyzer synthesis step |
+| `bm.log_reasoning_chain` | Planner sub-question breakdown |
+| `monitor.get_agent_dashboard()` | Final telemetry printout |
+| `logs/research.jsonl` | Structured JSON telemetry for analytics |
+
+## Setup
+
+```bash
+pip install -r requirements.txt
+# or with uv:
+uv add -r requirements.txt
+```
+
+Copy `.env.example` to `.env` and add your keys:
+
+```bash
+cp .env.example .env
+```
+
+Required keys:
+- `ANTHROPIC_API_KEY` — [console.anthropic.com](https://console.anthropic.com)
+- `TAVILY_API_KEY` — [tavily.com](https://tavily.com) (generous free tier)
+
+## Usage
+
+```bash
+# Pass topic as argument
+python main.py "What is the current state of multi-agent AI systems in 2026?"
+
+# Or run interactively
+python main.py
+```
+
+## Example Output
+
+```
+📋 Plan: 4 sub-questions
+   1. What are the leading multi-agent AI frameworks in 2026?
+   2. How are enterprises adopting multi-agent systems?
+   3. What benchmarks evaluate multi-agent performance?
+   4. What are the key limitations of current multi-agent systems?
+
+🔎 Web search: 4 query batches completed
+🧪 Analysis: 4 findings synthesized
+🪞 Reflection 1: approved for synthesis
+📝 Synthesizing final report...
+
+═════════════════════════════════════
+📊 DEEP RESEARCH REPORT
+═════════════════════════════════════
+
+## Executive Summary
+...
+
+📈 BMASTERAI TELEMETRY
+═════════════════════════════════════
+System Health: HEALTHY
+
+Agent: planner
+  LLM calls: 1
+  Total tokens: 342
+  Avg latency: 1.23s
+...
+
+✅ Completed in 38.2s
+```
+
+## Logs
+
+| File | Contents |
+|---|---|
+| `logs/research.log` | Human-readable event log |
+| `logs/research.jsonl` | Structured JSON — pipe to any analytics tool |
+
+```bash
+# Filter LLM calls from telemetry
+cat logs/research.jsonl | python3 -c "
+import sys, json
+for line in sys.stdin:
+    e = json.loads(line)
+    if e.get('event_type') == 'llm_call':
+        print(e)
+"
+```
+
+## Files
+
+| File | Purpose |
+|---|---|
+| `state.py` | `ResearchState` TypedDict — shared pipeline state |
+| `agents.py` | Five agent nodes with full BMasterAI instrumentation |
+| `graph.py` | LangGraph `StateGraph` with conditional reflection loop |
+| `main.py` | CLI entry point with streaming progress + telemetry summary |
diff --git a/examples/deep-research-agent/agents.py b/examples/deep-research-agent/agents.py
new file mode 100644
index 0000000..1896ba7
--- /dev/null
+++ b/examples/deep-research-agent/agents.py
@@ -0,0 +1,386 @@
+"""
+Research agent nodes for the LangGraph deep research pipeline.
+Each node is a plain function: (state) -> partial state dict.
+BMasterAI logs every agent call, LLM call, tool use, and reasoning step.
+"""
+import json
+import os
+import time
+
+from bmasterai.logging import configure_logging, EventType, LogLevel
+from bmasterai.monitoring import get_monitor
+from langchain_anthropic import ChatAnthropic
+from langchain_core.messages import HumanMessage
+from tavily import TavilyClient
+
+from state import ResearchState
+
+# ── BMasterAI setup ──────────────────────────────────────────────────────────
+# BMasterAI always prepends "logs/" — ensure that directory exists relative to cwd
+os.makedirs("logs", exist_ok=True)
+os.makedirs("logs/reasoning", exist_ok=True)
+
+bm = configure_logging(
+    log_level=LogLevel.INFO,
+    log_file="research.log",
+    json_log_file="research.jsonl",
+)
+monitor = get_monitor()
+monitor.start_monitoring()
+
+# ── LLM + tools ──────────────────────────────────────────────────────────────
+llm = ChatAnthropic(
+    model=os.getenv("ANTHROPIC_MODEL", "claude-3-5-haiku-20241022"),
+    max_tokens=2048,
+)
+tavily = TavilyClient(api_key=os.environ["TAVILY_API_KEY"])
+
+MAX_REFLECTION_LOOPS = 2
+
+
+# ── Shared LLM helper ────────────────────────────────────────────────────────
+def _call_llm(prompt: str, agent_name: str, task: str = "") -> str:
+    """Call Claude with BMasterAI telemetry."""
+    t0 = time.time()
+    bm.log_event(
+        event_type=EventType.LLM_CALL,
+        agent_id=agent_name,
+        data={"task": task, "prompt_chars": len(prompt)},
+    )
+    response = llm.invoke([HumanMessage(content=prompt)])
+    elapsed = time.time() - t0
+    text = response.content if hasattr(response, "content") else str(response)
+
+    # usage_metadata may be absent or None depending on the model wrapper —
+    # fall back to 0 tokens instead of calling .get() on None
+    usage = getattr(response, "usage_metadata", None) or {}
+    monitor.track_llm_call(
+        agent_id=agent_name,
+        model=os.getenv("ANTHROPIC_MODEL", "claude-3-5-haiku-20241022"),
+        tokens_used=usage.get("total_tokens", 0),
+        duration=elapsed,
+    )
+    return text
+
+
+# ── Node: Planner ────────────────────────────────────────────────────────────
+def planner(state: ResearchState) -> dict:
+    """
+    Break the research topic into 3–5 focused sub-questions.
+    Logs a reasoning chain showing the decomposition rationale.
+    """
+    agent_id = "planner"
+    monitor.track_agent_start(agent_id)
+    t0 = time.time()
+
+    bm.log_event(
+        event_type=EventType.TASK_START,
+        agent_id=agent_id,
+        data={"topic": state["topic"]},
+    )
+
+    prompt = f"""You are a research planning agent. Your job is to break down a complex research topic into 3–5 focused sub-questions that together will fully answer the main question.
+
+Research topic: {state['topic']}
+
+Output ONLY valid JSON in this format:
+{{
+  "sub_questions": [
+    "specific sub-question 1",
+    "specific sub-question 2",
+    "specific sub-question 3"
+  ],
+  "reasoning": "Brief explanation of why these sub-questions cover the topic"
+}}
+
+Make each sub-question specific and searchable. Avoid overlap between questions."""
+
+    raw = _call_llm(prompt, agent_id, task="topic_decomposition")
+
+    # Parse JSON
+    try:
+        # Strip markdown code fences if present
+        clean = raw.strip()
+        if clean.startswith("```"):
+            clean = clean.split("```")[1]
+            if clean.startswith("json"):
+                clean = clean[4:]
+        parsed = json.loads(clean)
+        sub_questions = parsed.get("sub_questions", [])
+        reasoning = parsed.get("reasoning", "")
+    except Exception as e:
+        # Fallback: treat the whole response as a single question
+        sub_questions = [state["topic"]]
+        reasoning = f"Parsing failed ({e}), falling back to direct search"
+
+    # Log reasoning chain
+    bm.log_reasoning_chain(
+        agent_id=agent_id,
+        chain=[
+            {"step": f"Q{i+1}", "content": q}
+            for i, q in enumerate(sub_questions)
+        ],
+    )
+    bm.log_event(
+        event_type=EventType.DECISION_POINT,
+        agent_id=agent_id,
+        data={"sub_questions": sub_questions, "count": len(sub_questions)},
+    )
+
+    monitor.track_task_duration(agent_id, "plan", time.time() - t0)
+    monitor.track_agent_stop(agent_id)
+
+    return {
+        "sub_questions": sub_questions,
+        "plan_reasoning": reasoning,
+        "search_results": [],
+        "findings": [],
+        "reflection_count": 0,
+        "needs_more_research": False,
+        "follow_up_questions": [],
+        "errors": [],
+        "sources": [],
+    }
+
+
+# ── Node: Web Searcher ───────────────────────────────────────────────────────
+def web_searcher(state: ResearchState) -> dict:
+    """
+    Run Tavily searches for each sub-question (+ any follow-up questions).
+    Appends results to state without replacing existing ones (supports loops).
+    """
+    agent_id = "web_searcher"
+    monitor.track_agent_start(agent_id)
+    t0 = time.time()
+
+    questions = state.get("sub_questions", [])
+    if state.get("needs_more_research") and state.get("follow_up_questions"):
+        questions = state["follow_up_questions"]
+
+    all_results = list(state.get("search_results", []))
+    all_sources = list(state.get("sources", []))
+    errors = list(state.get("errors", []))
+
+    for q in questions:
+        bm.log_event(
+            event_type=EventType.TOOL_USE,
+            agent_id=agent_id,
+            data={"tool": "tavily_search", "query": q},
+        )
+        try:
+            result = tavily.search(
+                query=q,
+                search_depth="advanced",
+                max_results=4,
+                days=30,
+            )
+            hits = result.get("results", [])
+            all_results.append({
+                "question": q,
+                "results": [
+                    {"url": h.get("url"), "title": h.get("title"), "content": h.get("content", "")[:600]}
+                    for h in hits
+                ],
+            })
+            all_sources.extend([h.get("url", "") for h in hits])
+        except Exception as e:
+            errors.append(f"Search error for '{q}': {e}")
+
+    monitor.track_task_duration(agent_id, "search", time.time() - t0)
+    monitor.track_agent_stop(agent_id)
+
+    return {
+        "search_results": all_results,
+        "sources": list(dict.fromkeys(all_sources)),  # deduplicate, preserve order
+        "errors": errors,
+    }
+
+
+# ── Node: Analyzer ───────────────────────────────────────────────────────────
+def analyzer(state: ResearchState) -> dict:
+    """
+    Synthesize search results into one clear finding per sub-question.
+    Replaces previous findings with updated ones (safe for loops).
+    """
+    agent_id = "analyzer"
+    monitor.track_agent_start(agent_id)
+    t0 = time.time()
+
+    findings = []
+    errors = list(state.get("errors", []))
+
+    for entry in state.get("search_results", []):
+        question = entry["question"]
+        sources_text = "\n\n".join(
+            f"Source: {r['url']}\nTitle: {r['title']}\n{r['content']}"
+            for r in entry.get("results", [])
+        )
+
+        if not sources_text.strip():
+            findings.append(f"[{question}]\nNo results found.")
+            continue
+
+        prompt = f"""You are a research analyst. Synthesize the search results below into a clear, factual finding that answers the question.
+
+Question: {question}
+
+Search Results:
+{sources_text}
+
+Write 2–4 sentences that directly answer the question using the evidence above. Be specific and cite key facts. Do not speculate beyond the sources."""
+
+        bm.log_event(
+            event_type=EventType.LLM_REASONING,
+            agent_id=agent_id,
+            data={"question": question, "source_count": len(entry.get("results", []))},
+        )
+
+        finding = _call_llm(prompt, agent_id, task="source_synthesis")
+        findings.append(f"**{question}**\n{finding.strip()}")
+
+    monitor.track_task_duration(agent_id, "analyze", time.time() - t0)
+    monitor.track_agent_stop(agent_id)
+
+    return {"findings": findings, "errors": errors}
+
+
+# ── Node: Reflector ──────────────────────────────────────────────────────────
+def reflector(state: ResearchState) -> dict:
+    """
+    Evaluate research completeness. If gaps exist and loops remain, flag for
+    another search round. Otherwise approve for synthesis.
+    """
+    agent_id = "reflector"
+    monitor.track_agent_start(agent_id)
+    t0 = time.time()
+
+    reflection_count = state.get("reflection_count", 0)
+
+    if reflection_count >= MAX_REFLECTION_LOOPS:
+        bm.log_event(
+            event_type=EventType.DECISION_POINT,
+            agent_id=agent_id,
+            data={"decision": "max_loops_reached", "loops": reflection_count},
+        )
+        monitor.track_agent_stop(agent_id)
+        return {
+            "reflection": "Max reflection loops reached — proceeding to synthesis.",
+            "needs_more_research": False,
+            "follow_up_questions": [],
+            "reflection_count": reflection_count + 1,
+        }
+
+    findings_text = "\n\n".join(state.get("findings", []))
+    original_topic = state["topic"]
+
+    prompt = f"""You are a research quality reviewer. Evaluate whether the findings below adequately answer the original research question.
+
+Original research question: {original_topic}
+
+Current findings:
+{findings_text}
+
+Assess:
+1. Are there significant gaps or unanswered aspects?
+2. Is any finding too vague or lacking evidence?
+3. Are there contradictions that need resolution?
+
+Output ONLY valid JSON:
+{{
+  "quality_score": 1-10,
+  "gaps": ["gap 1", "gap 2"],
+  "needs_more_research": true/false,
+  "follow_up_questions": ["question if needed"],
+  "reflection": "Brief overall assessment"
+}}
+
+Set needs_more_research to true only if quality_score < 7 and there are clear, searchable gaps."""
+
+    import json
+    raw = _call_llm(prompt, agent_id, task="quality_reflection")
+
+    try:
+        clean = raw.strip()
+        if clean.startswith("```"):
+            clean = clean.split("```")[1]
+            if clean.startswith("json"):
+                clean = clean[4:]
+        parsed = json.loads(clean)
+        needs_more = parsed.get("needs_more_research", False)
+        follow_ups = parsed.get("follow_up_questions", [])
+        reflection = parsed.get("reflection", "")
+        score = parsed.get("quality_score", 7)
+    except Exception as e:
+        needs_more = False
+        follow_ups = []
+        reflection = f"Reflection parse failed ({e}) — approving for synthesis."
+        score = 7
+
+    bm.log_event(
+        event_type=EventType.DECISION_POINT,
+        agent_id=agent_id,
+        data={
+            "quality_score": score,
+            "needs_more_research": needs_more,
+            "follow_up_count": len(follow_ups),
+            "loop": reflection_count + 1,
+        },
+    )
+
+    monitor.track_task_duration(agent_id, "reflect", time.time() - t0)
+    monitor.track_agent_stop(agent_id)
+
+    return {
+        "reflection": reflection,
+        "needs_more_research": needs_more,
+        "follow_up_questions": follow_ups if needs_more else [],
+        "reflection_count": reflection_count + 1,
+    }
+
+
+# ── Node: Synthesizer ────────────────────────────────────────────────────────
+def synthesizer(state: ResearchState) -> dict:
+    """
+    Combine all findings into a final structured research report.
+    """
+    agent_id = "synthesizer"
+    monitor.track_agent_start(agent_id)
+    t0 = time.time()
+
+    findings_text = "\n\n".join(state.get("findings", []))
+    sources = state.get("sources", [])
+    topic = state["topic"]
+    reflection = state.get("reflection", "")
+
+    prompt = f"""You are a research synthesis agent. Write a comprehensive, well-structured research report based on the findings below.
+
+Research question: {topic}
+
+Findings:
+{findings_text}
+
+Quality notes from reviewer: {reflection}
+
+Write a clear research report with these sections:
+1. **Executive Summary** (2–3 sentences)
+2. **Key Findings** (bullet points with the most important facts)
+3. **Analysis** (2–3 paragraphs connecting the findings)
+4. **Limitations** (what's still uncertain or not covered)
+
+Use a professional, analytical tone. Be specific — cite facts, numbers, and evidence from the findings. Do not pad with filler."""
+
+    bm.log_event(
+        event_type=EventType.TASK_START,
+        agent_id=agent_id,
+        data={"findings_count": len(state.get("findings", [])), "source_count": len(sources)},
+    )
+
+    report = _call_llm(prompt, agent_id, task="final_synthesis")
+
+    # Append source list to report
+    if sources:
+        source_list = "\n".join(f"- {url}" for url in sources[:20])
+        report += f"\n\n---\n**Sources**\n{source_list}"
+
+    monitor.track_task_duration(agent_id, "synthesize", time.time() - t0)
+    monitor.track_agent_stop(agent_id)
+
+    return {"report": report}
diff --git a/examples/deep-research-agent/graph.py b/examples/deep-research-agent/graph.py
new file mode 100644
index 0000000..bddfc2d
--- /dev/null
+++ b/examples/deep-research-agent/graph.py
@@ -0,0 +1,54 @@
+"""
+LangGraph pipeline for the deep research agent.
+
+Flow:
+    planner → web_searcher → analyzer → reflector
+                                            ↓
+                               needs_more_research?
+                                  /            \\
+                              yes                no
+                               ↓                  ↓
+                          web_searcher      synthesizer → END
+                          (follow-ups)
+"""
+from langgraph.graph import StateGraph, END
+
+from state import ResearchState
+from agents import planner, web_searcher, analyzer, reflector, synthesizer
+
+
+def should_continue(state: ResearchState) -> str:
+    """Route after reflection: loop back for more research or synthesize."""
+    if state.get("needs_more_research") and state.get("follow_up_questions"):
+        return "more_research"
+    return "synthesize"
+
+
+def build_graph():
+    """Build the research pipeline and return the compiled graph."""
+    graph = StateGraph(ResearchState)
+
+    # Register nodes
+    graph.add_node("planner", planner)
+    graph.add_node("web_searcher", web_searcher)
+    graph.add_node("analyzer", analyzer)
+    graph.add_node("reflector", reflector)
+    graph.add_node("synthesizer", synthesizer)
+
+    # Define edges
+    graph.set_entry_point("planner")
+    graph.add_edge("planner", "web_searcher")
+    graph.add_edge("web_searcher", "analyzer")
+    graph.add_edge("analyzer", "reflector")
+
+    # Conditional: loop or finish
+    graph.add_conditional_edges(
+        "reflector",
+        should_continue,
+        {
+            "more_research": "web_searcher",
+            "synthesize": "synthesizer",
+        },
+    )
+    graph.add_edge("synthesizer", END)
+
+    return graph.compile()
diff --git a/examples/deep-research-agent/main.py b/examples/deep-research-agent/main.py
new file mode 100644
index 0000000..66539c3
--- /dev/null
+++ b/examples/deep-research-agent/main.py
@@ -0,0 +1,160 @@
+"""
+Deep Research Agent — powered by LangGraph + BMasterAI
+
+Usage:
+    python main.py "What is the current state of AI agents in enterprise software?"
+    python main.py  # prompts interactively
+
+Features:
+    - Planner decomposes topic into sub-questions
+    - Sequential Tavily searches per sub-question
+    - Analyzer synthesizes each result set
+    - Reflector evaluates quality and loops if needed (max 2 loops)
+    - Synthesizer produces a final structured report
+    - BMasterAI logs every LLM call, tool use, agent event, and reasoning step
+"""
+import sys
+import os
+import time
+from pathlib import Path
+from dotenv import load_dotenv
+
+# Change to the example dir first so .env, logs/, and all BMasterAI output
+# resolve relative to this file no matter where the script is invoked from
+os.chdir(os.path.dirname(os.path.abspath(__file__)))
+load_dotenv()
+os.makedirs("logs", exist_ok=True)
+os.makedirs("logs/reasoning", exist_ok=True)
+
+# Validate required env vars
+required = ["ANTHROPIC_API_KEY", "TAVILY_API_KEY"]
+missing = [k for k in required if not os.getenv(k)]
+if missing:
+    print(f"❌ Missing required environment variables: {', '.join(missing)}")
+    print("   Copy .env.example → .env and fill in your keys.")
+    sys.exit(1)
+
+from graph import build_graph
+from agents import monitor, bm
+from bmasterai.logging import EventType
+
+DIVIDER = "─" * 70
+
+
+def print_report(state: dict) -> None:
+    print(f"\n{'═'*70}")
+    print("📊 DEEP RESEARCH REPORT")
+    print("═" * 70)
+    print(f"\n🔍 Topic: {state.get('topic', '—')}\n")
+    print(DIVIDER)
+    print(state.get("report", "No report generated."))
+
+    if state.get("errors"):
+        print("\n⚠️  Errors encountered:")
+        for e in state["errors"]:
+            print(f"   • {e}")
+
+    print(f"\n{DIVIDER}")
+    print(f"Sources consulted: {len(state.get('sources', []))}")
+    print(f"Reflection loops: {state.get('reflection_count', 0)}")
+    print(f"Sub-questions: {len(state.get('sub_questions', []))}")
+
+
+def print_telemetry() -> None:
+    """Print BMasterAI telemetry dashboard."""
+    print(f"\n{'═'*70}")
+    print("📈 BMASTERAI TELEMETRY")
+    print("═" * 70)
+    try:
+        dashboard = monitor.get_agent_dashboard()
+        health = monitor.get_system_health()
+
+        print(f"\nSystem Health: {health.get('status', 'unknown').upper()}")
+
+        agents = dashboard.get("agents", {})
+        for aid, metrics in agents.items():
+            print(f"\nAgent: {aid}")
+            print(f"  LLM calls:    {metrics.get('llm_calls', 0)}")
+            print(f"  Total tokens: {metrics.get('total_tokens', 0)}")
+            print(f"  Avg latency:  {metrics.get('avg_latency', 0):.2f}s")
+            print(f"  Errors:       {metrics.get('error_count', 0)}")
+
+        # Point at the raw log files
+        log_dir = Path("logs")
+        print(f"\nLogs written to: {log_dir.resolve()}/")
+        print("  research.log   — human-readable event log")
+        print("  research.jsonl — structured JSON telemetry (pipe to any analytics tool)")
+    except Exception as e:
+        print(f"Telemetry unavailable: {e}")
+
+
+def main():
+    if len(sys.argv) > 1:
+        topic = " ".join(sys.argv[1:])
+    else:
+        print("\n🔬 Deep Research Agent (BMasterAI + LangGraph)")
+        print(DIVIDER)
+        topic = input("Research question: ").strip()
+        if not topic:
+            topic = "What is the current state of multi-agent AI systems in 2026?"
+
+    print(f"\n🚀 Starting deep research on: {topic}")
+    print(DIVIDER)
+
+    # Track overall pipeline
+    pipeline_id = "deep_research_pipeline"
+    monitor.track_agent_start(pipeline_id)
+    bm.log_event(
+        event_type=EventType.AGENT_START,
+        agent_id=pipeline_id,
+        data={"topic": topic},
+    )
+    t_start = time.time()
+
+    graph = build_graph()
+
+    # Stream node completions so the user sees progress. Each chunk maps a
+    # node name to that node's partial state update; merging the updates as
+    # they arrive yields the final state, so the pipeline runs exactly once.
+    print("\n📋 Plan: ", end="", flush=True)
+    full_state: dict = {"topic": topic}
+
+    for chunk in graph.stream({"topic": topic}):
+        for node_name, node_state in chunk.items():
+            full_state.update(node_state)
+            if node_name == "planner":
+                qs = node_state.get("sub_questions", [])
+                print(f"{len(qs)} sub-questions")
+                for i, q in enumerate(qs, 1):
+                    print(f"   {i}. {q}")
+            elif node_name == "web_searcher":
+                count = len(node_state.get("search_results", []))
+                print(f"🔎 Web search: {count} query batches completed")
+            elif node_name == "analyzer":
+                count = len(node_state.get("findings", []))
+                print(f"🧪 Analysis: {count} findings synthesized")
+            elif node_name == "reflector":
+                score_msg = "needs more research" if node_state.get("needs_more_research") else "approved for synthesis"
+                loops = node_state.get("reflection_count", 1)
+                print(f"🪞 Reflection {loops}: {score_msg}")
+            elif node_name == "synthesizer":
+                print("📝 Synthesizing final report...")
+
+    elapsed = time.time() - t_start
+
+    monitor.track_task_duration(pipeline_id, "full_pipeline", elapsed)
+    monitor.track_agent_stop(pipeline_id)
+    bm.log_event(
+        event_type=EventType.AGENT_STOP,
+        agent_id=pipeline_id,
+        data={"elapsed_seconds": round(elapsed, 2), "topic": topic},
+    )
+
+    print_report(full_state)
+    print_telemetry()
+    print(f"\n✅ Completed in {elapsed:.1f}s\n")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/examples/deep-research-agent/requirements.txt b/examples/deep-research-agent/requirements.txt
new file mode 100644
index 0000000..2c4c616
--- /dev/null
+++ b/examples/deep-research-agent/requirements.txt
@@ -0,0 +1,6 @@
+bmasterai>=0.2.3
+langgraph>=0.2.0
+langchain-anthropic>=0.3.0
+langchain-core>=0.3.0
+tavily-python>=0.5.0
+python-dotenv>=1.0.0
diff --git a/examples/deep-research-agent/state.py b/examples/deep-research-agent/state.py
new file mode 100644
index 0000000..998e414
--- /dev/null
+++ b/examples/deep-research-agent/state.py
@@ -0,0 +1,33 @@
+"""
+Shared state flowing through the LangGraph deep research pipeline.
+"""
+from typing import TypedDict
+
+
+class ResearchState(TypedDict):
+    # ── Input ────────────────────────────────────────────────────────────────
+    topic: str                      # User-supplied research question
+
+    # ── Planner output ───────────────────────────────────────────────────────
+    sub_questions: list[str]        # 3–5 focused sub-questions to research
+    plan_reasoning: str             # Planner's rationale for the breakdown
+
+    # ── Web Searcher output ──────────────────────────────────────────────────
+    search_results: list[dict]      # Raw Tavily results per sub-question
+                                    # Each: {question, results: [{url, title, content}]}
+
+    # ── Analyzer output ──────────────────────────────────────────────────────
+    findings: list[str]             # Synthesized finding per sub-question
+
+    # ── Reflector output ─────────────────────────────────────────────────────
+    reflection: str                 # Gaps identified, quality assessment
+    needs_more_research: bool       # Whether to loop back for another round
+    follow_up_questions: list[str]  # Additional questions if gaps found
+    reflection_count: int           # Number of reflection loops completed
+
+    # ── Synthesizer output ───────────────────────────────────────────────────
+    report: str                     # Final structured research report
+
+    # ── Metadata ─────────────────────────────────────────────────────────────
+    errors: list[str]               # Any errors from any node
+    sources: list[str]              # All URLs cited (deduplicated)
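
A closing note on state handling in this PR: every field in `ResearchState` uses LangGraph's default last-value-wins channel, so the way each node's partial return accumulates into the shared state can be pictured as a plain dict merge. A minimal sketch with made-up values, not real pipeline output:

```python
# Each node returns only the keys it owns; the graph overwrites those keys
# in execution order, exactly like dict.update on a shared mapping.
state = {"topic": "multi-agent AI systems"}

updates = [
    {"sub_questions": ["Q1", "Q2"], "reflection_count": 0},   # planner
    {"search_results": [{"question": "Q1", "results": []}]},  # web_searcher
    {"findings": ["**Q1** ..."]},                             # analyzer
    {"needs_more_research": False, "reflection_count": 1},    # reflector
    {"report": "## Executive Summary ..."},                   # synthesizer
]
for update in updates:
    state.update(update)
```

This is also why main.py can merge the per-node chunks from `graph.stream` instead of re-invoking the graph to recover the final state.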