Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
500 changes: 459 additions & 41 deletions src/channels/telegram-client.ts

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions src/lib/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ export const SETTINGS_FILE = path.join(TINYCLAW_HOME, 'settings.json');
export const EVENTS_DIR = path.join(TINYCLAW_HOME, 'events');
export const CHATS_DIR = path.join(TINYCLAW_HOME, 'chats');
export const FILES_DIR = path.join(TINYCLAW_HOME, 'files');
export const QUEUE_QUESTIONS = path.join(TINYCLAW_HOME, 'queue/questions');
export const QUEUE_ANSWERS = path.join(TINYCLAW_HOME, 'queue/answers');

export function getSettings(): Settings {
try {
Expand Down
128 changes: 117 additions & 11 deletions src/lib/invoke.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,54 @@
import { spawn } from 'child_process';
import fs from 'fs';
import path from 'path';
import { AgentConfig, TeamConfig } from './types';
import { SCRIPT_DIR, resolveClaudeModel, resolveCodexModel } from './config';
import { AgentConfig, TeamConfig, InvokeResult } from './types';
import { SCRIPT_DIR, resolveClaudeModel, resolveCodexModel, getSettings } from './config';
import { QUESTION_PROMPT } from './question-bridge';
import { log } from './logging';
import { ensureAgentDirectory, updateAgentTeammates } from './agent-setup';

export async function runCommand(command: string, args: string[], cwd?: string): Promise<string> {
export interface CommandResult {
stdout: string;
stderr: string;
}

/**
* Run a command with an optional timeout (in ms).
* If the timeout fires, the child is killed (SIGTERM → SIGKILL) and the
* promise rejects with a descriptive error.
*/
export async function runCommand(command: string, args: string[], cwd?: string, timeoutMs?: number): Promise<CommandResult> {
return new Promise((resolve, reject) => {
// Strip CLAUDECODE env var so spawned Claude sessions don't think they're nested
const env = { ...process.env };
delete env.CLAUDECODE;

const child = spawn(command, args, {
cwd: cwd || SCRIPT_DIR,
stdio: ['ignore', 'pipe', 'pipe'],
env,
});

let stdout = '';
let stderr = '';
let killed = false;
let timer: NodeJS.Timeout | undefined;
let killTimer: NodeJS.Timeout | undefined;

if (timeoutMs && timeoutMs > 0) {
timer = setTimeout(() => {
killed = true;
log('WARN', `Command timed out after ${Math.round(timeoutMs / 1000)}s — sending SIGTERM (pid ${child.pid})`);
child.kill('SIGTERM');
// Force-kill if still alive after 5s
killTimer = setTimeout(() => {
if (!child.killed) {
log('WARN', `Force-killing command (pid ${child.pid}) with SIGKILL`);
child.kill('SIGKILL');
}
}, 5000);
}, timeoutMs);
}

child.stdout.setEncoding('utf8');
child.stderr.setEncoding('utf8');
Expand All @@ -28,12 +62,22 @@ export async function runCommand(command: string, args: string[], cwd?: string):
});

child.on('error', (error) => {
if (timer) clearTimeout(timer);
if (killTimer) clearTimeout(killTimer);
reject(error);
});

child.on('close', (code) => {
if (timer) clearTimeout(timer);
if (killTimer) clearTimeout(killTimer);

if (killed) {
reject(new Error(`Command timed out after ${Math.round(timeoutMs! / 1000)} seconds and was terminated`));
return;
}

if (code === 0) {
resolve(stdout);
resolve({ stdout, stderr });
return;
}

Expand All @@ -45,7 +89,10 @@ export async function runCommand(command: string, args: string[], cwd?: string):

/**
* Invoke a single agent with a message. Contains all Claude/Codex invocation logic.
* Returns the raw response text.
* Returns the raw response text (or InvokeResult when interactive=true).
*
* @param interactive - Enable question bridge (disables AskUserQuestion, adds system prompt, captures session_id)
* @param resumeSessionId - Resume a specific session instead of using -c (for question continuation rounds)
*/
export async function invokeAgent(
agent: AgentConfig,
Expand All @@ -54,8 +101,10 @@ export async function invokeAgent(
workspacePath: string,
shouldReset: boolean,
agents: Record<string, AgentConfig> = {},
teams: Record<string, TeamConfig> = {}
): Promise<string> {
teams: Record<string, TeamConfig> = {},
interactive: boolean = false,
resumeSessionId?: string
): Promise<InvokeResult> {
// Ensure agent directory exists with config files
const agentDir = path.join(workspacePath, agentId);
const isNewAgent = !fs.existsSync(agentDir);
Expand All @@ -76,6 +125,11 @@ export async function invokeAgent(

const provider = agent.provider || 'anthropic';

// Read timeout from settings (default: 3 minutes)
const settings = getSettings();
const maxResponseTimeSec = settings?.monitoring?.max_response_time ?? 180;
const timeoutMs = maxResponseTimeSec * 1000;

if (provider === 'openai') {
log('INFO', `Using Codex CLI (agent: ${agentId})`);

Expand All @@ -95,7 +149,7 @@ export async function invokeAgent(
}
codexArgs.push('--skip-git-repo-check', '--dangerously-bypass-approvals-and-sandbox', '--json', message);

const codexOutput = await runCommand('codex', codexArgs, workingDir);
const { stdout: codexOutput } = await runCommand('codex', codexArgs, workingDir, timeoutMs);

// Parse JSONL output and extract final agent_message
let response = '';
Expand All @@ -111,7 +165,7 @@ export async function invokeAgent(
}
}

return response || 'Sorry, I could not generate a response from Codex.';
return { response: response || 'Sorry, I could not generate a response from Codex.' };
} else {
// Default to Claude (Anthropic)
log('INFO', `Using Claude provider (agent: ${agentId})`);
Expand All @@ -127,11 +181,63 @@ export async function invokeAgent(
if (modelId) {
claudeArgs.push('--model', modelId);
}
if (continueConversation) {

// Interactive mode: disable AskUserQuestion, inject question system prompt
if (interactive) {
claudeArgs.push(
'--disallowed-tools', 'AskUserQuestion',
'--append-system-prompt', QUESTION_PROMPT
);
}

// Session resumption: --resume takes priority over -c
if (resumeSessionId) {
claudeArgs.push('--resume', resumeSessionId);
} else if (continueConversation) {
claudeArgs.push('-c');
}

// Use JSON output when interactive (to capture session_id)
if (interactive) {
claudeArgs.push('--output-format', 'json');
}

claudeArgs.push('-p', message);

return await runCommand('claude', claudeArgs, workingDir);
const { stdout, stderr } = await runCommand('claude', claudeArgs, workingDir, timeoutMs);

// Parse response — JSON mode for interactive, plain text otherwise
if (interactive && stdout.trim()) {
try {
const jsonResponse = JSON.parse(stdout.trim());
const sessionId = jsonResponse.session_id || undefined;
const responseText = jsonResponse.result || '';
if (responseText) {
return { response: responseText, sessionId };
}
// Fall through to stderr extraction if result is empty
} catch (e) {
// JSON parse failed — treat as plain text
log('WARN', `Failed to parse JSON output for agent ${agentId}, treating as plain text`);
return { response: stdout };
}
}

if (stdout.trim()) {
return { response: stdout };
}

// Claude CLI sometimes outputs only to stderr when performing tool actions
// (file edits, bash commands, etc.) without producing a text response on stdout.
// Extract a meaningful summary from stderr as fallback.
if (stderr.trim()) {
log('WARN', `Claude returned empty stdout for agent ${agentId}, extracting from stderr (${stderr.length} chars)`);
// stderr contains progress/status lines — return last meaningful portion
const stderrLines = stderr.trim().split('\n').filter(l => l.trim());
const lastLines = stderrLines.slice(-10).join('\n');
return { response: lastLines || 'I completed the requested actions but had no text response to return.' };
}

return { response: 'I completed the requested actions but had no text response to return.' };
}
}
179 changes: 179 additions & 0 deletions src/lib/question-bridge.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import fs from 'fs';
import path from 'path';
import { QuestionData, AnswerData } from './types';
import { QUEUE_QUESTIONS, QUEUE_ANSWERS } from './config';
import { log } from './logging';

// Ensure queue directories exist
[QUEUE_QUESTIONS, QUEUE_ANSWERS].forEach(dir => {
if (!fs.existsSync(dir)) {
fs.mkdirSync(dir, { recursive: true });
}
});

/**
* System prompt instructing Claude to output questions in a parseable format
* instead of using the (disabled) AskUserQuestion tool.
*/
export const QUESTION_PROMPT = `MANDATORY: You are communicating through a messaging app (Telegram). You CANNOT ask questions as plain text — the user cannot type free-form answers easily. When you need to ask the user ANY clarifying question, you MUST use this structured format so the system can render interactive buttons:

[QUESTION]{"question":"Your question here","options":[{"label":"Option 1","description":"Brief description"},{"label":"Option 2","description":"Brief description"}],"multiSelect":false}[/QUESTION]

Rules:
- NEVER ask questions as plain text. ALWAYS use the [QUESTION] tag format above.
- Provide 2-4 concrete options the user can tap. Include an option for "other" if the list isn't exhaustive.
- Set "multiSelect":true if the user can pick more than one.
- Ask ONE question at a time. After outputting a [QUESTION] block, STOP and wait for the user's response.
- You may include a brief sentence of context before the [QUESTION] tag, but nothing after it.
- Do NOT answer the question yourself or assume what the user wants.`;

interface ParsedQuestion {
question: string;
options: { label: string; description?: string }[];
multiSelect: boolean;
}

/**
* Parse [QUESTION]{json}[/QUESTION] blocks from Claude's response text.
* Treats model output as untrusted — any parse failure returns empty array.
*/
export function parseQuestions(text: string): ParsedQuestion[] {
const questions: ParsedQuestion[] = [];
const regex = /\[QUESTION\]([\s\S]*?)\[\/QUESTION\]/g;
let match: RegExpExecArray | null;

while ((match = regex.exec(text)) !== null) {
try {
const json = JSON.parse(match[1].trim());

// Validate required fields
if (
typeof json.question !== 'string' ||
!Array.isArray(json.options) ||
json.options.length === 0
) {
log('WARN', `Invalid question structure, skipping: ${match[1].substring(0, 100)}`);
continue;
}

// Validate each option has a label
const validOptions = json.options.filter(
(o: any) => typeof o?.label === 'string' && o.label.trim().length > 0
);
if (validOptions.length === 0) {
log('WARN', `No valid options in question, skipping`);
continue;
}

questions.push({
question: json.question,
options: validOptions.map((o: any) => ({
label: o.label,
description: typeof o.description === 'string' ? o.description : undefined,
})),
multiSelect: !!json.multiSelect,
});
} catch (e) {
log('WARN', `Failed to parse question JSON: ${(e as Error).message}`);
// Continue — other questions in the response might be valid
}
}

return questions;
}

/**
* Strip [QUESTION]...[/QUESTION] tags from response text,
* preserving any text before/between tags.
*/
export function stripQuestionTags(text: string): string {
return text.replace(/\[QUESTION\][\s\S]*?\[\/QUESTION\]/g, '').trim();
}

/**
* Write a question to the questions queue for the channel client to pick up.
*/
export function emitQuestion(question: QuestionData): void {
const filename = `${question.channel}_${question.questionId}.json`;
const filepath = path.join(QUEUE_QUESTIONS, filename);
fs.writeFileSync(filepath, JSON.stringify(question, null, 2));
log('INFO', `Emitted question ${question.questionId} to ${filename}`);
}

/**
* Poll for an answer file in the answers queue.
* Returns the answer data or null on timeout.
*
* @param questionId - The question ID to wait for
* @param timeoutMs - Max wait time (default 5 minutes)
* @param intervalMs - Poll interval (default 500ms)
*/
export function pollForAnswer(
questionId: string,
timeoutMs: number = 5 * 60 * 1000,
intervalMs: number = 500
): Promise<AnswerData | null> {
const answerFile = path.join(QUEUE_ANSWERS, `answer_${questionId}.json`);
const deadline = Date.now() + timeoutMs;

return new Promise((resolve) => {
const check = () => {
if (Date.now() > deadline) {
log('WARN', `Answer poll timed out for question ${questionId}`);
resolve(null);
return;
}

if (fs.existsSync(answerFile)) {
try {
const data: AnswerData = JSON.parse(fs.readFileSync(answerFile, 'utf8'));
// Clean up the answer file after reading
try { fs.unlinkSync(answerFile); } catch {}
log('INFO', `Received answer for question ${questionId}: ${data.answer}`);
resolve(data);
return;
} catch (e) {
// Malformed answer file — delete to prevent infinite retry
try { fs.unlinkSync(answerFile); } catch {}
log('WARN', `Malformed answer file for ${questionId}, deleted: ${(e as Error).message}`);
resolve(null);
return;
}
}

setTimeout(check, intervalMs);
};

check();
});
}

/**
* Write an answer file with exclusive-create semantics (wx flag).
* Returns true if written, false if answer already exists (duplicate tap).
*/
export function writeAnswer(questionId: string, answer: string): boolean {
const answerFile = path.join(QUEUE_ANSWERS, `answer_${questionId}.json`);

// Ensure directory exists
if (!fs.existsSync(QUEUE_ANSWERS)) {
fs.mkdirSync(QUEUE_ANSWERS, { recursive: true });
}

const data: AnswerData = {
questionId,
answer,
answeredAt: Date.now(),
};

try {
fs.writeFileSync(answerFile, JSON.stringify(data, null, 2), { flag: 'wx' });
return true;
} catch (e: any) {
if (e.code === 'EEXIST') {
log('WARN', `Answer already exists for question ${questionId} (duplicate tap)`);
return false;
}
throw e;
}
}
Loading