Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ src/**/*.d.ts
.env.*
*.tgz
.DS_Store
openclaw-channel-dmwork.bak/
4 changes: 2 additions & 2 deletions openclaw-channel-dmwork/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 7 additions & 8 deletions openclaw-channel-dmwork/src/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import {
updateGroupMd,
} from "./api-fetch.js";
import { uploadAndSendMedia } from "./inbound.js";
import { parseMentions } from "./mention-utils.js";
import { buildEntitiesFromFallback } from "./mention-utils.js";
import type { MentionEntity } from "./types.js";
import { getKnownGroupIds } from "./group-md.js";

export interface MessageActionResult {
Expand Down Expand Up @@ -164,15 +165,12 @@ async function handleSend(params: {
// Send text message
if (message) {
let mentionUids: string[] = [];
let mentionEntities: MentionEntity[] = [];

if (channelType === ChannelType.Group && memberMap) {
const mentionNames = parseMentions(message);
for (const name of mentionNames) {
const uid = memberMap.get(name);
if (uid && !mentionUids.includes(uid)) {
mentionUids.push(uid);
}
}
const { entities, uids } = buildEntitiesFromFallback(message, memberMap);
mentionUids = uids;
mentionEntities = entities;
}

await sendMessage({
Expand All @@ -182,6 +180,7 @@ async function handleSend(params: {
channelType,
content: message,
...(mentionUids.length > 0 ? { mentionUids } : {}),
...(mentionEntities.length > 0 ? { mentionEntities } : {}),
});
}

Expand Down
29 changes: 24 additions & 5 deletions openclaw-channel-dmwork/src/api-fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* These are used by inbound/outbound where the full DMWorkAPI class is not available.
*/

import { ChannelType, MessageType } from "./types.js";
import { ChannelType, MessageType, type MentionEntity } from "./types.js";
import path from "path";
import { open } from "node:fs/promises";
// @ts-ignore — cos-nodejs-sdk-v5 has incomplete TypeScript definitions
Expand Down Expand Up @@ -63,6 +63,7 @@ export async function sendMediaMessage(params: {
width?: number;
height?: number;
mentionUids?: string[];
mentionEntities?: MentionEntity[];
signal?: AbortSignal;
}): Promise<void> {
const payload: Record<string, unknown> = {
Expand All @@ -79,8 +80,18 @@ export async function sendMediaMessage(params: {
if (params.size != null) payload.size = params.size;
}

if (params.mentionUids && params.mentionUids.length > 0) {
payload.mention = { uids: params.mentionUids };
if (
(params.mentionUids && params.mentionUids.length > 0) ||
(params.mentionEntities && params.mentionEntities.length > 0)
) {
const mention: Record<string, unknown> = {};
if (params.mentionUids && params.mentionUids.length > 0) {
mention.uids = params.mentionUids;
}
if (params.mentionEntities && params.mentionEntities.length > 0) {
mention.entities = params.mentionEntities;
}
payload.mention = mention;
}
await postJson(params.apiUrl, params.botToken, "/v1/bot/sendMessage", {
channel_id: params.channelId,
Expand Down Expand Up @@ -169,6 +180,7 @@ export async function sendMessage(params: {
channelType: ChannelType;
content: string;
mentionUids?: string[];
mentionEntities?: MentionEntity[];
mentionAll?: boolean;
streamNo?: string;
replyMsgId?: string;
Expand All @@ -178,12 +190,19 @@ export async function sendMessage(params: {
type: MessageType.Text,
content: params.content,
};
// Add mention field if any UIDs specified or mentionAll
if ((params.mentionUids && params.mentionUids.length > 0) || params.mentionAll) {
// Add mention field if any UIDs specified, entities present, or mentionAll
if (
(params.mentionUids && params.mentionUids.length > 0) ||
(params.mentionEntities && params.mentionEntities.length > 0) ||
params.mentionAll
) {
const mention: Record<string, unknown> = {};
if (params.mentionUids && params.mentionUids.length > 0) {
mention.uids = params.mentionUids;
}
if (params.mentionEntities && params.mentionEntities.length > 0) {
mention.entities = params.mentionEntities;
}
if (params.mentionAll) {
mention.all = true;
}
Expand Down
53 changes: 53 additions & 0 deletions openclaw-channel-dmwork/src/channel.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,58 @@
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";

// ─── Token refresh cooldown tests ───────────────────────────────────────────
// These test the time-based cooldown pattern used in channel.ts onError handler
// to prevent token refresh storms.

describe("token refresh cooldown logic", () => {
it("should allow refresh when cooldown has elapsed", () => {
let lastTokenRefreshAt = 0;
const TOKEN_REFRESH_COOLDOWN_MS = 60_000;

const cooldownElapsed = Date.now() - lastTokenRefreshAt > TOKEN_REFRESH_COOLDOWN_MS;
expect(cooldownElapsed).toBe(true);
});

it("should block refresh within cooldown window", () => {
const TOKEN_REFRESH_COOLDOWN_MS = 60_000;
let lastTokenRefreshAt = Date.now(); // just refreshed

const cooldownElapsed = Date.now() - lastTokenRefreshAt > TOKEN_REFRESH_COOLDOWN_MS;
expect(cooldownElapsed).toBe(false);
});

it("should allow refresh after cooldown expires", () => {
const TOKEN_REFRESH_COOLDOWN_MS = 60_000;
// Simulate a refresh that happened 61 seconds ago
let lastTokenRefreshAt = Date.now() - 61_000;

const cooldownElapsed = Date.now() - lastTokenRefreshAt > TOKEN_REFRESH_COOLDOWN_MS;
expect(cooldownElapsed).toBe(true);
});

it("should keep cooldown active even after failed refresh (no reset)", () => {
const TOKEN_REFRESH_COOLDOWN_MS = 60_000;
let lastTokenRefreshAt = 0;

// Simulate a refresh attempt (set timestamp before trying)
lastTokenRefreshAt = Date.now();

// Simulate failure — in the old code, hasRefreshedToken was reset to false
// In the new code, lastTokenRefreshAt stays set (no reset in catch block)
// So subsequent attempts within cooldown should be blocked
const cooldownElapsed = Date.now() - lastTokenRefreshAt > TOKEN_REFRESH_COOLDOWN_MS;
expect(cooldownElapsed).toBe(false);
});

it("should apply stagger delay before reconnect", async () => {
// Verify the stagger delay pattern works
const start = Date.now();
const staggerMs = Math.floor(Math.random() * 5000);
expect(staggerMs).toBeGreaterThanOrEqual(0);
expect(staggerMs).toBeLessThan(5000);
});
});

/**
* Tests for channel.ts singleton timer behavior.
* Verifies that cleanup timer doesn't accumulate during hot reloads.
Expand Down
48 changes: 29 additions & 19 deletions openclaw-channel-dmwork/src/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import { registerBot, sendMessage, sendHeartbeat, sendMediaMessage, inferContent
import { WKSocket } from "./socket.js";
import { handleInboundMessage, type DmworkStatusSink } from "./inbound.js";
import { ChannelType, MessageType, type BotMessage, type MessagePayload } from "./types.js";
import { parseMentions } from "./mention-utils.js";
import { buildEntitiesFromFallback } from "./mention-utils.js";
import type { MentionEntity } from "./types.js";
import { handleDmworkMessageAction, parseTarget } from "./actions.js";
import { createDmworkManagementTools } from "./agent-tools.js";
import { getOrCreateGroupMdCache, registerBotGroupIds, getKnownGroupIds } from "./group-md.js";
Expand Down Expand Up @@ -349,18 +350,20 @@ export const dmworkPlugin: ChannelPlugin<ResolvedDmworkAccount> = {

const { channelId, channelType } = parseTarget(targetForParse, undefined, getKnownGroupIds());

let mentionEntities: MentionEntity[] = [];

if (channelType === ChannelType.Group) {
// Parse @mentions from message content (e.g., "@chenpipi_bot", "@陈皮皮")
const contentMentionNames = parseMentions(content);
for (const name of contentMentionNames) {
if (name && !mentionUids.includes(name)) {
mentionUids.push(name);
console.log(`[dmwork] parsed @mention from content: ${name}`);
// Resolve @name to uid via memberMap (fixes name-as-uid bug)
const accountMemberMap = getOrCreateMemberMap(
ctx.accountId ?? DEFAULT_ACCOUNT_ID,
);
const { entities, uids } = buildEntitiesFromFallback(content, accountMemberMap);
for (const uid of uids) {
if (!mentionUids.includes(uid)) {
mentionUids.push(uid);
}
}
if (mentionUids.length > 0) {
console.log(`[dmwork] sending message with mentionUids: ${mentionUids.join(", ")}`);
}
mentionEntities = entities;
}

await sendMessage({
Expand All @@ -370,6 +373,7 @@ export const dmworkPlugin: ChannelPlugin<ResolvedDmworkAccount> = {
channelType,
content,
...(mentionUids.length > 0 ? { mentionUids } : {}),
...(mentionEntities.length > 0 ? { mentionEntities } : {}),
});

return { channel: "dmwork", to: ctx.to, messageId: "" };
Expand Down Expand Up @@ -661,8 +665,9 @@ export const dmworkPlugin: ChannelPlugin<ResolvedDmworkAccount> = {
// 4d. Group cache timestamps — track when each group's members were last fetched
const groupCacheTimestamps = getOrCreateGroupCacheTimestamps(account.accountId);

// 5. Token refresh state — detect stale cached token
let hasRefreshedToken = false;
// 5. Token refresh state — time-based cooldown to prevent refresh storms
let lastTokenRefreshAt = 0;
const TOKEN_REFRESH_COOLDOWN_MS = 60_000; // 60 seconds
let isRefreshingToken = false; // Guard against concurrent refreshes (#43)

// 5b. Heartbeat failure tracking — reconnect after consecutive failures (#42)
Expand Down Expand Up @@ -738,9 +743,6 @@ export const dmworkPlugin: ChannelPlugin<ResolvedDmworkAccount> = {
log?.info?.(`dmwork: WebSocket connected to ${wsUrl}`);
statusSink({ lastError: null });
startHeartbeat();
// WS connected successfully = WuKongIM accepted the token
// Reset refresh flag so we can refresh again if kicked later (#92)
hasRefreshedToken = false;
},

onDisconnected: () => {
Expand All @@ -752,12 +754,14 @@ export const dmworkPlugin: ChannelPlugin<ResolvedDmworkAccount> = {
log?.error?.(`dmwork: WebSocket error: ${err.message}`);
statusSink({ lastError: err.message });

// If kicked or connect failed, try refreshing the IM token once
// If kicked or connect failed, try refreshing the IM token with a cooldown
// to prevent refresh storms (e.g. 9000+ refreshes across 11 bots).
// Use isRefreshingToken to prevent concurrent refresh attempts (#43)
if (!hasRefreshedToken && !isRefreshingToken && !stopped &&
const cooldownElapsed = Date.now() - lastTokenRefreshAt > TOKEN_REFRESH_COOLDOWN_MS;
if (cooldownElapsed && !isRefreshingToken && !stopped &&
(err.message.includes("Kicked") || err.message.includes("Connect failed"))) {
isRefreshingToken = true;
hasRefreshedToken = true;
lastTokenRefreshAt = Date.now();
log?.warn?.("dmwork: connection rejected — refreshing IM token...");
try {
const fresh = await registerBot({
Expand All @@ -769,10 +773,16 @@ export const dmworkPlugin: ChannelPlugin<ResolvedDmworkAccount> = {
log?.info?.("dmwork: got fresh IM token, reconnecting WS...");
socket.disconnect();
socket.updateCredentials(fresh.robot_id, fresh.im_token);
// Stagger reconnect to avoid thundering herd when multiple bots
// refresh tokens simultaneously after server-wide token expiry
const staggerMs = Math.floor(Math.random() * 5000);
log?.info?.(`dmwork: staggering reconnect by ${staggerMs}ms`);
await new Promise(r => setTimeout(r, staggerMs));
if (stopped) return; // account was stopped during stagger delay
socket.connect();
} catch (refreshErr) {
log?.error?.(`dmwork: token refresh failed: ${String(refreshErr)}`);
hasRefreshedToken = false; // Allow retry on next error (#43)
// Keep cooldown active even on failure to prevent rapid retry hammering
} finally {
isRefreshingToken = false;
}
Expand Down
79 changes: 79 additions & 0 deletions openclaw-channel-dmwork/src/inbound.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ import {
downloadToTemp,
uploadAndSendMedia,
downloadMediaToLocal,
buildMemberListPrefix,
type ResolveFileResult,
} from "./inbound.js";
import { extractMentionUids } from "./mention-utils.js";
import { existsSync, unlinkSync, readFileSync } from "node:fs";

/**
Expand Down Expand Up @@ -786,3 +788,80 @@ describe("downloadMediaToLocal", () => {
tempFiles.push(result!);
});
});

/**
* Tests for Bot @ detection with entities support.
*/
describe("Bot @ 检测(entities 支持)", () => {
it("应从 entities 检测 bot 被 @", () => {
const mention: MentionPayload = {
entities: [{ uid: "bot_uid", offset: 0, length: 4 }],
};
const mentionUids = extractMentionUids(mention);
expect(mentionUids.includes("bot_uid")).toBe(true);
});

it("entities 无效时应从 uids 检测", () => {
const mention: MentionPayload = {
entities: [{} as any],
uids: ["bot_uid"],
};
const mentionUids = extractMentionUids(mention);
expect(mentionUids.includes("bot_uid")).toBe(true);
});
});

describe("buildMemberListPrefix", () => {
it("should return empty string for empty map", () => {
const map = new Map<string, string>();
expect(buildMemberListPrefix(map)).toBe("");
});

it("should inject full member list when ≤ 10 members", () => {
const map = new Map<string, string>([
["uid_alice", "Alice"],
["uid_bob", "Bob"],
["uid_chen", "陈皮皮"],
]);
const result = buildMemberListPrefix(map);
expect(result).toContain("[Group Members]");
expect(result).toContain("Alice (uid_alice)");
expect(result).toContain("Bob (uid_bob)");
expect(result).toContain("陈皮皮 (uid_chen)");
expect(result).toContain("@[uid:displayName]");
});

it("should inject full member list when exactly 10 members", () => {
const map = new Map<string, string>();
for (let i = 1; i <= 10; i++) {
map.set(`uid_${i}`, `User${i}`);
}
const result = buildMemberListPrefix(map);
expect(result).toContain("[Group Members]");
expect(result).toContain("User1 (uid_1)");
expect(result).toContain("User10 (uid_10)");
});

it("should inject hint message when > 10 members", () => {
const map = new Map<string, string>();
for (let i = 1; i <= 11; i++) {
map.set(`uid_${i}`, `User${i}`);
}
const result = buildMemberListPrefix(map);
expect(result).toContain("[Group Info]");
expect(result).toContain("11 members");
expect(result).toContain("group management tool");
expect(result).not.toContain("[Group Members]");
expect(result).not.toContain("User1 (uid_1)");
});

it("should inject hint message for large groups", () => {
const map = new Map<string, string>();
for (let i = 1; i <= 50; i++) {
map.set(`uid_${i}`, `User${i}`);
}
const result = buildMemberListPrefix(map);
expect(result).toContain("[Group Info]");
expect(result).toContain("50 members");
});
});
Loading
Loading