From d6d104f5dd93aff08dff4f0803cf438dfbda9d1c Mon Sep 17 00:00:00 2001 From: Devain Pal Bansal Date: Mon, 23 Feb 2026 13:03:07 +0530 Subject: [PATCH 1/2] =?UTF-8?q?fix:=20resolve=203=20critical=20bugs=20?= =?UTF-8?q?=E2=80=94=20response=20drops,=20silent=20mention=20failures,=20?= =?UTF-8?q?multi-agent=20race=20condition?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug 1 (Response Drops): Channel clients had non-atomic send-then-ack flow causing random response loss. Added 'delivering' status to SQLite responses table, claim/unclaim API endpoints, and claim-before-send pattern in Telegram client with retry tracking (max 3 attempts). Bug 2 (Inter-Agent Mention Failures): Teammate mentions were silently dropped due to case sensitivity, typos, and missing validation logging. Added case-insensitive agent ID lookup, detailed logging in isTeammate() and extractTeammateMentions(), improved regex for bracket handling, and validateAgentResponse() helper. Bug 3 (Multi-Agent Reply Loss): Race condition on conv.pending counter when multiple agents complete simultaneously caused replies to never arrive. Added withConversationLock() promise-chain mutex, safe incrementPending/decrementPending operations, and automatic conversation state recovery. Co-Authored-By: Claude Opus 4.6 --- CLAUDE.md | 98 +++ docs/bug-fixes/PATCHES-README.md | 167 +++++ docs/bug-fixes/tinyclaw-bug-solutions.md | 842 +++++++++++++++++++++++ src/channels/telegram-client.ts | 145 +++- src/lib/conversation.ts | 107 +++ src/lib/db.ts | 46 +- src/lib/routing.ts | 125 +++- src/queue-processor.ts | 34 +- src/server/routes/queue.ts | 17 + 9 files changed, 1525 insertions(+), 56 deletions(-) create mode 100644 CLAUDE.md create mode 100644 docs/bug-fixes/PATCHES-README.md create mode 100644 docs/bug-fixes/tinyclaw-bug-solutions.md diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..78c7e9a --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,98 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Build & Run Commands + +```bash +npm install # Install dependencies +npm run build # Full build (main TypeScript + visualizer) +npm run build:main # Build queue processor & channels only (tsc) +npm run build:visualizer # Build team visualizer only (separate tsconfig) + +./tinyclaw.sh start # Launch tmux session with all components +./tinyclaw.sh stop # Kill tmux session and cleanup +./tinyclaw.sh restart # Stop and start +./tinyclaw.sh status # Show running processes + +./tinyclaw.sh logs all # Tail all logs (also: queue, discord, telegram, whatsapp) +``` + +Run individual components directly: +```bash +npm run queue # Queue processor only +npm run discord # Discord client only +npm run telegram # Telegram client only +npm run whatsapp # WhatsApp client only +npm run visualize # Team visualizer TUI +``` + +No test suite exists yet. Verify changes by building (`npm run build`) and running locally (`./tinyclaw.sh start`). + +## Architecture + +TinyClaw is a multi-agent, multi-channel AI assistant platform that runs 24/7 via tmux. Messages flow through a **file-based queue system** (no database): + +``` +Channel Clients (Discord/Telegram/WhatsApp) + → ~/.tinyclaw/queue/incoming/ (JSON message files) + → Queue Processor (routing + invocation) + → ~/.tinyclaw/queue/outgoing/ (response files) + → Channel Clients deliver response +``` + +### Two Language Layers + +- **TypeScript (`src/`)**: Core runtime — queue processor, channel clients, routing, agent invocation, team visualizer +- **Bash (`lib/`)**: CLI and daemon management — startup/shutdown, setup wizard, agent/team CRUD commands, heartbeat cron + +The main CLI entry point is `tinyclaw.sh` which loads bash libraries from `lib/` and dispatches commands. The TypeScript runtime compiles to `dist/` (ES2020, CommonJS). + +### Key TypeScript Files + +| File | Role | +|------|------| +| `src/queue-processor.ts` | Main loop — polls incoming queue, routes messages, tracks conversations, aggregates team responses | +| `src/lib/routing.ts` | Parses `@agent_id` prefixes and `[@teammate: message]` mention tags | +| `src/lib/invoke.ts` | Spawns Claude/Codex/OpenCode CLI processes per agent | +| `src/lib/config.ts` | Loads `~/.tinyclaw/settings.json`, resolves model names | +| `src/lib/types.ts` | All TypeScript interfaces (AgentConfig, TeamConfig, MessageData, Conversation) | +| `src/channels/*-client.ts` | Channel integrations — read outgoing queue, write to incoming queue | +| `src/visualizer/team-visualizer.ts` | React + Ink TUI dashboard for team collaboration | + +### Key Bash Files + +| File | Role | +|------|------| +| `lib/daemon.sh` | tmux session management, pane layout, process lifecycle | +| `lib/agents.sh` | Agent CRUD (add, remove, reset, list, provider switch) | +| `lib/teams.sh` | Team CRUD and visualization | +| `lib/messaging.sh` | CLI `send` command, log viewing | +| `lib/setup-wizard.sh` | First-run interactive configuration | +| `lib/common.sh` | Shared utilities, channel registry, settings loading | + +### Core Design Patterns + +- **Agent isolation**: Each agent has its own workspace directory (`~/tinyclaw-workspace/{agent_id}/`) with independent `.claude/` config and conversation history +- **Parallel-per-agent, sequential-per-conversation**: Different agents process concurrently; messages to the same agent are serialized +- **Actor model for teams**: Agents communicate via `[@agent_id: message]` tags parsed from responses, spawning new queue messages. No central orchestrator +- **Atomic file operations**: Queue uses filesystem move operations to prevent race conditions and message loss +- **Three-state queue**: `incoming/` → `processing/` → `outgoing/`, with recovery for orphaned files + +### Runtime Data + +All runtime state lives under `~/.tinyclaw/` (overridable via `TINYCLAW_HOME`): +- `settings.json` — agents, teams, channels, models config +- `pairing.json` — sender allowlist state +- `queue/{incoming,processing,outgoing}/` — message pipeline +- `logs/` — per-component log files +- `chats/` — team conversation history +- `events/` — real-time event files for visualizer + +### AI Provider Support + +Three providers with CLI-based invocation: Anthropic Claude (`claude`), OpenAI Codex (`codex`), and OpenCode (`opencode`). Model aliases are resolved in `src/lib/config.ts` (e.g., `sonnet` → `claude-sonnet-4-5`, `opus` → `claude-opus-4-6`). Each agent can use a different provider/model. + +### Visualizer + +The team visualizer (`src/visualizer/`) is a separate TypeScript build target (`tsconfig.visualizer.json`) that outputs ESM. It uses React + Ink for a terminal UI showing real-time team collaboration. diff --git a/docs/bug-fixes/PATCHES-README.md b/docs/bug-fixes/PATCHES-README.md new file mode 100644 index 0000000..f41e5ac --- /dev/null +++ b/docs/bug-fixes/PATCHES-README.md @@ -0,0 +1,167 @@ +# TinyClaw Bug Fix Patches + +This directory contains patches to fix three critical bugs in tinyClaw. + +## Bug Summary + +| Bug | Description | Severity | Files Affected | +|-----|-------------|----------|----------------| +| **Bug 1** | Channel clients (Telegram/Discord/WhatsApp) randomly fail to return responses due to non-atomic send-then-ack flow | Critical | `db.ts`, `telegram-client.ts`, `discord-client.ts`, `whatsapp-client.ts`, `server/index.ts` | +| **Bug 2** | Inter-agent mentions fail silently due to validation issues (case sensitivity, typos, cross-team) | High | `routing.ts` | +| **Bug 3** | Multi-agent conversations lose replies due to race condition on `conv.pending` counter | Critical | `conversation.ts`, `queue-processor.ts` | + +## Patch Files + +``` +patches/ +├── README.md # This file +├── bug1-db.patch # Database changes for delivering status +├── bug1-telegram.patch # Telegram client fix (claim-before-send pattern) +├── bug2-routing.patch # Routing improvements (logging, case-insensitive matching) +├── bug3-conversation.patch # Conversation locking mechanism +├── bug3-queue-processor.patch # Queue processor integration with locking +└── server-api.patch # New API endpoints for claim/unclaim +``` + +## Installation Order + +Apply patches in this order to avoid conflicts: + +### Phase 1: Database and Core Infrastructure + +```bash +# 1. Database changes (adds 'delivering' status) +patch -p1 < patches/bug1-db.patch + +# 2. Server API endpoints (claim/unclaim) +patch -p1 < patches/server-api.patch +``` + +### Phase 2: Core Logic Fixes + +```bash +# 3. Conversation locking mechanism +patch -p1 < patches/bug3-conversation.patch + +# 4. Queue processor integration +patch -p1 < patches/bug3-queue-processor.patch + +# 5. Routing improvements (logging, case-insensitive matching) +patch -p1 < patches/bug2-routing.patch +``` + +### Phase 3: Channel Clients + +```bash +# 6. Telegram client (as proof of concept) +patch -p1 < patches/bug1-telegram.patch + +# Note: Discord and WhatsApp clients need similar fixes +# Apply the same pattern from bug1-telegram.patch to: +# - src/channels/discord-client.ts +# - src/channels/whatsapp-client.ts +``` + +## Manual Application (If Patches Fail) + +If `patch` command fails due to line number differences, apply changes manually by referencing the solution document: + +1. See `/mnt/okcomputer/output/tinyclaw-bug-solutions.md` for detailed code changes +2. Copy the relevant sections into your files +3. Ensure all imports are updated + +## Verification + +After applying patches, verify the fixes: + +### Bug 1 Verification + +```bash +# Check that the delivering status exists in the database +sqlite3 .tinyclaw/tinyclaw.db ".schema responses" +# Should show: status TEXT CHECK(status IN ('pending', 'delivering', 'acked')) +``` + +### Bug 2 Verification + +```bash +# Check logs for mention validation messages +tail -f .tinyclaw/logs/queue.log | grep -i "mention" +# Should see: "Valid mention: @agent1 → @agent2" or "Invalid mention ..." +``` + +### Bug 3 Verification + +```bash +# Check for race condition debug messages +tail -f .tinyclaw/logs/queue.log | grep -i "pending" +# Should see: "Conversation X: pending incremented to N" and "decremented to N" +``` + +## Rollback + +If you need to rollback, restore from git: + +```bash +git checkout -- src/lib/db.ts +git checkout -- src/lib/routing.ts +git checkout -- src/lib/conversation.ts +git checkout -- src/queue-processor.ts +git checkout -- src/channels/telegram-client.ts +# ... restore other modified files +``` + +## Testing Recommendations + +### Test Bug 1 Fix + +1. Send a message through Telegram +2. Verify response is received +3. Simulate network failure (if possible) +4. Verify no duplicate responses are sent + +### Test Bug 2 Fix + +1. Configure a team with multiple agents +2. Send message with mixed-case mention: `[@Coder: help]` where agent ID is `coder` +3. Check logs for validation messages +4. Verify mentioned agent is activated + +### Test Bug 3 Fix + +1. Configure a team with 2+ agents +2. Send message: `[@agent1,agent2: task]` +3. Both agents should respond +4. Final aggregated response should be sent to user + +## Additional Notes + +### Discord/WhatsApp Client Fixes + +The same pattern from `bug1-telegram.patch` should be applied to: + +- `src/channels/discord-client.ts` (lines ~369-455) +- `src/channels/whatsapp-client.ts` (lines ~369-445) + +Key changes needed: +1. Add `inFlightResponses` tracking Set +2. Add `deliveringResponses` Map for retry tracking +3. Claim response before sending +4. Track delivery attempts +5. Handle max retry exceeded + +### Database Migration + +The patches add a new column `delivering_at` to the responses table. For existing databases: + +```sql +-- Run this if you get schema errors +ALTER TABLE responses ADD COLUMN delivering_at INTEGER; +``` + +## Support + +For questions or issues with these patches: +1. Check the detailed solution document: `tinyclaw-bug-solutions.md` +2. Review the original bug investigation in the GitHub issue +3. Test changes in a development environment first diff --git a/docs/bug-fixes/tinyclaw-bug-solutions.md b/docs/bug-fixes/tinyclaw-bug-solutions.md new file mode 100644 index 0000000..2911815 --- /dev/null +++ b/docs/bug-fixes/tinyclaw-bug-solutions.md @@ -0,0 +1,842 @@ +# TinyClaw Bug Solutions + +## Executive Summary + +This document provides detailed solutions for three critical bugs identified in the tinyClaw codebase: + +| Bug | Severity | Location | Status | +|-----|----------|----------|--------| +| Bug 1: Channel response drops | Critical | All channel clients (Telegram, Discord, WhatsApp) | Proposed fix with atomic ack-before-send | +| Bug 2: Inter-agent mention failures | High | `routing.ts` + `queue-processor.ts` | Proposed fix with validation logging & case-insensitive matching | +| Bug 3: Multi-agent reply loss | Critical | `queue-processor.ts` race condition | Proposed fix with atomic decrement operation | + +--- + +## Bug 1: Telegram/Discord/WhatsApp Response Drops + +### Problem Analysis + +The send-then-ack flow in all channel clients is **not atomic**: + +``` +1. Fetch pending responses from DB ✓ +2. bot.sendMessage() to user ✓ succeeds +3. fetch(/api/responses/{id}/ack) ✗ fails (network blip, server restart) + → Response stays "pending" in DB + → Next poll: duplicate send OR response appears "lost" +``` + +The old file-based system used atomic `fs.unlinkSync()` — either the file was deleted or it threw. The new SQLite flow has a gap between send and ack where failures are silently caught and logged. + +### Affected Code Locations + +- `src/channels/telegram-client.ts` lines 467-528 +- `src/channels/discord-client.ts` lines 382-448 +- `src/channels/whatsapp-client.ts` lines 382-438 + +### Root Cause + +```typescript +// telegram-client.ts (lines 507-520) +await bot.sendMessage(targetChatId, chunks[0]!, ...); // Step 1: Send succeeds +// ... +await fetch(`${API_BASE}/api/responses/${resp.id}/ack`, { method: 'POST' }); // Step 2: May fail +``` + +If the ack fails (network error, server restart), the response remains in `pending` status in the database and will be resent on the next poll cycle. + +### Solution: Ack-Before-Send with Retry Queue + +The fix requires **two changes**: + +#### Change 1A: Modify `db.ts` - Add "delivering" Status + +Add a new status to track responses that are in the process of being delivered: + +```typescript +// src/lib/db.ts + +export interface DbResponse { + id: number; + message_id: string; + channel: string; + sender: string; + sender_id: string | null; + message: string; + original_message: string; + agent: string | null; + files: string | null; + status: 'pending' | 'delivering' | 'acked'; // Add 'delivering' + created_at: number; + acked_at: number | null; + delivering_at: number | null; // Add this field +} + +// Add new function to atomically claim a response for delivery +export function claimResponseForDelivery(responseId: number): boolean { + const d = getDb(); + const result = d.prepare(` + UPDATE responses + SET status = 'delivering', delivering_at = ? + WHERE id = ? AND status = 'pending' + `).run(Date.now(), responseId); + return result.changes > 0; +} + +// Add function to recover stuck delivering responses +export function recoverStuckDeliveringResponses(thresholdMs = 5 * 60 * 1000): number { + const cutoff = Date.now() - thresholdMs; + const result = getDb().prepare(` + UPDATE responses + SET status = 'pending', delivering_at = NULL + WHERE status = 'delivering' AND delivering_at < ? + `).run(cutoff); + return result.changes; +} +``` + +#### Change 1B: Modify Channel Clients - Ack-Before-Send Pattern + +**For Telegram Client (`src/channels/telegram-client.ts`):** + +```typescript +// Replace the checkOutgoingQueue function (lines 455-535) + +async function checkOutgoingQueue(): Promise { + if (processingOutgoingQueue) { + return; + } + + processingOutgoingQueue = true; + + try { + const res = await fetch(`${API_BASE}/api/responses/pending?channel=telegram`); + if (!res.ok) return; + const responses = await res.json() as any[]; + + for (const resp of responses) { + // STEP 1: Atomically claim this response for delivery + const claimRes = await fetch(`${API_BASE}/api/responses/${resp.id}/claim`, { + method: 'POST' + }); + + if (!claimRes.ok) { + // Another instance claimed it, skip + log('INFO', `Response ${resp.id} already being processed by another instance`); + continue; + } + + const claimed = await claimRes.json(); + if (!claimed.success) { + continue; // Already being delivered + } + + try { + const responseText = resp.message; + const messageId = resp.messageId; + const sender = resp.sender; + const senderId = resp.senderId; + const files: string[] = resp.files || []; + + // Find pending message, or fall back to senderId for proactive messages + const pending = pendingMessages.get(messageId); + const targetChatId = pending?.chatId ?? (senderId ? Number(senderId) : null); + + if (targetChatId && !Number.isNaN(targetChatId)) { + // STEP 2: Send the message (may fail, but response is already claimed) + try { + // Send any attached files first + if (files.length > 0) { + for (const file of files) { + try { + if (!fs.existsSync(file)) continue; + const ext = path.extname(file).toLowerCase(); + if (['.jpg', '.jpeg', '.png', '.gif', '.webp'].includes(ext)) { + await bot.sendPhoto(targetChatId, file); + } else if (['.mp3', '.ogg', '.wav', '.m4a'].includes(ext)) { + await bot.sendAudio(targetChatId, file); + } else if (['.mp4', '.avi', '.mov', '.webm'].includes(ext)) { + await bot.sendVideo(targetChatId, file); + } else { + await bot.sendDocument(targetChatId, file); + } + log('INFO', `Sent file to Telegram: ${path.basename(file)}`); + } catch (fileErr) { + log('ERROR', `Failed to send file ${file}: ${(fileErr as Error).message}`); + // Continue to send other files and message + } + } + } + + // Split message if needed (Telegram 4096 char limit) + if (responseText) { + const chunks = splitMessage(responseText); + + if (chunks.length > 0) { + await bot.sendMessage(targetChatId, chunks[0]!, pending + ? { reply_to_message_id: pending.messageId } + : {}, + ); + } + for (let i = 1; i < chunks.length; i++) { + await bot.sendMessage(targetChatId, chunks[i]!); + } + } + + log('INFO', `Sent ${pending ? 'response' : 'proactive message'} to ${sender} (${responseText.length} chars${files.length > 0 ? `, ${files.length} file(s)` : ''})`); + + if (pending) pendingMessages.delete(messageId); + + // STEP 3: Final ack (cleanup) + await fetch(`${API_BASE}/api/responses/${resp.id}/ack`, { method: 'POST' }); + } catch (sendError) { + // Send failed - unclaim the response so it can be retried + log('ERROR', `Failed to send response ${resp.id}: ${(sendError as Error).message}`); + await fetch(`${API_BASE}/api/responses/${resp.id}/unclaim`, { method: 'POST' }); + } + } else { + log('WARN', `No pending message for ${messageId} and no valid senderId, acking`); + await fetch(`${API_BASE}/api/responses/${resp.id}/ack`, { method: 'POST' }); + } + } catch (error) { + log('ERROR', `Error processing response ${resp.id}: ${(error as Error).message}`); + // Try to unclaim on error + try { + await fetch(`${API_BASE}/api/responses/${resp.id}/unclaim`, { method: 'POST' }); + } catch { + // Ignore unclaim errors + } + } + } + } catch (error) { + log('ERROR', `Outgoing queue error: ${(error as Error).message}`); + } finally { + processingOutgoingQueue = false; + } +} +``` + +#### Change 1C: Add API Endpoints in Server + +Add the new `/claim` and `/unclaim` endpoints to the API server: + +```typescript +// In src/server/index.ts (or wherever API routes are defined) + +// Claim a response for delivery (atomic operation) +app.post('/api/responses/:id/claim', (req, res) => { + const responseId = parseInt(req.params.id); + const success = claimResponseForDelivery(responseId); + res.json({ success }); +}); + +// Unclaim a response (if delivery failed) +app.post('/api/responses/:id/unclaim', (req, res) => { + const responseId = parseInt(req.params.id); + const d = getDb(); + const result = d.prepare(` + UPDATE responses + SET status = 'pending', delivering_at = NULL + WHERE id = ? AND status = 'delivering' + `).run(responseId); + res.json({ success: result.changes > 0 }); +}); +``` + +#### Change 1D: Add Periodic Recovery for Stuck Delivering Responses + +```typescript +// In queue-processor.ts, add to the periodic maintenance section + +setInterval(() => { + const count = recoverStuckDeliveringResponses(); + if (count > 0) log('INFO', `Recovered ${count} stuck delivering response(s)`); +}, 5 * 60 * 1000); // every 5 min +``` + +### Alternative Simpler Solution (If Full Atomicity Not Required) + +If the claim/unclaim pattern is too complex, a simpler approach is to **ack immediately after successful send** with idempotency: + +```typescript +// Simpler pattern - ack immediately, track in-flight +const inFlightResponses = new Set(); + +async function checkOutgoingQueue(): Promise { + // ... fetch responses ... + + for (const resp of responses) { + // Skip if already being processed + if (inFlightResponses.has(resp.id)) continue; + inFlightResponses.add(resp.id); + + try { + // Send message... + await bot.sendMessage(targetChatId, message); + + // Immediately ack after successful send + await fetch(`${API_BASE}/api/responses/${resp.id}/ack`, { method: 'POST' }); + } catch (error) { + log('ERROR', `Send failed for ${resp.id}: ${error}`); + // Will retry on next poll since not acked + } finally { + inFlightResponses.delete(resp.id); + } + } +} +``` + +--- + +## Bug 2: Inter-Agent Comms Don't Activate Mentioned Agent + +### Problem Analysis + +Silent validation failures in `routing.ts:extractTeammateMentions()` cause mentions to be dropped without any logging: + +```typescript +// routing.ts lines 64-70 +for (const candidateId of candidateIds) { + if (!seen.has(candidateId) && isTeammate(candidateId, currentAgentId, teamId, teams, agents)) { + results.push({ teammateId: candidateId, message: fullMessage }); + seen.add(candidateId); + } + // If isTeammate returns false, the mention is silently dropped! +} +``` + +**Common failure modes:** +- **Case sensitivity**: `[@Coder: ...]` fails if agent ID is `coder` +- **Typo in agent name**: silently ignored +- **Cross-team mention**: silently dropped +- **Regex ambiguity**: nested brackets `[@agent: hello [world] there]` can cause early match termination + +### Solution: Add Validation Logging & Fix Case Sensitivity + +#### Change 2A: Modify `routing.ts` - Add Detailed Logging + +```typescript +// src/lib/routing.ts + +import { log } from './logging'; // Add import + +/** + * Check if a mentioned ID is a valid teammate of the current agent in the given team. + * Now returns detailed reason for validation failures. + */ +export function isTeammate( + mentionedId: string, + currentAgentId: string, + teamId: string, + teams: Record, + agents: Record +): { valid: boolean; reason?: string } { + const team = teams[teamId]; + if (!team) { + return { valid: false, reason: `Team '${teamId}' not found` }; + } + + if (mentionedId === currentAgentId) { + return { valid: false, reason: `Self-mention (agent: ${mentionedId})` }; + } + + if (!team.agents.includes(mentionedId)) { + return { valid: false, reason: `Agent '${mentionedId}' not in team '${teamId}' (members: ${team.agents.join(', ')})` }; + } + + if (!agents[mentionedId]) { + return { valid: false, reason: `Agent '${mentionedId}' not found in agents config` }; + } + + return { valid: true }; +} + +/** + * Extract teammate mentions with detailed logging for debugging. + * Supports case-insensitive agent ID matching. + */ +export function extractTeammateMentions( + response: string, + currentAgentId: string, + teamId: string, + teams: Record, + agents: Record +): { teammateId: string; message: string }[] { + const results: { teammateId: string; message: string }[] = []; + const seen = new Set(); + + // Build case-insensitive agent lookup map + const agentIdMap = new Map(); + for (const id of Object.keys(agents)) { + agentIdMap.set(id.toLowerCase(), id); + } + + // Tag format: [@agent_id: message] or [@agent1,agent2: message] + // Improved regex: handle nested brackets better by using non-greedy match + const tagRegex = /\[@([^\]]+?):\s*([\s\S]*?)\]/g; + + // Strip all [@teammate: ...] tags from the full response to get shared context + const sharedContext = response.replace(tagRegex, '').trim(); + + let tagMatch: RegExpExecArray | null; + let matchCount = 0; + + while ((tagMatch = tagRegex.exec(response)) !== null) { + matchCount++; + const rawAgentList = tagMatch[1]; + const directMessage = tagMatch[2].trim(); + + log('DEBUG', `Found mention tag #${matchCount}: "[@${rawAgentList}: ...]"`); + + const fullMessage = sharedContext + ? `${sharedContext}\n\n------\n\nDirected to you:\n${directMessage}` + : directMessage; + + // Support comma-separated agent IDs: [@coder,reviewer: message] + const candidateIds = rawAgentList.split(',').map(id => id.trim()).filter(Boolean); + + for (const rawCandidateId of candidateIds) { + // Case-insensitive lookup + const candidateId = agentIdMap.get(rawCandidateId.toLowerCase()) || rawCandidateId; + + if (seen.has(candidateId)) { + log('WARN', `Duplicate mention of @${candidateId} ignored`); + continue; + } + + const validation = isTeammate(candidateId, currentAgentId, teamId, teams, agents); + + if (validation.valid) { + results.push({ teammateId: candidateId, message: fullMessage }); + seen.add(candidateId); + log('INFO', `Valid mention: @${currentAgentId} → @${candidateId}`); + } else { + log('WARN', `Invalid mention "[@${rawCandidateId}: ...]" from @${currentAgentId}: ${validation.reason}`); + } + } + } + + if (matchCount === 0) { + log('DEBUG', `No mention tags found in response from @${currentAgentId}`); + } else if (results.length === 0 && matchCount > 0) { + log('WARN', `Found ${matchCount} mention tag(s) but none were valid. Response: "${response.substring(0, 100)}..."`); + } + + return results; +} +``` + +#### Change 2B: Update Callers to Handle New Return Type + +Update `queue-processor.ts` to use the new validation result: + +```typescript +// queue-processor.ts lines 236-259 + +// Check for teammate mentions +const teammateMentions = extractTeammateMentions( + response, agentId, conv.teamContext.teamId, teams, agents +); + +if (teammateMentions.length > 0 && conv.totalMessages < conv.maxMessages) { + // Enqueue internal messages for each mention + conv.pending += teammateMentions.length; + conv.outgoingMentions.set(agentId, teammateMentions.length); + for (const mention of teammateMentions) { + log('INFO', `@${agentId} → @${mention.teammateId}`); + emitEvent('chain_handoff', { teamId: conv.teamContext.teamId, fromAgent: agentId, toAgent: mention.teammateId }); + + const internalMsg = `[Message from teammate @${agentId}]:\n${mention.message}`; + enqueueInternalMessage(conv.id, agentId, mention.teammateId, internalMsg, { + channel: messageData.channel, + sender: messageData.sender, + senderId: messageData.senderId, + messageId: messageData.messageId, + }); + } +} else if (teammateMentions.length === 0 && conv.totalMessages < conv.maxMessages) { + // No valid mentions found - this is expected for leaf responses + log('DEBUG', `Agent @${agentId} produced no valid teammate mentions`); +} +``` + +#### Change 2C: Add Agent Response Validation Helper + +Add a helper to validate agent responses before processing: + +```typescript +// src/lib/routing.ts + +/** + * Validates that an agent response is properly formatted. + * Returns validation result with any errors found. + */ +export function validateAgentResponse( + response: string, + agentId: string, + teamId: string, + teams: Record, + agents: Record +): { valid: boolean; errors: string[]; mentions: string[] } { + const errors: string[] = []; + const mentions: string[] = []; + + // Check for potentially malformed mention tags + const openBrackets = (response.match(/\[@/g) || []).length; + const closeBrackets = (response.match(/\]/g) || []).length; + + if (openBrackets !== closeBrackets) { + errors.push(`Mismatched brackets: ${openBrackets} opening, ${closeBrackets} closing`); + } + + // Extract and validate mentions + const tagRegex = /\[@([^\]]+?):/g; + let match: RegExpExecArray | null; + + while ((match = tagRegex.exec(response)) !== null) { + const rawList = match[1]; + const ids = rawList.split(',').map(id => id.trim()).filter(Boolean); + + for (const rawId of ids) { + const normalizedId = rawId.toLowerCase(); + const actualId = Object.keys(agents).find(id => id.toLowerCase() === normalizedId); + + if (!actualId) { + errors.push(`Unknown agent: @${rawId}`); + } else { + mentions.push(actualId); + const validation = isTeammate(actualId, agentId, teamId, teams, agents); + if (!validation.valid) { + errors.push(`Invalid mention @${rawId}: ${validation.reason}`); + } + } + } + } + + return { valid: errors.length === 0, errors, mentions }; +} +``` + +--- + +## Bug 3: Multi-Agent Reply Loss (Race Condition on conv.pending) + +### Problem Analysis + +When a comma-separated mention like `[@coder,reviewer: message]` is parsed: + +``` +1. conv.pending += 2 (correct - line 243) +2. Both agents process in parallel via agentProcessingChains +3. Both read conv.pending concurrently and decrement independently (line 262) +``` + +Since JavaScript promises can interleave, both agents can read `conv.pending = 2`, both decrement to 1, and neither triggers `completeConversation()` (which fires when `pending === 0`). + +### Affected Code + +- `src/queue-processor.ts` lines 243, 262, 264 + +### Solution: Atomic Decrement with Compare-and-Swap + +#### Change 3A: Add Atomic Operations to Conversation Module + +```typescript +// src/lib/conversation.ts + +import { EventEmitter } from 'events'; + +// Event emitter for conversation state changes +export const conversationEvents = new EventEmitter(); + +/** + * Atomically decrement the pending counter and check if conversation is complete. + * Returns true if the conversation should be completed (pending reached 0). + */ +export function decrementPendingAndCheckComplete(conv: Conversation): boolean { + // Use a lock flag to prevent concurrent decrements + if ((conv as any)._decrementing) { + // Another decrement is in progress, queue this one + return false; + } + + (conv as any)._decrementing = true; + + try { + conv.pending--; + log('DEBUG', `Conversation ${conv.id}: pending decremented to ${conv.pending}`); + + if (conv.pending < 0) { + log('WARN', `Conversation ${conv.id}: pending went negative (${conv.pending}), resetting to 0`); + conv.pending = 0; + } + + const shouldComplete = conv.pending === 0; + + if (shouldComplete) { + conversationEvents.emit('conversation:complete', conv); + } + + return shouldComplete; + } finally { + (conv as any)._decrementing = false; + } +} + +/** + * Safely increment pending counter for new mentions. + */ +export function incrementPending(conv: Conversation, count: number): void { + conv.pending += count; + log('DEBUG', `Conversation ${conv.id}: pending incremented to ${conv.pending} (+${count})`); +} +``` + +#### Change 3B: Update queue-processor.ts to Use Atomic Operations + +```typescript +// queue-processor.ts - Update imports +import { + conversations, + MAX_CONVERSATION_MESSAGES, + enqueueInternalMessage, + completeConversation, + decrementPendingAndCheckComplete, + incrementPending +} from './lib/conversation'; + +// ... in processMessage function, replace lines 243 and 262-268: + +if (teammateMentions.length > 0 && conv.totalMessages < conv.maxMessages) { + // Use atomic increment + incrementPending(conv, teammateMentions.length); + conv.outgoingMentions.set(agentId, teammateMentions.length); + + for (const mention of teammateMentions) { + log('INFO', `@${agentId} → @${mention.teammateId}`); + emitEvent('chain_handoff', { teamId: conv.teamContext.teamId, fromAgent: agentId, toAgent: mention.teammateId }); + + const internalMsg = `[Message from teammate @${agentId}]:\n${mention.message}`; + enqueueInternalMessage(conv.id, agentId, mention.teammateId, internalMsg, { + channel: messageData.channel, + sender: messageData.sender, + senderId: messageData.senderId, + messageId: messageData.messageId, + }); + } +} else if (teammateMentions.length > 0) { + log('WARN', `Conversation ${conv.id} hit max messages (${conv.maxMessages}) — not enqueuing further mentions`); +} + +// This branch is done - use atomic decrement +const shouldComplete = decrementPendingAndCheckComplete(conv); + +if (shouldComplete) { + completeConversation(conv); +} else { + log('INFO', `Conversation ${conv.id}: ${conv.pending} branch(es) still pending`); +} +``` + +#### Change 3C: Alternative Solution Using Promise Sequencing (Simpler) + +If the atomic operations approach is too complex, use promise chaining to ensure sequential access: + +```typescript +// queue-processor.ts + +// Per-conversation processing locks +const conversationLocks = new Map>(); + +async function withConversationLock(convId: string, fn: () => Promise): Promise { + // Get or create lock chain for this conversation + const currentLock = conversationLocks.get(convId) || Promise.resolve(); + + // Create new lock that waits for current and then executes fn + const newLock = currentLock.then(() => fn()).finally(() => { + // Clean up if this was the last lock + if (conversationLocks.get(convId) === newLock) { + conversationLocks.delete(convId); + } + }); + + conversationLocks.set(convId, newLock); + return newLock; +} + +// In processMessage, wrap the pending decrement: +await withConversationLock(conv.id, async () => { + conv.pending--; + + if (conv.pending === 0) { + completeConversation(conv); + } else { + log('INFO', `Conversation ${conv.id}: ${conv.pending} branch(es) still pending`); + } +}); +``` + +#### Change 3D: Add Conversation State Validation + +Add validation to detect and recover from inconsistent states: + +```typescript +// src/lib/conversation.ts + +/** + * Validate conversation state and attempt recovery if needed. + */ +export function validateConversationState(conv: Conversation): { valid: boolean; issues: string[] } { + const issues: string[] = []; + + if (conv.pending < 0) { + issues.push(`Negative pending count: ${conv.pending}`); + } + + if (conv.totalMessages > conv.maxMessages) { + issues.push(`Exceeded max messages: ${conv.totalMessages}/${conv.maxMessages}`); + } + + if (conv.responses.length === 0 && conv.pending === 0) { + issues.push(`Conversation has no responses and no pending work`); + } + + // Check for orphaned conversations (pending > 0 but no actual work) + const expectedPending = conv.outgoingMentions.size > 0 + ? Array.from(conv.outgoingMentions.values()).reduce((a, b) => a + b, 0) + : 1; + + if (conv.pending !== expectedPending) { + issues.push(`Pending mismatch: expected ${expectedPending}, got ${conv.pending}`); + } + + return { valid: issues.length === 0, issues }; +} + +/** + * Attempt to recover a conversation from an inconsistent state. + */ +export function recoverConversation(conv: Conversation): boolean { + const validation = validateConversationState(conv); + + if (validation.valid) { + return true; + } + + log('WARN', `Attempting to recover conversation ${conv.id}: ${validation.issues.join(', ')}`); + + // Fix negative pending + if (conv.pending < 0) { + conv.pending = 0; + } + + // If no pending work, complete the conversation + if (conv.pending === 0 && conv.responses.length > 0) { + completeConversation(conv); + return true; + } + + return false; +} +``` + +--- + +## Implementation Priority + +### Phase 1: Critical Fixes (Deploy First) + +1. **Bug 3 (Race Condition)** - Most critical, affects multi-agent workflows directly + - Implement Change 3B (atomic decrement) or 3C (promise sequencing) + - Add Change 3D for state validation + +2. **Bug 1 (Response Drops)** - Affects all users + - Implement Change 1A (delivering status) + 1C (API endpoints) + - Update one channel client first (e.g., Telegram) as proof of concept + +### Phase 2: High Priority Fixes + +3. **Bug 2 (Silent Mention Failures)** - Improves debugging and user experience + - Implement Change 2A (logging + case-insensitive matching) + - Implement Change 2B (update callers) + +### Phase 3: Polish + +4. Complete remaining channel clients for Bug 1 +5. Add comprehensive tests for all three fixes + +--- + +## Testing Recommendations + +### For Bug 1 (Response Drops) + +```typescript +// Test case: Simulate ack failure +async function testAckFailure() { + // 1. Enqueue a response + const responseId = enqueueResponse({...}); + + // 2. Simulate network failure during ack + mockFetch.mockRejectedValueOnce(new Error('Network error')); + + // 3. Verify response is still pending + const status = getQueueStatus(); + expect(status.responsesPending).toBe(1); + + // 4. Verify response is not duplicated on retry + // (with claim pattern, should be claimed by first attempt) +} +``` + +### For Bug 2 (Mention Failures) + +```typescript +// Test case: Case-insensitive matching +function testCaseInsensitiveMentions() { + const agents = { coder: { name: 'Coder' }, reviewer: { name: 'Reviewer' } }; + const teams = { devteam: { agents: ['coder', 'reviewer'], leader_agent: 'coder' } }; + + // Should match despite case difference + const mentions = extractTeammateMentions( + '[@Coder: hello]', + 'reviewer', + 'devteam', + teams, + agents + ); + + expect(mentions).toHaveLength(1); + expect(mentions[0].teammateId).toBe('coder'); +} +``` + +### For Bug 3 (Race Condition) + +```typescript +// Test case: Concurrent decrements +async function testConcurrentDecrements() { + const conv = createConversation({ pending: 2 }); + + // Simulate two agents completing simultaneously + await Promise.all([ + decrementPendingAndCheckComplete(conv), + decrementPendingAndCheckComplete(conv) + ]); + + // Should be 0, not 1 + expect(conv.pending).toBe(0); +} +``` + +--- + +## Summary + +| Bug | Root Cause | Solution | Files Modified | +|-----|------------|----------|----------------| +| **Bug 1** | Non-atomic send-then-ack | Add "delivering" status, claim-before-send pattern | `db.ts`, `telegram-client.ts`, `discord-client.ts`, `whatsapp-client.ts`, `server/index.ts` | +| **Bug 2** | Silent validation failures | Add detailed logging, case-insensitive matching | `routing.ts`, `queue-processor.ts` | +| **Bug 3** | Race condition on `conv.pending` | Atomic decrement with locking | `conversation.ts`, `queue-processor.ts` | + +These fixes address all three critical issues while maintaining backward compatibility and adding robust error handling and logging for easier debugging in the future. diff --git a/src/channels/telegram-client.ts b/src/channels/telegram-client.ts index 4d4ef62..52057df 100644 --- a/src/channels/telegram-client.ts +++ b/src/channels/telegram-client.ts @@ -78,6 +78,28 @@ function buildUniqueFilePath(dir: string, preferredName: string): string { const pendingMessages = new Map(); let processingOutgoingQueue = false; +// Track in-flight responses to prevent duplicate processing +const inFlightResponses = new Set(); + +// Track responses being delivered for retry logic +const deliveringResponses = new Map(); + +// Maximum retry attempts for a failed delivery +const MAX_DELIVERY_RETRIES = 3; + +// Helper to track delivery attempt +function trackDeliveryAttempt(responseId: number): boolean { + const existing = deliveringResponses.get(responseId); + if (existing && existing.retryCount >= MAX_DELIVERY_RETRIES) { + return false; // Max retries exceeded + } + deliveringResponses.set(responseId, { + retryCount: (existing?.retryCount || 0) + 1, + lastAttempt: Date.now(), + }); + return true; +} + // Logger function log(level: string, message: string): void { const timestamp = new Date().toISOString(); @@ -465,6 +487,27 @@ async function checkOutgoingQueue(): Promise { const responses = await res.json() as any[]; for (const resp of responses) { + // Skip if already being processed + if (inFlightResponses.has(resp.id)) continue; + + // Check retry count + const deliveryInfo = deliveringResponses.get(resp.id); + if (deliveryInfo && deliveryInfo.retryCount >= MAX_DELIVERY_RETRIES) { + log('ERROR', `Response ${resp.id} exceeded max retries, acking to prevent infinite loop`); + await fetch(`${API_BASE}/api/responses/${resp.id}/ack`, { method: 'POST' }); + deliveringResponses.delete(resp.id); + continue; + } + + // Claim the response for delivery (atomic operation) + const claimRes = await fetch(`${API_BASE}/api/responses/${resp.id}/claim`, { method: 'POST' }); + if (!claimRes.ok || !(await claimRes.json() as { success: boolean }).success) { + log('INFO', `Response ${resp.id} already being processed by another instance`); + continue; + } + + inFlightResponses.add(resp.id); + try { const responseText = resp.message; const messageId = resp.messageId; @@ -477,54 +520,78 @@ async function checkOutgoingQueue(): Promise { const targetChatId = pending?.chatId ?? (senderId ? Number(senderId) : null); if (targetChatId && !Number.isNaN(targetChatId)) { - // Send any attached files first - if (files.length > 0) { - for (const file of files) { - try { - if (!fs.existsSync(file)) continue; - const ext = path.extname(file).toLowerCase(); - if (['.jpg', '.jpeg', '.png', '.gif', '.webp'].includes(ext)) { - await bot.sendPhoto(targetChatId, file); - } else if (['.mp3', '.ogg', '.wav', '.m4a'].includes(ext)) { - await bot.sendAudio(targetChatId, file); - } else if (['.mp4', '.avi', '.mov', '.webm'].includes(ext)) { - await bot.sendVideo(targetChatId, file); - } else { - await bot.sendDocument(targetChatId, file); + try { + // Send any attached files first + if (files.length > 0) { + for (const file of files) { + try { + if (!fs.existsSync(file)) continue; + const ext = path.extname(file).toLowerCase(); + if (['.jpg', '.jpeg', '.png', '.gif', '.webp'].includes(ext)) { + await bot.sendPhoto(targetChatId, file); + } else if (['.mp3', '.ogg', '.wav', '.m4a'].includes(ext)) { + await bot.sendAudio(targetChatId, file); + } else if (['.mp4', '.avi', '.mov', '.webm'].includes(ext)) { + await bot.sendVideo(targetChatId, file); + } else { + await bot.sendDocument(targetChatId, file); + } + log('INFO', `Sent file to Telegram: ${path.basename(file)}`); + } catch (fileErr) { + log('ERROR', `Failed to send file ${file}: ${(fileErr as Error).message}`); } - log('INFO', `Sent file to Telegram: ${path.basename(file)}`); - } catch (fileErr) { - log('ERROR', `Failed to send file ${file}: ${(fileErr as Error).message}`); } } - } - // Split message if needed (Telegram 4096 char limit) - if (responseText) { - const chunks = splitMessage(responseText); + // Split message if needed (Telegram 4096 char limit) + if (responseText) { + const chunks = splitMessage(responseText); - if (chunks.length > 0) { - await bot.sendMessage(targetChatId, chunks[0]!, pending - ? { reply_to_message_id: pending.messageId } - : {}, - ); - } - for (let i = 1; i < chunks.length; i++) { - await bot.sendMessage(targetChatId, chunks[i]!); + if (chunks.length > 0) { + await bot.sendMessage(targetChatId, chunks[0]!, pending + ? { reply_to_message_id: pending.messageId } + : {}, + ); + } + for (let i = 1; i < chunks.length; i++) { + await bot.sendMessage(targetChatId, chunks[i]!); + } } - } - log('INFO', `Sent ${pending ? 'response' : 'proactive message'} to ${sender} (${responseText.length} chars${files.length > 0 ? `, ${files.length} file(s)` : ''})`); + log('INFO', `Sent ${pending ? 'response' : 'proactive message'} to ${sender} (${responseText.length} chars${files.length > 0 ? `, ${files.length} file(s)` : ''})`); - if (pending) pendingMessages.delete(messageId); - await fetch(`${API_BASE}/api/responses/${resp.id}/ack`, { method: 'POST' }); + if (pending) pendingMessages.delete(messageId); + + // Delivery successful - cleanup tracking + deliveringResponses.delete(resp.id); + + // Final ack + await fetch(`${API_BASE}/api/responses/${resp.id}/ack`, { method: 'POST' }); + } catch (sendError) { + // Send failed - unclaim the response so it can be retried + log('ERROR', `Failed to send response ${resp.id}: ${(sendError as Error).message}`); + if (!trackDeliveryAttempt(resp.id)) { + log('ERROR', `Response ${resp.id} max retries exceeded, acking to prevent loop`); + await fetch(`${API_BASE}/api/responses/${resp.id}/ack`, { method: 'POST' }); + } else { + await fetch(`${API_BASE}/api/responses/${resp.id}/unclaim`, { method: 'POST' }); + } + } } else { log('WARN', `No pending message for ${messageId} and no valid senderId, acking`); + deliveringResponses.delete(resp.id); await fetch(`${API_BASE}/api/responses/${resp.id}/ack`, { method: 'POST' }); } } catch (error) { log('ERROR', `Error processing response ${resp.id}: ${(error as Error).message}`); - // Don't ack on error, will retry next poll + // Try to unclaim on error + try { + await fetch(`${API_BASE}/api/responses/${resp.id}/unclaim`, { method: 'POST' }); + } catch { + // Ignore unclaim errors + } + } finally { + inFlightResponses.delete(resp.id); } } } catch (error) { @@ -534,6 +601,16 @@ async function checkOutgoingQueue(): Promise { } } +// Periodic cleanup of stale delivery tracking +setInterval(() => { + const tenMinutesAgo = Date.now() - 10 * 60 * 1000; + for (const [id, info] of deliveringResponses.entries()) { + if (info.lastAttempt < tenMinutesAgo) { + deliveringResponses.delete(id); + } + } +}, 5 * 60 * 1000); + // Check outgoing queue every second setInterval(checkOutgoingQueue, 1000); diff --git a/src/lib/conversation.ts b/src/lib/conversation.ts index e141053..d623fbf 100644 --- a/src/lib/conversation.ts +++ b/src/lib/conversation.ts @@ -11,6 +11,113 @@ export const conversations = new Map(); export const MAX_CONVERSATION_MESSAGES = 50; +// Per-conversation locks to prevent race conditions +const conversationLocks = new Map>(); + +/** + * Execute a function with exclusive access to a conversation. + * This prevents race conditions when multiple agents complete simultaneously. + */ +export async function withConversationLock( + convId: string, + fn: () => Promise +): Promise { + const currentLock = conversationLocks.get(convId) || Promise.resolve(); + + let resolveLock: () => void; + const lockPromise = new Promise((resolve) => { + resolveLock = resolve; + }); + + const newLock = currentLock.then(async () => { + try { + return await fn(); + } finally { + resolveLock(); + } + }); + + conversationLocks.set(convId, lockPromise); + + newLock.finally(() => { + if (conversationLocks.get(convId) === lockPromise) { + conversationLocks.delete(convId); + } + }); + + return newLock; +} + +/** + * Safely increment the pending counter for a conversation. + */ +export function incrementPending(conv: Conversation, count: number): void { + conv.pending += count; + log('DEBUG', `Conversation ${conv.id}: pending incremented to ${conv.pending} (+${count})`); +} + +/** + * Safely decrement the pending counter and check if conversation should complete. + * Returns true if pending reached 0 and conversation should complete. + */ +export function decrementPending(conv: Conversation): boolean { + conv.pending--; + log('DEBUG', `Conversation ${conv.id}: pending decremented to ${conv.pending}`); + + if (conv.pending < 0) { + log('WARN', `Conversation ${conv.id}: pending went negative (${conv.pending}), resetting to 0`); + conv.pending = 0; + } + + return conv.pending === 0; +} + +/** + * Validate conversation state and attempt recovery if needed. + */ +export function validateConversationState(conv: Conversation): { valid: boolean; issues: string[] } { + const issues: string[] = []; + + if (conv.pending < 0) { + issues.push(`Negative pending count: ${conv.pending}`); + } + + if (conv.totalMessages > conv.maxMessages) { + issues.push(`Exceeded max messages: ${conv.totalMessages}/${conv.maxMessages}`); + } + + if (conv.responses.length === 0 && conv.pending === 0) { + issues.push(`Conversation has no responses and no pending work`); + } + + return { valid: issues.length === 0, issues }; +} + +/** + * Attempt to recover a conversation from an inconsistent state. + */ +export function recoverConversation(conv: Conversation): boolean { + const validation = validateConversationState(conv); + + if (validation.valid) { + return true; + } + + log('WARN', `Attempting to recover conversation ${conv.id}: ${validation.issues.join(', ')}`); + + if (conv.pending < 0) { + conv.pending = 0; + } + + if (conv.pending === 0 && conv.responses.length > 0) { + log('INFO', `Auto-completing conversation ${conv.id} with ${conv.responses.length} response(s)`); + completeConversation(conv); + return true; + } + + return false; +} + /** * Enqueue an internal (agent-to-agent) message into the SQLite queue. */ diff --git a/src/lib/db.ts b/src/lib/db.ts index b1690d6..5587ba3 100644 --- a/src/lib/db.ts +++ b/src/lib/db.ts @@ -41,8 +41,9 @@ export interface DbResponse { original_message: string; agent: string | null; files: string | null; // JSON array - status: 'pending' | 'acked'; + status: 'pending' | 'delivering' | 'acked'; created_at: number; + delivering_at: number | null; acked_at: number | null; } @@ -117,8 +118,9 @@ export function initQueueDb(): void { original_message TEXT NOT NULL, agent TEXT, files TEXT, - status TEXT NOT NULL DEFAULT 'pending', + status TEXT NOT NULL DEFAULT 'pending' CHECK(status IN ('pending', 'delivering', 'acked')), created_at INTEGER NOT NULL, + delivering_at INTEGER, acked_at INTEGER ); @@ -245,6 +247,46 @@ export function ackResponse(responseId: number): void { `).run(Date.now(), responseId); } +/** + * Atomically claim a response for delivery. + * Returns true if successfully claimed, false if already claimed or not pending. + */ +export function claimResponseForDelivery(responseId: number): boolean { + const d = getDb(); + const result = d.prepare(` + UPDATE responses + SET status = 'delivering', delivering_at = ? + WHERE id = ? AND status = 'pending' + `).run(Date.now(), responseId); + return result.changes > 0; +} + +/** + * Unclaim a response (if delivery failed) to allow retry. + */ +export function unclaimResponse(responseId: number): boolean { + const d = getDb(); + const result = d.prepare(` + UPDATE responses + SET status = 'pending', delivering_at = NULL + WHERE id = ? AND status = 'delivering' + `).run(responseId); + return result.changes > 0; +} + +/** + * Recover responses stuck in 'delivering' for longer than thresholdMs (default 5 min). + */ +export function recoverStuckDeliveringResponses(thresholdMs = 5 * 60 * 1000): number { + const cutoff = Date.now() - thresholdMs; + const result = getDb().prepare(` + UPDATE responses + SET status = 'pending', delivering_at = NULL + WHERE status = 'delivering' AND delivering_at < ? + `).run(cutoff); + return result.changes; +} + export function getRecentResponses(limit: number): DbResponse[] { return getDb().prepare(` SELECT * FROM responses ORDER BY created_at DESC LIMIT ? diff --git a/src/lib/routing.ts b/src/lib/routing.ts index db6f04f..7f9119c 100644 --- a/src/lib/routing.ts +++ b/src/lib/routing.ts @@ -1,5 +1,6 @@ import path from 'path'; import { AgentConfig, TeamConfig } from './types'; +import { log } from './logging'; /** * Find the first team that contains the given agent. @@ -24,12 +25,27 @@ export function isTeammate( agents: Record ): boolean { const team = teams[teamId]; - if (!team) return false; - return ( - mentionedId !== currentAgentId && - team.agents.includes(mentionedId) && - !!agents[mentionedId] - ); + if (!team) { + log('WARN', `isTeammate check failed: Team '${teamId}' not found`); + return false; + } + + if (mentionedId === currentAgentId) { + log('DEBUG', `isTeammate check failed: Self-mention (agent: ${mentionedId})`); + return false; + } + + if (!team.agents.includes(mentionedId)) { + log('WARN', `isTeammate check failed: Agent '${mentionedId}' not in team '${teamId}' (members: ${team.agents.join(', ')})`); + return false; + } + + if (!agents[mentionedId]) { + log('WARN', `isTeammate check failed: Agent '${mentionedId}' not found in agents config`); + return false; + } + + return true; } /** @@ -46,32 +62,117 @@ export function extractTeammateMentions( const results: { teammateId: string; message: string }[] = []; const seen = new Set(); - // TODO: Support cross-team communication — allow agents to mention agents - // on other teams or use [@team_id: message] to route to another team's leader. + // Build case-insensitive agent lookup map + const agentIdMap = new Map(); + for (const id of Object.keys(agents)) { + agentIdMap.set(id.toLowerCase(), id); + } // Tag format: [@agent_id: message] or [@agent1,agent2: message] - const tagRegex = /\[@(\S+?):\s*([\s\S]*?)\]/g; + // Improved regex: better handling of content with brackets + const tagRegex = /\[@([^\]]+?):\s*([\s\S]*?)\]/g; + // Strip all [@teammate: ...] tags from the full response to get shared context const sharedContext = response.replace(tagRegex, '').trim(); + let tagMatch: RegExpExecArray | null; + let matchCount = 0; + while ((tagMatch = tagRegex.exec(response)) !== null) { + matchCount++; + const rawAgentList = tagMatch[1]; const directMessage = tagMatch[2].trim(); + + log('DEBUG', `Found mention tag #${matchCount}: "[@${rawAgentList}: ...]" from @${currentAgentId}`); + const fullMessage = sharedContext ? `${sharedContext}\n\n------\n\nDirected to you:\n${directMessage}` : directMessage; // Support comma-separated agent IDs: [@coder,reviewer: message] - const candidateIds = tagMatch[1].toLowerCase().split(',').map(id => id.trim()).filter(Boolean); - for (const candidateId of candidateIds) { - if (!seen.has(candidateId) && isTeammate(candidateId, currentAgentId, teamId, teams, agents)) { + const rawCandidateIds = rawAgentList.split(',').map(id => id.trim()).filter(Boolean); + + for (const rawCandidateId of rawCandidateIds) { + // Case-insensitive lookup + const candidateId = agentIdMap.get(rawCandidateId.toLowerCase()) || rawCandidateId; + + if (seen.has(candidateId)) { + log('WARN', `Duplicate mention of @${candidateId} from @${currentAgentId} ignored`); + continue; + } + + if (isTeammate(candidateId, currentAgentId, teamId, teams, agents)) { results.push({ teammateId: candidateId, message: fullMessage }); seen.add(candidateId); + log('INFO', `Valid mention: @${currentAgentId} → @${candidateId}`); } } } + + // Log summary + if (matchCount === 0) { + log('DEBUG', `No mention tags found in response from @${currentAgentId}`); + } else if (results.length === 0) { + log('WARN', `Found ${matchCount} mention tag(s) from @${currentAgentId} but none were valid`); + log('DEBUG', `Response snippet: "${response.substring(0, 200)}..."`); + } else { + log('DEBUG', `Extracted ${results.length} valid mention(s) from ${matchCount} tag(s) for @${currentAgentId}`); + } + return results; } +/** + * Validates that an agent response is properly formatted. + * Returns validation result with any errors found. + */ +export function validateAgentResponse( + response: string, + agentId: string, + teamId: string, + teams: Record, + agents: Record +): { valid: boolean; errors: string[]; mentions: string[] } { + const errors: string[] = []; + const mentions: string[] = []; + + // Check for potentially malformed mention tags + const openBrackets = (response.match(/\[@/g) || []).length; + const closeBrackets = (response.match(/\]/g) || []).length; + + if (openBrackets !== closeBrackets) { + errors.push(`Mismatched brackets: ${openBrackets} opening, ${closeBrackets} closing`); + } + + // Build case-insensitive agent lookup + const agentIdMap = new Map(); + for (const id of Object.keys(agents)) { + agentIdMap.set(id.toLowerCase(), id); + } + + // Extract and validate mentions + const tagRegex = /\[@([^\]]+?):/g; + let match: RegExpExecArray | null; + + while ((match = tagRegex.exec(response)) !== null) { + const rawList = match[1]; + const ids = rawList.split(',').map(id => id.trim()).filter(Boolean); + + for (const rawId of ids) { + const normalizedId = rawId.toLowerCase(); + const actualId = agentIdMap.get(normalizedId); + + if (!actualId) { + errors.push(`Unknown agent: @${rawId}`); + } else { + mentions.push(actualId); + } + } + } + + return { valid: errors.length === 0, errors, mentions }; +} + /** * Get the reset flag path for a specific agent. */ diff --git a/src/queue-processor.ts b/src/queue-processor.ts index 26b16f1..37144a9 100644 --- a/src/queue-processor.ts +++ b/src/queue-processor.ts @@ -29,9 +29,13 @@ import { initQueueDb, claimNextMessage, completeMessage as dbCompleteMessage, failMessage, enqueueResponse, getPendingAgents, recoverStaleMessages, pruneAckedResponses, pruneCompletedMessages, closeQueueDb, queueEvents, DbMessage, + recoverStuckDeliveringResponses, } from './lib/db'; import { handleLongResponse, collectFiles } from './lib/response'; -import { conversations, MAX_CONVERSATION_MESSAGES, enqueueInternalMessage, completeConversation } from './lib/conversation'; +import { + conversations, MAX_CONVERSATION_MESSAGES, enqueueInternalMessage, completeConversation, + withConversationLock, incrementPending, decrementPending, recoverConversation, +} from './lib/conversation'; // Ensure directories exist [FILES_DIR, path.dirname(LOG_FILE), CHATS_DIR].forEach(dir => { @@ -240,7 +244,7 @@ async function processMessage(dbMsg: DbMessage): Promise { if (teammateMentions.length > 0 && conv.totalMessages < conv.maxMessages) { // Enqueue internal messages for each mention - conv.pending += teammateMentions.length; + incrementPending(conv, teammateMentions.length); conv.outgoingMentions.set(agentId, teammateMentions.length); for (const mention of teammateMentions) { log('INFO', `@${agentId} → @${mention.teammateId}`); @@ -258,13 +262,21 @@ async function processMessage(dbMsg: DbMessage): Promise { log('WARN', `Conversation ${conv.id} hit max messages (${conv.maxMessages}) — not enqueuing further mentions`); } - // This branch is done - conv.pending--; + // This branch is done - use atomic decrement with locking + await withConversationLock(conv.id, async () => { + const shouldComplete = decrementPending(conv); - if (conv.pending === 0) { - completeConversation(conv); - } else { - log('INFO', `Conversation ${conv.id}: ${conv.pending} branch(es) still pending`); + if (shouldComplete) { + completeConversation(conv); + } else { + log('INFO', `Conversation ${conv.id}: ${conv.pending} branch(es) still pending`); + } + }); + + // Validate conversation state after processing + const convAfter = conversations.get(conv.id); + if (convAfter) { + recoverConversation(convAfter); } // Mark message as completed in DB @@ -386,6 +398,12 @@ setInterval(() => { if (pruned > 0) log('INFO', `Pruned ${pruned} completed message(s)`); }, 60 * 60 * 1000); // every 1 hr +// Periodic recovery for stuck delivering responses +setInterval(() => { + const count = recoverStuckDeliveringResponses(); + if (count > 0) log('INFO', `Recovered ${count} stuck delivering response(s)`); +}, 5 * 60 * 1000); // every 5 min + // Graceful shutdown process.on('SIGINT', () => { log('INFO', 'Shutting down queue processor...'); diff --git a/src/server/routes/queue.ts b/src/server/routes/queue.ts index 6db82b8..b556393 100644 --- a/src/server/routes/queue.ts +++ b/src/server/routes/queue.ts @@ -4,6 +4,7 @@ import { log } from '../../lib/logging'; import { getQueueStatus, getRecentResponses, getResponsesForChannel, ackResponse, enqueueResponse, getDeadMessages, retryDeadMessage, deleteDeadMessage, + claimResponseForDelivery, unclaimResponse, } from '../../lib/db'; export function createQueueRoutes(conversations: Map) { @@ -91,6 +92,22 @@ export function createQueueRoutes(conversations: Map) { return c.json({ ok: true }); }); + // POST /api/responses/:id/claim — atomically claim for delivery + app.post('/api/responses/:id/claim', (c) => { + const id = parseInt(c.req.param('id'), 10); + if (isNaN(id)) return c.json({ error: 'Invalid response ID' }, 400); + const success = claimResponseForDelivery(id); + return c.json({ success }); + }); + + // POST /api/responses/:id/unclaim — unclaim if delivery failed + app.post('/api/responses/:id/unclaim', (c) => { + const id = parseInt(c.req.param('id'), 10); + if (isNaN(id)) return c.json({ error: 'Invalid response ID' }, 400); + const success = unclaimResponse(id); + return c.json({ success }); + }); + // GET /api/queue/dead app.get('/api/queue/dead', (c) => { return c.json(getDeadMessages()); From e8451e7dfbdc0dcbce2900dbb2fd51b39c8eb40d Mon Sep 17 00:00:00 2001 From: Devain Pal Bansal Date: Mon, 23 Feb 2026 13:41:18 +0530 Subject: [PATCH 2/2] fix: strip channel/sender prefix before parsing @agent routing The messages API route prepends [channel/sender]: to incoming messages, which caused parseAgentRouting() regex to fail since the message no longer starts with @agent_id. This made all messages fall back to the first agent regardless of @mentions. Co-Authored-By: Claude Opus 4.6 --- src/lib/routing.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/lib/routing.ts b/src/lib/routing.ts index 7f9119c..07181f9 100644 --- a/src/lib/routing.ts +++ b/src/lib/routing.ts @@ -189,7 +189,9 @@ export function parseAgentRouting( agents: Record, teams: Record = {} ): { agentId: string; message: string; isTeam?: boolean } { - const match = rawMessage.match(/^@(\S+)\s+([\s\S]*)$/); + // Strip [channel/sender]: prefix added by the messages API route + const stripped = rawMessage.replace(/^\[[^\]]*\]:\s*/, ''); + const match = stripped.match(/^@(\S+)\s+([\s\S]*)$/); if (match) { const candidateId = match[1].toLowerCase();