Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cel/assistants/router/agentic_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ async def infer_best_assistant(self, input_text: str) -> BaseAssistant:
If you are not sure, please select the default agent.
Returns only the name of the assistant."""

asts = "\n".join([f"{agent.name}" for i, agent in enumerate(self._assistants)])
asts = "\n".join([f"{agent.name}: {agent.description}" for i, agent in enumerate(self._assistants)])

prompt = LangchainPromptTemplate.from_template(prompt_str)

Expand Down
167 changes: 167 additions & 0 deletions cel/assistants/router/concierge_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
# cel/assistants/router/concierge_router.py
from __future__ import annotations

from typing import AsyncGenerator, Dict, List, Any
from loguru import logger as log
from langsmith import traceable
from langchain_core.messages import SystemMessage, HumanMessage
from langchain_openai import ChatOpenAI

from cel.assistants.router.agentic_router import AgenticRouter
from cel.assistants.base_assistant import BaseAssistant
from cel.assistants.function_response import FunctionResponse
from cel.assistants.stream_content_chunk import StreamContentChunk
from cel.gateway.model.conversation_lead import ConversationLead
from cel.gateway.model.message import Message

class ConciergeRouter(AgenticRouter):
"""
The ConciergeRouter is a router that orchestrates the conversation between multiple agents.
"""

ACTIVE_AGENT_KEY = "active_agent"
TRANSFER_FLAG_KEY = "transfer_requested"
MAX_TRANSFER_PER_TURN = 1 # prevents infinite loops

DEFAULT_PROMPT = (
"You are an orchestration agent.\n"
"Your job is to decide which agent to run based on the current state "
"of the user and what they've asked to do.\n"
"You do not need to figure out dependencies between agents; the agents "
"will handle that themselves.\n"
"Here are the agents you can choose from:\n{agent_context_str}\n\n"
"Here is the recent conversation:\n{dialog_str}\n\n"
"Please assist the user and transfer them as needed.\n"
"Return **only** the name of the agent."
"If you are not sure, please select the default agent."
"Default agent: {default_agent_name}"
)

def __init__(
self,
assistants: List[BaseAssistant],
history_store=None,
state_store=None,
history_length: int = 5,
llm=None,
default_assistant: int = 0,
):
super().__init__(
assistants=assistants,
history_store=history_store,
state_store=state_store,
history_length=history_length,
llm=llm,
)
self._default_assistant = default_assistant
# inject transfer tool into all agents
for ast in self._assistants:
self._inject_request_transfer_tool(ast)

def _inject_request_transfer_tool(self, assistant: BaseAssistant) -> None:
@assistant.function(
name="request_transfer",
desc="Useful to transfer the conversation to another agent.",
params=[],
)
async def _request_transfer(session: str = None, **_) -> FunctionResponse:
state = await self._state_store.get_store(session) or {}
state[self.TRANSFER_FLAG_KEY] = True
await self._state_store.set_store(session, state)
log.debug(f"[ConciergeRouter] Transfer flag set by {assistant.name}")
return FunctionResponse(text="")

async def _get_active(self, session: str) -> str | None:
state = await self._state_store.get_store(session) or {}
return state.get(self.ACTIVE_AGENT_KEY)

async def _set_active(self, session: str, name: str | None):
state = await self._state_store.get_store(session) or {}
if name is None:
state.pop(self.ACTIVE_AGENT_KEY, None)
else:
state[self.ACTIVE_AGENT_KEY] = name
state.pop(self.TRANSFER_FLAG_KEY, None)
await self._state_store.set_store(session, state)

async def _transfer_pending(self, session: str) -> bool:
state = await self._state_store.get_store(session) or {}
return bool(state.get(self.TRANSFER_FLAG_KEY))

@traceable
async def _infer_best_assistant(
self, lead: ConversationLead, user_msg: str
) -> BaseAssistant:
llm = self._llm or ChatOpenAI(model="gpt-4o", temperature=0, max_tokens=64)

agent_context_str = "\n".join(
f"{ast.name}: {ast.description}" for ast in self._assistants
)

dialog_entries = await self.build_dialog(lead, user_msg)
dialog_str = await self.format_dialog_to_plain_text(dialog_entries)

system_prompt = self.DEFAULT_PROMPT.format(
agent_context_str=agent_context_str,
dialog_str=dialog_str,
default_agent_name=self._assistants[self._default_assistant].name
)

res = await llm.ainvoke([SystemMessage(system_prompt), HumanMessage(user_msg)])
agent_name = (res.content or "").strip()

for ast in self._assistants:
if ast.name.lower() == agent_name.lower():
return ast

log.warning(
f"[ConciergeRouter] Unknown agent '{agent_name}', defaulting to "
f"'{self._current_assistant.name}'."
)
return self._current_assistant

@traceable
async def _pick_agent(self, lead: ConversationLead, user_msg: str) -> BaseAssistant:
active = await self._get_active(lead.get_session_id())
if active:
for ast in self._assistants:
if ast.name == active:
return ast
log.warning(f"Active agent '{active}' not found, re-inferring.")

best = await self._infer_best_assistant(lead, user_msg)
await self._set_active(lead.get_session_id(), best.name)
return best

async def new_message(
self, message: Message, local_state: Dict[str, Any] | None = None
) -> AsyncGenerator[StreamContentChunk, None]:
"""
If the agent requests a transfer, its output is discarded and the
new agent responds immediately.
"""

lead = message.lead
session = lead.get_session_id()
transfers = 0

while transfers <= self.MAX_TRANSFER_PER_TURN:
agent = await self._pick_agent(lead, message.text)
log.debug(f"[ConciergeRouter] Active agent → {agent.name}")

# buffer output in case of transfer
buffer: List[StreamContentChunk] = []
async for chunk in agent.new_message(message, local_state or {}):
buffer.append(chunk)

if await self._transfer_pending(session):
# Discard response and prepare new agent
log.info(f"[ConciergeRouter] Transfer triggered by {agent.name}")
await self._set_active(session, None) # clear flags
transfers += 1
continue # retry with new agent
else:
# No transfer → send accumulated content to user
for chunk in buffer:
yield chunk
break
89 changes: 89 additions & 0 deletions examples/19_concierge_router_experimental/assistant.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
"""
Cel.ai Example: Agentic Router Experimental
--------------------------------------------------

