Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions packages/durably/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,10 @@ export class ConflictError extends DurablyError {
export function getErrorMessage(error: unknown): string {
return error instanceof Error ? error.message : String(error)
}

/**
* Coerce unknown value to Error
*/
export function toError(error: unknown): Error {
return error instanceof Error ? error : new Error(String(error))
}
21 changes: 14 additions & 7 deletions packages/durably/src/events.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { toError } from './errors'

/**
* Base event interface
*/
Expand Down Expand Up @@ -318,16 +320,21 @@ export function createEventEmitter(): EventEmitter {
return
}

const reportError = (error: unknown) =>
errorHandler?.(toError(error), fullEvent)

for (const listener of typeListeners) {
try {
listener(fullEvent)
} catch (error) {
if (errorHandler) {
errorHandler(
error instanceof Error ? error : new Error(String(error)),
fullEvent,
)
const result: unknown = listener(fullEvent)
// Catch rejected promises from async listeners (not awaited)
if (
result != null &&
typeof (result as Promise<unknown>).then === 'function'
) {
;(result as Promise<unknown>).catch(reportError)
}
} catch (error) {
reportError(error)
// Continue to next listener regardless of error
}
}
Expand Down
4 changes: 2 additions & 2 deletions packages/durably/src/job.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { type z, prettifyError } from 'zod'
import type { JobDefinition } from './define-job'
import { ValidationError } from './errors'
import { toError, ValidationError } from './errors'
import type { EventEmitter, LogData, ProgressData } from './events'
import type { Run, RunFilter, Store } from './storage'

Expand Down Expand Up @@ -388,7 +388,7 @@ export function createJobHandle<
.catch((error) => {
if (resolved) return
cleanup()
reject(error instanceof Error ? error : new Error(String(error)))
reject(toError(error))
})

// Set timeout if specified
Expand Down
7 changes: 4 additions & 3 deletions packages/durably/src/plugins/log-persistence.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import type { DurablyPlugin } from '../durably'

/**
* Plugin that persists log events to the database
* Plugin that persists log events to the database.
* Uses fire-and-forget writes — log persistence is best-effort.
*/
export function withLogPersistence(): DurablyPlugin {
return {
name: 'log-persistence',
install(durably) {
durably.on('log:write', async (event) => {
await durably.storage.createLog({
durably.on('log:write', (event) => {
void durably.storage.createLog({
runId: event.runId,
stepName: event.stepName,
level: event.level,
Expand Down
56 changes: 56 additions & 0 deletions packages/durably/tests/shared/events.shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,62 @@ export function createEventsTests(createDialect: () => Dialect) {
)
})

it('forwards rejected promises from async listeners to onError', async () => {
const errorHandler = vi.fn()
const asyncListener = vi.fn(async () => {
throw new Error('Async listener error')
})

durably.onError(errorHandler)
durably.on('run:leased', asyncListener)

durably.emit({
type: 'run:leased',
runId: 'run_1',
jobName: 'test-job',
input: {},
leaseOwner: 'worker-1',
leaseExpiresAt: '2024-01-01T00:00:30.000Z',
labels: {},
})

// emit is sync — wait for the microtask to process the rejection
await vi.waitFor(() => {
expect(errorHandler).toHaveBeenCalledTimes(1)
})

expect(errorHandler).toHaveBeenCalledWith(
expect.objectContaining({ message: 'Async listener error' }),
expect.objectContaining({
type: 'run:leased',
runId: 'run_1',
}),
)
})

it('does not await async listeners (emit stays synchronous)', () => {
let resolved = false
const asyncListener = vi.fn(async () => {
await new Promise((r) => setTimeout(r, 100))
resolved = true
})

durably.on('run:leased', asyncListener)

durably.emit({
type: 'run:leased',
runId: 'run_1',
jobName: 'test-job',
input: {},
leaseOwner: 'worker-1',
leaseExpiresAt: '2024-01-01T00:00:30.000Z',
labels: {},
})

// emit returns immediately — async work hasn't completed
expect(resolved).toBe(false)
})

it('calls onError handler when listener throws', () => {
const errorHandler = vi.fn()
const failingListener = vi.fn(() => {
Expand Down