/**
 * PostgreSQL claim implementation using FOR UPDATE SKIP LOCKED + advisory locks.
 *
 * Concurrency strategy:
 * - SKIP LOCKED avoids blocking on rows being claimed by other workers
 * - Advisory locks serialize per-concurrency-key to prevent double-leasing
 * - The conflict re-check relies on each statement seeing a fresh snapshot
 *   after the advisory lock is acquired (assumes the transaction runs at
 *   READ COMMITTED; this function does not set the isolation level itself)
 *
 * @param db - Kysely instance; every statement runs inside one transaction on it.
 * @param workerId - Written to `lease_owner` of the claimed run.
 * @param now - Timestamp string compared against `lease_expires_at` to detect
 *   expired leases; also written to `updated_at`, and to `started_at` when
 *   that column is still NULL.
 * @param leaseExpiresAt - Written to `lease_expires_at` of the claimed run.
 * @param activeLeaseGuard - Caller-built SQL predicate that is ANDed into the
 *   candidate query.
 * @returns The claimed run, or `null` when no eligible candidate is found.
 */
export async function claimNextPostgres(
  db: Kysely<Database>,
  workerId: string,
  now: string,
  leaseExpiresAt: string,
  activeLeaseGuard: ReturnType<typeof sql<boolean>>,
): Promise<Run | null> {
  return await db.transaction().execute(async (trx) => {
    // Concurrency keys found to be occupied during this call.
    const skipKeys: string[] = []

    // Loop: on a concurrency-key conflict, exclude that key and look for the
    // next eligible candidate inside the same transaction.
    for (;;) {
      const concurrencyCondition =
        skipKeys.length > 0
          ? sql`
              AND (
                concurrency_key IS NULL
                OR concurrency_key NOT IN (${sql.join(skipKeys)})
              )
            `
          : sql``

      // Step 1: find and row-lock the oldest candidate (pending, or leased
      // with an expired lease) that no other transaction has locked.
      const candidateResult = await sql<{
        id: string
        concurrency_key: string | null
      }>`
        SELECT id, concurrency_key
        FROM durably_runs
        WHERE
          (
            status = 'pending'
            OR (status = 'leased' AND lease_expires_at IS NOT NULL AND lease_expires_at <= ${now})
          )
          AND ${activeLeaseGuard}
          ${concurrencyCondition}
        ORDER BY created_at ASC, id ASC
        FOR UPDATE SKIP LOCKED
        LIMIT 1
      `.execute(trx)

      const candidate = candidateResult.rows[0]
      if (!candidate) return null

      // Step 2: for keyed runs, serialize on the key via a transaction-scoped
      // advisory lock, then re-check that no other run with the same key
      // holds an unexpired lease.
      if (candidate.concurrency_key) {
        await sql`SELECT pg_advisory_xact_lock(hashtext(${candidate.concurrency_key}))`.execute(
          trx,
        )

        const conflict = await sql`
          SELECT 1 FROM durably_runs
          WHERE concurrency_key = ${candidate.concurrency_key}
            AND id <> ${candidate.id}
            AND status = 'leased'
            AND lease_expires_at IS NOT NULL
            AND lease_expires_at > ${now}
          LIMIT 1
        `.execute(trx)

        if (conflict.rows.length > 0) {
          // Key is occupied: exclude it and try the next candidate.
          skipKeys.push(candidate.concurrency_key)
          continue
        }
      }

      // Step 3: claim the candidate and bump its lease generation.
      const result = await sql<Database['durably_runs']>`
        UPDATE durably_runs
        SET
          status = 'leased',
          lease_owner = ${workerId},
          lease_expires_at = ${leaseExpiresAt},
          lease_generation = lease_generation + 1,
          started_at = COALESCE(started_at, ${now}),
          updated_at = ${now}
        WHERE id = ${candidate.id}
        RETURNING *
      `.execute(trx)

      const row = result.rows[0]
      if (!row) return null
      return rowToRun(row)
    }
  })
}
/**
 * SQLite claim implementation using a single UPDATE driven by a subquery.
 *
 * The subquery selects the oldest eligible run (pending, or leased with an
 * expired lease) and the UPDATE claims it in the same statement, so there is
 * no separate read-then-write step. No locking is done here; the original
 * note says safety relies on a process-level write mutex held by the caller
 * (not visible in this function; confirm at the call site).
 *
 * @param db - Kysely instance used to build and run the statement.
 * @param workerId - Written to `lease_owner` of the claimed run.
 * @param now - Timestamp string compared against `lease_expires_at` to detect
 *   expired leases; also written to `updated_at`, and to `started_at` when
 *   that column is still NULL.
 * @param leaseExpiresAt - Written to `lease_expires_at` of the claimed run.
 * @param activeLeaseGuard - Caller-built SQL predicate added to the candidate
 *   subquery's WHERE clause.
 * @returns The claimed run, or `null` when the UPDATE matched no row.
 */
