From ce0f26e7644b35d36f9a7f6f83cd593b0181e002 Mon Sep 17 00:00:00 2001 From: sr-anam Date: Mon, 5 Jan 2026 15:03:07 +0000 Subject: [PATCH] feat: handle webrtc reasoning events --- src/AnamClient.ts | 7 ++ src/modules/ReasoningHistoryClient.ts | 76 +++++++++++++++++++ src/modules/StreamingClient.ts | 8 ++ src/modules/index.ts | 1 + src/types/AgentAudioInputStream.ts | 4 +- src/types/events/internal/InternalEvent.ts | 1 + .../events/internal/InternalEventCallbacks.ts | 4 + src/types/events/public/AnamEvent.ts | 2 + src/types/events/public/EventCallbacks.ts | 8 ++ src/types/messageHistory/ReasoningMessage.ts | 5 ++ .../messageHistory/ReasoningStreamEvent.ts | 6 ++ src/types/messageHistory/index.ts | 2 + src/types/streaming/DataChannelMessage.ts | 1 + .../WebRtcReasoningTextMessageEvent.ts | 7 ++ src/types/streaming/index.ts | 1 + 15 files changed, 132 insertions(+), 1 deletion(-) create mode 100644 src/modules/ReasoningHistoryClient.ts create mode 100644 src/types/messageHistory/ReasoningMessage.ts create mode 100644 src/types/messageHistory/ReasoningStreamEvent.ts create mode 100644 src/types/streaming/WebRtcReasoningTextMessageEvent.ts diff --git a/src/AnamClient.ts b/src/AnamClient.ts index 8276006..c1c81c0 100644 --- a/src/AnamClient.ts +++ b/src/AnamClient.ts @@ -18,6 +18,7 @@ import { MessageHistoryClient, PublicEventEmitter, StreamingClient, + ReasoningHistoryClient, } from './modules'; import { AnamClientOptions, @@ -38,6 +39,7 @@ export default class AnamClient { private internalEventEmitter: InternalEventEmitter; private readonly messageHistoryClient: MessageHistoryClient; + private readonly reasoningHistoryClient: ReasoningHistoryClient; private personaConfig: PersonaConfig | undefined; private clientOptions: AnamClientOptions | undefined; @@ -104,6 +106,11 @@ export default class AnamClient { this.publicEventEmitter, this.internalEventEmitter, ); + + this.reasoningHistoryClient = new ReasoningHistoryClient( + this.publicEventEmitter, + this.internalEventEmitter, + ); } private decodeJwt(token: string): any { diff --git a/src/modules/ReasoningHistoryClient.ts b/src/modules/ReasoningHistoryClient.ts new file mode 100644 index 0000000..4032551 --- /dev/null +++ b/src/modules/ReasoningHistoryClient.ts @@ -0,0 +1,76 @@ +import { PublicEventEmitter, InternalEventEmitter } from '.'; +import { + AnamEvent, + InternalEvent, + ReasoningMessage, + ReasoningStreamEvent, + WebRtcReasoningTextMessageEvent, +} from '../types'; + +export class ReasoningHistoryClient { + private publicEventEmitter: PublicEventEmitter; + private internalEventEmitter: InternalEventEmitter; + + private reasoning_messages: ReasoningMessage[] = []; + constructor( + publicEventEmitter: PublicEventEmitter, + internalEventEmitter: InternalEventEmitter, + ) { + this.publicEventEmitter = publicEventEmitter; + this.internalEventEmitter = internalEventEmitter; + // register for events + this.internalEventEmitter.addListener( + InternalEvent.WEBRTC_REASONING_TEXT_MESSAGE_RECEIVED, + this.processWebRtcReasoningTextMessageEvent.bind(this), + ); + } + + private webRtcTextMessageEventToReasoningStreamEvent( + event: WebRtcReasoningTextMessageEvent, + ): ReasoningStreamEvent { + return { + id: `${event.role}::${event.message_id}`, + content: event.content, + endOfThought: event.end_of_thought, + role: event.role, + }; + } + + private processWebRtcReasoningTextMessageEvent( + event: WebRtcReasoningTextMessageEvent, + ): void { + const ReasoningStreamEvent: ReasoningStreamEvent = + this.webRtcTextMessageEventToReasoningStreamEvent(event); + + this.publicEventEmitter.emit( + AnamEvent.REASONING_STREAM_EVENT_RECEIVED, + ReasoningStreamEvent, + ); + + const message: ReasoningMessage = { + id: ReasoningStreamEvent.id, + content: ReasoningStreamEvent.content, + role: ReasoningStreamEvent.role, + }; + + const existingMessageIndex = this.reasoning_messages.findIndex( + (m) => m.id === message.id, + ); + if (existingMessageIndex !== -1) { + // update existing message + const existingMessage = this.reasoning_messages[existingMessageIndex]; + existingMessage.content += message.content; + this.reasoning_messages[existingMessageIndex] = existingMessage; + } else { + // new message + this.reasoning_messages.push(message); + } + + if (ReasoningStreamEvent.endOfThought) { + this.publicEventEmitter.emit( + AnamEvent.REASONING_HISTORY_UPDATED, + this.reasoning_messages, + ); + } + } +} diff --git a/src/modules/StreamingClient.ts b/src/modules/StreamingClient.ts index 16b88cc..c272a96 100644 --- a/src/modules/StreamingClient.ts +++ b/src/modules/StreamingClient.ts @@ -25,6 +25,7 @@ import { StreamingClientOptions, WebRtcClientToolEvent, WebRtcTextMessageEvent, + WebRtcReasoningTextMessageEvent, } from '../types'; import { AgentAudioInputStream } from '../types/AgentAudioInputStream'; import { TalkMessageStream } from '../types/TalkMessageStream'; @@ -693,6 +694,7 @@ export class StreamingClient { message.data as WebRtcTextMessageEvent, ); break; + case DataChannelMessage.CLIENT_TOOL_EVENT: const webRtcToolEvent = message.data as WebRtcClientToolEvent; @@ -709,6 +711,12 @@ export class StreamingClient { clientToolEvent, ); break; + case DataChannelMessage.REASONING_TEXT: + this.internalEventEmitter.emit( + InternalEvent.WEBRTC_REASONING_TEXT_MESSAGE_RECEIVED, + message.data as WebRtcReasoningTextMessageEvent, + ); + break; // Unknown message types are silently ignored to maintain forward compatibility default: break; diff --git a/src/modules/index.ts b/src/modules/index.ts index 73698ac..82003bc 100644 --- a/src/modules/index.ts +++ b/src/modules/index.ts @@ -6,3 +6,4 @@ export { MessageHistoryClient } from './MessageHistoryClient'; export { PublicEventEmitter } from './PublicEventEmitter'; export { StreamingClient } from './StreamingClient'; export { ToolCallManager } from './ToolCallManager'; +export { ReasoningHistoryClient } from './ReasoningHistoryClient'; diff --git a/src/types/AgentAudioInputStream.ts b/src/types/AgentAudioInputStream.ts index a3a2810..f520257 100644 --- a/src/types/AgentAudioInputStream.ts +++ b/src/types/AgentAudioInputStream.ts @@ -62,7 +62,9 @@ export class AgentAudioInputStream { private arrayBufferToBase64(buffer: ArrayBuffer | Uint8Array): string { const bytes = buffer instanceof Uint8Array ? buffer : new Uint8Array(buffer); - const binary = Array.from(bytes, byte => String.fromCharCode(byte)).join(''); + const binary = Array.from(bytes, (byte) => String.fromCharCode(byte)).join( + '', + ); return btoa(binary); } } diff --git a/src/types/events/internal/InternalEvent.ts b/src/types/events/internal/InternalEvent.ts index bd677da..67e36c6 100644 --- a/src/types/events/internal/InternalEvent.ts +++ b/src/types/events/internal/InternalEvent.ts @@ -3,4 +3,5 @@ export enum InternalEvent { SIGNAL_MESSAGE_RECEIVED = 'SIGNAL_MESSAGE_RECEIVED', WEBRTC_CHAT_MESSAGE_RECEIVED = 'WEBRTC_CHAT_MESSAGE_RECEIVED', WEBRTC_CLIENT_TOOL_EVENT_RECEIVED = 'WEBRTC_CLIENT_TOOL_EVENT_RECEIVED', + WEBRTC_REASONING_TEXT_MESSAGE_RECEIVED = 'WEBRTC_REASONING_TEXT_MESSAGE_RECEIVED', } diff --git a/src/types/events/internal/InternalEventCallbacks.ts b/src/types/events/internal/InternalEventCallbacks.ts index 5edbbad..8fded79 100644 --- a/src/types/events/internal/InternalEventCallbacks.ts +++ b/src/types/events/internal/InternalEventCallbacks.ts @@ -3,6 +3,7 @@ import { SignalMessage, WebRtcTextMessageEvent, WebRtcClientToolEvent, + WebRtcReasoningTextMessageEvent, } from '../../index'; export type InternalEventCallbacks = { @@ -16,4 +17,7 @@ export type InternalEventCallbacks = { [InternalEvent.WEBRTC_CLIENT_TOOL_EVENT_RECEIVED]: ( webRtcToolEvent: WebRtcClientToolEvent, ) => void; + [InternalEvent.WEBRTC_REASONING_TEXT_MESSAGE_RECEIVED]: ( + message: WebRtcReasoningTextMessageEvent, + ) => void; }; diff --git a/src/types/events/public/AnamEvent.ts b/src/types/events/public/AnamEvent.ts index 1d8e185..1392b58 100644 --- a/src/types/events/public/AnamEvent.ts +++ b/src/types/events/public/AnamEvent.ts @@ -15,4 +15,6 @@ export enum AnamEvent { MIC_PERMISSION_DENIED = 'MIC_PERMISSION_DENIED', INPUT_AUDIO_DEVICE_CHANGED = 'INPUT_AUDIO_DEVICE_CHANGED', CLIENT_TOOL_EVENT_RECEIVED = 'CLIENT_TOOL_EVENT_RECEIVED', + REASONING_HISTORY_UPDATED = 'REASONING_HISTORY_UPDATED', + REASONING_STREAM_EVENT_RECEIVED = 'REASONING_STREAM_EVENT_RECEIVED', } diff --git a/src/types/events/public/EventCallbacks.ts b/src/types/events/public/EventCallbacks.ts index dcd258f..3c43510 100644 --- a/src/types/events/public/EventCallbacks.ts +++ b/src/types/events/public/EventCallbacks.ts @@ -4,6 +4,8 @@ import { MessageStreamEvent, AnamEvent, ClientToolEvent, + ReasoningMessage, + ReasoningStreamEvent, } from '../../index'; export type EventCallbacks = { @@ -30,4 +32,10 @@ export type EventCallbacks = { [AnamEvent.CLIENT_TOOL_EVENT_RECEIVED]: ( clientToolEvent: ClientToolEvent, ) => void; + [AnamEvent.REASONING_HISTORY_UPDATED]: ( + thoughtMessages: ReasoningMessage[], + ) => void; + [AnamEvent.REASONING_STREAM_EVENT_RECEIVED]: ( + thoughtEvent: ReasoningStreamEvent, + ) => void; }; diff --git a/src/types/messageHistory/ReasoningMessage.ts b/src/types/messageHistory/ReasoningMessage.ts new file mode 100644 index 0000000..091d5de --- /dev/null +++ b/src/types/messageHistory/ReasoningMessage.ts @@ -0,0 +1,5 @@ +export interface ReasoningMessage { + id: string; + content: string; + role: string; +} diff --git a/src/types/messageHistory/ReasoningStreamEvent.ts b/src/types/messageHistory/ReasoningStreamEvent.ts new file mode 100644 index 0000000..c0bccfc --- /dev/null +++ b/src/types/messageHistory/ReasoningStreamEvent.ts @@ -0,0 +1,6 @@ +export interface ReasoningStreamEvent { + id: string; + content: string; + endOfThought: boolean; + role: string; +} diff --git a/src/types/messageHistory/index.ts b/src/types/messageHistory/index.ts index 79fcedd..88ff8aa 100644 --- a/src/types/messageHistory/index.ts +++ b/src/types/messageHistory/index.ts @@ -1,3 +1,5 @@ export { MessageRole } from './MessageRole'; export type { Message } from './Message'; export type { MessageStreamEvent } from './MessageStreamEvent'; +export type { ReasoningMessage } from './ReasoningMessage'; +export type { ReasoningStreamEvent } from './ReasoningStreamEvent'; diff --git a/src/types/streaming/DataChannelMessage.ts b/src/types/streaming/DataChannelMessage.ts index 7913f4b..4e0970b 100644 --- a/src/types/streaming/DataChannelMessage.ts +++ b/src/types/streaming/DataChannelMessage.ts @@ -1,4 +1,5 @@ export enum DataChannelMessage { SPEECH_TEXT = 'speechText', CLIENT_TOOL_EVENT = 'clientToolEvent', + REASONING_TEXT = 'reasoningText', } diff --git a/src/types/streaming/WebRtcReasoningTextMessageEvent.ts b/src/types/streaming/WebRtcReasoningTextMessageEvent.ts new file mode 100644 index 0000000..661d62a --- /dev/null +++ b/src/types/streaming/WebRtcReasoningTextMessageEvent.ts @@ -0,0 +1,7 @@ +export interface WebRtcReasoningTextMessageEvent { + message_id: string; + content_index: number; + content: string; + role: string; + end_of_thought: boolean; +} diff --git a/src/types/streaming/index.ts b/src/types/streaming/index.ts index 2d383e2..523faca 100644 --- a/src/types/streaming/index.ts +++ b/src/types/streaming/index.ts @@ -4,3 +4,4 @@ export type { WebRtcClientToolEvent } from './WebRtcClientToolEvent'; export type { StreamingClientOptions } from './StreamingClientOptions'; export type { InputAudioOptions } from './InputAudioOptions'; export { DataChannelMessage } from './DataChannelMessage'; +export { WebRtcReasoningTextMessageEvent } from './WebRtcReasoningTextMessageEvent';