Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 216 additions & 1 deletion src/queue-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@

import { spawn } from 'child_process';
import fs from 'fs';
import http from 'http';
import path from 'path';
import crypto from 'crypto';

const SCRIPT_DIR = path.resolve(__dirname, '..');
const TINYCLAW_HOME = path.join(require('os').homedir(), '.tinyclaw');
Expand All @@ -22,6 +24,7 @@ const QUEUE_PROCESSING = path.join(TINYCLAW_HOME, 'queue/processing');
const LOG_FILE = path.join(TINYCLAW_HOME, 'logs/queue.log');
const RESET_FLAG = path.join(TINYCLAW_HOME, 'reset_flag');
const SETTINGS_FILE = path.join(TINYCLAW_HOME, 'settings.json');
const DEFAULT_WEBHOOK_PORT = 3077;

// Model name mapping
const CLAUDE_MODEL_IDS: Record<string, string> = {
Expand Down Expand Up @@ -658,11 +661,223 @@ function logAgentConfig(): void {
}
}

// Main loop
// ─── Webhook HTTP Server ───────────────────────────────────────────────────

interface WebhookMessageBody {
channel: string;
sender: string;
senderId?: string;
message: string;
timestamp?: number;
messageId?: string;
}

interface WebhookResponse {
success: boolean;
messageId?: string;
error?: string;
}

function getWebhookPort(): number {
try {
const settings = getSettings();
const port = (settings as Record<string, unknown>)['webhook_port'];
if (port && typeof port === 'number') {
return port;
}
} catch { }
return DEFAULT_WEBHOOK_PORT;
}

function readRequestBody(req: http.IncomingMessage): Promise<string> {
return new Promise((resolve, reject) => {
const chunks: Buffer[] = [];
let size = 0;
const MAX_BODY = 1024 * 1024; // 1MB limit

req.on('data', (chunk: Buffer) => {
size += chunk.length;
if (size > MAX_BODY) {
req.destroy();
reject(new Error('Request body too large'));
return;
}
chunks.push(chunk);
});
req.on('end', () => resolve(Buffer.concat(chunks).toString('utf8')));
req.on('error', reject);
});
}

function sendJson(res: http.ServerResponse, statusCode: number, data: object): void {
const body = JSON.stringify(data, null, 2);
res.writeHead(statusCode, {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(body),
});
res.end(body);
}

function handleWebhookMessage(body: WebhookMessageBody): WebhookResponse {
// Validate required fields
if (!body.channel || typeof body.channel !== 'string') {
return { success: false, error: 'Missing or invalid "channel" field' };
}
if (!body.sender || typeof body.sender !== 'string') {
return { success: false, error: 'Missing or invalid "sender" field' };
}
if (!body.message || typeof body.message !== 'string') {
return { success: false, error: 'Missing or invalid "message" field' };
}

const timestamp = body.timestamp || Date.now();
const messageId = body.messageId || `${body.channel}_${timestamp}_${crypto.randomBytes(4).toString('hex')}`;

const messageData: MessageData = {
channel: body.channel,
sender: body.sender,
senderId: body.senderId || `webhook_${body.sender}`,
message: body.message,
timestamp,
messageId,
};

const filename = `${body.channel}_${timestamp}_${crypto.randomBytes(4).toString('hex')}.json`;
const filePath = path.join(QUEUE_INCOMING, filename);

fs.writeFileSync(filePath, JSON.stringify(messageData, null, 2));

log('INFO', `[webhook] Queued message [${body.channel}] from ${body.sender}: ${body.message.substring(0, 50)}...`);

return { success: true, messageId };
}