This is a simple example of an AI Assistant implemented using the Cel.ai framework.
It serves as a basic demonstration of how to get started with Cel.ai for creating intelligent assistants.

Framework: Cel.ai
License: MIT License

This script is part of the Cel.ai example series and is intended for educational purposes.

Usage:
------
Configure the required environment variables in a .env file in the root directory of the project.
The required environment variables are:
- NGROK_AUTHTOKEN: The Ngrok authentication token. You can get this from the Ngrok dashboard.
- TELEGRAM_TOKEN: The Telegram bot token for the assistant. You can get this from the BotFather on Telegram.
- OPENAI_API_KEY: The OpenAI API key for the assistant. You can get this from the OpenAI dashboard.

Then run this script to see a basic AI assistant in action.

Note:
-----
Please ensure you have the Cel.ai framework installed in your Python environment prior to running this script.
"""
# LOAD ENV VARIABLES
import os
from loguru import logger as log
# Load .env variables
from dotenv import load_dotenv
load_dotenv()


# REMOVE THIS BLOCK IF YOU ARE USING THIS SCRIPT AS A TEMPLATE
# -------------------------------------------------------------
import sys
from pathlib import Path
# Add parent directory to path
path = Path(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(str(path.parents[1]))
# -------------------------------------------------------------

# Import Cel.ai modules
from cel.connectors.telegram import TelegramConnector
from cel.gateway.message_gateway import MessageGateway, StreamMode
from cel.message_enhancers.smart_message_enhancer_openai import SmartMessageEnhancerOpenAI
from cel.assistants.router.agentic_router import AgenticRouter
from cel.assistants.router.concierge_agent import ConciergeRouter
from balance_agent import build_balance_agent
from transfer_agent import build_transfer_agent



# Create the Agentic Router
# --------------------------------------------------------------
# The Agentic Router is a special type of assistant that routes messages to other assistants
# It is used to create a multi-assistant system
# The Agentic Router requires a list of assistants to route messages to
assistants = [build_balance_agent(), build_transfer_agent()]




# Instantiate the Agentic Router
ast = ConciergeRouter(assistants=assistants, default_assistant=1)


# Create the Message Gateway - This component is the core of the assistant
# It handles the communication between the assistant and the connectors
gateway = MessageGateway(
assistant=ast,
host="127.0.0.1", port=3000,
webhook_url=os.environ.get("WEBHOOK_URL")
)

# For this example, we will use the Telegram connector
conn = TelegramConnector(
token=os.environ.get("TELEGRAM_TOKEN"),
stream_mode=StreamMode.FULL
)

# Register the connector with the gateway
gateway.register_connector(conn)

# Then start the gateway and begin processing messages
gateway.run()


43 changes: 43 additions & 0 deletions examples/19_concierge_router_experimental/balance_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@

# Import Cel.ai modules
from cel.assistants.macaw.macaw_assistant import MacawAssistant
from cel.prompt.prompt_template import PromptTemplate
from cel.rag.providers.markdown_rag import MarkdownRAG
from cel.assistants.function_context import FunctionContext
from cel.assistants.function_response import RequestMode
from cel.assistants.common import Param
from cel.stores.history.base_history_provider import BaseHistoryProvider
from cel.stores.state.base_state_provider import BaseChatStateProvider


def build_balance_agent(base_prompt: str = ''):

# Setup prompt
prompt = base_prompt + """You are a virtual assistant called Luis specializing in banking balance enquiries.
Your goal is to provide clients with accurate information about the balance of their bank accounts such as savings and checking accounts.
You answer questions like 'What is my current balance?' or 'How much money do I have in my savings account?' If the user asks for a transfer, you should transfer the conversation to the Transfer Agent.
Today is {date}
"""

prompt_template = PromptTemplate(prompt)

ast = MacawAssistant(
# For observability purposes, it is recommended to provide a name to the assistant
name="Balance Agent",

# This is the description of the assistant, it will be used by AssistantRouter
# to match the assistant with the user intent
description="""You are a virtual assistant specializing in balance inquiries.
Your goal is to provide clients with accurate information about the balance of
their bank accounts such as savings and checking accounts.""",

prompt=prompt_template,
)

# TODO: Add RAG here

# TODO: Event handling

# TODO: Add Tooling here

return ast
42 changes: 42 additions & 0 deletions examples/19_concierge_router_experimental/transfer_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@

# Import Cel.ai modules
from cel.assistants.macaw.macaw_assistant import MacawAssistant
from cel.prompt.prompt_template import PromptTemplate
from cel.rag.providers.markdown_rag import MarkdownRAG
from cel.assistants.function_context import FunctionContext
from cel.assistants.function_response import RequestMode
from cel.assistants.common import Param
from cel.stores.history.base_history_provider import BaseHistoryProvider
from cel.stores.state.base_state_provider import BaseChatStateProvider


def build_transfer_agent(base_prompt: str = ''):
# For matching and observability purposes, its required to provide a name to the assistant
name = "Transfer Agent"

# This is the description of the assistant, it will be used only in semantic routers
# Use name for AgenticRouter
description="""You are a virtual assistant specializing in bank transfers.
Your goal is to help customers transfer money between their accounts or to third-party accounts.""",

# Setup prompt
prompt = base_prompt + """You are a virtual assistant called Antonio specializing in bank transfers.
Your goal is to help customers transfer money between their accounts or to third-party accounts.
Answer questions like 'How can I transfer money?' or 'I want to send $100 to John Doe's account.'.
If the user asks for a balance, you should transfer the conversation to the Balance Agent.
Today is {date}
"""

ast = MacawAssistant(
name=name,
description=description,
prompt=PromptTemplate(prompt)
)

# ----------------------------------------------------------------------
# TODO: Add RAG here
# TODO: Add Tooling here
# ----------------------------------------------------------------------


return ast