From a2e04341e51ab3e76d1c99f9dbdf734a8ca9a431 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 21 Jan 2026 20:01:26 +0000 Subject: [PATCH 1/5] Update Speech-to-Speech docs with OpenAI Realtime API schema - Add beta warning prominently at the top - Update to use new API endpoint (wss://speech-to-speech.assemblyai.com/v1/realtime) - Add WebSocket code examples for Python and JavaScript - Add OpenAI Python client example - Add LiveKit integration with full agent example - Add Pipecat integration with full pipeline example - Add tool calling documentation and examples - Add subagent routing documentation with multi-agent example - Add complete sample agents: - Debt collection agent with FDCPA compliance - Interview agent with scoring and notes - Lead qualification agent with BANT methodology - Add WebSocket events reference (client and server events) - Add roadmap and known issues sections - Remove unhelpful ASCII diagram - Reorganize content for better readability Co-Authored-By: Dan Ince --- .../voice-agents/speechtospeech.mdx | 1480 +++++++++++++---- 1 file changed, 1119 insertions(+), 361 deletions(-) diff --git a/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx b/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx index c3c28c66..a0e359ed 100644 --- a/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx +++ b/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx @@ -1,467 +1,1225 @@ --- -title: "Speech-to-Speech" -description: "Build real-time voice AI agents that listen and respond naturally" +title: "Speech-to-Speech API (Beta)" +description: "Build real-time voice AI agents using AssemblyAI's OpenAI-compatible Realtime API" --- -Build voice-powered AI agents that have natural conversations with your users. Your agent listens to speech and responds with a natural-sounding voice—all in real-time. + + This is a beta product and is not production-ready. The API is subject to change without notice. Do not use this for production workloads. + - - This is an early stage product subject to change and should not be used for - production usage. - +AssemblyAI's Speech-to-Speech API lets you build voice agents that listen and respond naturally in real-time. The API follows the OpenAI Realtime API schema, making it easy to integrate with existing tools and frameworks like LiveKit, Pipecat, and the OpenAI client libraries. -## How it works +## Quickstart -``` -┌─────────────┐ ┌─────────────────┐ ┌─────────────┐ -│ │ Audio │ │ Audio │ │ -│ User │ ────────────► │ Voice Agent │ ────────────► │ User │ -│ (speaks) │ │ │ │ (hears) │ -└─────────────┘ └─────────────────┘ └─────────────┘ -``` - -1. **User speaks** — Your app captures microphone audio and streams it to the agent -2. **Agent responds** — The agent processes the speech and generates a spoken response -3. **User hears** — Your app receives audio and plays it through the speaker - -The entire flow happens in real-time with low latency. - ---- - -## Quick Start - -Get a voice agent up and running in 3 steps. - -### Step 1: Get your API key - -Grab your API key from your [AssemblyAI dashboard](https://www.assemblyai.com/app). - -### Step 2: Create your agent - -Create an agent by sending a POST request. Here's an example of a friendly assistant: +Connect to the Speech-to-Speech API at `wss://speech-to-speech.assemblyai.com/v1/realtime` using your AssemblyAI API key. 
The API accepts audio input and returns both transcriptions and synthesized speech responses. - -```bash -curl -X POST https://aaigentsv1.up.railway.app/agents \ - -H "Authorization: YOUR_API_KEY" \ - -H "Content-Type: application/json" \ - -d '{ - "agent_name": "friendly_assistant", - "instructions": "You are a friendly and helpful assistant. Keep your responses concise and conversational. Be warm and personable.", - "voice": "luna", - "greeting": "Say hello and ask how you can help today." - }' -``` - - -```python -import requests - -response = requests.post( -"https://aaigentsv1.up.railway.app/agents", -headers={ -"Authorization": "YOUR_API_KEY", -"Content-Type": "application/json" -}, -json={ -"agent_name": "friendly_assistant", -"instructions": "You are a friendly and helpful assistant. Keep your responses concise and conversational. Be warm and personable.", -"voice": "luna", -"greeting": "Say hello and ask how you can help today." -} -) - -print(response.json()) - -```` - - -```javascript -const response = await fetch("https://aaigentsv1.up.railway.app/agents", { - method: "POST", - headers: { - "Authorization": "YOUR_API_KEY", - "Content-Type": "application/json" - }, - body: JSON.stringify({ - agent_name: "friendly_assistant", - instructions: "You are a friendly and helpful assistant. Keep your responses concise and conversational. Be warm and personable.", - voice: "luna", - greeting: "Say hello and ask how you can help today." - }) -}); - -console.log(await response.json()); -```` - - - - -### Step 3: Start a conversation - -Connect to your agent via WebSocket and start talking: - -``` -wss://aaigentsv1.up.railway.app/ws/friendly_assistant -``` - -Once connected, send audio as binary WebSocket frames (PCM16, 16kHz, mono) and receive the agent's spoken responses back as audio. - - + ```python import asyncio import json +import base64 +import os import websockets import sounddevice as sd import numpy as np -async def voice_chat(): -uri = "wss://aaigentsv1.up.railway.app/ws/friendly_assistant" -queue = asyncio.Queue(maxsize=100) -session_ready = False +ASSEMBLYAI_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY") +URL = "wss://speech-to-speech.assemblyai.com/v1/realtime" - async with websockets.connect(uri, ping_interval=10, ping_timeout=20) as ws: - print("Connected! Waiting for session...") +async def main(): + headers = { + "Authorization": f"Bearer {ASSEMBLYAI_API_KEY}", + "OpenAI-Beta": "realtime=v1" + } - # Send microphone audio to the agent - async def send_audio(): - while True: - data = await queue.get() - if session_ready: - await ws.send(data) - queue.task_done() + async with websockets.connect(URL, additional_headers=headers) as ws: + # Configure the session + await ws.send(json.dumps({ + "type": "session.update", + "session": { + "model": "universal-streaming", + "voice": "sage", + "instructions": "You are a helpful assistant. Be concise and friendly.", + "input_audio_transcription": { + "model": "universal-streaming" + } + } + })) - asyncio.create_task(send_audio()) - loop = asyncio.get_running_loop() + print("Connected! 
Start speaking...") - def mic_callback(indata, frames, time, status): - if not queue.full(): - loop.call_soon_threadsafe(queue.put_nowait, bytes(indata)) + # Set up audio input/output + audio_queue = asyncio.Queue() - with sd.InputStream(samplerate=16000, channels=1, dtype='int16', callback=mic_callback), \ - sd.OutputStream(samplerate=16000, channels=1, dtype='int16') as speaker: + def audio_callback(indata, frames, time, status): + audio_queue.put_nowait(bytes(indata)) + async def send_audio(): while True: - response = await ws.recv() - - # Play audio responses - if isinstance(response, bytes) and len(response): - speaker.write(np.frombuffer(response, dtype=np.int16)) - - # Handle JSON messages - elif isinstance(response, str): - msg = json.loads(response) - - if msg.get("type") == "session.created": - print("Session ready! Start speaking...") - session_ready = True - - elif msg.get("type") == "conversation.item.done": - item = msg.get("item", {}) - role = item.get("role") - text = item.get("content", [{}])[0].get("text", "") - print(f"[{role}]: {text}") - -asyncio.run(voice_chat()) - -```` - -Install dependencies with: -```bash -pip install websockets sounddevice numpy -```` - - + audio_data = await audio_queue.get() + audio_b64 = base64.b64encode(audio_data).decode() + await ws.send(json.dumps({ + "type": "input_audio_buffer.append", + "audio": audio_b64 + })) + + async def receive_messages(): + with sd.OutputStream(samplerate=24000, channels=1, dtype='int16') as speaker: + async for message in ws: + event = json.loads(message) + + if event["type"] == "response.audio.delta": + audio_bytes = base64.b64decode(event["delta"]) + audio_array = np.frombuffer(audio_bytes, dtype=np.int16) + speaker.write(audio_array) + + elif event["type"] == "conversation.item.input_audio_transcription.completed": + print(f"You: {event['transcript']}") + + elif event["type"] == "response.audio_transcript.done": + print(f"Agent: {event['transcript']}") + + with sd.InputStream(samplerate=16000, channels=1, dtype='int16', callback=audio_callback): + await asyncio.gather(send_audio(), receive_messages()) + +if __name__ == "__main__": + asyncio.run(main()) +``` + + +```javascript +const WebSocket = require("ws"); -That's it! You now have a working voice agent. +const ASSEMBLYAI_API_KEY = process.env.ASSEMBLYAI_API_KEY; +const URL = "wss://speech-to-speech.assemblyai.com/v1/realtime"; ---- +const ws = new WebSocket(URL, { + headers: { + Authorization: `Bearer ${ASSEMBLYAI_API_KEY}`, + "OpenAI-Beta": "realtime=v1", + }, +}); -## Example agents +ws.on("open", () => { + console.log("Connected!"); + + // Configure the session + ws.send( + JSON.stringify({ + type: "session.update", + session: { + model: "universal-streaming", + voice: "sage", + instructions: "You are a helpful assistant. Be concise and friendly.", + input_audio_transcription: { + model: "universal-streaming", + }, + }, + }) + ); +}); -Here are some practical examples to inspire your own agents. +ws.on("message", (data) => { + const event = JSON.parse(data); -### Customer support agent + switch (event.type) { + case "response.audio.delta": + // Handle audio playback - decode base64 and play + const audioBuffer = Buffer.from(event.delta, "base64"); + // Play audio using your preferred audio library + break; -```json -{ - "agent_name": "support_agent", - "instructions": "You are a customer support agent for a software company. Be helpful, patient, and empathetic. Ask clarifying questions to understand the customer's issue. 
If you can't solve a problem, offer to escalate to a human agent. Keep responses brief and focused.", - "voice": "celeste", - "greeting": "Thank the customer for calling and ask how you can help them today." -} -``` + case "conversation.item.input_audio_transcription.completed": + console.log(`You: ${event.transcript}`); + break; -### Appointment scheduler + case "response.audio_transcript.done": + console.log(`Agent: ${event.transcript}`); + break; + } +}); -```json -{ - "agent_name": "appointment_scheduler", - "instructions": "You are a friendly receptionist who helps schedule appointments. Collect the caller's name, preferred date and time, and reason for the appointment. Confirm all details before ending the call. Be efficient but warm.", - "voice": "estelle", - "greeting": "Welcome the caller and ask if they'd like to schedule an appointment." +// Send audio data (PCM16, 16kHz, mono) +function sendAudio(audioBuffer) { + const base64Audio = audioBuffer.toString("base64"); + ws.send( + JSON.stringify({ + type: "input_audio_buffer.append", + audio: base64Audio, + }) + ); } ``` + + +```python +import os +from openai import OpenAI -### Virtual concierge +client = OpenAI( + api_key=os.environ.get("ASSEMBLYAI_API_KEY"), + base_url="https://speech-to-speech.assemblyai.com/v1" +) -```json -{ - "agent_name": "hotel_concierge", - "instructions": "You are a luxury hotel concierge. Be warm, professional, and knowledgeable. Help guests with restaurant recommendations, local attractions, transportation, and any requests. Anticipate needs and offer personalized suggestions.", - "voice": "orion", - "greeting": "Welcome the guest and ask how you can make their stay more enjoyable." -} +# Connect to the realtime API +with client.beta.realtime.connect( + model="universal-streaming" +) as connection: + # Configure the session + connection.session.update( + session={ + "voice": "sage", + "instructions": "You are a helpful assistant.", + "input_audio_transcription": { + "model": "universal-streaming" + } + } + ) + + # Send audio and receive responses + for event in connection: + if event.type == "response.audio_transcript.done": + print(f"Agent: {event.transcript}") + elif event.type == "conversation.item.input_audio_transcription.completed": + print(f"You: {event.transcript}") ``` + + ---- +## Integration with voice agent frameworks -## Choose a voice +The Speech-to-Speech API works seamlessly with popular voice agent frameworks. Since it follows the OpenAI Realtime API schema, you can use it as a drop-in replacement. -Pick a voice that matches your agent's personality. +### LiveKit -| Voice | Style | -| ----------- | ----------------------------------- | -| `luna` | Chill but excitable, gen-z optimist | -| `celeste` | Warm, laid-back, fun-loving | -| `orion` | Older male, warm and happy | -| `ursa` | Young male, energetic | -| `astra` | Young female, wide-eyed and curious | -| `esther` | Older female, loving and caring | -| `estelle` | Middle-aged female, sweet and kind | -| `andromeda` | Young female, breathy and calm | +LiveKit's OpenAI realtime plugin automatically appends `/v1/realtime` to the base URL, so you only need to specify the base domain. 
---- +```python +import os +from livekit.agents import AgentSession +from livekit.plugins import openai +from livekit.plugins.openai.realtime import AudioTranscription + +api_url = os.environ.get("ASSEMBLYAI_API_URL", "wss://speech-to-speech.assemblyai.com/v1") +api_key = os.environ.get("ASSEMBLYAI_API_KEY") + +if not api_key: + raise ValueError("ASSEMBLYAI_API_KEY environment variable is required") + +session = AgentSession( + llm=openai.realtime.RealtimeModel( + base_url=api_url, + api_key=api_key, + voice="sage", + model="universal-streaming", + input_audio_transcription=AudioTranscription( + model="universal-streaming" + ) + ) +) +``` -## Add tools + +```python +import os +import logging +from livekit import rtc +from livekit.agents import ( + AgentSession, + Agent, + RoomInputOptions, + function_tool, + RunContext, +) +from livekit.plugins import openai +from livekit.plugins.openai.realtime import AudioTranscription + +logger = logging.getLogger("voice-agent") + +class VoiceAgent(Agent): + def __init__(self): + super().__init__( + instructions="""You are a helpful voice assistant powered by AssemblyAI. + Be conversational, friendly, and concise in your responses.""" + ) + + @function_tool() + async def get_current_time(self, context: RunContext) -> str: + """Get the current time.""" + from datetime import datetime + return datetime.now().strftime("%I:%M %p") + + @function_tool() + async def end_conversation(self, context: RunContext) -> str: + """End the conversation when the user says goodbye.""" + return "Goodbye! Have a great day." + +async def entrypoint(ctx): + api_url = os.environ.get("ASSEMBLYAI_API_URL", "wss://speech-to-speech.assemblyai.com/v1") + api_key = os.environ.get("ASSEMBLYAI_API_KEY") + + session = AgentSession( + llm=openai.realtime.RealtimeModel( + base_url=api_url, + api_key=api_key, + voice="sage", + model="universal-streaming", + input_audio_transcription=AudioTranscription( + model="universal-streaming" + ) + ) + ) + + agent = VoiceAgent() + await session.start( + room=ctx.room, + agent=agent, + room_input_options=RoomInputOptions() + ) +``` + -Tools let your agent take actions—like checking a database, calling an API, or triggering a workflow. +### Pipecat -Here's a simple example of an agent with a weather tool: +Pipecat supports the OpenAI Realtime API through its transport layer. Configure it to use AssemblyAI's endpoint: -```json -{ - "agent_name": "weather_assistant", - "instructions": "You help users check the weather. When they ask about weather, use the get_weather tool to look it up.", - "voice": "luna", - "tools": [ - { - "name": "get_weather", - "description": "Get the current weather for a city", - "parameters": { - "type": "object", - "properties": { - "city": { - "type": "string", - "description": "The city name" - } - }, - "required": ["city"] - } - } - ] -} +```python +import os +from pipecat.transports.services.daily import DailyTransport +from pipecat.services.openai_realtime import OpenAIRealtimeService + +api_key = os.environ.get("ASSEMBLYAI_API_KEY") + +realtime_service = OpenAIRealtimeService( + api_key=api_key, + base_url="wss://speech-to-speech.assemblyai.com/v1/realtime", + model="universal-streaming", + voice="sage", + system_prompt="You are a helpful assistant." 
+) ``` -When a user asks "What's the weather in Tokyo?", the agent sends your client a `tool.call` event: - -```json -{ - "type": "tool.call", - "call_id": "call_abc123", - "name": "get_weather", - "arguments": { "city": "Tokyo" } -} + +```python +import os +import asyncio +from pipecat.pipeline.pipeline import Pipeline +from pipecat.pipeline.runner import PipelineRunner +from pipecat.pipeline.task import PipelineTask +from pipecat.transports.services.daily import DailyTransport, DailyParams +from pipecat.services.openai_realtime import OpenAIRealtimeService + +async def main(): + api_key = os.environ.get("ASSEMBLYAI_API_KEY") + daily_api_key = os.environ.get("DAILY_API_KEY") + + transport = DailyTransport( + room_url="https://your-domain.daily.co/your-room", + token=daily_api_key, + bot_name="AssemblyAI Voice Agent", + params=DailyParams( + audio_in_enabled=True, + audio_out_enabled=True, + ) + ) + + realtime_service = OpenAIRealtimeService( + api_key=api_key, + base_url="wss://speech-to-speech.assemblyai.com/v1/realtime", + model="universal-streaming", + voice="sage", + system_prompt="""You are a helpful customer service agent. + Be professional, empathetic, and solution-oriented.""" + ) + + pipeline = Pipeline([ + transport.input(), + realtime_service, + transport.output() + ]) + + runner = PipelineRunner() + task = PipelineTask(pipeline) + await runner.run(task) + +if __name__ == "__main__": + asyncio.run(main()) ``` + -Your client executes the function and sends back the result: +## Configuration -```json -{ - "type": "tool.result", - "call_id": "call_abc123", - "result": "{\"temperature\": \"72°F\", \"conditions\": \"sunny\"}" -} -``` +### Session parameters -The agent then speaks the weather information to the user. +Configure your session using the `session.update` event: ---- +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `model` | string | required | Use `"universal-streaming"` | +| `voice` | string | `"sage"` | Voice for audio responses | +| `instructions` | string | - | System prompt defining agent behavior | +| `input_audio_transcription.model` | string | - | Set to `"universal-streaming"` for transcription | +| `temperature` | float | `0.8` | Response creativity (0.0-1.0) | +| `max_response_output_tokens` | int | `4096` | Maximum tokens in response | +| `turn_detection` | object | - | Configure voice activity detection | -## Agent configuration +### Available voices -Full list of options when creating an agent. 
+| Voice | Description | +|-------|-------------| +| `sage` | Calm and professional | +| `coral` | Warm and friendly | +| `verse` | Clear and articulate | +| `alloy` | Neutral and balanced | -| Field | Type | Default | Description | -| ----------------------- | ------ | -------- | --------------------------------------------------- | -| `agent_name` | string | required | Unique identifier (letters, numbers, underscores) | -| `instructions` | string | - | Personality and behavior guidelines | -| `voice` | string | `"luna"` | Voice to use for responses | -| `greeting` | string | - | What the agent says when a conversation starts | -| `temperature` | float | `0.8` | Response creativity (0.0 = focused, 1.0 = creative) | -| `max_tokens` | int | `4096` | Maximum response length | -| `language` | string | `"en"` | Language code | -| `tools` | array | - | Tool definitions (see above) | -| `audio_in_sample_rate` | int | `16000` | Input audio sample rate in Hz | -| `audio_out_sample_rate` | int | `16000` | Output audio sample rate in Hz | +### Audio format ---- +The API uses the following audio formats: -## WebSocket events +**Input audio:** +- Encoding: PCM16 (16-bit signed integer, little-endian) +- Sample rate: 16,000 Hz +- Channels: Mono -When connected to an agent, you'll receive these events: +**Output audio:** +- Encoding: PCM16 (16-bit signed integer, little-endian) +- Sample rate: 24,000 Hz +- Channels: Mono -### session.created +## Tool calling -Sent when the connection is established and ready. +Enable your agent to perform actions by defining tools. Tools follow the JSON Schema format used by OpenAI's function calling. -```json -{ - "type": "session.created", - "session": { - "id": "uuid", - "agent_name": "my_agent" - } -} +```python +# Define tools in your session configuration +await ws.send(json.dumps({ + "type": "session.update", + "session": { + "model": "universal-streaming", + "voice": "sage", + "instructions": "You help users check order status. Use the check_order_status tool when asked.", + "tools": [ + { + "type": "function", + "name": "check_order_status", + "description": "Check the status of a customer order", + "parameters": { + "type": "object", + "properties": { + "order_id": { + "type": "string", + "description": "The order ID to look up" + } + }, + "required": ["order_id"] + } + } + ] + } +})) ``` -### conversation.item.done +When the agent decides to use a tool, you'll receive a `response.function_call_arguments.done` event: -Sent when a speaker finishes talking. Contains the transcript. - -```json -{ - "type": "conversation.item.done", - "item": { - "role": "user", - "content": [{ "type": "text", "text": "What's the weather like?" }] - } -} +```python +async for message in ws: + event = json.loads(message) + + if event["type"] == "response.function_call_arguments.done": + tool_name = event["name"] + arguments = json.loads(event["arguments"]) + call_id = event["call_id"] + + # Execute the tool + if tool_name == "check_order_status": + result = await check_order_status(arguments["order_id"]) + + # Send the result back + await ws.send(json.dumps({ + "type": "conversation.item.create", + "item": { + "type": "function_call_output", + "call_id": call_id, + "output": json.dumps(result) + } + })) + + # Trigger a response + await ws.send(json.dumps({"type": "response.create"})) ``` -### conversation.item.interim +## Subagent routing -Sent during speech with partial transcripts. Useful for showing real-time captions. 
+Route conversations to specialized subagents based on user intent. This pattern is useful for complex applications where different agents handle different domains. -```json -{ - "type": "conversation.item.interim", - "item": { - "role": "user", - "content": [{ "type": "text", "text": "What's the wea..." }] - } -} +```python +from livekit.agents import AgentSession, Agent, function_tool, RunContext +from livekit.plugins import openai +from livekit.plugins.openai.realtime import AudioTranscription + +class RouterAgent(Agent): + def __init__(self): + super().__init__( + instructions="""You are a routing agent. Determine the user's intent and + route them to the appropriate specialist: + - For billing questions, use transfer_to_billing + - For technical support, use transfer_to_support + - For sales inquiries, use transfer_to_sales""" + ) + + @function_tool() + async def transfer_to_billing(self, context: RunContext) -> str: + """Transfer the conversation to the billing specialist.""" + context.session.update_agent(BillingAgent()) + return "Transferring you to our billing specialist..." + + @function_tool() + async def transfer_to_support(self, context: RunContext) -> str: + """Transfer the conversation to technical support.""" + context.session.update_agent(SupportAgent()) + return "Transferring you to technical support..." + + @function_tool() + async def transfer_to_sales(self, context: RunContext) -> str: + """Transfer the conversation to the sales team.""" + context.session.update_agent(SalesAgent()) + return "Transferring you to our sales team..." + +class BillingAgent(Agent): + def __init__(self): + super().__init__( + instructions="""You are a billing specialist. Help users with: + - Invoice questions + - Payment issues + - Subscription changes + Be professional and thorough.""" + ) + + @function_tool() + async def lookup_invoice(self, context: RunContext, invoice_id: str) -> str: + """Look up an invoice by ID.""" + # Implement invoice lookup logic + return f"Invoice {invoice_id}: $99.00, paid on Jan 15, 2026" + +class SupportAgent(Agent): + def __init__(self): + super().__init__( + instructions="""You are a technical support specialist. Help users with: + - Troubleshooting issues + - Product questions + - Feature explanations + Be patient and clear in your explanations.""" + ) + +class SalesAgent(Agent): + def __init__(self): + super().__init__( + instructions="""You are a sales specialist. Help users with: + - Product information + - Pricing questions + - Demo scheduling + Be helpful and not pushy.""" + ) ``` -### tool.call +## Sample agents -Sent when the agent wants to use a tool. See [Add tools](#add-tools) for handling. +Here are complete, copy-paste-ready examples for common use cases. Set your `ASSEMBLYAI_API_KEY` environment variable and run. -### Audio (binary) +### Debt collection agent -The agent's spoken responses come as binary WebSocket frames containing PCM16 audio. +A professional agent for payment reminder calls with compliance-aware messaging. 
---- - -## Audio format - -Both input and output audio use the same format: - -- **Encoding**: PCM16 (16-bit signed integer, little-endian) -- **Sample rate**: 16,000 Hz (configurable) -- **Channels**: Mono - ---- + +```python +import os +import asyncio +import json +import base64 +import websockets +from datetime import datetime -## REST API reference +ASSEMBLYAI_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY") +URL = "wss://speech-to-speech.assemblyai.com/v1/realtime" - +# Mock database +ACCOUNTS = { + "ACC001": {"name": "John Smith", "balance": 450.00, "due_date": "2026-01-15"}, + "ACC002": {"name": "Jane Doe", "balance": 1200.00, "due_date": "2026-01-10"}, +} -**Base URL**: `https://aaigentsv1.up.railway.app` +INSTRUCTIONS = """You are a professional debt collection agent for ABC Financial Services. +Your role is to remind customers about overdue payments in a respectful and compliant manner. -All REST endpoints require an `Authorization: YOUR_API_KEY` header. +Guidelines: +- Always identify yourself and the company at the start +- Verify you're speaking with the right person before discussing account details +- Be professional, empathetic, and non-threatening +- Offer payment plan options when appropriate +- Document any promises to pay +- Never harass, threaten, or use abusive language +- Comply with FDCPA regulations -### Create or update agent +Use the available tools to look up account information and record payment arrangements.""" -`POST /agents` — Create a new agent or update an existing one. +TOOLS = [ + { + "type": "function", + "name": "lookup_account", + "description": "Look up a customer's account information by account ID", + "parameters": { + "type": "object", + "properties": { + "account_id": { + "type": "string", + "description": "The customer's account ID" + } + }, + "required": ["account_id"] + } + }, + { + "type": "function", + "name": "record_payment_promise", + "description": "Record a customer's promise to pay", + "parameters": { + "type": "object", + "properties": { + "account_id": {"type": "string"}, + "amount": {"type": "number"}, + "payment_date": {"type": "string", "description": "Date in YYYY-MM-DD format"} + }, + "required": ["account_id", "amount", "payment_date"] + } + }, + { + "type": "function", + "name": "setup_payment_plan", + "description": "Set up a payment plan for the customer", + "parameters": { + "type": "object", + "properties": { + "account_id": {"type": "string"}, + "monthly_amount": {"type": "number"}, + "num_payments": {"type": "integer"} + }, + "required": ["account_id", "monthly_amount", "num_payments"] + } + } +] + +def lookup_account(account_id: str) -> dict: + if account_id in ACCOUNTS: + return {"success": True, "account": ACCOUNTS[account_id]} + return {"success": False, "error": "Account not found"} + +def record_payment_promise(account_id: str, amount: float, payment_date: str) -> dict: + return { + "success": True, + "confirmation": f"Payment promise recorded: ${amount} by {payment_date}", + "reference": f"PRM-{datetime.now().strftime('%Y%m%d%H%M%S')}" + } -### List agents +def setup_payment_plan(account_id: str, monthly_amount: float, num_payments: int) -> dict: + return { + "success": True, + "plan": { + "monthly_payment": monthly_amount, + "total_payments": num_payments, + "total_amount": monthly_amount * num_payments + }, + "reference": f"PLN-{datetime.now().strftime('%Y%m%d%H%M%S')}" + } -`GET /agents` — List all your agents. 
+async def handle_tool_call(ws, event): + tool_name = event["name"] + arguments = json.loads(event["arguments"]) + call_id = event["call_id"] + + if tool_name == "lookup_account": + result = lookup_account(arguments["account_id"]) + elif tool_name == "record_payment_promise": + result = record_payment_promise( + arguments["account_id"], + arguments["amount"], + arguments["payment_date"] + ) + elif tool_name == "setup_payment_plan": + result = setup_payment_plan( + arguments["account_id"], + arguments["monthly_amount"], + arguments["num_payments"] + ) + else: + result = {"error": "Unknown tool"} + + await ws.send(json.dumps({ + "type": "conversation.item.create", + "item": { + "type": "function_call_output", + "call_id": call_id, + "output": json.dumps(result) + } + })) + await ws.send(json.dumps({"type": "response.create"})) + +async def main(): + headers = { + "Authorization": f"Bearer {ASSEMBLYAI_API_KEY}", + "OpenAI-Beta": "realtime=v1" + } -```json -{ - "agents": ["agent1", "agent2"], - "count": 2 -} + async with websockets.connect(URL, additional_headers=headers) as ws: + await ws.send(json.dumps({ + "type": "session.update", + "session": { + "model": "universal-streaming", + "voice": "sage", + "instructions": INSTRUCTIONS, + "tools": TOOLS, + "input_audio_transcription": {"model": "universal-streaming"} + } + })) + + print("Debt Collection Agent ready. Start speaking...") + + async for message in ws: + event = json.loads(message) + + if event["type"] == "response.function_call_arguments.done": + await handle_tool_call(ws, event) + elif event["type"] == "conversation.item.input_audio_transcription.completed": + print(f"Customer: {event['transcript']}") + elif event["type"] == "response.audio_transcript.done": + print(f"Agent: {event['transcript']}") + +if __name__ == "__main__": + asyncio.run(main()) ``` + -### Get agent +### Interview agent -`GET /agents/{agent_name}` — Get an agent's configuration. +An AI interviewer that conducts structured interviews and evaluates candidates. -### Delete agent + +```python +import os +import asyncio +import json +import websockets +from datetime import datetime -`DELETE /agents/{agent_name}` — Delete an agent. +ASSEMBLYAI_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY") +URL = "wss://speech-to-speech.assemblyai.com/v1/realtime" - +INSTRUCTIONS = """You are an AI interviewer conducting a technical screening interview for a software engineering position. - +Interview structure: +1. Introduction and rapport building (2 minutes) +2. Background and experience questions (5 minutes) +3. Technical questions (10 minutes) +4. Behavioral questions using STAR method (5 minutes) +5. Candidate questions (3 minutes) +6. Closing -### List conversations +Guidelines: +- Be professional, warm, and encouraging +- Ask follow-up questions to dig deeper into responses +- Take notes on key points using the record_note tool +- Score responses using the score_response tool +- Keep track of time and move through sections appropriately +- At the end, provide a summary using the generate_summary tool -`GET /agents/{agent_name}/conversations` — List all conversations for an agent. 
+Start by introducing yourself and the interview process.""" -```json -{ - "agent_name": "my_agent", - "conversations": [ +TOOLS = [ + { + "type": "function", + "name": "record_note", + "description": "Record a note about the candidate's response", + "parameters": { + "type": "object", + "properties": { + "category": { + "type": "string", + "enum": ["experience", "technical", "behavioral", "communication", "other"] + }, + "note": {"type": "string"}, + "sentiment": { + "type": "string", + "enum": ["positive", "neutral", "negative"] + } + }, + "required": ["category", "note"] + } + }, { - "conversation_id": "uuid", - "created_at": "2025-12-18T13:00:00Z" + "type": "function", + "name": "score_response", + "description": "Score a candidate's response to a question", + "parameters": { + "type": "object", + "properties": { + "question_topic": {"type": "string"}, + "score": { + "type": "integer", + "description": "Score from 1-5" + }, + "reasoning": {"type": "string"} + }, + "required": ["question_topic", "score", "reasoning"] + } + }, + { + "type": "function", + "name": "generate_summary", + "description": "Generate an interview summary at the end", + "parameters": { + "type": "object", + "properties": { + "overall_impression": {"type": "string"}, + "strengths": { + "type": "array", + "items": {"type": "string"} + }, + "areas_for_improvement": { + "type": "array", + "items": {"type": "string"} + }, + "recommendation": { + "type": "string", + "enum": ["strong_hire", "hire", "maybe", "no_hire"] + } + }, + "required": ["overall_impression", "strengths", "areas_for_improvement", "recommendation"] + } + }, + { + "type": "function", + "name": "end_interview", + "description": "End the interview session", + "parameters": { + "type": "object", + "properties": { + "reason": {"type": "string"} + }, + "required": ["reason"] + } } - ], - "count": 1 +] + +interview_data = { + "notes": [], + "scores": [], + "start_time": None } -``` -### Get conversation +def record_note(category: str, note: str, sentiment: str = "neutral") -> dict: + interview_data["notes"].append({ + "category": category, + "note": note, + "sentiment": sentiment, + "timestamp": datetime.now().isoformat() + }) + return {"success": True, "message": "Note recorded"} + +def score_response(question_topic: str, score: int, reasoning: str) -> dict: + interview_data["scores"].append({ + "topic": question_topic, + "score": score, + "reasoning": reasoning + }) + avg_score = sum(s["score"] for s in interview_data["scores"]) / len(interview_data["scores"]) + return {"success": True, "current_average": round(avg_score, 2)} + +def generate_summary(overall_impression: str, strengths: list, areas_for_improvement: list, recommendation: str) -> dict: + return { + "success": True, + "summary": { + "overall_impression": overall_impression, + "strengths": strengths, + "areas_for_improvement": areas_for_improvement, + "recommendation": recommendation, + "average_score": sum(s["score"] for s in interview_data["scores"]) / len(interview_data["scores"]) if interview_data["scores"] else 0, + "notes_count": len(interview_data["notes"]) + } + } -`GET /agents/{agent_name}/conversations/{conversation_id}` — Get a specific conversation with all messages. 
+async def handle_tool_call(ws, event): + tool_name = event["name"] + arguments = json.loads(event["arguments"]) + call_id = event["call_id"] + + if tool_name == "record_note": + result = record_note(arguments["category"], arguments["note"], arguments.get("sentiment", "neutral")) + elif tool_name == "score_response": + result = score_response(arguments["question_topic"], arguments["score"], arguments["reasoning"]) + elif tool_name == "generate_summary": + result = generate_summary( + arguments["overall_impression"], + arguments["strengths"], + arguments["areas_for_improvement"], + arguments["recommendation"] + ) + elif tool_name == "end_interview": + result = {"success": True, "message": "Interview ended", "reason": arguments["reason"]} + print(f"\n=== Interview Summary ===") + print(f"Notes: {len(interview_data['notes'])}") + print(f"Scores: {interview_data['scores']}") + else: + result = {"error": "Unknown tool"} + + await ws.send(json.dumps({ + "type": "conversation.item.create", + "item": { + "type": "function_call_output", + "call_id": call_id, + "output": json.dumps(result) + } + })) + await ws.send(json.dumps({"type": "response.create"})) + +async def main(): + interview_data["start_time"] = datetime.now() + + headers = { + "Authorization": f"Bearer {ASSEMBLYAI_API_KEY}", + "OpenAI-Beta": "realtime=v1" + } -```json -{ - "conversation_id": "uuid", - "agent_name": "my_agent", - "items": [], - "created_at": "2025-12-18T13:00:00Z" -} + async with websockets.connect(URL, additional_headers=headers) as ws: + await ws.send(json.dumps({ + "type": "session.update", + "session": { + "model": "universal-streaming", + "voice": "sage", + "instructions": INSTRUCTIONS, + "tools": TOOLS, + "input_audio_transcription": {"model": "universal-streaming"} + } + })) + + print("Interview Agent ready. The interview will begin shortly...") + + # Trigger initial greeting + await ws.send(json.dumps({"type": "response.create"})) + + async for message in ws: + event = json.loads(message) + + if event["type"] == "response.function_call_arguments.done": + await handle_tool_call(ws, event) + elif event["type"] == "conversation.item.input_audio_transcription.completed": + print(f"Candidate: {event['transcript']}") + elif event["type"] == "response.audio_transcript.done": + print(f"Interviewer: {event['transcript']}") + +if __name__ == "__main__": + asyncio.run(main()) ``` - - - -Tools follow JSON Schema format: - -```json -{ - "name": "tool_name", - "description": "What this tool does", - "parameters": { - "type": "object", - "properties": { - "param_name": { - "type": "string", - "description": "What this parameter is for" - } +### Lead qualification agent + +A sales development agent that qualifies leads using BANT methodology. + + +```python +import os +import asyncio +import json +import websockets +from datetime import datetime + +ASSEMBLYAI_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY") +URL = "wss://speech-to-speech.assemblyai.com/v1/realtime" + +INSTRUCTIONS = """You are a sales development representative (SDR) for TechCorp, a B2B SaaS company. +Your goal is to qualify leads using the BANT framework: +- Budget: Do they have budget allocated? +- Authority: Are you speaking with a decision maker? +- Need: Do they have a genuine need for our solution? +- Timeline: When are they looking to implement? 
+ +Guidelines: +- Be conversational and build rapport +- Ask open-ended questions to understand their situation +- Listen actively and respond to what they say +- Don't be pushy - focus on understanding their needs +- Use the qualification tools to track BANT criteria +- If qualified, offer to schedule a demo with an account executive +- If not qualified, politely end the call and offer resources + +Start by introducing yourself and asking about their current challenges.""" + +TOOLS = [ + { + "type": "function", + "name": "update_qualification", + "description": "Update the lead's BANT qualification status", + "parameters": { + "type": "object", + "properties": { + "criterion": { + "type": "string", + "enum": ["budget", "authority", "need", "timeline"] + }, + "status": { + "type": "string", + "enum": ["qualified", "not_qualified", "unknown"] + }, + "notes": {"type": "string"} + }, + "required": ["criterion", "status"] + } }, - "required": ["param_name"] - } + { + "type": "function", + "name": "record_company_info", + "description": "Record information about the prospect's company", + "parameters": { + "type": "object", + "properties": { + "company_name": {"type": "string"}, + "industry": {"type": "string"}, + "company_size": {"type": "string"}, + "current_solution": {"type": "string"} + } + } + }, + { + "type": "function", + "name": "schedule_demo", + "description": "Schedule a demo with an account executive", + "parameters": { + "type": "object", + "properties": { + "preferred_date": {"type": "string"}, + "preferred_time": {"type": "string"}, + "attendees": { + "type": "array", + "items": {"type": "string"} + }, + "notes": {"type": "string"} + }, + "required": ["preferred_date", "preferred_time"] + } + }, + { + "type": "function", + "name": "send_resources", + "description": "Send educational resources to the prospect", + "parameters": { + "type": "object", + "properties": { + "resource_type": { + "type": "string", + "enum": ["case_study", "whitepaper", "product_overview", "pricing_guide"] + }, + "email": {"type": "string"} + }, + "required": ["resource_type", "email"] + } + }, + { + "type": "function", + "name": "end_call", + "description": "End the qualification call", + "parameters": { + "type": "object", + "properties": { + "outcome": { + "type": "string", + "enum": ["qualified_demo_scheduled", "qualified_follow_up", "not_qualified", "callback_requested"] + }, + "summary": {"type": "string"} + }, + "required": ["outcome", "summary"] + } + } +] + +lead_data = { + "qualification": { + "budget": {"status": "unknown", "notes": ""}, + "authority": {"status": "unknown", "notes": ""}, + "need": {"status": "unknown", "notes": ""}, + "timeline": {"status": "unknown", "notes": ""} + }, + "company_info": {}, + "call_start": None } -``` -**Supported parameter types**: `string`, `number`, `boolean`, `array`, `object` +def update_qualification(criterion: str, status: str, notes: str = "") -> dict: + lead_data["qualification"][criterion] = {"status": status, "notes": notes} + qualified_count = sum(1 for c in lead_data["qualification"].values() if c["status"] == "qualified") + return { + "success": True, + "qualification_progress": f"{qualified_count}/4 criteria qualified", + "is_fully_qualified": qualified_count == 4 + } +def record_company_info(**kwargs) -> dict: + lead_data["company_info"].update(kwargs) + return {"success": True, "recorded_fields": list(kwargs.keys())} + +def schedule_demo(preferred_date: str, preferred_time: str, attendees: list = None, notes: str = "") -> dict: + return { + 
"success": True, + "confirmation": { + "date": preferred_date, + "time": preferred_time, + "attendees": attendees or [], + "meeting_link": "https://meet.techcorp.com/demo-abc123", + "calendar_invite_sent": True + } + } + +def send_resources(resource_type: str, email: str) -> dict: + return { + "success": True, + "message": f"{resource_type.replace('_', ' ').title()} will be sent to {email}" + } + +def end_call(outcome: str, summary: str) -> dict: + duration = (datetime.now() - lead_data["call_start"]).seconds if lead_data["call_start"] else 0 + return { + "success": True, + "call_summary": { + "outcome": outcome, + "summary": summary, + "duration_seconds": duration, + "qualification_status": lead_data["qualification"], + "company_info": lead_data["company_info"] + } + } + +async def handle_tool_call(ws, event): + tool_name = event["name"] + arguments = json.loads(event["arguments"]) + call_id = event["call_id"] + + if tool_name == "update_qualification": + result = update_qualification(arguments["criterion"], arguments["status"], arguments.get("notes", "")) + elif tool_name == "record_company_info": + result = record_company_info(**arguments) + elif tool_name == "schedule_demo": + result = schedule_demo( + arguments["preferred_date"], + arguments["preferred_time"], + arguments.get("attendees"), + arguments.get("notes", "") + ) + elif tool_name == "send_resources": + result = send_resources(arguments["resource_type"], arguments["email"]) + elif tool_name == "end_call": + result = end_call(arguments["outcome"], arguments["summary"]) + print(f"\n=== Call Summary ===") + print(json.dumps(result["call_summary"], indent=2)) + else: + result = {"error": "Unknown tool"} + + await ws.send(json.dumps({ + "type": "conversation.item.create", + "item": { + "type": "function_call_output", + "call_id": call_id, + "output": json.dumps(result) + } + })) + await ws.send(json.dumps({"type": "response.create"})) + +async def main(): + lead_data["call_start"] = datetime.now() + + headers = { + "Authorization": f"Bearer {ASSEMBLYAI_API_KEY}", + "OpenAI-Beta": "realtime=v1" + } + + async with websockets.connect(URL, additional_headers=headers) as ws: + await ws.send(json.dumps({ + "type": "session.update", + "session": { + "model": "universal-streaming", + "voice": "coral", + "instructions": INSTRUCTIONS, + "tools": TOOLS, + "input_audio_transcription": {"model": "universal-streaming"} + } + })) + + print("Lead Qualification Agent ready. 
Start the call...") + + # Trigger initial greeting + await ws.send(json.dumps({"type": "response.create"})) + + async for message in ws: + event = json.loads(message) + + if event["type"] == "response.function_call_arguments.done": + await handle_tool_call(ws, event) + elif event["type"] == "conversation.item.input_audio_transcription.completed": + print(f"Prospect: {event['transcript']}") + elif event["type"] == "response.audio_transcript.done": + print(f"SDR: {event['transcript']}") + +if __name__ == "__main__": + asyncio.run(main()) +``` + +## WebSocket events reference + +### Client events (you send) + +| Event | Description | +|-------|-------------| +| `session.update` | Configure session parameters, instructions, and tools | +| `input_audio_buffer.append` | Send audio data (base64-encoded PCM16) | +| `input_audio_buffer.commit` | Commit the audio buffer for processing | +| `input_audio_buffer.clear` | Clear the audio buffer | +| `conversation.item.create` | Add an item to the conversation (e.g., tool results) | +| `response.create` | Request the model to generate a response | +| `response.cancel` | Cancel an in-progress response | + +### Server events (you receive) + +| Event | Description | +|-------|-------------| +| `session.created` | Session has been created | +| `session.updated` | Session configuration has been updated | +| `conversation.item.created` | A conversation item was added | +| `conversation.item.input_audio_transcription.completed` | User speech transcription is complete | +| `response.created` | Response generation has started | +| `response.audio.delta` | Audio chunk for the response (base64-encoded) | +| `response.audio.done` | Audio generation is complete | +| `response.audio_transcript.delta` | Partial transcript of the response | +| `response.audio_transcript.done` | Full transcript of the response | +| `response.function_call_arguments.done` | Tool call with complete arguments | +| `response.done` | Response generation is complete | +| `error` | An error occurred | + +## Roadmap + +The Speech-to-Speech API is under active development. Planned features include: + +- Additional voice options +- Custom voice cloning +- Improved latency optimizations +- Enhanced turn detection +- Multi-language support +- Conversation history and context management + +## Known issues + +Current limitations of the beta: + +- Latency may vary during high-traffic periods +- Some edge cases in turn detection may cause interruptions +- Tool calling response times may occasionally be slower than expected +- WebSocket connections may timeout after extended idle periods + +Report issues or provide feedback through your AssemblyAI account representative. 
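
If your integration needs to ride out the idle-timeout behavior noted above, one option is to wrap the session in a reconnect loop. The sketch below is illustrative rather than part of the API: `run_session` stands in for your own connect-and-stream coroutine, and the backoff values are arbitrary. The `websockets` library's built-in keepalive pings (`ping_interval`) also help keep quiet connections open.

```python
import asyncio
import websockets

async def run_with_reconnect(run_session, max_backoff: float = 30.0) -> None:
    """Re-run `run_session` whenever the WebSocket drops or times out.

    `run_session` is your own coroutine that connects to the realtime endpoint,
    sends session.update, and streams audio until the connection closes.
    """
    backoff = 1.0
    while True:
        try:
            await run_session()
            return  # session ended normally
        except (websockets.ConnectionClosed, OSError) as exc:
            print(f"Connection dropped ({exc}); retrying in {backoff:.0f}s")
            await asyncio.sleep(backoff)
            backoff = min(backoff * 2, max_backoff)  # exponential backoff, capped
```
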
From 66c4aa75bbfa2f51e2ebd6bf07138852ca6af67c Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 21 Jan 2026 21:42:26 +0000 Subject: [PATCH 2/5] Update audio format to raw PCM16 in 50ms chunks (not base64) Co-Authored-By: Dan Ince --- .../voice-agents/speechtospeech.mdx | 158 ++++++++++-------- 1 file changed, 91 insertions(+), 67 deletions(-) diff --git a/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx b/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx index a0e359ed..f08510f3 100644 --- a/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx +++ b/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx @@ -18,7 +18,6 @@ Connect to the Speech-to-Speech API at `wss://speech-to-speech.assemblyai.com/v1 ```python import asyncio import json -import base64 import os import websockets import sounddevice as sd @@ -27,6 +26,12 @@ import numpy as np ASSEMBLYAI_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY") URL = "wss://speech-to-speech.assemblyai.com/v1/realtime" +# Audio settings +INPUT_SAMPLE_RATE = 16000 # 16kHz for input +OUTPUT_SAMPLE_RATE = 24000 # 24kHz for output +CHUNK_MS = 50 # 50ms chunks recommended +CHUNK_SIZE = int(INPUT_SAMPLE_RATE * CHUNK_MS / 1000) * 2 # 1600 bytes per 50ms chunk + async def main(): headers = { "Authorization": f"Bearer {ASSEMBLYAI_API_KEY}", @@ -34,7 +39,7 @@ async def main(): } async with websockets.connect(URL, additional_headers=headers) as ws: - # Configure the session + # Configure the session (JSON message) await ws.send(json.dumps({ "type": "session.update", "session": { @@ -50,37 +55,43 @@ async def main(): print("Connected! Start speaking...") # Set up audio input/output + audio_buffer = bytearray() audio_queue = asyncio.Queue() def audio_callback(indata, frames, time, status): audio_queue.put_nowait(bytes(indata)) async def send_audio(): + nonlocal audio_buffer while True: audio_data = await audio_queue.get() - audio_b64 = base64.b64encode(audio_data).decode() - await ws.send(json.dumps({ - "type": "input_audio_buffer.append", - "audio": audio_b64 - })) + audio_buffer.extend(audio_data) + + # Send in 50ms chunks (1600 bytes for 16kHz mono PCM16) + while len(audio_buffer) >= CHUNK_SIZE: + chunk = bytes(audio_buffer[:CHUNK_SIZE]) + audio_buffer = audio_buffer[CHUNK_SIZE:] + # Send raw PCM16 bytes as binary WebSocket frame + await ws.send(chunk) async def receive_messages(): - with sd.OutputStream(samplerate=24000, channels=1, dtype='int16') as speaker: + with sd.OutputStream(samplerate=OUTPUT_SAMPLE_RATE, channels=1, dtype='int16') as speaker: async for message in ws: - event = json.loads(message) - - if event["type"] == "response.audio.delta": - audio_bytes = base64.b64decode(event["delta"]) - audio_array = np.frombuffer(audio_bytes, dtype=np.int16) + # Binary frames contain audio data + if isinstance(message, bytes): + audio_array = np.frombuffer(message, dtype=np.int16) speaker.write(audio_array) + else: + # Text frames contain JSON events + event = json.loads(message) - elif event["type"] == "conversation.item.input_audio_transcription.completed": - print(f"You: {event['transcript']}") + if event["type"] == "conversation.item.input_audio_transcription.completed": + print(f"You: {event['transcript']}") - elif event["type"] == "response.audio_transcript.done": - print(f"Agent: {event['transcript']}") + elif event["type"] == "response.audio_transcript.done": + print(f"Agent: 
{event['transcript']}") - with sd.InputStream(samplerate=16000, channels=1, dtype='int16', callback=audio_callback): + with sd.InputStream(samplerate=INPUT_SAMPLE_RATE, channels=1, dtype='int16', callback=audio_callback): await asyncio.gather(send_audio(), receive_messages()) if __name__ == "__main__": @@ -94,6 +105,12 @@ const WebSocket = require("ws"); const ASSEMBLYAI_API_KEY = process.env.ASSEMBLYAI_API_KEY; const URL = "wss://speech-to-speech.assemblyai.com/v1/realtime"; +// Audio settings +const INPUT_SAMPLE_RATE = 16000; // 16kHz for input +const OUTPUT_SAMPLE_RATE = 24000; // 24kHz for output +const CHUNK_MS = 50; // 50ms chunks recommended +const CHUNK_SIZE = (INPUT_SAMPLE_RATE * CHUNK_MS / 1000) * 2; // 1600 bytes per 50ms chunk + const ws = new WebSocket(URL, { headers: { Authorization: `Bearer ${ASSEMBLYAI_API_KEY}`, @@ -101,10 +118,12 @@ const ws = new WebSocket(URL, { }, }); +let audioBuffer = Buffer.alloc(0); + ws.on("open", () => { console.log("Connected!"); - // Configure the session + // Configure the session (JSON message) ws.send( JSON.stringify({ type: "session.update", @@ -120,35 +139,39 @@ ws.on("open", () => { ); }); -ws.on("message", (data) => { - const event = JSON.parse(data); - - switch (event.type) { - case "response.audio.delta": - // Handle audio playback - decode base64 and play - const audioBuffer = Buffer.from(event.delta, "base64"); - // Play audio using your preferred audio library - break; - - case "conversation.item.input_audio_transcription.completed": - console.log(`You: ${event.transcript}`); - break; - - case "response.audio_transcript.done": - console.log(`Agent: ${event.transcript}`); - break; +ws.on("message", (data, isBinary) => { + if (isBinary) { + // Binary frames contain raw PCM16 audio data + // Play audio using your preferred audio library + const audioData = data; + // Example: speaker.write(audioData); + } else { + // Text frames contain JSON events + const event = JSON.parse(data.toString()); + + switch (event.type) { + case "conversation.item.input_audio_transcription.completed": + console.log(`You: ${event.transcript}`); + break; + + case "response.audio_transcript.done": + console.log(`Agent: ${event.transcript}`); + break; + } } }); -// Send audio data (PCM16, 16kHz, mono) -function sendAudio(audioBuffer) { - const base64Audio = audioBuffer.toString("base64"); - ws.send( - JSON.stringify({ - type: "input_audio_buffer.append", - audio: base64Audio, - }) - ); +// Send audio data as raw PCM16 in 50ms chunks (16kHz, mono) +function sendAudio(pcm16Data) { + audioBuffer = Buffer.concat([audioBuffer, pcm16Data]); + + // Send in 50ms chunks (1600 bytes for 16kHz mono PCM16) + while (audioBuffer.length >= CHUNK_SIZE) { + const chunk = audioBuffer.subarray(0, CHUNK_SIZE); + audioBuffer = audioBuffer.subarray(CHUNK_SIZE); + // Send raw PCM16 bytes as binary WebSocket frame + ws.send(chunk); + } } ``` @@ -375,15 +398,16 @@ Configure your session using the `session.update` event: ### Audio format -The API uses the following audio formats: +The API uses raw PCM16 audio sent as binary WebSocket frames (not base64 encoded). Send audio in 50ms chunks for optimal performance. 
**Input audio:** -- Encoding: PCM16 (16-bit signed integer, little-endian) +- Encoding: Raw PCM16 (16-bit signed integer, little-endian) - Sample rate: 16,000 Hz - Channels: Mono +- Chunk size: 50ms recommended (1,600 bytes per chunk) **Output audio:** -- Encoding: PCM16 (16-bit signed integer, little-endian) +- Encoding: Raw PCM16 (16-bit signed integer, little-endian) - Sample rate: 24,000 Hz - Channels: Mono @@ -1177,30 +1201,30 @@ if __name__ == "__main__": | Event | Description | |-------|-------------| -| `session.update` | Configure session parameters, instructions, and tools | -| `input_audio_buffer.append` | Send audio data (base64-encoded PCM16) | -| `input_audio_buffer.commit` | Commit the audio buffer for processing | -| `input_audio_buffer.clear` | Clear the audio buffer | -| `conversation.item.create` | Add an item to the conversation (e.g., tool results) | -| `response.create` | Request the model to generate a response | -| `response.cancel` | Cancel an in-progress response | +| `session.update` | Configure session parameters, instructions, and tools (JSON) | +| Binary frame | Send raw PCM16 audio data (50ms chunks recommended) | +| `input_audio_buffer.commit` | Commit the audio buffer for processing (JSON) | +| `input_audio_buffer.clear` | Clear the audio buffer (JSON) | +| `conversation.item.create` | Add an item to the conversation, e.g., tool results (JSON) | +| `response.create` | Request the model to generate a response (JSON) | +| `response.cancel` | Cancel an in-progress response (JSON) | ### Server events (you receive) | Event | Description | |-------|-------------| -| `session.created` | Session has been created | -| `session.updated` | Session configuration has been updated | -| `conversation.item.created` | A conversation item was added | -| `conversation.item.input_audio_transcription.completed` | User speech transcription is complete | -| `response.created` | Response generation has started | -| `response.audio.delta` | Audio chunk for the response (base64-encoded) | -| `response.audio.done` | Audio generation is complete | -| `response.audio_transcript.delta` | Partial transcript of the response | -| `response.audio_transcript.done` | Full transcript of the response | -| `response.function_call_arguments.done` | Tool call with complete arguments | -| `response.done` | Response generation is complete | -| `error` | An error occurred | +| `session.created` | Session has been created (JSON) | +| `session.updated` | Session configuration has been updated (JSON) | +| `conversation.item.created` | A conversation item was added (JSON) | +| `conversation.item.input_audio_transcription.completed` | User speech transcription is complete (JSON) | +| `response.created` | Response generation has started (JSON) | +| Binary frame | Raw PCM16 audio chunk for the response | +| `response.audio.done` | Audio generation is complete (JSON) | +| `response.audio_transcript.delta` | Partial transcript of the response (JSON) | +| `response.audio_transcript.done` | Full transcript of the response (JSON) | +| `response.function_call_arguments.done` | Tool call with complete arguments (JSON) | +| `response.done` | Response generation is complete (JSON) | +| `error` | An error occurred (JSON) | ## Roadmap From 609a356ea9689bffbd75e9ce542edee02e8f8478 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 26 Jan 2026 13:10:57 +0000 Subject: [PATCH 3/5] Add AssemblyAI Python SDK examples with tool calling Co-Authored-By: Dan Ince 
--- .../voice-agents/speechtospeech.mdx | 300 +++++++++++++++++- 1 file changed, 294 insertions(+), 6 deletions(-) diff --git a/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx b/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx index f08510f3..6f90cd5c 100644 --- a/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx +++ b/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx @@ -11,9 +11,250 @@ AssemblyAI's Speech-to-Speech API lets you build voice agents that listen and re ## Quickstart -Connect to the Speech-to-Speech API at `wss://speech-to-speech.assemblyai.com/v1/realtime` using your AssemblyAI API key. The API accepts audio input and returns both transcriptions and synthesized speech responses. +The easiest way to get started is with the AssemblyAI Python SDK. You can also use raw WebSocket connections or the OpenAI client library for more control. + +```python +import os +import assemblyai as aai + +# Initialize the client +client = aai.speech_to_speech.SpeechToSpeechClient( + api_key=os.environ["ASSEMBLYAI_API_KEY"] +) + +# Handle audio playback +@client.on_audio +def handle_audio(audio: bytes): + # Play audio using your preferred library (e.g., pyaudio) + pass + +# Display agent responses +@client.on_text +def handle_text(text: str): + print(f"Agent: {text}") + +# Display user transcriptions +@client.on_transcript +def handle_transcript(transcript: str): + print(f"You: {transcript}") + +# Connect and start streaming +client.connect( + instructions="You are a helpful voice assistant. Be concise and friendly.", + voice=aai.speech_to_speech.Voice.SAGE, + enable_transcription=True, +) + +# Stream from microphone (requires pyaudio) +from microphone import MicrophoneStream # See full example below +mic = MicrophoneStream() +mic.start() +client.send_audio(mic) +``` + + +```python +""" +Speech-to-Speech Voice Agent with Tool Calling + +Requirements: + pip install assemblyai pyaudio + +Usage: + export ASSEMBLYAI_API_KEY=your_api_key + python voice_agent.py +""" + +import os +import queue +import threading +from typing import Optional + +import pyaudio +import assemblyai as aai + +# Audio settings +SAMPLE_RATE = 24000 +CHANNELS = 1 +CHUNK_SIZE = 4096 + + +class AudioPlayer: + """Handles audio playback in a separate thread.""" + + def __init__(self, sample_rate: int = SAMPLE_RATE): + self._sample_rate = sample_rate + self._audio = pyaudio.PyAudio() + self._stream: Optional[pyaudio.Stream] = None + self._queue: queue.Queue[bytes] = queue.Queue() + self._stop_event = threading.Event() + self._thread: Optional[threading.Thread] = None + + def start(self): + self._stream = self._audio.open( + format=pyaudio.paInt16, + channels=CHANNELS, + rate=self._sample_rate, + output=True, + frames_per_buffer=CHUNK_SIZE, + ) + self._stop_event.clear() + self._thread = threading.Thread(target=self._playback_loop, daemon=True) + self._thread.start() + + def stop(self): + self._stop_event.set() + if self._thread: + self._thread.join(timeout=1) + if self._stream: + self._stream.stop_stream() + self._stream.close() + self._audio.terminate() + + def play(self, audio_data: bytes): + self._queue.put(audio_data) + + def _playback_loop(self): + while not self._stop_event.is_set(): + try: + audio_data = self._queue.get(timeout=0.1) + if self._stream: + self._stream.write(audio_data) + except queue.Empty: + continue + + +class MicrophoneStream: + """Streams audio from the microphone.""" + + def 
__init__(self, sample_rate: int = SAMPLE_RATE, chunk_size: int = CHUNK_SIZE): + self._sample_rate = sample_rate + self._chunk_size = chunk_size + self._audio = pyaudio.PyAudio() + self._stream: Optional[pyaudio.Stream] = None + self._stop_event = threading.Event() + + def start(self): + self._stream = self._audio.open( + format=pyaudio.paInt16, + channels=CHANNELS, + rate=self._sample_rate, + input=True, + frames_per_buffer=self._chunk_size, + ) + self._stop_event.clear() + + def stop(self): + self._stop_event.set() + if self._stream: + self._stream.stop_stream() + self._stream.close() + self._audio.terminate() + + def __iter__(self): + while not self._stop_event.is_set(): + if self._stream: + try: + data = self._stream.read(self._chunk_size, exception_on_overflow=False) + yield data + except OSError: + break + + +def main(): + # Initialize client + client = aai.speech_to_speech.SpeechToSpeechClient( + api_key=os.environ["ASSEMBLYAI_API_KEY"] + ) + + # Initialize audio player + audio_player = AudioPlayer() + + # === Register Tools === + + @client.tool + def get_current_time() -> str: + """Get the current time.""" + from datetime import datetime + return datetime.now().strftime("%I:%M %p") + + @client.tool + def get_weather(location: str, units: str = "fahrenheit") -> dict: + """Get the current weather for a location.""" + return { + "location": location, + "temperature": 72 if units == "fahrenheit" else 22, + "units": units, + "conditions": "sunny", + } + + @client.tool + def set_reminder(message: str, minutes: int) -> str: + """Set a reminder for a specified number of minutes from now.""" + return f"Reminder set: '{message}' in {minutes} minutes" + + # === Event Handlers === + + @client.on_audio + def handle_audio(audio: bytes): + audio_player.play(audio) + + @client.on_text + def handle_text(text: str): + print(f"\nAgent: {text}", end="", flush=True) + + @client.on_transcript + def handle_transcript(transcript: str): + print(f"\nYou: {transcript}") + + @client.on_speech_started + def handle_speech_started(): + print("\nListening...", end="", flush=True) + + @client.on_speech_stopped + def handle_speech_stopped(): + print(" [processing]", end="", flush=True) + + @client.on_error + def handle_error(error: aai.speech_to_speech.SpeechToSpeechError): + print(f"Error: {error}") + + # === Main Loop === + + print("Speech-to-Speech Voice Agent") + print("Registered tools:", [t.name for t in client.tools]) + print("Press Ctrl+C to stop\n") + + audio_player.start() + + client.connect( + instructions="You are a helpful voice assistant. 
Be concise and friendly.", + voice=aai.speech_to_speech.Voice.SAGE, + output_modalities=["audio", "text"], + enable_transcription=True, + vad_threshold=0.5, + vad_silence_duration_ms=500, + ) + + try: + mic = MicrophoneStream() + mic.start() + client.send_audio(mic) + except KeyboardInterrupt: + print("\nStopping...") + finally: + mic.stop() + client.disconnect() + audio_player.stop() + + +if __name__ == "__main__": + main() +``` + + ```python import asyncio @@ -392,9 +633,9 @@ Configure your session using the `session.update` event: | Voice | Description | |-------|-------------| | `sage` | Calm and professional | -| `coral` | Warm and friendly | -| `verse` | Clear and articulate | -| `alloy` | Neutral and balanced | +| `ember` | Warm and expressive | +| `breeze` | Light and friendly | +| `cascade` | Clear and articulate | ### Audio format @@ -413,7 +654,52 @@ The API uses raw PCM16 audio sent as binary WebSocket frames (not base64 encoded ## Tool calling -Enable your agent to perform actions by defining tools. Tools follow the JSON Schema format used by OpenAI's function calling. +Enable your agent to perform actions by defining tools. + + + +With the SDK, use the `@client.tool` decorator to register functions as tools. The SDK automatically handles tool execution and response generation. + +```python +import assemblyai as aai + +client = aai.speech_to_speech.SpeechToSpeechClient( + api_key=os.environ["ASSEMBLYAI_API_KEY"] +) + +@client.tool +def get_current_time() -> str: + """Get the current time.""" + from datetime import datetime + return datetime.now().strftime("%I:%M %p") + +@client.tool +def get_weather(location: str, units: str = "fahrenheit") -> dict: + """Get the current weather for a location.""" + return { + "location": location, + "temperature": 72 if units == "fahrenheit" else 22, + "conditions": "sunny", + } + +@client.tool +def check_order_status(order_id: str) -> dict: + """Check the status of a customer order.""" + return { + "order_id": order_id, + "status": "shipped", + "estimated_delivery": "January 28, 2026", + } + +# Tools are automatically available when you connect +client.connect( + instructions="You help users check order status and get weather information.", + voice=aai.speech_to_speech.Voice.SAGE, +) +``` + + +With raw WebSocket, define tools using JSON Schema format and handle tool calls manually. ```python # Define tools in your session configuration @@ -422,7 +708,7 @@ await ws.send(json.dumps({ "session": { "model": "universal-streaming", "voice": "sage", - "instructions": "You help users check order status. 
Use the check_order_status tool when asked.", + "instructions": "You help users check order status.", "tools": [ { "type": "function", @@ -472,6 +758,8 @@ async for message in ws: # Trigger a response await ws.send(json.dumps({"type": "response.create"})) ``` + + ## Subagent routing From 78165c1a21a1bc654fdbd0bfb04cacc5631a6195 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 26 Jan 2026 13:50:50 +0000 Subject: [PATCH 4/5] Make all code examples fully working with pyaudio Co-Authored-By: Dan Ince --- .../voice-agents/speechtospeech.mdx | 731 +++++++++++++----- 1 file changed, 533 insertions(+), 198 deletions(-) diff --git a/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx b/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx index 6f90cd5c..8e7b5ce7 100644 --- a/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx +++ b/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx @@ -16,42 +16,145 @@ The easiest way to get started is with the AssemblyAI Python SDK. You can also u ```python +""" +Speech-to-Speech Voice Agent using AssemblyAI Python SDK + +Requirements: + pip install assemblyai pyaudio + +Usage: + export ASSEMBLYAI_API_KEY=your_api_key + python voice_agent.py +""" + import os +import queue +import threading + +import pyaudio import assemblyai as aai -# Initialize the client -client = aai.speech_to_speech.SpeechToSpeechClient( - api_key=os.environ["ASSEMBLYAI_API_KEY"] -) +# Audio settings +SAMPLE_RATE = 24000 +CHANNELS = 1 +CHUNK_SIZE = 4096 -# Handle audio playback -@client.on_audio -def handle_audio(audio: bytes): - # Play audio using your preferred library (e.g., pyaudio) - pass -# Display agent responses -@client.on_text -def handle_text(text: str): - print(f"Agent: {text}") +class AudioPlayer: + """Handles audio playback in a separate thread.""" -# Display user transcriptions -@client.on_transcript -def handle_transcript(transcript: str): - print(f"You: {transcript}") + def __init__(self): + self._audio = pyaudio.PyAudio() + self._stream = self._audio.open( + format=pyaudio.paInt16, + channels=CHANNELS, + rate=SAMPLE_RATE, + output=True, + frames_per_buffer=CHUNK_SIZE, + ) + self._queue = queue.Queue() + self._running = True + self._thread = threading.Thread(target=self._playback_loop, daemon=True) + self._thread.start() -# Connect and start streaming -client.connect( - instructions="You are a helpful voice assistant. 
Be concise and friendly.", - voice=aai.speech_to_speech.Voice.SAGE, - enable_transcription=True, -) + def play(self, audio_data: bytes): + self._queue.put(audio_data) + + def _playback_loop(self): + while self._running: + try: + audio_data = self._queue.get(timeout=0.1) + self._stream.write(audio_data) + except queue.Empty: + continue + + def stop(self): + self._running = False + self._thread.join(timeout=1) + self._stream.stop_stream() + self._stream.close() + self._audio.terminate() + + +class MicrophoneStream: + """Streams audio from the microphone.""" + + def __init__(self): + self._audio = pyaudio.PyAudio() + self._stream = self._audio.open( + format=pyaudio.paInt16, + channels=CHANNELS, + rate=SAMPLE_RATE, + input=True, + frames_per_buffer=CHUNK_SIZE, + ) + self._running = True + + def __iter__(self): + while self._running: + try: + data = self._stream.read(CHUNK_SIZE, exception_on_overflow=False) + yield data + except OSError: + break + + def stop(self): + self._running = False + self._stream.stop_stream() + self._stream.close() + self._audio.terminate() + + +def main(): + # Initialize client + client = aai.speech_to_speech.SpeechToSpeechClient( + api_key=os.environ["ASSEMBLYAI_API_KEY"] + ) + + # Initialize audio player + audio_player = AudioPlayer() -# Stream from microphone (requires pyaudio) -from microphone import MicrophoneStream # See full example below -mic = MicrophoneStream() -mic.start() -client.send_audio(mic) + # Event handlers + @client.on_audio + def handle_audio(audio: bytes): + audio_player.play(audio) + + @client.on_text + def handle_text(text: str): + print(f"\nAgent: {text}", end="", flush=True) + + @client.on_transcript + def handle_transcript(transcript: str): + print(f"\nYou: {transcript}") + + @client.on_error + def handle_error(error): + print(f"Error: {error}") + + print("Speech-to-Speech Voice Agent") + print("Press Ctrl+C to stop\n") + + # Connect to the API + client.connect( + instructions="You are a helpful voice assistant. 
Be concise and friendly.", + voice=aai.speech_to_speech.Voice.SAGE, + enable_transcription=True, + ) + + # Stream from microphone + mic = MicrophoneStream() + try: + client.send_audio(mic) + except KeyboardInterrupt: + print("\nStopping...") + finally: + mic.stop() + client.disconnect() + audio_player.stop() + + +if __name__ == "__main__": + main() ``` @@ -70,7 +173,6 @@ Usage: import os import queue import threading -from typing import Optional import pyaudio import assemblyai as aai @@ -84,84 +186,67 @@ CHUNK_SIZE = 4096 class AudioPlayer: """Handles audio playback in a separate thread.""" - def __init__(self, sample_rate: int = SAMPLE_RATE): - self._sample_rate = sample_rate + def __init__(self): self._audio = pyaudio.PyAudio() - self._stream: Optional[pyaudio.Stream] = None - self._queue: queue.Queue[bytes] = queue.Queue() - self._stop_event = threading.Event() - self._thread: Optional[threading.Thread] = None - - def start(self): self._stream = self._audio.open( format=pyaudio.paInt16, channels=CHANNELS, - rate=self._sample_rate, + rate=SAMPLE_RATE, output=True, frames_per_buffer=CHUNK_SIZE, ) - self._stop_event.clear() + self._queue = queue.Queue() + self._running = True self._thread = threading.Thread(target=self._playback_loop, daemon=True) self._thread.start() - def stop(self): - self._stop_event.set() - if self._thread: - self._thread.join(timeout=1) - if self._stream: - self._stream.stop_stream() - self._stream.close() - self._audio.terminate() - def play(self, audio_data: bytes): self._queue.put(audio_data) def _playback_loop(self): - while not self._stop_event.is_set(): + while self._running: try: audio_data = self._queue.get(timeout=0.1) - if self._stream: - self._stream.write(audio_data) + self._stream.write(audio_data) except queue.Empty: continue + def stop(self): + self._running = False + self._thread.join(timeout=1) + self._stream.stop_stream() + self._stream.close() + self._audio.terminate() + class MicrophoneStream: """Streams audio from the microphone.""" - def __init__(self, sample_rate: int = SAMPLE_RATE, chunk_size: int = CHUNK_SIZE): - self._sample_rate = sample_rate - self._chunk_size = chunk_size + def __init__(self): self._audio = pyaudio.PyAudio() - self._stream: Optional[pyaudio.Stream] = None - self._stop_event = threading.Event() - - def start(self): self._stream = self._audio.open( format=pyaudio.paInt16, channels=CHANNELS, - rate=self._sample_rate, + rate=SAMPLE_RATE, input=True, - frames_per_buffer=self._chunk_size, + frames_per_buffer=CHUNK_SIZE, ) - self._stop_event.clear() + self._running = True + + def __iter__(self): + while self._running: + try: + data = self._stream.read(CHUNK_SIZE, exception_on_overflow=False) + yield data + except OSError: + break def stop(self): - self._stop_event.set() - if self._stream: - self._stream.stop_stream() - self._stream.close() + self._running = False + self._stream.stop_stream() + self._stream.close() self._audio.terminate() - def __iter__(self): - while not self._stop_event.is_set(): - if self._stream: - try: - data = self._stream.read(self._chunk_size, exception_on_overflow=False) - yield data - except OSError: - break - def main(): # Initialize client @@ -218,7 +303,7 @@ def main(): print(" [processing]", end="", flush=True) @client.on_error - def handle_error(error: aai.speech_to_speech.SpeechToSpeechError): + def handle_error(error): print(f"Error: {error}") # === Main Loop === @@ -227,8 +312,7 @@ def main(): print("Registered tools:", [t.name for t in client.tools]) print("Press Ctrl+C to stop\n") - 
audio_player.start() - + # Connect to the API client.connect( instructions="You are a helpful voice assistant. Be concise and friendly.", voice=aai.speech_to_speech.Voice.SAGE, @@ -238,9 +322,9 @@ def main(): vad_silence_duration_ms=500, ) + # Stream from microphone + mic = MicrophoneStream() try: - mic = MicrophoneStream() - mic.start() client.send_audio(mic) except KeyboardInterrupt: print("\nStopping...") @@ -257,12 +341,25 @@ if __name__ == "__main__": ```python +""" +Speech-to-Speech Voice Agent using raw WebSocket + +Requirements: + pip install websockets pyaudio + +Usage: + export ASSEMBLYAI_API_KEY=your_api_key + python voice_agent_ws.py +""" + import asyncio import json import os +import queue +import threading + +import pyaudio import websockets -import sounddevice as sd -import numpy as np ASSEMBLYAI_API_KEY = os.environ.get("ASSEMBLYAI_API_KEY") URL = "wss://speech-to-speech.assemblyai.com/v1/realtime" @@ -270,17 +367,83 @@ URL = "wss://speech-to-speech.assemblyai.com/v1/realtime" # Audio settings INPUT_SAMPLE_RATE = 16000 # 16kHz for input OUTPUT_SAMPLE_RATE = 24000 # 24kHz for output -CHUNK_MS = 50 # 50ms chunks recommended -CHUNK_SIZE = int(INPUT_SAMPLE_RATE * CHUNK_MS / 1000) * 2 # 1600 bytes per 50ms chunk +CHANNELS = 1 +CHUNK_SIZE = 4096 + + +class AudioPlayer: + """Handles audio playback in a separate thread.""" + + def __init__(self): + self._audio = pyaudio.PyAudio() + self._stream = self._audio.open( + format=pyaudio.paInt16, + channels=CHANNELS, + rate=OUTPUT_SAMPLE_RATE, + output=True, + frames_per_buffer=CHUNK_SIZE, + ) + self._queue = queue.Queue() + self._running = True + self._thread = threading.Thread(target=self._playback_loop, daemon=True) + self._thread.start() + + def play(self, audio_data: bytes): + self._queue.put(audio_data) + + def _playback_loop(self): + while self._running: + try: + audio_data = self._queue.get(timeout=0.1) + self._stream.write(audio_data) + except queue.Empty: + continue + + def stop(self): + self._running = False + self._thread.join(timeout=1) + self._stream.stop_stream() + self._stream.close() + self._audio.terminate() + + +class MicrophoneStream: + """Streams audio from the microphone.""" + + def __init__(self): + self._audio = pyaudio.PyAudio() + self._stream = self._audio.open( + format=pyaudio.paInt16, + channels=CHANNELS, + rate=INPUT_SAMPLE_RATE, + input=True, + frames_per_buffer=CHUNK_SIZE, + ) + self._running = True + + def read(self): + if self._running: + return self._stream.read(CHUNK_SIZE, exception_on_overflow=False) + return None + + def stop(self): + self._running = False + self._stream.stop_stream() + self._stream.close() + self._audio.terminate() + async def main(): + audio_player = AudioPlayer() + mic = MicrophoneStream() + headers = { "Authorization": f"Bearer {ASSEMBLYAI_API_KEY}", "OpenAI-Beta": "realtime=v1" } async with websockets.connect(URL, additional_headers=headers) as ws: - # Configure the session (JSON message) + # Configure the session await ws.send(json.dumps({ "type": "session.update", "session": { @@ -294,159 +457,331 @@ async def main(): })) print("Connected! 
Start speaking...") - - # Set up audio input/output - audio_buffer = bytearray() - audio_queue = asyncio.Queue() - - def audio_callback(indata, frames, time, status): - audio_queue.put_nowait(bytes(indata)) + print("Press Ctrl+C to stop\n") async def send_audio(): - nonlocal audio_buffer + loop = asyncio.get_event_loop() while True: - audio_data = await audio_queue.get() - audio_buffer.extend(audio_data) - - # Send in 50ms chunks (1600 bytes for 16kHz mono PCM16) - while len(audio_buffer) >= CHUNK_SIZE: - chunk = bytes(audio_buffer[:CHUNK_SIZE]) - audio_buffer = audio_buffer[CHUNK_SIZE:] - # Send raw PCM16 bytes as binary WebSocket frame - await ws.send(chunk) + # Read from microphone in a thread to avoid blocking + audio_data = await loop.run_in_executor(None, mic.read) + if audio_data: + await ws.send(audio_data) async def receive_messages(): - with sd.OutputStream(samplerate=OUTPUT_SAMPLE_RATE, channels=1, dtype='int16') as speaker: - async for message in ws: + async for message in ws: + if isinstance(message, bytes): # Binary frames contain audio data - if isinstance(message, bytes): - audio_array = np.frombuffer(message, dtype=np.int16) - speaker.write(audio_array) - else: - # Text frames contain JSON events - event = json.loads(message) + audio_player.play(message) + else: + # Text frames contain JSON events + event = json.loads(message) - if event["type"] == "conversation.item.input_audio_transcription.completed": - print(f"You: {event['transcript']}") + if event["type"] == "conversation.item.input_audio_transcription.completed": + print(f"\nYou: {event['transcript']}") - elif event["type"] == "response.audio_transcript.done": - print(f"Agent: {event['transcript']}") + elif event["type"] == "response.audio_transcript.done": + print(f"\nAgent: {event['transcript']}") - with sd.InputStream(samplerate=INPUT_SAMPLE_RATE, channels=1, dtype='int16', callback=audio_callback): + try: await asyncio.gather(send_audio(), receive_messages()) + except KeyboardInterrupt: + print("\nStopping...") + finally: + mic.stop() + audio_player.stop() + if __name__ == "__main__": asyncio.run(main()) ``` - + ```javascript -const WebSocket = require("ws"); - -const ASSEMBLYAI_API_KEY = process.env.ASSEMBLYAI_API_KEY; +/** + * Speech-to-Speech Voice Agent using WebSocket (Browser) + * + * Usage: + * 1. Set your API key in the ASSEMBLYAI_API_KEY variable + * 2. Open this file in a browser + * 3. 
Click "Start" to begin the conversation + */ + +const ASSEMBLYAI_API_KEY = "your_api_key_here"; const URL = "wss://speech-to-speech.assemblyai.com/v1/realtime"; // Audio settings -const INPUT_SAMPLE_RATE = 16000; // 16kHz for input -const OUTPUT_SAMPLE_RATE = 24000; // 24kHz for output -const CHUNK_MS = 50; // 50ms chunks recommended -const CHUNK_SIZE = (INPUT_SAMPLE_RATE * CHUNK_MS / 1000) * 2; // 1600 bytes per 50ms chunk - -const ws = new WebSocket(URL, { - headers: { - Authorization: `Bearer ${ASSEMBLYAI_API_KEY}`, - "OpenAI-Beta": "realtime=v1", - }, -}); - -let audioBuffer = Buffer.alloc(0); - -ws.on("open", () => { - console.log("Connected!"); - - // Configure the session (JSON message) - ws.send( - JSON.stringify({ +const INPUT_SAMPLE_RATE = 16000; +const OUTPUT_SAMPLE_RATE = 24000; + +let ws; +let audioContext; +let mediaStream; +let audioWorklet; + +async function start() { + // Set up WebSocket connection + ws = new WebSocket(URL); + ws.binaryType = "arraybuffer"; + + // Set up audio context for playback + audioContext = new AudioContext({ sampleRate: OUTPUT_SAMPLE_RATE }); + + ws.onopen = async () => { + // Send auth header via first message (browser WebSocket doesn't support headers) + ws.send(JSON.stringify({ type: "session.update", session: { model: "universal-streaming", voice: "sage", instructions: "You are a helpful assistant. Be concise and friendly.", - input_audio_transcription: { - model: "universal-streaming", - }, + input_audio_transcription: { model: "universal-streaming" }, }, - }) - ); -}); - -ws.on("message", (data, isBinary) => { - if (isBinary) { - // Binary frames contain raw PCM16 audio data - // Play audio using your preferred audio library - const audioData = data; - // Example: speaker.write(audioData); - } else { - // Text frames contain JSON events - const event = JSON.parse(data.toString()); - - switch (event.type) { - case "conversation.item.input_audio_transcription.completed": - console.log(`You: ${event.transcript}`); - break; - - case "response.audio_transcript.done": - console.log(`Agent: ${event.transcript}`); - break; + })); + + console.log("Connected! 
Start speaking..."); + + // Start microphone capture + mediaStream = await navigator.mediaDevices.getUserMedia({ + audio: { sampleRate: INPUT_SAMPLE_RATE, channelCount: 1 }, + }); + + // Process microphone audio and send to WebSocket + const source = audioContext.createMediaStreamSource(mediaStream); + await audioContext.audioWorklet.addModule("audio-processor.js"); + audioWorklet = new AudioWorkletNode(audioContext, "audio-processor"); + + audioWorklet.port.onmessage = (event) => { + if (ws.readyState === WebSocket.OPEN) { + // Send raw PCM16 audio as binary + ws.send(event.data); + } + }; + + source.connect(audioWorklet); + }; + + ws.onmessage = (event) => { + if (event.data instanceof ArrayBuffer) { + // Binary frames contain audio - play it + playAudio(event.data); + } else { + // Text frames contain JSON events + const data = JSON.parse(event.data); + + if (data.type === "conversation.item.input_audio_transcription.completed") { + console.log(`You: ${data.transcript}`); + } else if (data.type === "response.audio_transcript.done") { + console.log(`Agent: ${data.transcript}`); + } } + }; + + ws.onerror = (error) => console.error("WebSocket error:", error); + ws.onclose = () => console.log("Disconnected"); +} + +function playAudio(arrayBuffer) { + // Convert PCM16 to Float32 for Web Audio API + const int16Array = new Int16Array(arrayBuffer); + const float32Array = new Float32Array(int16Array.length); + for (let i = 0; i < int16Array.length; i++) { + float32Array[i] = int16Array[i] / 32768; + } + + // Create and play audio buffer + const audioBuffer = audioContext.createBuffer(1, float32Array.length, OUTPUT_SAMPLE_RATE); + audioBuffer.getChannelData(0).set(float32Array); + + const source = audioContext.createBufferSource(); + source.buffer = audioBuffer; + source.connect(audioContext.destination); + source.start(); +} + +function stop() { + if (mediaStream) { + mediaStream.getTracks().forEach((track) => track.stop()); } -}); - -// Send audio data as raw PCM16 in 50ms chunks (16kHz, mono) -function sendAudio(pcm16Data) { - audioBuffer = Buffer.concat([audioBuffer, pcm16Data]); - - // Send in 50ms chunks (1600 bytes for 16kHz mono PCM16) - while (audioBuffer.length >= CHUNK_SIZE) { - const chunk = audioBuffer.subarray(0, CHUNK_SIZE); - audioBuffer = audioBuffer.subarray(CHUNK_SIZE); - // Send raw PCM16 bytes as binary WebSocket frame - ws.send(chunk); + if (ws) { + ws.close(); } + if (audioContext) { + audioContext.close(); + } + console.log("Stopped"); } ``` + +Create an `audio-processor.js` file for the AudioWorklet: + +```javascript +// audio-processor.js - AudioWorklet for capturing microphone audio +class AudioProcessor extends AudioWorkletProcessor { + constructor() { + super(); + this.buffer = []; + } + + process(inputs) { + const input = inputs[0]; + if (input.length > 0) { + const samples = input[0]; + // Convert Float32 to Int16 (PCM16) + const int16 = new Int16Array(samples.length); + for (let i = 0; i < samples.length; i++) { + int16[i] = Math.max(-32768, Math.min(32767, samples[i] * 32768)); + } + this.port.postMessage(int16.buffer, [int16.buffer]); + } + return true; + } +} + +registerProcessor("audio-processor", AudioProcessor); +``` ```python +""" +Speech-to-Speech Voice Agent using OpenAI Python Client + +Requirements: + pip install openai pyaudio + +Usage: + export ASSEMBLYAI_API_KEY=your_api_key + python voice_agent_openai.py +""" + import os +import queue +import threading + +import pyaudio from openai import OpenAI -client = OpenAI( - 
api_key=os.environ.get("ASSEMBLYAI_API_KEY"), - base_url="https://speech-to-speech.assemblyai.com/v1" -) +# Audio settings +SAMPLE_RATE = 24000 +CHANNELS = 1 +CHUNK_SIZE = 4096 -# Connect to the realtime API -with client.beta.realtime.connect( - model="universal-streaming" -) as connection: - # Configure the session - connection.session.update( - session={ - "voice": "sage", - "instructions": "You are a helpful assistant.", - "input_audio_transcription": { - "model": "universal-streaming" - } - } + +class AudioPlayer: + """Handles audio playback in a separate thread.""" + + def __init__(self): + self._audio = pyaudio.PyAudio() + self._stream = self._audio.open( + format=pyaudio.paInt16, + channels=CHANNELS, + rate=SAMPLE_RATE, + output=True, + frames_per_buffer=CHUNK_SIZE, + ) + self._queue = queue.Queue() + self._running = True + self._thread = threading.Thread(target=self._playback_loop, daemon=True) + self._thread.start() + + def play(self, audio_data: bytes): + self._queue.put(audio_data) + + def _playback_loop(self): + while self._running: + try: + audio_data = self._queue.get(timeout=0.1) + self._stream.write(audio_data) + except queue.Empty: + continue + + def stop(self): + self._running = False + self._thread.join(timeout=1) + self._stream.stop_stream() + self._stream.close() + self._audio.terminate() + + +class MicrophoneStream: + """Streams audio from the microphone.""" + + def __init__(self): + self._audio = pyaudio.PyAudio() + self._stream = self._audio.open( + format=pyaudio.paInt16, + channels=CHANNELS, + rate=SAMPLE_RATE, + input=True, + frames_per_buffer=CHUNK_SIZE, + ) + self._running = True + + def __iter__(self): + while self._running: + try: + data = self._stream.read(CHUNK_SIZE, exception_on_overflow=False) + yield data + except OSError: + break + + def stop(self): + self._running = False + self._stream.stop_stream() + self._stream.close() + self._audio.terminate() + + +def main(): + client = OpenAI( + api_key=os.environ.get("ASSEMBLYAI_API_KEY"), + base_url="https://speech-to-speech.assemblyai.com/v1" ) - # Send audio and receive responses - for event in connection: - if event.type == "response.audio_transcript.done": - print(f"Agent: {event.transcript}") - elif event.type == "conversation.item.input_audio_transcription.completed": - print(f"You: {event.transcript}") + audio_player = AudioPlayer() + mic = MicrophoneStream() + + print("Speech-to-Speech Voice Agent (OpenAI Client)") + print("Press Ctrl+C to stop\n") + + with client.beta.realtime.connect(model="universal-streaming") as connection: + # Configure the session + connection.session.update( + session={ + "voice": "sage", + "instructions": "You are a helpful assistant. 
Be concise and friendly.", + "input_audio_transcription": { + "model": "universal-streaming" + } + } + ) + + # Start sending audio in a background thread + def send_audio(): + for chunk in mic: + connection.input_audio_buffer.append(audio=chunk) + + audio_thread = threading.Thread(target=send_audio, daemon=True) + audio_thread.start() + + try: + # Receive and handle events + for event in connection: + if event.type == "response.audio.delta": + # Play audio as it arrives + audio_player.play(event.delta) + elif event.type == "response.audio_transcript.done": + print(f"\nAgent: {event.transcript}") + elif event.type == "conversation.item.input_audio_transcription.completed": + print(f"\nYou: {event.transcript}") + except KeyboardInterrupt: + print("\nStopping...") + finally: + mic.stop() + audio_player.stop() + + +if __name__ == "__main__": + main() ``` From a442a804a885db584c19cedc7287a5bedd78e0c3 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 26 Jan 2026 14:15:52 +0000 Subject: [PATCH 5/5] Simplify Python SDK examples to minimal clean approach Co-Authored-By: Dan Ince --- .../voice-agents/speechtospeech.mdx | 358 ++++++------------ 1 file changed, 111 insertions(+), 247 deletions(-) diff --git a/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx b/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx index 8e7b5ce7..12dcc5bf 100644 --- a/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx +++ b/fern/pages/02-speech-to-text/universal-streaming/voice-agents/speechtospeech.mdx @@ -17,7 +17,7 @@ The easiest way to get started is with the AssemblyAI Python SDK. You can also u ```python """ -Speech-to-Speech Voice Agent using AssemblyAI Python SDK +Minimal Speech-to-Speech Example Requirements: pip install assemblyai pyaudio @@ -28,133 +28,63 @@ Usage: """ import os -import queue -import threading import pyaudio -import assemblyai as aai - -# Audio settings -SAMPLE_RATE = 24000 -CHANNELS = 1 -CHUNK_SIZE = 4096 - - -class AudioPlayer: - """Handles audio playback in a separate thread.""" - - def __init__(self): - self._audio = pyaudio.PyAudio() - self._stream = self._audio.open( - format=pyaudio.paInt16, - channels=CHANNELS, - rate=SAMPLE_RATE, - output=True, - frames_per_buffer=CHUNK_SIZE, - ) - self._queue = queue.Queue() - self._running = True - self._thread = threading.Thread(target=self._playback_loop, daemon=True) - self._thread.start() - - def play(self, audio_data: bytes): - self._queue.put(audio_data) - - def _playback_loop(self): - while self._running: - try: - audio_data = self._queue.get(timeout=0.1) - self._stream.write(audio_data) - except queue.Empty: - continue - - def stop(self): - self._running = False - self._thread.join(timeout=1) - self._stream.stop_stream() - self._stream.close() - self._audio.terminate() +from assemblyai.speech_to_speech import SpeechToSpeechClient +# Setup +client = SpeechToSpeechClient(api_key=os.getenv("ASSEMBLYAI_API_KEY")) +audio = pyaudio.PyAudio() -class MicrophoneStream: - """Streams audio from the microphone.""" - - def __init__(self): - self._audio = pyaudio.PyAudio() - self._stream = self._audio.open( - format=pyaudio.paInt16, - channels=CHANNELS, - rate=SAMPLE_RATE, - input=True, - frames_per_buffer=CHUNK_SIZE, - ) - self._running = True - - def __iter__(self): - while self._running: - try: - data = self._stream.read(CHUNK_SIZE, exception_on_overflow=False) - yield data - except OSError: - break - 
- def stop(self): - self._running = False - self._stream.stop_stream() - self._stream.close() - self._audio.terminate() +# Audio output stream +output_stream = audio.open(format=pyaudio.paInt16, channels=1, rate=24000, output=True) -def main(): - # Initialize client - client = aai.speech_to_speech.SpeechToSpeechClient( - api_key=os.environ["ASSEMBLYAI_API_KEY"] - ) +# Handle events +@client.on_audio +def play(audio_bytes: bytes): + output_stream.write(audio_bytes) - # Initialize audio player - audio_player = AudioPlayer() - # Event handlers - @client.on_audio - def handle_audio(audio: bytes): - audio_player.play(audio) +@client.on_text +def show_text(text: str): + print(f"\nAgent: {text}") - @client.on_text - def handle_text(text: str): - print(f"\nAgent: {text}", end="", flush=True) - @client.on_transcript - def handle_transcript(transcript: str): - print(f"\nYou: {transcript}") +@client.on_transcript +def show_transcript(text: str): + print(f"\nYou: {text}") - @client.on_error - def handle_error(error): - print(f"Error: {error}") - print("Speech-to-Speech Voice Agent") - print("Press Ctrl+C to stop\n") +@client.on_error +def handle_error(error): + print(f"\nERROR: {error}") - # Connect to the API - client.connect( - instructions="You are a helpful voice assistant. Be concise and friendly.", - voice=aai.speech_to_speech.Voice.SAGE, - enable_transcription=True, - ) - # Stream from microphone - mic = MicrophoneStream() - try: - client.send_audio(mic) - except KeyboardInterrupt: - print("\nStopping...") - finally: - mic.stop() - client.disconnect() - audio_player.stop() +# Connect +print("Connecting to AssemblyAI Speech-to-Speech API...") +client.connect( + instructions="You are a helpful assistant. Be brief.", + output_modalities=["audio", "text"], + vad_threshold=0.3, +) +# Stream from microphone +input_stream = audio.open( + format=pyaudio.paInt16, channels=1, rate=24000, input=True, frames_per_buffer=4096 +) -if __name__ == "__main__": - main() +try: + while True: + audio_data = input_stream.read(4096, exception_on_overflow=False) + client.send_audio(audio_data) +except KeyboardInterrupt: + pass +finally: + client.disconnect() + input_stream.close() + output_stream.close() + audio.terminate() ``` @@ -171,171 +101,105 @@ Usage: """ import os -import queue -import threading import pyaudio -import assemblyai as aai - -# Audio settings -SAMPLE_RATE = 24000 -CHANNELS = 1 -CHUNK_SIZE = 4096 - - -class AudioPlayer: - """Handles audio playback in a separate thread.""" - - def __init__(self): - self._audio = pyaudio.PyAudio() - self._stream = self._audio.open( - format=pyaudio.paInt16, - channels=CHANNELS, - rate=SAMPLE_RATE, - output=True, - frames_per_buffer=CHUNK_SIZE, - ) - self._queue = queue.Queue() - self._running = True - self._thread = threading.Thread(target=self._playback_loop, daemon=True) - self._thread.start() +from assemblyai.speech_to_speech import SpeechToSpeechClient - def play(self, audio_data: bytes): - self._queue.put(audio_data) +# Setup +client = SpeechToSpeechClient(api_key=os.getenv("ASSEMBLYAI_API_KEY")) +audio = pyaudio.PyAudio() - def _playback_loop(self): - while self._running: - try: - audio_data = self._queue.get(timeout=0.1) - self._stream.write(audio_data) - except queue.Empty: - continue +# Audio output stream +output_stream = audio.open(format=pyaudio.paInt16, channels=1, rate=24000, output=True) - def stop(self): - self._running = False - self._thread.join(timeout=1) - self._stream.stop_stream() - self._stream.close() - self._audio.terminate() +# === Register 
Tools === -class MicrophoneStream: - """Streams audio from the microphone.""" +@client.tool +def get_current_time() -> str: + """Get the current time.""" + from datetime import datetime + return datetime.now().strftime("%I:%M %p") - def __init__(self): - self._audio = pyaudio.PyAudio() - self._stream = self._audio.open( - format=pyaudio.paInt16, - channels=CHANNELS, - rate=SAMPLE_RATE, - input=True, - frames_per_buffer=CHUNK_SIZE, - ) - self._running = True - def __iter__(self): - while self._running: - try: - data = self._stream.read(CHUNK_SIZE, exception_on_overflow=False) - yield data - except OSError: - break +@client.tool +def get_weather(location: str, units: str = "fahrenheit") -> dict: + """Get the current weather for a location.""" + return { + "location": location, + "temperature": 72 if units == "fahrenheit" else 22, + "units": units, + "conditions": "sunny", + } - def stop(self): - self._running = False - self._stream.stop_stream() - self._stream.close() - self._audio.terminate() +@client.tool +def set_reminder(message: str, minutes: int) -> str: + """Set a reminder for a specified number of minutes from now.""" + return f"Reminder set: '{message}' in {minutes} minutes" -def main(): - # Initialize client - client = aai.speech_to_speech.SpeechToSpeechClient( - api_key=os.environ["ASSEMBLYAI_API_KEY"] - ) - # Initialize audio player - audio_player = AudioPlayer() +# === Event Handlers === - # === Register Tools === +@client.on_audio +def play(audio_bytes: bytes): + output_stream.write(audio_bytes) - @client.tool - def get_current_time() -> str: - """Get the current time.""" - from datetime import datetime - return datetime.now().strftime("%I:%M %p") - @client.tool - def get_weather(location: str, units: str = "fahrenheit") -> dict: - """Get the current weather for a location.""" - return { - "location": location, - "temperature": 72 if units == "fahrenheit" else 22, - "units": units, - "conditions": "sunny", - } +@client.on_text +def show_text(text: str): + print(f"\nAgent: {text}") - @client.tool - def set_reminder(message: str, minutes: int) -> str: - """Set a reminder for a specified number of minutes from now.""" - return f"Reminder set: '{message}' in {minutes} minutes" - # === Event Handlers === +@client.on_transcript +def show_transcript(text: str): + print(f"\nYou: {text}") - @client.on_audio - def handle_audio(audio: bytes): - audio_player.play(audio) - @client.on_text - def handle_text(text: str): - print(f"\nAgent: {text}", end="", flush=True) +@client.on_speech_started +def on_speech_started(): + print("\nListening...", end="", flush=True) - @client.on_transcript - def handle_transcript(transcript: str): - print(f"\nYou: {transcript}") - @client.on_speech_started - def handle_speech_started(): - print("\nListening...", end="", flush=True) +@client.on_speech_stopped +def on_speech_stopped(): + print(" [processing]", end="", flush=True) - @client.on_speech_stopped - def handle_speech_stopped(): - print(" [processing]", end="", flush=True) - @client.on_error - def handle_error(error): - print(f"Error: {error}") +@client.on_error +def handle_error(error): + print(f"\nERROR: {error}") - # === Main Loop === - print("Speech-to-Speech Voice Agent") - print("Registered tools:", [t.name for t in client.tools]) - print("Press Ctrl+C to stop\n") +# === Main === - # Connect to the API - client.connect( - instructions="You are a helpful voice assistant. 
Be concise and friendly.", - voice=aai.speech_to_speech.Voice.SAGE, - output_modalities=["audio", "text"], - enable_transcription=True, - vad_threshold=0.5, - vad_silence_duration_ms=500, - ) +print("Speech-to-Speech Voice Agent") +print("Registered tools:", [t.name for t in client.tools]) +print("Press Ctrl+C to stop\n") - # Stream from microphone - mic = MicrophoneStream() - try: - client.send_audio(mic) - except KeyboardInterrupt: - print("\nStopping...") - finally: - mic.stop() - client.disconnect() - audio_player.stop() +# Connect +client.connect( + instructions="You are a helpful assistant. Be brief.", + output_modalities=["audio", "text"], + vad_threshold=0.3, +) +# Stream from microphone +input_stream = audio.open( + format=pyaudio.paInt16, channels=1, rate=24000, input=True, frames_per_buffer=4096 +) -if __name__ == "__main__": - main() +try: + while True: + audio_data = input_stream.read(4096, exception_on_overflow=False) + client.send_audio(audio_data) +except KeyboardInterrupt: + pass +finally: + client.disconnect() + input_stream.close() + output_stream.close() + audio.terminate() ```
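+
+The examples above read the microphone in 4,096-frame buffers for simplicity. If you want to follow the 50 ms chunking guidance from the Audio format section, one option is to size the PyAudio buffer to 50 ms of audio instead — a small sketch, assuming the same 24 kHz mono capture settings used in the example above:
+
+```python
+import pyaudio
+
+# Assumes the 24 kHz mono capture settings used in the example above
+RATE = 24000
+FRAMES_PER_CHUNK = RATE * 50 // 1000   # 50 ms of audio = 1,200 frames (2,400 bytes of PCM16)
+
+audio = pyaudio.PyAudio()
+input_stream = audio.open(
+    format=pyaudio.paInt16,
+    channels=1,
+    rate=RATE,
+    input=True,
+    frames_per_buffer=FRAMES_PER_CHUNK,
+)
+
+# Each read now returns one 50 ms chunk, ready to pass to client.send_audio()
+chunk = input_stream.read(FRAMES_PER_CHUNK, exception_on_overflow=False)
+```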