From 8fa9aa92b1dffcd8b77985456dd6d86f3eebf6a7 Mon Sep 17 00:00:00 2001 From: Benedikt Koehler Date: Mon, 13 Apr 2026 17:45:32 +0200 Subject: [PATCH 1/2] feat: add Twilio voice channel --- config.example.json | 20 + src/channels/channel-registry.ts | 4 + src/channels/channel.ts | 12 + src/channels/voice/channel-id.ts | 25 + src/channels/voice/conversation-relay.ts | 251 +++++++++ src/channels/voice/runtime.ts | 624 +++++++++++++++++++++++ src/channels/voice/security.ts | 84 +++ src/channels/voice/session.ts | 284 +++++++++++ src/channels/voice/twilio-manager.ts | 72 +++ src/channels/voice/webhook.ts | 106 ++++ src/config/config.ts | 38 ++ src/config/runtime-config.ts | 192 +++++++ src/gateway/gateway-http-server.ts | 21 + src/gateway/gateway-service.ts | 51 +- src/gateway/gateway-types.ts | 9 + src/gateway/gateway.ts | 340 ++++++++++-- src/security/runtime-secrets.ts | 1 + src/session/session-context.ts | 1 + src/session/session-reset.ts | 2 + tests/channel-registry.test.ts | 4 + tests/gateway-http-server.test.ts | 42 ++ tests/gateway-main.test.ts | 67 +++ tests/gateway-status.test.ts | 45 +- tests/runtime-config.secret-refs.test.ts | 9 + tests/session-reset.test.ts | 3 + tests/voice.conversation-relay.test.ts | 119 +++++ tests/voice.runtime.test.ts | 126 +++++ tests/voice.security.test.ts | 46 ++ tests/voice.webhook.test.ts | 44 ++ 29 files changed, 2589 insertions(+), 53 deletions(-) create mode 100644 src/channels/voice/channel-id.ts create mode 100644 src/channels/voice/conversation-relay.ts create mode 100644 src/channels/voice/runtime.ts create mode 100644 src/channels/voice/security.ts create mode 100644 src/channels/voice/session.ts create mode 100644 src/channels/voice/twilio-manager.ts create mode 100644 src/channels/voice/webhook.ts create mode 100644 tests/voice.conversation-relay.test.ts create mode 100644 tests/voice.runtime.test.ts create mode 100644 tests/voice.security.test.ts create mode 100644 tests/voice.webhook.test.ts diff --git 
a/config.example.json b/config.example.json index f2add227..ccc5964d 100644 --- a/config.example.json +++ b/config.example.json @@ -14,6 +14,7 @@ "msteams": [], "slack": [], "telegram": [], + "voice": [], "whatsapp": [], "email": [] } @@ -157,6 +158,25 @@ "ackReaction": "👀", "mediaMaxMb": 20 }, + "voice": { + "enabled": false, + "provider": "twilio", + "twilio": { + "accountSid": "", + "authToken": "", + "fromNumber": "" + }, + "relay": { + "ttsProvider": "default", + "voice": "", + "transcriptionProvider": "default", + "language": "en-US", + "interruptible": true, + "welcomeGreeting": "Hello! How can I help you today?" + }, + "webhookPath": "/voice", + "maxConcurrentCalls": 8 + }, "imessage": { "enabled": false, "backend": "local", diff --git a/src/channels/channel-registry.ts b/src/channels/channel-registry.ts index 39a999fd..3edd96f3 100644 --- a/src/channels/channel-registry.ts +++ b/src/channels/channel-registry.ts @@ -11,12 +11,14 @@ import { SYSTEM_CAPABILITIES, TELEGRAM_CAPABILITIES, TUI_CAPABILITIES, + VOICE_CAPABILITIES, WHATSAPP_CAPABILITIES, } from './channel.js'; import { isEmailAddress } from './email/allowlist.js'; import { isIMessageHandle } from './imessage/handle.js'; import { isSlackChannelTarget } from './slack/target.js'; import { isTelegramChannelId } from './telegram/target.js'; +import { isVoiceChannelId } from './voice/channel-id.js'; import { isWhatsAppJid } from './whatsapp/phone.js'; const DISCORD_SNOWFLAKE_RE = /^\d{16,22}$/; @@ -31,6 +33,7 @@ const CHANNEL_CAPABILITIES: Record = { slack: SLACK_CAPABILITIES, telegram: TELEGRAM_CAPABILITIES, tui: TUI_CAPABILITIES, + voice: VOICE_CAPABILITIES, whatsapp: WHATSAPP_CAPABILITIES, }; @@ -107,6 +110,7 @@ function inferChannelKind(channelId?: string | null): ChannelKind | undefined { return 'msteams'; } if (isWhatsAppJid(normalized)) return 'whatsapp'; + if (isVoiceChannelId(normalized)) return 'voice'; if (isIMessageHandle(normalized)) return 'imessage'; if (isSlackChannelTarget(normalized)) 
/** Prefix that marks a channel id as belonging to a voice call. */
const VOICE_CHANNEL_PREFIX = 'voice:';

/**
 * Builds the channel id for a voice call.
 *
 * @param callSid - Call SID; surrounding whitespace is ignored.
 * @returns The SID prefixed with `voice:`.
 * @throws Error when the SID is empty after trimming.
 */
export function buildVoiceChannelId(callSid: string): string {
  const sid = String(callSid || '').trim();
  if (sid.length === 0) {
    throw new Error('Voice call SID is required.');
  }
  return VOICE_CHANNEL_PREFIX + sid;
}

/**
 * Reports whether a value (after trimming) starts with the voice prefix.
 * Only the prefix is checked, so a bare `voice:` also counts.
 */
export function isVoiceChannelId(value?: string | null): boolean {
  return String(value || '')
    .trim()
    .startsWith(VOICE_CHANNEL_PREFIX);
}

/**
 * Extracts the call SID from a voice channel id.
 *
 * @returns The trimmed SID, or `null` when the value is not a voice channel
 * id or carries nothing after the prefix.
 */
export function parseVoiceChannelId(value?: string | null): string | null {
  const trimmed = String(value || '').trim();
  if (!trimmed.startsWith(VOICE_CHANNEL_PREFIX)) {
    return null;
  }
  const sid = trimmed.slice(VOICE_CHANNEL_PREFIX.length).trim();
  return sid.length > 0 ? sid : null;
}
// NOTE(review): generic type arguments were lost when this patch was
// extracted; the ones below are reconstructed from usage — confirm against
// the original commit.

/** Narrows an unknown value to a plain (non-null, non-array) object. */
function isRecord(value: unknown): value is Record<string, unknown> {
  return value !== null && typeof value === 'object' && !Array.isArray(value);
}

/** Returns the value when it is a string, otherwise the empty string. */
function normalizeString(value: unknown): string {
  return typeof value === 'string' ? value : '';
}

/** Returns the value when it is a boolean, otherwise `fallback`. */
function normalizeBoolean(value: unknown, fallback: boolean): boolean {
  return typeof value === 'boolean' ? value : fallback;
}

/** Returns the value when it is a finite number, otherwise `undefined`. */
function normalizeNumber(value: unknown): number | undefined {
  return typeof value === 'number' && Number.isFinite(value)
    ? value
    : undefined;
}

/** Decodes any websocket payload shape into a UTF-8 string. */
function rawDataToString(raw: WebSocket.Data): string {
  if (typeof raw === 'string') {
    return raw;
  }
  if (Buffer.isBuffer(raw)) {
    return raw.toString('utf8');
  }
  if (Array.isArray(raw)) {
    // Fragmented frames arrive as a list of buffers.
    return Buffer.concat(raw).toString('utf8');
  }
  return Buffer.from(raw).toString('utf8');
}

/** First message of a relay session; describes the call. */
export interface ConversationRelaySetupMessage {
  type: 'setup';
  sessionId: string;
  accountSid: string;
  parentCallSid?: string;
  callSid: string;
  from: string;
  to: string;
  forwardedFrom?: string;
  callType?: string;
  callerName?: string;
  direction?: string;
  callStatus?: string;
  customParameters?: Record<string, string>;
}

/** Transcribed caller speech; `last` marks the final fragment of a prompt. */
export interface ConversationRelayPromptMessage {
  type: 'prompt';
  voicePrompt: string;
  lang?: string;
  last: boolean;
}

/** A keypad digit pressed by the caller. */
export interface ConversationRelayDtmfMessage {
  type: 'dtmf';
  digit: string;
}

/** The caller interrupted playback. */
export interface ConversationRelayInterruptMessage {
  type: 'interrupt';
  utteranceUntilInterrupt?: string;
  durationUntilInterruptMs?: number;
}

/** An error reported by the relay. */
export interface ConversationRelayErrorMessage {
  type: 'error';
  description: string;
}

export type ConversationRelayInboundMessage =
  | ConversationRelaySetupMessage
  | ConversationRelayPromptMessage
  | ConversationRelayDtmfMessage
  | ConversationRelayInterruptMessage
  | ConversationRelayErrorMessage;

/**
 * Merges a new prompt fragment into the text buffered so far.
 *
 * A fragment that already starts with the buffer replaces it, a fragment the
 * buffer already ends with is ignored, and otherwise the fragment is appended
 * — with a separating space only when neither side has whitespace at the seam
 * and the fragment starts with an ASCII letter or digit.
 */
export function mergePromptFragment(
  existing: string,
  fragment: string,
): string {
  const left = String(existing || '');
  const right = String(fragment || '');
  if (!left) return right;
  if (!right) return left;
  if (right.startsWith(left)) return right;
  if (left.endsWith(right)) return left;
  const needsSpace =
    !/\s$/.test(left) && !/^\s/.test(right) && /^[A-Za-z0-9]/.test(right);
  return needsSpace ? `${left} ${right}` : `${left}${right}`;
}

/**
 * Parses one inbound websocket frame into a typed relay message.
 *
 * Fields of the wrong type are normalized (strings to `''` or `undefined`,
 * `last` to `true`) rather than rejected.
 *
 * @throws Error when the frame is empty, is not valid JSON, is not a JSON
 * object, or has an unsupported `type`.
 */
export function parseConversationRelayMessage(
  raw: WebSocket.Data,
): ConversationRelayInboundMessage {
  const decoded = rawDataToString(raw).trim();
  if (!decoded) {
    throw new Error('ConversationRelay message was empty.');
  }
  let parsed: unknown;
  try {
    parsed = JSON.parse(decoded) as unknown;
  } catch {
    throw new Error('ConversationRelay message was not valid JSON.');
  }
  if (!isRecord(parsed)) {
    throw new Error('ConversationRelay message must be a JSON object.');
  }
  const type = normalizeString(parsed.type);
  if (type === 'setup') {
    return {
      type,
      sessionId: normalizeString(parsed.sessionId),
      accountSid: normalizeString(parsed.accountSid),
      parentCallSid: normalizeString(parsed.parentCallSid) || undefined,
      callSid: normalizeString(parsed.callSid),
      from: normalizeString(parsed.from),
      to: normalizeString(parsed.to),
      forwardedFrom: normalizeString(parsed.forwardedFrom) || undefined,
      callType: normalizeString(parsed.callType) || undefined,
      callerName: normalizeString(parsed.callerName) || undefined,
      direction: normalizeString(parsed.direction) || undefined,
      callStatus: normalizeString(parsed.callStatus) || undefined,
      customParameters: isRecord(parsed.customParameters)
        ? Object.fromEntries(
            Object.entries(parsed.customParameters).map(([name, value]) => [
              name,
              normalizeString(value),
            ]),
          )
        : undefined,
    };
  }
  if (type === 'prompt') {
    return {
      type,
      voicePrompt: normalizeString(parsed.voicePrompt),
      lang: normalizeString(parsed.lang) || undefined,
      last: normalizeBoolean(parsed.last, true),
    };
  }
  if (type === 'dtmf') {
    return {
      type,
      digit: normalizeString(parsed.digit),
    };
  }
  if (type === 'interrupt') {
    return {
      type,
      utteranceUntilInterrupt:
        normalizeString(parsed.utteranceUntilInterrupt) || undefined,
      durationUntilInterruptMs: normalizeNumber(
        parsed.durationUntilInterruptMs,
      ),
    };
  }
  if (type === 'error') {
    return {
      type,
      description: normalizeString(parsed.description),
    };
  }
  throw new Error(
    `Unsupported ConversationRelay message type: ${type || 'unknown'}`,
  );
}

type SendFn = (payload: Record<string, unknown>) => Promise<void>;

/**
 * Streams reply text for one turn to the relay.
 *
 * Tokens are sent one step behind: each `push` flushes the previously buffered
 * token with `last: false` and buffers the new one, so `finish` can mark the
 * final token with `last: true`.
 */
export class ConversationRelayResponseStream {
  private closed = false;
  private pendingToken: string | null = null;
  // Language requested for the buffered token. Stored with the token so a
  // later push cannot relabel it with its own language.
  private pendingLanguage: string | undefined = undefined;
  private emittedText = false;

  constructor(
    private readonly send: SendFn,
    private readonly options: {
      interruptible: boolean;
      language: string;
      onFirstToken?: () => void;
      onFinished?: () => void;
    },
  ) {}

  /** True once `finish` or `endSession` has closed the stream. */
  get finished(): boolean {
    return this.closed;
  }

  /** True when text has been sent or is buffered waiting to be sent. */
  get hasEmittedText(): boolean {
    return this.emittedText || Boolean(this.pendingToken);
  }

  /**
   * Buffers a token, first flushing the previously buffered one (if any) as a
   * non-final token in the language it was pushed with. Empty tokens and
   * pushes after close are ignored.
   */
  async push(token: string, opts?: { language?: string }): Promise<void> {
    if (this.closed) return;
    const normalized = String(token || '');
    if (!normalized) return;
    if (this.pendingToken !== null) {
      await this.sendText(this.pendingToken, false, this.pendingLanguage);
    }
    this.pendingToken = normalized;
    this.pendingLanguage = opts?.language;
  }

  /** Sends `text` as a complete reply: one push followed by `finish`. */
  async reply(text: string, opts?: { language?: string }): Promise<void> {
    if (this.closed) return;
    await this.push(text, opts);
    await this.finish(opts);
  }

  /**
   * Flushes the buffered token as the final one and closes the stream. The
   * token's own language wins; `opts.language` applies only when the token
   * was pushed without one.
   */
  async finish(opts?: { language?: string }): Promise<void> {
    if (this.closed) return;
    const finalToken = this.pendingToken;
    const finalLanguage = this.pendingLanguage || opts?.language;
    this.pendingToken = null;
    this.pendingLanguage = undefined;
    if (finalToken) {
      await this.sendText(finalToken, true, finalLanguage);
    }
    this.closed = true;
    this.options.onFinished?.();
  }

  /**
   * Closes the stream and sends an `end` message, optionally carrying
   * `handoffData`. Any buffered token is not sent.
   */
  async endSession(handoffData?: string): Promise<void> {
    if (this.closed) return;
    this.closed = true;
    await this.send({
      type: 'end',
      ...(handoffData ? { handoffData } : {}),
    });
    this.options.onFinished?.();
  }

  /** Sends one `text` message, firing `onFirstToken` before the first one. */
  private async sendText(
    token: string,
    last: boolean,
    language?: string,
  ): Promise<void> {
    if (!this.emittedText) {
      this.emittedText = true;
      this.options.onFirstToken?.();
    }
    await this.send({
      type: 'text',
      token,
      last,
      lang: language || this.options.language,
      interruptible: this.options.interruptible,
      preemptible: false,
    });
  }
}
// NOTE(review): generic type arguments were lost when this patch was
// extracted; the ones below are reconstructed from usage — confirm against
// the original commit.

/** Sends `content` to the caller as one complete reply. */
export type VoiceReplyFn = (content: string) => Promise<void>;

/** Per-turn context handed to the message handler alongside the prompt. */
export interface VoiceMessageContext {
  abortSignal: AbortSignal;
  callSid: string;
  twilioSessionId: string;
  remoteIp: string;
  setupMessage: ConversationRelaySetupMessage | null;
  responseStream: ConversationRelayResponseStream;
}

/** Gateway callback invoked once per caller prompt or keypad digit. */
export type VoiceMessageHandler = (
  sessionId: string,
  guildId: string | null,
  channelId: string,
  userId: string,
  username: string,
  content: string,
  media: MediaContextItem[],
  reply: VoiceReplyFn,
  context: VoiceMessageContext,
) => Promise<void>;

const MAX_PENDING_UPGRADES = 32;
const MAX_CONNECTIONS_PER_IP = 16;
const REPLAY_TTL_MS = 30_000;
const SHUTDOWN_DRAIN_TIMEOUT_MS = 10_000;
const SHUTDOWN_POLL_MS = 100;
const MAX_RECONNECT_ATTEMPTS = 1;
const DUPLICATE_TWILIO_REQUEST_HEADER = 'i-twilio-idempotency-token';

const replayProtector = new ReplayProtector(REPLAY_TTL_MS);
let runtimeInitialized = false;
let draining = false;
let voiceMessageHandler: VoiceMessageHandler | null = null;
const sessionStore = new VoiceCallSessionStore(
  getConfigSnapshot().voice.maxConcurrentCalls,
  MAX_PENDING_UPGRADES,
  MAX_CONNECTIONS_PER_IP,
);
// Minimal structural view of the `ws` server, limited to the members used here.
type WebSocketServerLike = {
  on: (
    event: 'connection',
    listener: (socket: WebSocket, req: IncomingMessage) => void,
  ) => void;
  emit: (
    event: 'connection',
    socket: WebSocket,
    req: IncomingMessage,
  ) => boolean;
  handleUpgrade: (
    req: IncomingMessage,
    socket: Duplex,
    head: Buffer,
    cb: (socket: WebSocket) => void,
  ) => void;
  removeAllListeners: () => void;
};
const WebSocketServerCtor = (
  wsModule as unknown as {
    WebSocketServer: new (options: { noServer: true }) => WebSocketServerLike;
  }
).WebSocketServer;
let websocketServer = new WebSocketServerCtor({ noServer: true });

/**
 * Writes an XML response. When headers were already sent, only ends the
 * response (if still open) and discards `body`.
 */
function sendXml(res: ServerResponse, statusCode: number, body: string): void {
  if (res.headersSent) {
    if (!res.writableEnded) {
      res.end();
    }
    return;
  }
  res.statusCode = statusCode;
  res.setHeader('content-type', 'text/xml; charset=utf-8');
  res.end(body);
}

/** Rejects a websocket upgrade with a raw HTTP error and destroys the socket. */
function writeUpgradeError(
  socket: Duplex,
  statusCode: number,
  message: string,
): void {
  const reason = String(message || 'Error');
  socket.write(
    `HTTP/1.1 ${statusCode} ${reason}\r\n` +
      'Connection: close\r\n' +
      'Content-Type: text/plain; charset=utf-8\r\n' +
      `Content-Length: ${Buffer.byteLength(reason)}\r\n\r\n` +
      reason,
  );
  socket.destroy();
}

/**
 * Resolves the client address: first `x-forwarded-for` entry, else the socket
 * address, else `'unknown'`.
 *
 * NOTE(review): the forwarded header is trusted unconditionally, so the
 * per-IP connection limit only holds if a trusted proxy sets it — confirm.
 */
function resolveRemoteIp(req: IncomingMessage): string {
  const forwardedFor = req.headers['x-forwarded-for'];
  const forwarded = Array.isArray(forwardedFor)
    ? forwardedFor[0]
    : forwardedFor;
  const candidate = String(forwarded || '')
    .split(',')[0]
    .trim();
  return candidate || String(req.socket.remoteAddress || 'unknown').trim();
}

/** Returns the configured Twilio auth token, trimmed (may be empty). */
function resolveTwilioAuthToken(): string {
  return String(TWILIO_AUTH_TOKEN || '').trim();
}

/**
 * Records the request's idempotency token and logs a warning when it was
 * already seen within the replay window. The request is not rejected.
 */
function observeReplay(req: IncomingMessage): void {
  const raw = req.headers[DUPLICATE_TWILIO_REQUEST_HEADER];
  const token = Array.isArray(raw) ? raw[0] : raw;
  if (!replayProtector.observe(token)) {
    logger.warn(
      { replayToken: token },
      'Duplicate Twilio voice request observed',
    );
  }
}

/** Validates `x-twilio-signature` of an HTTP webhook against URL and form values. */
function validateHttpWebhookSignature(
  req: IncomingMessage,
  url: URL,
  values: Record<string, string>,
): boolean {
  const signature = Array.isArray(req.headers['x-twilio-signature'])
    ? req.headers['x-twilio-signature'][0]
    : req.headers['x-twilio-signature'];
  const authToken = resolveTwilioAuthToken();
  const fullUrl = buildPublicHttpUrl(req, `${url.pathname}${url.search}`);
  return validateTwilioSignature({
    authToken,
    signature,
    url: fullUrl,
    values,
  });
}

/** Validates `x-twilio-signature` of a websocket upgrade against the public ws URL. */
function validateUpgradeSignature(req: IncomingMessage, url: URL): boolean {
  const signature = Array.isArray(req.headers['x-twilio-signature'])
    ? req.headers['x-twilio-signature'][0]
    : req.headers['x-twilio-signature'];
  const authToken = resolveTwilioAuthToken();
  const fullUrl = buildPublicWsUrl(req, `${url.pathname}${url.search}`);
  return validateTwilioSignature({
    authToken,
    signature,
    url: fullUrl,
  });
}

/**
 * Best-effort state transition: errors thrown by the store are logged at
 * debug level and swallowed.
 */
function transitionSession(
  callSid: string,
  next: Parameters<typeof sessionStore.transition>[1],
): void {
  try {
    sessionStore.transition(callSid, next);
  } catch (error) {
    logger.debug(
      { error, callSid, next },
      'Voice session state transition skipped',
    );
  }
}

/** Sends a JSON payload over the websocket; rejects if the send reports an error. */
async function sendWsPayload(
  ws: WebSocket,
  payload: Record<string, unknown>,
): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    ws.send(JSON.stringify(payload), (error) => {
      if (error) {
        reject(error);
        return;
      }
      resolve();
    });
  });
}

