diff --git a/packages/durably/src/errors.ts b/packages/durably/src/errors.ts index 9a712be..45edb81 100644 --- a/packages/durably/src/errors.ts +++ b/packages/durably/src/errors.ts @@ -63,3 +63,10 @@ export class ConflictError extends DurablyError { export function getErrorMessage(error: unknown): string { return error instanceof Error ? error.message : String(error) } + +/** + * Coerce unknown value to Error + */ +export function toError(error: unknown): Error { + return error instanceof Error ? error : new Error(String(error)) +} diff --git a/packages/durably/src/events.ts b/packages/durably/src/events.ts index 9250fca..5ae3554 100644 --- a/packages/durably/src/events.ts +++ b/packages/durably/src/events.ts @@ -1,3 +1,5 @@ +import { toError } from './errors' + /** * Base event interface */ @@ -318,16 +320,21 @@ export function createEventEmitter(): EventEmitter { return } + const reportError = (error: unknown) => + errorHandler?.(toError(error), fullEvent) + for (const listener of typeListeners) { try { - listener(fullEvent) - } catch (error) { - if (errorHandler) { - errorHandler( - error instanceof Error ? error : new Error(String(error)), - fullEvent, - ) + const result: unknown = listener(fullEvent) + // Catch rejected promises from async listeners (not awaited) + if ( + result != null && + typeof (result as Promise).then === 'function' + ) { + ;(result as Promise).catch(reportError) } + } catch (error) { + reportError(error) // Continue to next listener regardless of error } } diff --git a/packages/durably/src/job.ts b/packages/durably/src/job.ts index 437698e..39598e8 100644 --- a/packages/durably/src/job.ts +++ b/packages/durably/src/job.ts @@ -1,6 +1,6 @@ import { type z, prettifyError } from 'zod' import type { JobDefinition } from './define-job' -import { ValidationError } from './errors' +import { toError, ValidationError } from './errors' import type { EventEmitter, LogData, ProgressData } from './events' import type { Run, RunFilter, Store } from './storage' @@ -388,7 +388,7 @@ export function createJobHandle< .catch((error) => { if (resolved) return cleanup() - reject(error instanceof Error ? error : new Error(String(error))) + reject(toError(error)) }) // Set timeout if specified diff --git a/packages/durably/src/plugins/log-persistence.ts b/packages/durably/src/plugins/log-persistence.ts index 2360743..7c9244e 100644 --- a/packages/durably/src/plugins/log-persistence.ts +++ b/packages/durably/src/plugins/log-persistence.ts @@ -1,14 +1,15 @@ import type { DurablyPlugin } from '../durably' /** - * Plugin that persists log events to the database + * Plugin that persists log events to the database. + * Uses fire-and-forget writes — log persistence is best-effort. */ export function withLogPersistence(): DurablyPlugin { return { name: 'log-persistence', install(durably) { - durably.on('log:write', async (event) => { - await durably.storage.createLog({ + durably.on('log:write', (event) => { + void durably.storage.createLog({ runId: event.runId, stepName: event.stepName, level: event.level, diff --git a/packages/durably/tests/shared/events.shared.ts b/packages/durably/tests/shared/events.shared.ts index f978567..e8f99de 100644 --- a/packages/durably/tests/shared/events.shared.ts +++ b/packages/durably/tests/shared/events.shared.ts @@ -211,6 +211,62 @@ export function createEventsTests(createDialect: () => Dialect) { ) }) + it('forwards rejected promises from async listeners to onError', async () => { + const errorHandler = vi.fn() + const asyncListener = vi.fn(async () => { + throw new Error('Async listener error') + }) + + durably.onError(errorHandler) + durably.on('run:leased', asyncListener) + + durably.emit({ + type: 'run:leased', + runId: 'run_1', + jobName: 'test-job', + input: {}, + leaseOwner: 'worker-1', + leaseExpiresAt: '2024-01-01T00:00:30.000Z', + labels: {}, + }) + + // emit is sync — wait for the microtask to process the rejection + await vi.waitFor(() => { + expect(errorHandler).toHaveBeenCalledTimes(1) + }) + + expect(errorHandler).toHaveBeenCalledWith( + expect.objectContaining({ message: 'Async listener error' }), + expect.objectContaining({ + type: 'run:leased', + runId: 'run_1', + }), + ) + }) + + it('does not await async listeners (emit stays synchronous)', () => { + let resolved = false + const asyncListener = vi.fn(async () => { + await new Promise((r) => setTimeout(r, 100)) + resolved = true + }) + + durably.on('run:leased', asyncListener) + + durably.emit({ + type: 'run:leased', + runId: 'run_1', + jobName: 'test-job', + input: {}, + leaseOwner: 'worker-1', + leaseExpiresAt: '2024-01-01T00:00:30.000Z', + labels: {}, + }) + + // emit returns immediately — async work hasn't completed + expect(resolved).toBe(false) + }) + it('calls onError handler when listener throws', () => { const errorHandler = vi.fn() const failingListener = vi.fn(() => {