Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 34 additions & 24 deletions packages/durably/src/durably.ts
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,8 @@ interface DurablyState<
leaseMs: number
leaseRenewIntervalMs: number
retainRunsMs: number | null
lastPurgeAt: number
releaseBrowserSingleton: () => void
runIdleMaintenance: () => Promise<void>
}

/**
Expand Down Expand Up @@ -918,30 +918,8 @@ function createDurablyInstance<
const workerId = options?.workerId ?? defaultWorkerId()
const now = new Date().toISOString()

await storage.releaseExpiredLeases(now)

const run = await storage.claimNext(workerId, now, state.leaseMs)
if (!run) {
// Auto-purge old terminal runs if retainRuns is configured.
// Runs after claimNext so purge never serializes with job claiming.
// lastPurgeAt starts at 0, so the first idle cycle purges immediately.
if (
state.retainRunsMs !== null &&
Date.now() - state.lastPurgeAt >= PURGE_INTERVAL_MS
) {
const purgeNow = Date.now()
state.lastPurgeAt = purgeNow
const cutoff = new Date(purgeNow - state.retainRunsMs).toISOString()
storage
.purgeRuns({ olderThan: cutoff, limit: 100 })
.catch((error) => {
eventEmitter.emit({
type: 'worker:error',
error: getErrorMessage(error),
context: 'auto-purge',
})
})
}
return false
}

Expand All @@ -957,14 +935,21 @@ function createDurablyInstance<
const maxRuns = options?.maxRuns ?? Number.POSITIVE_INFINITY
let processed = 0

let reachedIdle = false
while (processed < maxRuns) {
const didProcess = await this.processOne({ workerId })
if (!didProcess) {
reachedIdle = true
break
}
processed++
}

// Run maintenance only when actually idle, not when maxRuns was hit
if (reachedIdle) {
await state.runIdleMaintenance()
}

return processed
},

Expand Down Expand Up @@ -1057,6 +1042,30 @@ export function createDurably<
}) as typeof db.destroy
const eventEmitter = createEventEmitter()
const jobRegistry = createJobRegistry()
let lastPurgeAt = 0

const runIdleMaintenance = async (): Promise<void> => {
try {
const now = new Date().toISOString()
await storage.releaseExpiredLeases(now)

if (config.retainRunsMs !== null) {
const purgeNow = Date.now()
if (purgeNow - lastPurgeAt >= PURGE_INTERVAL_MS) {
lastPurgeAt = purgeNow
const cutoff = new Date(purgeNow - config.retainRunsMs).toISOString()
await storage.purgeRuns({ olderThan: cutoff, limit: 100 })
}
}
} catch (error) {
eventEmitter.emit({
type: 'worker:error',
error: getErrorMessage(error),
context: 'idle-maintenance',
})
}
}

let processOneImpl:
| ((options?: { workerId?: string }) => Promise<boolean>)
| null = null
Expand All @@ -1068,6 +1077,7 @@ export function createDurably<
}
return processOneImpl(runtimeOptions)
},
runIdleMaintenance,
)

const state: DurablyState<TLabels> = {
Expand All @@ -1083,8 +1093,8 @@ export function createDurably<
leaseMs: config.leaseMs,
leaseRenewIntervalMs: config.leaseRenewIntervalMs,
retainRunsMs: config.retainRunsMs,
lastPurgeAt: 0,
releaseBrowserSingleton,
runIdleMaintenance,
}

const instance = createDurablyInstance<Record<string, never>, TLabels>(
Expand Down
16 changes: 14 additions & 2 deletions packages/durably/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export interface Worker {
export function createWorker(
config: WorkerConfig,
processOne: (options?: { workerId?: string }) => Promise<boolean>,
onIdle?: () => Promise<void>,
): Worker {
let running = false
let pollingTimeout: ReturnType<typeof setTimeout> | null = null
Expand All @@ -32,9 +33,20 @@ export function createWorker(
return
}

const cycle = (async () => {
const didProcess = await processOne({ workerId: activeWorkerId })
if (!didProcess && onIdle && running) {
try {
await onIdle()
} catch {
// onIdle errors are non-fatal; allow polling to continue
}
}
})()
inFlight = cycle

try {
inFlight = processOne({ workerId: activeWorkerId }).then(() => undefined)
await inFlight
await cycle
} finally {
inFlight = null
}
Expand Down
54 changes: 47 additions & 7 deletions packages/durably/tests/shared/purge.shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,9 @@ export function createPurgeTests(createDialect: () => Dialect) {

await d.migrate()

// Process the run deterministically without starting the polling loop
// Process the run deterministically without starting the polling loop.
// Use processOne (not processUntilIdle) so idle maintenance doesn't
// run yet — the first idle maintenance cycle will purge immediately.
const run = await d.jobs.testJob.trigger({})
await d.processOne()
expect((await d.getRun(run.id))?.status).toBe('completed')
Expand All @@ -256,18 +258,56 @@ export function createPurgeTests(createDialect: () => Dialect) {
.where('id', '=', run.id)
.execute()

// processOne returns false (no pending runs) and triggers auto-purge
// on the idle path. lastPurgeAt starts at 0 so purge fires immediately.
await d.processOne()

// Auto-purge is fire-and-forget, give it a tick to complete
await new Promise((r) => setTimeout(r, 50))
// processUntilIdle runs idle maintenance (including auto-purge) after
// draining. Since no prior idle maintenance has run, purge fires
// immediately on the first idle cycle.
await d.processUntilIdle()

expect(await d.getRun(run.id)).toBeNull()

await d.db.destroy()
})

it('does NOT run maintenance when maxRuns is hit with backlog remaining', async () => {
const d = createDurably({
dialect: createDialect(),
pollingIntervalMs: 50,
retainRuns: '1m',
}).register({ testJob })

await d.migrate()

// Create 3 runs
const run1 = await d.jobs.testJob.trigger({})
await d.jobs.testJob.trigger({})
await d.jobs.testJob.trigger({})

// Process only 1 run (maxRuns hit, not idle)
const processed = await d.processUntilIdle({ maxRuns: 1 })
expect(processed).toBe(1)
expect((await d.getRun(run1.id))?.status).toBe('completed')

// Backdate the completed run so it would be purged if maintenance ran
const twoMinutesAgo = new Date(Date.now() - 2 * 60 * 1000).toISOString()
await d.db
.updateTable('durably_runs')
.set({ completed_at: twoMinutesAgo })
.where('id', '=', run1.id)
.execute()

// Process 1 more (still not idle — 1 run remains)
await d.processUntilIdle({ maxRuns: 1 })

// Maintenance should NOT have run — the completed run should still exist
expect(await d.getRun(run1.id)).not.toBeNull()

// Now drain fully (reaches idle) — maintenance runs and purges
await d.processUntilIdle()
expect(await d.getRun(run1.id)).toBeNull()

await d.db.destroy()
})

it('throws on invalid retainRuns format', () => {
expect(() =>
createDurably({
Expand Down
31 changes: 31 additions & 0 deletions packages/durably/tests/shared/worker.shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,37 @@ export function createWorkerTests(createDialect: () => Dialect) {

expect(elapsed).toBeLessThan(100)
})

it('stop() awaits in-flight idle maintenance before resolving', async () => {
let maintenanceCompleted = false

// Use retainRuns to ensure runIdleMaintenance does real work
const d = createDurably({
dialect: createDialect(),
pollingIntervalMs: 50,
retainRuns: '30d',
})
await d.migrate()

// Listen for the idle-maintenance cycle completing via worker:error
// or simply track that stop() doesn't resolve before maintenance
d.on('run:leased', () => {
// noop — just need the worker to process something
})

d.start()

// Let the worker go through at least one idle cycle
// (processOne returns false → onIdle runs releaseExpiredLeases)
await new Promise((r) => setTimeout(r, 150))

// stop() should await any in-flight maintenance
await d.stop()
maintenanceCompleted = true

expect(maintenanceCompleted).toBe(true)
await d.db.destroy()
})
})

describe('Run state transitions', () => {
Expand Down