Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions src/lib/conversation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,67 @@ export const conversations = new Map<string, Conversation>();

export const MAX_CONVERSATION_MESSAGES = 50;

// Per-conversation locks to prevent race conditions
const conversationLocks = new Map<string, Promise<void>>();

/**
* Execute a function with exclusive access to a conversation.
* This prevents race conditions when multiple agents complete simultaneously.
*/
export async function withConversationLock<T>(
convId: string,
fn: () => Promise<T>
): Promise<T> {
const currentLock = conversationLocks.get(convId) || Promise.resolve();

let resolveLock: () => void;
const lockPromise = new Promise<void>((resolve) => {
resolveLock = resolve;
});

const newLock = currentLock.then(async () => {
try {
return await fn();
} finally {
resolveLock();
}
});

conversationLocks.set(convId, lockPromise);

newLock.finally(() => {
if (conversationLocks.get(convId) === lockPromise) {
conversationLocks.delete(convId);
}
});

return newLock;
}

/**
* Safely increment the pending counter for a conversation.
*/
export function incrementPending(conv: Conversation, count: number): void {
conv.pending += count;
log('DEBUG', `Conversation ${conv.id}: pending incremented to ${conv.pending} (+${count})`);
}

/**
* Safely decrement the pending counter and check if conversation should complete.
* Returns true if pending reached 0 and conversation should complete.
*/
export function decrementPending(conv: Conversation): boolean {
conv.pending--;
log('DEBUG', `Conversation ${conv.id}: pending decremented to ${conv.pending}`);

if (conv.pending < 0) {
log('WARN', `Conversation ${conv.id}: pending went negative (${conv.pending}), resetting to 0`);
conv.pending = 0;
}

return conv.pending === 0;
}

/**
* Enqueue an internal (agent-to-agent) message into the SQLite queue.
*/
Expand Down
50 changes: 34 additions & 16 deletions src/lib/routing.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import path from 'path';
import { AgentConfig, TeamConfig } from './types';
import { log } from './logging';

/**
* Find the first team that contains the given agent.
Expand All @@ -24,12 +25,27 @@ export function isTeammate(
agents: Record<string, AgentConfig>
): boolean {
const team = teams[teamId];
if (!team) return false;
return (
mentionedId !== currentAgentId &&
team.agents.includes(mentionedId) &&
!!agents[mentionedId]
);
if (!team) {
log('WARN', `isTeammate check failed: Team '${teamId}' not found`);
return false;
}

if (mentionedId === currentAgentId) {
log('DEBUG', `isTeammate check failed: Self-mention (agent: ${mentionedId})`);
return false;
}

if (!team.agents.includes(mentionedId)) {
log('WARN', `isTeammate check failed: Agent '${mentionedId}' not in team '${teamId}' (members: ${team.agents.join(', ')})`);
return false;
}

if (!agents[mentionedId]) {
log('WARN', `isTeammate check failed: Agent '${mentionedId}' not found in agents config`);
return false;
}

return true;
}

/**
Expand All @@ -46,13 +62,12 @@ export function extractTeammateMentions(
const results: { teammateId: string; message: string }[] = [];
const seen = new Set<string>();

// TODO: Support cross-team communication — allow agents to mention agents
// on other teams or use [@team_id: message] to route to another team's leader.

// Tag format: [@agent_id: message] or [@agent1,agent2: message]
const tagRegex = /\[@(\S+?):\s*([\s\S]*?)\]/g;
const tagRegex = /\[@([^\]]+?):\s*([\s\S]*?)\]/g;

// Strip all [@teammate: ...] tags from the full response to get shared context
const sharedContext = response.replace(tagRegex, '').trim();

let tagMatch: RegExpExecArray | null;
while ((tagMatch = tagRegex.exec(response)) !== null) {
const directMessage = tagMatch[2].trim();
Expand Down Expand Up @@ -88,31 +103,34 @@ export function parseAgentRouting(
agents: Record<string, AgentConfig>,
teams: Record<string, TeamConfig> = {}
): { agentId: string; message: string; isTeam?: boolean } {
const match = rawMessage.match(/^@(\S+)\s+([\s\S]*)$/);
// Match @agent_id, optionally preceded by [channel/sender]: prefix from messages API
const match = rawMessage.match(/^(\[[^\]]*\]:\s*)?@(\S+)\s+([\s\S]*)$/);
if (match) {
const candidateId = match[1].toLowerCase();
const prefix = match[1] || '';
const candidateId = match[2].toLowerCase();
const message = prefix + match[3];

// Check agent IDs
if (agents[candidateId]) {
return { agentId: candidateId, message: match[2] };
return { agentId: candidateId, message };
}

// Check team IDs — resolve to leader agent
if (teams[candidateId]) {
return { agentId: teams[candidateId].leader_agent, message: match[2], isTeam: true };
return { agentId: teams[candidateId].leader_agent, message, isTeam: true };
}

// Match by agent name (case-insensitive)
for (const [id, config] of Object.entries(agents)) {
if (config.name.toLowerCase() === candidateId) {
return { agentId: id, message: match[2] };
return { agentId: id, message };
}
}

// Match by team name (case-insensitive)
for (const [, config] of Object.entries(teams)) {
if (config.name.toLowerCase() === candidateId) {
return { agentId: config.leader_agent, message: match[2], isTeam: true };
return { agentId: config.leader_agent, message, isTeam: true };
}
}
}
Expand Down
23 changes: 14 additions & 9 deletions src/queue-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ import {
pruneAckedResponses, pruneCompletedMessages, closeQueueDb, queueEvents, DbMessage,
} from './lib/db';
import { handleLongResponse, collectFiles } from './lib/response';
import { conversations, MAX_CONVERSATION_MESSAGES, enqueueInternalMessage, completeConversation } from './lib/conversation';
import {
conversations, MAX_CONVERSATION_MESSAGES, enqueueInternalMessage, completeConversation,
withConversationLock, incrementPending, decrementPending,
} from './lib/conversation';

// Ensure directories exist
[FILES_DIR, path.dirname(LOG_FILE), CHATS_DIR].forEach(dir => {
Expand Down Expand Up @@ -240,7 +243,7 @@ async function processMessage(dbMsg: DbMessage): Promise<void> {

if (teammateMentions.length > 0 && conv.totalMessages < conv.maxMessages) {
// Enqueue internal messages for each mention
conv.pending += teammateMentions.length;
incrementPending(conv, teammateMentions.length);
conv.outgoingMentions.set(agentId, teammateMentions.length);
for (const mention of teammateMentions) {
log('INFO', `@${agentId} → @${mention.teammateId}`);
Expand All @@ -258,14 +261,16 @@ async function processMessage(dbMsg: DbMessage): Promise<void> {
log('WARN', `Conversation ${conv.id} hit max messages (${conv.maxMessages}) — not enqueuing further mentions`);
}

// This branch is done
conv.pending--;
// This branch is done - use atomic decrement with locking
await withConversationLock(conv.id, async () => {
const shouldComplete = decrementPending(conv);

if (conv.pending === 0) {
completeConversation(conv);
} else {
log('INFO', `Conversation ${conv.id}: ${conv.pending} branch(es) still pending`);
}
if (shouldComplete) {
completeConversation(conv);
} else {
log('INFO', `Conversation ${conv.id}: ${conv.pending} branch(es) still pending`);
}
});

// Mark message as completed in DB
dbCompleteMessage(dbMsg.id);
Expand Down