diff --git a/src/channels/telegram-client.ts b/src/channels/telegram-client.ts index 2af065e..3edd50c 100644 --- a/src/channels/telegram-client.ts +++ b/src/channels/telegram-client.ts @@ -67,6 +67,9 @@ interface ResponseData { timestamp: number; messageId: string; files?: string[]; + sessionId?: string; + partial?: boolean; + updateType?: 'activity' | 'final'; } function sanitizeFileName(fileName: string): string { @@ -481,14 +484,15 @@ async function checkOutgoingQueue(): Promise { try { const files = fs.readdirSync(QUEUE_OUTGOING) - .filter(f => f.startsWith('telegram_') && f.endsWith('.json')); + .filter(f => f.startsWith('telegram_') && f.endsWith('.json')) + .sort(); for (const file of files) { const filePath = path.join(QUEUE_OUTGOING, file); try { const responseData: ResponseData = JSON.parse(fs.readFileSync(filePath, 'utf8')); - const { messageId, message: responseText, sender, senderId } = responseData; + const { messageId, message: responseText, sender, senderId, sessionId } = responseData; // Find pending message, or fall back to senderId for proactive messages const pending = pendingMessages.get(messageId); @@ -519,7 +523,11 @@ async function checkOutgoingQueue(): Promise { // Split message if needed (Telegram 4096 char limit) if (responseText) { - const chunks = splitMessage(responseText); + // Append session ID to final (non-partial) responses + const messageWithSession = (!responseData.partial && sessionId) + ? `${responseText}\n\nSession: ${sessionId}` + : responseText; + const chunks = splitMessage(messageWithSession); if (chunks.length > 0) { await bot.sendMessage(targetChatId, chunks[0]!, pending @@ -534,7 +542,11 @@ async function checkOutgoingQueue(): Promise { log('INFO', `Sent ${pending ? 'response' : 'proactive message'} to ${sender} (${responseText.length} chars${responseData.files ? `, ${responseData.files.length} file(s)` : ''})`); - if (pending) pendingMessages.delete(messageId); + // Only clear pending state when the final response arrives + // Partial (activity) updates keep the message pending + if (!responseData.partial) { + if (pending) pendingMessages.delete(messageId); + } fs.unlinkSync(filePath); } else { log('WARN', `No pending message for ${messageId} and no valid senderId, cleaning up`); diff --git a/src/lib/invoke.ts b/src/lib/invoke.ts index 471eaf2..f3cd0e5 100644 --- a/src/lib/invoke.ts +++ b/src/lib/invoke.ts @@ -1,11 +1,352 @@ import { spawn } from 'child_process'; import fs from 'fs'; +import os from 'os'; import path from 'path'; import { AgentConfig, TeamConfig } from './types'; import { SCRIPT_DIR, resolveClaudeModel, resolveCodexModel } from './config'; import { log } from './logging'; import { ensureAgentDirectory, updateAgentTeammates } from './agent-setup'; +type JsonObject = Record; +type ActivityCallback = (activity: string) => void; + +export interface AgentInvokeResult { + text: string; + sessionId?: string; +} + +function isObject(value: unknown): value is JsonObject { + return typeof value === 'object' && value !== null && !Array.isArray(value); +} + +function toObject(value: unknown): JsonObject | null { + return isObject(value) ? value : null; +} + +function getString(value: unknown): string | null { + return typeof value === 'string' ? value : null; +} + +function getArray(value: unknown): unknown[] { + return Array.isArray(value) ? value : []; +} + +function truncateText(text: string, maxLength = 120): string { + if (text.length <= maxLength) { + return text; + } + return `${text.slice(0, maxLength - 3)}...`; +} + +function pickInputString(input: JsonObject, keys: string[]): string | null { + for (const key of keys) { + const value = getString(input[key]); + if (value && value.trim()) { + return value.trim(); + } + } + return null; +} + +function summarizeToolUse(toolName: string, input: JsonObject | null): string { + const normalized = toolName.trim(); + const lower = normalized.toLowerCase(); + const safeInput = input || {}; + + if (lower === 'read') { + const filePath = pickInputString(safeInput, ['file_path', 'path', 'file']); + return filePath ? `Read ${truncateText(filePath, 90)}` : 'Read a file'; + } + if (lower === 'write') { + const filePath = pickInputString(safeInput, ['file_path', 'path', 'file']); + return filePath ? `Wrote ${truncateText(filePath, 90)}` : 'Wrote a file'; + } + if (lower === 'edit') { + const filePath = pickInputString(safeInput, ['file_path', 'path', 'file']); + return filePath ? `Edited ${truncateText(filePath, 90)}` : 'Edited a file'; + } + if (lower === 'bash') { + const command = pickInputString(safeInput, ['command', 'cmd']); + return command ? `Ran ${truncateText(command, 90)}` : 'Ran a shell command'; + } + if (lower === 'grep') { + const pattern = pickInputString(safeInput, ['pattern', 'query']); + return pattern ? `Searched for "${truncateText(pattern, 70)}"` : 'Searched with grep'; + } + if (lower === 'glob') { + const pattern = pickInputString(safeInput, ['pattern']); + return pattern ? `Matched files with "${truncateText(pattern, 70)}"` : 'Matched files'; + } + if (lower === 'webfetch') { + const url = pickInputString(safeInput, ['url']); + return url ? `Fetched ${truncateText(url, 90)}` : 'Fetched a URL'; + } + if (lower === 'websearch') { + const query = pickInputString(safeInput, ['query']); + return query ? `Searched web for "${truncateText(query, 70)}"` : 'Searched the web'; + } + + return `Used ${normalized || 'a tool'}`; +} + +function recordActivity( + summary: string, + seenActivities: Set, + onActivity?: ActivityCallback, + collectedActivities?: string[], + dedupe = true +): void { + const trimmed = summary.trim(); + if (!trimmed) { + return; + } + if (dedupe && seenActivities.has(trimmed)) { + return; + } + if (dedupe) { + seenActivities.add(trimmed); + } + if (collectedActivities) { + collectedActivities.push(trimmed); + } + if (onActivity) { + try { + onActivity(trimmed); + } catch { + // Ignore callback failures + } + } +} + +function processClaudeEvent( + eventObj: JsonObject, + seenActivities: Set, + toolSummaryById: Map, + onActivity?: ActivityCallback, + collectedActivities?: string[] +): string { + let latestResponse = ''; + const eventType = getString(eventObj.type) || ''; + const eventSubtype = getString(eventObj.subtype) || ''; + + if (eventType === 'result') { + const resultText = getString(eventObj.result); + if (resultText && resultText.trim()) { + latestResponse = resultText.trim(); + } + } + + if (eventType === 'assistant') { + const messageObj = toObject(eventObj.message); + const contentBlocks = getArray(messageObj?.content); + for (const block of contentBlocks) { + const blockObj = toObject(block); + if (!blockObj) { + continue; + } + + const blockType = getString(blockObj.type) || ''; + if (blockType === 'text') { + const text = getString(blockObj.text); + if (text && text.trim()) { + latestResponse = text.trim(); + } + } + if (blockType === 'tool_use') { + const toolName = getString(blockObj.name) || 'tool'; + const summary = summarizeToolUse(toolName, toObject(blockObj.input)); + const toolUseId = getString(blockObj.id); + if (toolUseId) { + toolSummaryById.set(toolUseId, summary); + } + recordActivity(summary, seenActivities, onActivity, collectedActivities, true); + } + } + } + + if (eventType === 'user') { + const messageObj = toObject(eventObj.message); + const contentBlocks = getArray(messageObj?.content); + for (const block of contentBlocks) { + const blockObj = toObject(block); + if (!blockObj) { + continue; + } + if ((getString(blockObj.type) || '') !== 'tool_result') { + continue; + } + const toolUseId = getString(blockObj.tool_use_id) || ''; + const priorSummary = toolSummaryById.get(toolUseId); + const completion = priorSummary + ? `Completed ${priorSummary.toLowerCase()}` + : 'Tool result received'; + // Tool results should stream even if text repeats. + recordActivity(completion, seenActivities, onActivity, collectedActivities, false); + } + } + + if (eventType.includes('tool') || eventSubtype.includes('tool')) { + const toolName = getString(eventObj.tool_name) + || getString(eventObj.name) + || getString(toObject(eventObj.tool)?.name) + || 'tool'; + const input = toObject(eventObj.input) || toObject(eventObj.arguments) || toObject(eventObj.tool_input); + const summary = summarizeToolUse(toolName, input); + recordActivity(summary, seenActivities, onActivity, collectedActivities, true); + } + + return latestResponse; +} + +function parseSessionId(value: unknown): string | null { + const raw = getString(value); + if (!raw) return null; + const trimmed = raw.trim(); + return /^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i.test(trimmed) + ? trimmed + : null; +} + +function tryExtractSessionId(eventObj: JsonObject): string | null { + const direct = parseSessionId(eventObj.sessionId) || parseSessionId(eventObj.session_id); + if (direct) return direct; + + const messageObj = toObject(eventObj.message); + const fromMessage = messageObj + ? (parseSessionId(messageObj.sessionId) || parseSessionId(messageObj.session_id)) + : null; + if (fromMessage) return fromMessage; + + const payload = toObject(eventObj.payload); + if (!payload) return null; + return parseSessionId(payload.sessionId) || parseSessionId(payload.session_id); +} + +function getLatestClaudeSessionId(workingDir: string): string | undefined { + try { + const projectKey = workingDir.replace(/\//g, '-'); + const projectDir = path.join(os.homedir(), '.claude', 'projects', projectKey); + if (!fs.existsSync(projectDir)) return undefined; + + const latest = fs.readdirSync(projectDir) + .filter(name => name.endsWith('.jsonl')) + .map(name => { + const fullPath = path.join(projectDir, name); + return { name, mtimeMs: fs.statSync(fullPath).mtimeMs }; + }) + .sort((a, b) => b.mtimeMs - a.mtimeMs)[0]; + + if (!latest) return undefined; + return latest.name.replace(/\.jsonl$/, ''); + } catch { + return undefined; + } +} + +async function runClaudeCommand( + args: string[], + cwd: string, + onActivity?: ActivityCallback +): Promise<{ output: string; parsed: { response: string; activities: string[] }; sessionId?: string }> { + return new Promise((resolve, reject) => { + const child = spawn('claude', args, { + cwd, + stdio: ['ignore', 'pipe', 'pipe'], + }); + + let stdout = ''; + let stderr = ''; + let stdoutBuffer = ''; + let stderrBuffer = ''; + let finalResponse = ''; + let sessionId: string | undefined; + const activities: string[] = []; + const seenActivities = new Set(); + const toolSummaryById = new Map(); + + function processLine(line: string): void { + const trimmed = line.trim(); + if (!trimmed) { + return; + } + let event: unknown; + try { + event = JSON.parse(trimmed); + } catch { + return; + } + const eventObj = toObject(event); + if (!eventObj) { + return; + } + const extractedSessionId = tryExtractSessionId(eventObj); + if (extractedSessionId) { + sessionId = extractedSessionId; + } + const responseUpdate = processClaudeEvent(eventObj, seenActivities, toolSummaryById, onActivity, activities); + if (responseUpdate) { + finalResponse = responseUpdate; + } + } + + function consumeChunk(chunk: string, isStdout: boolean): void { + if (isStdout) { + stdout += chunk; + stdoutBuffer += chunk; + while (stdoutBuffer.includes('\n')) { + const idx = stdoutBuffer.indexOf('\n'); + const line = stdoutBuffer.slice(0, idx); + stdoutBuffer = stdoutBuffer.slice(idx + 1); + processLine(line); + } + } else { + stderr += chunk; + stderrBuffer += chunk; + while (stderrBuffer.includes('\n')) { + const idx = stderrBuffer.indexOf('\n'); + const line = stderrBuffer.slice(0, idx); + stderrBuffer = stderrBuffer.slice(idx + 1); + processLine(line); + } + } + } + + child.stdout.setEncoding('utf8'); + child.stderr.setEncoding('utf8'); + + child.stdout.on('data', (chunk: string) => consumeChunk(chunk, true)); + child.stderr.on('data', (chunk: string) => consumeChunk(chunk, false)); + + child.on('error', (error) => { + reject(error); + }); + + child.on('close', (code) => { + if (stdoutBuffer.trim()) { + processLine(stdoutBuffer); + } + if (stderrBuffer.trim()) { + processLine(stderrBuffer); + } + + if (code === 0) { + resolve({ + output: stdout, + parsed: { + response: finalResponse, + activities: activities.slice(0, 20), + }, + sessionId, + }); + return; + } + + const errorMessage = stderr.trim() || stdout.trim() || `Command exited with code ${code}`; + reject(new Error(errorMessage)); + }); + }); +} + export async function runCommand(command: string, args: string[], cwd?: string): Promise { return new Promise((resolve, reject) => { const child = spawn(command, args, { @@ -45,7 +386,7 @@ export async function runCommand(command: string, args: string[], cwd?: string): /** * Invoke a single agent with a message. Contains all Claude/Codex invocation logic. - * Returns the raw response text. + * Returns the response text and optional session ID. */ export async function invokeAgent( agent: AgentConfig, @@ -54,8 +395,9 @@ export async function invokeAgent( workspacePath: string, shouldReset: boolean, agents: Record = {}, - teams: Record = {} -): Promise { + teams: Record = {}, + options?: { onActivity?: ActivityCallback } +): Promise { // Ensure agent directory exists with config files const agentDir = path.join(workspacePath, agentId); const isNewAgent = !fs.existsSync(agentDir); @@ -111,7 +453,9 @@ export async function invokeAgent( } } - return response || 'Sorry, I could not generate a response from Codex.'; + return { + text: response || 'Sorry, I could not generate a response from Codex.', + }; } else { // Default to Claude (Anthropic) log('INFO', `Using Claude provider (agent: ${agentId})`); @@ -130,8 +474,29 @@ export async function invokeAgent( if (continueConversation) { claudeArgs.push('-c'); } - claudeArgs.push('-p', message); + claudeArgs.push('--verbose', '--output-format', 'stream-json', '-p', message); - return await runCommand('claude', claudeArgs, workingDir); + const runResult = await runClaudeCommand(claudeArgs, workingDir, options?.onActivity); + const parsed = runResult.parsed; + const hasActivities = parsed.activities.length > 0; + const hasResponse = parsed.response.trim().length > 0; + + if (hasActivities && hasResponse && !options?.onActivity) { + const activityLines = parsed.activities.map(item => `- ${item}`).join('\n'); + return { + text: `Activity:\n${activityLines}\n\n${parsed.response}`, + sessionId: runResult.sessionId || getLatestClaudeSessionId(workingDir), + }; + } + if (hasActivities && !options?.onActivity) { + return { + text: `Activity:\n${parsed.activities.map(item => `- ${item}`).join('\n')}`, + sessionId: runResult.sessionId || getLatestClaudeSessionId(workingDir), + }; + } + return { + text: parsed.response || 'Sorry, I could not generate a response from Claude.', + sessionId: runResult.sessionId || getLatestClaudeSessionId(workingDir), + }; } } diff --git a/src/lib/types.ts b/src/lib/types.ts index f0b2e40..6aa535f 100644 --- a/src/lib/types.ts +++ b/src/lib/types.ts @@ -14,6 +14,7 @@ export interface TeamConfig { export interface ChainStep { agentId: string; response: string; + sessionId?: string; } export interface Settings { @@ -83,6 +84,9 @@ export interface ResponseData { messageId: string; agent?: string; // which agent handled this files?: string[]; + sessionId?: string; + partial?: boolean; + updateType?: 'activity' | 'final'; } export interface QueueFile { diff --git a/src/queue-processor.ts b/src/queue-processor.ts index e89b989..a81215f 100644 --- a/src/queue-processor.ts +++ b/src/queue-processor.ts @@ -24,7 +24,7 @@ import { } from './lib/config'; import { log, emitEvent } from './lib/logging'; import { parseAgentRouting, findTeamForAgent, getAgentResetFlag, extractTeammateMentions } from './lib/routing'; -import { invokeAgent } from './lib/invoke'; +import { invokeAgent, AgentInvokeResult } from './lib/invoke'; // Ensure directories exist [QUEUE_INCOMING, QUEUE_OUTGOING, QUEUE_PROCESSING, FILES_DIR, path.dirname(LOG_FILE)].forEach(dir => { @@ -232,6 +232,36 @@ async function processMessage(messageFile: string): Promise { const { channel, sender, message: rawMessage, timestamp, messageId } = messageData; const isInternal = !!messageData.conversationId; + // Helper to write responses (both partial activity updates and final responses) + const writeResponse = (payload: { + message: string; + agent?: string; + files?: string[]; + sessionId?: string; + partial?: boolean; + updateType?: 'activity' | 'final'; + }): void => { + const responseData: ResponseData = { + channel, + sender, + message: payload.message, + originalMessage: rawMessage, + timestamp: Date.now(), + messageId, + agent: payload.agent, + files: payload.files, + sessionId: payload.sessionId, + partial: payload.partial, + updateType: payload.updateType, + }; + + const responseFile = channel === 'heartbeat' + ? path.join(QUEUE_OUTGOING, `${messageId}.json`) + : path.join(QUEUE_OUTGOING, `${channel}_${messageId}_${Date.now()}.json`); + + fs.writeFileSync(responseFile, JSON.stringify(responseData, null, 2)); + }; + log('INFO', `Processing [${isInternal ? 'internal' : channel}] ${isInternal ? `@${messageData.fromAgent}→@${messageData.agent}` : `from ${sender}`}: ${rawMessage.substring(0, 50)}...`); if (!isInternal) { emitEvent('message_received', { channel, sender, message: rawMessage.substring(0, 120), messageId }); @@ -340,11 +370,31 @@ async function processMessage(messageFile: string): Promise { } } + // Activity streaming: send real-time tool use updates to Telegram + const streamActivitiesEnabled = channel === 'telegram'; + const activityEmitterForAgent = (activeAgentId: string) => (activity: string): void => { + const prefix = streamActivitiesEnabled && teamContext ? `@${activeAgentId}: ` : ''; + const activityMessage = `${prefix}${activity}`; + log('DEBUG', `[ACTIVITY] Emitting activity for ${activeAgentId}: ${activity}`); + writeResponse({ + message: activityMessage, + agent: activeAgentId, + partial: true, + updateType: 'activity', + }); + }; + // Invoke agent emitEvent('chain_step_start', { agentId, agentName: agent.name, fromAgent: messageData.fromAgent || null }); let response: string; + let sessionId: string | undefined; try { - response = await invokeAgent(agent, agentId, message, workspacePath, shouldReset, agents, teams); + const invokeResult = await invokeAgent( + agent, agentId, message, workspacePath, shouldReset, agents, teams, + streamActivitiesEnabled ? { onActivity: activityEmitterForAgent(agentId) } : undefined + ); + response = invokeResult.text; + sessionId = invokeResult.sessionId; } catch (error) { const provider = agent.provider || 'anthropic'; log('ERROR', `${provider === 'openai' ? 'Codex' : 'Claude'} error (agent: ${agentId}): ${(error as Error).message}`); @@ -368,22 +418,13 @@ async function processMessage(messageFile: string): Promise { // Handle long responses — send as file attachment const { message: responseMessage, files: allFiles } = handleLongResponse(finalResponse, outboundFiles); - const responseData: ResponseData = { - channel, - sender, + writeResponse({ message: responseMessage, - originalMessage: rawMessage, - timestamp: Date.now(), - messageId, agent: agentId, files: allFiles.length > 0 ? allFiles : undefined, - }; - - const responseFile = channel === 'heartbeat' - ? path.join(QUEUE_OUTGOING, `${messageId}.json`) - : path.join(QUEUE_OUTGOING, `${channel}_${messageId}_${Date.now()}.json`); - - fs.writeFileSync(responseFile, JSON.stringify(responseData, null, 2)); + sessionId, + updateType: 'final', + }); log('INFO', `✓ Response ready [${channel}] ${sender} via agent:${agentId} (${finalResponse.length} chars)`); emitEvent('response_ready', { channel, sender, agentId, responseLength: finalResponse.length, responseText: finalResponse, messageId }); @@ -422,7 +463,7 @@ async function processMessage(messageFile: string): Promise { } // Record this agent's response - conv.responses.push({ agentId, response }); + conv.responses.push({ agentId, response, sessionId }); conv.totalMessages++; collectFiles(response, conv.files);