From f1995268fdf1b8ec511d146c839f084c6d12ac8d Mon Sep 17 00:00:00 2001 From: coji Date: Mon, 16 Mar 2026 22:36:46 +0900 Subject: [PATCH 1/2] fix: catch rejected promises from async event listeners (#136) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Detect thenable returns from listeners and attach .catch() to forward rejections to onError (without awaiting — emit stays synchronous) - Fix withLogPersistence() to use explicit fire-and-forget instead of async listener whose Promise was silently dropped Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/durably/src/events.ts | 16 +++++- .../durably/src/plugins/log-persistence.ts | 7 ++- .../durably/tests/shared/events.shared.ts | 56 +++++++++++++++++++ 3 files changed, 75 insertions(+), 4 deletions(-) diff --git a/packages/durably/src/events.ts b/packages/durably/src/events.ts index 9250fca..6125fb0 100644 --- a/packages/durably/src/events.ts +++ b/packages/durably/src/events.ts @@ -320,7 +320,21 @@ export function createEventEmitter(): EventEmitter { for (const listener of typeListeners) { try { - listener(fullEvent) + const result: unknown = listener(fullEvent) + // Catch rejected promises from async listeners (not awaited) + if ( + result != null && + typeof (result as Promise).then === 'function' + ) { + ;(result as Promise).catch((error: unknown) => { + if (errorHandler) { + errorHandler( + error instanceof Error ? error : new Error(String(error)), + fullEvent, + ) + } + }) + } } catch (error) { if (errorHandler) { errorHandler( diff --git a/packages/durably/src/plugins/log-persistence.ts b/packages/durably/src/plugins/log-persistence.ts index 2360743..7c9244e 100644 --- a/packages/durably/src/plugins/log-persistence.ts +++ b/packages/durably/src/plugins/log-persistence.ts @@ -1,14 +1,15 @@ import type { DurablyPlugin } from '../durably' /** - * Plugin that persists log events to the database + * Plugin that persists log events to the database. + * Uses fire-and-forget writes — log persistence is best-effort. */ export function withLogPersistence(): DurablyPlugin { return { name: 'log-persistence', install(durably) { - durably.on('log:write', async (event) => { - await durably.storage.createLog({ + durably.on('log:write', (event) => { + void durably.storage.createLog({ runId: event.runId, stepName: event.stepName, level: event.level, diff --git a/packages/durably/tests/shared/events.shared.ts b/packages/durably/tests/shared/events.shared.ts index f978567..e8f99de 100644 --- a/packages/durably/tests/shared/events.shared.ts +++ b/packages/durably/tests/shared/events.shared.ts @@ -211,6 +211,62 @@ export function createEventsTests(createDialect: () => Dialect) { ) }) + it('forwards rejected promises from async listeners to onError', async () => { + const errorHandler = vi.fn() + const asyncListener = vi.fn(async () => { + throw new Error('Async listener error') + }) + + durably.onError(errorHandler) + durably.on('run:leased', asyncListener) + + durably.emit({ + type: 'run:leased', + runId: 'run_1', + jobName: 'test-job', + input: {}, + leaseOwner: 'worker-1', + leaseExpiresAt: '2024-01-01T00:00:30.000Z', + labels: {}, + }) + + // emit is sync — wait for the microtask to process the rejection + await vi.waitFor(() => { + expect(errorHandler).toHaveBeenCalledTimes(1) + }) + + expect(errorHandler).toHaveBeenCalledWith( + expect.objectContaining({ message: 'Async listener error' }), + expect.objectContaining({ + type: 'run:leased', + runId: 'run_1', + }), + ) + }) + + it('does not await async listeners (emit stays synchronous)', () => { + let resolved = false + const asyncListener = vi.fn(async () => { + await new Promise((r) => setTimeout(r, 100)) + resolved = true + }) + + durably.on('run:leased', asyncListener) + + durably.emit({ + type: 'run:leased', + runId: 'run_1', + jobName: 'test-job', + input: {}, + leaseOwner: 'worker-1', + leaseExpiresAt: '2024-01-01T00:00:30.000Z', + labels: {}, + }) + + // emit returns immediately — async work hasn't completed + expect(resolved).toBe(false) + }) + it('calls onError handler when listener throws', () => { const errorHandler = vi.fn() const failingListener = vi.fn(() => { From 9506c9991e404d2229416a79a706f2a8d2569ff9 Mon Sep 17 00:00:00 2001 From: coji Date: Mon, 16 Mar 2026 22:40:35 +0900 Subject: [PATCH 2/2] refactor: extract toError helper and deduplicate error handling - Add toError(error: unknown): Error to errors.ts - Use reportError local in emit() to eliminate duplicated guard block - Replace inline ternary in job.ts with toError() Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/durably/src/errors.ts | 7 +++++++ packages/durably/src/events.ts | 21 +++++++-------------- packages/durably/src/job.ts | 4 ++-- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/packages/durably/src/errors.ts b/packages/durably/src/errors.ts index 9a712be..45edb81 100644 --- a/packages/durably/src/errors.ts +++ b/packages/durably/src/errors.ts @@ -63,3 +63,10 @@ export class ConflictError extends DurablyError { export function getErrorMessage(error: unknown): string { return error instanceof Error ? error.message : String(error) } + +/** + * Coerce unknown value to Error + */ +export function toError(error: unknown): Error { + return error instanceof Error ? error : new Error(String(error)) +} diff --git a/packages/durably/src/events.ts b/packages/durably/src/events.ts index 6125fb0..5ae3554 100644 --- a/packages/durably/src/events.ts +++ b/packages/durably/src/events.ts @@ -1,3 +1,5 @@ +import { toError } from './errors' + /** * Base event interface */ @@ -318,6 +320,9 @@ export function createEventEmitter(): EventEmitter { return } + const reportError = (error: unknown) => + errorHandler?.(toError(error), fullEvent) + for (const listener of typeListeners) { try { const result: unknown = listener(fullEvent) @@ -326,22 +331,10 @@ export function createEventEmitter(): EventEmitter { result != null && typeof (result as Promise).then === 'function' ) { - ;(result as Promise).catch((error: unknown) => { - if (errorHandler) { - errorHandler( - error instanceof Error ? error : new Error(String(error)), - fullEvent, - ) - } - }) + ;(result as Promise).catch(reportError) } } catch (error) { - if (errorHandler) { - errorHandler( - error instanceof Error ? error : new Error(String(error)), - fullEvent, - ) - } + reportError(error) // Continue to next listener regardless of error } } diff --git a/packages/durably/src/job.ts b/packages/durably/src/job.ts index 437698e..39598e8 100644 --- a/packages/durably/src/job.ts +++ b/packages/durably/src/job.ts @@ -1,6 +1,6 @@ import { type z, prettifyError } from 'zod' import type { JobDefinition } from './define-job' -import { ValidationError } from './errors' +import { toError, ValidationError } from './errors' import type { EventEmitter, LogData, ProgressData } from './events' import type { Run, RunFilter, Store } from './storage' @@ -388,7 +388,7 @@ export function createJobHandle< .catch((error) => { if (resolved) return cleanup() - reject(error instanceof Error ? error : new Error(String(error))) + reject(toError(error)) }) // Set timeout if specified