Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions deployed-agents/a2a-mcp-langgraph/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Corvic MCP with A2A Protocol: Featuring LangGraph

This sample demonstrates a sales reporting agent built with [LangGraph](https://langchain-ai.github.io/langgraph/) and exposed through the A2A protocol.

## How It Works

This agent leverages a Corvic agent, orchestrated by LangGraph with Google Gemini, to provide sales information through a ReAct agent pattern. The A2A protocol enables standardized interaction with the agent, allowing clients to send requests and receive updates.

## Prerequisites

- Python 3.12 or higher
- [UV](https://docs.astral.sh/uv/)
- Access to an LLM and API Key
- A Corvic agent that provides sales data given some customer ids. You will need the sse-endpoint and the authorization token


## Configuration
- The LangGraph ReAct agent is provided two tools, the `get_duplicates` tool and the `answer_sales_query` tool. The `get_duplicates` tool is a mock tool that just returns two customer ids
- The `answer_sales_query` tool invokes a deployed agent using the Model Context Protocol (MCP). This agent is able to provide total sales data for customers based on their ids. You are free to create the Corvic agent any way you like, but it should respond with this sales data.
- When you run the client, the LangGraph agent is passed the following question: `Provide sales data for customer 1234`
- The LangGraph agent will first call the `get_duplicates` tool to get duplicates. It will then pass the customer id and its duplicates to the `answer_sales_query` tool to get the total sales data


## Setup & Running

1. Navigate to the directory and run uv sync:
```bash
uv sync
```

2. Create an environment file with your API key:

```bash
echo "GOOGLE_API_KEY=your_api_key_here" > .env
```

3. Run the agent:

```bash
# Basic run on default port 10000
uv run app

# On custom host/port
uv run app --host 0.0.0.0 --port 8080
```

4. In a separate terminal, run the test client:

```bash
uv run app/test_client.py
```


## Learn More

- [A2A Protocol Documentation](https://google-a2a.github.io/A2A/)
- [LangGraph Documentation](https://langchain-ai.github.io/langgraph/)
- [Google Gemini API](https://ai.google.dev/gemini-api)
Empty file.
93 changes: 93 additions & 0 deletions deployed-agents/a2a-mcp-langgraph/app/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import logging
import os
import sys

import click
import httpx
import uvicorn

from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryPushNotifier, InMemoryTaskStore
from a2a.types import (
AgentCapabilities,
AgentCard,
AgentSkill,
)
from dotenv import load_dotenv

from app.agent import DuplicateReportingAgent
from app.agent_executor import SalesAgentExecutor


load_dotenv()

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class MissingAPIKeyError(Exception):
"""Exception for missing API key."""


@click.command()
@click.option('--host', 'host', default='localhost')
@click.option('--port', 'port', default=10000)
def main(host, port):
"""Starts the Currency Agent server."""
try:
if not os.getenv('GOOGLE_API_KEY'):
raise MissingAPIKeyError(
'GOOGLE_API_KEY environment variable not set.'
)

capabilities = AgentCapabilities(streaming=True, pushNotifications=True)
skill = AgentSkill(
id='get_duplicates',
name='Tool to get Duplicate customers',
description='Helps with getting duplicate customers',
tags=['duplicate customers', 'Duplicate reporting'],
examples=['Provide duplicates for customer 1234'],
)
skill_sales = AgentSkill(
id='answer_sales_query',
name='Tool to get Sales data for customers ',
description='Helps with getting sales data for customers',
tags=['Sales data', 'Sales data for customer'],
examples=['Provide sales data for customer 1234'],
)
agent_card = AgentCard(
name='Reporting Agent',
description='Helps with reporting customer data',
url=f'http://{host}:{port}/',
version='1.0.0',
defaultInputModes=DuplicateReportingAgent.SUPPORTED_CONTENT_TYPES,
defaultOutputModes=DuplicateReportingAgent.SUPPORTED_CONTENT_TYPES,
capabilities=capabilities,
skills=[skill, skill_sales],
)

# --8<-- [start:DefaultRequestHandler]
httpx_client = httpx.AsyncClient()
request_handler = DefaultRequestHandler(
agent_executor=SalesAgentExecutor(),
task_store=InMemoryTaskStore(),
push_notifier=InMemoryPushNotifier(httpx_client),
)
server = A2AStarletteApplication(
agent_card=agent_card, http_handler=request_handler
)

uvicorn.run(server.build(), host=host, port=port)
# --8<-- [end:DefaultRequestHandler]

except MissingAPIKeyError as e:
logger.error(f'Error: {e}')
sys.exit(1)
except Exception as e:
logger.error(f'An error occurred during server startup: {e}')
sys.exit(1)


if __name__ == '__main__':
main()
181 changes: 181 additions & 0 deletions deployed-agents/a2a-mcp-langgraph/app/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import asyncio
from collections.abc import AsyncIterable
from typing import Any, Literal

import httpx

from langchain_core.messages import AIMessage, ToolMessage
from langchain_core.tools import tool
from langchain_google_genai import ChatGoogleGenerativeAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent
from pydantic import BaseModel
from mcp import ClientSession
from mcp.client.sse import sse_client

memory = MemorySaver()


async def run(q):
async with sse_client(
"YOUR-SSE-ENDPOINT-HERE",
headers={
"Authorization": "YOUR-API-TOKEN-HERE"
},
) as (read, write):
async with ClientSession(read, write) as session:
# Initialize the connection
await session.initialize()

# List available tools
tools = await session.list_tools()
print(tools)

# Call a tool
question = q
result = await session.call_tool(
"query", arguments={"query_content": question}
)

return result


@tool
def answer_sales_query(
query: str = ''
):
"""Use this to answer a sales query.

Args:
query: The customer to get duplicates for.

Returns:
Sales data.
"""
print(f'CALLING CORVIC: {query}')
result = asyncio.run(run( query))
print(f'BACK')

final_response = ''
for content in result.content:
final_response += content.text

return final_response


@tool
def get_duplicates(
customer_in: str = 'USD'
):
"""Use this to get duplicates for a customer.

Args:
customer_in: The customer to get duplicates for.

Returns:
A list of duplicates.
"""
return {'duplicates': ['4321', '4322']}


class ResponseFormat(BaseModel):
"""Respond to the user in this format."""

status: Literal['input_required', 'completed', 'error'] = 'input_required'
message: str


class DuplicateReportingAgent:
"""DuplicateReportingAgent - a specialized assistant for reporting."""

SYSTEM_INSTRUCTION = (
'You are a reporting tool. '
"Your sole purpose is to execute the following steps"
"1) Call 'get_duplicates' tool to get duplicates for a customer. "
"2) call the 'answer_sales_query' explicitly asking for total sales data passing customer and duplicates. "
"Begin the question using the phrase 'Provide Sales data for the following: '."
"3) Calculate the total sales across customer and duplicates and provide the answer"
'If the question is on another topic, politely state that you cannot help with that topic. '
'Do not attempt to answer unrelated questions or use tools for other purposes.'
'Set response status to input_required if the user needs to provide more information.'
'Set response status to error if there is an error while processing the request.'
'Set response status to completed if the request is complete.'
)

def __init__(self):
self.model = ChatGoogleGenerativeAI(model='gemini-2.0-flash')
self.tools = [get_duplicates, answer_sales_query]

self.graph = create_react_agent(
self.model,
tools=self.tools,
checkpointer=memory,
prompt=self.SYSTEM_INSTRUCTION,
response_format=ResponseFormat,
)

def invoke(self, query, context_id) -> str:
config = {'configurable': {'thread_id': context_id}}
self.graph.invoke({'messages': [('user', query)]}, config)
return self.get_agent_response(config)

async def stream(self, query, context_id) -> AsyncIterable[dict[str, Any]]:
inputs = {'messages': [('user', query)]}
config = {'configurable': {'thread_id': context_id}}

for item in self.graph.stream(inputs, config, stream_mode='values'):
message = item['messages'][-1]
if (
isinstance(message, AIMessage)
and message.tool_calls
and len(message.tool_calls) > 0
):
yield {
'is_task_complete': False,
'require_user_input': False,
'content': 'Looking up the databases...',
}
elif isinstance(message, ToolMessage):
yield {
'is_task_complete': False,
'require_user_input': False,
'content': 'Processing the request..',
}

yield self.get_agent_response(config)

def get_agent_response(self, config):
current_state = self.graph.get_state(config)
structured_response = current_state.values.get('structured_response')
if structured_response and isinstance(
structured_response, ResponseFormat
):
if structured_response.status == 'input_required':
return {
'is_task_complete': False,
'require_user_input': True,
'content': structured_response.message,
}
if structured_response.status == 'error':
return {
'is_task_complete': False,
'require_user_input': True,
'content': structured_response.message,
}
if structured_response.status == 'completed':
return {
'is_task_complete': True,
'require_user_input': False,
'content': structured_response.message,
}

return {
'is_task_complete': False,
'require_user_input': True,
'content': (
'We are unable to process your request at the moment. '
'Please try again.'
),
}

SUPPORTED_CONTENT_TYPES = ['text', 'text/plain']
Loading