/** Responds with hang-up TwiML telling the caller the service is at capacity. */
async function sendBusyTwiml(res: ServerResponse): Promise<void> {
  sendXml(
    res,
    200,
    buildHangupTwiml(
      'HybridClaw voice is at capacity right now. Please try again shortly.',
    ),
  );
}

/**
 * True when an action callback reports a failed relay session on a call that
 * is still in progress and the error message mentions the websocket.
 */
function isReconnectableFailure(params: Record<string, string>): boolean {
  const sessionStatus = String(params.SessionStatus || '')
    .trim()
    .toLowerCase();
  const callStatus = String(params.CallStatus || '')
    .trim()
    .toLowerCase();
  const errorMessage = String(params.ErrorMessage || '')
    .trim()
    .toLowerCase();
  return (
    sessionStatus === 'failed' &&
    callStatus === 'in-progress' &&
    errorMessage.includes('websocket')
  );
}

/** Builds the ConversationRelay TwiML for this request from the current config. */
function buildRelayTwimlForRequest(
  req: IncomingMessage,
  callSid: string,
): string {
  const voiceConfig = getConfigSnapshot().voice;
  const paths = resolveVoiceWebhookPaths(voiceConfig.webhookPath);
  return buildConversationRelayTwiml({
    websocketUrl: buildPublicWsUrl(req, paths.relayPath),
    actionUrl: buildPublicHttpUrl(req, paths.actionPath),
    relay: voiceConfig.relay,
    customParameters: {
      callReference: callSid,
    },
  });
}

/**
 * Runs one turn: aborts any in-flight turn, invokes the message handler with
 * a fresh abort controller and response stream, and guarantees the caller
 * hears something unless the turn was aborted.
 */
async function dispatchPromptToHandler(
  session: VoiceCallSession,
  content: string,
  language: string,
): Promise<void> {
  const handler = voiceMessageHandler;
  if (!handler || !session.ws) {
    return;
  }

  session.controller?.abort();
  const controller = new AbortController();
  sessionStore.setController(session.callSid, controller);
  transitionSession(session.callSid, 'thinking');

  const responseStream = new ConversationRelayResponseStream(
    async (payload) => {
      if (!session.ws || session.ws.readyState !== WebSocket.OPEN) {
        throw new Error('Voice websocket is not connected.');
      }
      await sendWsPayload(session.ws, payload);
    },
    {
      interruptible: getConfigSnapshot().voice.relay.interruptible,
      language,
      onFirstToken: () => {
        transitionSession(session.callSid, 'speaking');
      },
      onFinished: () => {
        if (!controller.signal.aborted) {
          transitionSession(session.callSid, 'listening');
        }
      },
    },
  );

  const reply: VoiceReplyFn = async (text) => {
    await responseStream.reply(text, { language });
  };

  try {
    await handler(
      session.gatewaySessionId,
      null,
      session.channelId,
      session.userId,
      session.username,
      content,
      [],
      reply,
      {
        abortSignal: controller.signal,
        callSid: session.callSid,
        twilioSessionId: session.twilioSessionId || '',
        remoteIp: session.remoteIp,
        setupMessage: session.setupMessage,
        responseStream,
      },
    );
    if (!controller.signal.aborted && !responseStream.finished) {
      if (responseStream.hasEmittedText) {
        await responseStream.finish({ language });
      } else {
        // The handler returned without saying anything; speak a fallback.
        await responseStream.reply('I do not have a spoken response yet.', {
          language,
        });
      }
    }
  } catch (error) {
    if (controller.signal.aborted) {
      return;
    }
    logger.warn(
      { error, callSid: session.callSid, channelId: session.channelId },
      'Voice prompt handling failed',
    );
    if (!responseStream.finished) {
      await responseStream.reply(
        'Sorry, something went wrong while I was answering that.',
        { language },
      );
    }
  } finally {
    // Clear only our own controller. A newer turn that aborted this one has
    // already installed its controller, and clearing that would make a later
    // interrupt a no-op.
    // NOTE(review): assumes setController stores on session.controller — confirm.
    if (session.controller === controller) {
      sessionStore.setController(session.callSid, null);
    }
  }
}

/**
 * Handles one non-setup relay message: buffers or dispatches prompts, turns
 * keypad digits into prompts, aborts on interrupt, marks failure on error.
 */