export async function claimNextSqlite(
  db: Kysely<Database>,
  workerId: string,
  now: string,
  leaseExpiresAt: string,
  activeLeaseGuard: ReturnType<typeof sql<boolean>>,
): Promise<Run | null> {
  // Oldest eligible candidate: pending, or leased with an expired lease.
  const subquery = db
    .selectFrom('durably_runs')
    .select('durably_runs.id')
    .where((eb) =>
      eb.or([
        eb('status', '=', 'pending'),
        eb.and([
          eb('status', '=', 'leased'),
          eb('lease_expires_at', 'is not', null),
          eb('lease_expires_at', '<=', now),
        ]),
      ]),
    )
    .where(activeLeaseGuard)
    .orderBy('created_at', 'asc')
    .orderBy('id', 'asc')
    .limit(1)

  const row = await db
    .updateTable('durably_runs')
    .set({
      status: 'leased',
      lease_owner: workerId,
      lease_expires_at: leaseExpiresAt,
      lease_generation: sql<number>`lease_generation + 1`,
      // Keep the first start time when an expired lease is re-claimed.
      started_at: sql<string>`COALESCE(started_at, ${now})`,
      updated_at: now,
    })
    // The subquery is wrapped in a derived table ('sub') before being compared.
    .where('id', '=', (eb) => eb.selectFrom(subquery.as('sub')).select('id'))
    .returningAll()
    .executeTakeFirst()

  if (!row) return null
  return rowToRun(row)
}
for (const key of Object.keys(labels)) { - if (!LABEL_KEY_PATTERN.test(key)) { - throw new Error( - `Invalid label key "${key}": must contain only alphanumeric characters, dashes, underscores, dots, and slashes`, - ) - } - } -} - -function rowToRun(row: Database['durably_runs']): Run { - return { - id: row.id, - jobName: row.job_name, - input: JSON.parse(row.input), - status: row.status, - idempotencyKey: row.idempotency_key, - concurrencyKey: row.concurrency_key, - currentStepIndex: row.current_step_index, - completedStepCount: row.completed_step_count, - progress: row.progress ? JSON.parse(row.progress) : null, - output: row.output ? JSON.parse(row.output) : null, - error: row.error, - labels: JSON.parse(row.labels), - leaseOwner: row.lease_owner, - leaseExpiresAt: row.lease_expires_at, - leaseGeneration: row.lease_generation, - startedAt: row.started_at, - completedAt: row.completed_at, - createdAt: row.created_at, - updatedAt: row.updated_at, - } -} - -/** - * Convert database row to Step object - */ -function rowToStep(row: Database['durably_steps']): Step { - return { - id: row.id, - runId: row.run_id, - name: row.name, - index: row.index, - status: row.status, - output: row.output ? JSON.parse(row.output) : null, - error: row.error, - startedAt: row.started_at, - completedAt: row.completed_at, - } -} - -/** - * Convert database row to Log object - */ -function rowToLog(row: Database['durably_logs']): Log { - return { - id: row.id, - runId: row.run_id, - stepName: row.step_name, - level: row.level, - message: row.message, - data: row.data ? JSON.parse(row.data) : null, - createdAt: row.created_at, - } -} - /** * Simple async mutex for serializing write operations. 
* Prevents SQLITE_BUSY errors with libsql, which opens separate @@ -692,127 +623,9 @@ export function createKyselyStore( ) ` - if (backend === 'postgres') { - return await db.transaction().execute(async (trx) => { - const skipKeys: string[] = [] - - // Loop: on concurrency-key conflict, exclude that key and retry - // to find the next eligible candidate in the same transaction. - for (;;) { - const concurrencyCondition = - skipKeys.length > 0 - ? sql` - AND ( - concurrency_key IS NULL - OR concurrency_key NOT IN (${sql.join(skipKeys)}) - ) - ` - : sql`` - - // Step 1: Find and lock a candidate row - const candidateResult = await sql<{ - id: string - concurrency_key: string | null - }>` - SELECT id, concurrency_key - FROM durably_runs - WHERE - ( - status = 'pending' - OR (status = 'leased' AND lease_expires_at IS NOT NULL AND lease_expires_at <= ${now}) - ) - AND ${activeLeaseGuard} - ${concurrencyCondition} - ORDER BY created_at ASC, id ASC - FOR UPDATE SKIP LOCKED - LIMIT 1 - `.execute(trx) - - const candidate = candidateResult.rows[0] - if (!candidate) return null - - // Step 2: If the candidate has a concurrency key, serialize via - // advisory lock and re-verify with a fresh snapshot (READ COMMITTED - // gives each statement its own snapshot). 
- if (candidate.concurrency_key) { - await sql`SELECT pg_advisory_xact_lock(hashtext(${candidate.concurrency_key}))`.execute( - trx, - ) - - const conflict = await sql` - SELECT 1 FROM durably_runs - WHERE concurrency_key = ${candidate.concurrency_key} - AND id <> ${candidate.id} - AND status = 'leased' - AND lease_expires_at IS NOT NULL - AND lease_expires_at > ${now} - LIMIT 1 - `.execute(trx) - - if (conflict.rows.length > 0) { - // Key is occupied — exclude it and try the next candidate - skipKeys.push(candidate.concurrency_key) - continue - } - } - - // Step 3: Claim the candidate (increment lease_generation) - const result = await sql` - UPDATE durably_runs - SET - status = 'leased', - lease_owner = ${workerId}, - lease_expires_at = ${leaseExpiresAt}, - lease_generation = lease_generation + 1, - started_at = COALESCE(started_at, ${now}), - updated_at = ${now} - WHERE id = ${candidate.id} - RETURNING * - `.execute(trx) - - const row = result.rows[0] - if (!row) return null - return rowToRun(row) - } - }) - } - - let subquery = db - .selectFrom('durably_runs') - .select('durably_runs.id') - .where((eb) => - eb.or([ - eb('status', '=', 'pending'), - eb.and([ - eb('status', '=', 'leased'), - eb('lease_expires_at', 'is not', null), - eb('lease_expires_at', '<=', now), - ]), - ]), - ) - .where(activeLeaseGuard) - .orderBy('created_at', 'asc') - .orderBy('id', 'asc') - .limit(1) - - const row = await db - .updateTable('durably_runs') - .set({ - status: 'leased', - lease_owner: workerId, - lease_expires_at: leaseExpiresAt, - lease_generation: sql`lease_generation + 1`, - started_at: sql`COALESCE(started_at, ${now})`, - updated_at: now, - }) - .where('id', '=', (eb) => - eb.selectFrom(subquery.as('sub')).select('id'), - ) - .returningAll() - .executeTakeFirst() - - if (!row) return null - return rowToRun(row) + return backend === 'postgres' + ? 
/**
 * Convert a `durably_runs` database row to a Run object.
 *
 * `input` and `labels` are always JSON-parsed; `progress` and `output` are
 * JSON-parsed only when truthy and become `null` otherwise. All other columns
 * are copied through under camelCase names.
 *
 * @throws SyntaxError if a parsed column does not contain valid JSON.
 */