function getMessageStatus(messageId: string): { status: string; data?: object } {
// Check outgoing (completed)
const outFiles = fs.readdirSync(QUEUE_OUTGOING).filter(f => f.endsWith('.json'));
for (const f of outFiles) {
try {
const data = JSON.parse(fs.readFileSync(path.join(QUEUE_OUTGOING, f), 'utf8'));
if (data.messageId === messageId) {
return { status: 'completed', data };
}
} catch { }
}

// Check processing (in progress)
const procFiles = fs.readdirSync(QUEUE_PROCESSING).filter(f => f.endsWith('.json'));
for (const f of procFiles) {
try {
const data = JSON.parse(fs.readFileSync(path.join(QUEUE_PROCESSING, f), 'utf8'));
if (data.messageId === messageId) {
return { status: 'processing' };
}
} catch { }
}

// Check incoming (queued)
const inFiles = fs.readdirSync(QUEUE_INCOMING).filter(f => f.endsWith('.json'));
for (const f of inFiles) {
try {
const data = JSON.parse(fs.readFileSync(path.join(QUEUE_INCOMING, f), 'utf8'));
if (data.messageId === messageId) {
return { status: 'queued' };
}
} catch { }
}

return { status: 'not_found' };
}

function startWebhookServer(): void {
const port = getWebhookPort();

const server = http.createServer(async (req, res) => {
const url = req.url || '/';
const method = req.method || 'GET';

// CORS headers for broad compatibility
res.setHeader('Access-Control-Allow-Origin', '*');
res.setHeader('Access-Control-Allow-Methods', 'GET, POST, OPTIONS');
res.setHeader('Access-Control-Allow-Headers', 'Content-Type');

if (method === 'OPTIONS') {
res.writeHead(204);
res.end();
return;
}

// POST /webhook/message — queue a new message
if (method === 'POST' && url === '/webhook/message') {
try {
const rawBody = await readRequestBody(req);
const body: WebhookMessageBody = JSON.parse(rawBody);
const result = handleWebhookMessage(body);
sendJson(res, result.success ? 200 : 400, result);
} catch (error) {
sendJson(res, 400, { success: false, error: 'Invalid JSON body' });
}
return;
}

// GET /webhook/health — health check
if (method === 'GET' && url === '/webhook/health') {
const incomingCount = fs.readdirSync(QUEUE_INCOMING).filter(f => f.endsWith('.json')).length;
const processingCount = fs.readdirSync(QUEUE_PROCESSING).filter(f => f.endsWith('.json')).length;
const outgoingCount = fs.readdirSync(QUEUE_OUTGOING).filter(f => f.endsWith('.json')).length;

sendJson(res, 200, {
status: 'ok',
uptime: process.uptime(),
queue: {
incoming: incomingCount,
processing: processingCount,
outgoing: outgoingCount,
},
});
return;
}

// GET /webhook/status/:messageId — check message status
const statusMatch = url.match(/^\/webhook\/status\/(.+)$/);
if (method === 'GET' && statusMatch) {
const messageId = decodeURIComponent(statusMatch[1]);
const result = getMessageStatus(messageId);
const statusCode = result.status === 'not_found' ? 404 : 200;
sendJson(res, statusCode, result);
return;
}

// 404 for everything else
sendJson(res, 404, { error: 'Not found' });
});

server.listen(port, () => {
log('INFO', `Webhook server listening on port ${port}`);
log('INFO', ` POST http://localhost:${port}/webhook/message`);
log('INFO', ` GET http://localhost:${port}/webhook/health`);
log('INFO', ` GET http://localhost:${port}/webhook/status/:messageId`);
});

server.on('error', (err: NodeJS.ErrnoException) => {
if (err.code === 'EADDRINUSE') {
log('ERROR', `Webhook port ${port} is already in use. Webhook server disabled.`);
} else {
log('ERROR', `Webhook server error: ${err.message}`);
}
});
}

// ─── Main Loop ─────────────────────────────────────────────────────────────


log('INFO', 'Queue processor started');
log('INFO', `Watching: ${QUEUE_INCOMING}`);
logAgentConfig();

// Start webhook HTTP server
startWebhookServer();

// Process queue every 1 second
setInterval(processQueue, 1000);

Expand Down