async function handleRelayMessage(
  session: VoiceCallSession,
  message: ConversationRelayInboundMessage,
): Promise<void> {
  if (message.type === 'prompt') {
    const merged = mergePromptFragment(
      session.promptBuffer,
      message.voicePrompt,
    );
    sessionStore.bufferPrompt(session.callSid, merged);
    if (!message.last) {
      transitionSession(session.callSid, 'listening');
      return;
    }
    sessionStore.clearPrompt(session.callSid);
    await dispatchPromptToHandler(
      session,
      merged,
      message.lang || getConfigSnapshot().voice.relay.language,
    );
    return;
  }
  if (message.type === 'dtmf') {
    await dispatchPromptToHandler(
      session,
      `The caller pressed the keypad digit "${message.digit}".`,
      getConfigSnapshot().voice.relay.language,
    );
    return;
  }
  if (message.type === 'interrupt') {
    session.controller?.abort();
    transitionSession(session.callSid, 'interrupted');
    return;
  }
  if (message.type === 'error') {
    logger.warn(
      { description: message.description, callSid: session.callSid },
      'ConversationRelay reported an error',
    );
    transitionSession(session.callSid, 'failed');
  }
}

/**
 * Wires message, close and error listeners for one relay websocket. The first
 * accepted message must be `setup`, which binds the socket to a call session.
 */
function handleWebSocketConnection(ws: WebSocket, remoteIp: string): void {
  let callSid: string | null = null;
  let setupReceived = false;

  ws.on('message', (raw) => {
    // Each message runs in its own async task, so an interrupt can be
    // processed while an earlier prompt is still awaiting its handler.
    void (async () => {
      try {
        const message = parseConversationRelayMessage(raw);
        if (message.type === 'setup') {
          const session = sessionStore.attachSetup({
            setup: message,
            remoteIp,
            ws,
          });
          if (!session) {
            await sendWsPayload(ws, {
              type: 'end',
              handoffData: JSON.stringify({ reason: 'capacity-exceeded' }),
            });
            ws.close();
            return;
          }
          callSid = message.callSid;
          setupReceived = true;
          transitionSession(callSid, 'relay-connecting');
          transitionSession(callSid, 'setup-received');
          transitionSession(callSid, 'listening');
          return;
        }
        if (!setupReceived || !callSid) {
          throw new Error('ConversationRelay prompt arrived before setup.');
        }
        const session = sessionStore.get(callSid);
        if (!session) {
          throw new Error(`Unknown voice session for call ${callSid}`);
        }
        await handleRelayMessage(session, message);
      } catch (error) {
        logger.warn({ error, callSid, remoteIp }, 'Voice relay message failed');
        if (callSid) {
          transitionSession(callSid, 'failed');
        }
        if (ws.readyState === WebSocket.OPEN) {
          ws.close(1008, 'Invalid voice relay message');
        }
      }
    })();
  });

  ws.on('close', () => {
    if (!callSid) {
      return;
    }
    const session = sessionStore.get(callSid);
    // Ignore a late close from a superseded socket: once a reconnect has
    // attached a newer websocket, detaching here would tear down the live one.
    if (!session || session.ws !== ws) {
      return;
    }
    session.controller?.abort();
    session.ws = null;
    sessionStore.setController(callSid, null);
    if (!draining && session.state !== 'ended' && session.state !== 'failed') {
      transitionSession(callSid, 'reconnecting');
    }
  });

  ws.on('error', (error) => {
    logger.debug({ error, callSid, remoteIp }, 'Voice relay websocket error');
  });
}

/**
 * Installs the message handler and, on first call since the last shutdown,
 * registers the voice channel and a fresh websocket server. Later calls only
 * replace the handler and refresh the concurrency limit.
 */
export async function initVoice(
  messageHandler: VoiceMessageHandler,
): Promise<void> {
  voiceMessageHandler = messageHandler;
  draining = false;
  sessionStore.updateLimits(getConfigSnapshot().voice.maxConcurrentCalls);
  if (runtimeInitialized) {
    return;
  }
  runtimeInitialized = true;
  websocketServer.removeAllListeners();
  websocketServer = new WebSocketServerCtor({ noServer: true });
  registerChannel({
    kind: 'voice',
    id: 'voice',
    capabilities: VOICE_CAPABILITIES,
  });
  websocketServer.on('connection', (ws: WebSocket, req: IncomingMessage) => {
    handleWebSocketConnection(ws, resolveRemoteIp(req));
  });
}

/**
 * Handles POSTs to the voice webhook and action paths.
 *
 * @returns `true` when a response was written, `false` when the request is
 * not a POST to one of the voice paths.
 */
export async function handleVoiceWebhook(
  req: IncomingMessage,
  res: ServerResponse,
  url: URL,
): Promise<boolean> {
  const paths = resolveVoiceWebhookPaths(getConfigSnapshot().voice.webhookPath);
  if (req.method !== 'POST') {
    return false;
  }

  if (url.pathname === paths.webhookPath) {
    const body = await readTwilioFormBody(req);
    if (!validateHttpWebhookSignature(req, url, body)) {
      sendXml(res, 403, buildEmptyTwiml());
      return true;
    }
    observeReplay(req);
    if (draining) {
      await sendBusyTwiml(res);
      return true;
    }

    const callSid = String(body.CallSid || '').trim();
    if (!callSid) {
      sendXml(res, 400, buildEmptyTwiml());
      return true;
    }
    const session = sessionStore.getOrCreateFromWebhook({
      callSid,
      remoteIp: resolveRemoteIp(req),
      from: String(body.From || '').trim(),
      to: String(body.To || '').trim(),
      callerName: String(body.CallerName || '').trim() || undefined,
    });
    if (!session) {
      // The store returns null when the concurrent-call limit is reached.
      await sendBusyTwiml(res);
      return true;
    }
    transitionSession(callSid, 'twiml-issued');
    sendXml(res, 200, buildRelayTwimlForRequest(req, callSid));
    return true;
  }

  if (url.pathname === paths.actionPath) {
    const body = await readTwilioFormBody(req);
    if (!validateHttpWebhookSignature(req, url, body)) {
      sendXml(res, 403, buildEmptyTwiml());
      return true;
    }
    observeReplay(req);

    const callSid = String(body.CallSid || '').trim();
    const session = callSid ? sessionStore.get(callSid) : undefined;
    if (callSid) {
      sessionStore.markActionCallback(callSid);
    }

    if (
      session &&
      !draining &&
      session.reconnectAttempts < MAX_RECONNECT_ATTEMPTS &&
      isReconnectableFailure(body)
    ) {
      // Re-issue relay TwiML so a new websocket can be opened for the call.
      sessionStore.markReconnectAttempt(callSid);
      transitionSession(callSid, 'relay-connecting');
      sendXml(res, 200, buildRelayTwimlForRequest(req, callSid));
      return true;
    }

    if (callSid) {
      const sessionStatus = String(body.SessionStatus || '')
        .trim()
        .toLowerCase();
      transitionSession(
        callSid,
        sessionStatus === 'ended' ? 'ended' : 'failed',
      );
      sessionStore.remove(callSid);
    }
    sendXml(res, 200, buildEmptyTwiml());
    return true;
  }

  return false;
}

/**
 * Handles a websocket upgrade on the relay path.
 *
 * @returns `true` when the request targeted the relay path (accepted or
 * rejected here), `false` otherwise.
 */
export function handleVoiceUpgrade(
  req: IncomingMessage,
  socket: Duplex,
  head: Buffer,
  url: URL,
): boolean {
  const paths = resolveVoiceWebhookPaths(getConfigSnapshot().voice.webhookPath);
  if (url.pathname !== paths.relayPath) {
    return false;
  }
  if (draining || !runtimeInitialized) {
    writeUpgradeError(socket, 503, 'Voice channel unavailable');
    return true;
  }
  if (!validateUpgradeSignature(req, url)) {
    writeUpgradeError(socket, 403, 'Forbidden');
    return true;
  }
  const remoteIp = resolveRemoteIp(req);
  if (!sessionStore.beginPendingConnection(remoteIp)) {
    writeUpgradeError(socket, 429, 'Too Many Connections');
    return true;
  }

  // Release the pending slot exactly once. The upgrade callback never runs if
  // the handshake is aborted, so the slot is also released when the raw
  // socket closes or handleUpgrade throws; otherwise failed upgrades would
  // accumulate until every new connection is answered with 429.
  let pendingReleased = false;
  const releasePending = (): void => {
    if (pendingReleased) {
      return;
    }
    pendingReleased = true;
    socket.off('close', releasePending);
    sessionStore.endPendingConnection(remoteIp);
  };
  socket.once('close', releasePending);
  try {
    websocketServer.handleUpgrade(req, socket, head, (ws: WebSocket) => {
      releasePending();
      websocketServer.emit('connection', ws, req);
    });
  } catch (error) {
    releasePending();
    throw error;
  }
  return true;
}

/**
 * Stops the voice runtime. With `drain`, first waits up to
 * `SHUTDOWN_DRAIN_TIMEOUT_MS` for active calls to finish. Then aborts every
 * remaining session, sends `end` on open sockets, removes the sessions and
 * resets module state so `initVoice` can run again.
 */
export async function shutdownVoice(opts?: { drain?: boolean }): Promise<void> {
  draining = true;
  if (opts?.drain) {
    const deadline = Date.now() + SHUTDOWN_DRAIN_TIMEOUT_MS;
    while (sessionStore.activeCount() > 0 && Date.now() < deadline) {
      await new Promise<void>((resolve) =>
        setTimeout(resolve, SHUTDOWN_POLL_MS),
      );
    }
  }

  await Promise.all(
    sessionStore.list().map(async (session) => {
      session.controller?.abort();
      if (session.ws && session.ws.readyState === WebSocket.OPEN) {
        try {
          await sendWsPayload(session.ws, {
            type: 'end',
            handoffData: JSON.stringify({ reason: 'gateway-shutdown' }),
          });
        } catch (error) {
          // Best effort: the socket is closed regardless.
          logger.debug(
            { error, callSid: session.callSid },
            'Voice shutdown end failed',
          );
        }
        session.ws.close();
      }
      sessionStore.remove(session.callSid);
    }),
  );
  runtimeInitialized = false;
  voiceMessageHandler = null;
  websocketServer.removeAllListeners();
  websocketServer = new WebSocketServerCtor({ noServer: true });
}
// NOTE(review): the type arguments of this alias were lost in extraction;
// `string | string[]` is inferred from the Array.isArray branch below — confirm.
export type TwilioSignatureParams = Record<string, string | string[]>;

/**
 * Orders strings by UTF-16 code unit, i.e. case-sensitively ("CallSid" before
 * "Called"). Twilio specifies Unix-style case-sensitive ordering of parameter
 * names; `localeCompare` orders these the other way round and would produce a
 * different signature.
 */
function compareCodeUnits(left: string, right: string): number {
  if (left < right) return -1;
  if (left > right) return 1;
  return 0;
}

/** Turns params into `[name, values[]]` pairs sorted by name for signing. */
function normalizeParamValues(
  params: TwilioSignatureParams,
): Array<[name: string, values: string[]]> {
  return Object.entries(params)
    .map(([name, value]): [string, string[]] => [
      name,
      Array.isArray(value)
        ? value.map((entry) => String(entry))
        : [String(value)],
    ])
    .sort(([left], [right]) => compareCodeUnits(left, right));
}

/**
 * Computes the signature for a request: base64 HMAC-SHA1, keyed with the auth
 * token, over the URL followed by each `name + value` in sorted name order.
 */
export function buildTwilioSignature(params: {
  authToken: string;
  url: string;
  values?: TwilioSignatureParams;
}): string {
  const authToken = String(params.authToken || '');
  const url = String(params.url || '');
  let payload = url;
  for (const [name, values] of normalizeParamValues(params.values || {})) {
    for (const value of values) {
      payload += `${name}${value}`;
    }
  }
  return createHmac('sha1', authToken).update(payload, 'utf8').digest('base64');
}

/**
 * Checks a received signature against the expected one in constant time.
 *
 * @returns `false` when the auth token or the signature is missing, or when
 * the signatures differ.
 */
export function validateTwilioSignature(params: {
  authToken: string;
  signature: string | null | undefined;
  url: string;
  values?: TwilioSignatureParams;
}): boolean {
  const authToken = String(params.authToken || '');
  const actual = String(params.signature || '').trim();
  // An unset token must fail closed: an HMAC keyed with the empty string can
  // be computed by anyone, so accepting it would let requests be forged.
  if (!authToken || !actual) {
    return false;
  }
  const expected = buildTwilioSignature({
    authToken,
    url: params.url,
    values: params.values,
  });
  const expectedBuffer = Buffer.from(expected, 'utf8');
  const actualBuffer = Buffer.from(actual, 'utf8');
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  if (expectedBuffer.length !== actualBuffer.length) {
    return false;
  }
  return timingSafeEqual(expectedBuffer, actualBuffer);
}

/** Remembers tokens for `ttlMs` so repeated requests can be detected. */
export class ReplayProtector {
  private readonly entries = new Map<string, number>();

  constructor(private readonly ttlMs: number) {}

  /**
   * Records a token.
   *
   * @returns `false` when the token was already seen within the TTL; `true`
   * for a new token or an empty/missing one (which is never recorded).
   */
  observe(token?: string | null): boolean {
    const normalized = String(token || '').trim();
    if (!normalized) {
      return true;
    }
    const now = Date.now();
    this.prune(now);
    const seenAt = this.entries.get(normalized);
    if (seenAt !== undefined && now - seenAt < this.ttlMs) {
      return false;
    }
    this.entries.set(normalized, now);
    return true;
  }

