Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 12 additions & 45 deletions openclaw-channel-dmwork/src/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,62 +19,29 @@ import { parseMentions } from "./mention-utils.js";
import { handleDmworkMessageAction, parseTarget } from "./actions.js";
import { createDmworkManagementTools } from "./agent-tools.js";
import { getOrCreateGroupMdCache, registerBotGroupIds, getKnownGroupIds } from "./group-md.js";
import { UPLOAD_DIR, cleanupTempDir, streamDownloadToFile } from "./temp-utils.js";
import path from "node:path";
import os from "node:os";
import { mkdir, readFile, writeFile, unlink } from "node:fs/promises";
import { createReadStream, createWriteStream, statSync } from "node:fs";
import { createReadStream, statSync } from "node:fs";
import { randomUUID } from "node:crypto";
import { pipeline } from "node:stream/promises";
import { Readable } from "node:stream";
// HistoryEntry type - compatible with any version
type HistoryEntry = { sender: string; body: string; timestamp: number };
const DEFAULT_GROUP_HISTORY_LIMIT = 20;

const MAX_UPLOAD_SIZE = 500 * 1024 * 1024; // 500 MB
const UPLOAD_TEMP_DIR = path.join("/tmp", "dmwork-upload");

/** Download a URL to a temp file with backpressure, return the temp path. */
async function downloadToTempFile(url: string, filename: string, signal?: AbortSignal): Promise<{ tempPath: string; contentType: string | undefined }> {
await mkdir(UPLOAD_TEMP_DIR, { recursive: true });
const tempPath = path.join(UPLOAD_TEMP_DIR, `${randomUUID()}-${filename}`);

// HEAD to check size first
const head = await fetch(url, { method: "HEAD", signal: signal ?? AbortSignal.timeout(30_000) });
const contentLength = Number(head.headers.get("content-length") || 0);
if (contentLength > MAX_UPLOAD_SIZE) {
throw new Error(`File too large (${contentLength} bytes, max ${MAX_UPLOAD_SIZE})`);
}

const resp = await fetch(url, { signal: signal ?? AbortSignal.timeout(300_000) });
if (!resp.ok) throw new Error(`Failed to download media from ${url}: ${resp.status}`);
const contentType = resp.headers.get("content-type") ?? undefined;

const body = resp.body;
if (!body) throw new Error(`No response body from ${url}`);
const nodeStream = Readable.fromWeb(body as any);
const ws = createWriteStream(tempPath);
try {
await pipeline(nodeStream, ws);
} catch (err) {
// Cleanup partial temp file on download failure
await unlink(tempPath).catch(() => {});
throw err;
}
return { tempPath, contentType };
}

/** Cleanup old temp upload files (>1h). Called opportunistically. */
async function cleanupOldUploadTempFiles(): Promise<void> {
try {
const { readdir, stat, unlink: rm } = await import("node:fs/promises");
const files = await readdir(UPLOAD_TEMP_DIR);
const now = Date.now();
for (const f of files) {
const fp = path.join(UPLOAD_TEMP_DIR, f);
const st = await stat(fp).catch(() => null);
if (st && now - st.mtimeMs > 3600_000) await rm(fp).catch(() => {});
}
} catch { /* dir may not exist */ }
const result = await streamDownloadToFile({
url,
destDir: UPLOAD_DIR,
filename,
maxSize: MAX_UPLOAD_SIZE,
timeoutMs: 300_000,
headCheck: true,
});
return { tempPath: result.localPath, contentType: result.contentType };
}

