Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 32 additions & 27 deletions packages/durably/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1045,33 +1045,38 @@ export function createKyselyStore(
},
}

// Wrap all mutating methods with write lock to prevent SQLITE_BUSY.
// libsql opens separate connections for transactions, so concurrent
// writes from the same Kysely instance can conflict. The mutex
// serializes writes within a single process. Reads are not locked.
const mutatingKeys = [
'enqueue',
'enqueueMany',
'updateRun',
'deleteRun',
'purgeRuns',
'claimNext',
'renewLease',
'releaseExpiredLeases',
'completeRun',
'failRun',
'cancelRun',
'persistStep',
'deleteSteps',
'updateProgress',
'createLog',
] as const

for (const key of mutatingKeys) {
const original = store[key] as (...args: unknown[]) => Promise<unknown>
;(store as unknown as Record<string, unknown>)[key] = (
...args: unknown[]
): Promise<unknown> => withWriteLock(() => original.apply(store, args))
// SQLite/libsql: wrap mutating methods with write lock to prevent SQLITE_BUSY.
// libsql opens separate connections for transactions, so concurrent writes
// from the same Kysely instance can conflict. The mutex serializes writes
// within a single process. Reads are not locked.
//
// PostgreSQL: skip the mutex entirely. PostgreSQL handles concurrent writes
// natively via MVCC, advisory locks, and FOR UPDATE SKIP LOCKED.
if (backend !== 'postgres') {
const mutatingKeys = [
'enqueue',
'enqueueMany',
'updateRun',
'deleteRun',
'purgeRuns',
'claimNext',
'renewLease',
'releaseExpiredLeases',
'completeRun',
'failRun',
'cancelRun',
'persistStep',
'deleteSteps',
'updateProgress',
'createLog',
] as const

for (const key of mutatingKeys) {
const original = store[key] as (...args: unknown[]) => Promise<unknown>
;(store as unknown as Record<string, unknown>)[key] = (
...args: unknown[]
): Promise<unknown> => withWriteLock(() => original.apply(store, args))
}
}

return store
Expand Down