export function rowToRun(row: Database['durably_runs']): Run {
  return {
    id: row.id,
    jobName: row.job_name,
    input: JSON.parse(row.input),
    status: row.status,
    idempotencyKey: row.idempotency_key,
    concurrencyKey: row.concurrency_key,
    currentStepIndex: row.current_step_index,
    completedStepCount: row.completed_step_count,
    progress: row.progress ? JSON.parse(row.progress) : null,
    output: row.output ? JSON.parse(row.output) : null,
    error: row.error,
    labels: JSON.parse(row.labels),
    leaseOwner: row.lease_owner,
    leaseExpiresAt: row.lease_expires_at,
    leaseGeneration: row.lease_generation,
    startedAt: row.started_at,
    completedAt: row.completed_at,
    createdAt: row.created_at,
    updatedAt: row.updated_at,
  }
}

/**
 * Convert a `durably_steps` database row to a Step object.
 *
 * `output` is JSON-parsed when truthy and becomes `null` otherwise.
 *
 * @throws SyntaxError if `output` is set but is not valid JSON.
 */
export function rowToStep(row: Database['durably_steps']): Step {
  return {
    id: row.id,
    runId: row.run_id,
    name: row.name,
    index: row.index,
    status: row.status,
    output: row.output ? JSON.parse(row.output) : null,
    error: row.error,
    startedAt: row.started_at,
    completedAt: row.completed_at,
  }
}

/**
 * Convert a `durably_logs` database row to a Log object.
 *
 * `data` is JSON-parsed when truthy and becomes `null` otherwise.
 *
 * @throws SyntaxError if `data` is set but is not valid JSON.
 */
export function rowToLog(row: Database['durably_logs']): Log {
  return {
    id: row.id,
    runId: row.run_id,
    stepName: row.step_name,
    level: row.level,
    message: row.message,
    data: row.data ? JSON.parse(row.data) : null,
    createdAt: row.created_at,
  }
}

/** Allowed label keys: one or more of alphanumeric, dash, underscore, dot, slash. */
const LABEL_KEY_PATTERN = /^[a-zA-Z0-9\-_./]+$/

/**
 * Validate label keys against {@link LABEL_KEY_PATTERN}.
 *
 * Only keys are checked; values are not inspected. `undefined` and an empty
 * object are accepted.
 *
 * @param labels - Label map to validate, or `undefined` to skip validation.
 * @throws Error naming the first key that does not match the pattern
 *   (including the empty string).
 */
export function validateLabels(
  labels: Record<string, string> | undefined,
): void {
  if (!labels) return
  for (const key of Object.keys(labels)) {
    if (!LABEL_KEY_PATTERN.test(key)) {
      throw new Error(
        `Invalid label key "${key}": must contain only alphanumeric characters, dashes, underscores, dots, and slashes`,
      )
    }
  }
}