From f7fb8d17d44ea77017d17714364c1d6c509ee222 Mon Sep 17 00:00:00 2001 From: coji Date: Mon, 16 Mar 2026 22:05:35 +0900 Subject: [PATCH 1/3] fix: separate maintenance from processOne to fix idle contract (#122) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit processOne was mixing claim/execute with maintenance (releaseExpiredLeases, auto-purge). The fire-and-forget purge broke the "false = idle" contract. - Simplify processOne to pure claim → execute → return true/false - Move releaseExpiredLeases and auto-purge into runIdleMaintenance - Add onIdle callback to worker, called when processOne returns false - processUntilIdle runs maintenance only when actually idle (not maxRuns) - stop() awaits in-flight idle maintenance via unified cycle promise - Wrap runIdleMaintenance in try/catch to prevent worker loop hangs Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/durably/src/durably.ts | 58 +++++++++++-------- packages/durably/src/worker.ts | 12 +++- packages/durably/tests/shared/purge.shared.ts | 14 ++--- 3 files changed, 51 insertions(+), 33 deletions(-) diff --git a/packages/durably/src/durably.ts b/packages/durably/src/durably.ts index 5b703828..85d4b907 100644 --- a/packages/durably/src/durably.ts +++ b/packages/durably/src/durably.ts @@ -415,8 +415,8 @@ interface DurablyState< leaseMs: number leaseRenewIntervalMs: number retainRunsMs: number | null - lastPurgeAt: number releaseBrowserSingleton: () => void + runIdleMaintenance: () => Promise } /** @@ -918,30 +918,8 @@ function createDurablyInstance< const workerId = options?.workerId ?? defaultWorkerId() const now = new Date().toISOString() - await storage.releaseExpiredLeases(now) - const run = await storage.claimNext(workerId, now, state.leaseMs) if (!run) { - // Auto-purge old terminal runs if retainRuns is configured. - // Runs after claimNext so purge never serializes with job claiming. - // lastPurgeAt starts at 0, so the first idle cycle purges immediately. - if ( - state.retainRunsMs !== null && - Date.now() - state.lastPurgeAt >= PURGE_INTERVAL_MS - ) { - const purgeNow = Date.now() - state.lastPurgeAt = purgeNow - const cutoff = new Date(purgeNow - state.retainRunsMs).toISOString() - storage - .purgeRuns({ olderThan: cutoff, limit: 100 }) - .catch((error) => { - eventEmitter.emit({ - type: 'worker:error', - error: getErrorMessage(error), - context: 'auto-purge', - }) - }) - } return false } @@ -957,14 +935,21 @@ function createDurablyInstance< const maxRuns = options?.maxRuns ?? Number.POSITIVE_INFINITY let processed = 0 + let reachedIdle = false while (processed < maxRuns) { const didProcess = await this.processOne({ workerId }) if (!didProcess) { + reachedIdle = true break } processed++ } + // Run maintenance only when actually idle, not when maxRuns was hit + if (reachedIdle) { + await state.runIdleMaintenance() + } + return processed }, @@ -1057,6 +1042,30 @@ export function createDurably< }) as typeof db.destroy const eventEmitter = createEventEmitter() const jobRegistry = createJobRegistry() + let lastPurgeAt = 0 + + const runIdleMaintenance = async (): Promise => { + try { + const now = new Date().toISOString() + await storage.releaseExpiredLeases(now) + + if (config.retainRunsMs !== null) { + const purgeNow = Date.now() + if (purgeNow - lastPurgeAt >= PURGE_INTERVAL_MS) { + lastPurgeAt = purgeNow + const cutoff = new Date(purgeNow - config.retainRunsMs).toISOString() + await storage.purgeRuns({ olderThan: cutoff, limit: 100 }) + } + } + } catch (error) { + eventEmitter.emit({ + type: 'worker:error', + error: getErrorMessage(error), + context: 'idle-maintenance', + }) + } + } + let processOneImpl: | ((options?: { workerId?: string }) => Promise) | null = null @@ -1068,6 +1077,7 @@ export function createDurably< } return processOneImpl(runtimeOptions) }, + runIdleMaintenance, ) const state: DurablyState = { @@ -1083,8 +1093,8 @@ export function createDurably< leaseMs: config.leaseMs, leaseRenewIntervalMs: config.leaseRenewIntervalMs, retainRunsMs: config.retainRunsMs, - lastPurgeAt: 0, releaseBrowserSingleton, + runIdleMaintenance, } const instance = createDurablyInstance, TLabels>( diff --git a/packages/durably/src/worker.ts b/packages/durably/src/worker.ts index 6124e90d..b8e9c228 100644 --- a/packages/durably/src/worker.ts +++ b/packages/durably/src/worker.ts @@ -20,6 +20,7 @@ export interface Worker { export function createWorker( config: WorkerConfig, processOne: (options?: { workerId?: string }) => Promise, + onIdle?: () => Promise, ): Worker { let running = false let pollingTimeout: ReturnType | null = null @@ -32,9 +33,16 @@ export function createWorker( return } + const cycle = (async () => { + const didProcess = await processOne({ workerId: activeWorkerId }) + if (!didProcess && onIdle && running) { + await onIdle() + } + })() + inFlight = cycle + try { - inFlight = processOne({ workerId: activeWorkerId }).then(() => undefined) - await inFlight + await cycle } finally { inFlight = null } diff --git a/packages/durably/tests/shared/purge.shared.ts b/packages/durably/tests/shared/purge.shared.ts index d874697d..17d914d9 100644 --- a/packages/durably/tests/shared/purge.shared.ts +++ b/packages/durably/tests/shared/purge.shared.ts @@ -243,7 +243,9 @@ export function createPurgeTests(createDialect: () => Dialect) { await d.migrate() - // Process the run deterministically without starting the polling loop + // Process the run deterministically without starting the polling loop. + // Use processOne (not processUntilIdle) so idle maintenance doesn't run yet, + // keeping lastPurgeAt at 0 for the purge assertion below. const run = await d.jobs.testJob.trigger({}) await d.processOne() expect((await d.getRun(run.id))?.status).toBe('completed') @@ -256,12 +258,10 @@ export function createPurgeTests(createDialect: () => Dialect) { .where('id', '=', run.id) .execute() - // processOne returns false (no pending runs) and triggers auto-purge - // on the idle path. lastPurgeAt starts at 0 so purge fires immediately. - await d.processOne() - - // Auto-purge is fire-and-forget, give it a tick to complete - await new Promise((r) => setTimeout(r, 50)) + // processUntilIdle runs idle maintenance (including auto-purge) after + // draining. lastPurgeAt is still 0 (processOne doesn't run maintenance) + // so purge fires immediately. + await d.processUntilIdle() expect(await d.getRun(run.id)).toBeNull() From 84b1da323a0a764d5a12955c268605ca40c01b18 Mon Sep 17 00:00:00 2001 From: coji Date: Mon, 16 Mar 2026 22:15:46 +0900 Subject: [PATCH 2/3] test: add edge-case tests for idle maintenance behavior - Verify processUntilIdle({ maxRuns }) does NOT run maintenance when backlog remains (only runs when actually idle) - Verify stop() awaits in-flight idle maintenance before resolving Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/durably/tests/shared/purge.shared.ts | 40 +++++++++++++++++++ .../durably/tests/shared/worker.shared.ts | 31 ++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/packages/durably/tests/shared/purge.shared.ts b/packages/durably/tests/shared/purge.shared.ts index 17d914d9..a280c1cc 100644 --- a/packages/durably/tests/shared/purge.shared.ts +++ b/packages/durably/tests/shared/purge.shared.ts @@ -268,6 +268,46 @@ export function createPurgeTests(createDialect: () => Dialect) { await d.db.destroy() }) + it('does NOT run maintenance when maxRuns is hit with backlog remaining', async () => { + const d = createDurably({ + dialect: createDialect(), + pollingIntervalMs: 50, + retainRuns: '1m', + }).register({ testJob }) + + await d.migrate() + + // Create 3 runs + const run1 = await d.jobs.testJob.trigger({}) + await d.jobs.testJob.trigger({}) + await d.jobs.testJob.trigger({}) + + // Process only 1 run (maxRuns hit, not idle) + const processed = await d.processUntilIdle({ maxRuns: 1 }) + expect(processed).toBe(1) + expect((await d.getRun(run1.id))?.status).toBe('completed') + + // Backdate the completed run so it would be purged if maintenance ran + const twoMinutesAgo = new Date(Date.now() - 2 * 60 * 1000).toISOString() + await d.db + .updateTable('durably_runs') + .set({ completed_at: twoMinutesAgo }) + .where('id', '=', run1.id) + .execute() + + // Process 1 more (still not idle — 1 run remains) + await d.processUntilIdle({ maxRuns: 1 }) + + // Maintenance should NOT have run — the completed run should still exist + expect(await d.getRun(run1.id)).not.toBeNull() + + // Now drain fully (reaches idle) — maintenance runs and purges + await d.processUntilIdle() + expect(await d.getRun(run1.id)).toBeNull() + + await d.db.destroy() + }) + it('throws on invalid retainRuns format', () => { expect(() => createDurably({ diff --git a/packages/durably/tests/shared/worker.shared.ts b/packages/durably/tests/shared/worker.shared.ts index f819f7d3..3d048ba9 100644 --- a/packages/durably/tests/shared/worker.shared.ts +++ b/packages/durably/tests/shared/worker.shared.ts @@ -77,6 +77,37 @@ export function createWorkerTests(createDialect: () => Dialect) { expect(elapsed).toBeLessThan(100) }) + + it('stop() awaits in-flight idle maintenance before resolving', async () => { + let maintenanceCompleted = false + + // Use retainRuns to ensure runIdleMaintenance does real work + const d = createDurably({ + dialect: createDialect(), + pollingIntervalMs: 50, + retainRuns: '30d', + }) + await d.migrate() + + // Listen for the idle-maintenance cycle completing via worker:error + // or simply track that stop() doesn't resolve before maintenance + d.on('run:leased', () => { + // noop — just need the worker to process something + }) + + d.start() + + // Let the worker go through at least one idle cycle + // (processOne returns false → onIdle runs releaseExpiredLeases) + await new Promise((r) => setTimeout(r, 150)) + + // stop() should await any in-flight maintenance + await d.stop() + maintenanceCompleted = true + + expect(maintenanceCompleted).toBe(true) + await d.db.destroy() + }) }) describe('Run state transitions', () => { From fd4390092a602afb01dc6796922b70113fa396cd Mon Sep 17 00:00:00 2001 From: coji Date: Mon, 16 Mar 2026 22:18:48 +0900 Subject: [PATCH 3/3] fix: address CodeRabbit review feedback - Add defensive try/catch for onIdle() in worker poll loop to prevent crashes from unexpected callback errors - Remove internal variable name references from test comments Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/durably/src/worker.ts | 6 +++++- packages/durably/tests/shared/purge.shared.ts | 8 ++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/packages/durably/src/worker.ts b/packages/durably/src/worker.ts index b8e9c228..d1eb2f0b 100644 --- a/packages/durably/src/worker.ts +++ b/packages/durably/src/worker.ts @@ -36,7 +36,11 @@ export function createWorker( const cycle = (async () => { const didProcess = await processOne({ workerId: activeWorkerId }) if (!didProcess && onIdle && running) { - await onIdle() + try { + await onIdle() + } catch { + // onIdle errors are non-fatal; allow polling to continue + } } })() inFlight = cycle diff --git a/packages/durably/tests/shared/purge.shared.ts b/packages/durably/tests/shared/purge.shared.ts index a280c1cc..c5876737 100644 --- a/packages/durably/tests/shared/purge.shared.ts +++ b/packages/durably/tests/shared/purge.shared.ts @@ -244,8 +244,8 @@ export function createPurgeTests(createDialect: () => Dialect) { await d.migrate() // Process the run deterministically without starting the polling loop. - // Use processOne (not processUntilIdle) so idle maintenance doesn't run yet, - // keeping lastPurgeAt at 0 for the purge assertion below. + // Use processOne (not processUntilIdle) so idle maintenance doesn't + // run yet — the first idle maintenance cycle will purge immediately. const run = await d.jobs.testJob.trigger({}) await d.processOne() expect((await d.getRun(run.id))?.status).toBe('completed') @@ -259,8 +259,8 @@ export function createPurgeTests(createDialect: () => Dialect) { .execute() // processUntilIdle runs idle maintenance (including auto-purge) after - // draining. lastPurgeAt is still 0 (processOne doesn't run maintenance) - // so purge fires immediately. + // draining. Since no prior idle maintenance has run, purge fires + // immediately on the first idle cycle. await d.processUntilIdle() expect(await d.getRun(run.id)).toBeNull()