  /** Drops entries whose TTL has elapsed. */
  private prune(now: number): void {
    for (const [token, seenAt] of this.entries) {
      if (now - seenAt >= this.ttlMs) {
        this.entries.delete(token);
      }
    }
  }
}
gatewaySessionId: string; + remoteIp: string; + from: string; + to: string; + userId: string; + username: string; + callerName: string; + state: VoiceCallState; + promptBuffer: string; + reconnectAttempts: number; + actionCallbacks: number; + ws: WebSocket | null; + controller: AbortController | null; + setupMessage: ConversationRelaySetupMessage | null; + createdAt: number; + updatedAt: number; +} + +function now(): number { + return Date.now(); +} + +function buildGatewaySessionId(callSid: string): string { + return buildSessionKey(DEFAULT_AGENT_ID, 'voice', 'dm', callSid); +} + +function createSession(params: { + callSid: string; + remoteIp: string; + from: string; + to: string; + userId: string; + username: string; + callerName?: string; +}): VoiceCallSession { + const timestamp = now(); + return { + callSid: params.callSid, + twilioSessionId: null, + channelId: buildVoiceChannelId(params.callSid), + gatewaySessionId: buildGatewaySessionId(params.callSid), + remoteIp: params.remoteIp, + from: params.from, + to: params.to, + userId: params.userId, + username: params.username, + callerName: params.callerName || '', + state: 'initiated', + promptBuffer: '', + reconnectAttempts: 0, + actionCallbacks: 0, + ws: null, + controller: null, + setupMessage: null, + createdAt: timestamp, + updatedAt: timestamp, + }; +} + +export class VoiceCallSessionStore { + private readonly sessions = new Map(); + private readonly pendingConnectionsByIp = new Map(); + + constructor( + private maxConcurrentCalls: number, + private readonly maxPendingConnections: number, + private readonly maxConnectionsPerIp: number, + ) {} + + updateLimits(maxConcurrentCalls: number): void { + this.maxConcurrentCalls = Math.max(1, maxConcurrentCalls); + } + + activeCount(): number { + let count = 0; + for (const session of this.sessions.values()) { + if (!TERMINAL_STATES.has(session.state)) { + count += 1; + } + } + return count; + } + + get(callSid: string): VoiceCallSession | undefined { + return 
this.sessions.get(callSid); + } + + list(): VoiceCallSession[] { + return Array.from(this.sessions.values()); + } + + getOrCreateFromWebhook(params: { + callSid: string; + remoteIp: string; + from: string; + to: string; + callerName?: string; + }): VoiceCallSession | null { + const existing = this.sessions.get(params.callSid); + if (existing) { + existing.remoteIp = params.remoteIp; + existing.from = params.from; + existing.to = params.to; + existing.userId = params.from || params.callSid; + existing.username = params.callerName || params.from || params.callSid; + existing.callerName = params.callerName || existing.callerName; + existing.updatedAt = now(); + return existing; + } + if (this.activeCount() >= this.maxConcurrentCalls) { + return null; + } + const session = createSession({ + callSid: params.callSid, + remoteIp: params.remoteIp, + from: params.from, + to: params.to, + userId: params.from || params.callSid, + username: params.callerName || params.from || params.callSid, + callerName: params.callerName, + }); + this.sessions.set(params.callSid, session); + return session; + } + + attachSetup(params: { + setup: ConversationRelaySetupMessage; + remoteIp: string; + ws: WebSocket; + }): VoiceCallSession | null { + const setup = params.setup; + const existing = + this.sessions.get(setup.callSid) || + this.getOrCreateFromWebhook({ + callSid: setup.callSid, + remoteIp: params.remoteIp, + from: setup.from, + to: setup.to, + callerName: setup.callerName, + }); + if (!existing) { + return null; + } + existing.twilioSessionId = setup.sessionId; + existing.remoteIp = params.remoteIp; + existing.from = setup.from; + existing.to = setup.to; + existing.userId = setup.from || setup.callSid; + existing.username = setup.callerName || setup.from || setup.callSid; + existing.callerName = setup.callerName || existing.callerName; + existing.setupMessage = setup; + existing.ws = params.ws; + existing.updatedAt = now(); + return existing; + } + + bufferPrompt(callSid: string, 
content: string): VoiceCallSession | undefined { + const session = this.sessions.get(callSid); + if (!session) return undefined; + session.promptBuffer = content; + session.updatedAt = now(); + return session; + } + + clearPrompt(callSid: string): void { + const session = this.sessions.get(callSid); + if (!session) return; + session.promptBuffer = ''; + session.updatedAt = now(); + } + + setController(callSid: string, controller: AbortController | null): void { + const session = this.sessions.get(callSid); + if (!session) return; + session.controller = controller; + session.updatedAt = now(); + } + + transition(callSid: string, next: VoiceCallState): VoiceCallSession { + const session = this.sessions.get(callSid); + if (!session) { + throw new Error(`Unknown voice call session: ${callSid}`); + } + if (session.state === next) { + return session; + } + const allowed = ALLOWED_TRANSITIONS[session.state]; + if (!allowed.includes(next)) { + throw new Error( + `Invalid voice session state transition: ${session.state} -> ${next}`, + ); + } + session.state = next; + session.updatedAt = now(); + return session; + } + + markReconnectAttempt(callSid: string): VoiceCallSession | undefined { + const session = this.sessions.get(callSid); + if (!session) return undefined; + session.reconnectAttempts += 1; + session.updatedAt = now(); + return session; + } + + markActionCallback(callSid: string): VoiceCallSession | undefined { + const session = this.sessions.get(callSid); + if (!session) return undefined; + session.actionCallbacks += 1; + session.updatedAt = now(); + return session; + } + + remove(callSid: string): void { + this.sessions.delete(callSid); + } + + beginPendingConnection(remoteIp: string): boolean { + const normalizedIp = String(remoteIp || '').trim() || 'unknown'; + let totalPending = 0; + for (const count of this.pendingConnectionsByIp.values()) { + totalPending += count; + } + const ipPending = this.pendingConnectionsByIp.get(normalizedIp) || 0; + if ( + 
/**
 * Canonicalises a URL path: trims surrounding whitespace, guarantees a single
 * leading slash and strips every trailing slash. A blank input, or one that
 * collapses to nothing once the trailing slashes are removed, yields `/voice`.
 */
function normalizePath(pathValue: string): string {
  const fallback = '/voice';

  let candidate = String(pathValue || '').trim();
  if (candidate === '') {
    candidate = fallback;
  }
  if (!candidate.startsWith('/')) {
    candidate = `/${candidate}`;
  }

  const withoutTrailingSlashes = candidate.replace(/\/+$/, '');
  return withoutTrailingSlashes === '' ? fallback : withoutTrailingSlashes;
}
/**
 * Escapes the five characters that have predefined XML entities so the value
 * can be placed safely inside element text or a quoted attribute.
 *
 * The replacement runs in a single pass, so an ampersand produced by one
 * substitution is never escaped a second time.
 *
 * @param value - Text to escape; it is coerced with `String(...)` first.
 * @returns The escaped text.
 */
function escapeXml(value: string): string {
  return String(value).replace(/[&<>"']/g, (char) => {
    switch (char) {
      case '&':
        return '&amp;';
      case '<':
        return '&lt;';
      case '>':
        return '&gt;';
      case '"':
        return '&quot;';
      default:
        // The character class only matches one more character: the apostrophe.
        return '&apos;';
    }
  });
}
/**
 * Reads a request body to the end and decodes it as
 * `application/x-www-form-urlencoded`.
 *
 * The body is buffered before any signature check can run, so the amount of
 * data accepted is capped to keep an unauthenticated caller from exhausting
 * memory.
 *
 * @param req - Incoming HTTP request whose body is consumed.
 * @param maxBytes - Largest body accepted, in bytes (default 1 MiB).
 * @returns One entry per parameter name; when a name repeats, the last value
 *   wins.
 * @throws Error when the body is larger than `maxBytes`.
 */
export async function readTwilioFormBody(
  req: IncomingMessage,
  maxBytes = 1024 * 1024,
): Promise<TwilioFormBody> {
  const chunks: Buffer[] = [];
  let receivedBytes = 0;
  for await (const chunk of req) {
    const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
    receivedBytes += buffer.length;
    // Stop as soon as the limit is crossed instead of buffering the remainder.
    if (receivedBytes > maxBytes) {
      throw new Error(`Twilio webhook body exceeds ${maxBytes} bytes`);
    }
    chunks.push(buffer);
  }
  const raw = Buffer.concat(chunks).toString('utf8');
  const params = new URLSearchParams(raw);
  const body: TwilioFormBody = {};
  for (const [key, value] of params) {
    body[key] = value;
  }
  return body;
}
parameterXml = Object.entries(params.customParameters || {}) + .map( + ([name, value]) => ``, + ) + .join(''); + return ( + '' + + `` + + `${parameterXml}` + + '' + ); +} + +export function buildHangupTwiml(message: string): string { + return ( + '' + + `${escapeXml(message)}` + ); +} + +export function buildEmptyTwiml(): string { + return ''; +} diff --git a/src/config/config.ts b/src/config/config.ts index e93709d6..f1740ab9 100644 --- a/src/config/config.ts +++ b/src/config/config.ts @@ -144,6 +144,11 @@ function syncRuntimeSecretExports(): void { 'IMESSAGE_PASSWORD', storedSecrets, ); + TWILIO_AUTH_TOKEN = readRuntimeSecretValue( + ['TWILIO_AUTH_TOKEN'], + 'TWILIO_AUTH_TOKEN', + storedSecrets, + ); MSTEAMS_APP_PASSWORD = readRuntimeSecretValue( ['MSTEAMS_APP_PASSWORD'], 'MSTEAMS_APP_PASSWORD', @@ -186,6 +191,7 @@ export let DISCORD_TOKEN = ''; export let EMAIL_PASSWORD = ''; export let TELEGRAM_BOT_TOKEN = ''; export let IMESSAGE_PASSWORD = ''; +export let TWILIO_AUTH_TOKEN = ''; export let MSTEAMS_APP_PASSWORD = ''; export let SLACK_BOT_TOKEN = ''; export let SLACK_APP_TOKEN = ''; @@ -300,6 +306,20 @@ export let WHATSAPP_DEBOUNCE_MS = 2_500; export let WHATSAPP_SEND_READ_RECEIPTS = true; export let WHATSAPP_ACK_REACTION = ''; export let WHATSAPP_MEDIA_MAX_MB = 20; +export let VOICE_ENABLED = false; +export let VOICE_PROVIDER: RuntimeConfig['voice']['provider'] = 'twilio'; +export let VOICE_TWILIO_ACCOUNT_SID = ''; +export let VOICE_TWILIO_FROM_NUMBER = ''; +export let VOICE_RELAY_TTS_PROVIDER: RuntimeConfig['voice']['relay']['ttsProvider'] = + 'default'; +export let VOICE_RELAY_VOICE = ''; +export let VOICE_RELAY_TRANSCRIPTION_PROVIDER: RuntimeConfig['voice']['relay']['transcriptionProvider'] = + 'default'; +export let VOICE_RELAY_LANGUAGE = 'en-US'; +export let VOICE_RELAY_INTERRUPTIBLE = true; +export let VOICE_RELAY_WELCOME_GREETING = 'Hello! 
How can I help you today?'; +export let VOICE_WEBHOOK_PATH = '/voice'; +export let VOICE_MAX_CONCURRENT_CALLS = 8; export let IMESSAGE_ENABLED = false; export let IMESSAGE_BACKEND: RuntimeConfig['imessage']['backend'] = 'local'; export let IMESSAGE_CLI_PATH = 'imsg'; @@ -624,6 +644,24 @@ function applyRuntimeConfig(config: RuntimeConfig): void { WHATSAPP_SEND_READ_RECEIPTS = config.whatsapp.sendReadReceipts; WHATSAPP_ACK_REACTION = config.whatsapp.ackReaction; WHATSAPP_MEDIA_MAX_MB = Math.max(1, config.whatsapp.mediaMaxMb); + VOICE_ENABLED = config.voice.enabled; + VOICE_PROVIDER = config.voice.provider; + VOICE_TWILIO_ACCOUNT_SID = config.voice.twilio.accountSid; + VOICE_TWILIO_FROM_NUMBER = config.voice.twilio.fromNumber; + TWILIO_AUTH_TOKEN = + readRuntimeSecretValue( + ['TWILIO_AUTH_TOKEN'], + 'TWILIO_AUTH_TOKEN', + storedSecrets, + ) || config.voice.twilio.authToken; + VOICE_RELAY_TTS_PROVIDER = config.voice.relay.ttsProvider; + VOICE_RELAY_VOICE = config.voice.relay.voice; + VOICE_RELAY_TRANSCRIPTION_PROVIDER = config.voice.relay.transcriptionProvider; + VOICE_RELAY_LANGUAGE = config.voice.relay.language; + VOICE_RELAY_INTERRUPTIBLE = config.voice.relay.interruptible; + VOICE_RELAY_WELCOME_GREETING = config.voice.relay.welcomeGreeting; + VOICE_WEBHOOK_PATH = config.voice.webhookPath; + VOICE_MAX_CONCURRENT_CALLS = Math.max(1, config.voice.maxConcurrentCalls); IMESSAGE_ENABLED = config.imessage.enabled; IMESSAGE_BACKEND = config.imessage.backend; IMESSAGE_CLI_PATH = config.imessage.cliPath; diff --git a/src/config/runtime-config.ts b/src/config/runtime-config.ts index 652122ba..c72ff3c2 100644 --- a/src/config/runtime-config.ts +++ b/src/config/runtime-config.ts @@ -353,6 +353,37 @@ export interface RuntimeWhatsAppConfig { mediaMaxMb: number; } +export type RuntimeVoiceProvider = 'twilio'; +export type RuntimeVoiceRelayTtsProvider = 'amazon' | 'default' | 'google'; +export type RuntimeVoiceRelayTranscriptionProvider = + | 'deepgram' + | 'default' + | 'google'; 
+ +export interface RuntimeVoiceTwilioConfig { + accountSid: string; + authToken: string; + fromNumber: string; +} + +export interface RuntimeVoiceRelayConfig { + ttsProvider: RuntimeVoiceRelayTtsProvider; + voice: string; + transcriptionProvider: RuntimeVoiceRelayTranscriptionProvider; + language: string; + interruptible: boolean; + welcomeGreeting: string; +} + +export interface RuntimeVoiceConfig { + enabled: boolean; + provider: RuntimeVoiceProvider; + twilio: RuntimeVoiceTwilioConfig; + relay: RuntimeVoiceRelayConfig; + webhookPath: string; + maxConcurrentCalls: number; +} + export interface RuntimeSlackConfig { enabled: boolean; groupPolicy: SlackGroupPolicy; @@ -510,6 +541,7 @@ export interface RuntimeConfig { slack: RuntimeSlackConfig; telegram: RuntimeTelegramConfig; whatsapp: RuntimeWhatsAppConfig; + voice: RuntimeVoiceConfig; imessage: RuntimeIMessageConfig; email: RuntimeEmailConfig; hybridai: { @@ -910,6 +942,25 @@ const DEFAULT_RUNTIME_CONFIG: RuntimeConfig = { ackReaction: '👀', mediaMaxMb: 20, }, + voice: { + enabled: false, + provider: 'twilio', + twilio: { + accountSid: '', + authToken: '', + fromNumber: '', + }, + relay: { + ttsProvider: 'default', + voice: '', + transcriptionProvider: 'default', + language: 'en-US', + interruptible: true, + welcomeGreeting: 'Hello! 
/**
 * Resolves a configured text-to-speech provider name.
 *
 * A string is trimmed and lower-cased, then accepted only if it is one of
 * `amazon`, `default` or `google`. Anything else, including non-string input,
 * resolves to `fallback`.
 */
function normalizeVoiceRelayTtsProvider(
  value: unknown,
  fallback: RuntimeVoiceRelayTtsProvider,
): RuntimeVoiceRelayTtsProvider {
  if (typeof value === 'string') {
    const candidate = value.trim().toLowerCase();
    switch (candidate) {
      case 'amazon':
      case 'default':
      case 'google':
        return candidate;
    }
  }
  return fallback;
}
raw.relay : {}; + return { + enabled: normalizeBoolean(raw.enabled, fallback.enabled), + provider: normalizeVoiceProvider(raw.provider, fallback.provider), + twilio: { + accountSid: normalizeString( + rawTwilio.accountSid, + fallback.twilio.accountSid, + { allowEmpty: true }, + ), + authToken: normalizeString( + opts?.authToken ?? rawTwilio.authToken, + fallback.twilio.authToken, + { allowEmpty: true }, + ), + fromNumber: normalizeString( + rawTwilio.fromNumber, + fallback.twilio.fromNumber, + { allowEmpty: true }, + ), + }, + relay: { + ttsProvider: normalizeVoiceRelayTtsProvider( + rawRelay.ttsProvider, + fallback.relay.ttsProvider, + ), + voice: normalizeString(rawRelay.voice, fallback.relay.voice, { + allowEmpty: true, + }), + transcriptionProvider: normalizeVoiceRelayTranscriptionProvider( + rawRelay.transcriptionProvider, + fallback.relay.transcriptionProvider, + ), + language: normalizeString(rawRelay.language, fallback.relay.language, { + allowEmpty: false, + }), + interruptible: normalizeBoolean( + rawRelay.interruptible, + fallback.relay.interruptible, + ), + welcomeGreeting: normalizeString( + rawRelay.welcomeGreeting, + fallback.relay.welcomeGreeting, + { allowEmpty: false }, + ), + }, + webhookPath: normalizeApiPath(raw.webhookPath, fallback.webhookPath), + maxConcurrentCalls: normalizeInteger( + raw.maxConcurrentCalls, + fallback.maxConcurrentCalls, + { + min: 1, + max: 128, + }, + ), + }; +} + function normalizeIMessageConfig( value: unknown, fallback: RuntimeIMessageConfig, @@ -3027,6 +3190,11 @@ function getSecretInputFromSource( ? telegram.botToken : undefined; } + if (secretPath === 'voice.twilio.authToken') { + const voice = isRecord(source.voice) ? source.voice : null; + const twilio = voice && isRecord(voice.twilio) ? voice.twilio : null; + return twilio && hasOwn(twilio, 'authToken') ? twilio.authToken : undefined; + } const local = isRecord(source.local) ? source.local : null; const backends = local && isRecord(local.backends) ? 
local.backends : null; @@ -3067,6 +3235,14 @@ function setSecretInputOnSource( telegram.botToken = value; return; } + if (secretPath === 'voice.twilio.authToken') { + const voice = isRecord(source.voice) ? source.voice : {}; + source.voice = voice; + const twilio = isRecord(voice.twilio) ? voice.twilio : {}; + voice.twilio = twilio; + twilio.authToken = value; + return; + } const local = isRecord(source.local) ? source.local : {}; source.local = local; @@ -3472,6 +3648,7 @@ function normalizeRuntimeConfig( const rawSlack = isRecord(raw.slack) ? raw.slack : {}; const rawTelegram = isRecord(raw.telegram) ? raw.telegram : {}; const rawWhatsApp = isRecord(raw.whatsapp) ? raw.whatsapp : {}; + const rawVoice = isRecord(raw.voice) ? raw.voice : {}; const rawIMessage = isRecord(raw.imessage) ? raw.imessage : {}; const rawEmail = isRecord(raw.email) ? raw.email : {}; const rawHybridAi = isRecord(raw.hybridai) ? raw.hybridai : {}; @@ -3575,6 +3752,10 @@ function normalizeRuntimeConfig( rawEmail.enabled, DEFAULT_RUNTIME_CONFIG.email.enabled, ); + const voiceEnabled = normalizeBoolean( + rawVoice.enabled, + DEFAULT_RUNTIME_CONFIG.voice.enabled, + ); const imessageEnabled = normalizeBoolean( rawIMessage.enabled, DEFAULT_RUNTIME_CONFIG.imessage.enabled, @@ -3615,6 +3796,14 @@ function normalizeRuntimeConfig( required: isSecretRefInput(rawEmail.password) && emailEnabled, }, ); + const rawVoiceTwilio = isRecord(rawVoice.twilio) ? 
rawVoice.twilio : {}; + const resolvedVoiceAuthToken = resolveConfiguredSecretInput( + rawVoiceTwilio.authToken, + { + path: 'voice.twilio.authToken', + required: isSecretRefInput(rawVoiceTwilio.authToken) && voiceEnabled, + }, + ); const resolvedVllmApiKey = resolveConfiguredSecretInput( rawVllmBackend.apiKey, { @@ -3930,6 +4119,9 @@ function normalizeRuntimeConfig( rawWhatsApp, DEFAULT_RUNTIME_CONFIG.whatsapp, ), + voice: normalizeVoiceConfig(rawVoice, DEFAULT_RUNTIME_CONFIG.voice, { + authToken: resolvedVoiceAuthToken, + }), imessage: normalizeIMessageConfig( rawIMessage, DEFAULT_RUNTIME_CONFIG.imessage, diff --git a/src/gateway/gateway-http-server.ts b/src/gateway/gateway-http-server.ts index 944b86a7..8c4a747a 100644 --- a/src/gateway/gateway-http-server.ts +++ b/src/gateway/gateway-http-server.ts @@ -15,6 +15,11 @@ import { normalizeEmailAddress } from '../channels/email/allowlist.js'; import { handleIMessageWebhook } from '../channels/imessage/runtime.js'; import { runMessageToolAction } from '../channels/message/tool-actions.js'; import { handleMSTeamsWebhook } from '../channels/msteams/runtime.js'; +import { + handleVoiceUpgrade, + handleVoiceWebhook, +} from '../channels/voice/runtime.js'; +import { resolveVoiceWebhookPaths } from '../channels/voice/twilio-manager.js'; import { DATA_DIR, GATEWAY_API_TOKEN, @@ -3154,6 +3159,18 @@ export function startGatewayHttpServer(): GatewayHttpServer { return; } + const voicePaths = resolveVoiceWebhookPaths( + getRuntimeConfig().voice.webhookPath, + ); + if ( + method === 'POST' && + (pathname === voicePaths.webhookPath || + pathname === voicePaths.actionPath) + ) { + dispatchWebhookRoute(res, () => handleVoiceWebhook(req, res, url)); + return; + } + if (pathname.startsWith('/api/')) { if (pathname === MSTEAMS_WEBHOOK_PATH && method === 'POST') { dispatchWebhookRoute(res, () => handleMSTeamsWebhook(req, res)); @@ -3477,6 +3494,10 @@ export function startGatewayHttpServer(): GatewayHttpServer { const host = 
String(req.headers.host || 'localhost'); const url = new URL(req.url || '/', `http://${host}`); + if (handleVoiceUpgrade(req, socket, head, url)) { + return; + } + if (url.pathname !== '/api/admin/terminal/stream') { writeUpgradeError(socket, 404, 'Not Found'); return; diff --git a/src/gateway/gateway-service.ts b/src/gateway/gateway-service.ts index 306e115a..50a9f67d 100644 --- a/src/gateway/gateway-service.ts +++ b/src/gateway/gateway-service.ts @@ -681,7 +681,8 @@ export function resolveChannelType( source === 'imessage' || source === 'whatsapp' || source === 'email' || - source === 'msteams' + source === 'msteams' || + source === 'voice' ) { return source; } @@ -690,7 +691,8 @@ export function resolveChannelType( inferredChannelType === 'discord' || inferredChannelType === 'imessage' || inferredChannelType === 'whatsapp' || - inferredChannelType === 'email' + inferredChannelType === 'email' || + inferredChannelType === 'voice' ) { return inferredChannelType; } @@ -2057,6 +2059,33 @@ function resolveGatewayPasswordStatus(params: { }; } +function resolveGatewayVoiceAuthStatus(params: { + envValues: Array; + configValue: string; + storedValue?: string; +}): Pick< + NonNullable, + 'authTokenConfigured' | 'authTokenSource' +> { + const credential = resolveRuntimeCredentialStatus( + 'TWILIO_AUTH_TOKEN', + params.envValues, + params.storedValue, + ); + if (credential.source) { + return { + authTokenConfigured: Boolean(credential.value), + authTokenSource: credential.source, + }; + } + + const configValue = String(params.configValue || '').trim(); + return { + authTokenConfigured: Boolean(configValue), + authTokenSource: configValue ? 
'config' : null, + }; +} + function resolveGatewayTokenStatus(params: { storedSecretName: string; envValues: Array; @@ -3036,6 +3065,11 @@ export async function getGatewayStatus(): Promise { configValue: runtimeConfig.imessage.password, storedValue: storedSecrets.IMESSAGE_PASSWORD, }); + const voiceAuth = resolveGatewayVoiceAuthStatus({ + envValues: [process.env.TWILIO_AUTH_TOKEN], + configValue: runtimeConfig.voice.twilio.authToken, + storedValue: storedSecrets.TWILIO_AUTH_TOKEN, + }); return { status: 'ok', webAuthConfigured: Boolean(WEB_API_TOKEN), @@ -3069,6 +3103,19 @@ export async function getGatewayStatus(): Promise { telegram, email, imessage, + voice: { + enabled: runtimeConfig.voice.enabled, + accountSidConfigured: Boolean( + runtimeConfig.voice.twilio.accountSid.trim(), + ), + fromNumberConfigured: Boolean( + runtimeConfig.voice.twilio.fromNumber.trim(), + ), + authTokenConfigured: voiceAuth.authTokenConfigured, + authTokenSource: voiceAuth.authTokenSource, + webhookPath: runtimeConfig.voice.webhookPath, + maxConcurrentCalls: runtimeConfig.voice.maxConcurrentCalls, + }, whatsapp: { ...whatsappAuth, pairingQrText: whatsappPairing.pairingQrText, diff --git a/src/gateway/gateway-types.ts b/src/gateway/gateway-types.ts index 800b3ef6..dcfe0426 100644 --- a/src/gateway/gateway-types.ts +++ b/src/gateway/gateway-types.ts @@ -406,6 +406,15 @@ export interface GatewayStatus { passwordConfigured: boolean; passwordSource: 'config' | 'env' | 'runtime-secrets' | null; }; + voice?: { + enabled: boolean; + accountSidConfigured: boolean; + fromNumberConfigured: boolean; + authTokenConfigured: boolean; + authTokenSource: 'config' | 'env' | 'runtime-secrets' | null; + webhookPath: string; + maxConcurrentCalls: number; + }; whatsapp?: { linked: boolean; jid: string | null; diff --git a/src/gateway/gateway.ts b/src/gateway/gateway.ts index e53f80e6..0c5656aa 100644 --- a/src/gateway/gateway.ts +++ b/src/gateway/gateway.ts @@ -62,6 +62,7 @@ import { type TelegramReplyFn, } 
from '../channels/telegram/runtime.js'; import { isTelegramChannelId } from '../channels/telegram/target.js'; +import { initVoice, shutdownVoice } from '../channels/voice/runtime.js'; import { getWhatsAppAuthStatus, WhatsAppAuthLockError, @@ -85,6 +86,7 @@ import { PROACTIVE_QUEUE_OUTSIDE_HOURS, SLACK_APP_TOKEN, SLACK_BOT_TOKEN, + TWILIO_AUTH_TOKEN, } from '../config/config.js'; import { logger } from '../logger.js'; import { @@ -138,6 +140,7 @@ import { handleGatewayCommand, resumeEnabledFullAutoSessions, } from './gateway-service.js'; +import type { GatewayChatRequest, GatewayChatResult } from './gateway-types.js'; import { runManagedMediaCleanup } from './managed-media-cleanup.js'; import { getDreamTimezone, @@ -228,6 +231,27 @@ function hasSlackConfigChanged( next.mediaMaxMb !== prev.mediaMaxMb ); } + +function hasVoiceConfigChanged( + next: ReturnType['voice'], + prev: ReturnType['voice'], +): boolean { + return ( + next.enabled !== prev.enabled || + next.provider !== prev.provider || + next.twilio.accountSid !== prev.twilio.accountSid || + next.twilio.authToken !== prev.twilio.authToken || + next.twilio.fromNumber !== prev.twilio.fromNumber || + next.relay.ttsProvider !== prev.relay.ttsProvider || + next.relay.voice !== prev.relay.voice || + next.relay.transcriptionProvider !== prev.relay.transcriptionProvider || + next.relay.language !== prev.relay.language || + next.relay.interruptible !== prev.relay.interruptible || + next.relay.welcomeGreeting !== prev.relay.welcomeGreeting || + next.webhookPath !== prev.webhookPath || + next.maxConcurrentCalls !== prev.maxConcurrentCalls + ); +} const DISCORD_APPROVAL_PRESENTATION = createApprovalPresentation('buttons'); const SLACK_APPROVAL_PRESENTATION = createApprovalPresentation('buttons'); const TEAMS_APPROVAL_PRESENTATION = createApprovalPresentation('text'); @@ -270,6 +294,7 @@ function logGatewayStartup(params: { email: boolean; imessage: boolean; telegram: boolean; + voice: boolean; whatsapp: boolean; }; }): 
void { @@ -520,6 +545,82 @@ async function handleTextChannelCommand(params: { await reply(text); } +async function runTextChannelSlashCommands(params: { + sessionId: string; + guildId: string | null; + channelId: string; + userId: string; + username: string; + content: string; + reply: ReplyFn; +}): Promise { + const slashCommands = resolveTextChannelSlashCommands(params.content); + if (!slashCommands) { + return false; + } + + for (const args of slashCommands) { + await handleTextChannelCommand({ + sessionId: params.sessionId, + guildId: params.guildId, + channelId: params.channelId, + userId: params.userId, + username: params.username, + args, + reply: params.reply, + }); + } + return true; +} + +async function executeTextChannelGatewayTurn(params: { + sessionId: string; + guildId: string | null; + channelId: string; + userId: string; + username: string; + content: string; + media: GatewayChatRequest['media']; + source: string; + reply: ReplyFn; + abortSignal?: GatewayChatRequest['abortSignal']; + onTextDelta?: GatewayChatRequest['onTextDelta']; + onToolProgress?: GatewayChatRequest['onToolProgress']; + onProactiveMessage?: GatewayChatRequest['onProactiveMessage']; + resultTransform?: (result: GatewayChatResult) => GatewayChatResult; +}): Promise { + const handledSlashCommands = await runTextChannelSlashCommands({ + sessionId: params.sessionId, + guildId: params.guildId, + channelId: params.channelId, + userId: params.userId, + username: params.username, + content: params.content, + reply: params.reply, + }); + if (handledSlashCommands) { + return null; + } + + const result = normalizePlaceholderToolReply( + await handleGatewayMessage({ + sessionId: params.sessionId, + guildId: params.guildId, + channelId: params.channelId, + userId: params.userId, + username: params.username, + content: params.content, + media: params.media, + abortSignal: params.abortSignal, + onTextDelta: params.onTextDelta, + onToolProgress: params.onToolProgress, + onProactiveMessage: 
params.onProactiveMessage, + source: params.source, + }), + ); + return params.resultTransform ? params.resultTransform(result) : result; +} + async function deliverProactiveMessage( channelId: string, text: string, @@ -1793,59 +1894,45 @@ async function startSlackIntegration(): Promise { context, ) => { try { - const slashCommands = resolveTextChannelSlashCommands(content); - if (slashCommands) { - const textReply: ReplyFn = async (message) => { - await reply(message); - }; - for (const args of slashCommands) { - await handleTextChannelCommand({ - sessionId, - guildId, + const textReply: ReplyFn = async (message) => { + await reply(message); + }; + let sawTextDelta = false; + const result = await executeTextChannelGatewayTurn({ + sessionId, + guildId, + channelId, + userId, + username, + content, + media, + source: 'slack', + reply: textReply, + onProactiveMessage: async (message) => { + await deliverProactiveMessage( channelId, - userId, - username, - args, - reply: textReply, - }); - } + message.text, + 'delegate', + message.artifacts, + ); + }, + onTextDelta: (delta) => { + if (!delta || sawTextDelta) return; + sawTextDelta = true; + context.emitLifecyclePhase?.('streaming'); + }, + onToolProgress: (event) => { + if (sawTextDelta) return; + if (event.phase === 'start') { + context.emitLifecyclePhase?.('toolUse'); + } else { + context.emitLifecyclePhase?.('thinking'); + } + }, + }); + if (!result) { return; } - - let sawTextDelta = false; - const result = normalizePlaceholderToolReply( - await handleGatewayMessage({ - sessionId, - guildId, - channelId, - userId, - username, - content, - media, - onProactiveMessage: async (message) => { - await deliverProactiveMessage( - channelId, - message.text, - 'delegate', - message.artifacts, - ); - }, - onTextDelta: (delta) => { - if (!delta || sawTextDelta) return; - sawTextDelta = true; - context.emitLifecyclePhase?.('streaming'); - }, - onToolProgress: (event) => { - if (sawTextDelta) return; - if (event.phase === 
'start') { - context.emitLifecyclePhase?.('toolUse'); - } else { - context.emitLifecyclePhase?.('thinking'); - } - }, - source: 'slack', - }), - ); if (result.status === 'error') { await reply( formatError('Agent Error', result.error || 'Unknown error'), @@ -2030,6 +2117,148 @@ async function refreshSlackIntegrationForConfigChange( await startSlackIntegration(); } +async function startVoiceIntegration(): Promise { + const voiceConfig = getConfigSnapshot().voice; + const twilioAuthToken = String(TWILIO_AUTH_TOKEN || '').trim(); + if (!voiceConfig.enabled) { + logger.info('Voice integration disabled in config'); + return false; + } + if ( + !voiceConfig.twilio.accountSid.trim() || + !twilioAuthToken || + !voiceConfig.twilio.fromNumber.trim() + ) { + logger.warn( + { + accountSidConfigured: Boolean(voiceConfig.twilio.accountSid.trim()), + authTokenConfigured: Boolean(twilioAuthToken), + fromNumberConfigured: Boolean(voiceConfig.twilio.fromNumber.trim()), + }, + 'Voice integration disabled: Twilio credentials are incomplete', + ); + return false; + } + + try { + await initVoice( + async ( + sessionId, + guildId, + channelId, + userId, + username, + content, + media, + reply, + context, + ) => { + try { + const textReply: ReplyFn = async (message) => { + await reply(message); + }; + let sawTextDelta = false; + const streamFilter = createSilentReplyStreamFilter(); + const result = await executeTextChannelGatewayTurn({ + sessionId, + guildId, + channelId, + userId, + username, + content, + media, + source: 'voice', + reply: textReply, + abortSignal: context.abortSignal, + onTextDelta: (delta) => { + const filteredDelta = streamFilter.push(delta); + if (!filteredDelta) return; + sawTextDelta = true; + void context.responseStream.push(filteredDelta); + }, + onProactiveMessage: async (message) => { + logger.debug( + { + callSid: context.callSid, + artifactCount: message.artifacts?.length || 0, + }, + 'Skipping proactive voice follow-up', + ); + }, + resultTransform: (result) 
=> normalizePendingApprovalReply(result), + }); + if (!result) { + return; + } + if (result.status === 'error') { + await reply(formatChannelGatewayFailure(result.error)); + return; + } + + const trailingDelta = streamFilter.flush(); + if (trailingDelta) { + sawTextDelta = true; + await context.responseStream.push(trailingDelta); + } + + if (isSilentReply(result.result)) { + return; + } + + const cleanedResultText = stripSilentToken( + String(result.result || ''), + ); + if (!sawTextDelta && cleanedResultText.trim()) { + await reply(cleanedResultText); + } + } catch (error) { + logger.error( + { error, sessionId, channelId, callSid: context.callSid }, + 'Voice message handling failed', + ); + await reply(formatChannelGatewayFailure('Response interrupted.')); + } + }, + ); + logger.info( + { + provider: voiceConfig.provider, + webhookPath: voiceConfig.webhookPath, + maxConcurrentCalls: voiceConfig.maxConcurrentCalls, + }, + 'Voice integration started inside gateway', + ); + return true; + } catch (error) { + logger.warn({ error }, 'Voice integration failed to start'); + return false; + } +} + +async function refreshVoiceIntegrationForConfigChange( + next: ReturnType, + prev: ReturnType, +): Promise { + if (!hasVoiceConfigChanged(next.voice, prev.voice)) return; + + logger.info( + { + enabled: next.voice.enabled, + provider: next.voice.provider, + webhookPath: next.voice.webhookPath, + maxConcurrentCalls: next.voice.maxConcurrentCalls, + }, + 'Config changed, restarting Voice integration', + ); + await shutdownVoice().catch((error) => { + logger.debug( + { error }, + 'Failed to stop Voice runtime during config-change restart', + ); + }); + await startVoiceIntegration(); +} + async function startIMessageIntegration(): Promise { const imessageConfig = getConfigSnapshot().imessage; if (!imessageConfig.enabled) { @@ -2192,6 +2421,9 @@ function setupShutdown(broadcastShutdown: () => void): void { 'Failed to stop WhatsApp runtime during shutdown', ); }); + await 
shutdownVoice({ drain: opts?.drain }).catch((error) => { + logger.debug({ error }, 'Failed to stop Voice runtime during shutdown'); + }); await shutdownIMessage().catch((error) => { logger.debug( { error }, @@ -2419,6 +2651,7 @@ async function main(): Promise { const emailActive = await startEmailIntegration(); const telegramActive = await startTelegramIntegration(); const whatsappActive = await startWhatsAppIntegration(); + const voiceActive = await startVoiceIntegration(); const imessageActive = await startIMessageIntegration(); startOrRestartHeartbeat(); @@ -2451,6 +2684,12 @@ async function main(): Promise { 'Slack integration restart failed after config change', ); }); + void refreshVoiceIntegrationForConfigChange(next, prev).catch((error) => { + logger.warn( + { error }, + 'Voice integration restart failed after config change', + ); + }); const shouldRestart = next.hybridai.defaultChatbotId !== prev.hybridai.defaultChatbotId || @@ -2535,6 +2774,7 @@ async function main(): Promise { email: emailActive, imessage: imessageActive, telegram: telegramActive, + voice: voiceActive, whatsapp: whatsappActive, }, }); diff --git a/src/security/runtime-secrets.ts b/src/security/runtime-secrets.ts index 0d262cc6..8581817b 100644 --- a/src/security/runtime-secrets.ts +++ b/src/security/runtime-secrets.ts @@ -38,6 +38,7 @@ const SECRET_KEYS = [ 'EMAIL_PASSWORD', 'TELEGRAM_BOT_TOKEN', 'IMESSAGE_PASSWORD', + 'TWILIO_AUTH_TOKEN', 'MSTEAMS_APP_PASSWORD', 'SLACK_BOT_TOKEN', 'SLACK_APP_TOKEN', diff --git a/src/session/session-context.ts b/src/session/session-context.ts index 694c0952..16e306ec 100644 --- a/src/session/session-context.ts +++ b/src/session/session-context.ts @@ -42,6 +42,7 @@ const CHANNEL_KIND_LABELS: Record = { scheduler: 'Scheduler', telegram: 'Telegram', tui: 'TUI', + voice: 'Voice', whatsapp: 'WhatsApp', }; diff --git a/src/session/session-reset.ts b/src/session/session-reset.ts index be244592..29c1d709 100644 --- a/src/session/session-reset.ts +++ 
b/src/session/session-reset.ts @@ -2,6 +2,7 @@ import { isEmailAddress } from '../channels/email/allowlist.js'; import { isIMessageHandle } from '../channels/imessage/handle.js'; import { isSlackChannelTarget } from '../channels/slack/target.js'; import { isTelegramChannelId } from '../channels/telegram/target.js'; +import { isVoiceChannelId } from '../channels/voice/channel-id.js'; import { isWhatsAppJid } from '../channels/whatsapp/phone.js'; import type { RuntimeConfig } from '../config/runtime-config.js'; @@ -42,6 +43,7 @@ export function resolveSessionResetChannelKind( if (!normalized) return undefined; if (LOCAL_SESSION_RESET_CHANNEL_KINDS.has(normalized)) return normalized; if (isWhatsAppJid(normalized)) return 'whatsapp'; + if (isVoiceChannelId(normalized)) return 'voice'; if (isIMessageHandle(normalized)) return 'imessage'; if (isSlackChannelTarget(normalized)) return 'slack'; if (isTelegramChannelId(normalized)) return 'telegram'; diff --git a/tests/channel-registry.test.ts b/tests/channel-registry.test.ts index 8a9c8a54..7234e944 100644 --- a/tests/channel-registry.test.ts +++ b/tests/channel-registry.test.ts @@ -46,6 +46,7 @@ test('getChannelByContextId resolves Discord, WhatsApp, Telegram, iMessage, emai expect(getChannelByContextId('19:channel@thread.tacv2')?.kind).toBe( 'msteams', ); + expect(getChannelByContextId('voice:CA1234567890abcdef')?.kind).toBe('voice'); }); test('getChannel normalizes the teams alias only for registered channels', async () => { @@ -119,6 +120,7 @@ test('capability presets match expected defaults', async () => { TELEGRAM_CAPABILITIES, SYSTEM_CAPABILITIES, TUI_CAPABILITIES, + VOICE_CAPABILITIES, WHATSAPP_CAPABILITIES, } = await importFreshChannelRegistryModules(); @@ -133,4 +135,6 @@ test('capability presets match expected defaults', async () => { expect(TUI_CAPABILITIES).toBe(SYSTEM_CAPABILITIES); expect(EMAIL_CAPABILITIES.attachments).toBe(true); expect(EMAIL_CAPABILITIES.reactions).toBe(false); + 
expect(VOICE_CAPABILITIES.attachments).toBe(false); + expect(VOICE_CAPABILITIES.maxMessageLength).toBe(4_000); }); diff --git a/tests/gateway-http-server.test.ts b/tests/gateway-http-server.test.ts index ee328d3f..c0a0b518 100644 --- a/tests/gateway-http-server.test.ts +++ b/tests/gateway-http-server.test.ts @@ -1047,6 +1047,8 @@ async function importFreshHealth(options?: { ); const handleIMessageWebhook = vi.fn(async () => {}); const handleMSTeamsWebhook = vi.fn(async () => {}); + const handleVoiceWebhook = vi.fn(async () => false); + const handleVoiceUpgrade = vi.fn(() => false); const claimQueuedProactiveMessages = vi.fn(() => [ { id: 1, text: 'queued message' }, ]); @@ -1106,6 +1108,10 @@ async function importFreshHealth(options?: { vi.doMock('../src/channels/imessage/runtime.js', () => ({ handleIMessageWebhook, })); + vi.doMock('../src/channels/voice/runtime.js', () => ({ + handleVoiceUpgrade, + handleVoiceWebhook, + })); vi.doMock('../src/memory/db.js', () => ({ claimQueuedProactiveMessages, getSessionById, @@ -1298,6 +1304,8 @@ async function importFreshHealth(options?: { resolveGatewayChatbotId, resolveModelRuntimeCredentials, handleIMessageWebhook, + handleVoiceUpgrade, + handleVoiceWebhook, runMessageToolAction, normalizeDiscordToolAction, claimQueuedProactiveMessages, @@ -1322,6 +1330,7 @@ afterEach(() => { vi.doUnmock('../src/providers/factory.js'); vi.doUnmock('../src/channels/imessage/runtime.js'); vi.doUnmock('../src/channels/msteams/runtime.js'); + vi.doUnmock('../src/channels/voice/runtime.js'); vi.doUnmock('../src/channels/message/tool-actions.js'); vi.doUnmock('../src/channels/discord/tool-actions.js'); vi.doUnmock('../src/gateway/media-upload-quota.ts'); @@ -1358,6 +1367,39 @@ describe('gateway HTTP server', () => { expect(JSON.parse(res.body)).toEqual({ status: 'ok', sessions: 2 }); }); + test('routes voice webhooks using the configured webhookPath', async () => { + const homeDir = fs.mkdtempSync( + path.join(os.tmpdir(), 
'hybridclaw-voice-http-'), + ); + tempDirs.push(homeDir); + process.env.HOME = homeDir; + writeRuntimeConfig(homeDir, (config) => { + const voice = config.voice as Record; + voice.webhookPath = '/telephony'; + }); + + const state = await importFreshHealth(); + state.handleVoiceWebhook.mockImplementationOnce(async (_req, res) => { + res.statusCode = 202; + res.end('voice-webhook'); + return true; + }); + const req = makeRequest({ + method: 'POST', + url: '/telephony/webhook', + headers: { host: 'voice.example.com' }, + }); + const res = makeResponse(); + + state.handler(req as never, res as never); + await vi.waitFor(() => + expect(state.handleVoiceWebhook).toHaveBeenCalledTimes(1), + ); + + expect(res.statusCode).toBe(202); + expect(res.body).toBe('voice-webhook'); + }); + test('rejects unauthorized API requests from non-loopback addresses', async () => { const state = await importFreshHealth(); const req = makeRequest({ diff --git a/tests/gateway-main.test.ts b/tests/gateway-main.test.ts index c5f46ae2..09e81231 100644 --- a/tests/gateway-main.test.ts +++ b/tests/gateway-main.test.ts @@ -33,6 +33,10 @@ function createGatewayMainTestState(options?: { slackEnabled?: boolean; slackInitError?: Error; hasSlackCredentials?: boolean; + twilioAuthToken?: string; + voiceEnabled?: boolean; + voiceConfigAuthToken?: string; + voiceInitError?: Error; whatsappEnabled?: boolean; whatsappLinked?: boolean; msteamsEnabled?: boolean; @@ -51,6 +55,7 @@ function createGatewayMainTestState(options?: { imessageMessageHandler: null as | null | ((...args: unknown[]) => Promise), + voiceMessageHandler: null as null | ((...args: unknown[]) => Promise), whatsappMessageHandler: null as | null | ((...args: unknown[]) => Promise), @@ -95,6 +100,27 @@ function createGatewayMainTestState(options?: { textChunkLimit: 4_000, mediaMaxMb: 20, }, + voice: { + enabled: options?.voiceEnabled ?? false, + provider: 'twilio', + twilio: { + accountSid: options?.voiceEnabled ? 
'AC123' : '', + authToken: + options?.voiceConfigAuthToken ?? + (options?.voiceEnabled ? 'twilio-auth-token' : ''), + fromNumber: options?.voiceEnabled ? '+14155550123' : '', + }, + relay: { + ttsProvider: 'default', + voice: '', + transcriptionProvider: 'default', + language: 'en-US', + interruptible: true, + welcomeGreeting: 'Hello! How can I help you today?', + }, + webhookPath: '/voice', + maxConcurrentCalls: 8, + }, msteams: { enabled: options?.msteamsEnabled ?? true, webhook: { @@ -165,6 +191,7 @@ function createGatewayMainTestState(options?: { initMSTeams: vi.fn(), initSlack: vi.fn(), initTelegram: vi.fn(), + initVoice: vi.fn(), initWhatsApp: vi.fn(), initializeWorkflowRuntime: vi.fn(), initGatewayService: vi.fn( @@ -239,6 +266,8 @@ async function importFreshGatewayMain(options?: { slackEnabled?: boolean; hasSlackCredentials?: boolean; whatsappEnabled?: boolean; + voiceEnabled?: boolean; + voiceInitError?: Error; whatsappInitError?: Error; whatsappAuthLockError?: { lockPath: string; @@ -298,6 +327,12 @@ async function importFreshGatewayMain(options?: { } state.imessageMessageHandler = messageHandler; }); + state.initVoice.mockImplementation((messageHandler) => { + if (options?.voiceInitError) { + throw options.voiceInitError; + } + state.voiceMessageHandler = messageHandler; + }); class MockWhatsAppAuthLockError extends Error { readonly lockPath: string; readonly ownerPid: number | null; @@ -392,6 +427,10 @@ async function importFreshGatewayMain(options?: { sendToTelegramChat: vi.fn(async () => {}), shutdownTelegram: state.shutdownTelegram, })); + vi.doMock('../src/channels/voice/runtime.js', () => ({ + initVoice: state.initVoice, + shutdownVoice: vi.fn(async () => {}), + })); vi.doMock('../src/channels/msteams/runtime.js', () => ({ initMSTeams: state.initMSTeams, })); @@ -433,6 +472,8 @@ async function importFreshGatewayMain(options?: { SLACK_BOT_TOKEN: options?.hasSlackCredentials === false ? 
'' : 'xoxb-slack-bot-token', TELEGRAM_BOT_TOKEN: '', + TWILIO_AUTH_TOKEN: + options?.twilioAuthToken ?? state.currentConfig.voice.twilio.authToken, getConfigSnapshot: state.getConfigSnapshot, HEARTBEAT_CHANNEL: '', HEARTBEAT_INTERVAL: 1_000, @@ -570,6 +611,7 @@ afterEach(() => { vi.doUnmock('../src/channels/discord/runtime.js'); vi.doUnmock('../src/channels/imessage/runtime.js'); vi.doUnmock('../src/channels/telegram/runtime.js'); + vi.doUnmock('../src/channels/voice/runtime.js'); vi.doUnmock('../src/channels/msteams/attachments.js'); vi.doUnmock('../src/channels/msteams/runtime.js'); vi.doUnmock('../src/channels/slack/runtime.js'); @@ -710,6 +752,31 @@ describe('gateway bootstrap', () => { ); }); + test('starts voice integration automatically when enabled in config', async () => { + const state = await importFreshGatewayMain({ voiceEnabled: true }); + + expect(state.initVoice).toHaveBeenCalledTimes(1); + expect(state.voiceMessageHandler).not.toBeNull(); + expectInfoLog( + state, + 'Gateway channels', + expect.objectContaining({ + voice: true, + }), + ); + }); + + test('starts voice integration when the Twilio auth token comes from shared secret resolution', async () => { + const state = await importFreshGatewayMain({ + voiceEnabled: true, + voiceConfigAuthToken: '', + twilioAuthToken: 'twilio-auth-token', + }); + + expect(state.initVoice).toHaveBeenCalledTimes(1); + expect(state.voiceMessageHandler).not.toBeNull(); + }); + test('keeps gateway startup running when iMessage integration fails to initialize', async () => { const state = await importFreshGatewayMain({ imessageEnabled: true, diff --git a/tests/gateway-status.test.ts b/tests/gateway-status.test.ts index 6c21e2ee..0b11f91e 100644 --- a/tests/gateway-status.test.ts +++ b/tests/gateway-status.test.ts @@ -369,7 +369,7 @@ test('model list codex uses the current Codex models payload shape', async () => config.local.backends.vllm.enabled = false; }); - const fetchMock = vi.fn(async (input: string | URL, init?: 
RequestInit) => { + const fetchMock = vi.fn(async (input: string | URL, _init?: RequestInit) => { const url = new URL(String(input)); if ( url.origin === 'https://chatgpt.com' && @@ -650,6 +650,49 @@ test('getGatewayStatus includes Discord token status from runtime secrets', asyn }); }); +test('getGatewayStatus includes voice Twilio credential status', async () => { + const homeDir = makeTempHome(); + process.env.HOME = homeDir; + delete process.env.TWILIO_AUTH_TOKEN; + vi.resetModules(); + mockHealthProbes(); + + const { initDatabase } = await import('../src/memory/db.ts'); + const { saveRuntimeSecrets } = await import( + '../src/security/runtime-secrets.ts' + ); + const { updateRuntimeConfig } = await import( + '../src/config/runtime-config.ts' + ); + const { getGatewayStatus } = await import( + '../src/gateway/gateway-service.ts' + ); + + saveRuntimeSecrets({ + TWILIO_AUTH_TOKEN: 'voice-auth-token', + }); + updateRuntimeConfig((draft) => { + draft.voice.enabled = true; + draft.voice.twilio.accountSid = 'AC123'; + draft.voice.twilio.fromNumber = '+14155550123'; + draft.voice.twilio.authToken = ''; + draft.voice.webhookPath = '/voice'; + draft.voice.maxConcurrentCalls = 8; + }); + initDatabase({ quiet: true }); + const status = await getGatewayStatus(); + + expect(status.voice).toEqual({ + enabled: true, + accountSidConfigured: true, + fromNumberConfigured: true, + authTokenConfigured: true, + authTokenSource: 'runtime-secrets', + webhookPath: '/voice', + maxConcurrentCalls: 8, + }); +}); + test('status command includes the current session agent', async () => { const homeDir = makeTempHome(); process.env.HOME = homeDir; diff --git a/tests/runtime-config.secret-refs.test.ts b/tests/runtime-config.secret-refs.test.ts index b06e62bd..78a0fc05 100644 --- a/tests/runtime-config.secret-refs.test.ts +++ b/tests/runtime-config.secret-refs.test.ts @@ -112,6 +112,7 @@ describe('runtime config secret refs', () => { WEB_API_TOKEN: 'web-token-from-store', EMAIL_PASSWORD: 
'email-app-password', IMESSAGE_PASSWORD: 'bluebubbles-password', + TWILIO_AUTH_TOKEN: 'twilio-auth-token', }); process.env.TEST_GATEWAY_TOKEN = 'gateway-token-from-env'; process.env.TEST_VLLM_API_KEY = 'vllm-token-from-env'; @@ -131,6 +132,13 @@ describe('runtime config secret refs', () => { email.enabled = true; email.password = { source: 'store', id: 'EMAIL_PASSWORD' }; + const voice = config.voice as Record; + voice.enabled = true; + const twilio = voice.twilio as Record; + twilio.accountSid = 'AC123'; + twilio.fromNumber = '+14155550123'; + twilio.authToken = { source: 'store', id: 'TWILIO_AUTH_TOKEN' }; + const local = config.local as Record; const backends = local.backends as Record; const vllm = backends.vllm as Record; @@ -145,6 +153,7 @@ describe('runtime config secret refs', () => { expect(config.ops.gatewayApiToken).toBe('gateway-token-from-env'); expect(config.email.password).toBe('email-app-password'); expect(config.imessage.password).toBe('bluebubbles-password'); + expect(config.voice.twilio.authToken).toBe('twilio-auth-token'); expect(config.local.backends.vllm.apiKey).toBe('vllm-token-from-env'); }); diff --git a/tests/session-reset.test.ts b/tests/session-reset.test.ts index be76edff..96e8836d 100644 --- a/tests/session-reset.test.ts +++ b/tests/session-reset.test.ts @@ -223,6 +223,9 @@ test('resolveSessionResetChannelKind infers real channel kinds from channel ids' expect(resolveSessionResetChannelKind('imessage:peer@example.com')).toBe( 'imessage', ); + expect(resolveSessionResetChannelKind('voice:CA1234567890abcdef')).toBe( + 'voice', + ); expect(resolveSessionResetChannelKind('tui')).toBe('tui'); expect(resolveSessionResetChannelKind('web')).toBe('web'); expect(resolveSessionResetChannelKind('cli')).toBe('cli'); diff --git a/tests/voice.conversation-relay.test.ts b/tests/voice.conversation-relay.test.ts new file mode 100644 index 00000000..5afd91a4 --- /dev/null +++ b/tests/voice.conversation-relay.test.ts @@ -0,0 +1,119 @@ +import { expect, 
test } from 'vitest'; +import { + ConversationRelayResponseStream, + mergePromptFragment, + parseConversationRelayMessage, +} from '../src/channels/voice/conversation-relay.js'; + +test('parseConversationRelayMessage decodes setup and prompt payloads', () => { + const setup = parseConversationRelayMessage( + JSON.stringify({ + type: 'setup', + sessionId: 'VX123', + accountSid: 'AC123', + callSid: 'CA123', + from: '+14155550123', + to: '+14155550124', + customParameters: { + callReference: 'CA123', + }, + }), + ); + const prompt = parseConversationRelayMessage( + JSON.stringify({ + type: 'prompt', + voicePrompt: 'Hello from caller', + lang: 'en-US', + last: true, + }), + ); + + expect(setup.type).toBe('setup'); + expect(setup.callSid).toBe('CA123'); + expect(setup.customParameters).toEqual({ callReference: 'CA123' }); + expect(prompt).toEqual({ + type: 'prompt', + voicePrompt: 'Hello from caller', + lang: 'en-US', + last: true, + }); +}); + +test('mergePromptFragment handles incremental and cumulative prompt fragments', () => { + expect(mergePromptFragment('', 'Hello')).toBe('Hello'); + expect(mergePromptFragment('Hello', ' world')).toBe('Hello world'); + expect(mergePromptFragment('Hello', 'Hello world')).toBe('Hello world'); +}); + +test('ConversationRelayResponseStream buffers the final token until finish', async () => { + const payloads: Array> = []; + const stream = new ConversationRelayResponseStream( + async (payload) => { + payloads.push(payload); + }, + { + interruptible: true, + language: 'en-US', + }, + ); + + await stream.push('Hello'); + await stream.push(' world'); + await stream.finish(); + + expect(payloads).toEqual([ + { + type: 'text', + token: 'Hello', + last: false, + lang: 'en-US', + interruptible: true, + preemptible: false, + }, + { + type: 'text', + token: ' world', + last: true, + lang: 'en-US', + interruptible: true, + preemptible: false, + }, + ]); +}); + +test('ConversationRelayResponseStream can send a single reply and end the session', 
async () => { + const payloads: Array> = []; + const stream = new ConversationRelayResponseStream( + async (payload) => { + payloads.push(payload); + }, + { + interruptible: false, + language: 'en-US', + }, + ); + + await stream.reply('Goodbye'); + expect(payloads[0]).toMatchObject({ + type: 'text', + token: 'Goodbye', + last: true, + interruptible: false, + }); + + const endStream = new ConversationRelayResponseStream( + async (payload) => { + payloads.push(payload); + }, + { + interruptible: false, + language: 'en-US', + }, + ); + await endStream.endSession('{"reason":"handoff"}'); + + expect(payloads.at(-1)).toEqual({ + type: 'end', + handoffData: '{"reason":"handoff"}', + }); +}); diff --git a/tests/voice.runtime.test.ts b/tests/voice.runtime.test.ts new file mode 100644 index 00000000..2657c233 --- /dev/null +++ b/tests/voice.runtime.test.ts @@ -0,0 +1,126 @@ +import { Readable } from 'node:stream'; +import { afterEach, expect, test, vi } from 'vitest'; +import { buildTwilioSignature } from '../src/channels/voice/security.js'; + +function makeFormRequest(params: { + url: string; + body: Record; + headers?: Record; +}) { + const encoded = new URLSearchParams(params.body).toString(); + return Object.assign(Readable.from([Buffer.from(encoded)]), { + method: 'POST', + url: params.url, + headers: { + host: 'voice.example.com', + 'content-type': 'application/x-www-form-urlencoded', + ...params.headers, + }, + socket: { + remoteAddress: '127.0.0.1', + }, + }); +} + +function makeResponse() { + const headers: Record = {}; + return { + body: '', + headers, + headersSent: false, + statusCode: 0, + writableEnded: false, + end(chunk?: unknown) { + if (chunk != null) { + this.body += Buffer.isBuffer(chunk) + ? 
chunk.toString('utf8') + : String(chunk); + } + this.headersSent = true; + this.writableEnded = true; + }, + setHeader(name: string, value: string) { + headers[name] = value; + }, + }; +} + +afterEach(() => { + vi.restoreAllMocks(); + vi.doUnmock('../src/config/config.js'); + vi.doUnmock('../src/logger.js'); + vi.resetModules(); +}); + +test('handleVoiceWebhook validates signatures with the shared resolved Twilio auth token', async () => { + const getConfigSnapshot = vi.fn(() => ({ + voice: { + enabled: true, + provider: 'twilio', + twilio: { + accountSid: 'AC123', + authToken: '', + fromNumber: '+14155550123', + }, + relay: { + ttsProvider: 'default', + voice: '', + transcriptionProvider: 'default', + language: 'en-US', + interruptible: true, + welcomeGreeting: 'Hello! How can I help you today?', + }, + webhookPath: '/voice', + maxConcurrentCalls: 8, + }, + })); + + vi.doMock('../src/config/config.js', () => ({ + GATEWAY_BASE_URL: '', + TWILIO_AUTH_TOKEN: 'env-voice-token', + getConfigSnapshot, + })); + vi.doMock('../src/logger.js', () => ({ + logger: { + debug: vi.fn(), + error: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + }, + })); + + const { handleVoiceWebhook, shutdownVoice } = await import( + '../src/channels/voice/runtime.js' + ); + const body = { + CallSid: 'CA123', + From: '+15550001111', + To: '+15550002222', + }; + const signature = buildTwilioSignature({ + authToken: 'env-voice-token', + url: 'https://voice.example.com/voice/webhook', + values: body, + }); + const req = makeFormRequest({ + url: '/voice/webhook', + body, + headers: { + 'x-forwarded-proto': 'https', + 'x-twilio-signature': signature, + }, + }); + const res = makeResponse(); + + const handled = await handleVoiceWebhook( + req as never, + res as never, + new URL('http://voice.example.com/voice/webhook'), + ); + + expect(handled).toBe(true); + expect(res.statusCode).toBe(200); + expect(res.body).toContain(' { + const authToken = 'secret'; + const url = 'https://example.com/voice/webhook'; + 
const values = { + CallSid: 'CA123', + From: '+14155550123', + To: '+14155550124', + }; + const signature = buildTwilioSignature({ + authToken, + url, + values, + }); + + expect( + validateTwilioSignature({ + authToken, + signature, + url, + values, + }), + ).toBe(true); + expect( + validateTwilioSignature({ + authToken, + signature: 'invalid', + url, + values, + }), + ).toBe(false); +}); + +test('ReplayProtector rejects duplicate tokens inside the TTL window', () => { + const protector = new ReplayProtector(30_000); + + expect(protector.observe('abc123')).toBe(true); + expect(protector.observe('abc123')).toBe(false); + expect(protector.observe('different')).toBe(true); +}); diff --git a/tests/voice.webhook.test.ts b/tests/voice.webhook.test.ts new file mode 100644 index 00000000..317704c9 --- /dev/null +++ b/tests/voice.webhook.test.ts @@ -0,0 +1,44 @@ +import { expect, test } from 'vitest'; +import { + buildConversationRelayTwiml, + buildHangupTwiml, +} from '../src/channels/voice/webhook.js'; + +test('buildConversationRelayTwiml renders ConversationRelay with configured attributes', () => { + const xml = buildConversationRelayTwiml({ + websocketUrl: 'wss://voice.example.com/voice/relay', + actionUrl: 'https://voice.example.com/voice/action', + relay: { + ttsProvider: 'google', + voice: 'en-US-Journey-D', + transcriptionProvider: 'deepgram', + language: 'en-US', + interruptible: true, + welcomeGreeting: 'Hello there!', + }, + customParameters: { + callReference: 'CA123', + }, + }); + + expect(xml).toContain( + '', + ); + expect(xml).toContain( + ''); +}); + +test('buildHangupTwiml escapes XML content', () => { + const xml = buildHangupTwiml('Voice & support '); + expect(xml).toContain('Voice & support <busy>'); + expect(xml).toContain(''); +}); From 86b8d8f4ae40bd0b00f06d6502689f5f7e32a431 Mon Sep 17 00:00:00 2001 From: Benedikt Koehler Date: Tue, 14 Apr 2026 00:06:09 +0200 Subject: [PATCH 2/2] feat: complete Twilio voice channel integration --- 
console/src/api/types.ts | 28 ++ console/src/components/channel-logo.tsx | 14 + console/src/routes/channels-catalog.ts | 38 ++ console/src/routes/channels.test.tsx | 59 +++ console/src/routes/channels.tsx | 276 +++++++++++++ console/src/routes/scheduler.test.tsx | 19 + docs/content/agents.md | 1 + docs/content/guides/README.md | 4 +- docs/content/guides/twilio-voice.md | 434 ++++++++++++++++++++ docs/content/guides/voice-tts.md | 4 + docs/content/reference/configuration.md | 3 +- docs/development/agents.md | 1 + docs/development/guides/README.md | 4 +- docs/development/guides/twilio-voice.md | 434 ++++++++++++++++++++ docs/development/guides/voice-tts.md | 4 + docs/development/reference/configuration.md | 3 +- docs/static/docs.js | 1 + src/channels/voice/conversation-relay.ts | 79 ++-- src/channels/voice/runtime.ts | 150 ++++++- src/channels/voice/security.ts | 2 +- src/channels/voice/text.ts | 112 +++++ src/channels/voice/twilio-manager.ts | 118 +++++- src/cli/help.ts | 4 +- src/command-registry.ts | 69 ++++ src/gateway/gateway-plugin-service.ts | 5 +- src/gateway/gateway-service.ts | 170 +++++++- src/gateway/gateway.ts | 72 +++- tests/command-registry.test.ts | 54 +++ tests/gateway-main.test.ts | 44 ++ tests/gateway-service.voice-command.test.ts | 190 +++++++++ tests/tui-slash-menu.test.ts | 2 + tests/voice.conversation-relay.test.ts | 70 ++++ tests/voice.security.test.ts | 18 + tests/voice.text.test.ts | 35 ++ 34 files changed, 2472 insertions(+), 49 deletions(-) create mode 100644 docs/content/guides/twilio-voice.md create mode 100644 docs/development/guides/twilio-voice.md create mode 100644 src/channels/voice/text.ts create mode 100644 tests/gateway-service.voice-command.test.ts create mode 100644 tests/voice.text.test.ts diff --git a/console/src/api/types.ts b/console/src/api/types.ts index 89a33b86..4ce8b2ae 100644 --- a/console/src/api/types.ts +++ b/console/src/api/types.ts @@ -72,6 +72,15 @@ export interface GatewayStatus { passwordConfigured: boolean; 
passwordSource: 'config' | 'env' | 'runtime-secrets' | null; }; + voice?: { + enabled: boolean; + accountSidConfigured: boolean; + fromNumberConfigured: boolean; + authTokenConfigured: boolean; + authTokenSource: 'config' | 'env' | 'runtime-secrets' | null; + webhookPath: string; + maxConcurrentCalls: number; + }; whatsapp?: { linked: boolean; jid: string | null; @@ -447,6 +456,25 @@ export interface AdminConfig { textChunkLimit: number; mediaMaxMb: number; }; + voice: { + enabled: boolean; + provider: 'twilio'; + twilio: { + accountSid: string; + authToken: string; + fromNumber: string; + }; + relay: { + ttsProvider: 'default' | 'google' | 'amazon'; + voice: string; + transcriptionProvider: 'default' | 'deepgram' | 'google'; + language: string; + interruptible: boolean; + welcomeGreeting: string; + }; + webhookPath: string; + maxConcurrentCalls: number; + }; whatsapp: { dmPolicy: 'open' | 'pairing' | 'allowlist' | 'disabled'; groupPolicy: 'open' | 'allowlist' | 'disabled'; diff --git a/console/src/components/channel-logo.tsx b/console/src/components/channel-logo.tsx index e6621c99..50d6d469 100644 --- a/console/src/components/channel-logo.tsx +++ b/console/src/components/channel-logo.tsx @@ -2,6 +2,7 @@ export function ChannelLogo(props: { kind: | 'discord' | 'telegram' + | 'voice' | 'whatsapp' | 'email' | 'slack' @@ -51,6 +52,19 @@ export function ChannelLogo(props: { ); + case 'voice': + return ( + + ); case 'email': return (