diff --git a/src/queue-processor.ts b/src/queue-processor.ts index 2daa373..deb6c0c 100644 --- a/src/queue-processor.ts +++ b/src/queue-processor.ts @@ -12,7 +12,9 @@ import { spawn } from 'child_process'; import fs from 'fs'; +import http from 'http'; import path from 'path'; +import crypto from 'crypto'; const SCRIPT_DIR = path.resolve(__dirname, '..'); const TINYCLAW_HOME = path.join(require('os').homedir(), '.tinyclaw'); @@ -22,6 +24,7 @@ const QUEUE_PROCESSING = path.join(TINYCLAW_HOME, 'queue/processing'); const LOG_FILE = path.join(TINYCLAW_HOME, 'logs/queue.log'); const RESET_FLAG = path.join(TINYCLAW_HOME, 'reset_flag'); const SETTINGS_FILE = path.join(TINYCLAW_HOME, 'settings.json'); +const DEFAULT_WEBHOOK_PORT = 3077; // Model name mapping const CLAUDE_MODEL_IDS: Record = { @@ -658,11 +661,223 @@ function logAgentConfig(): void { } } -// Main loop +// ─── Webhook HTTP Server ─────────────────────────────────────────────────── + +interface WebhookMessageBody { + channel: string; + sender: string; + senderId?: string; + message: string; + timestamp?: number; + messageId?: string; +} + +interface WebhookResponse { + success: boolean; + messageId?: string; + error?: string; +} + +function getWebhookPort(): number { + try { + const settings = getSettings(); + const port = (settings as Record)['webhook_port']; + if (port && typeof port === 'number') { + return port; + } + } catch { } + return DEFAULT_WEBHOOK_PORT; +} + +function readRequestBody(req: http.IncomingMessage): Promise { + return new Promise((resolve, reject) => { + const chunks: Buffer[] = []; + let size = 0; + const MAX_BODY = 1024 * 1024; // 1MB limit + + req.on('data', (chunk: Buffer) => { + size += chunk.length; + if (size > MAX_BODY) { + req.destroy(); + reject(new Error('Request body too large')); + return; + } + chunks.push(chunk); + }); + req.on('end', () => resolve(Buffer.concat(chunks).toString('utf8'))); + req.on('error', reject); + }); +} + +function sendJson(res: http.ServerResponse, statusCode: number, data: object): void { + const body = JSON.stringify(data, null, 2); + res.writeHead(statusCode, { + 'Content-Type': 'application/json', + 'Content-Length': Buffer.byteLength(body), + }); + res.end(body); +} + +function handleWebhookMessage(body: WebhookMessageBody): WebhookResponse { + // Validate required fields + if (!body.channel || typeof body.channel !== 'string') { + return { success: false, error: 'Missing or invalid "channel" field' }; + } + if (!body.sender || typeof body.sender !== 'string') { + return { success: false, error: 'Missing or invalid "sender" field' }; + } + if (!body.message || typeof body.message !== 'string') { + return { success: false, error: 'Missing or invalid "message" field' }; + } + + const timestamp = body.timestamp || Date.now(); + const messageId = body.messageId || `${body.channel}_${timestamp}_${crypto.randomBytes(4).toString('hex')}`; + + const messageData: MessageData = { + channel: body.channel, + sender: body.sender, + senderId: body.senderId || `webhook_${body.sender}`, + message: body.message, + timestamp, + messageId, + }; + + const filename = `${body.channel}_${timestamp}_${crypto.randomBytes(4).toString('hex')}.json`; + const filePath = path.join(QUEUE_INCOMING, filename); + + fs.writeFileSync(filePath, JSON.stringify(messageData, null, 2)); + + log('INFO', `[webhook] Queued message [${body.channel}] from ${body.sender}: ${body.message.substring(0, 50)}...`); + + return { success: true, messageId }; +} + +function getMessageStatus(messageId: string): { status: string; data?: object } { + // Check outgoing (completed) + const outFiles = fs.readdirSync(QUEUE_OUTGOING).filter(f => f.endsWith('.json')); + for (const f of outFiles) { + try { + const data = JSON.parse(fs.readFileSync(path.join(QUEUE_OUTGOING, f), 'utf8')); + if (data.messageId === messageId) { + return { status: 'completed', data }; + } + } catch { } + } + + // Check processing (in progress) + const procFiles = fs.readdirSync(QUEUE_PROCESSING).filter(f => f.endsWith('.json')); + for (const f of procFiles) { + try { + const data = JSON.parse(fs.readFileSync(path.join(QUEUE_PROCESSING, f), 'utf8')); + if (data.messageId === messageId) { + return { status: 'processing' }; + } + } catch { } + } + + // Check incoming (queued) + const inFiles = fs.readdirSync(QUEUE_INCOMING).filter(f => f.endsWith('.json')); + for (const f of inFiles) { + try { + const data = JSON.parse(fs.readFileSync(path.join(QUEUE_INCOMING, f), 'utf8')); + if (data.messageId === messageId) { + return { status: 'queued' }; + } + } catch { } + } + + return { status: 'not_found' }; +} + +function startWebhookServer(): void { + const port = getWebhookPort(); + + const server = http.createServer(async (req, res) => { + const url = req.url || '/'; + const method = req.method || 'GET'; + + // CORS headers for broad compatibility + res.setHeader('Access-Control-Allow-Origin', '*'); + res.setHeader('Access-Control-Allow-Methods', 'GET, POST, OPTIONS'); + res.setHeader('Access-Control-Allow-Headers', 'Content-Type'); + + if (method === 'OPTIONS') { + res.writeHead(204); + res.end(); + return; + } + + // POST /webhook/message — queue a new message + if (method === 'POST' && url === '/webhook/message') { + try { + const rawBody = await readRequestBody(req); + const body: WebhookMessageBody = JSON.parse(rawBody); + const result = handleWebhookMessage(body); + sendJson(res, result.success ? 200 : 400, result); + } catch (error) { + sendJson(res, 400, { success: false, error: 'Invalid JSON body' }); + } + return; + } + + // GET /webhook/health — health check + if (method === 'GET' && url === '/webhook/health') { + const incomingCount = fs.readdirSync(QUEUE_INCOMING).filter(f => f.endsWith('.json')).length; + const processingCount = fs.readdirSync(QUEUE_PROCESSING).filter(f => f.endsWith('.json')).length; + const outgoingCount = fs.readdirSync(QUEUE_OUTGOING).filter(f => f.endsWith('.json')).length; + + sendJson(res, 200, { + status: 'ok', + uptime: process.uptime(), + queue: { + incoming: incomingCount, + processing: processingCount, + outgoing: outgoingCount, + }, + }); + return; + } + + // GET /webhook/status/:messageId — check message status + const statusMatch = url.match(/^\/webhook\/status\/(.+)$/); + if (method === 'GET' && statusMatch) { + const messageId = decodeURIComponent(statusMatch[1]); + const result = getMessageStatus(messageId); + const statusCode = result.status === 'not_found' ? 404 : 200; + sendJson(res, statusCode, result); + return; + } + + // 404 for everything else + sendJson(res, 404, { error: 'Not found' }); + }); + + server.listen(port, () => { + log('INFO', `Webhook server listening on port ${port}`); + log('INFO', ` POST http://localhost:${port}/webhook/message`); + log('INFO', ` GET http://localhost:${port}/webhook/health`); + log('INFO', ` GET http://localhost:${port}/webhook/status/:messageId`); + }); + + server.on('error', (err: NodeJS.ErrnoException) => { + if (err.code === 'EADDRINUSE') { + log('ERROR', `Webhook port ${port} is already in use. Webhook server disabled.`); + } else { + log('ERROR', `Webhook server error: ${err.message}`); + } + }); +} + +// ─── Main Loop ───────────────────────────────────────────────────────────── + + log('INFO', 'Queue processor started'); log('INFO', `Watching: ${QUEUE_INCOMING}`); logAgentConfig(); +// Start webhook HTTP server +startWebhookServer(); + // Process queue every 1 second setInterval(processQueue, 1000);