From c09142aa04d003538f8b4f5428737b06d3319b84 Mon Sep 17 00:00:00 2001 From: Salem Date: Sun, 15 Feb 2026 09:30:20 +0000 Subject: [PATCH] feat: add interactive question bridge and agent reliability improvements MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add question-bridge module for Claude's AskUserQuestion → Telegram inline keyboard flow with poll-based answer retrieval - Add configurable timeout (max_response_time) to agent invocations with SIGTERM→SIGKILL escalation to prevent hung sessions - Strip CLAUDECODE env var from spawned processes to avoid nested session errors - Deliver partial text before [QUESTION] tags as separate messages - Add heartbeat handler to refresh pending message TTL during long processing - Add proactive/agent-initiated messaging via senderId for direct delivery - Improve error handling with 4xx permanent error detection in telegram client Co-Authored-By: Claude Opus 4.6 --- src/channels/telegram-client.ts | 500 +++++++++++++++++++++++++++++--- src/lib/config.ts | 2 + src/lib/invoke.ts | 128 +++++++- src/lib/question-bridge.ts | 179 ++++++++++++ src/lib/types.ts | 27 ++ src/queue-processor.ts | 101 ++++++- 6 files changed, 882 insertions(+), 55 deletions(-) create mode 100644 src/lib/question-bridge.ts diff --git a/src/channels/telegram-client.ts b/src/channels/telegram-client.ts index 27067e9..696c41e 100644 --- a/src/channels/telegram-client.ts +++ b/src/channels/telegram-client.ts @@ -14,6 +14,8 @@ import path from 'path'; import https from 'https'; import http from 'http'; import { ensureSenderPaired } from '../lib/pairing'; +import { QuestionData } from '../lib/types'; +import { writeAnswer } from '../lib/question-bridge'; const SCRIPT_DIR = path.resolve(__dirname, '..', '..'); const _localTinyclaw = path.join(SCRIPT_DIR, '.tinyclaw'); @@ -26,9 +28,11 @@ const LOG_FILE = path.join(TINYCLAW_HOME, 'logs/telegram.log'); const SETTINGS_FILE = path.join(TINYCLAW_HOME, 'settings.json'); const FILES_DIR = path.join(TINYCLAW_HOME, 'files'); const PAIRING_FILE = path.join(TINYCLAW_HOME, 'pairing.json'); +const QUEUE_QUESTIONS = path.join(TINYCLAW_HOME, 'queue/questions'); +const QUEUE_ANSWERS = path.join(TINYCLAW_HOME, 'queue/answers'); // Ensure directories exist -[QUEUE_INCOMING, QUEUE_OUTGOING, path.dirname(LOG_FILE), FILES_DIR].forEach(dir => { +[QUEUE_INCOMING, QUEUE_OUTGOING, QUEUE_QUESTIONS, QUEUE_ANSWERS, path.dirname(LOG_FILE), FILES_DIR].forEach(dir => { if (!fs.existsSync(dir)) { fs.mkdirSync(dir, { recursive: true }); } @@ -93,10 +97,50 @@ function buildUniqueFilePath(dir: string, preferredName: string): string { return candidate; } -// Track pending messages (waiting for response) +// Track pending messages (waiting for response) — persisted to survive restarts +const PENDING_FILE = path.join(TINYCLAW_HOME, 'queue', 'pending-telegram.json'); const pendingMessages = new Map(); let processingOutgoingQueue = false; +function loadPendingMessages(): void { + try { + if (fs.existsSync(PENDING_FILE)) { + const data: Record = JSON.parse(fs.readFileSync(PENDING_FILE, 'utf8')); + const tenMinutesAgo = Date.now() - (10 * 60 * 1000); + for (const [id, msg] of Object.entries(data)) { + if (msg.timestamp >= tenMinutesAgo) { + pendingMessages.set(id, msg); + } + } + if (pendingMessages.size > 0) { + log('INFO', `Restored ${pendingMessages.size} pending message(s) from disk`); + } + } + } catch (error) { + log('WARN', `Failed to load pending messages: ${(error as Error).message}`); + } +} + +function savePendingMessages(): void { + try { + const obj: Record = {}; + for (const [id, msg] of pendingMessages.entries()) { + obj[id] = msg; + } + fs.writeFileSync(PENDING_FILE, JSON.stringify(obj, null, 2)); + } catch (error) { + log('WARN', `Failed to save pending messages: ${(error as Error).message}`); + } +} + +// Load persisted pending messages on startup +loadPendingMessages(); + +// Interactive question state +const pendingQuestions = new Map(); // questionId → QuestionData +const awaitingFreeText = new Map(); // chatId → questionId +const multiSelectState = new Map>(); // questionId → selected option indices + // Logger function log(level: string, message: string): void { const timestamp = new Date().toISOString(); @@ -151,6 +195,82 @@ function getAgentListText(): string { } } +// Convert GitHub-flavored Markdown to Telegram HTML +// Telegram HTML supports: , , , , ,
, , 
, +//
, +// GFM uses **bold**, *italic*, ~~strikethrough~~, # headers, > blockquotes, etc. +function gfmToTelegram(text: string): string { + let result = text; + + // Strip decorative ★ lines and Unicode box-drawing lines BEFORE backtick conversion + // (these rely on backtick delimiters which get consumed by conversion) + result = result.replace(/^`?★[^`]*`?$/gm, ''); + result = result.replace(/^`?[─═╌╍┄┅┈┉╴╶]+`?$/gm, ''); + + // Escape HTML entities first (before we add our own tags) + result = result.replace(/&/g, '&'); + result = result.replace(//g, '>'); + + // Now convert GFM syntax to Telegram HTML tags + + // Code blocks: ```lang\ncode\n``` →
code
+ // Must happen before inline conversions to protect code content + result = result.replace(/```[a-zA-Z]*\n([\s\S]*?)```/g, '
$1
'); + result = result.replace(/```([\s\S]*?)```/g, '
$1
'); + + // Inline code: `code` → code + result = result.replace(/`([^`\n]+)`/g, '$1'); + + // Headers: # text → text (with a line break after for spacing) + result = result.replace(/^#{1,6}\s+(.+)$/gm, '$1'); + + // Bold+italic: ***text*** → text + result = result.replace(/\*\*\*(.+?)\*\*\*/g, '$1'); + + // Bold: **text** → text + result = result.replace(/\*\*(.+?)\*\*/g, '$1'); + + // Italic: *text* → text (but not inside words like file*name) + result = result.replace(/(?$1
'); + + // Strikethrough: ~~text~~ → text + result = result.replace(/~~(.+?)~~/g, '$1'); + + // Links: [text](url) →
text + result = result.replace(/\[([^\]]+)\]\(([^)]+)\)/g, '$1'); + + // Blockquotes: lines starting with > (we escaped > earlier) + // Collect consecutive blockquote lines into a single
block + result = result.replace(/(^>\s?.*$(\n|$))+/gm, (match) => { + const inner = match + .split('\n') + .map(line => line.replace(/^>\s?/, '')) + .join('\n') + .trim(); + return `
${inner}
\n`; + }); + + // Remove horizontal rules (--- or ***) + result = result.replace(/^[-*_]{3,}\s*$/gm, ''); + + // Remove table formatting — convert to simple lines + // Remove table separator rows (|---|---|) + result = result.replace(/^\|[-:\s|]+\|$/gm, ''); + // Convert table rows: | cell | cell | → cell — cell + result = result.replace(/^\|(.+)\|$/gm, (_match, content: string) => { + return content.split('|').map((c: string) => c.trim()).filter(Boolean).join(' — '); + }); + + // Remove image syntax ![alt](url) — keep just the alt text + result = result.replace(/!\[([^\]]*)\]\([^)]+\)/g, '$1'); + + // Collapse multiple blank lines into max 2 + result = result.replace(/\n{3,}/g, '\n\n'); + + return result.trim(); +} + // Split long messages for Telegram's 4096 char limit function splitMessage(text: string, maxLength = 4096): string[] { if (text.length <= maxLength) { @@ -258,9 +378,17 @@ function pairingMessage(code: string): string { // Initialize Telegram bot (polling mode) const bot = new TelegramBot(TELEGRAM_BOT_TOKEN, { polling: true }); -// Bot ready -bot.getMe().then((me: TelegramBot.User) => { +// Bot ready — register commands and start listening +bot.getMe().then(async (me: TelegramBot.User) => { log('INFO', `Telegram bot connected as @${me.username}`); + + // Register bot commands so they appear in Telegram's "/" menu + await bot.setMyCommands([ + { command: 'agent', description: 'List available agents' }, + { command: 'team', description: 'List available teams' }, + { command: 'reset', description: 'Reset conversation history' }, + ]).catch((err: Error) => log('WARN', `Failed to register commands: ${err.message}`)); + log('INFO', 'Listening for messages...'); }).catch((err: Error) => { log('ERROR', `Failed to connect: ${err.message}`); @@ -275,6 +403,25 @@ bot.on('message', async (msg: TelegramBot.Message) => { return; } + // Intercept free-text answers for "Other" option in interactive questions + const pendingQuestionId = awaitingFreeText.get(msg.chat.id); + if (pendingQuestionId && msg.text) { + awaitingFreeText.delete(msg.chat.id); + const written = writeAnswer(pendingQuestionId, msg.text); + if (written) { + await bot.sendMessage(msg.chat.id, `Got it: "${msg.text}"`, { + reply_to_message_id: msg.message_id, + }); + log('INFO', `Free-text answer for question ${pendingQuestionId}: ${msg.text.substring(0, 80)}`); + } else { + await bot.sendMessage(msg.chat.id, 'That question was already answered.', { + reply_to_message_id: msg.message_id, + }); + } + pendingQuestions.delete(pendingQuestionId); + return; // Don't queue as normal message + } + // Determine message text and any media files let messageText = msg.text || msg.caption || ''; const downloadedFiles: string[] = []; @@ -378,43 +525,20 @@ bot.on('message', async (msg: TelegramBot.Message) => { return; } - // Check for reset command: /reset @agent_id [@agent_id2 ...] - const resetMatch = messageText.trim().match(/^[!/]reset\s+(.+)$/i); + // Check for reset command if (messageText.trim().match(/^[!/]reset$/i)) { - await bot.sendMessage(msg.chat.id, 'Usage: /reset @agent_id [@agent_id2 ...]\nSpecify which agent(s) to reset.', { + log('INFO', 'Reset command received'); + + // Create reset flag (TINYCLAW_HOME = ~/.tinyclaw, not SCRIPT_DIR) + const resetFlagPath = path.join(TINYCLAW_HOME, 'reset_flag'); + fs.writeFileSync(resetFlagPath, 'reset'); + + // Reply immediately + await bot.sendMessage(msg.chat.id, 'Conversation reset! Next message will start a fresh conversation.', { reply_to_message_id: msg.message_id, }); return; } - if (resetMatch) { - log('INFO', 'Per-agent reset command received'); - const agentArgs = resetMatch[1].split(/\s+/).map(a => a.replace(/^@/, '').toLowerCase()); - try { - const settingsData = fs.readFileSync(SETTINGS_FILE, 'utf8'); - const settings = JSON.parse(settingsData); - const agents = settings.agents || {}; - const workspacePath = settings?.workspace?.path || path.join(require('os').homedir(), 'tinyclaw-workspace'); - const resetResults: string[] = []; - for (const agentId of agentArgs) { - if (!agents[agentId]) { - resetResults.push(`Agent '${agentId}' not found.`); - continue; - } - const flagDir = path.join(workspacePath, agentId); - if (!fs.existsSync(flagDir)) fs.mkdirSync(flagDir, { recursive: true }); - fs.writeFileSync(path.join(flagDir, 'reset_flag'), 'reset'); - resetResults.push(`Reset @${agentId} (${agents[agentId].name}).`); - } - await bot.sendMessage(msg.chat.id, resetResults.join('\n'), { - reply_to_message_id: msg.message_id, - }); - } catch { - await bot.sendMessage(msg.chat.id, 'Could not process reset command. Check settings.', { - reply_to_message_id: msg.message_id, - }); - } - return; - } // Show typing indicator await bot.sendChatAction(msg.chat.id, 'typing'); @@ -457,6 +581,9 @@ bot.on('message', async (msg: TelegramBot.Message) => { } } + // Persist to disk so responses survive restarts + savePendingMessages(); + } catch (error) { log('ERROR', `Message handling error: ${(error as Error).message}`); } @@ -481,6 +608,49 @@ async function checkOutgoingQueue(): Promise { const responseData: ResponseData = JSON.parse(fs.readFileSync(filePath, 'utf8')); const { messageId, message: responseText, sender } = responseData; + // Handle heartbeat files — refresh pending TTL, don't send to user + if (messageId.startsWith('heartbeat_')) { + const realMessageId = messageId.replace('heartbeat_', ''); + const pending = pendingMessages.get(realMessageId); + if (pending) { + pending.timestamp = Date.now(); + savePendingMessages(); + log('INFO', `Refreshed pending TTL for ${realMessageId}`); + } + fs.unlinkSync(filePath); + continue; + } + + // Handle partial responses — send text but keep pending (question is coming next) + if (messageId.startsWith('partial_')) { + const realMessageId = messageId.replace(/^partial_/, '').replace(/_r\d+$/, ''); + const pending = pendingMessages.get(realMessageId); + if (pending && responseData.message) { + const chunks = splitMessage(responseData.message); + const sendFormatted = async (chatId: number, text: string, opts?: Record) => { + const converted = gfmToTelegram(text); + try { + await bot.sendMessage(chatId, converted, { ...opts, parse_mode: 'HTML' }); + } catch (err: any) { + if (err?.response?.body?.description?.includes("can't parse")) { + const stripped = converted.replace(/<[^>]+>/g, '').replace(/&/g, '&').replace(/</g, '<').replace(/>/g, '>').replace(/"/g, '"').trim(); + await bot.sendMessage(chatId, stripped || text, opts || {}); + } else { + throw err; + } + } + }; + for (const chunk of chunks) { + await sendFormatted(pending.chatId, chunk); + } + pending.timestamp = Date.now(); + savePendingMessages(); + log('INFO', `Sent partial response (${responseData.message.length} chars) for ${realMessageId}`); + } + fs.unlinkSync(filePath); + continue; + } + // Find pending message const pending = pendingMessages.get(messageId); if (pending) { @@ -490,7 +660,9 @@ async function checkOutgoingQueue(): Promise { try { if (!fs.existsSync(file)) continue; const ext = path.extname(file).toLowerCase(); - if (['.jpg', '.jpeg', '.png', '.gif', '.webp'].includes(ext)) { + if (ext === '.gif') { + await bot.sendAnimation(pending.chatId, file); + } else if (['.jpg', '.jpeg', '.png', '.webp'].includes(ext)) { await bot.sendPhoto(pending.chatId, file); } else if (['.mp3', '.ogg', '.wav', '.m4a'].includes(ext)) { await bot.sendAudio(pending.chatId, file); @@ -510,21 +682,57 @@ async function checkOutgoingQueue(): Promise { if (responseText) { const chunks = splitMessage(responseText); + // Send with HTML parse_mode, fallback to plain text (tags stripped) if parsing fails + const sendFormatted = async (chatId: number, text: string, opts?: Record) => { + const converted = gfmToTelegram(text); + try { + await bot.sendMessage(chatId, converted, { ...opts, parse_mode: 'HTML' }); + } catch (err: any) { + // If HTML parsing fails, strip tags and send as plain text + if (err?.response?.body?.description?.includes("can't parse")) { + log('WARN', `HTML parse failed, sending as plain text`); + const stripped = converted + .replace(/<[^>]+>/g, '') + .replace(/&/g, '&') + .replace(/</g, '<') + .replace(/>/g, '>') + .replace(/"/g, '"') + .trim(); + if (!stripped) { + // Tag-stripped result is empty — send original unconverted text + log('WARN', `Stripped text empty, sending original unconverted text`); + await bot.sendMessage(chatId, text, opts || {}); + return; + } + await bot.sendMessage(chatId, stripped, opts || {}); + } else { + throw err; + } + } + }; + // First chunk as reply, rest as follow-up messages if (chunks.length > 0) { - await bot.sendMessage(pending.chatId, chunks[0]!, { + await sendFormatted(pending.chatId, chunks[0]!, { reply_to_message_id: pending.messageId, }); } for (let i = 1; i < chunks.length; i++) { - await bot.sendMessage(pending.chatId, chunks[i]!); + await sendFormatted(pending.chatId, chunks[i]!); } + } else { + // Agent returned empty response — notify user instead of silent drop + log('WARN', `Empty response for message ${messageId}, sending fallback`); + await bot.sendMessage(pending.chatId, '⏳ Response was empty — the agent may still be processing. Try again.', { + reply_to_message_id: pending.messageId, + }); } log('INFO', `Sent response to ${sender} (${responseText.length} chars${responseData.files ? `, ${responseData.files.length} file(s)` : ''})`); // Clean up pendingMessages.delete(messageId); + savePendingMessages(); fs.unlinkSync(filePath); } else if (responseData.senderId) { // Proactive/agent-initiated message — send directly to user @@ -566,9 +774,14 @@ async function checkOutgoingQueue(): Promise { log('WARN', `No pending message for ${messageId} and no senderId, cleaning up`); fs.unlinkSync(filePath); } - } catch (error) { + } catch (error: any) { + const statusCode = error?.response?.statusCode; log('ERROR', `Error processing response file ${file}: ${(error as Error).message}`); - // Don't delete file on error, might retry + // Delete on permanent client errors (4xx) to prevent infinite retry loops + if (statusCode && statusCode >= 400 && statusCode < 500) { + log('WARN', `Permanent error (${statusCode}), removing response file ${file}`); + try { fs.unlinkSync(filePath); } catch {} + } } } } catch (error) { @@ -581,6 +794,211 @@ async function checkOutgoingQueue(): Promise { // Check outgoing queue every second setInterval(checkOutgoingQueue, 1000); +// ─── Interactive Questions: Inline Keyboard ──────────────────────────────── + +/** + * Build a Telegram InlineKeyboardMarkup from question options. + * Each button's callback_data: "q::" + * "Other" button triggers free-text mode. + */ +function buildQuestionKeyboard(questionId: string, options: { label: string; description?: string }[], multiSelect: boolean, selected?: Set): TelegramBot.InlineKeyboardMarkup { + const rows: TelegramBot.InlineKeyboardButton[][] = []; + + for (let i = 0; i < options.length; i++) { + const opt = options[i]; + const prefix = multiSelect && selected?.has(i) ? '✓ ' : ''; + rows.push([{ + text: `${prefix}${opt.label}`, + callback_data: `q:${questionId}:${i}`, + }]); + } + + if (multiSelect) { + rows.push([{ + text: '✅ Done', + callback_data: `q:${questionId}:done`, + }]); + } + + rows.push([{ + text: '💬 Other (type answer)', + callback_data: `q:${questionId}:other`, + }]); + + return { inline_keyboard: rows }; +} + +// Watch questions queue for Telegram questions +async function checkQuestionsQueue(): Promise { + try { + if (!fs.existsSync(QUEUE_QUESTIONS)) return; + + const files = fs.readdirSync(QUEUE_QUESTIONS) + .filter(f => f.startsWith('telegram_') && f.endsWith('.json')); + + for (const file of files) { + const filePath = path.join(QUEUE_QUESTIONS, file); + try { + const question: QuestionData = JSON.parse(fs.readFileSync(filePath, 'utf8')); + + // Delete BEFORE sending to prevent duplicate delivery on next poll tick + try { fs.unlinkSync(filePath); } catch {} + + // Build inline keyboard + const keyboard = buildQuestionKeyboard(question.questionId, question.options, question.multiSelect); + + // Format question text with option descriptions + let text = `❓ ${question.question}`; + const descriptions = question.options + .filter(o => o.description) + .map(o => `• ${o.label} — ${o.description}`); + if (descriptions.length > 0) { + text += '\n\n' + descriptions.join('\n'); + } + + // Send to Telegram with inline keyboard + await bot.sendMessage(question.chatId, text, { + parse_mode: 'HTML', + reply_markup: keyboard, + }); + + // Store for callback resolution + pendingQuestions.set(question.questionId, question); + if (question.multiSelect) { + multiSelectState.set(question.questionId, new Set()); + } + + log('INFO', `Sent question ${question.questionId} to chat ${question.chatId}`); + + } catch (error) { + log('ERROR', `Error processing question file ${file}: ${(error as Error).message}`); + // Delete malformed files to prevent infinite retry + try { fs.unlinkSync(filePath); } catch {} + } + } + } catch (error) { + log('ERROR', `Questions queue error: ${(error as Error).message}`); + } +} + +// Poll questions queue every 500ms +setInterval(checkQuestionsQueue, 500); + +// Handle inline keyboard button presses +bot.on('callback_query', async (query: TelegramBot.CallbackQuery) => { + try { + const data = query.data; + if (!data || !data.startsWith('q:')) { + await bot.answerCallbackQuery(query.id); + return; + } + + // Parse callback_data: "q::" + const parts = data.split(':'); + if (parts.length < 3) { + await bot.answerCallbackQuery(query.id, { text: 'Invalid selection' }); + return; + } + + const questionId = parts[1]; + const action = parts.slice(2).join(':'); // Rejoin in case questionId contains colons + const question = pendingQuestions.get(questionId); + + if (!question) { + await bot.answerCallbackQuery(query.id, { text: 'This question has expired.' }); + return; + } + + // Handle "Other" — enter free-text mode + if (action === 'other') { + awaitingFreeText.set(question.chatId, questionId); + await bot.answerCallbackQuery(query.id); + await bot.sendMessage(question.chatId, 'Type your answer:'); + return; + } + + // Handle multiSelect "Done" — submit selected options + if (action === 'done' && question.multiSelect) { + const selected = multiSelectState.get(questionId); + if (!selected || selected.size === 0) { + await bot.answerCallbackQuery(query.id, { text: 'Please select at least one option.' }); + return; + } + + const selectedLabels = Array.from(selected) + .sort((a, b) => a - b) + .map(i => question.options[i]?.label) + .filter(Boolean) + .join(', '); + + const written = writeAnswer(questionId, selectedLabels); + await bot.answerCallbackQuery(query.id, { text: written ? 'Submitted!' : 'Already answered.' }); + + // Edit message to show selection (only if write succeeded) + if (written && query.message) { + await bot.editMessageText( + `${question.question}\n\n✅ Selected: ${selectedLabels}`, + { chat_id: query.message.chat.id, message_id: query.message.message_id } + ).catch(() => {}); + } + + pendingQuestions.delete(questionId); + multiSelectState.delete(questionId); + return; + } + + // Handle option selection + const optionIndex = parseInt(action, 10); + if (isNaN(optionIndex) || optionIndex < 0 || optionIndex >= question.options.length) { + await bot.answerCallbackQuery(query.id, { text: 'Invalid option.' }); + return; + } + + if (question.multiSelect) { + // Toggle selection + const selected = multiSelectState.get(questionId) || new Set(); + if (selected.has(optionIndex)) { + selected.delete(optionIndex); + } else { + selected.add(optionIndex); + } + multiSelectState.set(questionId, selected); + + // Update keyboard to show check marks + const keyboard = buildQuestionKeyboard(questionId, question.options, true, selected); + if (query.message) { + await bot.editMessageReplyMarkup(keyboard, { + chat_id: query.message.chat.id, + message_id: query.message.message_id, + }).catch(() => {}); + } + + const label = question.options[optionIndex].label; + await bot.answerCallbackQuery(query.id, { + text: selected.has(optionIndex) ? `Selected: ${label}` : `Deselected: ${label}`, + }); + } else { + // Single select — submit immediately + const selectedLabel = question.options[optionIndex].label; + const written = writeAnswer(questionId, selectedLabel); + await bot.answerCallbackQuery(query.id, { text: written ? `Selected: ${selectedLabel}` : 'Already answered.' }); + + // Edit message to show selection (only if write succeeded) + if (written && query.message) { + await bot.editMessageText( + `${question.question}\n\n✅ ${selectedLabel}`, + { chat_id: query.message.chat.id, message_id: query.message.message_id } + ).catch(() => {}); + } + + pendingQuestions.delete(questionId); + } + } catch (error) { + log('ERROR', `Callback query error: ${(error as Error).message}`); + try { await bot.answerCallbackQuery(query.id); } catch {} + } +}); + // Refresh typing indicator every 4 seconds for pending messages setInterval(() => { for (const [, data] of pendingMessages.entries()) { diff --git a/src/lib/config.ts b/src/lib/config.ts index ef3d356..715eb7c 100644 --- a/src/lib/config.ts +++ b/src/lib/config.ts @@ -15,6 +15,8 @@ export const SETTINGS_FILE = path.join(TINYCLAW_HOME, 'settings.json'); export const EVENTS_DIR = path.join(TINYCLAW_HOME, 'events'); export const CHATS_DIR = path.join(TINYCLAW_HOME, 'chats'); export const FILES_DIR = path.join(TINYCLAW_HOME, 'files'); +export const QUEUE_QUESTIONS = path.join(TINYCLAW_HOME, 'queue/questions'); +export const QUEUE_ANSWERS = path.join(TINYCLAW_HOME, 'queue/answers'); export function getSettings(): Settings { try { diff --git a/src/lib/invoke.ts b/src/lib/invoke.ts index 471eaf2..e8682dd 100644 --- a/src/lib/invoke.ts +++ b/src/lib/invoke.ts @@ -1,20 +1,54 @@ import { spawn } from 'child_process'; import fs from 'fs'; import path from 'path'; -import { AgentConfig, TeamConfig } from './types'; -import { SCRIPT_DIR, resolveClaudeModel, resolveCodexModel } from './config'; +import { AgentConfig, TeamConfig, InvokeResult } from './types'; +import { SCRIPT_DIR, resolveClaudeModel, resolveCodexModel, getSettings } from './config'; +import { QUESTION_PROMPT } from './question-bridge'; import { log } from './logging'; import { ensureAgentDirectory, updateAgentTeammates } from './agent-setup'; -export async function runCommand(command: string, args: string[], cwd?: string): Promise { +export interface CommandResult { + stdout: string; + stderr: string; +} + +/** + * Run a command with an optional timeout (in ms). + * If the timeout fires, the child is killed (SIGTERM → SIGKILL) and the + * promise rejects with a descriptive error. + */ +export async function runCommand(command: string, args: string[], cwd?: string, timeoutMs?: number): Promise { return new Promise((resolve, reject) => { + // Strip CLAUDECODE env var so spawned Claude sessions don't think they're nested + const env = { ...process.env }; + delete env.CLAUDECODE; + const child = spawn(command, args, { cwd: cwd || SCRIPT_DIR, stdio: ['ignore', 'pipe', 'pipe'], + env, }); let stdout = ''; let stderr = ''; + let killed = false; + let timer: NodeJS.Timeout | undefined; + let killTimer: NodeJS.Timeout | undefined; + + if (timeoutMs && timeoutMs > 0) { + timer = setTimeout(() => { + killed = true; + log('WARN', `Command timed out after ${Math.round(timeoutMs / 1000)}s — sending SIGTERM (pid ${child.pid})`); + child.kill('SIGTERM'); + // Force-kill if still alive after 5s + killTimer = setTimeout(() => { + if (!child.killed) { + log('WARN', `Force-killing command (pid ${child.pid}) with SIGKILL`); + child.kill('SIGKILL'); + } + }, 5000); + }, timeoutMs); + } child.stdout.setEncoding('utf8'); child.stderr.setEncoding('utf8'); @@ -28,12 +62,22 @@ export async function runCommand(command: string, args: string[], cwd?: string): }); child.on('error', (error) => { + if (timer) clearTimeout(timer); + if (killTimer) clearTimeout(killTimer); reject(error); }); child.on('close', (code) => { + if (timer) clearTimeout(timer); + if (killTimer) clearTimeout(killTimer); + + if (killed) { + reject(new Error(`Command timed out after ${Math.round(timeoutMs! / 1000)} seconds and was terminated`)); + return; + } + if (code === 0) { - resolve(stdout); + resolve({ stdout, stderr }); return; } @@ -45,7 +89,10 @@ export async function runCommand(command: string, args: string[], cwd?: string): /** * Invoke a single agent with a message. Contains all Claude/Codex invocation logic. - * Returns the raw response text. + * Returns the raw response text (or InvokeResult when interactive=true). + * + * @param interactive - Enable question bridge (disables AskUserQuestion, adds system prompt, captures session_id) + * @param resumeSessionId - Resume a specific session instead of using -c (for question continuation rounds) */ export async function invokeAgent( agent: AgentConfig, @@ -54,8 +101,10 @@ export async function invokeAgent( workspacePath: string, shouldReset: boolean, agents: Record = {}, - teams: Record = {} -): Promise { + teams: Record = {}, + interactive: boolean = false, + resumeSessionId?: string +): Promise { // Ensure agent directory exists with config files const agentDir = path.join(workspacePath, agentId); const isNewAgent = !fs.existsSync(agentDir); @@ -76,6 +125,11 @@ export async function invokeAgent( const provider = agent.provider || 'anthropic'; + // Read timeout from settings (default: 3 minutes) + const settings = getSettings(); + const maxResponseTimeSec = settings?.monitoring?.max_response_time ?? 180; + const timeoutMs = maxResponseTimeSec * 1000; + if (provider === 'openai') { log('INFO', `Using Codex CLI (agent: ${agentId})`); @@ -95,7 +149,7 @@ export async function invokeAgent( } codexArgs.push('--skip-git-repo-check', '--dangerously-bypass-approvals-and-sandbox', '--json', message); - const codexOutput = await runCommand('codex', codexArgs, workingDir); + const { stdout: codexOutput } = await runCommand('codex', codexArgs, workingDir, timeoutMs); // Parse JSONL output and extract final agent_message let response = ''; @@ -111,7 +165,7 @@ export async function invokeAgent( } } - return response || 'Sorry, I could not generate a response from Codex.'; + return { response: response || 'Sorry, I could not generate a response from Codex.' }; } else { // Default to Claude (Anthropic) log('INFO', `Using Claude provider (agent: ${agentId})`); @@ -127,11 +181,63 @@ export async function invokeAgent( if (modelId) { claudeArgs.push('--model', modelId); } - if (continueConversation) { + + // Interactive mode: disable AskUserQuestion, inject question system prompt + if (interactive) { + claudeArgs.push( + '--disallowed-tools', 'AskUserQuestion', + '--append-system-prompt', QUESTION_PROMPT + ); + } + + // Session resumption: --resume takes priority over -c + if (resumeSessionId) { + claudeArgs.push('--resume', resumeSessionId); + } else if (continueConversation) { claudeArgs.push('-c'); } + + // Use JSON output when interactive (to capture session_id) + if (interactive) { + claudeArgs.push('--output-format', 'json'); + } + claudeArgs.push('-p', message); - return await runCommand('claude', claudeArgs, workingDir); + const { stdout, stderr } = await runCommand('claude', claudeArgs, workingDir, timeoutMs); + + // Parse response — JSON mode for interactive, plain text otherwise + if (interactive && stdout.trim()) { + try { + const jsonResponse = JSON.parse(stdout.trim()); + const sessionId = jsonResponse.session_id || undefined; + const responseText = jsonResponse.result || ''; + if (responseText) { + return { response: responseText, sessionId }; + } + // Fall through to stderr extraction if result is empty + } catch (e) { + // JSON parse failed — treat as plain text + log('WARN', `Failed to parse JSON output for agent ${agentId}, treating as plain text`); + return { response: stdout }; + } + } + + if (stdout.trim()) { + return { response: stdout }; + } + + // Claude CLI sometimes outputs only to stderr when performing tool actions + // (file edits, bash commands, etc.) without producing a text response on stdout. + // Extract a meaningful summary from stderr as fallback. + if (stderr.trim()) { + log('WARN', `Claude returned empty stdout for agent ${agentId}, extracting from stderr (${stderr.length} chars)`); + // stderr contains progress/status lines — return last meaningful portion + const stderrLines = stderr.trim().split('\n').filter(l => l.trim()); + const lastLines = stderrLines.slice(-10).join('\n'); + return { response: lastLines || 'I completed the requested actions but had no text response to return.' }; + } + + return { response: 'I completed the requested actions but had no text response to return.' }; } } diff --git a/src/lib/question-bridge.ts b/src/lib/question-bridge.ts new file mode 100644 index 0000000..90e4159 --- /dev/null +++ b/src/lib/question-bridge.ts @@ -0,0 +1,179 @@ +import fs from 'fs'; +import path from 'path'; +import { QuestionData, AnswerData } from './types'; +import { QUEUE_QUESTIONS, QUEUE_ANSWERS } from './config'; +import { log } from './logging'; + +// Ensure queue directories exist +[QUEUE_QUESTIONS, QUEUE_ANSWERS].forEach(dir => { + if (!fs.existsSync(dir)) { + fs.mkdirSync(dir, { recursive: true }); + } +}); + +/** + * System prompt instructing Claude to output questions in a parseable format + * instead of using the (disabled) AskUserQuestion tool. + */ +export const QUESTION_PROMPT = `MANDATORY: You are communicating through a messaging app (Telegram). You CANNOT ask questions as plain text — the user cannot type free-form answers easily. When you need to ask the user ANY clarifying question, you MUST use this structured format so the system can render interactive buttons: + +[QUESTION]{"question":"Your question here","options":[{"label":"Option 1","description":"Brief description"},{"label":"Option 2","description":"Brief description"}],"multiSelect":false}[/QUESTION] + +Rules: +- NEVER ask questions as plain text. ALWAYS use the [QUESTION] tag format above. +- Provide 2-4 concrete options the user can tap. Include an option for "other" if the list isn't exhaustive. +- Set "multiSelect":true if the user can pick more than one. +- Ask ONE question at a time. After outputting a [QUESTION] block, STOP and wait for the user's response. +- You may include a brief sentence of context before the [QUESTION] tag, but nothing after it. +- Do NOT answer the question yourself or assume what the user wants.`; + +interface ParsedQuestion { + question: string; + options: { label: string; description?: string }[]; + multiSelect: boolean; +} + +/** + * Parse [QUESTION]{json}[/QUESTION] blocks from Claude's response text. + * Treats model output as untrusted — any parse failure returns empty array. + */ +export function parseQuestions(text: string): ParsedQuestion[] { + const questions: ParsedQuestion[] = []; + const regex = /\[QUESTION\]([\s\S]*?)\[\/QUESTION\]/g; + let match: RegExpExecArray | null; + + while ((match = regex.exec(text)) !== null) { + try { + const json = JSON.parse(match[1].trim()); + + // Validate required fields + if ( + typeof json.question !== 'string' || + !Array.isArray(json.options) || + json.options.length === 0 + ) { + log('WARN', `Invalid question structure, skipping: ${match[1].substring(0, 100)}`); + continue; + } + + // Validate each option has a label + const validOptions = json.options.filter( + (o: any) => typeof o?.label === 'string' && o.label.trim().length > 0 + ); + if (validOptions.length === 0) { + log('WARN', `No valid options in question, skipping`); + continue; + } + + questions.push({ + question: json.question, + options: validOptions.map((o: any) => ({ + label: o.label, + description: typeof o.description === 'string' ? o.description : undefined, + })), + multiSelect: !!json.multiSelect, + }); + } catch (e) { + log('WARN', `Failed to parse question JSON: ${(e as Error).message}`); + // Continue — other questions in the response might be valid + } + } + + return questions; +} + +/** + * Strip [QUESTION]...[/QUESTION] tags from response text, + * preserving any text before/between tags. + */ +export function stripQuestionTags(text: string): string { + return text.replace(/\[QUESTION\][\s\S]*?\[\/QUESTION\]/g, '').trim(); +} + +/** + * Write a question to the questions queue for the channel client to pick up. + */ +export function emitQuestion(question: QuestionData): void { + const filename = `${question.channel}_${question.questionId}.json`; + const filepath = path.join(QUEUE_QUESTIONS, filename); + fs.writeFileSync(filepath, JSON.stringify(question, null, 2)); + log('INFO', `Emitted question ${question.questionId} to ${filename}`); +} + +/** + * Poll for an answer file in the answers queue. + * Returns the answer data or null on timeout. + * + * @param questionId - The question ID to wait for + * @param timeoutMs - Max wait time (default 5 minutes) + * @param intervalMs - Poll interval (default 500ms) + */ +export function pollForAnswer( + questionId: string, + timeoutMs: number = 5 * 60 * 1000, + intervalMs: number = 500 +): Promise { + const answerFile = path.join(QUEUE_ANSWERS, `answer_${questionId}.json`); + const deadline = Date.now() + timeoutMs; + + return new Promise((resolve) => { + const check = () => { + if (Date.now() > deadline) { + log('WARN', `Answer poll timed out for question ${questionId}`); + resolve(null); + return; + } + + if (fs.existsSync(answerFile)) { + try { + const data: AnswerData = JSON.parse(fs.readFileSync(answerFile, 'utf8')); + // Clean up the answer file after reading + try { fs.unlinkSync(answerFile); } catch {} + log('INFO', `Received answer for question ${questionId}: ${data.answer}`); + resolve(data); + return; + } catch (e) { + // Malformed answer file — delete to prevent infinite retry + try { fs.unlinkSync(answerFile); } catch {} + log('WARN', `Malformed answer file for ${questionId}, deleted: ${(e as Error).message}`); + resolve(null); + return; + } + } + + setTimeout(check, intervalMs); + }; + + check(); + }); +} + +/** + * Write an answer file with exclusive-create semantics (wx flag). + * Returns true if written, false if answer already exists (duplicate tap). + */ +export function writeAnswer(questionId: string, answer: string): boolean { + const answerFile = path.join(QUEUE_ANSWERS, `answer_${questionId}.json`); + + // Ensure directory exists + if (!fs.existsSync(QUEUE_ANSWERS)) { + fs.mkdirSync(QUEUE_ANSWERS, { recursive: true }); + } + + const data: AnswerData = { + questionId, + answer, + answeredAt: Date.now(), + }; + + try { + fs.writeFileSync(answerFile, JSON.stringify(data, null, 2), { flag: 'wx' }); + return true; + } catch (e: any) { + if (e.code === 'EEXIST') { + log('WARN', `Answer already exists for question ${questionId} (duplicate tap)`); + return false; + } + throw e; + } +} diff --git a/src/lib/types.ts b/src/lib/types.ts index f0b2e40..5714b4c 100644 --- a/src/lib/types.ts +++ b/src/lib/types.ts @@ -40,6 +40,7 @@ export interface Settings { teams?: Record; monitoring?: { heartbeat_interval?: number; + max_response_time?: number; // seconds — max time a single agent invocation can run (default: 180) }; } @@ -91,6 +92,32 @@ export interface QueueFile { time: number; } +// Interactive question/answer types (for AskUserQuestion bridge) +export interface QuestionData { + questionId: string; // unique ID for correlation + messageId: string; // original user message ID + agentId: string; + channel: string; + chatId: number; // Telegram chat ID (from pendingMessages) + question: string; + options: { label: string; description?: string }[]; + multiSelect: boolean; + timestamp: number; + expiresAt: number; +} + +export interface AnswerData { + questionId: string; + answer: string; // selected label(s) or free text + answeredAt: number; +} + +// Invoke result with optional session ID for multi-turn question loops +export interface InvokeResult { + response: string; + sessionId?: string; // captured from --output-format json +} + // Model name mapping export const CLAUDE_MODEL_IDS: Record = { 'sonnet': 'claude-sonnet-4-5', diff --git a/src/queue-processor.ts b/src/queue-processor.ts index e89b989..23ec3d2 100644 --- a/src/queue-processor.ts +++ b/src/queue-processor.ts @@ -16,18 +16,20 @@ import fs from 'fs'; import path from 'path'; -import { MessageData, ResponseData, QueueFile, ChainStep, Conversation, TeamConfig } from './lib/types'; +import { MessageData, ResponseData, QueueFile, ChainStep, Conversation, TeamConfig, QuestionData } from './lib/types'; import { QUEUE_INCOMING, QUEUE_OUTGOING, QUEUE_PROCESSING, LOG_FILE, EVENTS_DIR, CHATS_DIR, FILES_DIR, + QUEUE_QUESTIONS, QUEUE_ANSWERS, getSettings, getAgents, getTeams } from './lib/config'; import { log, emitEvent } from './lib/logging'; import { parseAgentRouting, findTeamForAgent, getAgentResetFlag, extractTeammateMentions } from './lib/routing'; import { invokeAgent } from './lib/invoke'; +import { parseQuestions, stripQuestionTags, emitQuestion, pollForAnswer } from './lib/question-bridge'; // Ensure directories exist -[QUEUE_INCOMING, QUEUE_OUTGOING, QUEUE_PROCESSING, FILES_DIR, path.dirname(LOG_FILE)].forEach(dir => { +[QUEUE_INCOMING, QUEUE_OUTGOING, QUEUE_PROCESSING, QUEUE_QUESTIONS, QUEUE_ANSWERS, FILES_DIR, path.dirname(LOG_FILE)].forEach(dir => { if (!fs.existsSync(dir)) { fs.mkdirSync(dir, { recursive: true }); } @@ -341,10 +343,103 @@ async function processMessage(messageFile: string): Promise { } // Invoke agent + const isInteractive = !teamContext && channel === 'telegram'; // Only enable interactive questions for non-team Telegram emitEvent('chain_step_start', { agentId, agentName: agent.name, fromAgent: messageData.fromAgent || null }); let response: string; try { - response = await invokeAgent(agent, agentId, message, workspacePath, shouldReset, agents, teams); + const result = await invokeAgent(agent, agentId, message, workspacePath, shouldReset, agents, teams, isInteractive); + response = result.response; + let sessionId = result.sessionId; + + // Interactive question-answer loop (only for non-team Telegram messages) + if (isInteractive && messageData.senderId) { + if (!sessionId) { + log('WARN', `Interactive mode but no session ID captured — continuation will use -c fallback`); + } + for (let round = 0; ; round++) { + const questions = parseQuestions(response); + if (questions.length === 0) break; + + log('INFO', `Question round ${round + 1}: ${questions.length} question(s) detected`); + + // Strip [QUESTION] tags — send any preceding text as a partial message + const partialText = stripQuestionTags(response); + if (partialText) { + log('INFO', `Sending partial response before question (${partialText.length} chars)`); + const partialResponse: ResponseData = { + channel, + sender, + message: partialText, + originalMessage: rawMessage, + timestamp: Date.now(), + messageId: `partial_${messageId}_r${round}`, + agent: agentId, + }; + const partialFile = path.join(QUEUE_OUTGOING, `${channel}_partial_${messageId}_r${round}_${Date.now()}.json`); + fs.writeFileSync(partialFile, JSON.stringify(partialResponse, null, 2)); + } + + // Process questions sequentially + const answers: string[] = []; + for (const q of questions) { + const questionId = `${messageId}_q${round}_${Date.now()}`; + const questionData: QuestionData = { + questionId, + messageId, + agentId, + channel, + chatId: parseInt(messageData.senderId!, 10), + question: q.question, + options: q.options, + multiSelect: q.multiSelect, + timestamp: Date.now(), + expiresAt: Date.now() + (5 * 60 * 1000), + }; + + emitQuestion(questionData); + + // Write typing heartbeat to refresh pending TTL in telegram client + const heartbeatData: ResponseData = { + channel, + sender, + message: '', + originalMessage: rawMessage, + timestamp: Date.now(), + messageId: `heartbeat_${messageId}`, + agent: agentId, + }; + const heartbeatFile = path.join(QUEUE_OUTGOING, `telegram_heartbeat_${messageId}_${Date.now()}.json`); + fs.writeFileSync(heartbeatFile, JSON.stringify(heartbeatData, null, 2)); + + const answer = await pollForAnswer(questionId, 5 * 60 * 1000); + if (answer) { + answers.push(answer.answer); + } else { + answers.push('User did not respond in time. Use your best judgment to proceed.'); + } + } + + // Continue conversation with the user's answer(s) + const answerText = answers.length === 1 + ? `User answered: ${answers[0]}` + : answers.map((a, i) => `Answer ${i + 1}: ${a}`).join('\n'); + + log('INFO', `Continuing conversation with answer: ${answerText.substring(0, 100)}...`); + + const continuation = await invokeAgent( + agent, agentId, answerText, workspacePath, + false, agents, teams, + isInteractive, sessionId + ); + response = continuation.response; + if (continuation.sessionId) { + sessionId = continuation.sessionId; + } + } + + // Strip any remaining question tags from the final response + response = stripQuestionTags(response); + } } catch (error) { const provider = agent.provider || 'anthropic'; log('ERROR', `${provider === 'openai' ? 'Codex' : 'Claude'} error (agent: ${agentId}): ${(error as Error).message}`);