diff --git a/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx b/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx
index c3c28c66..12dcc5bf 100644
--- a/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx
+++ b/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx
@@ -1,467 +1,1736 @@
---
-title: "Speech-to-Speech"
-description: "Build real-time voice AI agents that listen and respond naturally"
+title: "Speech-to-Speech API (Beta)"
+description: "Build real-time voice AI agents using AssemblyAI's OpenAI-compatible Realtime API"
---
-Build voice-powered AI agents that have natural conversations with your users. Your agent listens to speech and responds with a natural-sounding voice—all in real-time.
+
+ This is a beta product and is not production-ready. The API is subject to change without notice. Do not use this for production workloads.
+
-
- This is an early stage product subject to change and should not be used for
- production usage.
-
+AssemblyAI's Speech-to-Speech API lets you build voice agents that listen and respond naturally in real time. The API follows the OpenAI Realtime API schema, making it easy to integrate with existing tools and frameworks like LiveKit, Pipecat, and the OpenAI client libraries.
-## How it works
+## Quickstart
-```
-┌─────────────┐ ┌─────────────────┐ ┌─────────────┐
-│ │ Audio │ │ Audio │ │
-│ User │ ────────────► │ Voice Agent │ ────────────► │ User │
-│ (speaks) │ │ │ │ (hears) │
-└─────────────┘ └─────────────────┘ └─────────────┘
-```
+The easiest way to get started is with the AssemblyAI Python SDK. You can also use raw WebSocket connections or the OpenAI client library for more control.
-1. **User speaks** — Your app captures microphone audio and streams it to the agent
-2. **Agent responds** — The agent processes the speech and generates a spoken response
-3. **User hears** — Your app receives audio and plays it through the speaker
+
+
+```python
+"""
+Minimal Speech-to-Speech Example
-The entire flow happens in real-time with low latency.
+Requirements:
+ pip install assemblyai pyaudio
----
+Usage:
+ export ASSEMBLYAI_API_KEY=your_api_key
+ python voice_agent.py
+"""
-## Quick Start
+import os
-Get a voice agent up and running in 3 steps.
+import pyaudio
+from assemblyai.speech_to_speech import SpeechToSpeechClient
-### Step 1: Get your API key
+# Setup
+client = SpeechToSpeechClient(api_key=os.getenv("ASSEMBLYAI_API_KEY"))
+audio = pyaudio.PyAudio()
-Grab your API key from your [AssemblyAI dashboard](https://www.assemblyai.com/app).
+# Audio output stream
+output_stream = audio.open(format=pyaudio.paInt16, channels=1, rate=24000, output=True)
-### Step 2: Create your agent
-Create an agent by sending a POST request. Here's an example of a friendly assistant:
+# Handle events
+@client.on_audio
+def play(audio_bytes: bytes):
+ output_stream.write(audio_bytes)
-
-
-```bash
-curl -X POST https://aaigentsv1.up.railway.app/agents \
- -H "Authorization: YOUR_API_KEY" \
- -H "Content-Type: application/json" \
- -d '{
- "agent_name": "friendly_assistant",
- "instructions": "You are a friendly and helpful assistant. Keep your responses concise and conversational. Be warm and personable.",
- "voice": "luna",
- "greeting": "Say hello and ask how you can help today."
- }'
+
+@client.on_text
+def show_text(text: str):
+ print(f"\nAgent: {text}")
+
+
+@client.on_transcript
+def show_transcript(text: str):
+ print(f"\nYou: {text}")
+
+
+@client.on_error
+def handle_error(error):
+ print(f"\nERROR: {error}")
+
+
+# Connect
+print("Connecting to AssemblyAI Speech-to-Speech API...")
+client.connect(
+ instructions="You are a helpful assistant. Be brief.",
+ output_modalities=["audio", "text"],
+ vad_threshold=0.3,
+)
+
+# Stream from microphone
+input_stream = audio.open(
+ format=pyaudio.paInt16, channels=1, rate=24000, input=True, frames_per_buffer=4096
+)
+
+try:
+ while True:
+ audio_data = input_stream.read(4096, exception_on_overflow=False)
+ client.send_audio(audio_data)
+except KeyboardInterrupt:
+ pass
+finally:
+ client.disconnect()
+ input_stream.close()
+ output_stream.close()
+ audio.terminate()
```
-
-
+
+
```python
-import requests
-
-response = requests.post(
-"https://aaigentsv1.up.railway.app/agents",
-headers={
-"Authorization": "YOUR_API_KEY",
-"Content-Type": "application/json"
-},
-json={
-"agent_name": "friendly_assistant",
-"instructions": "You are a friendly and helpful assistant. Keep your responses concise and conversational. Be warm and personable.",
-"voice": "luna",
-"greeting": "Say hello and ask how you can help today."
-}
-)
+"""
+Speech-to-Speech Voice Agent with Tool Calling
-print(response.json())
+Requirements:
+ pip install assemblyai pyaudio
-````
-
-
-```javascript
-const response = await fetch("https://aaigentsv1.up.railway.app/agents", {
- method: "POST",
- headers: {
- "Authorization": "YOUR_API_KEY",
- "Content-Type": "application/json"
- },
- body: JSON.stringify({
- agent_name: "friendly_assistant",
- instructions: "You are a friendly and helpful assistant. Keep your responses concise and conversational. Be warm and personable.",
- voice: "luna",
- greeting: "Say hello and ask how you can help today."
- })
-});
-
-console.log(await response.json());
-````
+Usage:
+ export ASSEMBLYAI_API_KEY=your_api_key
+ python voice_agent.py
+"""
-
-
+import os
-### Step 3: Start a conversation
+import pyaudio
+from assemblyai.speech_to_speech import SpeechToSpeechClient
-Connect to your agent via WebSocket and start talking:
+# Setup
+client = SpeechToSpeechClient(api_key=os.getenv("ASSEMBLYAI_API_KEY"))
+audio = pyaudio.PyAudio()
-```
-wss://aaigentsv1.up.railway.app/ws/friendly_assistant
-```
+# Audio output stream
+output_stream = audio.open(format=pyaudio.paInt16, channels=1, rate=24000, output=True)
-Once connected, send audio as binary WebSocket frames (PCM16, 16kHz, mono) and receive the agent's spoken responses back as audio.
-
-```python
-import asyncio
-import json
-import websockets
-import sounddevice as sd
-import numpy as np
+# === Register Tools ===
-async def voice_chat():
-uri = "wss://aaigentsv1.up.railway.app/ws/friendly_assistant"
-queue = asyncio.Queue(maxsize=100)
-session_ready = False
+@client.tool
+def get_current_time() -> str:
+ """Get the current time."""
+ from datetime import datetime
+ return datetime.now().strftime("%I:%M %p")
- async with websockets.connect(uri, ping_interval=10, ping_timeout=20) as ws:
- print("Connected! Waiting for session...")
- # Send microphone audio to the agent
- async def send_audio():
- while True:
- data = await queue.get()
- if session_ready:
- await ws.send(data)
- queue.task_done()
+@client.tool
+def get_weather(location: str, units: str = "fahrenheit") -> dict:
+ """Get the current weather for a location."""
+ return {
+ "location": location,
+ "temperature": 72 if units == "fahrenheit" else 22,
+ "units": units,
+ "conditions": "sunny",
+ }
- asyncio.create_task(send_audio())
- loop = asyncio.get_running_loop()
- def mic_callback(indata, frames, time, status):
- if not queue.full():
- loop.call_soon_threadsafe(queue.put_nowait, bytes(indata))
+@client.tool
+def set_reminder(message: str, minutes: int) -> str:
+ """Set a reminder for a specified number of minutes from now."""
+ return f"Reminder set: '{message}' in {minutes} minutes"
- with sd.InputStream(samplerate=16000, channels=1, dtype='int16', callback=mic_callback), \
- sd.OutputStream(samplerate=16000, channels=1, dtype='int16') as speaker:
- while True:
- response = await ws.recv()
+# === Event Handlers ===
- # Play audio responses
- if isinstance(response, bytes) and len(response):
- speaker.write(np.frombuffer(response, dtype=np.int16))
+@client.on_audio
+def play(audio_bytes: bytes):
+ output_stream.write(audio_bytes)
- # Handle JSON messages
- elif isinstance(response, str):
- msg = json.loads(response)
- if msg.get("type") == "session.created":
- print("Session ready! Start speaking...")
- session_ready = True
+@client.on_text
+def show_text(text: str):
+ print(f"\nAgent: {text}")
- elif msg.get("type") == "conversation.item.done":
- item = msg.get("item", {})
- role = item.get("role")
- text = item.get("content", [{}])[0].get("text", "")
- print(f"[{role}]: {text}")
-asyncio.run(voice_chat())
+@client.on_transcript
+def show_transcript(text: str):
+ print(f"\nYou: {text}")
-````
-Install dependencies with:
-```bash
-pip install websockets sounddevice numpy
-````
+@client.on_speech_started
+def on_speech_started():
+ print("\nListening...", end="", flush=True)
-
-That's it! You now have a working voice agent.
+@client.on_speech_stopped
+def on_speech_stopped():
+ print(" [processing]", end="", flush=True)
----
-## Example agents
+@client.on_error
+def handle_error(error):
+ print(f"\nERROR: {error}")
-Here are some practical examples to inspire your own agents.
-### Customer support agent
+# === Main ===
-```json
-{
- "agent_name": "support_agent",
- "instructions": "You are a customer support agent for a software company. Be helpful, patient, and empathetic. Ask clarifying questions to understand the customer's issue. If you can't solve a problem, offer to escalate to a human agent. Keep responses brief and focused.",
- "voice": "celeste",
- "greeting": "Thank the customer for calling and ask how you can help them today."
-}
-```
+print("Speech-to-Speech Voice Agent")
+print("Registered tools:", [t.name for t in client.tools])
+print("Press Ctrl+C to stop\n")
-### Appointment scheduler
+# Connect
+client.connect(
+ instructions="You are a helpful assistant. Be brief.",
+ output_modalities=["audio", "text"],
+ vad_threshold=0.3,
+)
-```json
-{
- "agent_name": "appointment_scheduler",
- "instructions": "You are a friendly receptionist who helps schedule appointments. Collect the caller's name, preferred date and time, and reason for the appointment. Confirm all details before ending the call. Be efficient but warm.",
- "voice": "estelle",
- "greeting": "Welcome the caller and ask if they'd like to schedule an appointment."
-}
+# Stream from microphone
+input_stream = audio.open(
+ format=pyaudio.paInt16, channels=1, rate=24000, input=True, frames_per_buffer=4096
+)
+
+try:
+ while True:
+ audio_data = input_stream.read(4096, exception_on_overflow=False)
+ client.send_audio(audio_data)
+except KeyboardInterrupt:
+ pass
+finally:
+ client.disconnect()
+ input_stream.close()
+ output_stream.close()
+ audio.terminate()
```
+
+
+
+```python
+"""
+Speech-to-Speech Voice Agent using raw WebSocket
-### Virtual concierge
+Requirements:
+ pip install websockets pyaudio
-```json
-{
- "agent_name": "hotel_concierge",
- "instructions": "You are a luxury hotel concierge. Be warm, professional, and knowledgeable. Help guests with restaurant recommendations, local attractions, transportation, and any requests. Anticipate needs and offer personalized suggestions.",
- "voice": "orion",
- "greeting": "Welcome the guest and ask how you can make their stay more enjoyable."
-}
-```
+Usage:
+ export ASSEMBLYAI_API_KEY=your_api_key
+ python voice_agent_ws.py
+"""
----
+import asyncio
+import json
+import os
+import queue
+import threading
-## Choose a voice
+import pyaudio
+import websockets
-Pick a voice that matches your agent's personality.
+ASSEMBLYAI_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY")
+URL = "wss://speech-to-speech.assemblyai.com/v1/realtime"
+
+# Audio settings
+INPUT_SAMPLE_RATE = 16000 # 16kHz for input
+OUTPUT_SAMPLE_RATE = 24000 # 24kHz for output
+CHANNELS = 1
+CHUNK_SIZE = 4096
+
+
+class AudioPlayer:
+ """Handles audio playback in a separate thread."""
+
+ def __init__(self):
+ self._audio = pyaudio.PyAudio()
+ self._stream = self._audio.open(
+ format=pyaudio.paInt16,
+ channels=CHANNELS,
+ rate=OUTPUT_SAMPLE_RATE,
+ output=True,
+ frames_per_buffer=CHUNK_SIZE,
+ )
+ self._queue = queue.Queue()
+ self._running = True
+ self._thread = threading.Thread(target=self._playback_loop, daemon=True)
+ self._thread.start()
+
+ def play(self, audio_data: bytes):
+ self._queue.put(audio_data)
+
+ def _playback_loop(self):
+ while self._running:
+ try:
+ audio_data = self._queue.get(timeout=0.1)
+ self._stream.write(audio_data)
+ except queue.Empty:
+ continue
+
+ def stop(self):
+ self._running = False
+ self._thread.join(timeout=1)
+ self._stream.stop_stream()
+ self._stream.close()
+ self._audio.terminate()
+
+
+class MicrophoneStream:
+ """Streams audio from the microphone."""
+
+ def __init__(self):
+ self._audio = pyaudio.PyAudio()
+ self._stream = self._audio.open(
+ format=pyaudio.paInt16,
+ channels=CHANNELS,
+ rate=INPUT_SAMPLE_RATE,
+ input=True,
+ frames_per_buffer=CHUNK_SIZE,
+ )
+ self._running = True
+
+ def read(self):
+ if self._running:
+ return self._stream.read(CHUNK_SIZE, exception_on_overflow=False)
+ return None
+
+ def stop(self):
+ self._running = False
+ self._stream.stop_stream()
+ self._stream.close()
+ self._audio.terminate()
+
+
+async def main():
+ audio_player = AudioPlayer()
+ mic = MicrophoneStream()
+
+ headers = {
+ "Authorization": f"Bearer {ASSEMBLYAI_API_KEY}",
+ "OpenAI-Beta": "realtime=v1"
+ }
-| Voice | Style |
-| ----------- | ----------------------------------- |
-| `luna` | Chill but excitable, gen-z optimist |
-| `celeste` | Warm, laid-back, fun-loving |
-| `orion` | Older male, warm and happy |
-| `ursa` | Young male, energetic |
-| `astra` | Young female, wide-eyed and curious |
-| `esther` | Older female, loving and caring |
-| `estelle` | Middle-aged female, sweet and kind |
-| `andromeda` | Young female, breathy and calm |
+ async with websockets.connect(URL, additional_headers=headers) as ws:
+ # Configure the session
+ await ws.send(json.dumps({
+ "type": "session.update",
+ "session": {
+ "model": "universal-streaming",
+ "voice": "sage",
+ "instructions": "You are a helpful assistant. Be concise and friendly.",
+ "input_audio_transcription": {
+ "model": "universal-streaming"
+ }
+ }
+ }))
+
+ print("Connected! Start speaking...")
+ print("Press Ctrl+C to stop\n")
----
+ async def send_audio():
+ loop = asyncio.get_event_loop()
+ while True:
+ # Read from microphone in a thread to avoid blocking
+ audio_data = await loop.run_in_executor(None, mic.read)
+ if audio_data:
+ await ws.send(audio_data)
+
+ async def receive_messages():
+ async for message in ws:
+ if isinstance(message, bytes):
+ # Binary frames contain audio data
+ audio_player.play(message)
+ else:
+ # Text frames contain JSON events
+ event = json.loads(message)
+
+ if event["type"] == "conversation.item.input_audio_transcription.completed":
+ print(f"\nYou: {event['transcript']}")
+
+ elif event["type"] == "response.audio_transcript.done":
+ print(f"\nAgent: {event['transcript']}")
+
+ try:
+ await asyncio.gather(send_audio(), receive_messages())
+ except KeyboardInterrupt:
+ print("\nStopping...")
+ finally:
+ mic.stop()
+ audio_player.stop()
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
+```
+
+
+```javascript
+/**
+ * Speech-to-Speech Voice Agent using WebSocket (Browser)
+ *
+ * Usage:
+ * 1. Set your API key in the ASSEMBLYAI_API_KEY variable
+ * 2. Open this file in a browser
+ * 3. Click "Start" to begin the conversation
+ */
+
+const ASSEMBLYAI_API_KEY = "your_api_key_here";
+const URL = "wss://speech-to-speech.assemblyai.com/v1/realtime";
+
+// Audio settings
+const INPUT_SAMPLE_RATE = 16000;
+const OUTPUT_SAMPLE_RATE = 24000;
+
+let ws;
+let audioContext; // 24 kHz context for playback
+let inputContext; // 16 kHz context for microphone capture
+let mediaStream;
+let audioWorklet;
+
+async function start() {
+ // Set up WebSocket connection
+ ws = new WebSocket(URL);
+ ws.binaryType = "arraybuffer";
+
+  // Separate audio contexts: 24 kHz for playback, 16 kHz for capture, so the
+  // microphone audio isn't resampled to the playback rate
+  audioContext = new AudioContext({ sampleRate: OUTPUT_SAMPLE_RATE });
+  inputContext = new AudioContext({ sampleRate: INPUT_SAMPLE_RATE });
+
+ ws.onopen = async () => {
+    // Note: browser WebSockets can't set custom headers, so this example sends
+    // no credentials; in practice, authenticate through a backend proxy or a
+    // short-lived token so your API key is never exposed in the browser
+ ws.send(JSON.stringify({
+ type: "session.update",
+ session: {
+ model: "universal-streaming",
+ voice: "sage",
+ instructions: "You are a helpful assistant. Be concise and friendly.",
+ input_audio_transcription: { model: "universal-streaming" },
+ },
+ }));
+
+ console.log("Connected! Start speaking...");
+
+ // Start microphone capture
+ mediaStream = await navigator.mediaDevices.getUserMedia({
+ audio: { sampleRate: INPUT_SAMPLE_RATE, channelCount: 1 },
+ });
+
+    // Process microphone audio at 16 kHz and send it to the WebSocket
+    const source = inputContext.createMediaStreamSource(mediaStream);
+    await inputContext.audioWorklet.addModule("audio-processor.js");
+    audioWorklet = new AudioWorkletNode(inputContext, "audio-processor");
+
+ audioWorklet.port.onmessage = (event) => {
+ if (ws.readyState === WebSocket.OPEN) {
+ // Send raw PCM16 audio as binary
+ ws.send(event.data);
+ }
+ };
+
+ source.connect(audioWorklet);
+ };
+
+ ws.onmessage = (event) => {
+ if (event.data instanceof ArrayBuffer) {
+ // Binary frames contain audio - play it
+ playAudio(event.data);
+ } else {
+ // Text frames contain JSON events
+ const data = JSON.parse(event.data);
+
+ if (data.type === "conversation.item.input_audio_transcription.completed") {
+ console.log(`You: ${data.transcript}`);
+ } else if (data.type === "response.audio_transcript.done") {
+ console.log(`Agent: ${data.transcript}`);
+ }
+ }
+ };
-## Add tools
+ ws.onerror = (error) => console.error("WebSocket error:", error);
+ ws.onclose = () => console.log("Disconnected");
+}
-Tools let your agent take actions—like checking a database, calling an API, or triggering a workflow.
+function playAudio(arrayBuffer) {
+ // Convert PCM16 to Float32 for Web Audio API
+ const int16Array = new Int16Array(arrayBuffer);
+ const float32Array = new Float32Array(int16Array.length);
+ for (let i = 0; i < int16Array.length; i++) {
+ float32Array[i] = int16Array[i] / 32768;
+ }
-Here's a simple example of an agent with a weather tool:
+ // Create and play audio buffer
+ const audioBuffer = audioContext.createBuffer(1, float32Array.length, OUTPUT_SAMPLE_RATE);
+ audioBuffer.getChannelData(0).set(float32Array);
-```json
-{
- "agent_name": "weather_assistant",
- "instructions": "You help users check the weather. When they ask about weather, use the get_weather tool to look it up.",
- "voice": "luna",
- "tools": [
- {
- "name": "get_weather",
- "description": "Get the current weather for a city",
- "parameters": {
- "type": "object",
- "properties": {
- "city": {
- "type": "string",
- "description": "The city name"
- }
- },
- "required": ["city"]
+ const source = audioContext.createBufferSource();
+ source.buffer = audioBuffer;
+ source.connect(audioContext.destination);
+ source.start();
+}
+
+function stop() {
+ if (mediaStream) {
+ mediaStream.getTracks().forEach((track) => track.stop());
+ }
+ if (ws) {
+ ws.close();
+ }
+  if (inputContext) {
+    inputContext.close();
+  }
+  if (audioContext) {
+    audioContext.close();
+  }
+ console.log("Stopped");
+}
+```
+
+Create an `audio-processor.js` file for the AudioWorklet:
+
+```javascript
+// audio-processor.js - AudioWorklet for capturing microphone audio
+class AudioProcessor extends AudioWorkletProcessor {
+ constructor() {
+ super();
+ this.buffer = [];
+ }
+
+ process(inputs) {
+ const input = inputs[0];
+ if (input.length > 0) {
+ const samples = input[0];
+ // Convert Float32 to Int16 (PCM16)
+ const int16 = new Int16Array(samples.length);
+ for (let i = 0; i < samples.length; i++) {
+ int16[i] = Math.max(-32768, Math.min(32767, samples[i] * 32768));
}
+ this.port.postMessage(int16.buffer, [int16.buffer]);
}
- ]
+ return true;
+ }
}
+
+registerProcessor("audio-processor", AudioProcessor);
+```
+
+
+```python
+"""
+Speech-to-Speech Voice Agent using OpenAI Python Client
+
+Requirements:
+ pip install openai pyaudio
+
+Usage:
+ export ASSEMBLYAI_API_KEY=your_api_key
+ python voice_agent_openai.py
+"""
+
+import base64
+import os
+import queue
+import threading
+
+import pyaudio
+from openai import OpenAI
+
+# Audio settings
+SAMPLE_RATE = 24000
+CHANNELS = 1
+CHUNK_SIZE = 4096
+
+
+class AudioPlayer:
+ """Handles audio playback in a separate thread."""
+
+ def __init__(self):
+ self._audio = pyaudio.PyAudio()
+ self._stream = self._audio.open(
+ format=pyaudio.paInt16,
+ channels=CHANNELS,
+ rate=SAMPLE_RATE,
+ output=True,
+ frames_per_buffer=CHUNK_SIZE,
+ )
+ self._queue = queue.Queue()
+ self._running = True
+ self._thread = threading.Thread(target=self._playback_loop, daemon=True)
+ self._thread.start()
+
+ def play(self, audio_data: bytes):
+ self._queue.put(audio_data)
+
+ def _playback_loop(self):
+ while self._running:
+ try:
+ audio_data = self._queue.get(timeout=0.1)
+ self._stream.write(audio_data)
+ except queue.Empty:
+ continue
+
+ def stop(self):
+ self._running = False
+ self._thread.join(timeout=1)
+ self._stream.stop_stream()
+ self._stream.close()
+ self._audio.terminate()
+
+
+class MicrophoneStream:
+ """Streams audio from the microphone."""
+
+ def __init__(self):
+ self._audio = pyaudio.PyAudio()
+ self._stream = self._audio.open(
+ format=pyaudio.paInt16,
+ channels=CHANNELS,
+ rate=SAMPLE_RATE,
+ input=True,
+ frames_per_buffer=CHUNK_SIZE,
+ )
+ self._running = True
+
+ def __iter__(self):
+ while self._running:
+ try:
+ data = self._stream.read(CHUNK_SIZE, exception_on_overflow=False)
+ yield data
+ except OSError:
+ break
+
+ def stop(self):
+ self._running = False
+ self._stream.stop_stream()
+ self._stream.close()
+ self._audio.terminate()
+
+
+def main():
+ client = OpenAI(
+ api_key=os.environ.get("ASSEMBLYAI_API_KEY"),
+ base_url="https://speech-to-speech.assemblyai.com/v1"
+ )
+
+ audio_player = AudioPlayer()
+ mic = MicrophoneStream()
+
+ print("Speech-to-Speech Voice Agent (OpenAI Client)")
+ print("Press Ctrl+C to stop\n")
+
+ with client.beta.realtime.connect(model="universal-streaming") as connection:
+ # Configure the session
+ connection.session.update(
+ session={
+ "voice": "sage",
+ "instructions": "You are a helpful assistant. Be concise and friendly.",
+ "input_audio_transcription": {
+ "model": "universal-streaming"
+ }
+ }
+ )
+
+ # Start sending audio in a background thread
+ def send_audio():
+ for chunk in mic:
+                # The OpenAI client sends audio as base64-encoded JSON events
+                connection.input_audio_buffer.append(audio=base64.b64encode(chunk).decode("ascii"))
+
+ audio_thread = threading.Thread(target=send_audio, daemon=True)
+ audio_thread.start()
+
+ try:
+ # Receive and handle events
+ for event in connection:
+ if event.type == "response.audio.delta":
+                # Audio deltas arrive base64-encoded; decode before playback
+                audio_player.play(base64.b64decode(event.delta))
+ elif event.type == "response.audio_transcript.done":
+ print(f"\nAgent: {event.transcript}")
+ elif event.type == "conversation.item.input_audio_transcription.completed":
+ print(f"\nYou: {event.transcript}")
+ except KeyboardInterrupt:
+ print("\nStopping...")
+ finally:
+ mic.stop()
+ audio_player.stop()
+
+
+if __name__ == "__main__":
+ main()
```
+
+
-When a user asks "What's the weather in Tokyo?", the agent sends your client a `tool.call` event:
+## Integration with voice agent frameworks
-```json
-{
- "type": "tool.call",
- "call_id": "call_abc123",
- "name": "get_weather",
- "arguments": { "city": "Tokyo" }
-}
+The Speech-to-Speech API works seamlessly with popular voice agent frameworks. Since it follows the OpenAI Realtime API schema, you can use it as a drop-in replacement.
+
+### LiveKit
+
+LiveKit's OpenAI realtime plugin appends the realtime path to the base URL for you, so set `base_url` to the `/v1` endpoint rather than the full `/v1/realtime` URL.
+
+```python
+import os
+from livekit.agents import AgentSession
+from livekit.plugins import openai
+from livekit.plugins.openai.realtime import AudioTranscription
+
+api_url = os.environ.get("ASSEMBLYAI_API_URL", "wss://speech-to-speech.assemblyai.com/v1")
+api_key = os.environ.get("ASSEMBLYAI_API_KEY")
+
+if not api_key:
+ raise ValueError("ASSEMBLYAI_API_KEY environment variable is required")
+
+session = AgentSession(
+ llm=openai.realtime.RealtimeModel(
+ base_url=api_url,
+ api_key=api_key,
+ voice="sage",
+ model="universal-streaming",
+ input_audio_transcription=AudioTranscription(
+ model="universal-streaming"
+ )
+ )
+)
+```
+
+
+```python
+import os
+import logging
+from livekit import rtc
+from livekit.agents import (
+ AgentSession,
+ Agent,
+ RoomInputOptions,
+ function_tool,
+ RunContext,
+)
+from livekit.plugins import openai
+from livekit.plugins.openai.realtime import AudioTranscription
+
+logger = logging.getLogger("voice-agent")
+
+class VoiceAgent(Agent):
+ def __init__(self):
+ super().__init__(
+ instructions="""You are a helpful voice assistant powered by AssemblyAI.
+ Be conversational, friendly, and concise in your responses."""
+ )
+
+ @function_tool()
+ async def get_current_time(self, context: RunContext) -> str:
+ """Get the current time."""
+ from datetime import datetime
+ return datetime.now().strftime("%I:%M %p")
+
+ @function_tool()
+ async def end_conversation(self, context: RunContext) -> str:
+ """End the conversation when the user says goodbye."""
+ return "Goodbye! Have a great day."
+
+async def entrypoint(ctx):
+ api_url = os.environ.get("ASSEMBLYAI_API_URL", "wss://speech-to-speech.assemblyai.com/v1")
+ api_key = os.environ.get("ASSEMBLYAI_API_KEY")
+
+ session = AgentSession(
+ llm=openai.realtime.RealtimeModel(
+ base_url=api_url,
+ api_key=api_key,
+ voice="sage",
+ model="universal-streaming",
+ input_audio_transcription=AudioTranscription(
+ model="universal-streaming"
+ )
+ )
+ )
+
+ agent = VoiceAgent()
+ await session.start(
+ room=ctx.room,
+ agent=agent,
+ room_input_options=RoomInputOptions()
+ )
```
+
-Your client executes the function and sends back the result:
+### Pipecat
-```json
-{
- "type": "tool.result",
- "call_id": "call_abc123",
- "result": "{\"temperature\": \"72°F\", \"conditions\": \"sunny\"}"
-}
+Pipecat can run an OpenAI Realtime-compatible service inside its pipeline. Configure the service to use AssemblyAI's endpoint:
+
+```python
+import os
+from pipecat.transports.services.daily import DailyTransport
+from pipecat.services.openai_realtime import OpenAIRealtimeService
+
+api_key = os.environ.get("ASSEMBLYAI_API_KEY")
+
+realtime_service = OpenAIRealtimeService(
+ api_key=api_key,
+ base_url="wss://speech-to-speech.assemblyai.com/v1/realtime",
+ model="universal-streaming",
+ voice="sage",
+ system_prompt="You are a helpful assistant."
+)
```
-The agent then speaks the weather information to the user.
+
+```python
+import os
+import asyncio
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineTask
+from pipecat.transports.services.daily import DailyTransport, DailyParams
+from pipecat.services.openai_realtime import OpenAIRealtimeService
+
+async def main():
+ api_key = os.environ.get("ASSEMBLYAI_API_KEY")
+ daily_api_key = os.environ.get("DAILY_API_KEY")
+
+ transport = DailyTransport(
+ room_url="https://your-domain.daily.co/your-room",
+ token=daily_api_key,
+ bot_name="AssemblyAI Voice Agent",
+ params=DailyParams(
+ audio_in_enabled=True,
+ audio_out_enabled=True,
+ )
+ )
+
+ realtime_service = OpenAIRealtimeService(
+ api_key=api_key,
+ base_url="wss://speech-to-speech.assemblyai.com/v1/realtime",
+ model="universal-streaming",
+ voice="sage",
+ system_prompt="""You are a helpful customer service agent.
+ Be professional, empathetic, and solution-oriented."""
+ )
+
+ pipeline = Pipeline([
+ transport.input(),
+ realtime_service,
+ transport.output()
+ ])
+
+ runner = PipelineRunner()
+ task = PipelineTask(pipeline)
+ await runner.run(task)
+
+if __name__ == "__main__":
+ asyncio.run(main())
+```
+
----
+## Configuration
-## Agent configuration
+### Session parameters
-Full list of options when creating an agent.
+Configure your session using the `session.update` event:
-| Field | Type | Default | Description |
-| ----------------------- | ------ | -------- | --------------------------------------------------- |
-| `agent_name` | string | required | Unique identifier (letters, numbers, underscores) |
-| `instructions` | string | - | Personality and behavior guidelines |
-| `voice` | string | `"luna"` | Voice to use for responses |
-| `greeting` | string | - | What the agent says when a conversation starts |
-| `temperature` | float | `0.8` | Response creativity (0.0 = focused, 1.0 = creative) |
-| `max_tokens` | int | `4096` | Maximum response length |
-| `language` | string | `"en"` | Language code |
-| `tools` | array | - | Tool definitions (see above) |
-| `audio_in_sample_rate` | int | `16000` | Input audio sample rate in Hz |
-| `audio_out_sample_rate` | int | `16000` | Output audio sample rate in Hz |
+| Parameter | Type | Default | Description |
+|-----------|------|---------|-------------|
+| `model` | string | required | Use `"universal-streaming"` |
+| `voice` | string | `"sage"` | Voice for audio responses |
+| `instructions` | string | - | System prompt defining agent behavior |
+| `input_audio_transcription.model` | string | - | Set to `"universal-streaming"` for transcription |
+| `temperature` | float | `0.8` | Response creativity (0.0-1.0) |
+| `max_response_output_tokens` | int | `4096` | Maximum tokens in response |
+| `turn_detection` | object | - | Configure voice activity detection |
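+
+For example, combining these parameters into a single `session.update` event over a raw WebSocket connection looks like the sketch below (values are illustrative; include only the fields you need):
+
+```python
+# Sketch: configure the session using the parameters from the table above
+await ws.send(json.dumps({
+    "type": "session.update",
+    "session": {
+        "model": "universal-streaming",
+        "voice": "sage",
+        "instructions": "You are a concise, friendly assistant.",
+        "temperature": 0.8,
+        "max_response_output_tokens": 4096,
+        "input_audio_transcription": {"model": "universal-streaming"},
+    }
+}))
+```
+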
----
+### Available voices
-## WebSocket events
+| Voice | Description |
+|-------|-------------|
+| `sage` | Calm and professional |
+| `ember` | Warm and expressive |
+| `breeze` | Light and friendly |
+| `cascade` | Clear and articulate |
-When connected to an agent, you'll receive these events:
+### Audio format
-### session.created
+The API uses raw PCM16 audio sent as binary WebSocket frames (not base64 encoded). Send audio in 50ms chunks for optimal performance.
-Sent when the connection is established and ready.
+**Input audio:**
+- Encoding: Raw PCM16 (16-bit signed integer, little-endian)
+- Sample rate: 16,000 Hz
+- Channels: Mono
+- Chunk size: 50ms recommended (1,600 bytes per chunk)
-```json
-{
- "type": "session.created",
- "session": {
- "id": "uuid",
- "agent_name": "my_agent"
- }
-}
-```
+**Output audio:**
+- Encoding: Raw PCM16 (16-bit signed integer, little-endian)
+- Sample rate: 24,000 Hz
+- Channels: Mono
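+
+As a quick sketch, a 50ms input chunk at 16 kHz mono PCM16 works out to 16,000 samples/s × 0.05 s × 2 bytes = 1,600 bytes. Assuming a `pyaudio` input stream and an open `websockets` connection `ws` (hypothetical names), a capture loop could look like this:
+
+```python
+import pyaudio
+
+INPUT_SAMPLE_RATE = 16000
+FRAMES_PER_CHUNK = int(INPUT_SAMPLE_RATE * 0.05)  # 50 ms -> 800 frames -> 1,600 bytes as PCM16
+
+audio = pyaudio.PyAudio()
+stream = audio.open(
+    format=pyaudio.paInt16,
+    channels=1,
+    rate=INPUT_SAMPLE_RATE,
+    input=True,
+    frames_per_buffer=FRAMES_PER_CHUNK,
+)
+
+async def stream_microphone(ws):
+    """Read 50 ms chunks from the microphone and send each as a binary WebSocket frame."""
+    while True:
+        chunk = stream.read(FRAMES_PER_CHUNK, exception_on_overflow=False)
+        await ws.send(chunk)  # raw PCM16 bytes, not base64
+```
+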
-### conversation.item.done
+## Tool calling
-Sent when a speaker finishes talking. Contains the transcript.
+Enable your agent to perform actions by defining tools.
-```json
-{
- "type": "conversation.item.done",
- "item": {
- "role": "user",
- "content": [{ "type": "text", "text": "What's the weather like?" }]
- }
-}
-```
+
+
+With the SDK, use the `@client.tool` decorator to register functions as tools. The SDK automatically handles tool execution and response generation.
-### conversation.item.interim
+```python
+import os
+
+import assemblyai as aai
-Sent during speech with partial transcripts. Useful for showing real-time captions.
+client = aai.speech_to_speech.SpeechToSpeechClient(
+ api_key=os.environ["ASSEMBLYAI_API_KEY"]
+)
-```json
-{
- "type": "conversation.item.interim",
- "item": {
- "role": "user",
- "content": [{ "type": "text", "text": "What's the wea..." }]
- }
-}
+@client.tool
+def get_current_time() -> str:
+ """Get the current time."""
+ from datetime import datetime
+ return datetime.now().strftime("%I:%M %p")
+
+@client.tool
+def get_weather(location: str, units: str = "fahrenheit") -> dict:
+ """Get the current weather for a location."""
+ return {
+ "location": location,
+ "temperature": 72 if units == "fahrenheit" else 22,
+ "conditions": "sunny",
+ }
+
+@client.tool
+def check_order_status(order_id: str) -> dict:
+ """Check the status of a customer order."""
+ return {
+ "order_id": order_id,
+ "status": "shipped",
+ "estimated_delivery": "January 28, 2026",
+ }
+
+# Tools are automatically available when you connect
+client.connect(
+ instructions="You help users check order status and get weather information.",
+ voice=aai.speech_to_speech.Voice.SAGE,
+)
```
+
+
+With raw WebSocket, define tools using JSON Schema format and handle tool calls manually.
-### tool.call
+```python
+# Define tools in your session configuration
+await ws.send(json.dumps({
+ "type": "session.update",
+ "session": {
+ "model": "universal-streaming",
+ "voice": "sage",
+ "instructions": "You help users check order status.",
+ "tools": [
+ {
+ "type": "function",
+ "name": "check_order_status",
+ "description": "Check the status of a customer order",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "order_id": {
+ "type": "string",
+ "description": "The order ID to look up"
+ }
+ },
+ "required": ["order_id"]
+ }
+ }
+ ]
+ }
+}))
+```
-Sent when the agent wants to use a tool. See [Add tools](#add-tools) for handling.
+When the agent decides to use a tool, you'll receive a `response.function_call_arguments.done` event:
-### Audio (binary)
+```python
+async for message in ws:
+    # Audio arrives as binary frames; JSON events arrive as text frames
+    if isinstance(message, bytes):
+        continue
+    event = json.loads(message)
+
+ if event["type"] == "response.function_call_arguments.done":
+ tool_name = event["name"]
+ arguments = json.loads(event["arguments"])
+ call_id = event["call_id"]
+
+ # Execute the tool
+ if tool_name == "check_order_status":
+ result = await check_order_status(arguments["order_id"])
+
+ # Send the result back
+ await ws.send(json.dumps({
+ "type": "conversation.item.create",
+ "item": {
+ "type": "function_call_output",
+ "call_id": call_id,
+ "output": json.dumps(result)
+ }
+ }))
+
+ # Trigger a response
+ await ws.send(json.dumps({"type": "response.create"}))
+```
+
+
-The agent's spoken responses come as binary WebSocket frames containing PCM16 audio.
+## Subagent routing
----
+Route conversations to specialized subagents based on user intent. This pattern is useful for complex applications where different agents handle different domains.
-## Audio format
+```python
+from livekit.agents import AgentSession, Agent, function_tool, RunContext
+from livekit.plugins import openai
+from livekit.plugins.openai.realtime import AudioTranscription
+
+class RouterAgent(Agent):
+ def __init__(self):
+ super().__init__(
+ instructions="""You are a routing agent. Determine the user's intent and
+ route them to the appropriate specialist:
+ - For billing questions, use transfer_to_billing
+ - For technical support, use transfer_to_support
+ - For sales inquiries, use transfer_to_sales"""
+ )
+
+ @function_tool()
+ async def transfer_to_billing(self, context: RunContext) -> str:
+ """Transfer the conversation to the billing specialist."""
+ context.session.update_agent(BillingAgent())
+ return "Transferring you to our billing specialist..."
+
+ @function_tool()
+ async def transfer_to_support(self, context: RunContext) -> str:
+ """Transfer the conversation to technical support."""
+ context.session.update_agent(SupportAgent())
+ return "Transferring you to technical support..."
+
+ @function_tool()
+ async def transfer_to_sales(self, context: RunContext) -> str:
+ """Transfer the conversation to the sales team."""
+ context.session.update_agent(SalesAgent())
+ return "Transferring you to our sales team..."
+
+class BillingAgent(Agent):
+ def __init__(self):
+ super().__init__(
+ instructions="""You are a billing specialist. Help users with:
+ - Invoice questions
+ - Payment issues
+ - Subscription changes
+ Be professional and thorough."""
+ )
+
+ @function_tool()
+ async def lookup_invoice(self, context: RunContext, invoice_id: str) -> str:
+ """Look up an invoice by ID."""
+ # Implement invoice lookup logic
+ return f"Invoice {invoice_id}: $99.00, paid on Jan 15, 2026"
+
+class SupportAgent(Agent):
+ def __init__(self):
+ super().__init__(
+ instructions="""You are a technical support specialist. Help users with:
+ - Troubleshooting issues
+ - Product questions
+ - Feature explanations
+ Be patient and clear in your explanations."""
+ )
+
+class SalesAgent(Agent):
+ def __init__(self):
+ super().__init__(
+ instructions="""You are a sales specialist. Help users with:
+ - Product information
+ - Pricing questions
+ - Demo scheduling
+ Be helpful and not pushy."""
+ )
+```
-Both input and output audio use the same format:
+## Sample agents
-- **Encoding**: PCM16 (16-bit signed integer, little-endian)
-- **Sample rate**: 16,000 Hz (configurable)
-- **Channels**: Mono
+The examples below cover common use cases. Each focuses on session configuration and tool handling; wire in microphone capture and audio playback from the Quickstart to hold a full conversation. Set your `ASSEMBLYAI_API_KEY` environment variable before running.
----
+### Debt collection agent
-## REST API reference
+A professional agent for payment reminder calls with compliance-aware messaging.
-
+
+```python
+import os
+import asyncio
+import json
+import websockets
+from datetime import datetime
-**Base URL**: `https://aaigentsv1.up.railway.app`
+ASSEMBLYAI_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY")
+URL = "wss://speech-to-speech.assemblyai.com/v1/realtime"
-All REST endpoints require an `Authorization: YOUR_API_KEY` header.
+# Mock database
+ACCOUNTS = {
+ "ACC001": {"name": "John Smith", "balance": 450.00, "due_date": "2026-01-15"},
+ "ACC002": {"name": "Jane Doe", "balance": 1200.00, "due_date": "2026-01-10"},
+}
-### Create or update agent
+INSTRUCTIONS = """You are a professional debt collection agent for ABC Financial Services.
+Your role is to remind customers about overdue payments in a respectful and compliant manner.
-`POST /agents` — Create a new agent or update an existing one.
+Guidelines:
+- Always identify yourself and the company at the start
+- Verify you're speaking with the right person before discussing account details
+- Be professional, empathetic, and non-threatening
+- Offer payment plan options when appropriate
+- Document any promises to pay
+- Never harass, threaten, or use abusive language
+- Comply with FDCPA regulations
-### List agents
+Use the available tools to look up account information and record payment arrangements."""
-`GET /agents` — List all your agents.
+TOOLS = [
+ {
+ "type": "function",
+ "name": "lookup_account",
+ "description": "Look up a customer's account information by account ID",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "account_id": {
+ "type": "string",
+ "description": "The customer's account ID"
+ }
+ },
+ "required": ["account_id"]
+ }
+ },
+ {
+ "type": "function",
+ "name": "record_payment_promise",
+ "description": "Record a customer's promise to pay",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "account_id": {"type": "string"},
+ "amount": {"type": "number"},
+ "payment_date": {"type": "string", "description": "Date in YYYY-MM-DD format"}
+ },
+ "required": ["account_id", "amount", "payment_date"]
+ }
+ },
+ {
+ "type": "function",
+ "name": "setup_payment_plan",
+ "description": "Set up a payment plan for the customer",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "account_id": {"type": "string"},
+ "monthly_amount": {"type": "number"},
+ "num_payments": {"type": "integer"}
+ },
+ "required": ["account_id", "monthly_amount", "num_payments"]
+ }
+ }
+]
+
+def lookup_account(account_id: str) -> dict:
+ if account_id in ACCOUNTS:
+ return {"success": True, "account": ACCOUNTS[account_id]}
+ return {"success": False, "error": "Account not found"}
+
+def record_payment_promise(account_id: str, amount: float, payment_date: str) -> dict:
+ return {
+ "success": True,
+ "confirmation": f"Payment promise recorded: ${amount} by {payment_date}",
+ "reference": f"PRM-{datetime.now().strftime('%Y%m%d%H%M%S')}"
+ }
-```json
-{
- "agents": ["agent1", "agent2"],
- "count": 2
-}
+def setup_payment_plan(account_id: str, monthly_amount: float, num_payments: int) -> dict:
+ return {
+ "success": True,
+ "plan": {
+ "monthly_payment": monthly_amount,
+ "total_payments": num_payments,
+ "total_amount": monthly_amount * num_payments
+ },
+ "reference": f"PLN-{datetime.now().strftime('%Y%m%d%H%M%S')}"
+ }
+
+async def handle_tool_call(ws, event):
+ tool_name = event["name"]
+ arguments = json.loads(event["arguments"])
+ call_id = event["call_id"]
+
+ if tool_name == "lookup_account":
+ result = lookup_account(arguments["account_id"])
+ elif tool_name == "record_payment_promise":
+ result = record_payment_promise(
+ arguments["account_id"],
+ arguments["amount"],
+ arguments["payment_date"]
+ )
+ elif tool_name == "setup_payment_plan":
+ result = setup_payment_plan(
+ arguments["account_id"],
+ arguments["monthly_amount"],
+ arguments["num_payments"]
+ )
+ else:
+ result = {"error": "Unknown tool"}
+
+ await ws.send(json.dumps({
+ "type": "conversation.item.create",
+ "item": {
+ "type": "function_call_output",
+ "call_id": call_id,
+ "output": json.dumps(result)
+ }
+ }))
+ await ws.send(json.dumps({"type": "response.create"}))
+
+async def main():
+ headers = {
+ "Authorization": f"Bearer {ASSEMBLYAI_API_KEY}",
+ "OpenAI-Beta": "realtime=v1"
+ }
+
+ async with websockets.connect(URL, additional_headers=headers) as ws:
+ await ws.send(json.dumps({
+ "type": "session.update",
+ "session": {
+ "model": "universal-streaming",
+ "voice": "sage",
+ "instructions": INSTRUCTIONS,
+ "tools": TOOLS,
+ "input_audio_transcription": {"model": "universal-streaming"}
+ }
+ }))
+
+ print("Debt Collection Agent ready. Start speaking...")
+
+        async for message in ws:
+            # Skip binary audio frames; this example only prints transcripts
+            if isinstance(message, bytes):
+                continue
+            event = json.loads(message)
+
+ if event["type"] == "response.function_call_arguments.done":
+ await handle_tool_call(ws, event)
+ elif event["type"] == "conversation.item.input_audio_transcription.completed":
+ print(f"Customer: {event['transcript']}")
+ elif event["type"] == "response.audio_transcript.done":
+ print(f"Agent: {event['transcript']}")
+
+if __name__ == "__main__":
+ asyncio.run(main())
```
+
-### Get agent
+### Interview agent
-`GET /agents/{agent_name}` — Get an agent's configuration.
+An AI interviewer that conducts structured interviews and evaluates candidates.
-### Delete agent
+
+```python
+import os
+import asyncio
+import json
+import websockets
+from datetime import datetime
-`DELETE /agents/{agent_name}` — Delete an agent.
+ASSEMBLYAI_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY")
+URL = "wss://speech-to-speech.assemblyai.com/v1/realtime"
-
+INSTRUCTIONS = """You are an AI interviewer conducting a technical screening interview for a software engineering position.
-
+Interview structure:
+1. Introduction and rapport building (2 minutes)
+2. Background and experience questions (5 minutes)
+3. Technical questions (10 minutes)
+4. Behavioral questions using STAR method (5 minutes)
+5. Candidate questions (3 minutes)
+6. Closing
-### List conversations
+Guidelines:
+- Be professional, warm, and encouraging
+- Ask follow-up questions to dig deeper into responses
+- Take notes on key points using the record_note tool
+- Score responses using the score_response tool
+- Keep track of time and move through sections appropriately
+- At the end, provide a summary using the generate_summary tool
-`GET /agents/{agent_name}/conversations` — List all conversations for an agent.
+Start by introducing yourself and the interview process."""
-```json
-{
- "agent_name": "my_agent",
- "conversations": [
+TOOLS = [
+ {
+ "type": "function",
+ "name": "record_note",
+ "description": "Record a note about the candidate's response",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "category": {
+ "type": "string",
+ "enum": ["experience", "technical", "behavioral", "communication", "other"]
+ },
+ "note": {"type": "string"},
+ "sentiment": {
+ "type": "string",
+ "enum": ["positive", "neutral", "negative"]
+ }
+ },
+ "required": ["category", "note"]
+ }
+ },
+ {
+ "type": "function",
+ "name": "score_response",
+ "description": "Score a candidate's response to a question",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "question_topic": {"type": "string"},
+ "score": {
+ "type": "integer",
+ "description": "Score from 1-5"
+ },
+ "reasoning": {"type": "string"}
+ },
+ "required": ["question_topic", "score", "reasoning"]
+ }
+ },
{
- "conversation_id": "uuid",
- "created_at": "2025-12-18T13:00:00Z"
+ "type": "function",
+ "name": "generate_summary",
+ "description": "Generate an interview summary at the end",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "overall_impression": {"type": "string"},
+ "strengths": {
+ "type": "array",
+ "items": {"type": "string"}
+ },
+ "areas_for_improvement": {
+ "type": "array",
+ "items": {"type": "string"}
+ },
+ "recommendation": {
+ "type": "string",
+ "enum": ["strong_hire", "hire", "maybe", "no_hire"]
+ }
+ },
+ "required": ["overall_impression", "strengths", "areas_for_improvement", "recommendation"]
+ }
+ },
+ {
+ "type": "function",
+ "name": "end_interview",
+ "description": "End the interview session",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "reason": {"type": "string"}
+ },
+ "required": ["reason"]
+ }
}
- ],
- "count": 1
+]
+
+interview_data = {
+ "notes": [],
+ "scores": [],
+ "start_time": None
}
-```
-### Get conversation
+def record_note(category: str, note: str, sentiment: str = "neutral") -> dict:
+ interview_data["notes"].append({
+ "category": category,
+ "note": note,
+ "sentiment": sentiment,
+ "timestamp": datetime.now().isoformat()
+ })
+ return {"success": True, "message": "Note recorded"}
+
+def score_response(question_topic: str, score: int, reasoning: str) -> dict:
+ interview_data["scores"].append({
+ "topic": question_topic,
+ "score": score,
+ "reasoning": reasoning
+ })
+ avg_score = sum(s["score"] for s in interview_data["scores"]) / len(interview_data["scores"])
+ return {"success": True, "current_average": round(avg_score, 2)}
+
+def generate_summary(overall_impression: str, strengths: list, areas_for_improvement: list, recommendation: str) -> dict:
+ return {
+ "success": True,
+ "summary": {
+ "overall_impression": overall_impression,
+ "strengths": strengths,
+ "areas_for_improvement": areas_for_improvement,
+ "recommendation": recommendation,
+ "average_score": sum(s["score"] for s in interview_data["scores"]) / len(interview_data["scores"]) if interview_data["scores"] else 0,
+ "notes_count": len(interview_data["notes"])
+ }
+ }
-`GET /agents/{agent_name}/conversations/{conversation_id}` — Get a specific conversation with all messages.
+async def handle_tool_call(ws, event):
+ tool_name = event["name"]
+ arguments = json.loads(event["arguments"])
+ call_id = event["call_id"]
+
+ if tool_name == "record_note":
+ result = record_note(arguments["category"], arguments["note"], arguments.get("sentiment", "neutral"))
+ elif tool_name == "score_response":
+ result = score_response(arguments["question_topic"], arguments["score"], arguments["reasoning"])
+ elif tool_name == "generate_summary":
+ result = generate_summary(
+ arguments["overall_impression"],
+ arguments["strengths"],
+ arguments["areas_for_improvement"],
+ arguments["recommendation"]
+ )
+ elif tool_name == "end_interview":
+ result = {"success": True, "message": "Interview ended", "reason": arguments["reason"]}
+ print(f"\n=== Interview Summary ===")
+ print(f"Notes: {len(interview_data['notes'])}")
+ print(f"Scores: {interview_data['scores']}")
+ else:
+ result = {"error": "Unknown tool"}
+
+ await ws.send(json.dumps({
+ "type": "conversation.item.create",
+ "item": {
+ "type": "function_call_output",
+ "call_id": call_id,
+ "output": json.dumps(result)
+ }
+ }))
+ await ws.send(json.dumps({"type": "response.create"}))
+
+async def main():
+ interview_data["start_time"] = datetime.now()
+
+ headers = {
+ "Authorization": f"Bearer {ASSEMBLYAI_API_KEY}",
+ "OpenAI-Beta": "realtime=v1"
+ }
-```json
-{
- "conversation_id": "uuid",
- "agent_name": "my_agent",
- "items": [],
- "created_at": "2025-12-18T13:00:00Z"
-}
+ async with websockets.connect(URL, additional_headers=headers) as ws:
+ await ws.send(json.dumps({
+ "type": "session.update",
+ "session": {
+ "model": "universal-streaming",
+ "voice": "sage",
+ "instructions": INSTRUCTIONS,
+ "tools": TOOLS,
+ "input_audio_transcription": {"model": "universal-streaming"}
+ }
+ }))
+
+ print("Interview Agent ready. The interview will begin shortly...")
+
+ # Trigger initial greeting
+ await ws.send(json.dumps({"type": "response.create"}))
+
+        async for message in ws:
+            # Skip binary audio frames; this example only prints transcripts
+            if isinstance(message, bytes):
+                continue
+            event = json.loads(message)
+
+ if event["type"] == "response.function_call_arguments.done":
+ await handle_tool_call(ws, event)
+ elif event["type"] == "conversation.item.input_audio_transcription.completed":
+ print(f"Candidate: {event['transcript']}")
+ elif event["type"] == "response.audio_transcript.done":
+ print(f"Interviewer: {event['transcript']}")
+
+if __name__ == "__main__":
+ asyncio.run(main())
```
-
-
+### Lead qualification agent
-Tools follow JSON Schema format:
+A sales development agent that qualifies leads using the BANT framework.
-```json
-{
- "name": "tool_name",
- "description": "What this tool does",
- "parameters": {
- "type": "object",
- "properties": {
- "param_name": {
- "type": "string",
- "description": "What this parameter is for"
- }
+
+```python
+import os
+import asyncio
+import json
+import websockets
+from datetime import datetime
+
+ASSEMBLYAI_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY")
+URL = "wss://speech-to-speech.assemblyai.com/v1/realtime"
+
+INSTRUCTIONS = """You are a sales development representative (SDR) for TechCorp, a B2B SaaS company.
+Your goal is to qualify leads using the BANT framework:
+- Budget: Do they have budget allocated?
+- Authority: Are you speaking with a decision maker?
+- Need: Do they have a genuine need for our solution?
+- Timeline: When are they looking to implement?
+
+Guidelines:
+- Be conversational and build rapport
+- Ask open-ended questions to understand their situation
+- Listen actively and respond to what they say
+- Don't be pushy - focus on understanding their needs
+- Use the qualification tools to track BANT criteria
+- If qualified, offer to schedule a demo with an account executive
+- If not qualified, politely end the call and offer resources
+
+Start by introducing yourself and asking about their current challenges."""
+
+TOOLS = [
+ {
+ "type": "function",
+ "name": "update_qualification",
+ "description": "Update the lead's BANT qualification status",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "criterion": {
+ "type": "string",
+ "enum": ["budget", "authority", "need", "timeline"]
+ },
+ "status": {
+ "type": "string",
+ "enum": ["qualified", "not_qualified", "unknown"]
+ },
+ "notes": {"type": "string"}
+ },
+ "required": ["criterion", "status"]
+ }
},
- "required": ["param_name"]
- }
+ {
+ "type": "function",
+ "name": "record_company_info",
+ "description": "Record information about the prospect's company",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "company_name": {"type": "string"},
+ "industry": {"type": "string"},
+ "company_size": {"type": "string"},
+ "current_solution": {"type": "string"}
+ }
+ }
+ },
+ {
+ "type": "function",
+ "name": "schedule_demo",
+ "description": "Schedule a demo with an account executive",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "preferred_date": {"type": "string"},
+ "preferred_time": {"type": "string"},
+ "attendees": {
+ "type": "array",
+ "items": {"type": "string"}
+ },
+ "notes": {"type": "string"}
+ },
+ "required": ["preferred_date", "preferred_time"]
+ }
+ },
+ {
+ "type": "function",
+ "name": "send_resources",
+ "description": "Send educational resources to the prospect",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "resource_type": {
+ "type": "string",
+ "enum": ["case_study", "whitepaper", "product_overview", "pricing_guide"]
+ },
+ "email": {"type": "string"}
+ },
+ "required": ["resource_type", "email"]
+ }
+ },
+ {
+ "type": "function",
+ "name": "end_call",
+ "description": "End the qualification call",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "outcome": {
+ "type": "string",
+ "enum": ["qualified_demo_scheduled", "qualified_follow_up", "not_qualified", "callback_requested"]
+ },
+ "summary": {"type": "string"}
+ },
+ "required": ["outcome", "summary"]
+ }
+ }
+]
+
+lead_data = {
+ "qualification": {
+ "budget": {"status": "unknown", "notes": ""},
+ "authority": {"status": "unknown", "notes": ""},
+ "need": {"status": "unknown", "notes": ""},
+ "timeline": {"status": "unknown", "notes": ""}
+ },
+ "company_info": {},
+ "call_start": None
}
-```
-**Supported parameter types**: `string`, `number`, `boolean`, `array`, `object`
+def update_qualification(criterion: str, status: str, notes: str = "") -> dict:
+ lead_data["qualification"][criterion] = {"status": status, "notes": notes}
+ qualified_count = sum(1 for c in lead_data["qualification"].values() if c["status"] == "qualified")
+ return {
+ "success": True,
+ "qualification_progress": f"{qualified_count}/4 criteria qualified",
+ "is_fully_qualified": qualified_count == 4
+ }
+
+def record_company_info(**kwargs) -> dict:
+ lead_data["company_info"].update(kwargs)
+ return {"success": True, "recorded_fields": list(kwargs.keys())}
+
+def schedule_demo(preferred_date: str, preferred_time: str, attendees: list = None, notes: str = "") -> dict:
+ return {
+ "success": True,
+ "confirmation": {
+ "date": preferred_date,
+ "time": preferred_time,
+ "attendees": attendees or [],
+ "meeting_link": "https://meet.techcorp.com/demo-abc123",
+ "calendar_invite_sent": True
+ }
+ }
+
+def send_resources(resource_type: str, email: str) -> dict:
+ return {
+ "success": True,
+ "message": f"{resource_type.replace('_', ' ').title()} will be sent to {email}"
+ }
+
+def end_call(outcome: str, summary: str) -> dict:
+ duration = (datetime.now() - lead_data["call_start"]).seconds if lead_data["call_start"] else 0
+ return {
+ "success": True,
+ "call_summary": {
+ "outcome": outcome,
+ "summary": summary,
+ "duration_seconds": duration,
+ "qualification_status": lead_data["qualification"],
+ "company_info": lead_data["company_info"]
+ }
+ }
+
+async def handle_tool_call(ws, event):
+ tool_name = event["name"]
+ arguments = json.loads(event["arguments"])
+ call_id = event["call_id"]
+
+ if tool_name == "update_qualification":
+ result = update_qualification(arguments["criterion"], arguments["status"], arguments.get("notes", ""))
+ elif tool_name == "record_company_info":
+ result = record_company_info(**arguments)
+ elif tool_name == "schedule_demo":
+ result = schedule_demo(
+ arguments["preferred_date"],
+ arguments["preferred_time"],
+ arguments.get("attendees"),
+ arguments.get("notes", "")
+ )
+ elif tool_name == "send_resources":
+ result = send_resources(arguments["resource_type"], arguments["email"])
+ elif tool_name == "end_call":
+ result = end_call(arguments["outcome"], arguments["summary"])
+ print(f"\n=== Call Summary ===")
+ print(json.dumps(result["call_summary"], indent=2))
+ else:
+ result = {"error": "Unknown tool"}
+
+ await ws.send(json.dumps({
+ "type": "conversation.item.create",
+ "item": {
+ "type": "function_call_output",
+ "call_id": call_id,
+ "output": json.dumps(result)
+ }
+ }))
+ await ws.send(json.dumps({"type": "response.create"}))
+
+async def main():
+ lead_data["call_start"] = datetime.now()
+
+ headers = {
+ "Authorization": f"Bearer {ASSEMBLYAI_API_KEY}",
+ "OpenAI-Beta": "realtime=v1"
+ }
+ async with websockets.connect(URL, additional_headers=headers) as ws:
+ await ws.send(json.dumps({
+ "type": "session.update",
+ "session": {
+ "model": "universal-streaming",
+                "voice": "sage",
+ "instructions": INSTRUCTIONS,
+ "tools": TOOLS,
+ "input_audio_transcription": {"model": "universal-streaming"}
+ }
+ }))
+
+ print("Lead Qualification Agent ready. Start the call...")
+
+ # Trigger initial greeting
+ await ws.send(json.dumps({"type": "response.create"}))
+
+        async for message in ws:
+            # Skip binary audio frames; this example only prints transcripts
+            if isinstance(message, bytes):
+                continue
+            event = json.loads(message)
+
+ if event["type"] == "response.function_call_arguments.done":
+ await handle_tool_call(ws, event)
+ elif event["type"] == "conversation.item.input_audio_transcription.completed":
+ print(f"Prospect: {event['transcript']}")
+ elif event["type"] == "response.audio_transcript.done":
+ print(f"SDR: {event['transcript']}")
+
+if __name__ == "__main__":
+ asyncio.run(main())
+```
+
+## WebSocket events reference
+
+### Client events (you send)
+
+| Event | Description |
+|-------|-------------|
+| `session.update` | Configure session parameters, instructions, and tools (JSON) |
+| Binary frame | Send raw PCM16 audio data (50ms chunks recommended) |
+| `input_audio_buffer.commit` | Commit the audio buffer for processing (JSON) |
+| `input_audio_buffer.clear` | Clear the audio buffer (JSON) |
+| `conversation.item.create` | Add an item to the conversation, e.g., tool results (JSON) |
+| `response.create` | Request the model to generate a response (JSON) |
+| `response.cancel` | Cancel an in-progress response (JSON) |
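+
+If you manage turn-taking yourself rather than relying on voice activity detection, you can drive the input buffer explicitly with these events. A minimal sketch, assuming an open `websockets` connection `ws` that has already received audio as binary frames:
+
+```python
+import json
+
+async def end_user_turn(ws):
+    """Close out the user's turn: commit the buffered audio and request a reply."""
+    await ws.send(json.dumps({"type": "input_audio_buffer.commit"}))
+    await ws.send(json.dumps({"type": "response.create"}))
+
+async def interrupt_agent(ws):
+    """Cancel an in-progress response and discard any unprocessed input audio."""
+    await ws.send(json.dumps({"type": "response.cancel"}))
+    await ws.send(json.dumps({"type": "input_audio_buffer.clear"}))
+```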
+
+### Server events (you receive)
+
+| Event | Description |
+|-------|-------------|
+| `session.created` | Session has been created (JSON) |
+| `session.updated` | Session configuration has been updated (JSON) |
+| `conversation.item.created` | A conversation item was added (JSON) |
+| `conversation.item.input_audio_transcription.completed` | User speech transcription is complete (JSON) |
+| `response.created` | Response generation has started (JSON) |
+| Binary frame | Raw PCM16 audio chunk for the response |
+| `response.audio.done` | Audio generation is complete (JSON) |
+| `response.audio_transcript.delta` | Partial transcript of the response (JSON) |
+| `response.audio_transcript.done` | Full transcript of the response (JSON) |
+| `response.function_call_arguments.done` | Tool call with complete arguments (JSON) |
+| `response.done` | Response generation is complete (JSON) |
+| `error` | An error occurred (JSON) |
+
+## Roadmap
+
+The Speech-to-Speech API is under active development. Planned features include:
+
+- Additional voice options
+- Custom voice cloning
+- Improved latency optimizations
+- Enhanced turn detection
+- Multi-language support
+- Conversation history and context management
+
+## Known issues
+
+Current limitations of the beta:
+
+- Latency may vary during high-traffic periods
+- Some edge cases in turn detection may cause interruptions
+- Tool calling response times may occasionally be slower than expected
+- WebSocket connections may time out after extended idle periods
+
+Report issues or provide feedback through your AssemblyAI account representative.