From d0beea340b0bd29d60dbd4b06a8aafecdcafb68c Mon Sep 17 00:00:00 2001 From: Salem Date: Mon, 16 Feb 2026 17:45:19 +0000 Subject: [PATCH] Add Telegram live streaming previews for Claude responses --- src/channels/telegram-client.ts | 239 +++++++++++++++++++++++++++++--- src/lib/invoke.ts | 104 +++++++++++++- src/queue-processor.ts | 71 +++++++++- 3 files changed, 391 insertions(+), 23 deletions(-) diff --git a/src/channels/telegram-client.ts b/src/channels/telegram-client.ts index 2af065e..a7fd8ee 100644 --- a/src/channels/telegram-client.ts +++ b/src/channels/telegram-client.ts @@ -46,6 +46,8 @@ interface PendingMessage { chatId: number; messageId: number; timestamp: number; + streamPreviewMessageId?: number; + streamPreviewDisabled?: boolean; } interface QueueData { @@ -69,6 +71,12 @@ interface ResponseData { files?: string[]; } +const TELEGRAM_TEXT_LIMIT = 4096; +const TELEGRAM_MESSAGE_NOT_MODIFIED_RE = /message is not modified/i; +const DEFAULT_PENDING_MAX_AGE_MS = 10 * 60 * 1000; +const MAX_PENDING_MAX_AGE_MS = 2 * 60 * 60 * 1000; +const PENDING_FILE = path.join(TINYCLAW_HOME, 'queue', 'pending-telegram.json'); + function sanitizeFileName(fileName: string): string { const baseName = path.basename(fileName).replace(/[<>:"/\\|?*\x00-\x1f]/g, '_').trim(); return baseName.length > 0 ? baseName : 'file.bin'; @@ -98,6 +106,38 @@ function buildUniqueFilePath(dir: string, preferredName: string): string { const pendingMessages = new Map(); let processingOutgoingQueue = false; +function loadPendingMessages(): void { + try { + if (!fs.existsSync(PENDING_FILE)) { + return; + } + const data: Record = JSON.parse(fs.readFileSync(PENDING_FILE, 'utf8')); + const tenMinutesAgo = Date.now() - DEFAULT_PENDING_MAX_AGE_MS; + for (const [id, msg] of Object.entries(data)) { + if (msg.timestamp >= tenMinutesAgo) { + pendingMessages.set(id, msg); + } + } + if (pendingMessages.size > 0) { + log('INFO', `Restored ${pendingMessages.size} pending message(s) from disk`); + } + } catch (error) { + log('WARN', `Failed to load pending messages: ${(error as Error).message}`); + } +} + +function savePendingMessages(): void { + try { + const obj: Record = {}; + for (const [id, msg] of pendingMessages.entries()) { + obj[id] = msg; + } + fs.writeFileSync(PENDING_FILE, JSON.stringify(obj, null, 2)); + } catch (error) { + log('WARN', `Failed to save pending messages: ${(error as Error).message}`); + } +} + // Logger function log(level: string, message: string): void { const timestamp = new Date().toISOString(); @@ -106,6 +146,94 @@ function log(level: string, message: string): void { fs.appendFileSync(LOG_FILE, logMessage); } +function resolvePendingMaxAgeMs(): number { + try { + if (!fs.existsSync(SETTINGS_FILE)) { + return DEFAULT_PENDING_MAX_AGE_MS; + } + const settings = JSON.parse(fs.readFileSync(SETTINGS_FILE, 'utf8')); + const maxResponseTimeSec = Number(settings?.monitoring?.max_response_time); + if (!Number.isFinite(maxResponseTimeSec) || maxResponseTimeSec <= 0) { + return DEFAULT_PENDING_MAX_AGE_MS; + } + const ms = Math.trunc(maxResponseTimeSec * 1000); + return Math.max(DEFAULT_PENDING_MAX_AGE_MS, Math.min(ms, MAX_PENDING_MAX_AGE_MS)); + } catch { + return DEFAULT_PENDING_MAX_AGE_MS; + } +} + +const PENDING_MAX_AGE_MS = resolvePendingMaxAgeMs(); + +function isTelegramMessageNotModified(err: any): boolean { + const description = err?.response?.body?.description; + return typeof description === 'string' && TELEGRAM_MESSAGE_NOT_MODIFIED_RE.test(description); +} + +async function sendFormattedMessage(chatId: number, text: string, opts?: Record): Promise { + return await bot.sendMessage(chatId, text, opts || {}); +} + +async function editFormattedMessage(chatId: number, messageId: number, text: string): Promise { + try { + await bot.editMessageText(text, { + chat_id: chatId, + message_id: messageId, + }); + return true; + } catch (err: any) { + if (isTelegramMessageNotModified(err)) { + return true; + } + throw err; + } +} + +async function clearPreviewMessage(pending: PendingMessage): Promise { + const previewMessageId = pending.streamPreviewMessageId; + pending.streamPreviewMessageId = undefined; + pending.streamPreviewDisabled = false; + if (typeof previewMessageId !== 'number') { + return; + } + try { + await bot.deleteMessage(pending.chatId, previewMessageId); + } catch (err: any) { + const description = err?.response?.body?.description; + if (typeof description === 'string' && description.toLowerCase().includes('message to delete not found')) { + return; + } + log('WARN', `Failed to clear stream preview ${previewMessageId}: ${(err as Error).message}`); + } +} + +async function cleanupPendingMessages(options?: { maxAgeMs?: number; chatId?: number; reason?: string }): Promise { + const now = Date.now(); + const maxAgeMs = options?.maxAgeMs ?? PENDING_MAX_AGE_MS; + const targetChatId = options?.chatId; + let removed = 0; + + for (const [id, data] of pendingMessages.entries()) { + if (typeof targetChatId === 'number' && data.chatId !== targetChatId) { + continue; + } + const isExpired = now - data.timestamp > maxAgeMs; + const shouldRemove = typeof targetChatId === 'number' ? true : isExpired; + if (!shouldRemove) { + continue; + } + pendingMessages.delete(id); + await clearPreviewMessage(data); + removed++; + log('INFO', `Cleared pending message ${id}${options?.reason ? ` (${options.reason})` : ''}`); + } + + if (removed > 0) { + savePendingMessages(); + } + return removed; +} + // Load teams from settings for /team command function getTeamListText(): string { try { @@ -153,7 +281,7 @@ function getAgentListText(): string { } // Split long messages for Telegram's 4096 char limit -function splitMessage(text: string, maxLength = 4096): string[] { +function splitMessage(text: string, maxLength = TELEGRAM_TEXT_LIMIT): string[] { if (text.length <= maxLength) { return [text]; } @@ -259,6 +387,9 @@ function pairingMessage(code: string): string { // Initialize Telegram bot (polling mode) const bot = new TelegramBot(TELEGRAM_BOT_TOKEN, { polling: true }); +// Restore pending messages so restarts do not orphan in-flight requests. +loadPendingMessages(); + // Bot ready bot.getMe().then(async (me: TelegramBot.User) => { log('INFO', `Telegram bot connected as @${me.username}`); @@ -390,7 +521,14 @@ bot.on('message', async (msg: TelegramBot.Message) => { // Check for reset command: /reset @agent_id [@agent_id2 ...] const resetMatch = messageText.trim().match(/^[!/]reset\s+(.+)$/i); if (messageText.trim().match(/^[!/]reset$/i)) { - await bot.sendMessage(msg.chat.id, 'Usage: /reset @agent_id [@agent_id2 ...]\nSpecify which agent(s) to reset.', { + log('INFO', 'Reset command received'); + const resetFlagPath = path.join(TINYCLAW_HOME, 'reset_flag'); + fs.writeFileSync(resetFlagPath, 'reset'); + await cleanupPendingMessages({ + chatId: msg.chat.id, + reason: 'reset', + }); + await bot.sendMessage(msg.chat.id, 'Conversation reset! Next message will start a fresh conversation.', { reply_to_message_id: msg.message_id, }); return; @@ -422,6 +560,10 @@ bot.on('message', async (msg: TelegramBot.Message) => { reply_to_message_id: msg.message_id, }); } + await cleanupPendingMessages({ + chatId: msg.chat.id, + reason: 'reset', + }); return; } @@ -457,14 +599,11 @@ bot.on('message', async (msg: TelegramBot.Message) => { messageId: msg.message_id, timestamp: Date.now(), }); - - // Clean up old pending messages (older than 10 minutes) - const tenMinutesAgo = Date.now() - (10 * 60 * 1000); - for (const [id, data] of pendingMessages.entries()) { - if (data.timestamp < tenMinutesAgo) { - pendingMessages.delete(id); - } - } + await cleanupPendingMessages({ + maxAgeMs: PENDING_MAX_AGE_MS, + reason: 'expired', + }); + savePendingMessages(); } catch (error) { log('ERROR', `Message handling error: ${(error as Error).message}`); @@ -490,13 +629,58 @@ async function checkOutgoingQueue(): Promise { const responseData: ResponseData = JSON.parse(fs.readFileSync(filePath, 'utf8')); const { messageId, message: responseText, sender, senderId } = responseData; + // Handle partial streaming responses. + if (messageId.startsWith('partial_')) { + const realMessageId = messageId.replace(/^partial_/, '').replace(/_r\d+$/, ''); + const pending = pendingMessages.get(realMessageId); + if (pending && responseData.message) { + const previewText = responseData.message.trimEnd(); + if (!pending.streamPreviewDisabled && previewText) { + if (previewText.length > TELEGRAM_TEXT_LIMIT) { + pending.streamPreviewDisabled = true; + log('WARN', `Stopped stream preview for ${realMessageId}: text exceeded ${TELEGRAM_TEXT_LIMIT} chars`); + } else if (typeof pending.streamPreviewMessageId === 'number') { + await editFormattedMessage(pending.chatId, pending.streamPreviewMessageId, previewText); + } else { + const sent = await sendFormattedMessage(pending.chatId, previewText); + pending.streamPreviewMessageId = sent.message_id; + } + } + pending.timestamp = Date.now(); + savePendingMessages(); + log('INFO', `Updated stream preview (${responseData.message.length} chars) for ${realMessageId}`); + } + fs.unlinkSync(filePath); + continue; + } + // Find pending message, or fall back to senderId for proactive messages const pending = pendingMessages.get(messageId); const targetChatId = pending?.chatId ?? (senderId ? Number(senderId) : null); if (targetChatId && !Number.isNaN(targetChatId)) { + let finalizedViaPreview = false; + if (pending && typeof pending.streamPreviewMessageId === 'number' && responseText && (!responseData.files || responseData.files.length === 0)) { + const finalText = responseText.trimEnd(); + if (finalText.length > 0 && finalText.length <= TELEGRAM_TEXT_LIMIT) { + try { + await editFormattedMessage(targetChatId, pending.streamPreviewMessageId, finalText); + finalizedViaPreview = true; + pending.streamPreviewMessageId = undefined; + pending.streamPreviewDisabled = false; + log('INFO', `Finalized stream preview in place for ${messageId}`); + } catch (err: any) { + log('WARN', `Preview finalization failed for ${messageId}, falling back to normal send: ${err.message}`); + } + } + } + + if (!finalizedViaPreview && pending && typeof pending.streamPreviewMessageId === 'number') { + await clearPreviewMessage(pending); + } + // Send any attached files first - if (responseData.files && responseData.files.length > 0) { + if (!finalizedViaPreview && responseData.files && responseData.files.length > 0) { for (const file of responseData.files) { try { if (!fs.existsSync(file)) continue; @@ -518,23 +702,31 @@ async function checkOutgoingQueue(): Promise { } // Split message if needed (Telegram 4096 char limit) - if (responseText) { + if (!finalizedViaPreview && responseText) { const chunks = splitMessage(responseText); if (chunks.length > 0) { - await bot.sendMessage(targetChatId, chunks[0]!, pending + await sendFormattedMessage(targetChatId, chunks[0]!, pending ? { reply_to_message_id: pending.messageId } : {}, ); } for (let i = 1; i < chunks.length; i++) { - await bot.sendMessage(targetChatId, chunks[i]!); + await sendFormattedMessage(targetChatId, chunks[i]!); } + } else if (!finalizedViaPreview && pending) { + log('WARN', `Empty response for message ${messageId}, sending fallback`); + await bot.sendMessage(targetChatId, 'Response was empty. The agent may still be processing. Try again.', { + reply_to_message_id: pending.messageId, + }); } log('INFO', `Sent ${pending ? 'response' : 'proactive message'} to ${sender} (${responseText.length} chars${responseData.files ? `, ${responseData.files.length} file(s)` : ''})`); - if (pending) pendingMessages.delete(messageId); + if (pending) { + pendingMessages.delete(messageId); + savePendingMessages(); + } fs.unlinkSync(filePath); } else { log('WARN', `No pending message for ${messageId} and no valid senderId, cleaning up`); @@ -557,11 +749,18 @@ setInterval(checkOutgoingQueue, 1000); // Refresh typing indicator every 4 seconds for pending messages setInterval(() => { - for (const [, data] of pendingMessages.entries()) { - bot.sendChatAction(data.chatId, 'typing').catch(() => { - // Ignore typing errors silently + void (async () => { + await cleanupPendingMessages({ + maxAgeMs: PENDING_MAX_AGE_MS, + reason: 'expired', }); - } + + for (const [, data] of pendingMessages.entries()) { + bot.sendChatAction(data.chatId, 'typing').catch(() => { + // Ignore typing errors silently + }); + } + })(); }, 4000); // Handle polling errors @@ -572,12 +771,14 @@ bot.on('polling_error', (error: Error) => { // Graceful shutdown process.on('SIGINT', () => { log('INFO', 'Shutting down Telegram client...'); + savePendingMessages(); bot.stopPolling(); process.exit(0); }); process.on('SIGTERM', () => { log('INFO', 'Shutting down Telegram client...'); + savePendingMessages(); bot.stopPolling(); process.exit(0); }); diff --git a/src/lib/invoke.ts b/src/lib/invoke.ts index 471eaf2..aeb6029 100644 --- a/src/lib/invoke.ts +++ b/src/lib/invoke.ts @@ -6,7 +6,16 @@ import { SCRIPT_DIR, resolveClaudeModel, resolveCodexModel } from './config'; import { log } from './logging'; import { ensureAgentDirectory, updateAgentTeammates } from './agent-setup'; -export async function runCommand(command: string, args: string[], cwd?: string): Promise { +export interface RunCommandOptions { + onStdoutChunk?: (chunk: string) => void; +} + +export async function runCommand( + command: string, + args: string[], + cwd?: string, + options?: RunCommandOptions, +): Promise { return new Promise((resolve, reject) => { const child = spawn(command, args, { cwd: cwd || SCRIPT_DIR, @@ -21,6 +30,13 @@ export async function runCommand(command: string, args: string[], cwd?: string): child.stdout.on('data', (chunk: string) => { stdout += chunk; + if (options?.onStdoutChunk) { + try { + options.onStdoutChunk(chunk); + } catch { + // Never let streaming callbacks break command execution. + } + } }); child.stderr.on('data', (chunk: string) => { @@ -54,7 +70,8 @@ export async function invokeAgent( workspacePath: string, shouldReset: boolean, agents: Record = {}, - teams: Record = {} + teams: Record = {}, + onPartialResponse?: (text: string) => void ): Promise { // Ensure agent directory exists with config files const agentDir = path.join(workspacePath, agentId); @@ -130,8 +147,89 @@ export async function invokeAgent( if (continueConversation) { claudeArgs.push('-c'); } + + // Stream JSON is required for token-level partial updates. + const useStreamJson = typeof onPartialResponse === 'function'; + if (useStreamJson) { + claudeArgs.push( + '--output-format', 'stream-json', + '--verbose', + '--include-partial-messages', + ); + } + claudeArgs.push('-p', message); - return await runCommand('claude', claudeArgs, workingDir); + if (!useStreamJson) { + return await runCommand('claude', claudeArgs, workingDir); + } + + let streamBuffer = ''; + let partialText = ''; + let finalTextFromStream = ''; + + const processStreamLine = (line: string): void => { + const trimmed = line.trim(); + if (!trimmed || trimmed[0] !== '{') { + return; + } + + try { + const json = JSON.parse(trimmed); + + if (json?.type === 'stream_event') { + const event = json.event; + if (event?.type === 'content_block_delta' && event?.delta?.type === 'text_delta') { + const delta = event.delta.text; + if (typeof delta === 'string' && delta.length > 0) { + partialText += delta; + onPartialResponse?.(partialText); + } + } + return; + } + + if (json?.type === 'assistant' && Array.isArray(json?.message?.content)) { + const content = json.message.content + .filter((c: any) => c?.type === 'text' && typeof c?.text === 'string') + .map((c: any) => c.text) + .join(''); + if (content) { + finalTextFromStream = content; + } + return; + } + + if (json?.type === 'result' && typeof json?.result === 'string') { + finalTextFromStream = json.result; + } + } catch { + // Ignore non-JSON/non-stream lines. + } + }; + + const rawOutput = await runCommand( + 'claude', + claudeArgs, + workingDir, + { + onStdoutChunk: (chunk: string) => { + streamBuffer += chunk; + let newlineIndex = streamBuffer.indexOf('\n'); + while (newlineIndex !== -1) { + const line = streamBuffer.slice(0, newlineIndex); + streamBuffer = streamBuffer.slice(newlineIndex + 1); + processStreamLine(line); + newlineIndex = streamBuffer.indexOf('\n'); + } + }, + }, + ); + + if (streamBuffer.trim().length > 0) { + processStreamLine(streamBuffer); + } + + return (finalTextFromStream || partialText || rawOutput).trim(); } } diff --git a/src/queue-processor.ts b/src/queue-processor.ts index e89b989..9589f6a 100644 --- a/src/queue-processor.ts +++ b/src/queue-processor.ts @@ -41,6 +41,8 @@ const conversations = new Map(); const MAX_CONVERSATION_MESSAGES = 50; const LONG_RESPONSE_THRESHOLD = 4000; +const PARTIAL_STREAM_MIN_INTERVAL_MS = 1200; +const PARTIAL_STREAM_MIN_DELTA_CHARS = 80; /** * If a response exceeds the threshold, save full text as a .md file @@ -117,6 +119,61 @@ function collectFiles(response: string, fileSet: Set): void { } } +function createTelegramPartialEmitter(params: { + channel: string; + sender: string; + rawMessage: string; + messageId: string; + agentId: string; +}): (text: string) => void { + let lastSent = ''; + let lastSentAt = 0; + + const sanitizeForPartial = (text: string): string => { + const questionTagIndex = text.indexOf('[QUESTION]'); + const visible = questionTagIndex >= 0 ? text.slice(0, questionTagIndex) : text; + return visible.trimEnd(); + }; + + return (text: string): void => { + const sanitized = sanitizeForPartial(text); + if (!sanitized || sanitized === lastSent) { + return; + } + + // If stream restarted with shorter text, reset throttling baseline. + if (sanitized.length < lastSent.length) { + lastSent = ''; + lastSentAt = 0; + } + + const now = Date.now(); + const grewBy = sanitized.length - lastSent.length; + const tooSoon = now - lastSentAt < PARTIAL_STREAM_MIN_INTERVAL_MS; + if (tooSoon && grewBy < PARTIAL_STREAM_MIN_DELTA_CHARS) { + return; + } + + const partialResponse: ResponseData = { + channel: params.channel, + sender: params.sender, + message: sanitized, + originalMessage: params.rawMessage, + timestamp: now, + messageId: `partial_${params.messageId}_r0`, + agent: params.agentId, + }; + + const partialFile = path.join( + QUEUE_OUTGOING, + `${params.channel}_partial_${params.messageId}_live_${now}.json`, + ); + fs.writeFileSync(partialFile, JSON.stringify(partialResponse, null, 2)); + lastSent = sanitized; + lastSentAt = now; + }; +} + /** * Complete a conversation: aggregate responses, write to outgoing queue, save chat history. */ @@ -341,10 +398,22 @@ async function processMessage(messageFile: string): Promise { } // Invoke agent + const livePartialEmitter = channel === 'telegram' && !teamContext + ? createTelegramPartialEmitter({ channel, sender, rawMessage, messageId, agentId }) + : null; emitEvent('chain_step_start', { agentId, agentName: agent.name, fromAgent: messageData.fromAgent || null }); let response: string; try { - response = await invokeAgent(agent, agentId, message, workspacePath, shouldReset, agents, teams); + response = await invokeAgent( + agent, + agentId, + message, + workspacePath, + shouldReset, + agents, + teams, + livePartialEmitter ? (partial: string) => livePartialEmitter(partial) : undefined, + ); } catch (error) { const provider = agent.provider || 'anthropic'; log('ERROR', `${provider === 'openai' ? 'Codex' : 'Claude'} error (agent: ${agentId}): ${(error as Error).message}`);