diff --git a/src/gateway/r2.test.ts b/src/gateway/r2.test.ts index 83b03ae43..d1013d7a0 100644 --- a/src/gateway/r2.test.ts +++ b/src/gateway/r2.test.ts @@ -1,5 +1,5 @@ import { describe, it, expect, beforeEach } from 'vitest'; -import { mountR2Storage } from './r2'; +import { mountR2Storage, _resetMountLock } from './r2'; import { createMockEnv, createMockEnvWithR2, @@ -11,6 +11,7 @@ import { describe('mountR2Storage', () => { beforeEach(() => { suppressConsole(); + _resetMountLock(); }); describe('credential validation', () => { @@ -64,8 +65,15 @@ describe('mountR2Storage', () => { }); describe('mounting behavior', () => { - it('mounts R2 bucket when credentials provided and not already mounted', async () => { - const { sandbox, mountBucketMock } = createMockSandbox({ mounted: false }); + it('mounts R2 via s3fs when credentials provided and not already mounted', async () => { + const { sandbox, startProcessMock } = createMockSandbox({ mounted: false }); + // isR2Mounted (not mounted) → passwd setup → s3fs mount → isR2Mounted (mounted) + startProcessMock + .mockResolvedValueOnce(createMockProcess('')) // isR2Mounted check + .mockResolvedValueOnce(createMockProcess('')) // passwd file write + .mockResolvedValueOnce(createMockProcess('')) // s3fs mount + .mockResolvedValueOnce(createMockProcess('s3fs on /data/moltbot type fuse.s3fs\n')); // verify + const env = createMockEnvWithR2({ R2_ACCESS_KEY_ID: 'key123', R2_SECRET_ACCESS_KEY: 'secret', @@ -75,87 +83,154 @@ describe('mountR2Storage', () => { const result = await mountR2Storage(sandbox, env); expect(result).toBe(true); - expect(mountBucketMock).toHaveBeenCalledWith('moltbot-data', '/data/moltbot', { - endpoint: 'https://account123.r2.cloudflarestorage.com', - credentials: { - accessKeyId: 'key123', - secretAccessKey: 'secret', - }, - }); + // Verify passwd file is written with env vars (not embedded in command) + expect(startProcessMock).toHaveBeenCalledWith( + expect.stringContaining('passwd-s3fs'), + expect.objectContaining({ + env: { R2_KEY: 'key123', R2_SECRET: 'secret' }, + }), + ); + // Verify s3fs mount command + expect(startProcessMock).toHaveBeenCalledWith( + expect.stringContaining('s3fs moltbot-data /data/moltbot'), + ); }); it('uses custom bucket name from R2_BUCKET_NAME env var', async () => { - const { sandbox, mountBucketMock } = createMockSandbox({ mounted: false }); - const env = createMockEnvWithR2({ - R2_ACCESS_KEY_ID: 'key123', - R2_SECRET_ACCESS_KEY: 'secret', - CF_ACCOUNT_ID: 'account123', - R2_BUCKET_NAME: 'moltbot-e2e-test123', - }); + const { sandbox, startProcessMock } = createMockSandbox({ mounted: false }); + startProcessMock + .mockResolvedValueOnce(createMockProcess('')) + .mockResolvedValueOnce(createMockProcess('')) + .mockResolvedValueOnce(createMockProcess('')) + .mockResolvedValueOnce(createMockProcess('s3fs on /data/moltbot type fuse.s3fs\n')); + + const env = createMockEnvWithR2({ R2_BUCKET_NAME: 'custom-bucket' }); const result = await mountR2Storage(sandbox, env); expect(result).toBe(true); - expect(mountBucketMock).toHaveBeenCalledWith( - 'moltbot-e2e-test123', - '/data/moltbot', - expect.any(Object), + expect(startProcessMock).toHaveBeenCalledWith( + expect.stringContaining('s3fs custom-bucket /data/moltbot'), ); }); it('returns true immediately when bucket is already mounted', async () => { - const { sandbox, mountBucketMock } = createMockSandbox({ mounted: true }); + const { sandbox, startProcessMock } = createMockSandbox({ mounted: true }); const env = createMockEnvWithR2(); const result = await mountR2Storage(sandbox, env); expect(result).toBe(true); - expect(mountBucketMock).not.toHaveBeenCalled(); + // Only one startProcess call (the isR2Mounted check) — no mount attempted + expect(startProcessMock).toHaveBeenCalledTimes(1); expect(console.log).toHaveBeenCalledWith('R2 bucket already mounted at', '/data/moltbot'); }); - it('logs success message when mounted successfully', async () => { - const { sandbox } = createMockSandbox({ mounted: false }); + it('does not call mountBucket — uses direct s3fs instead', async () => { + const { sandbox, mountBucketMock, startProcessMock } = createMockSandbox({ mounted: false }); + startProcessMock + .mockResolvedValueOnce(createMockProcess('')) + .mockResolvedValueOnce(createMockProcess('')) + .mockResolvedValueOnce(createMockProcess('')) + .mockResolvedValueOnce(createMockProcess('s3fs on /data/moltbot type fuse.s3fs\n')); + const env = createMockEnvWithR2(); await mountR2Storage(sandbox, env); - expect(console.log).toHaveBeenCalledWith( - 'R2 bucket mounted successfully - moltbot data will persist across sessions', - ); + expect(mountBucketMock).not.toHaveBeenCalled(); }); }); describe('error handling', () => { - it('returns false when mountBucket throws and mount check fails', async () => { - const { sandbox, mountBucketMock, startProcessMock } = createMockSandbox({ mounted: false }); - mountBucketMock.mockRejectedValue(new Error('Mount failed')); + it('returns false when s3fs mount fails and post-mount check fails', async () => { + const { sandbox, startProcessMock } = createMockSandbox({ mounted: false }); startProcessMock - .mockResolvedValueOnce(createMockProcess('')) - .mockResolvedValueOnce(createMockProcess('')); + .mockResolvedValueOnce(createMockProcess('')) // isR2Mounted (not mounted) + .mockResolvedValueOnce(createMockProcess('')) // passwd write + .mockResolvedValueOnce(createMockProcess('', { exitCode: 1, stderr: 'mount error' })) // s3fs fails + .mockResolvedValueOnce(createMockProcess('')) // verify (not mounted) + .mockResolvedValueOnce(createMockProcess('')); // final check (not mounted) const env = createMockEnvWithR2(); const result = await mountR2Storage(sandbox, env); expect(result).toBe(false); - expect(console.error).toHaveBeenCalledWith('Failed to mount R2 bucket:', expect.any(Error)); + expect(console.error).toHaveBeenCalledWith( + 'Failed to mount R2 bucket: s3fs mount did not succeed', + ); }); - it('returns true if mount fails but check shows it is actually mounted', async () => { - const { sandbox, mountBucketMock, startProcessMock } = createMockSandbox(); + it('returns true if mount check passes despite errors during setup', async () => { + const { sandbox, startProcessMock } = createMockSandbox(); startProcessMock - .mockResolvedValueOnce(createMockProcess('')) - .mockResolvedValueOnce(createMockProcess('s3fs on /data/moltbot type fuse.s3fs\n')); - - mountBucketMock.mockRejectedValue(new Error('Transient error')); + .mockResolvedValueOnce(createMockProcess('')) // isR2Mounted (not mounted) + .mockRejectedValueOnce(new Error('startProcess failed')) // passwd write throws + .mockResolvedValueOnce(createMockProcess('s3fs on /data/moltbot type fuse.s3fs\n')); // final check const env = createMockEnvWithR2(); const result = await mountR2Storage(sandbox, env); expect(result).toBe(true); - expect(console.log).toHaveBeenCalledWith('R2 bucket is mounted despite error'); + expect(console.log).toHaveBeenCalledWith( + 'R2 bucket is mounted despite errors during setup', + ); + }); + }); + + describe('concurrent mount protection', () => { + it('only runs mount once when invoked concurrently', async () => { + const { sandbox, startProcessMock } = createMockSandbox({ mounted: false }); + // Default mock returns empty (not mounted), override specific calls + startProcessMock + .mockResolvedValueOnce(createMockProcess('')) // isR2Mounted + .mockResolvedValueOnce(createMockProcess('')) // passwd write + .mockResolvedValueOnce(createMockProcess('')) // s3fs mount + .mockResolvedValueOnce(createMockProcess('s3fs on /data/moltbot type fuse.s3fs\n')); // verify + + const env = createMockEnvWithR2(); + + const [result1, result2] = await Promise.all([ + mountR2Storage(sandbox, env), + mountR2Storage(sandbox, env), + ]); + + expect(result1).toBe(true); + expect(result2).toBe(true); + // passwd write + s3fs mount should only run once (plus isR2Mounted checks) + // No duplicate mount attempts + const mountCalls = startProcessMock.mock.calls.filter((call: unknown[]) => + (call[0] as string).startsWith('mkdir -p'), + ); + expect(mountCalls).toHaveLength(1); + }); + + it('resets lock after failure so next attempt can retry', async () => { + const { sandbox, startProcessMock } = createMockSandbox({ mounted: false }); + // First attempt: all checks fail + startProcessMock + .mockResolvedValueOnce(createMockProcess('')) // isR2Mounted + .mockResolvedValueOnce(createMockProcess('')) // passwd write + .mockResolvedValueOnce(createMockProcess('', { exitCode: 1 })) // s3fs fails + .mockResolvedValueOnce(createMockProcess('')) // verify (not mounted) + .mockResolvedValueOnce(createMockProcess('')); // final check (not mounted) + + const env = createMockEnvWithR2(); + + const result1 = await mountR2Storage(sandbox, env); + expect(result1).toBe(false); + + // Second attempt should work (lock was released) + startProcessMock + .mockResolvedValueOnce(createMockProcess('')) // isR2Mounted + .mockResolvedValueOnce(createMockProcess('')) // passwd write + .mockResolvedValueOnce(createMockProcess('')) // s3fs mount + .mockResolvedValueOnce(createMockProcess('s3fs on /data/moltbot type fuse.s3fs\n')); // verify + + const result2 = await mountR2Storage(sandbox, env); + expect(result2).toBe(true); }); }); }); diff --git a/src/gateway/r2.ts b/src/gateway/r2.ts index c95efc40b..b5765ddc0 100644 --- a/src/gateway/r2.ts +++ b/src/gateway/r2.ts @@ -2,21 +2,40 @@ import type { Sandbox } from '@cloudflare/sandbox'; import type { MoltbotEnv } from '../types'; import { R2_MOUNT_PATH, getR2BucketName } from '../config'; +/** + * In-flight mount promise used to deduplicate concurrent mount attempts. + * + * Multiple concurrent requests (e.g. the loading-page waitUntil + the next + * polling request) can both call mountR2Storage before the first one finishes. + * + * By caching the in-flight promise we ensure only one mount attempt runs at + * a time within a Worker isolate. + */ +let inflightMount: Promise | null = null; + +/** Wait for a sandbox process to finish (up to ~2 s by default). */ +async function waitForProcess( + proc: { status: string }, + timeoutMs = 2000, +): Promise { + const interval = 200; + const maxAttempts = Math.ceil(timeoutMs / interval); + let attempts = 0; + while (proc.status === 'running' && attempts < maxAttempts) { + // eslint-disable-next-line no-await-in-loop -- intentional sequential polling + await new Promise((r) => setTimeout(r, interval)); + attempts++; + } +} + /** * Check if R2 is already mounted by looking at the mount table */ async function isR2Mounted(sandbox: Sandbox): Promise { try { const proc = await sandbox.startProcess(`mount | grep "s3fs on ${R2_MOUNT_PATH}"`); - // Wait for the command to complete - let attempts = 0; - while (proc.status === 'running' && attempts < 10) { - // eslint-disable-next-line no-await-in-loop -- intentional sequential polling - await new Promise((r) => setTimeout(r, 200)); - attempts++; - } + await waitForProcess(proc); const logs = await proc.getLogs(); - // If stdout has content, the mount exists const mounted = !!(logs.stdout && logs.stdout.includes('s3fs')); console.log('isR2Mounted check:', mounted, 'stdout:', logs.stdout?.slice(0, 100)); return mounted; @@ -27,7 +46,19 @@ async function isR2Mounted(sandbox: Sandbox): Promise { } /** - * Mount R2 bucket for persistent storage + * Mount R2 bucket for persistent storage. + * + * Uses s3fs directly inside the container instead of sandbox.mountBucket(). + * The mountBucket() API manages the s3fs passwd file from the orchestration + * layer and appends a new credential entry on every call. Because the + * container persists across Worker invocations the entries accumulate and + * s3fs refuses to mount ("multiple entries for the same bucket(default)"). + * + * By writing the passwd file ourselves (overwrite, not append) and calling + * s3fs directly, each mount attempt starts clean — matching the pattern + * recommended in the Cloudflare Containers FUSE-mount documentation. + * + * Concurrent calls are coalesced behind a single in-flight promise. * * @param sandbox - The sandbox instance * @param env - Worker environment bindings @@ -42,37 +73,102 @@ export async function mountR2Storage(sandbox: Sandbox, env: MoltbotEnv): Promise return false; } - // Check if already mounted first - this avoids errors and is faster + // If a mount is already in progress, wait for it instead of starting another + if (inflightMount) { + console.log('R2 mount already in progress, waiting for existing attempt...'); + return inflightMount; + } + + inflightMount = doMount(sandbox, env); + try { + return await inflightMount; + } finally { + inflightMount = null; + } +} + +/** + * Internal mount implementation — always called at most once at a time. + * + * Steps: + * 1. Check if already mounted (fast path) + * 2. Write credentials to /etc/passwd-s3fs (overwrite, never append) + * 3. Run s3fs inside the container to mount the bucket + * 4. Verify the mount succeeded + */ +async function doMount(sandbox: Sandbox, env: MoltbotEnv): Promise { + // Fast path: already mounted from a previous invocation if (await isR2Mounted(sandbox)) { console.log('R2 bucket already mounted at', R2_MOUNT_PATH); return true; } const bucketName = getR2BucketName(env); + const endpoint = `https://${env.CF_ACCOUNT_ID}.r2.cloudflarestorage.com`; + try { - console.log('Mounting R2 bucket', bucketName, 'at', R2_MOUNT_PATH); - await sandbox.mountBucket(bucketName, R2_MOUNT_PATH, { - endpoint: `https://${env.CF_ACCOUNT_ID}.r2.cloudflarestorage.com`, - // Pass credentials explicitly since we use R2_* naming instead of AWS_* - credentials: { - accessKeyId: env.R2_ACCESS_KEY_ID, - secretAccessKey: env.R2_SECRET_ACCESS_KEY, + // Write credentials to the s3fs passwd file inside the container. + // Using '>' (overwrite) instead of '>>' ensures exactly one entry + // regardless of how many times this runs — avoiding the "multiple + // entries for the same bucket" error that plagues mountBucket(). + console.log('Writing s3fs credentials and mounting', bucketName, 'at', R2_MOUNT_PATH); + const setupProc = await sandbox.startProcess( + `printf '%s:%s\\n' "$R2_KEY" "$R2_SECRET" > /etc/passwd-s3fs && chmod 600 /etc/passwd-s3fs`, + { + env: { + R2_KEY: env.R2_ACCESS_KEY_ID!, + R2_SECRET: env.R2_SECRET_ACCESS_KEY!, + }, }, - }); - console.log('R2 bucket mounted successfully - moltbot data will persist across sessions'); - return true; - } catch (err) { - const errorMessage = err instanceof Error ? err.message : String(err); - console.log('R2 mount error:', errorMessage); + ); + await waitForProcess(setupProc); + + const setupLogs = await setupProc.getLogs(); + if (setupLogs.stderr) { + console.log('passwd-s3fs setup stderr:', setupLogs.stderr.slice(0, 200)); + } + + // Mount with s3fs directly inside the container + const mountProc = await sandbox.startProcess( + `mkdir -p ${R2_MOUNT_PATH} && ` + + `s3fs ${bucketName} ${R2_MOUNT_PATH}` + + ` -o passwd_file=/etc/passwd-s3fs` + + ` -o url=${endpoint}` + + ` -o use_path_request_style`, + ); + // s3fs mount can take a few seconds + await waitForProcess(mountProc, 10000); + + const mountLogs = await mountProc.getLogs(); + if (mountLogs.stderr) { + console.log('s3fs mount stderr:', mountLogs.stderr.slice(0, 300)); + } - // Check again if it's mounted - the error might be misleading + // Verify the mount succeeded if (await isR2Mounted(sandbox)) { - console.log('R2 bucket is mounted despite error'); + console.log('R2 bucket mounted successfully - moltbot data will persist across sessions'); return true; } - // Don't fail if mounting fails - moltbot can still run without persistent storage - console.error('Failed to mount R2 bucket:', err); - return false; + console.log('s3fs exited but mount not detected, checking exit code:', mountProc.exitCode); + // Fall through to error path + } catch (err) { + const errorMessage = err instanceof Error ? err.message : String(err); + console.log('R2 mount error:', errorMessage); } + + // Final check — the mount might have succeeded despite errors + if (await isR2Mounted(sandbox)) { + console.log('R2 bucket is mounted despite errors during setup'); + return true; + } + + // Don't fail the gateway — moltbot can still run without persistent storage + console.error('Failed to mount R2 bucket: s3fs mount did not succeed'); + return false; +} + +/** Exposed for testing only — reset the in-flight lock between tests */ +export function _resetMountLock(): void { + inflightMount = null; } diff --git a/wrangler.jsonc b/wrangler.jsonc index 5d64e40e3..beeec93ed 100644 --- a/wrangler.jsonc +++ b/wrangler.jsonc @@ -1,10 +1,10 @@ { "$schema": "node_modules/wrangler/config-schema.json", - "name": "moltbot-sandbox", + "name": "moltbot-sandbox-chris", "main": "src/index.ts", "compatibility_date": "2025-05-06", "compatibility_flags": ["nodejs_compat"], - "observability": { + "observability": { "enabled": true, }, // Static assets for admin UI (built by vite) @@ -59,7 +59,7 @@ "r2_buckets": [ { "binding": "MOLTBOT_BUCKET", - "bucket_name": "moltbot-data", + "bucket_name": "moltdata", }, ], // Cron trigger to sync moltbot data to R2 every 5 minutes