From 15afa25bc2f1c57a8fa89f335a9c05ba42620fe7 Mon Sep 17 00:00:00 2001 From: Mhith16 Date: Wed, 25 Mar 2026 15:56:57 +0530 Subject: [PATCH 1/3] =?UTF-8?q?add=20LiveKit=20provider=20(Phase=201=20?= =?UTF-8?q?=E2=80=94=20data=20channel)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements LiveKitClient using LiveKit data messages for text-based adversarial evals without audio. Includes config integration, pipeline and researcher wiring, example config, env vars, and a fix for the run_conversation signature mismatch that caused TypeError when called from pipeline/researcher with scenario/dynamic_variables kwargs. --- .env.example | 6 + .gitignore | 2 +- autovoiceevals/config.py | 48 +++++- autovoiceevals/livekit_provider.py | 264 +++++++++++++++++++++++++++++ autovoiceevals/pipeline.py | 17 ++ autovoiceevals/researcher.py | 17 ++ examples/livekit.config.yaml | 40 +++++ requirements.txt | 3 + 8 files changed, 393 insertions(+), 4 deletions(-) create mode 100644 autovoiceevals/livekit_provider.py create mode 100644 examples/livekit.config.yaml diff --git a/.env.example b/.env.example index 193adb8..aecdb1a 100644 --- a/.env.example +++ b/.env.example @@ -8,3 +8,9 @@ SMALLEST_API_KEY=your-smallest-api-key # ElevenLabs ConvAI (if provider: elevenlabs) ELEVENLABS_API_KEY=your-elevenlabs-api-key + +# LiveKit (if provider: livekit) +LIVEKIT_API_KEY=your-livekit-api-key +LIVEKIT_API_SECRET=your-livekit-api-secret +# LIVEKIT_URL can also go here instead of config.yaml +# LIVEKIT_URL=wss://your-project.livekit.cloud diff --git a/.gitignore b/.gitignore index cdf63b9..3705298 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ .env -venv/ +.venv/ __pycache__/ *.pyc results/ diff --git a/autovoiceevals/config.py b/autovoiceevals/config.py index 4188f58..e2db407 100644 --- a/autovoiceevals/config.py +++ b/autovoiceevals/config.py @@ -83,6 +83,16 @@ class OutputConfig: graphs: bool = True +@dataclass +class LiveKitConfig: + url: str = "" + room_prefix: str = "eval" + data_topic: str = "text" + response_timeout: float = 30.0 + agent_join_timeout: float = 30.0 + agent_backend: str = "none" # "smallest" or "none" + + # --------------------------------------------------------------------------- # Top-level config # --------------------------------------------------------------------------- @@ -96,11 +106,14 @@ class Config: conversation: ConversationConfig llm: LLMConfig output: OutputConfig - provider: str = "vapi" # "vapi", "smallest", or "elevenlabs" + livekit: LiveKitConfig = None + provider: str = "vapi" # "vapi", "smallest", "elevenlabs", or "livekit" anthropic_api_key: str = "" vapi_api_key: str = "" smallest_api_key: str = "" elevenlabs_api_key: str = "" + livekit_api_key: str = "" + livekit_api_secret: str = "" # --------------------------------------------------------------------------- @@ -123,14 +136,18 @@ def load_config(path: str | None = None) -> Config: # --- Provider --- provider = raw.get("provider", "vapi") - if provider not in ("vapi", "smallest", "elevenlabs"): - raise ValueError(f"Unknown provider: {provider}. Must be 'vapi', 'smallest', or 'elevenlabs'.") + if provider not in ("vapi", "smallest", "elevenlabs", "livekit"): + raise ValueError( + f"Unknown provider: {provider}. Must be 'vapi', 'smallest', 'elevenlabs', or 'livekit'." + ) # --- API keys (from env only, never from YAML) --- anthropic_key = os.environ.get("ANTHROPIC_API_KEY", "") vapi_key = os.environ.get("VAPI_API_KEY", "") smallest_key = os.environ.get("SMALLEST_API_KEY", "") elevenlabs_key = os.environ.get("ELEVENLABS_API_KEY", "") + livekit_api_key = os.environ.get("LIVEKIT_API_KEY", "") + livekit_api_secret = os.environ.get("LIVEKIT_API_SECRET", "") if not anthropic_key: raise ValueError("ANTHROPIC_API_KEY not set in .env or environment") @@ -140,6 +157,11 @@ def load_config(path: str | None = None) -> Config: raise ValueError("SMALLEST_API_KEY not set in .env or environment") if provider == "elevenlabs" and not elevenlabs_key: raise ValueError("ELEVENLABS_API_KEY not set in .env or environment") + if provider == "livekit": + if not livekit_api_key: + raise ValueError("LIVEKIT_API_KEY not set in .env or environment") + if not livekit_api_secret: + raise ValueError("LIVEKIT_API_SECRET not set in .env or environment") # --- Assistant (required) --- ast = raw.get("assistant", {}) @@ -169,6 +191,23 @@ def load_config(path: str | None = None) -> Config: cv = raw.get("conversation", {}) lm = raw.get("llm", {}) out = raw.get("output", {}) + lk = raw.get("livekit", {}) + + # --- LiveKit section (required if provider == "livekit") --- + livekit_url = lk.get("url", os.environ.get("LIVEKIT_URL", "")) + if provider == "livekit" and not livekit_url: + raise ValueError( + "livekit.url is required when provider is 'livekit'. " + "Set it in config.yaml or LIVEKIT_URL in .env." + ) + livekit_cfg = LiveKitConfig( + url=livekit_url, + room_prefix=lk.get("room_prefix", "eval"), + data_topic=lk.get("data_topic", "text"), + response_timeout=float(lk.get("response_timeout", 30.0)), + agent_join_timeout=float(lk.get("agent_join_timeout", 30.0)), + agent_backend=lk.get("agent_backend", "none"), + ) return Config( assistant=AssistantConfig( @@ -203,9 +242,12 @@ def load_config(path: str | None = None) -> Config: save_transcripts=out.get("save_transcripts", True), graphs=out.get("graphs", True), ), + livekit=livekit_cfg, provider=provider, anthropic_api_key=anthropic_key, vapi_api_key=vapi_key, smallest_api_key=smallest_key, elevenlabs_api_key=elevenlabs_key, + livekit_api_key=livekit_api_key, + livekit_api_secret=livekit_api_secret, ) diff --git a/autovoiceevals/livekit_provider.py b/autovoiceevals/livekit_provider.py new file mode 100644 index 0000000..3f0eb9d --- /dev/null +++ b/autovoiceevals/livekit_provider.py @@ -0,0 +1,264 @@ +"""LiveKit provider — Phase 1 (data messages). + +Runs adversarial eval conversations via a LiveKit room using the data +channel (text messages), not audio. The caller bot joins the room, +sends turns as JSON data messages, and waits for the agent to respond +the same way. + +Requirements +------------ +* pip install "livekit>=1.0.0" +* LIVEKIT_URL, LIVEKIT_API_KEY, LIVEKIT_API_SECRET in .env + +Agent requirements +------------------ +The target agent must listen on the configured data_topic and reply +on the same topic. Message format (JSON): + + Caller → Agent: {"role": "user", "content": ""} + Agent → Caller: {"role": "assistant", "content": ""} + +Plain-text responses (non-JSON) are also accepted. + +Prompt management +----------------- +If agent_backend is "smallest", prompt reads/writes are delegated to +SmallestClient. If "none", get_system_prompt/update_prompt will raise +NotImplementedError — use the livekit provider only for conversations +and manage prompts externally. +""" + +from __future__ import annotations + +import asyncio +import json +import time +import uuid + +from .models import Turn, Conversation + +DEFAULT_END_PHRASES = [ + "have a great day", + "goodbye", + "talk to you soon", + "take care", +] + + +class LiveKitClient: + """Voice platform client that uses LiveKit data channel messages.""" + + def __init__( + self, + url: str, + api_key: str, + api_secret: str, + room_prefix: str = "eval", + data_topic: str = "text", + response_timeout: float = 30.0, + agent_join_timeout: float = 30.0, + end_phrases: list[str] | None = None, + agent_backend=None, + ): + """ + Args: + url: LiveKit server WebSocket URL (wss://...). + api_key: LiveKit API key. + api_secret: LiveKit API secret. + room_prefix: Prefix for generated room names. + data_topic: Data channel topic for messages. + response_timeout: Seconds to wait for each agent response. + agent_join_timeout: Seconds to wait for the agent to join. + end_phrases: Phrases that signal conversation end. + agent_backend: Optional client for prompt management + (e.g. SmallestClient). If None, prompt + methods raise NotImplementedError. + """ + self.url = url.rstrip("/") + self.api_key = api_key + self.api_secret = api_secret + self.room_prefix = room_prefix + self.data_topic = data_topic + self.response_timeout = response_timeout + self.agent_join_timeout = agent_join_timeout + self.end_phrases = end_phrases or DEFAULT_END_PHRASES + self._backend = agent_backend + + # ------------------------------------------------------------------ + # Conversations + # ------------------------------------------------------------------ + + def run_conversation( + self, + assistant_id: str, + scenario_id: str, + caller_turns: list[str], + max_turns: int = 12, + scenario=None, + dynamic_variables: dict | None = None, + simulate_timeout_secs: int | None = None, + ) -> Conversation: + """Run a multi-turn conversation via LiveKit data messages. + + Each call creates a unique room. The agent is expected to join + that room (via webhook dispatch or a fixed room name configured + on the agent side) and respond to data messages on data_topic. + + The ``scenario``, ``dynamic_variables``, and ``simulate_timeout_secs`` + parameters are accepted for interface compatibility but are unused in + data-channel mode. + """ + return asyncio.run( + self._run_async(assistant_id, scenario_id, caller_turns, max_turns) + ) + + async def _run_async( + self, + assistant_id: str, + scenario_id: str, + caller_turns: list[str], + max_turns: int, + ) -> Conversation: + try: + from livekit import rtc + from livekit.api import AccessToken, VideoGrants + except ImportError: + conv = Conversation(scenario_id=scenario_id) + conv.error = ( + "livekit package not installed. " + "Run: pip install 'livekit>=1.0.0'" + ) + return conv + + conv = Conversation(scenario_id=scenario_id) + total_latency = 0.0 + + # Unique room per conversation to avoid cross-talk + room_name = f"{self.room_prefix}-{scenario_id}-{uuid.uuid4().hex[:8]}" + identity = f"caller-{uuid.uuid4().hex[:6]}" + + token = ( + AccessToken(self.api_key, self.api_secret) + .with_identity(identity) + .with_name(identity) + .with_grants(VideoGrants(room_join=True, room=room_name)) + .to_jwt() + ) + + room = rtc.Room() + response_queue: asyncio.Queue[str] = asyncio.Queue() + agent_joined = asyncio.Event() + + @room.on("data_received") + def on_data(packet): + # livekit >= 1.0: packet is a DataPacket with .data bytes attribute + try: + raw = bytes(packet.data) if hasattr(packet, "data") else bytes(packet) + text = raw.decode("utf-8") + response_queue.put_nowait(text) + except Exception: + pass + + @room.on("participant_connected") + def on_participant(_participant): + agent_joined.set() + + try: + await room.connect(self.url, token) + except Exception as e: + conv.error = f"LiveKit connect failed: {str(e)[:200]}" + return conv + + # Agent may already be in the room when we join + if room.remote_participants: + agent_joined.set() + + try: + await asyncio.wait_for( + agent_joined.wait(), timeout=self.agent_join_timeout + ) + except asyncio.TimeoutError: + conv.error = ( + f"Agent did not join room '{room_name}' " + f"within {self.agent_join_timeout}s. " + "Ensure the agent is configured to dispatch to this room." + ) + await room.disconnect() + return conv + + # Run turns + for msg in caller_turns[:max_turns]: + if not msg or not msg.strip(): + msg = "..." + + conv.turns.append(Turn(role="caller", content=msg)) + + payload = json.dumps({"role": "user", "content": msg}).encode("utf-8") + + try: + t0 = time.time() + await room.local_participant.publish_data( + payload, + reliable=True, + topic=self.data_topic, + ) + + raw_response = await asyncio.wait_for( + response_queue.get(), + timeout=self.response_timeout, + ) + latency = (time.time() - t0) * 1000 + + # Accept JSON {"role": "assistant", "content": "..."} or plain text + try: + parsed = json.loads(raw_response) + agent_msg = parsed.get("content", raw_response) + except (json.JSONDecodeError, AttributeError): + agent_msg = raw_response + + conv.turns.append( + Turn(role="assistant", content=agent_msg, latency_ms=latency) + ) + total_latency += latency + + if any(p in agent_msg.lower() for p in self.end_phrases): + break + + except asyncio.TimeoutError: + conv.error = f"Response timeout (>{self.response_timeout}s)" + break + except Exception as e: + conv.error = str(e)[:200] + break + + await asyncio.sleep(0.1) + + await room.disconnect() + + n = len(conv.agent_turns) + conv.avg_latency_ms = total_latency / n if n else 0 + return conv + + # ------------------------------------------------------------------ + # Prompt management (delegated to agent_backend) + # ------------------------------------------------------------------ + + def get_system_prompt(self, agent_id: str) -> str: + """Read the current system prompt via the configured backend.""" + if self._backend is not None: + return self._backend.get_system_prompt(agent_id) + raise NotImplementedError( + "No agent_backend configured for LiveKit provider. " + "Set livekit.agent_backend: 'smallest' in config.yaml, " + "or manage prompts externally." + ) + + def update_prompt(self, agent_id: str, new_prompt: str) -> bool: + """Update the system prompt via the configured backend.""" + if self._backend is not None: + return self._backend.update_prompt(agent_id, new_prompt) + raise NotImplementedError( + "No agent_backend configured for LiveKit provider. " + "Set livekit.agent_backend: 'smallest' in config.yaml, " + "or manage prompts externally." + ) diff --git a/autovoiceevals/pipeline.py b/autovoiceevals/pipeline.py index 44601bd..dc5fc42 100644 --- a/autovoiceevals/pipeline.py +++ b/autovoiceevals/pipeline.py @@ -149,6 +149,23 @@ def run(cfg: Config) -> None: elif cfg.provider == "elevenlabs": from .elevenlabs import ElevenLabsClient provider = ElevenLabsClient(cfg.elevenlabs_api_key) + elif cfg.provider == "livekit": + from .livekit_provider import LiveKitClient + lk = cfg.livekit + backend = None + if lk.agent_backend == "smallest": + from .smallest import SmallestClient + backend = SmallestClient(cfg.smallest_api_key) + provider = LiveKitClient( + url=lk.url, + api_key=cfg.livekit_api_key, + api_secret=cfg.livekit_api_secret, + room_prefix=lk.room_prefix, + data_topic=lk.data_topic, + response_timeout=lk.response_timeout, + agent_join_timeout=lk.agent_join_timeout, + agent_backend=backend, + ) else: provider = VapiClient(cfg.vapi_api_key) diff --git a/autovoiceevals/researcher.py b/autovoiceevals/researcher.py index 928a67f..9644aff 100644 --- a/autovoiceevals/researcher.py +++ b/autovoiceevals/researcher.py @@ -27,6 +27,23 @@ def _build_provider(cfg: Config, llm_client: LLMClient | None = None): elif cfg.provider == "elevenlabs": from .elevenlabs import ElevenLabsClient return ElevenLabsClient(cfg.elevenlabs_api_key) + elif cfg.provider == "livekit": + from .livekit_provider import LiveKitClient + lk = cfg.livekit + backend = None + if lk.agent_backend == "smallest": + from .smallest import SmallestClient + backend = SmallestClient(cfg.smallest_api_key) + return LiveKitClient( + url=lk.url, + api_key=cfg.livekit_api_key, + api_secret=cfg.livekit_api_secret, + room_prefix=lk.room_prefix, + data_topic=lk.data_topic, + response_timeout=lk.response_timeout, + agent_join_timeout=lk.agent_join_timeout, + agent_backend=backend, + ) else: from .vapi import VapiClient return VapiClient(cfg.vapi_api_key) diff --git a/examples/livekit.config.yaml b/examples/livekit.config.yaml new file mode 100644 index 0000000..b4ec02a --- /dev/null +++ b/examples/livekit.config.yaml @@ -0,0 +1,40 @@ +# Example: LiveKit-based voice agent (Phase 1 — data messages) +# +# This provider connects to a LiveKit room and communicates via the +# data channel (text messages). No audio is used. +# +# Your agent must listen on data_topic and respond in the same format: +# Incoming: {"role": "user", "content": ""} +# Outgoing: {"role": "assistant", "content": ""} +# +# The agent must be configured to auto-join rooms matching room_prefix +# (via LiveKit Dispatch / webhook). See the livekit-agents SDK docs. + +provider: livekit + +assistant: + id: "your-agent-id" + name: "My LiveKit Agent" + description: | + Voice agent for [describe your agent here]. + Replace this with your actual agent's capabilities, services, + pricing, policies, and constraints. The more detail you provide, + the sharper the adversarial test scenarios will be. + +livekit: + url: "wss://your-project.livekit.cloud" + room_prefix: "eval" # Room names: eval- + data_topic: "text" # Data channel topic your agent listens on + response_timeout: 30 # Seconds to wait for each response + agent_join_timeout: 30 # Seconds to wait for agent to join room + agent_backend: "none" # "smallest" to delegate prompt management + # to Smallest AI, "none" to manage externally + +autoresearch: + eval_scenarios: 6 + max_experiments: 20 + +scoring: + should_weight: 0.50 + should_not_weight: 0.35 + latency_weight: 0.15 diff --git a/requirements.txt b/requirements.txt index 30caf60..cb63c96 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,6 @@ requests>=2.31.0 matplotlib>=3.8.0 pyyaml>=6.0 python-dotenv>=1.0.0 + +# LiveKit provider (optional — only needed if provider: livekit) +# livekit>=1.0.0 From a972dba445c6c1de4f06e3de1bb1a22973090023 Mon Sep 17 00:00:00 2001 From: Mohith1612 Date: Thu, 26 Mar 2026 15:34:25 +0530 Subject: [PATCH 2/3] docs: document LiveKit provider in README Add LiveKit to the supported providers list, setup instructions (env vars, config copy command), providers table, and project structure. --- README.md | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index d950026..84d1d44 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ A self-improving loop for voice AI agents. Inspired by the keep/revert pattern f It generates adversarial callers, attacks your agent, proposes prompt improvements one at a time, keeps what works, reverts what doesn't. Run it overnight, wake up to a better agent. -Works with [Vapi](https://vapi.ai), [Smallest AI](https://smallest.ai), and [ElevenLabs ConvAI](https://elevenlabs.io/conversational-ai). +Works with [Vapi](https://vapi.ai), [Smallest AI](https://smallest.ai), [ElevenLabs ConvAI](https://elevenlabs.io/conversational-ai), and [LiveKit](https://livekit.io). ``` ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ @@ -53,6 +53,11 @@ SMALLEST_API_KEY=your-smallest-api-key # If using ElevenLabs ELEVENLABS_API_KEY=your-elevenlabs-api-key + +# If using LiveKit +LIVEKIT_URL=wss://your-project.livekit.cloud +LIVEKIT_API_KEY=your-livekit-api-key +LIVEKIT_API_SECRET=your-livekit-api-secret ``` You need the Anthropic key (for Claude, which generates scenarios and judges conversations) plus the key for whichever voice platform your agent runs on. @@ -70,6 +75,9 @@ cp examples/smallest.config.yaml config.yaml # For ElevenLabs cp examples/elevenlabs.config.yaml config.yaml + +# For LiveKit +cp examples/livekit.config.yaml config.yaml ``` Then open `config.yaml` and replace the example with your agent's details. @@ -77,7 +85,7 @@ Then open `config.yaml` and replace the example with your agent's details. The config has three required fields: ```yaml -provider: vapi # "vapi", "smallest", or "elevenlabs" +provider: vapi # "vapi", "smallest", "elevenlabs", or "livekit" assistant: id: "your-agent-id" # from your platform dashboard @@ -297,6 +305,7 @@ Weights and threshold are configurable in `config.yaml` under `scoring:`. | **[Vapi](https://vapi.ai)** | Live multi-turn conversations via Vapi Chat API | Read/write via assistant PATCH endpoint | | **[Smallest AI](https://smallest.ai)** | Simulated — Claude plays the agent using the system prompt from the platform | Read/write via Atoms workflow API | | **[ElevenLabs ConvAI](https://elevenlabs.io/conversational-ai)** | Native `simulate-conversation` endpoint — ElevenLabs runs the real deployed agent (with its tools and knowledge base) and plays the user via a persona prompt | Read/write via agent PATCH endpoint | +| **[LiveKit](https://livekit.io)** | Text-based evals via LiveKit data channel messages — Phase 1 (no audio). Caller bot joins a room and exchanges turns as JSON. | Delegated to `agent_backend` (e.g. `"smallest"`) or managed externally | **Why simulated for Smallest AI?** Atoms agents only accept audio input through LiveKit rooms — there's no text chat API. Since the system optimizes the *prompt* (not the voice pipeline), simulating conversations with Claude using the actual prompt from the platform is effective and fast. @@ -325,7 +334,8 @@ autovoiceevals/ ├── examples/ │ ├── vapi.config.yaml Salon booking agent on Vapi │ ├── smallest.config.yaml Pizza delivery agent on Smallest AI -│ └── elevenlabs.config.yaml Medical clinic scheduling agent on ElevenLabs +│ ├── elevenlabs.config.yaml Medical clinic scheduling agent on ElevenLabs +│ └── livekit.config.yaml LiveKit data-channel agent (Phase 1) └── autovoiceevals/ Core package ├── cli.py CLI (research | pipeline subcommands) ├── config.py Config loading + validation @@ -335,6 +345,7 @@ autovoiceevals/ ├── vapi.py Vapi client ├── smallest.py Smallest AI client ├── elevenlabs.py ElevenLabs ConvAI client + ├── livekit_provider.py LiveKit data channel client ├── llm.py Claude client ├── evaluator.py Scenario generation, judging, prompt proposals ├── results.py Post-run results viewer From 92f95817ef0b99d6ba910e79ebd51a726240bb43 Mon Sep 17 00:00:00 2001 From: Mohith1612 Date: Sun, 12 Apr 2026 23:13:48 +0530 Subject: [PATCH 3/3] fix LiveKit: add local prompt backend and harden data channel receiver --- autovoiceevals/config.py | 8 ++- autovoiceevals/livekit_provider.py | 97 ++++++++++++++++++++++++++---- autovoiceevals/pipeline.py | 7 +++ autovoiceevals/researcher.py | 7 +++ examples/livekit.config.yaml | 32 ++++++++-- 5 files changed, 132 insertions(+), 19 deletions(-) diff --git a/autovoiceevals/config.py b/autovoiceevals/config.py index e2db407..a835e55 100644 --- a/autovoiceevals/config.py +++ b/autovoiceevals/config.py @@ -90,7 +90,10 @@ class LiveKitConfig: data_topic: str = "text" response_timeout: float = 30.0 agent_join_timeout: float = 30.0 - agent_backend: str = "none" # "smallest" or "none" + agent_backend: str = "none" # "smallest" | "local" | "none" + system_prompt: str = "" # initial prompt when agent_backend="local" + system_prompt_file: str = "" # path to prompt file; overrides system_prompt if it exists + inject_system_prompt: bool = False # send prompt as first data msg each conversation # --------------------------------------------------------------------------- @@ -207,6 +210,9 @@ def load_config(path: str | None = None) -> Config: response_timeout=float(lk.get("response_timeout", 30.0)), agent_join_timeout=float(lk.get("agent_join_timeout", 30.0)), agent_backend=lk.get("agent_backend", "none"), + system_prompt=lk.get("system_prompt", ""), + system_prompt_file=lk.get("system_prompt_file", ""), + inject_system_prompt=bool(lk.get("inject_system_prompt", False)), ) return Config( diff --git a/autovoiceevals/livekit_provider.py b/autovoiceevals/livekit_provider.py index 3f0eb9d..15af56f 100644 --- a/autovoiceevals/livekit_provider.py +++ b/autovoiceevals/livekit_provider.py @@ -15,6 +15,7 @@ The target agent must listen on the configured data_topic and reply on the same topic. Message format (JSON): + Caller → Agent: {"role": "system", "content": ""} # optional, if inject_system_prompt=True Caller → Agent: {"role": "user", "content": ""} Agent → Caller: {"role": "assistant", "content": ""} @@ -22,21 +23,28 @@ Prompt management ----------------- -If agent_backend is "smallest", prompt reads/writes are delegated to -SmallestClient. If "none", get_system_prompt/update_prompt will raise -NotImplementedError — use the livekit provider only for conversations -and manage prompts externally. +* agent_backend="smallest": prompt reads/writes delegate to SmallestClient. +* agent_backend="local": prompt is stored locally (in memory or a file). + Use inject_system_prompt=True to send the current prompt as the first + data message of every conversation so the agent can apply it. +* agent_backend="none": get_system_prompt/update_prompt raise + NotImplementedError — use the livekit provider only for conversations + and manage prompts externally. """ from __future__ import annotations import asyncio import json +import logging +import os import time import uuid from .models import Turn, Conversation +_log = logging.getLogger(__name__) + DEFAULT_END_PHRASES = [ "have a great day", "goodbye", @@ -45,6 +53,33 @@ ] +class LocalPromptBackend: + """Manages prompts locally without any external API calls. + + Suitable for self-hosted LiveKit agents that read their prompt from a + shared file, or for research sessions where the prompt is injected into + each conversation via the data channel (inject_system_prompt=True). + """ + + def __init__(self, initial_prompt: str = "", prompt_file: str = ""): + self._file = prompt_file + if prompt_file and os.path.exists(prompt_file): + with open(prompt_file) as f: + self._prompt = f.read().strip() + else: + self._prompt = initial_prompt + + def get_system_prompt(self, agent_id: str) -> str: + return self._prompt + + def update_prompt(self, agent_id: str, new_prompt: str) -> bool: + self._prompt = new_prompt + if self._file: + with open(self._file, "w") as f: + f.write(new_prompt) + return True + + class LiveKitClient: """Voice platform client that uses LiveKit data channel messages.""" @@ -59,6 +94,7 @@ def __init__( agent_join_timeout: float = 30.0, end_phrases: list[str] | None = None, agent_backend=None, + inject_system_prompt: bool = False, ): """ Args: @@ -71,8 +107,11 @@ def __init__( agent_join_timeout: Seconds to wait for the agent to join. end_phrases: Phrases that signal conversation end. agent_backend: Optional client for prompt management - (e.g. SmallestClient). If None, prompt - methods raise NotImplementedError. + (e.g. SmallestClient, LocalPromptBackend). + If None, prompt methods raise NotImplementedError. + inject_system_prompt: If True, send the current system prompt as a + {"role": "system", ...} data message before the + first caller turn. Requires agent_backend to be set. """ self.url = url.rstrip("/") self.api_key = api_key @@ -83,6 +122,7 @@ def __init__( self.agent_join_timeout = agent_join_timeout self.end_phrases = end_phrases or DEFAULT_END_PHRASES self._backend = agent_backend + self.inject_system_prompt = inject_system_prompt # ------------------------------------------------------------------ # Conversations @@ -148,6 +188,7 @@ async def _run_async( room = rtc.Room() response_queue: asyncio.Queue[str] = asyncio.Queue() agent_joined = asyncio.Event() + loop = asyncio.get_running_loop() @room.on("data_received") def on_data(packet): @@ -155,9 +196,23 @@ def on_data(packet): try: raw = bytes(packet.data) if hasattr(packet, "data") else bytes(packet) text = raw.decode("utf-8") - response_queue.put_nowait(text) - except Exception: - pass + + # Filter by topic: only accept messages on our configured topic + topic = getattr(packet, "topic", None) + if topic is not None and topic != self.data_topic: + _log.debug("data_received: ignoring topic=%s", topic) + return + + # Ignore messages we sent ourselves (guard for edge cases) + sender = getattr(packet, "participant", None) + if sender is not None and getattr(sender, "identity", "") == identity: + return + + _log.debug("data_received: topic=%s len=%d", topic, len(raw)) + # Use call_soon_threadsafe in case the SDK fires from a non-asyncio thread + loop.call_soon_threadsafe(response_queue.put_nowait, text) + except Exception as exc: + _log.debug("data_received decode error: %s", exc) @room.on("participant_connected") def on_participant(_participant): @@ -186,6 +241,18 @@ def on_participant(_participant): await room.disconnect() return conv + # Optionally inject the current system prompt before any caller turns + if self.inject_system_prompt and self._backend is not None: + prompt = self._backend.get_system_prompt(assistant_id) + sys_payload = json.dumps({"role": "system", "content": prompt}).encode("utf-8") + try: + await room.local_participant.publish_data( + sys_payload, reliable=True, topic=self.data_topic + ) + await asyncio.sleep(0.2) # give agent time to apply the prompt + except Exception as e: + _log.debug("Failed to inject system prompt: %s", e) + # Run turns for msg in caller_turns[:max_turns]: if not msg or not msg.strip(): @@ -196,6 +263,10 @@ def on_participant(_participant): payload = json.dumps({"role": "user", "content": msg}).encode("utf-8") try: + # Drain any stale data from previous turns before starting the timer + while not response_queue.empty(): + response_queue.get_nowait() + t0 = time.time() await room.local_participant.publish_data( payload, @@ -249,8 +320,8 @@ def get_system_prompt(self, agent_id: str) -> str: return self._backend.get_system_prompt(agent_id) raise NotImplementedError( "No agent_backend configured for LiveKit provider. " - "Set livekit.agent_backend: 'smallest' in config.yaml, " - "or manage prompts externally." + "Set livekit.agent_backend to 'local' (with a system_prompt) or " + "'smallest' in config.yaml, or manage prompts externally." ) def update_prompt(self, agent_id: str, new_prompt: str) -> bool: @@ -259,6 +330,6 @@ def update_prompt(self, agent_id: str, new_prompt: str) -> bool: return self._backend.update_prompt(agent_id, new_prompt) raise NotImplementedError( "No agent_backend configured for LiveKit provider. " - "Set livekit.agent_backend: 'smallest' in config.yaml, " - "or manage prompts externally." + "Set livekit.agent_backend to 'local' (with a system_prompt) or " + "'smallest' in config.yaml, or manage prompts externally." ) diff --git a/autovoiceevals/pipeline.py b/autovoiceevals/pipeline.py index dc5fc42..8720c10 100644 --- a/autovoiceevals/pipeline.py +++ b/autovoiceevals/pipeline.py @@ -156,6 +156,12 @@ def run(cfg: Config) -> None: if lk.agent_backend == "smallest": from .smallest import SmallestClient backend = SmallestClient(cfg.smallest_api_key) + elif lk.agent_backend == "local": + from .livekit_provider import LocalPromptBackend + backend = LocalPromptBackend( + initial_prompt=lk.system_prompt, + prompt_file=lk.system_prompt_file, + ) provider = LiveKitClient( url=lk.url, api_key=cfg.livekit_api_key, @@ -165,6 +171,7 @@ def run(cfg: Config) -> None: response_timeout=lk.response_timeout, agent_join_timeout=lk.agent_join_timeout, agent_backend=backend, + inject_system_prompt=lk.inject_system_prompt, ) else: provider = VapiClient(cfg.vapi_api_key) diff --git a/autovoiceevals/researcher.py b/autovoiceevals/researcher.py index 9644aff..37e985c 100644 --- a/autovoiceevals/researcher.py +++ b/autovoiceevals/researcher.py @@ -34,6 +34,12 @@ def _build_provider(cfg: Config, llm_client: LLMClient | None = None): if lk.agent_backend == "smallest": from .smallest import SmallestClient backend = SmallestClient(cfg.smallest_api_key) + elif lk.agent_backend == "local": + from .livekit_provider import LocalPromptBackend + backend = LocalPromptBackend( + initial_prompt=lk.system_prompt, + prompt_file=lk.system_prompt_file, + ) return LiveKitClient( url=lk.url, api_key=cfg.livekit_api_key, @@ -43,6 +49,7 @@ def _build_provider(cfg: Config, llm_client: LLMClient | None = None): response_timeout=lk.response_timeout, agent_join_timeout=lk.agent_join_timeout, agent_backend=backend, + inject_system_prompt=lk.inject_system_prompt, ) else: from .vapi import VapiClient diff --git a/examples/livekit.config.yaml b/examples/livekit.config.yaml index b4ec02a..60a58c7 100644 --- a/examples/livekit.config.yaml +++ b/examples/livekit.config.yaml @@ -4,8 +4,11 @@ # data channel (text messages). No audio is used. # # Your agent must listen on data_topic and respond in the same format: -# Incoming: {"role": "user", "content": ""} -# Outgoing: {"role": "assistant", "content": ""} +# Incoming (optional): {"role": "system", "content": ""} +# Incoming: {"role": "user", "content": ""} +# Outgoing: {"role": "assistant", "content": ""} +# +# Plain-text responses (non-JSON) are also accepted from the agent. # # The agent must be configured to auto-join rooms matching room_prefix # (via LiveKit Dispatch / webhook). See the livekit-agents SDK docs. @@ -23,12 +26,31 @@ assistant: livekit: url: "wss://your-project.livekit.cloud" - room_prefix: "eval" # Room names: eval- + room_prefix: "eval" # Room names: eval-- data_topic: "text" # Data channel topic your agent listens on response_timeout: 30 # Seconds to wait for each response agent_join_timeout: 30 # Seconds to wait for agent to join room - agent_backend: "none" # "smallest" to delegate prompt management - # to Smallest AI, "none" to manage externally + + # Prompt management — choose one: + # + # "smallest" — delegate to Smallest AI (requires SMALLEST_API_KEY) + # "local" — store prompt locally; use with inject_system_prompt: true + # so each conversation starts with the current prompt + # "none" — manage externally; research/pipeline modes will error + # + agent_backend: "local" + + # Used when agent_backend is "local": + # Provide the initial system prompt inline or via a file path. + # If system_prompt_file exists on disk, it takes precedence over system_prompt. + system_prompt: | + You are a helpful voice assistant. Answer questions clearly and concisely. + system_prompt_file: "" # Optional: e.g. "agent_prompt.txt" + + # Send the current system prompt as the first data message of every + # conversation so the agent can apply it before user turns arrive. + # Your agent must handle {"role": "system", "content": "..."} messages. + inject_system_prompt: true autoresearch: eval_scenarios: 6