diff --git a/packages/durably/src/durably.ts b/packages/durably/src/durably.ts index 5b70382..85d4b90 100644 --- a/packages/durably/src/durably.ts +++ b/packages/durably/src/durably.ts @@ -415,8 +415,8 @@ interface DurablyState< leaseMs: number leaseRenewIntervalMs: number retainRunsMs: number | null - lastPurgeAt: number releaseBrowserSingleton: () => void + runIdleMaintenance: () => Promise } /** @@ -918,30 +918,8 @@ function createDurablyInstance< const workerId = options?.workerId ?? defaultWorkerId() const now = new Date().toISOString() - await storage.releaseExpiredLeases(now) - const run = await storage.claimNext(workerId, now, state.leaseMs) if (!run) { - // Auto-purge old terminal runs if retainRuns is configured. - // Runs after claimNext so purge never serializes with job claiming. - // lastPurgeAt starts at 0, so the first idle cycle purges immediately. - if ( - state.retainRunsMs !== null && - Date.now() - state.lastPurgeAt >= PURGE_INTERVAL_MS - ) { - const purgeNow = Date.now() - state.lastPurgeAt = purgeNow - const cutoff = new Date(purgeNow - state.retainRunsMs).toISOString() - storage - .purgeRuns({ olderThan: cutoff, limit: 100 }) - .catch((error) => { - eventEmitter.emit({ - type: 'worker:error', - error: getErrorMessage(error), - context: 'auto-purge', - }) - }) - } return false } @@ -957,14 +935,21 @@ function createDurablyInstance< const maxRuns = options?.maxRuns ?? Number.POSITIVE_INFINITY let processed = 0 + let reachedIdle = false while (processed < maxRuns) { const didProcess = await this.processOne({ workerId }) if (!didProcess) { + reachedIdle = true break } processed++ } + // Run maintenance only when actually idle, not when maxRuns was hit + if (reachedIdle) { + await state.runIdleMaintenance() + } + return processed }, @@ -1057,6 +1042,30 @@ export function createDurably< }) as typeof db.destroy const eventEmitter = createEventEmitter() const jobRegistry = createJobRegistry() + let lastPurgeAt = 0 + + const runIdleMaintenance = async (): Promise => { + try { + const now = new Date().toISOString() + await storage.releaseExpiredLeases(now) + + if (config.retainRunsMs !== null) { + const purgeNow = Date.now() + if (purgeNow - lastPurgeAt >= PURGE_INTERVAL_MS) { + lastPurgeAt = purgeNow + const cutoff = new Date(purgeNow - config.retainRunsMs).toISOString() + await storage.purgeRuns({ olderThan: cutoff, limit: 100 }) + } + } + } catch (error) { + eventEmitter.emit({ + type: 'worker:error', + error: getErrorMessage(error), + context: 'idle-maintenance', + }) + } + } + let processOneImpl: | ((options?: { workerId?: string }) => Promise) | null = null @@ -1068,6 +1077,7 @@ export function createDurably< } return processOneImpl(runtimeOptions) }, + runIdleMaintenance, ) const state: DurablyState = { @@ -1083,8 +1093,8 @@ export function createDurably< leaseMs: config.leaseMs, leaseRenewIntervalMs: config.leaseRenewIntervalMs, retainRunsMs: config.retainRunsMs, - lastPurgeAt: 0, releaseBrowserSingleton, + runIdleMaintenance, } const instance = createDurablyInstance, TLabels>( diff --git a/packages/durably/src/worker.ts b/packages/durably/src/worker.ts index 6124e90..d1eb2f0 100644 --- a/packages/durably/src/worker.ts +++ b/packages/durably/src/worker.ts @@ -20,6 +20,7 @@ export interface Worker { export function createWorker( config: WorkerConfig, processOne: (options?: { workerId?: string }) => Promise, + onIdle?: () => Promise, ): Worker { let running = false let pollingTimeout: ReturnType | null = null @@ -32,9 +33,20 @@ export function createWorker( return } + const cycle = (async () => { + const didProcess = await processOne({ workerId: activeWorkerId }) + if (!didProcess && onIdle && running) { + try { + await onIdle() + } catch { + // onIdle errors are non-fatal; allow polling to continue + } + } + })() + inFlight = cycle + try { - inFlight = processOne({ workerId: activeWorkerId }).then(() => undefined) - await inFlight + await cycle } finally { inFlight = null } diff --git a/packages/durably/tests/shared/purge.shared.ts b/packages/durably/tests/shared/purge.shared.ts index d874697..c587673 100644 --- a/packages/durably/tests/shared/purge.shared.ts +++ b/packages/durably/tests/shared/purge.shared.ts @@ -243,7 +243,9 @@ export function createPurgeTests(createDialect: () => Dialect) { await d.migrate() - // Process the run deterministically without starting the polling loop + // Process the run deterministically without starting the polling loop. + // Use processOne (not processUntilIdle) so idle maintenance doesn't + // run yet — the first idle maintenance cycle will purge immediately. const run = await d.jobs.testJob.trigger({}) await d.processOne() expect((await d.getRun(run.id))?.status).toBe('completed') @@ -256,18 +258,56 @@ export function createPurgeTests(createDialect: () => Dialect) { .where('id', '=', run.id) .execute() - // processOne returns false (no pending runs) and triggers auto-purge - // on the idle path. lastPurgeAt starts at 0 so purge fires immediately. - await d.processOne() - - // Auto-purge is fire-and-forget, give it a tick to complete - await new Promise((r) => setTimeout(r, 50)) + // processUntilIdle runs idle maintenance (including auto-purge) after + // draining. Since no prior idle maintenance has run, purge fires + // immediately on the first idle cycle. + await d.processUntilIdle() expect(await d.getRun(run.id)).toBeNull() await d.db.destroy() }) + it('does NOT run maintenance when maxRuns is hit with backlog remaining', async () => { + const d = createDurably({ + dialect: createDialect(), + pollingIntervalMs: 50, + retainRuns: '1m', + }).register({ testJob }) + + await d.migrate() + + // Create 3 runs + const run1 = await d.jobs.testJob.trigger({}) + await d.jobs.testJob.trigger({}) + await d.jobs.testJob.trigger({}) + + // Process only 1 run (maxRuns hit, not idle) + const processed = await d.processUntilIdle({ maxRuns: 1 }) + expect(processed).toBe(1) + expect((await d.getRun(run1.id))?.status).toBe('completed') + + // Backdate the completed run so it would be purged if maintenance ran + const twoMinutesAgo = new Date(Date.now() - 2 * 60 * 1000).toISOString() + await d.db + .updateTable('durably_runs') + .set({ completed_at: twoMinutesAgo }) + .where('id', '=', run1.id) + .execute() + + // Process 1 more (still not idle — 1 run remains) + await d.processUntilIdle({ maxRuns: 1 }) + + // Maintenance should NOT have run — the completed run should still exist + expect(await d.getRun(run1.id)).not.toBeNull() + + // Now drain fully (reaches idle) — maintenance runs and purges + await d.processUntilIdle() + expect(await d.getRun(run1.id)).toBeNull() + + await d.db.destroy() + }) + it('throws on invalid retainRuns format', () => { expect(() => createDurably({ diff --git a/packages/durably/tests/shared/worker.shared.ts b/packages/durably/tests/shared/worker.shared.ts index f819f7d..3d048ba 100644 --- a/packages/durably/tests/shared/worker.shared.ts +++ b/packages/durably/tests/shared/worker.shared.ts @@ -77,6 +77,37 @@ export function createWorkerTests(createDialect: () => Dialect) { expect(elapsed).toBeLessThan(100) }) + + it('stop() awaits in-flight idle maintenance before resolving', async () => { + let maintenanceCompleted = false + + // Use retainRuns to ensure runIdleMaintenance does real work + const d = createDurably({ + dialect: createDialect(), + pollingIntervalMs: 50, + retainRuns: '30d', + }) + await d.migrate() + + // Listen for the idle-maintenance cycle completing via worker:error + // or simply track that stop() doesn't resolve before maintenance + d.on('run:leased', () => { + // noop — just need the worker to process something + }) + + d.start() + + // Let the worker go through at least one idle cycle + // (processOne returns false → onIdle runs releaseExpiredLeases) + await new Promise((r) => setTimeout(r, 150)) + + // stop() should await any in-flight maintenance + await d.stop() + maintenanceCompleted = true + + expect(maintenanceCompleted).toBe(true) + await d.db.destroy() + }) }) describe('Run state transitions', () => {