// Module-level history storage — survives auto-restarts
Expand Down Expand Up @@ -397,7 +364,7 @@ export const dmworkPlugin: ChannelPlugin<ResolvedDmworkAccount> = {
let localFilePath: string | undefined; // path for parseImageDimensionsFromFile

// Opportunistic cleanup of stale temp files
cleanupOldUploadTempFiles().catch(() => {});
cleanupTempDir(UPLOAD_DIR).catch(() => {});

if (mediaUrl.startsWith("data:")) {
// Parse data URI: data:[<mediatype>][;base64],<data>
Expand Down
2 changes: 1 addition & 1 deletion openclaw-channel-dmwork/src/inbound.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ describe("downloadMediaToLocal", () => {

expect(result).toBeDefined();
expect(result).not.toContain("http");
expect(result!.startsWith("/tmp/dmwork-media/")).toBe(true);
expect(result!.startsWith("/tmp/dmwork-temp/media/")).toBe(true);
expect(result!.endsWith(".jpeg")).toBe(true);
expect(existsSync(result!)).toBe(true);
expect(readFileSync(result!)).toEqual(Buffer.from(imageData));
Expand Down
182 changes: 38 additions & 144 deletions openclaw-channel-dmwork/src/inbound.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { getDmworkRuntime } from "./runtime.js";
import { DEFAULT_HISTORY_PROMPT_TEMPLATE } from "./config-schema.js";
import { extractMentionMatches } from "./mention-utils.js";
import { registerGroupAccount, ensureGroupMd, handleGroupMdEvent, broadcastGroupMdUpdate } from "./group-md.js";
import { UPLOAD_DIR, MEDIA_DIR, FILES_DIR, cleanupTempDir, streamDownloadToFile } from "./temp-utils.js";
import { createWriteStream } from "node:fs";
import { mkdir, unlink, readdir, stat } from "node:fs/promises";
import { join, basename } from "node:path";
Expand Down Expand Up @@ -89,7 +90,6 @@ export async function uploadAndSendMedia(params: {
const { Readable } = await import("node:stream");

const MAX_UPLOAD = 500 * 1024 * 1024;
const TEMP_DIR = pathJoin("/tmp", "dmwork-upload");

let fileBody: Buffer | NodeJS.ReadableStream;
let fileSize: number;
Expand All @@ -99,32 +99,17 @@ export async function uploadAndSendMedia(params: {

if (mediaUrl.startsWith("http://") || mediaUrl.startsWith("https://")) {
filename = extractFilename(mediaUrl);
// Stream download to temp file
await fsMkdir(TEMP_DIR, { recursive: true });
tempPath = pathJoin(TEMP_DIR, `${randomUUID()}-${filename}`);

const head = await fetch(mediaUrl, { method: "HEAD" });
const cl = Number(head.headers.get("content-length") || 0);
if (cl > MAX_UPLOAD) throw new Error(`File too large (${cl} bytes, max ${MAX_UPLOAD})`);

const resp = await fetch(mediaUrl, {
signal: AbortSignal.timeout(300_000),
// Stream download to temp file via shared utility
const dl = await streamDownloadToFile({
url: mediaUrl,
destDir: UPLOAD_DIR,
filename,
maxSize: MAX_UPLOAD,
timeoutMs: 300_000,
headCheck: true,
});
if (!resp.ok) throw new Error(`Failed to fetch media: ${resp.status}`);
contentType = resp.headers.get("content-type") || "application/octet-stream";

const body = resp.body;
if (!body) throw new Error(`No response body from ${mediaUrl}`);
const nodeStream = Readable.fromWeb(body as any);
const ws = fsCreateWriteStream(tempPath);
try {
await pipeline(nodeStream, ws);
} catch (err) {
// Cleanup partial temp file on download failure
await fsUnlink(tempPath).catch(() => {});
tempPath = undefined;
throw err;
}
tempPath = dl.localPath;
contentType = dl.contentType || "application/octet-stream";

const st = fsStatSync(tempPath);
fileBody = fsCreateReadStream(tempPath);
Expand Down Expand Up @@ -402,27 +387,9 @@ export function calcDownloadTimeout(fileSize?: number): number {
return Math.min(MAX_TIMEOUT, Math.max(MIN_TIMEOUT, computed));
}

const MEDIA_TEMP_DIR = join("/tmp", "dmwork-media");
const MAX_MEDIA_DOWNLOAD_SIZE = 20 * 1024 * 1024; // 20MB cap for inbound media
const MEDIA_DOWNLOAD_TIMEOUT = 120_000; // 120 seconds

/** Best-effort cleanup of inbound media temp files older than 1 hour */
async function cleanupMediaTempFiles(): Promise<void> {
try {
const entries = await readdir(MEDIA_TEMP_DIR);
const cutoff = Date.now() - 60 * 60 * 1000;
for (const entry of entries) {
try {
const filePath = join(MEDIA_TEMP_DIR, entry);
const info = await stat(filePath);
if (info.mtimeMs < cutoff) {
await unlink(filePath);
}
} catch {}
}
} catch {}
}

/**
* Download inbound media (Image/GIF/Voice/Video) to a local temp file.
*
Expand All @@ -436,8 +403,7 @@ export async function downloadMediaToLocal(
log?: ChannelLogSink,
): Promise<string | undefined> {
try {
await mkdir(MEDIA_TEMP_DIR, { recursive: true });
cleanupMediaTempFiles().catch(() => {});
cleanupTempDir(MEDIA_DIR).catch(() => {});

// Derive a file extension from mime or URL
let ext = "";
Expand All @@ -453,77 +419,33 @@ export async function downloadMediaToLocal(
// Sanitize extension
ext = ext.replace(/[^a-zA-Z0-9.]/g, "").substring(0, 10);

const localPath = join(MEDIA_TEMP_DIR, `${randomUUID()}${ext}`);
const filename = `${randomUUID()}${ext}`;

const resp = await fetch(url, {
signal: AbortSignal.timeout(MEDIA_DOWNLOAD_TIMEOUT),
const result = await streamDownloadToFile({
url,
destDir: MEDIA_DIR,
filename,
maxSize: MAX_MEDIA_DOWNLOAD_SIZE,
timeoutMs: MEDIA_DOWNLOAD_TIMEOUT,
});
if (!resp.ok) {
log?.warn?.(`dmwork: media download failed HTTP ${resp.status} for ${url}`);
return undefined;
}
if (!resp.body) {
log?.warn?.(`dmwork: media download returned no body for ${url}`);
return undefined;
}

const ws = createWriteStream(localPath);
let totalBytes = 0;
try {
const reader = (resp.body as any).getReader() as ReadableStreamDefaultReader<Uint8Array>;
for (;;) {
const { done, value } = await reader.read();
if (done) break;
totalBytes += value.byteLength;
if (totalBytes > MAX_MEDIA_DOWNLOAD_SIZE) {
reader.cancel();
ws.destroy();
try { await unlink(localPath); } catch {}
log?.warn?.(`dmwork: media too large (>${formatSize(MAX_MEDIA_DOWNLOAD_SIZE)}), skipping: ${url}`);
return undefined;
}
if (!ws.write(value)) {
await new Promise<void>(r => ws.once("drain", r));
}
}
ws.end();
await new Promise<void>((resolve, reject) => {
ws.on("finish", resolve);
ws.on("error", reject);
});
} catch (err) {
ws.destroy();
try { await unlink(localPath); } catch {}
throw err;
}
log?.info?.(`dmwork: media downloaded to local: ${localPath} (${formatSize(totalBytes)})`);
return localPath;
log?.info?.(`dmwork: media downloaded to local: ${result.localPath} (${formatSize(result.totalBytes)})`);
return result.localPath;
} catch (err) {
log?.warn?.(`dmwork: media download failed for ${url}: ${err}`);
const msg = err instanceof Error ? err.message : String(err);
if (msg.includes("HTTP ")) {
log?.warn?.(`dmwork: media download failed ${msg} for ${url}`);
} else if (msg.includes("File too large")) {
log?.warn?.(`dmwork: media too large (>${formatSize(MAX_MEDIA_DOWNLOAD_SIZE)}), skipping: ${url}`);
} else {
log?.warn?.(`dmwork: media download failed for ${url}: ${err}`);
}
return undefined;
}
}

const TEMP_DIR = join("/tmp", "dmwork-files");
const MAX_DOWNLOAD_SIZE = 500 * 1024 * 1024; // 500MB hard cap

/** Best-effort cleanup of temp files older than 1 hour */
async function cleanupTempFiles(): Promise<void> {
try {
const entries = await readdir(TEMP_DIR);
const cutoff = Date.now() - 60 * 60 * 1000;
for (const entry of entries) {
try {
const filePath = join(TEMP_DIR, entry);
const info = await stat(filePath);
if (info.mtimeMs < cutoff) {
await unlink(filePath);
}
} catch {}
}
} catch {}
}

/** Download a file to a temp path, streaming to disk with size limit.
* Returns the local path on success. */
export async function downloadToTemp(
Expand All @@ -532,49 +454,21 @@ export async function downloadToTemp(
filename: string,
opts?: { knownSize?: number; log?: ChannelLogSink },
): Promise<string> {
await mkdir(TEMP_DIR, { recursive: true });
// Non-blocking cleanup of old temp files
cleanupTempFiles().catch(() => {});
cleanupTempDir(FILES_DIR).catch(() => {});

const safeName = basename(filename).replace(/[^a-zA-Z0-9._-]/g, '_') || 'file';
const localPath = join(TEMP_DIR, `${randomUUID()}-${safeName}`);
const timeout = calcDownloadTimeout(opts?.knownSize);
const resp = await fetch(url, {
const result = await streamDownloadToFile({
url,
destDir: FILES_DIR,
filename: safeName,
maxSize: MAX_DOWNLOAD_SIZE,
timeoutMs: timeout,
headers: { Authorization: `Bearer ${botToken}` },
signal: AbortSignal.timeout(timeout),
});
if (!resp.ok) throw new Error(`HTTP ${resp.status}`);
if (!resp.body) throw new Error("no response body");

const ws = createWriteStream(localPath);
let totalBytes = 0;
try {
const reader = (resp.body as any).getReader() as ReadableStreamDefaultReader<Uint8Array>;
for (;;) {
const { done, value } = await reader.read();
if (done) break;
totalBytes += value.byteLength;
if (totalBytes > MAX_DOWNLOAD_SIZE) {
reader.cancel();
throw new Error(`file exceeds max download size (${formatSize(MAX_DOWNLOAD_SIZE)})`);
}
if (!ws.write(value)) {
await new Promise<void>(r => ws.once('drain', r));
}
}
ws.end();
await new Promise<void>((resolve, reject) => {
ws.on("finish", resolve);
ws.on("error", reject);
});
} catch (err) {
ws.destroy();
// Best-effort cleanup
try { await unlink(localPath); } catch {}
throw err;
}
opts?.log?.info?.(`dmwork: file downloaded to temp: ${localPath}`);
return localPath;
opts?.log?.info?.(`dmwork: file downloaded to temp: ${result.localPath}`);
return result.localPath;
}

/**
Expand Down
Loading
Loading