Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ode",
"version": "0.1.21",
"version": "0.1.22",
"description": "Coding anywhere with your coding agents connected",
"module": "packages/core/index.ts",
"type": "module",
Expand Down
2 changes: 2 additions & 0 deletions packages/core/kernel/request-run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,9 @@ export async function runTrackedRequest(
liveParsedState.delete(getStatusMessageKey(request));

const errorStatus = `Error: ${message}\n_${suggestion}_`;
deps.im.cancelPendingUpdates?.(request.channelId, request.statusMessageTs);
await deps.im.updateMessage(request.channelId, request.statusMessageTs, errorStatus);
deps.im.markMessageFinalized?.(request.channelId, request.statusMessageTs);
onFail(message);
return { responses: null };
} finally {
Expand Down
23 changes: 20 additions & 3 deletions packages/core/kernel/runtime-support.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,18 @@ export async function publishFinalText(params: {
const statusFormat = getUserGeneralSettings().defaultStatusMessageFormat;
const finalChunks = splitResultMessage(text);
const singleChunk = finalChunks[0] ?? text;
let finalStatusTs = statusTs;
const statusRateLimited = im.wasRateLimited?.(channelId, statusTs) ?? false;
const statusRateLimitError = im.getRateLimitError?.(channelId, statusTs);

im.cancelPendingUpdates?.(channelId, statusTs);

if (finalChunks.length > 1) {
if (statusFormat !== "aggressive" && !statusRateLimited) {
await im.updateMessage(channelId, statusTs, "Final result posted below in multiple messages.");
const updatedStatusTs = await im.updateMessage(channelId, statusTs, "Final result posted below in multiple messages.");
if (typeof updatedStatusTs === "string" && updatedStatusTs.length > 0) {
finalStatusTs = updatedStatusTs;
}
} else if (statusRateLimited) {
log.warn("Skipping final status update due to prior 429; posting final chunks as new messages", {
channelId,
Expand All @@ -68,11 +74,13 @@ export async function publishFinalText(params: {
for (const chunk of finalChunks) {
await im.sendMessage(channelId, threadId, chunk);
}
im.markMessageFinalized?.(channelId, finalStatusTs);
return;
}

if (statusFormat === "aggressive") {
await im.sendMessage(channelId, threadId, singleChunk);
im.markMessageFinalized?.(channelId, finalStatusTs);
return;
}

Expand All @@ -84,15 +92,24 @@ export async function publishFinalText(params: {
...(statusRateLimitError ? { error: statusRateLimitError } : {}),
});
await im.sendMessage(channelId, threadId, singleChunk);
im.markMessageFinalized?.(channelId, finalStatusTs);
return;
}

const maxEditableMessageChars = im.maxEditableMessageChars;
if (typeof maxEditableMessageChars === "number" && singleChunk.length > maxEditableMessageChars) {
await im.updateMessage(channelId, statusTs, "Final result posted below.");
const updatedStatusTs = await im.updateMessage(channelId, statusTs, "Final result posted below.");
if (typeof updatedStatusTs === "string" && updatedStatusTs.length > 0) {
finalStatusTs = updatedStatusTs;
}
await im.sendMessage(channelId, threadId, singleChunk);
im.markMessageFinalized?.(channelId, finalStatusTs);
return;
}

await im.updateMessage(channelId, statusTs, singleChunk);
const updatedStatusTs = await im.updateMessage(channelId, statusTs, singleChunk);
if (typeof updatedStatusTs === "string" && updatedStatusTs.length > 0) {
finalStatusTs = updatedStatusTs;
}
im.markMessageFinalized?.(channelId, finalStatusTs);
}
40 changes: 35 additions & 5 deletions packages/core/runtime/message-updates.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export function createRateLimitedImAdapter(
intervalMs = getMessageUpdateIntervalMs()
): IMAdapter {
const rateLimitedMessages = new Set<string>();
const finalizedMessages = new Set<string>();
const rateLimitErrors = new Map<string, string>();
const updateErrors = new Map<string, string>();

Expand All @@ -23,17 +24,24 @@ export function createRateLimitedImAdapter(
const queue = new CoalescedUpdateQueue<string | undefined>(
intervalMs,
async ({ channelId, messageId }, text) => {
const updateKey = key(channelId, messageId);
if (finalizedMessages.has(updateKey)) {
log.debug("Skipping queued message update after finalization", {
channelId,
messageTs: messageId,
});
return undefined;
}
try {
const maybeUpdatedTs = await im.updateMessage(channelId, messageId, text);
updateErrors.delete(key(channelId, messageId));
updateErrors.delete(updateKey);
return typeof maybeUpdatedTs === "string" ? maybeUpdatedTs : undefined;
} catch (error) {
const errorText = String(error);
const updateErrorKey = key(channelId, messageId);
updateErrors.set(updateErrorKey, errorText);
updateErrors.set(updateKey, errorText);
if (isRateLimitError(error)) {
rateLimitedMessages.add(updateErrorKey);
rateLimitErrors.set(updateErrorKey, errorText);
rateLimitedMessages.add(updateKey);
rateLimitErrors.set(updateKey, errorText);
log.warn("IM message update hit rate limit (429)", {
channelId,
messageTs: messageId,
Expand Down Expand Up @@ -65,6 +73,21 @@ export function createRateLimitedImAdapter(
}
return rateLimitErrors.get(key(channelId, messageTs));
},
cancelPendingUpdates: (channelId: string, messageTs: string): void => {
if (typeof im.cancelPendingUpdates === "function") {
im.cancelPendingUpdates(channelId, messageTs);
}
queue.cancel({ channelId, messageId: messageTs });
},
markMessageFinalized: (channelId: string, messageTs: string): void => {
if (typeof im.markMessageFinalized === "function") {
im.markMessageFinalized(channelId, messageTs);
}
const updateKey = key(channelId, messageTs);
finalizedMessages.add(updateKey);
queue.cancel({ channelId, messageId: messageTs });
updateErrors.delete(updateKey);
},
takeUpdateError: (channelId: string, messageTs: string): string | undefined => {
const updateErrorKey = key(channelId, messageTs);
if (typeof im.takeUpdateError === "function") {
Expand All @@ -84,6 +107,13 @@ export function createRateLimitedImAdapter(
messageTs: string,
text: string
): Promise<string | undefined> => {
if (finalizedMessages.has(key(channelId, messageTs))) {
log.debug("Skipping message update after finalization", {
channelId,
messageTs,
});
return undefined;
}
return queue.enqueue({ channelId, messageId: messageTs }, text);
},
};
Expand Down
40 changes: 40 additions & 0 deletions packages/core/test/message-updates-integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,44 @@ describe("createRateLimitedImAdapter integration", () => {
expect(adapter.wasRateLimited?.("C2", "100.2")).toBe(true);
expect(adapter.getRateLimitError?.("C2", "100.2")).toContain("429");
});

it("cancels pending updates and ignores updates after finalization", async () => {
const calls: Array<{ channelId: string; messageTs: string; text: string }> = [];
const firstUpdateControl: { release?: () => void } = {};
let firstUpdateStarted = false;
const adapter = createRateLimitedImAdapter({
sendMessage: async () => "m1",
updateMessage: async (channelId: string, messageTs: string, text: string) => {
calls.push({ channelId, messageTs, text });
if (text === "first") {
firstUpdateStarted = true;
await new Promise<void>((resolve) => {
firstUpdateControl.release = resolve;
});
}
},
deleteMessage: async () => {},
fetchThreadHistory: async () => null,
buildAgentContext: async () => ({}),
}, 0);

const firstUpdate = adapter.updateMessage("C3", "100.3", "first");
while (!firstUpdateStarted) {
await sleep(1);
}
const pendingUpdate = adapter.updateMessage("C3", "100.3", "stale");
adapter.cancelPendingUpdates?.("C3", "100.3");
if (firstUpdateControl.release) {
firstUpdateControl.release();
}

await Promise.all([firstUpdate, pendingUpdate]);
expect(calls).toEqual([{ channelId: "C3", messageTs: "100.3", text: "first" }]);

adapter.markMessageFinalized?.("C3", "100.3");
await adapter.updateMessage("C3", "100.3", "after-final");
await sleep(20);

expect(calls).toEqual([{ channelId: "C3", messageTs: "100.3", text: "first" }]);
});
});
2 changes: 2 additions & 0 deletions packages/core/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ export interface IMAdapter {
wasRateLimited?(channelId: string, messageTs: string): boolean;
getRateLimitError?(channelId: string, messageTs: string): string | undefined;
takeUpdateError?(channelId: string, messageTs: string): string | undefined;
cancelPendingUpdates?(channelId: string, messageTs: string): void;
markMessageFinalized?(channelId: string, messageTs: string): void;
deleteMessage(channelId: string, messageTs: string): Promise<void>;
fetchThreadHistory(channelId: string, threadId: string, messageId: string): Promise<string | null>;
buildAgentContext(params: AgentContextBuilderParams): Promise<OpenCodeMessageContext>;
Expand Down
20 changes: 19 additions & 1 deletion packages/ims/slack/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {
} from "@/ims/shared/processor-id";
import { createProcessorManager } from "@/ims/shared/processor-manager";
import { SlackAuthRegistry, type WorkspaceAuth } from "@/ims/slack/state/auth-registry";
import { SlackMessageUpdateManager } from "@/ims/slack/message-update-manager";

export interface MessageContext {
channelId: string;
Expand All @@ -42,6 +43,9 @@ const appRegistry = new Map<string, App>();
const TRACE_SLACK_ROUTER = process.env.ODE_SLACK_TRACE === "1";

const slackAuthRegistry = new SlackAuthRegistry();
const slackMessageUpdateManager = new SlackMessageUpdateManager(async ({ channelId, messageTs, text, processorId }) => {
await performSlackMessageUpdate(channelId, messageTs, text, processorId);
});
const slackProcessorManager = createProcessorManager({
createRuntime: (processorId) => createCoreRuntime({
platform: "slack",
Expand All @@ -59,6 +63,7 @@ export function clearSlackAuthState(): void {
export function resetSlackState(): void {
clearSlackAuthState();
appRegistry.clear();
slackMessageUpdateManager.clear();
slackProcessorManager.clear();
}

Expand Down Expand Up @@ -363,7 +368,7 @@ export async function deleteMessage(
}
}

export async function updateMessage(
async function performSlackMessageUpdate(
channelId: string,
messageTs: string,
text: string,
Expand Down Expand Up @@ -396,6 +401,15 @@ export async function updateMessage(
}
}

export async function updateMessage(
channelId: string,
messageTs: string,
text: string,
processorId?: string
): Promise<void> {
await slackMessageUpdateManager.updateMessage({ channelId, messageTs, text, processorId });
}

async function fetchThreadHistory(
channelId: string,
threadId: string,
Expand All @@ -419,6 +433,10 @@ function createSlackAdapter(processorId?: string): IMAdapter {
sendMessage(channelId, threadId, text, processorId),
updateMessage: (channelId: string, messageTs: string, text: string) =>
updateMessage(channelId, messageTs, text, processorId),
cancelPendingUpdates: (channelId: string, messageTs: string) =>
slackMessageUpdateManager.cancelPendingUpdates(channelId, messageTs),
markMessageFinalized: (channelId: string, messageTs: string) =>
slackMessageUpdateManager.markMessageFinalized(channelId, messageTs),
deleteMessage: (channelId: string, messageTs: string) =>
deleteMessage(channelId, messageTs, processorId),
fetchThreadHistory: (channelId: string, threadId: string, messageId: string) =>
Expand Down
49 changes: 49 additions & 0 deletions packages/ims/slack/message-update-manager.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { describe, expect, it } from "bun:test";
import { SlackMessageUpdateManager } from "@/ims/slack/message-update-manager";

function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

describe("SlackMessageUpdateManager", () => {
it("coalesces updates across callers for the same message", async () => {
const calls: string[] = [];
const manager = new SlackMessageUpdateManager(async ({ text }) => {
calls.push(text);
await sleep(10);
});

const first = manager.updateMessage({ channelId: "C1", messageTs: "1", text: "first", processorId: "p1" });
const second = manager.updateMessage({ channelId: "C1", messageTs: "1", text: "second", processorId: "p2" });

await Promise.all([first, second]);

expect(calls).toEqual(["second"]);
});

it("drops pending and future updates after finalization", async () => {
const calls: string[] = [];
const control: { release?: () => void } = {};
const manager = new SlackMessageUpdateManager(async ({ text }) => {
calls.push(text);
if (text === "live") {
await new Promise<void>((resolve) => {
control.release = resolve;
});
}
});

const live = manager.updateMessage({ channelId: "C2", messageTs: "2", text: "live", processorId: "p1" });
await sleep(5);
const stale = manager.updateMessage({ channelId: "C2", messageTs: "2", text: "stale", processorId: "p2" });
manager.markMessageFinalized("C2", "2");
if (control.release) {
control.release();
}

await Promise.all([live, stale]);
await manager.updateMessage({ channelId: "C2", messageTs: "2", text: "after-final", processorId: "p3" });

expect(calls).toEqual(["live"]);
});
});
80 changes: 80 additions & 0 deletions packages/ims/slack/message-update-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import { CoalescedUpdateQueue } from "@/shared/queue/coalesced-update-queue";
import { log } from "@/utils";

type SlackUpdateKey = {
channelId: string;
messageTs: string;
};

type SlackUpdatePayload = {
text: string;
processorId?: string;
};

function buildKey(channelId: string, messageTs: string): string {
return `${channelId}:${messageTs}`;
}

export class SlackMessageUpdateManager {
private readonly finalizedMessages = new Set<string>();
private readonly queue: CoalescedUpdateQueue<void, SlackUpdatePayload>;

constructor(
worker: (params: {
channelId: string;
messageTs: string;
text: string;
processorId?: string;
}) => Promise<void>
) {
this.queue = new CoalescedUpdateQueue<void, SlackUpdatePayload>(0, async ({ channelId, messageId }, payload) => {
const updateKey = buildKey(channelId, messageId);
if (this.finalizedMessages.has(updateKey)) {
log.debug("Skipping globally queued Slack update after finalization", {
channelId,
messageTs: messageId,
});
return;
}

await worker({
channelId,
messageTs: messageId,
text: payload.text,
processorId: payload.processorId,
});
});
}

async updateMessage(params: {
channelId: string;
messageTs: string;
text: string;
processorId?: string;
}): Promise<void> {
const { channelId, messageTs, text, processorId } = params;
if (this.finalizedMessages.has(buildKey(channelId, messageTs))) {
log.debug("Skipping Slack update after global finalization", {
channelId,
messageTs,
});
return;
}

await this.queue.enqueue({ channelId, messageId: messageTs }, { text, processorId });
}

cancelPendingUpdates(channelId: string, messageTs: string): void {
this.queue.cancel({ channelId, messageId: messageTs });
}

markMessageFinalized(channelId: string, messageTs: string): void {
this.finalizedMessages.add(buildKey(channelId, messageTs));
this.queue.cancel({ channelId, messageId: messageTs });
}

clear(): void {
this.finalizedMessages.clear();
this.queue.clear();
}
}
Loading
Loading