diff --git a/package.json b/package.json index ea849b2..8bf59df 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "ode", - "version": "0.1.21", + "version": "0.1.22", "description": "Coding anywhere with your coding agents connected", "module": "packages/core/index.ts", "type": "module", diff --git a/packages/core/kernel/request-run.ts b/packages/core/kernel/request-run.ts index b818a36..03fd77d 100644 --- a/packages/core/kernel/request-run.ts +++ b/packages/core/kernel/request-run.ts @@ -551,7 +551,9 @@ export async function runTrackedRequest( liveParsedState.delete(getStatusMessageKey(request)); const errorStatus = `Error: ${message}\n_${suggestion}_`; + deps.im.cancelPendingUpdates?.(request.channelId, request.statusMessageTs); await deps.im.updateMessage(request.channelId, request.statusMessageTs, errorStatus); + deps.im.markMessageFinalized?.(request.channelId, request.statusMessageTs); onFail(message); return { responses: null }; } finally { diff --git a/packages/core/kernel/runtime-support.ts b/packages/core/kernel/runtime-support.ts index 5cb8c6f..63f4407 100644 --- a/packages/core/kernel/runtime-support.ts +++ b/packages/core/kernel/runtime-support.ts @@ -50,12 +50,18 @@ export async function publishFinalText(params: { const statusFormat = getUserGeneralSettings().defaultStatusMessageFormat; const finalChunks = splitResultMessage(text); const singleChunk = finalChunks[0] ?? text; + let finalStatusTs = statusTs; const statusRateLimited = im.wasRateLimited?.(channelId, statusTs) ?? false; const statusRateLimitError = im.getRateLimitError?.(channelId, statusTs); + im.cancelPendingUpdates?.(channelId, statusTs); + if (finalChunks.length > 1) { if (statusFormat !== "aggressive" && !statusRateLimited) { - await im.updateMessage(channelId, statusTs, "Final result posted below in multiple messages."); + const updatedStatusTs = await im.updateMessage(channelId, statusTs, "Final result posted below in multiple messages."); + if (typeof updatedStatusTs === "string" && updatedStatusTs.length > 0) { + finalStatusTs = updatedStatusTs; + } } else if (statusRateLimited) { log.warn("Skipping final status update due to prior 429; posting final chunks as new messages", { channelId, @@ -68,11 +74,13 @@ export async function publishFinalText(params: { for (const chunk of finalChunks) { await im.sendMessage(channelId, threadId, chunk); } + im.markMessageFinalized?.(channelId, finalStatusTs); return; } if (statusFormat === "aggressive") { await im.sendMessage(channelId, threadId, singleChunk); + im.markMessageFinalized?.(channelId, finalStatusTs); return; } @@ -84,15 +92,24 @@ export async function publishFinalText(params: { ...(statusRateLimitError ? { error: statusRateLimitError } : {}), }); await im.sendMessage(channelId, threadId, singleChunk); + im.markMessageFinalized?.(channelId, finalStatusTs); return; } const maxEditableMessageChars = im.maxEditableMessageChars; if (typeof maxEditableMessageChars === "number" && singleChunk.length > maxEditableMessageChars) { - await im.updateMessage(channelId, statusTs, "Final result posted below."); + const updatedStatusTs = await im.updateMessage(channelId, statusTs, "Final result posted below."); + if (typeof updatedStatusTs === "string" && updatedStatusTs.length > 0) { + finalStatusTs = updatedStatusTs; + } await im.sendMessage(channelId, threadId, singleChunk); + im.markMessageFinalized?.(channelId, finalStatusTs); return; } - await im.updateMessage(channelId, statusTs, singleChunk); + const updatedStatusTs = await im.updateMessage(channelId, statusTs, singleChunk); + if (typeof updatedStatusTs === "string" && updatedStatusTs.length > 0) { + finalStatusTs = updatedStatusTs; + } + im.markMessageFinalized?.(channelId, finalStatusTs); } diff --git a/packages/core/runtime/message-updates.ts b/packages/core/runtime/message-updates.ts index bbd0c49..0a6811b 100644 --- a/packages/core/runtime/message-updates.ts +++ b/packages/core/runtime/message-updates.ts @@ -13,6 +13,7 @@ export function createRateLimitedImAdapter( intervalMs = getMessageUpdateIntervalMs() ): IMAdapter { const rateLimitedMessages = new Set(); + const finalizedMessages = new Set(); const rateLimitErrors = new Map(); const updateErrors = new Map(); @@ -23,17 +24,24 @@ export function createRateLimitedImAdapter( const queue = new CoalescedUpdateQueue( intervalMs, async ({ channelId, messageId }, text) => { + const updateKey = key(channelId, messageId); + if (finalizedMessages.has(updateKey)) { + log.debug("Skipping queued message update after finalization", { + channelId, + messageTs: messageId, + }); + return undefined; + } try { const maybeUpdatedTs = await im.updateMessage(channelId, messageId, text); - updateErrors.delete(key(channelId, messageId)); + updateErrors.delete(updateKey); return typeof maybeUpdatedTs === "string" ? maybeUpdatedTs : undefined; } catch (error) { const errorText = String(error); - const updateErrorKey = key(channelId, messageId); - updateErrors.set(updateErrorKey, errorText); + updateErrors.set(updateKey, errorText); if (isRateLimitError(error)) { - rateLimitedMessages.add(updateErrorKey); - rateLimitErrors.set(updateErrorKey, errorText); + rateLimitedMessages.add(updateKey); + rateLimitErrors.set(updateKey, errorText); log.warn("IM message update hit rate limit (429)", { channelId, messageTs: messageId, @@ -65,6 +73,21 @@ export function createRateLimitedImAdapter( } return rateLimitErrors.get(key(channelId, messageTs)); }, + cancelPendingUpdates: (channelId: string, messageTs: string): void => { + if (typeof im.cancelPendingUpdates === "function") { + im.cancelPendingUpdates(channelId, messageTs); + } + queue.cancel({ channelId, messageId: messageTs }); + }, + markMessageFinalized: (channelId: string, messageTs: string): void => { + if (typeof im.markMessageFinalized === "function") { + im.markMessageFinalized(channelId, messageTs); + } + const updateKey = key(channelId, messageTs); + finalizedMessages.add(updateKey); + queue.cancel({ channelId, messageId: messageTs }); + updateErrors.delete(updateKey); + }, takeUpdateError: (channelId: string, messageTs: string): string | undefined => { const updateErrorKey = key(channelId, messageTs); if (typeof im.takeUpdateError === "function") { @@ -84,6 +107,13 @@ export function createRateLimitedImAdapter( messageTs: string, text: string ): Promise => { + if (finalizedMessages.has(key(channelId, messageTs))) { + log.debug("Skipping message update after finalization", { + channelId, + messageTs, + }); + return undefined; + } return queue.enqueue({ channelId, messageId: messageTs }, text); }, }; diff --git a/packages/core/test/message-updates-integration.test.ts b/packages/core/test/message-updates-integration.test.ts index 8527ad5..42658bc 100644 --- a/packages/core/test/message-updates-integration.test.ts +++ b/packages/core/test/message-updates-integration.test.ts @@ -45,4 +45,44 @@ describe("createRateLimitedImAdapter integration", () => { expect(adapter.wasRateLimited?.("C2", "100.2")).toBe(true); expect(adapter.getRateLimitError?.("C2", "100.2")).toContain("429"); }); + + it("cancels pending updates and ignores updates after finalization", async () => { + const calls: Array<{ channelId: string; messageTs: string; text: string }> = []; + const firstUpdateControl: { release?: () => void } = {}; + let firstUpdateStarted = false; + const adapter = createRateLimitedImAdapter({ + sendMessage: async () => "m1", + updateMessage: async (channelId: string, messageTs: string, text: string) => { + calls.push({ channelId, messageTs, text }); + if (text === "first") { + firstUpdateStarted = true; + await new Promise((resolve) => { + firstUpdateControl.release = resolve; + }); + } + }, + deleteMessage: async () => {}, + fetchThreadHistory: async () => null, + buildAgentContext: async () => ({}), + }, 0); + + const firstUpdate = adapter.updateMessage("C3", "100.3", "first"); + while (!firstUpdateStarted) { + await sleep(1); + } + const pendingUpdate = adapter.updateMessage("C3", "100.3", "stale"); + adapter.cancelPendingUpdates?.("C3", "100.3"); + if (firstUpdateControl.release) { + firstUpdateControl.release(); + } + + await Promise.all([firstUpdate, pendingUpdate]); + expect(calls).toEqual([{ channelId: "C3", messageTs: "100.3", text: "first" }]); + + adapter.markMessageFinalized?.("C3", "100.3"); + await adapter.updateMessage("C3", "100.3", "after-final"); + await sleep(20); + + expect(calls).toEqual([{ channelId: "C3", messageTs: "100.3", text: "first" }]); + }); }); diff --git a/packages/core/types.ts b/packages/core/types.ts index f3acc2e..7155002 100644 --- a/packages/core/types.ts +++ b/packages/core/types.ts @@ -52,6 +52,8 @@ export interface IMAdapter { wasRateLimited?(channelId: string, messageTs: string): boolean; getRateLimitError?(channelId: string, messageTs: string): string | undefined; takeUpdateError?(channelId: string, messageTs: string): string | undefined; + cancelPendingUpdates?(channelId: string, messageTs: string): void; + markMessageFinalized?(channelId: string, messageTs: string): void; deleteMessage(channelId: string, messageTs: string): Promise; fetchThreadHistory(channelId: string, threadId: string, messageId: string): Promise; buildAgentContext(params: AgentContextBuilderParams): Promise; diff --git a/packages/ims/slack/client.ts b/packages/ims/slack/client.ts index 4027580..018e2ed 100644 --- a/packages/ims/slack/client.ts +++ b/packages/ims/slack/client.ts @@ -27,6 +27,7 @@ import { } from "@/ims/shared/processor-id"; import { createProcessorManager } from "@/ims/shared/processor-manager"; import { SlackAuthRegistry, type WorkspaceAuth } from "@/ims/slack/state/auth-registry"; +import { SlackMessageUpdateManager } from "@/ims/slack/message-update-manager"; export interface MessageContext { channelId: string; @@ -42,6 +43,9 @@ const appRegistry = new Map(); const TRACE_SLACK_ROUTER = process.env.ODE_SLACK_TRACE === "1"; const slackAuthRegistry = new SlackAuthRegistry(); +const slackMessageUpdateManager = new SlackMessageUpdateManager(async ({ channelId, messageTs, text, processorId }) => { + await performSlackMessageUpdate(channelId, messageTs, text, processorId); +}); const slackProcessorManager = createProcessorManager({ createRuntime: (processorId) => createCoreRuntime({ platform: "slack", @@ -59,6 +63,7 @@ export function clearSlackAuthState(): void { export function resetSlackState(): void { clearSlackAuthState(); appRegistry.clear(); + slackMessageUpdateManager.clear(); slackProcessorManager.clear(); } @@ -363,7 +368,7 @@ export async function deleteMessage( } } -export async function updateMessage( +async function performSlackMessageUpdate( channelId: string, messageTs: string, text: string, @@ -396,6 +401,15 @@ export async function updateMessage( } } +export async function updateMessage( + channelId: string, + messageTs: string, + text: string, + processorId?: string +): Promise { + await slackMessageUpdateManager.updateMessage({ channelId, messageTs, text, processorId }); +} + async function fetchThreadHistory( channelId: string, threadId: string, @@ -419,6 +433,10 @@ function createSlackAdapter(processorId?: string): IMAdapter { sendMessage(channelId, threadId, text, processorId), updateMessage: (channelId: string, messageTs: string, text: string) => updateMessage(channelId, messageTs, text, processorId), + cancelPendingUpdates: (channelId: string, messageTs: string) => + slackMessageUpdateManager.cancelPendingUpdates(channelId, messageTs), + markMessageFinalized: (channelId: string, messageTs: string) => + slackMessageUpdateManager.markMessageFinalized(channelId, messageTs), deleteMessage: (channelId: string, messageTs: string) => deleteMessage(channelId, messageTs, processorId), fetchThreadHistory: (channelId: string, threadId: string, messageId: string) => diff --git a/packages/ims/slack/message-update-manager.test.ts b/packages/ims/slack/message-update-manager.test.ts new file mode 100644 index 0000000..4ee95d1 --- /dev/null +++ b/packages/ims/slack/message-update-manager.test.ts @@ -0,0 +1,49 @@ +import { describe, expect, it } from "bun:test"; +import { SlackMessageUpdateManager } from "@/ims/slack/message-update-manager"; + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +describe("SlackMessageUpdateManager", () => { + it("coalesces updates across callers for the same message", async () => { + const calls: string[] = []; + const manager = new SlackMessageUpdateManager(async ({ text }) => { + calls.push(text); + await sleep(10); + }); + + const first = manager.updateMessage({ channelId: "C1", messageTs: "1", text: "first", processorId: "p1" }); + const second = manager.updateMessage({ channelId: "C1", messageTs: "1", text: "second", processorId: "p2" }); + + await Promise.all([first, second]); + + expect(calls).toEqual(["second"]); + }); + + it("drops pending and future updates after finalization", async () => { + const calls: string[] = []; + const control: { release?: () => void } = {}; + const manager = new SlackMessageUpdateManager(async ({ text }) => { + calls.push(text); + if (text === "live") { + await new Promise((resolve) => { + control.release = resolve; + }); + } + }); + + const live = manager.updateMessage({ channelId: "C2", messageTs: "2", text: "live", processorId: "p1" }); + await sleep(5); + const stale = manager.updateMessage({ channelId: "C2", messageTs: "2", text: "stale", processorId: "p2" }); + manager.markMessageFinalized("C2", "2"); + if (control.release) { + control.release(); + } + + await Promise.all([live, stale]); + await manager.updateMessage({ channelId: "C2", messageTs: "2", text: "after-final", processorId: "p3" }); + + expect(calls).toEqual(["live"]); + }); +}); diff --git a/packages/ims/slack/message-update-manager.ts b/packages/ims/slack/message-update-manager.ts new file mode 100644 index 0000000..238da2e --- /dev/null +++ b/packages/ims/slack/message-update-manager.ts @@ -0,0 +1,80 @@ +import { CoalescedUpdateQueue } from "@/shared/queue/coalesced-update-queue"; +import { log } from "@/utils"; + +type SlackUpdateKey = { + channelId: string; + messageTs: string; +}; + +type SlackUpdatePayload = { + text: string; + processorId?: string; +}; + +function buildKey(channelId: string, messageTs: string): string { + return `${channelId}:${messageTs}`; +} + +export class SlackMessageUpdateManager { + private readonly finalizedMessages = new Set(); + private readonly queue: CoalescedUpdateQueue; + + constructor( + worker: (params: { + channelId: string; + messageTs: string; + text: string; + processorId?: string; + }) => Promise + ) { + this.queue = new CoalescedUpdateQueue(0, async ({ channelId, messageId }, payload) => { + const updateKey = buildKey(channelId, messageId); + if (this.finalizedMessages.has(updateKey)) { + log.debug("Skipping globally queued Slack update after finalization", { + channelId, + messageTs: messageId, + }); + return; + } + + await worker({ + channelId, + messageTs: messageId, + text: payload.text, + processorId: payload.processorId, + }); + }); + } + + async updateMessage(params: { + channelId: string; + messageTs: string; + text: string; + processorId?: string; + }): Promise { + const { channelId, messageTs, text, processorId } = params; + if (this.finalizedMessages.has(buildKey(channelId, messageTs))) { + log.debug("Skipping Slack update after global finalization", { + channelId, + messageTs, + }); + return; + } + + await this.queue.enqueue({ channelId, messageId: messageTs }, { text, processorId }); + } + + cancelPendingUpdates(channelId: string, messageTs: string): void { + this.queue.cancel({ channelId, messageId: messageTs }); + } + + markMessageFinalized(channelId: string, messageTs: string): void { + this.finalizedMessages.add(buildKey(channelId, messageTs)); + this.queue.cancel({ channelId, messageId: messageTs }); + } + + clear(): void { + this.finalizedMessages.clear(); + this.queue.clear(); + } +} diff --git a/packages/shared/queue/coalesced-update-queue.ts b/packages/shared/queue/coalesced-update-queue.ts index 3322741..f5ae669 100644 --- a/packages/shared/queue/coalesced-update-queue.ts +++ b/packages/shared/queue/coalesced-update-queue.ts @@ -5,19 +5,19 @@ type QueueKey = { messageId: string; }; -type PendingItem = { +type PendingItem = { key: QueueKey; - payload: string; + payload: TPayload; resolve: (value: TResult) => void; }; -export class CoalescedUpdateQueue { +export class CoalescedUpdateQueue { private readonly limiter: Bottleneck; - private readonly pendingByKey = new Map>(); + private readonly pendingByKey = new Map>(); constructor( minTimeMs: number, - private readonly worker: (key: QueueKey, payload: string) => Promise + private readonly worker: (key: QueueKey, payload: TPayload) => Promise ) { this.limiter = new Bottleneck({ maxConcurrent: 1, @@ -25,13 +25,9 @@ export class CoalescedUpdateQueue { }); } - enqueue(key: QueueKey, payload: string): Promise { + enqueue(key: QueueKey, payload: TPayload): Promise { const dedupKey = this.buildKey(key); - const existing = this.pendingByKey.get(dedupKey); - if (existing) { - existing.resolve(undefined as TResult); - this.pendingByKey.delete(dedupKey); - } + this.resolvePending(dedupKey); return new Promise((resolve) => { this.pendingByKey.set(dedupKey, { @@ -49,6 +45,10 @@ export class CoalescedUpdateQueue { }); } + cancel(key: QueueKey): void { + this.resolvePending(this.buildKey(key)); + } + clear(): void { for (const pending of this.pendingByKey.values()) { pending.resolve(undefined as TResult); @@ -60,4 +60,11 @@ export class CoalescedUpdateQueue { private buildKey(key: QueueKey): string { return `${key.channelId}:${key.messageId}`; } + + private resolvePending(dedupKey: string): void { + const existing = this.pendingByKey.get(dedupKey); + if (!existing) return; + existing.resolve(undefined as TResult); + this.pendingByKey.delete(dedupKey); + } }