From deffa737a3db263e2ee5432d3700650883cbc581 Mon Sep 17 00:00:00 2001 From: Benedikt Koehler Date: Fri, 10 Apr 2026 20:04:11 +0200 Subject: [PATCH] feat: add before_agent_reply plugin hook --- src/gateway/gateway-chat-service.ts | 158 ++++++++++++++++- src/gateway/gateway-plugin-runtime.ts | 8 +- src/gateway/gateway-service.ts | 99 ++++++++++- src/gateway/gateway.ts | 55 ++++++ src/plugins/plugin-manager.ts | 35 ++++ src/plugins/plugin-sdk.ts | 3 + src/plugins/plugin-types.ts | 30 ++++ src/scheduler/heartbeat.ts | 164 +++++++++++++++--- src/scheduler/scheduled-task-runner.ts | 86 +++++++-- ...ateway-service.bootstrap-autostart.test.ts | 2 + tests/gateway-service.plugins.test.ts | 66 +++++++ tests/heartbeat.test.ts | 116 +++++++++---- .../plugin-manager.before-agent-reply.test.ts | 164 ++++++++++++++++++ ...led-task-runner.before-agent-reply.test.ts | 142 +++++++++++++++ 14 files changed, 1039 insertions(+), 89 deletions(-) create mode 100644 tests/plugin-manager.before-agent-reply.test.ts create mode 100644 tests/scheduled-task-runner.before-agent-reply.test.ts diff --git a/src/gateway/gateway-chat-service.ts b/src/gateway/gateway-chat-service.ts index 60feb04b..378ae27c 100644 --- a/src/gateway/gateway-chat-service.ts +++ b/src/gateway/gateway-chat-service.ts @@ -843,6 +843,155 @@ export async function handleGatewayMessage( if (conciergeExecutionNotice) { req.onTextDelta?.(conciergeExecutionNotice); } + if (pluginManager) { + await pluginManager.notifyBeforeAgentStart({ + sessionId: req.sessionId, + userId: req.userId, + agentId, + channelId: req.channelId, + model: model || undefined, + }); + } + const beforeAgentReplyResult = pluginManager + ? await pluginManager.runBeforeAgentReply({ + sessionId: req.sessionId, + userId: req.userId, + agentId, + channelId: req.channelId, + prompt: agentUserContent, + trigger: 'chat', + workspacePath, + model: model || undefined, + }) + : undefined; + if (beforeAgentReplyResult?.handled) { + pluginsUsed = [ + ...new Set([...pluginsUsed, beforeAgentReplyResult.pluginId]), + ]; + const syntheticResultText = String(beforeAgentReplyResult.text || ''); + const durationMs = Date.now() - startedAt; + logger.debug( + { + ...debugMeta, + durationMs, + pluginId: beforeAgentReplyResult.pluginId, + reason: beforeAgentReplyResult.reason ?? null, + syntheticReply: syntheticResultText.trim().length > 0, + }, + 'Gateway chat intercepted before agent reply', + ); + if (syntheticResultText.trim()) { + const storedUserContent = buildStoredUserTurnContent( + userTurnContent, + media, + ); + const storedTurn = recordSuccessfulTurn({ + sessionId: req.sessionId, + agentId, + chatbotId, + enableRag, + model, + channelId: req.channelId, + promptMode: req.promptMode, + runId, + turnIndex, + userId: req.userId, + username: req.username, + canonicalScopeId: canonicalContextScope, + userContent: storedUserContent, + resultText: syntheticResultText, + toolCallCount: 0, + startedAt, + replaceBuiltInMemory: pluginMemoryBehavior.replacesBuiltInMemory, + }); + const storedTurnMessages = buildStoredTurnMessages({ + sessionId: req.sessionId, + userId: req.userId, + username: req.username, + userContent: storedUserContent, + resultText: syntheticResultText, + }); + if (pluginManager) { + void pluginManager + .notifyTurnComplete({ + sessionId: req.sessionId, + userId: req.userId, + agentId, + workspacePath, + messages: storedTurnMessages, + }) + .catch((error) => { + logger.warn( + { sessionId: req.sessionId, agentId, error }, + 'Plugin turn-complete hooks failed', + ); + }); + } + if (requestMessages !== null) { + maybeRecordGatewayRequestLog({ + sessionId: req.sessionId, + model, + chatbotId, + messages: requestMessages, + status: 'success', + response: syntheticResultText, + toolExecutions: [], + toolsUsed: [], + durationMs, + }); + } + return attachSessionIdentity({ + status: 'success', + result: syntheticResultText, + toolsUsed: [], + pluginsUsed, + userMessageId: storedTurn.userMessageId, + assistantMessageId: storedTurn.assistantMessageId, + }); + } + recordAuditEvent({ + sessionId: req.sessionId, + runId, + event: { + type: 'turn.end', + turnIndex, + finishReason: 'plugin_silent', + }, + }); + recordAuditEvent({ + sessionId: req.sessionId, + runId, + event: { + type: 'session.end', + reason: 'normal', + stats: { + userMessages: 1, + assistantMessages: 0, + toolCalls: 0, + durationMs, + }, + }, + }); + if (requestMessages !== null) { + maybeRecordGatewayRequestLog({ + sessionId: req.sessionId, + model, + chatbotId, + messages: requestMessages, + status: 'success', + response: '', + toolExecutions: [], + toolsUsed: [], + durationMs, + }); + } + return attachSessionIdentity({ + status: 'success', + result: '', + toolsUsed: [], + pluginsUsed, + }); + } recordAuditEvent({ sessionId: req.sessionId, runId, @@ -855,15 +1004,6 @@ export async function handleGatewayMessage( systemPrompt: readSystemPromptMessage(messages), }, }); - if (pluginManager) { - await pluginManager.notifyBeforeAgentStart({ - sessionId: req.sessionId, - userId: req.userId, - agentId, - channelId: req.channelId, - model: model || undefined, - }); - } agentStage = 'awaiting-agent-output'; const output = await runAgent({ sessionId: req.executionSessionId || req.sessionId, diff --git a/src/gateway/gateway-plugin-runtime.ts b/src/gateway/gateway-plugin-runtime.ts index b80e9d09..3d8855d8 100644 --- a/src/gateway/gateway-plugin-runtime.ts +++ b/src/gateway/gateway-plugin-runtime.ts @@ -8,7 +8,13 @@ export async function tryEnsurePluginManagerInitializedForGateway(params: { sessionId: string; channelId: string; agentId?: string | null; - surface: 'chat' | 'command' | 'webhook'; + surface: + | 'chat' + | 'command' + | 'webhook' + | 'bootstrap' + | 'heartbeat' + | 'scheduler'; }): Promise<{ pluginManager: PluginManager | null; pluginInitError: unknown; diff --git a/src/gateway/gateway-service.ts b/src/gateway/gateway-service.ts index 7d5d1dc9..1870896f 100644 --- a/src/gateway/gateway-service.ts +++ b/src/gateway/gateway-service.ts @@ -4114,6 +4114,7 @@ export async function ensureGatewayBootstrapAutostart(params: { const enableRag = session.enable_rag === 1; const provider = resolveModelProvider(resolved.model); const turnIndex = Math.max(1, session.message_count + 1); + const bootstrapPrompt = buildBootstrapAutostartPrompt(bootstrapFile); recordAuditEvent({ sessionId: session.id, @@ -4133,7 +4134,7 @@ export async function ensureGatewayBootstrapAutostart(params: { event: { type: 'turn.start', turnIndex, - userInput: buildBootstrapAutostartPrompt(bootstrapFile), + userInput: bootstrapPrompt, username: normalizedUsername, mediaCount: 0, source: BOOTSTRAP_AUTOSTART_SOURCE, @@ -4205,7 +4206,7 @@ export async function ensureGatewayBootstrapAutostart(params: { const { messages } = buildConversationContext({ agentId: resolved.agentId, history: [], - currentUserContent: buildBootstrapAutostartPrompt(bootstrapFile), + currentUserContent: bootstrapPrompt, extraSafetyText: 'Bootstrap kickoff turn. Start the conversation proactively with a concise user-facing opening message.', runtimeInfo: { @@ -4221,7 +4222,7 @@ export async function ensureGatewayBootstrapAutostart(params: { }); messages.push({ role: 'user', - content: buildBootstrapAutostartPrompt(bootstrapFile), + content: bootstrapPrompt, }); const { pluginManager } = await tryEnsurePluginManagerInitializedForGateway( @@ -4229,7 +4230,7 @@ export async function ensureGatewayBootstrapAutostart(params: { sessionId: session.id, channelId, agentId: resolved.agentId, - surface: 'chat', + surface: 'bootstrap', }, ); if (pluginManager) { @@ -4248,6 +4249,96 @@ export async function ensureGatewayBootstrapAutostart(params: { model: resolved.model || undefined, }); } + const beforeAgentReplyResult = pluginManager + ? await pluginManager.runBeforeAgentReply({ + sessionId: session.id, + userId: normalizedUserId, + agentId: resolved.agentId, + channelId, + prompt: bootstrapPrompt, + trigger: 'bootstrap', + workspacePath, + model: resolved.model || undefined, + }) + : undefined; + if (beforeAgentReplyResult?.handled) { + const syntheticResultText = String(beforeAgentReplyResult.text || ''); + const durationMs = Date.now() - startedAt; + if (!syntheticResultText.trim()) { + setMemoryValue(session.id, BOOTSTRAP_AUTOSTART_MARKER_KEY, { + status: 'completed', + completedAt: new Date().toISOString(), + }); + recordAuditEvent({ + sessionId: session.id, + runId, + event: { + type: 'turn.end', + turnIndex, + finishReason: 'plugin_silent', + }, + }); + recordAuditEvent({ + sessionId: session.id, + runId, + event: { + type: 'session.end', + reason: 'normal', + stats: { + userMessages: 0, + assistantMessages: 0, + toolCalls: 0, + durationMs, + }, + }, + }); + return; + } + const assistantMessageId = memoryService.storeMessage({ + sessionId: session.id, + userId: 'assistant', + username: null, + role: 'assistant', + content: syntheticResultText, + }); + appendSessionTranscript(resolved.agentId, { + sessionId: session.id, + channelId, + role: 'assistant', + userId: 'assistant', + username: null, + content: syntheticResultText, + }); + setMemoryValue(session.id, BOOTSTRAP_AUTOSTART_MARKER_KEY, { + status: 'completed', + assistantMessageId, + completedAt: new Date().toISOString(), + }); + recordAuditEvent({ + sessionId: session.id, + runId, + event: { + type: 'turn.end', + turnIndex, + finishReason: 'completed', + }, + }); + recordAuditEvent({ + sessionId: session.id, + runId, + event: { + type: 'session.end', + reason: 'normal', + stats: { + userMessages: 0, + assistantMessages: 1, + toolCalls: 0, + durationMs, + }, + }, + }); + return; + } recordAuditEvent({ sessionId: session.id, diff --git a/src/gateway/gateway.ts b/src/gateway/gateway.ts index 42bad15f..1d04285a 100644 --- a/src/gateway/gateway.ts +++ b/src/gateway/gateway.ts @@ -103,6 +103,7 @@ import { import { handleGatewayMessage } from './gateway-chat-service.js'; import { classifyGatewayError } from './gateway-error-utils.js'; import { startGatewayHttpServer } from './gateway-http-server.js'; +import { tryEnsurePluginManagerInitializedForGateway } from './gateway-plugin-runtime.js'; import { initGatewayService, stopGatewayPlugins, @@ -1711,6 +1712,60 @@ async function runScheduledTask( } if (request.actionKind === 'system_event') { + const systemChannelId = + request.channelId || resolvedDeliveryChannelId || 'scheduler'; + const { pluginManager } = await tryEnsurePluginManagerInitializedForGateway( + { + sessionId: request.sessionId, + channelId: systemChannelId, + agentId: request.agentId ?? null, + surface: 'scheduler', + }, + ); + const beforeAgentReplyResult = pluginManager + ? await pluginManager.runBeforeAgentReply({ + sessionId: request.sessionId, + userId: 'scheduler', + agentId: request.agentId || 'main', + channelId: systemChannelId, + prompt: request.prompt, + trigger: 'scheduler', + }) + : undefined; + if (beforeAgentReplyResult?.handled) { + const syntheticResult = String(beforeAgentReplyResult.text || '').trim(); + if (!syntheticResult) { + logger.debug( + { + jobId: request.jobId, + taskId: request.taskId, + pluginId: beforeAgentReplyResult.pluginId, + reason: beforeAgentReplyResult.reason ?? null, + }, + 'Scheduled system event intercepted without delivery', + ); + return; + } + if (request.delivery.kind === 'webhook') { + await deliverWebhookMessage( + request.delivery.webhookUrl, + syntheticResult, + `${sourceLabel}:system`, + ); + return; + } + if (!resolvedDeliveryChannelId) { + throw new Error( + 'No delivery channel available for scheduled system event delivery.', + ); + } + await deliverProactiveMessage( + resolvedDeliveryChannelId, + syntheticResult, + `${sourceLabel}:system`, + ); + return; + } if (request.delivery.kind === 'webhook') { await deliverWebhookMessage( request.delivery.webhookUrl, diff --git a/src/plugins/plugin-manager.ts b/src/plugins/plugin-manager.ts index d2ac738c..94b62eaf 100644 --- a/src/plugins/plugin-manager.ts +++ b/src/plugins/plugin-manager.ts @@ -30,6 +30,8 @@ import type { LoadedPlugin, MemoryLayerPlugin, PluginAgentEndContext, + PluginBeforeAgentReplyContext, + PluginBeforeAgentReplyResult, PluginBinaryRequirement, PluginCandidate, PluginCommandDefinition, @@ -141,6 +143,10 @@ type RegisteredHook = { handler: PluginHookHandlerMap[K]; }; +type PluginBeforeAgentReplyResolution = PluginBeforeAgentReplyResult & { + pluginId: string; +}; + type RegisteredCommand = { pluginId: string; command: PluginCommandDefinition; @@ -1799,6 +1805,35 @@ export class PluginManager { await this.dispatchHook('before_agent_start', params); } + async runBeforeAgentReply( + context: PluginBeforeAgentReplyContext, + ): Promise { + await this.ensureInitialized(); + this.rememberSessionUserId(context.sessionId, context.userId); + this.rememberSessionWorkspaceRoot(context.sessionId, context.workspacePath); + const handlers = + (this.hooks.get('before_agent_reply') as + | RegisteredHook<'before_agent_reply'>[] + | undefined) || []; + for (const entry of handlers) { + try { + const result = await entry.handler(context); + if (result?.handled === true) { + return { + ...result, + pluginId: entry.pluginId, + }; + } + } catch (error) { + this.logger.warn( + { pluginId: entry.pluginId, hookName: 'before_agent_reply', error }, + 'Plugin lifecycle hook failed', + ); + } + } + return undefined; + } + async notifyAgentEnd(context: PluginAgentEndContext): Promise { await this.ensureInitialized(); this.rememberSessionUserId(context.sessionId, context.userId); diff --git a/src/plugins/plugin-sdk.ts b/src/plugins/plugin-sdk.ts index a066a5bd..7d8455c9 100644 --- a/src/plugins/plugin-sdk.ts +++ b/src/plugins/plugin-sdk.ts @@ -14,6 +14,9 @@ export type { LoadedPlugin, MemoryLayerPlugin, PluginAfterToolCallContext, + PluginBeforeAgentReplyContext, + PluginBeforeAgentReplyResult, + PluginBeforeAgentReplyTrigger, PluginCandidate, PluginCommandDefinition, PluginCompactionContext, diff --git a/src/plugins/plugin-types.ts b/src/plugins/plugin-types.ts index b99f8697..a50c4e25 100644 --- a/src/plugins/plugin-types.ts +++ b/src/plugins/plugin-types.ts @@ -162,6 +162,29 @@ export interface PluginMemoryBehavior { replacesBuiltInMemory: boolean; } +export type PluginBeforeAgentReplyTrigger = + | 'chat' + | 'bootstrap' + | 'heartbeat' + | 'scheduler'; + +export interface PluginBeforeAgentReplyContext { + sessionId: string; + userId: string; + agentId: string; + channelId: string; + prompt: string; + trigger: PluginBeforeAgentReplyTrigger; + workspacePath?: string; + model?: string; +} + +export interface PluginBeforeAgentReplyResult { + handled: boolean; + text?: string; + reason?: string; +} + export interface PluginPromptContextResult extends PluginMemoryBehavior { sections: string[]; pluginIds: string[]; @@ -248,6 +271,7 @@ export type PluginHookName = | 'session_reset' | 'before_prompt_build' | 'before_agent_start' + | 'before_agent_reply' | 'agent_end' | 'before_tool_call' | 'after_tool_call' @@ -284,6 +308,12 @@ export interface PluginHookHandlerMap { channelId: string; model?: string; }) => Promise | void; + before_agent_reply: ( + context: PluginBeforeAgentReplyContext, + ) => + | Promise + | PluginBeforeAgentReplyResult + | undefined; agent_end: (context: PluginAgentEndContext) => Promise | void; before_tool_call: (context: PluginToolHookContext) => Promise | void; after_tool_call: ( diff --git a/src/scheduler/heartbeat.ts b/src/scheduler/heartbeat.ts index f72e4616..bc26fa14 100644 --- a/src/scheduler/heartbeat.ts +++ b/src/scheduler/heartbeat.ts @@ -23,6 +23,7 @@ import { HEARTBEAT_ENABLED, HYBRIDAI_CHATBOT_ID, } from '../config/config.js'; +import { tryEnsurePluginManagerInitializedForGateway } from '../gateway/gateway-plugin-runtime.js'; import { agentWorkspaceDir } from '../infra/ipc.js'; import { logger } from '../logger.js'; import { getTasksForSession } from '../memory/db.js'; @@ -157,6 +158,7 @@ export function startHeartbeat( ? resolvedRuntime.chatbotId || HYBRIDAI_CHATBOT_ID || agentId : resolvedRuntime.chatbotId; const resolvedAgentId = resolvedRuntime.agentId; + const heartbeatChannelId = HEARTBEAT_CHANNEL || 'heartbeat'; const enableRag = session.enable_rag !== 0; const workspacePath = agentWorkspaceDir(resolvedAgentId); const sessionContext = buildSessionContext({ @@ -173,27 +175,6 @@ export function startHeartbeat( sessionKey: session.session_key, mainSessionKey: session.main_session_key, }); - const { messages } = buildConversationContext({ - agentId: resolvedAgentId, - sessionSummary: memoryContext.promptSummary, - history, - runtimeInfo: { - channel: getChannel('heartbeat'), - chatbotId, - model, - defaultModel: model, - channelType: 'heartbeat', - channelId, - guildId: null, - sessionContext, - workspacePath, - }, - allowedTools: HEARTBEAT_ALLOWED_TOOLS, - }); - messages.push({ role: 'user', content: HEARTBEAT_PROMPT }); - - const provider = resolveModelProvider(model); - const heartbeatChannelId = HEARTBEAT_CHANNEL || 'heartbeat'; recordAuditEvent({ sessionId, runId, @@ -216,6 +197,147 @@ export function startHeartbeat( source: 'heartbeat', }, }); + const { pluginManager } = + await tryEnsurePluginManagerInitializedForGateway({ + sessionId, + channelId: heartbeatChannelId, + agentId: resolvedAgentId, + surface: 'heartbeat', + }); + const beforeAgentReplyResult = pluginManager + ? await pluginManager.runBeforeAgentReply({ + sessionId, + userId: 'heartbeat', + agentId: resolvedAgentId, + channelId: heartbeatChannelId, + prompt: HEARTBEAT_PROMPT, + trigger: 'heartbeat', + workspacePath, + model: model || undefined, + }) + : undefined; + if (beforeAgentReplyResult?.handled) { + const syntheticResult = String( + beforeAgentReplyResult.text || '', + ).trim(); + logger.debug( + { + sessionId, + pluginId: beforeAgentReplyResult.pluginId, + reason: beforeAgentReplyResult.reason ?? null, + syntheticReply: syntheticResult.length > 0, + }, + 'Heartbeat intercepted before agent reply', + ); + if (!syntheticResult) { + recordAuditEvent({ + sessionId, + runId, + event: { + type: 'turn.end', + turnIndex, + finishReason: 'plugin_silent', + }, + }); + recordAuditEvent({ + sessionId, + runId, + event: { + type: 'session.end', + reason: 'normal', + stats: { + userMessages: 1, + assistantMessages: 0, + toolCalls: 0, + durationMs: Date.now() - startedAt, + }, + }, + }); + return; + } + memoryService.storeTurn({ + sessionId, + user: { + userId: 'heartbeat', + username: 'heartbeat', + content: HEARTBEAT_PROMPT, + }, + assistant: { + userId: 'assistant', + username: null, + content: syntheticResult, + }, + }); + appendSessionTranscript(resolvedAgentId, { + sessionId, + channelId: heartbeatChannelId, + role: 'user', + userId: 'heartbeat', + username: 'heartbeat', + content: HEARTBEAT_PROMPT, + }); + appendSessionTranscript(resolvedAgentId, { + sessionId, + channelId: heartbeatChannelId, + role: 'assistant', + userId: 'assistant', + username: null, + content: syntheticResult, + }); + await maybeCompactSession({ + sessionId, + agentId: resolvedAgentId, + chatbotId, + enableRag, + model, + channelId: heartbeatChannelId, + }); + recordAuditEvent({ + sessionId, + runId, + event: { + type: 'turn.end', + turnIndex, + finishReason: 'completed', + }, + }); + recordAuditEvent({ + sessionId, + runId, + event: { + type: 'session.end', + reason: 'normal', + stats: { + userMessages: 1, + assistantMessages: 1, + toolCalls: 0, + durationMs: Date.now() - startedAt, + }, + }, + }); + onMessage(syntheticResult); + return; + } + const { messages } = buildConversationContext({ + agentId: resolvedAgentId, + sessionSummary: memoryContext.promptSummary, + history, + runtimeInfo: { + channel: getChannel('heartbeat'), + chatbotId, + model, + defaultModel: model, + channelType: 'heartbeat', + channelId, + guildId: null, + sessionContext, + workspacePath, + }, + allowedTools: HEARTBEAT_ALLOWED_TOOLS, + }); + messages.push({ role: 'user', content: HEARTBEAT_PROMPT }); + + const provider = resolveModelProvider(model); const scheduledTasks = getTasksForSession(sessionId); const output = await runAgent({ diff --git a/src/scheduler/scheduled-task-runner.ts b/src/scheduler/scheduled-task-runner.ts index f35f20ac..bac459ce 100644 --- a/src/scheduler/scheduled-task-runner.ts +++ b/src/scheduler/scheduled-task-runner.ts @@ -6,6 +6,7 @@ import { recordAuditEvent, } from '../audit/audit-events.js'; import { getChannel } from '../channels/channel-registry.js'; +import { tryEnsurePluginManagerInitializedForGateway } from '../gateway/gateway-plugin-runtime.js'; import { agentWorkspaceDir } from '../infra/ipc.js'; import { recordUsageEvent } from '../memory/db.js'; import { resolveModelProvider } from '../providers/factory.js'; @@ -76,25 +77,6 @@ export async function runIsolatedScheduledTask(params: { sessionKey: cronSessionId, mainSessionKey: mainSessionKey?.trim() || cronSessionId, }); - const { messages } = buildConversationContext({ - agentId, - history: [], - currentUserContent: prompt, - runtimeInfo: { - channel: getChannel('scheduler'), - chatbotId, - model, - defaultModel: model, - channelType: 'scheduler', - channelId, - guildId: null, - sessionContext, - workspacePath, - }, - blockedTools: ['cron'], - }); - messages.push({ role: 'user', content: prompt }); - recordAuditEvent({ sessionId: activeSessionId, runId, @@ -119,6 +101,72 @@ export async function runIsolatedScheduledTask(params: { taskId, }, }); + const { pluginManager } = await tryEnsurePluginManagerInitializedForGateway({ + sessionId: activeSessionId, + channelId, + agentId, + surface: 'scheduler', + }); + const beforeAgentReplyResult = pluginManager + ? await pluginManager.runBeforeAgentReply({ + sessionId: activeSessionId, + userId: 'scheduler', + agentId, + channelId, + prompt, + trigger: 'scheduler', + workspacePath, + model: model || undefined, + }) + : undefined; + if (beforeAgentReplyResult?.handled) { + const syntheticResult = String(beforeAgentReplyResult.text || '').trim(); + recordAuditEvent({ + sessionId: activeSessionId, + runId, + event: { + type: 'turn.end', + turnIndex: 1, + finishReason: syntheticResult ? 'completed' : 'plugin_silent', + }, + }); + recordAuditEvent({ + sessionId: activeSessionId, + runId, + event: { + type: 'session.end', + reason: 'normal', + stats: { + userMessages: 1, + assistantMessages: syntheticResult ? 1 : 0, + toolCalls: 0, + durationMs: Date.now() - startedAt, + }, + }, + }); + if (syntheticResult) { + await onResult({ text: syntheticResult }); + } + return; + } + const { messages } = buildConversationContext({ + agentId, + history: [], + currentUserContent: prompt, + runtimeInfo: { + channel: getChannel('scheduler'), + chatbotId, + model, + defaultModel: model, + channelType: 'scheduler', + channelId, + guildId: null, + sessionContext, + workspacePath, + }, + blockedTools: ['cron'], + }); + messages.push({ role: 'user', content: prompt }); try { const output = await runAgent({ diff --git a/tests/gateway-service.bootstrap-autostart.test.ts b/tests/gateway-service.bootstrap-autostart.test.ts index 03165ae9..8f49b7a0 100644 --- a/tests/gateway-service.bootstrap-autostart.test.ts +++ b/tests/gateway-service.bootstrap-autostart.test.ts @@ -16,6 +16,7 @@ const { notifyBeforeAgentStart: vi.fn(async () => {}), notifyMemoryWrites: vi.fn(async () => {}), notifySessionStart: vi.fn(async () => {}), + runBeforeAgentReply: vi.fn(async () => undefined), }; return { runAgentMock: vi.fn(), @@ -60,6 +61,7 @@ const { setupHome } = setupGatewayTest({ pluginManagerMock.notifyBeforeAgentStart.mockClear(); pluginManagerMock.notifyMemoryWrites.mockClear(); pluginManagerMock.notifySessionStart.mockClear(); + pluginManagerMock.runBeforeAgentReply.mockClear(); }, }); diff --git a/tests/gateway-service.plugins.test.ts b/tests/gateway-service.plugins.test.ts index 88d0c987..61ba8306 100644 --- a/tests/gateway-service.plugins.test.ts +++ b/tests/gateway-service.plugins.test.ts @@ -46,6 +46,7 @@ const { }, ]), notifyBeforeAgentStart: vi.fn(async () => {}), + runBeforeAgentReply: vi.fn(async () => undefined), notifyMemoryWrites: vi.fn(async () => {}), notifyTurnComplete: vi.fn(async () => {}), notifyAgentEnd: vi.fn(async () => {}), @@ -239,6 +240,7 @@ const { setupHome } = setupGatewayTest({ pluginManagerMock.getToolDefinitions.mockClear(); pluginManagerMock.handleInboundWebhook.mockClear(); pluginManagerMock.notifyBeforeAgentStart.mockClear(); + pluginManagerMock.runBeforeAgentReply.mockClear(); pluginManagerMock.notifyMemoryWrites.mockClear(); pluginManagerMock.notifyTurnComplete.mockClear(); pluginManagerMock.notifyAgentEnd.mockClear(); @@ -397,6 +399,70 @@ test('handleGatewayMessage injects plugin prompt context and forwards plugin too ); }); +test('handleGatewayMessage returns a synthetic plugin reply before agent execution', async () => { + setupHome(); + + const { initDatabase } = await import('../src/memory/db.ts'); + const { handleGatewayMessage } = await import( + '../src/gateway/gateway-chat-service.ts' + ); + + initDatabase({ quiet: true }); + pluginManagerMock.runBeforeAgentReply.mockResolvedValueOnce({ + handled: true, + text: 'synthetic plugin reply', + pluginId: 'demo-plugin', + }); + + const result = await handleGatewayMessage({ + sessionId: 'session-plugin-claim', + guildId: null, + channelId: 'web', + userId: 'user-42', + username: 'alice', + content: 'Use the plugin fast path.', + model: 'test-model', + chatbotId: 'bot-1', + }); + + expect(result).toEqual( + expect.objectContaining({ + status: 'success', + result: 'synthetic plugin reply', + toolsUsed: [], + pluginsUsed: ['qmd-memory', 'demo-plugin'], + }), + ); + expect(pluginManagerMock.notifyBeforeAgentStart).toHaveBeenCalledTimes(1); + expect(pluginManagerMock.runBeforeAgentReply).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: 'session-plugin-claim', + userId: 'user-42', + channelId: 'web', + trigger: 'chat', + prompt: 'Use the plugin fast path.', + }), + ); + expect(runAgentMock).not.toHaveBeenCalled(); + expect(pluginManagerMock.notifyTurnComplete).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: 'session-plugin-claim', + userId: 'user-42', + messages: [ + expect.objectContaining({ + role: 'user', + content: 'Use the plugin fast path.', + }), + expect.objectContaining({ + role: 'assistant', + content: 'synthetic plugin reply', + }), + ], + }), + ); + expect(pluginManagerMock.notifyAgentEnd).not.toHaveBeenCalled(); +}); + test('handleGatewayMessage forwards successful native memory writes to plugins', async () => { setupHome(); diff --git a/tests/heartbeat.test.ts b/tests/heartbeat.test.ts index ab27cf0a..af0ce1b6 100644 --- a/tests/heartbeat.test.ts +++ b/tests/heartbeat.test.ts @@ -1,40 +1,50 @@ import { afterEach, expect, test, vi } from 'vitest'; -const mocks = vi.hoisted(() => ({ - agentWorkspaceDir: vi.fn(() => '/tmp/hybridclaw-heartbeat-workspace'), - appendSessionTranscript: vi.fn(), - buildConversationContext: vi.fn(() => ({ messages: [] })), - emitToolExecutionAuditEvents: vi.fn(), - estimateTokenCountFromMessages: vi.fn(() => 1), - estimateTokenCountFromText: vi.fn(() => 1), - getTasksForSession: vi.fn(() => []), - isWithinActiveHours: vi.fn(() => true), - logger: { - debug: vi.fn(), - error: vi.fn(), - info: vi.fn(), - warn: vi.fn(), - }, - makeAuditRunId: vi.fn(() => 'heartbeat-run'), - maybeCompactSession: vi.fn(), - memoryService: { - buildPromptMemoryContext: vi.fn(() => ({ promptSummary: '' })), - getConversationHistory: vi.fn(() => []), - getOrCreateSession: vi.fn(() => ({ message_count: 0 })), - storeTurn: vi.fn(), - }, - modelRequiresChatbotId: vi.fn(() => false), - processSideEffects: vi.fn(), - proactiveWindowLabel: vi.fn(() => 'always-on'), - recordAuditEvent: vi.fn(), - resolveAgentForRequest: vi.fn(() => ({ - agentId: 'vllm', - model: 'vllm/mistralai/Mistral-Small-3.2-24B-Instruct-2506', - chatbotId: '', - })), - resolveModelProvider: vi.fn(() => 'vllm'), - runAgent: vi.fn(), -})); +const mocks = vi.hoisted(() => { + const pluginManager = { + runBeforeAgentReply: vi.fn(async () => undefined), + }; + return { + agentWorkspaceDir: vi.fn(() => '/tmp/hybridclaw-heartbeat-workspace'), + appendSessionTranscript: vi.fn(), + buildConversationContext: vi.fn(() => ({ messages: [] })), + emitToolExecutionAuditEvents: vi.fn(), + estimateTokenCountFromMessages: vi.fn(() => 1), + estimateTokenCountFromText: vi.fn(() => 1), + getTasksForSession: vi.fn(() => []), + isWithinActiveHours: vi.fn(() => true), + logger: { + debug: vi.fn(), + error: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + }, + makeAuditRunId: vi.fn(() => 'heartbeat-run'), + maybeCompactSession: vi.fn(), + memoryService: { + buildPromptMemoryContext: vi.fn(() => ({ promptSummary: '' })), + getConversationHistory: vi.fn(() => []), + getOrCreateSession: vi.fn(() => ({ message_count: 0 })), + storeTurn: vi.fn(), + }, + pluginManager, + modelRequiresChatbotId: vi.fn(() => false), + processSideEffects: vi.fn(), + proactiveWindowLabel: vi.fn(() => 'always-on'), + recordAuditEvent: vi.fn(), + resolveAgentForRequest: vi.fn(() => ({ + agentId: 'vllm', + model: 'vllm/mistralai/Mistral-Small-3.2-24B-Instruct-2506', + chatbotId: '', + })), + resolveModelProvider: vi.fn(() => 'vllm'), + runAgent: vi.fn(), + tryEnsurePluginManagerInitializedForGateway: vi.fn(async () => ({ + pluginManager, + pluginInitError: null, + })), + }; +}); vi.mock('../src/agent/agent.js', () => ({ runAgent: mocks.runAgent, @@ -71,6 +81,11 @@ vi.mock('../src/config/config.js', () => ({ HYBRIDAI_MODEL: 'vllm/mistralai/Mistral-Small-3.2-24B-Instruct-2506', })); +vi.mock('../src/gateway/gateway-plugin-runtime.js', () => ({ + tryEnsurePluginManagerInitializedForGateway: + mocks.tryEnsurePluginManagerInitializedForGateway, +})); + vi.mock('../src/infra/ipc.js', () => ({ agentWorkspaceDir: mocks.agentWorkspaceDir, })); @@ -177,3 +192,34 @@ test('delivers substantive heartbeat messages', async () => { expect(mocks.appendSessionTranscript).toHaveBeenCalledTimes(2); expect(mocks.maybeCompactSession).toHaveBeenCalledTimes(1); }); + +test('heartbeat plugins can claim a turn with a synthetic reply', async () => { + vi.useFakeTimers(); + mocks.pluginManager.runBeforeAgentReply.mockResolvedValueOnce({ + handled: true, + text: 'Synthetic heartbeat follow-up.', + pluginId: 'memory-core', + }); + + const { startHeartbeat, stopHeartbeat } = await import( + '../src/scheduler/heartbeat.ts' + ); + const onMessage = vi.fn(); + + startHeartbeat('vllm', 1_000, onMessage); + await vi.advanceTimersByTimeAsync(1_000); + stopHeartbeat(); + + expect(mocks.pluginManager.runBeforeAgentReply).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: 'agent:vllm:channel:heartbeat:chat:system:peer:default', + channelId: 'heartbeat', + trigger: 'heartbeat', + }), + ); + expect(mocks.runAgent).not.toHaveBeenCalled(); + expect(onMessage).toHaveBeenCalledWith('Synthetic heartbeat follow-up.'); + expect(mocks.memoryService.storeTurn).toHaveBeenCalledTimes(1); + expect(mocks.appendSessionTranscript).toHaveBeenCalledTimes(2); + expect(mocks.maybeCompactSession).toHaveBeenCalledTimes(1); +}); diff --git a/tests/plugin-manager.before-agent-reply.test.ts b/tests/plugin-manager.before-agent-reply.test.ts new file mode 100644 index 00000000..953a76bd --- /dev/null +++ b/tests/plugin-manager.before-agent-reply.test.ts @@ -0,0 +1,164 @@ +import fs from 'node:fs'; +import os from 'node:os'; +import path from 'node:path'; + +import { afterEach, expect, test, vi } from 'vitest'; +import type { RuntimeConfig } from '../src/config/runtime-config.js'; + +const tempDirs: string[] = []; + +function makeTempDir(prefix: string): string { + const dir = fs.mkdtempSync(path.join(os.tmpdir(), prefix)); + tempDirs.push(dir); + return dir; +} + +function loadRuntimeConfig(): RuntimeConfig { + return JSON.parse( + fs.readFileSync(path.join(process.cwd(), 'config.example.json'), 'utf-8'), + ) as RuntimeConfig; +} + +function writeBeforeAgentReplyPlugin(params: { + rootDir: string; + pluginId: string; + priority?: number; + handlerSource: string[]; +}): void { + const pluginDir = path.join( + params.rootDir, + '.hybridclaw', + 'plugins', + params.pluginId, + ); + fs.mkdirSync(pluginDir, { recursive: true }); + fs.writeFileSync( + path.join(pluginDir, 'hybridclaw.plugin.yaml'), + [ + `id: ${params.pluginId}`, + `name: ${params.pluginId}`, + 'kind: tool', + '', + ].join('\n'), + 'utf-8', + ); + fs.writeFileSync( + path.join(pluginDir, 'index.ts'), + [ + 'export default {', + ` id: '${params.pluginId}',`, + ' register(api) {', + " api.on('before_agent_reply', async (context) => {", + ...params.handlerSource.map((line) => ` ${line}`), + ` }, { priority: ${params.priority ?? 0} });`, + ' },', + '};', + '', + ].join('\n'), + 'utf-8', + ); +} + +afterEach(() => { + vi.resetModules(); + while (tempDirs.length > 0) { + const dir = tempDirs.pop(); + if (!dir) continue; + fs.rmSync(dir, { recursive: true, force: true }); + } +}); + +test('plugin manager returns the first handled before_agent_reply result', async () => { + const homeDir = makeTempDir('hybridclaw-plugin-home-'); + const cwd = makeTempDir('hybridclaw-plugin-project-'); + writeBeforeAgentReplyPlugin({ + rootDir: cwd, + pluginId: 'first-plugin', + priority: 5, + handlerSource: [ + `return { handled: true, text: \`first:${'${'}context.prompt}\`, reason: "first" };`, + ], + }); + writeBeforeAgentReplyPlugin({ + rootDir: cwd, + pluginId: 'second-plugin', + priority: 10, + handlerSource: [ + `return { handled: true, text: \`second:${'${'}context.prompt}\`, reason: "second" };`, + ], + }); + + const config = loadRuntimeConfig(); + config.plugins.list = []; + + const { PluginManager } = await import('../src/plugins/plugin-manager.js'); + const manager = new PluginManager({ + homeDir, + cwd, + getRuntimeConfig: () => config, + }); + + await manager.ensureInitialized(); + await expect( + manager.runBeforeAgentReply({ + sessionId: 'session-1', + userId: 'user-1', + agentId: 'main', + channelId: 'web', + prompt: 'hello', + trigger: 'chat', + workspacePath: '/tmp/workspace', + model: 'test-model', + }), + ).resolves.toEqual({ + handled: true, + text: 'first:hello', + reason: 'first', + pluginId: 'first-plugin', + }); +}); + +test('plugin manager continues to later before_agent_reply hooks after failures', async () => { + const homeDir = makeTempDir('hybridclaw-plugin-home-'); + const cwd = makeTempDir('hybridclaw-plugin-project-'); + writeBeforeAgentReplyPlugin({ + rootDir: cwd, + pluginId: 'failing-plugin', + priority: 5, + handlerSource: ['throw new Error("boom");'], + }); + writeBeforeAgentReplyPlugin({ + rootDir: cwd, + pluginId: 'claimer-plugin', + priority: 10, + handlerSource: ['return { handled: true, text: "claimed" };'], + }); + + const config = loadRuntimeConfig(); + config.plugins.list = []; + + const { PluginManager } = await import('../src/plugins/plugin-manager.js'); + const manager = new PluginManager({ + homeDir, + cwd, + getRuntimeConfig: () => config, + }); + + await manager.ensureInitialized(); + await expect( + manager.runBeforeAgentReply({ + sessionId: 'session-2', + userId: 'user-2', + agentId: 'main', + channelId: 'heartbeat', + prompt: 'heartbeat poll', + trigger: 'heartbeat', + workspacePath: '/tmp/workspace', + model: 'test-model', + }), + ).resolves.toEqual({ + handled: true, + text: 'claimed', + pluginId: 'claimer-plugin', + }); +}); diff --git a/tests/scheduled-task-runner.before-agent-reply.test.ts b/tests/scheduled-task-runner.before-agent-reply.test.ts new file mode 100644 index 00000000..11a00f31 --- /dev/null +++ b/tests/scheduled-task-runner.before-agent-reply.test.ts @@ -0,0 +1,142 @@ +import { afterEach, expect, test, vi } from 'vitest'; + +const mocks = vi.hoisted(() => { + const pluginManager = { + runBeforeAgentReply: vi.fn(async () => undefined), + }; + return { + agentWorkspaceDir: vi.fn(() => '/tmp/hybridclaw-scheduled-task-workspace'), + buildConversationContext: vi.fn(() => ({ messages: [] })), + emitToolExecutionAuditEvents: vi.fn(), + getChannel: vi.fn(() => ({ + kind: 'scheduler', + id: 'scheduler', + capabilities: {}, + })), + makeAuditRunId: vi.fn(() => 'cron-run'), + pluginManager, + recordAuditEvent: vi.fn(), + recordUsageEvent: vi.fn(), + resolveModelProvider: vi.fn(() => 'test-provider'), + runAgent: vi.fn(), + tryEnsurePluginManagerInitializedForGateway: vi.fn(async () => ({ + pluginManager, + pluginInitError: null, + })), + }; +}); + +vi.mock('../src/agent/agent.js', () => ({ + runAgent: mocks.runAgent, +})); + +vi.mock('../src/agent/conversation.js', () => ({ + buildConversationContext: mocks.buildConversationContext, +})); + +vi.mock('../src/audit/audit-events.js', () => ({ + emitToolExecutionAuditEvents: mocks.emitToolExecutionAuditEvents, + makeAuditRunId: mocks.makeAuditRunId, + recordAuditEvent: mocks.recordAuditEvent, +})); + +vi.mock('../src/channels/channel-registry.js', () => ({ + getChannel: mocks.getChannel, +})); + +vi.mock('../src/gateway/gateway-plugin-runtime.js', () => ({ + tryEnsurePluginManagerInitializedForGateway: + mocks.tryEnsurePluginManagerInitializedForGateway, +})); + +vi.mock('../src/infra/ipc.js', () => ({ + agentWorkspaceDir: mocks.agentWorkspaceDir, +})); + +vi.mock('../src/memory/db.js', () => ({ + recordUsageEvent: mocks.recordUsageEvent, +})); + +vi.mock('../src/providers/factory.js', () => ({ + resolveModelProvider: mocks.resolveModelProvider, +})); + +afterEach(() => { + vi.clearAllMocks(); + vi.resetModules(); +}); + +test('scheduled-task plugins can return a synthetic reply without running the agent', async () => { + mocks.pluginManager.runBeforeAgentReply.mockResolvedValueOnce({ + handled: true, + text: 'Synthetic scheduled reply', + pluginId: 'scheduler-plugin', + }); + + const { runIsolatedScheduledTask } = await import( + '../src/scheduler/scheduled-task-runner.ts' + ); + const onResult = vi.fn(); + const onError = vi.fn(); + + await runIsolatedScheduledTask({ + taskId: 42, + prompt: 'Perform scheduled maintenance.', + channelId: 'ops', + chatbotId: 'bot-1', + model: 'test-model', + agentId: 'main', + onResult, + onError, + }); + + expect(mocks.pluginManager.runBeforeAgentReply).toHaveBeenCalledWith( + expect.objectContaining({ + sessionId: 'agent:main:channel:scheduler:chat:cron:peer:42', + channelId: 'ops', + trigger: 'scheduler', + prompt: 'Perform scheduled maintenance.', + }), + ); + expect(mocks.runAgent).not.toHaveBeenCalled(); + expect(onResult).toHaveBeenCalledWith({ text: 'Synthetic scheduled reply' }); + expect(onError).not.toHaveBeenCalled(); +}); + +test('scheduled-task plugins can swallow a run silently', async () => { + mocks.pluginManager.runBeforeAgentReply.mockResolvedValueOnce({ + handled: true, + pluginId: 'scheduler-plugin', + }); + + const { runIsolatedScheduledTask } = await import( + '../src/scheduler/scheduled-task-runner.ts' + ); + const onResult = vi.fn(); + const onError = vi.fn(); + + await runIsolatedScheduledTask({ + taskId: 43, + prompt: 'Perform scheduled maintenance.', + channelId: 'ops', + chatbotId: 'bot-1', + model: 'test-model', + agentId: 'main', + onResult, + onError, + }); + + expect(mocks.runAgent).not.toHaveBeenCalled(); + expect(onResult).not.toHaveBeenCalled(); + expect(onError).not.toHaveBeenCalled(); + expect( + mocks.recordAuditEvent.mock.calls.some(([entry]) => { + const event = ( + entry as { event?: { type?: string; finishReason?: string } } + ).event; + return ( + event?.type === 'turn.end' && event.finishReason === 'plugin_silent' + ); + }), + ).toBe(true); +});