Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 149 additions & 9 deletions src/gateway/gateway-chat-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,155 @@ export async function handleGatewayMessage(
if (conciergeExecutionNotice) {
req.onTextDelta?.(conciergeExecutionNotice);
}
if (pluginManager) {
await pluginManager.notifyBeforeAgentStart({
sessionId: req.sessionId,
userId: req.userId,
agentId,
channelId: req.channelId,
model: model || undefined,
});
}
const beforeAgentReplyResult = pluginManager
? await pluginManager.runBeforeAgentReply({
sessionId: req.sessionId,
userId: req.userId,
agentId,
channelId: req.channelId,
prompt: agentUserContent,
trigger: 'chat',
workspacePath,
model: model || undefined,
})
: undefined;
if (beforeAgentReplyResult?.handled) {
pluginsUsed = [
...new Set([...pluginsUsed, beforeAgentReplyResult.pluginId]),
];
const syntheticResultText = String(beforeAgentReplyResult.text || '');
const durationMs = Date.now() - startedAt;
logger.debug(
{
...debugMeta,
durationMs,
pluginId: beforeAgentReplyResult.pluginId,
reason: beforeAgentReplyResult.reason ?? null,
syntheticReply: syntheticResultText.trim().length > 0,
},
'Gateway chat intercepted before agent reply',
);
if (syntheticResultText.trim()) {
const storedUserContent = buildStoredUserTurnContent(
userTurnContent,
media,
);
const storedTurn = recordSuccessfulTurn({
sessionId: req.sessionId,
agentId,
chatbotId,
enableRag,
model,
channelId: req.channelId,
promptMode: req.promptMode,
runId,
turnIndex,
userId: req.userId,
username: req.username,
canonicalScopeId: canonicalContextScope,
userContent: storedUserContent,
resultText: syntheticResultText,
toolCallCount: 0,
startedAt,
replaceBuiltInMemory: pluginMemoryBehavior.replacesBuiltInMemory,
});
const storedTurnMessages = buildStoredTurnMessages({
sessionId: req.sessionId,
userId: req.userId,
username: req.username,
userContent: storedUserContent,
resultText: syntheticResultText,
});
if (pluginManager) {
void pluginManager
.notifyTurnComplete({
sessionId: req.sessionId,
userId: req.userId,
agentId,
workspacePath,
messages: storedTurnMessages,
})
.catch((error) => {
logger.warn(
{ sessionId: req.sessionId, agentId, error },
'Plugin turn-complete hooks failed',
);
});
}
if (requestMessages !== null) {
maybeRecordGatewayRequestLog({
sessionId: req.sessionId,
model,
chatbotId,
messages: requestMessages,
status: 'success',
response: syntheticResultText,
toolExecutions: [],
toolsUsed: [],
durationMs,
});
}
return attachSessionIdentity({
status: 'success',
result: syntheticResultText,
toolsUsed: [],
pluginsUsed,
userMessageId: storedTurn.userMessageId,
assistantMessageId: storedTurn.assistantMessageId,
});
}
recordAuditEvent({
sessionId: req.sessionId,
runId,
event: {
type: 'turn.end',
turnIndex,
finishReason: 'plugin_silent',
},
});
recordAuditEvent({
sessionId: req.sessionId,
runId,
event: {
type: 'session.end',
reason: 'normal',
stats: {
userMessages: 1,
assistantMessages: 0,
toolCalls: 0,
durationMs,
},
},
});
if (requestMessages !== null) {
maybeRecordGatewayRequestLog({
sessionId: req.sessionId,
model,
chatbotId,
messages: requestMessages,
status: 'success',
response: '',
toolExecutions: [],
toolsUsed: [],
durationMs,
});
}
return attachSessionIdentity({
status: 'success',
result: '',
toolsUsed: [],
pluginsUsed,
});
}
recordAuditEvent({
sessionId: req.sessionId,
runId,
Expand All @@ -855,15 +1004,6 @@ export async function handleGatewayMessage(
systemPrompt: readSystemPromptMessage(messages),
},
});
if (pluginManager) {
await pluginManager.notifyBeforeAgentStart({
sessionId: req.sessionId,
userId: req.userId,
agentId,
channelId: req.channelId,
model: model || undefined,
});
}
agentStage = 'awaiting-agent-output';
const output = await runAgent({
sessionId: req.executionSessionId || req.sessionId,
Expand Down
8 changes: 7 additions & 1 deletion src/gateway/gateway-plugin-runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,13 @@ export async function tryEnsurePluginManagerInitializedForGateway(params: {
sessionId: string;
channelId: string;
agentId?: string | null;
surface: 'chat' | 'command' | 'webhook';
surface:
| 'chat'
| 'command'
| 'webhook'
| 'bootstrap'
| 'heartbeat'
| 'scheduler';
}): Promise<{
pluginManager: PluginManager | null;
pluginInitError: unknown;
Expand Down
99 changes: 95 additions & 4 deletions src/gateway/gateway-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4114,6 +4114,7 @@ export async function ensureGatewayBootstrapAutostart(params: {
const enableRag = session.enable_rag === 1;
const provider = resolveModelProvider(resolved.model);
const turnIndex = Math.max(1, session.message_count + 1);
const bootstrapPrompt = buildBootstrapAutostartPrompt(bootstrapFile);

recordAuditEvent({
sessionId: session.id,
Expand All @@ -4133,7 +4134,7 @@ export async function ensureGatewayBootstrapAutostart(params: {
event: {
type: 'turn.start',
turnIndex,
userInput: buildBootstrapAutostartPrompt(bootstrapFile),
userInput: bootstrapPrompt,
username: normalizedUsername,
mediaCount: 0,
source: BOOTSTRAP_AUTOSTART_SOURCE,
Expand Down Expand Up @@ -4205,7 +4206,7 @@ export async function ensureGatewayBootstrapAutostart(params: {
const { messages } = buildConversationContext({
agentId: resolved.agentId,
history: [],
currentUserContent: buildBootstrapAutostartPrompt(bootstrapFile),
currentUserContent: bootstrapPrompt,
extraSafetyText:
'Bootstrap kickoff turn. Start the conversation proactively with a concise user-facing opening message.',
runtimeInfo: {
Expand All @@ -4221,15 +4222,15 @@ export async function ensureGatewayBootstrapAutostart(params: {
});
messages.push({
role: 'user',
content: buildBootstrapAutostartPrompt(bootstrapFile),
content: bootstrapPrompt,
});

const { pluginManager } = await tryEnsurePluginManagerInitializedForGateway(
{
sessionId: session.id,
channelId,
agentId: resolved.agentId,
surface: 'chat',
surface: 'bootstrap',
},
);
if (pluginManager) {
Expand All @@ -4248,6 +4249,96 @@ export async function ensureGatewayBootstrapAutostart(params: {
model: resolved.model || undefined,
});
}
const beforeAgentReplyResult = pluginManager
? await pluginManager.runBeforeAgentReply({
sessionId: session.id,
userId: normalizedUserId,
agentId: resolved.agentId,
channelId,
prompt: bootstrapPrompt,
trigger: 'bootstrap',
workspacePath,
model: resolved.model || undefined,
})
: undefined;
if (beforeAgentReplyResult?.handled) {
const syntheticResultText = String(beforeAgentReplyResult.text || '');
const durationMs = Date.now() - startedAt;
if (!syntheticResultText.trim()) {
setMemoryValue(session.id, BOOTSTRAP_AUTOSTART_MARKER_KEY, {
status: 'completed',
completedAt: new Date().toISOString(),
});
recordAuditEvent({
sessionId: session.id,
runId,
event: {
type: 'turn.end',
turnIndex,
finishReason: 'plugin_silent',
},
});
recordAuditEvent({
sessionId: session.id,
runId,
event: {
type: 'session.end',
reason: 'normal',
stats: {
userMessages: 0,
assistantMessages: 0,
toolCalls: 0,
durationMs,
},
},
});
return;
}
const assistantMessageId = memoryService.storeMessage({
sessionId: session.id,
userId: 'assistant',
username: null,
role: 'assistant',
content: syntheticResultText,
});
appendSessionTranscript(resolved.agentId, {
sessionId: session.id,
channelId,
role: 'assistant',
userId: 'assistant',
username: null,
content: syntheticResultText,
});
setMemoryValue(session.id, BOOTSTRAP_AUTOSTART_MARKER_KEY, {
status: 'completed',
assistantMessageId,
completedAt: new Date().toISOString(),
});
recordAuditEvent({
sessionId: session.id,
runId,
event: {
type: 'turn.end',
turnIndex,
finishReason: 'completed',
},
});
recordAuditEvent({
sessionId: session.id,
runId,
event: {
type: 'session.end',
reason: 'normal',
stats: {
userMessages: 0,
assistantMessages: 1,
toolCalls: 0,
durationMs,
},
},
});
return;
}

recordAuditEvent({
sessionId: session.id,
Expand Down
55 changes: 55 additions & 0 deletions src/gateway/gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ import {
import { handleGatewayMessage } from './gateway-chat-service.js';
import { classifyGatewayError } from './gateway-error-utils.js';
import { startGatewayHttpServer } from './gateway-http-server.js';
import { tryEnsurePluginManagerInitializedForGateway } from './gateway-plugin-runtime.js';
import {
initGatewayService,
stopGatewayPlugins,
Expand Down Expand Up @@ -1711,6 +1712,60 @@ async function runScheduledTask(
}

if (request.actionKind === 'system_event') {
const systemChannelId =
request.channelId || resolvedDeliveryChannelId || 'scheduler';
const { pluginManager } = await tryEnsurePluginManagerInitializedForGateway(
{
sessionId: request.sessionId,
channelId: systemChannelId,
agentId: request.agentId ?? null,
surface: 'scheduler',
},
);
const beforeAgentReplyResult = pluginManager
? await pluginManager.runBeforeAgentReply({
sessionId: request.sessionId,
userId: 'scheduler',
agentId: request.agentId || 'main',
channelId: systemChannelId,
prompt: request.prompt,
trigger: 'scheduler',
})
: undefined;
if (beforeAgentReplyResult?.handled) {
const syntheticResult = String(beforeAgentReplyResult.text || '').trim();
if (!syntheticResult) {
logger.debug(
{
jobId: request.jobId,
taskId: request.taskId,
pluginId: beforeAgentReplyResult.pluginId,
reason: beforeAgentReplyResult.reason ?? null,
},
'Scheduled system event intercepted without delivery',
);
return;
}
if (request.delivery.kind === 'webhook') {
await deliverWebhookMessage(
request.delivery.webhookUrl,
syntheticResult,
`${sourceLabel}:system`,
);
return;
}
if (!resolvedDeliveryChannelId) {
throw new Error(
'No delivery channel available for scheduled system event delivery.',
);
}
await deliverProactiveMessage(
resolvedDeliveryChannelId,
syntheticResult,
`${sourceLabel}:system`,
);
return;
}
if (request.delivery.kind === 'webhook') {
await deliverWebhookMessage(
request.delivery.webhookUrl,
Expand Down
Loading
Loading