Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions packages/durably/src/claim-postgres.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import { type Kysely, sql } from 'kysely'
import type { Database } from './schema'
import type { Run } from './storage'
import { rowToRun } from './transformers'

/**
 * PostgreSQL claim implementation using FOR UPDATE SKIP LOCKED + advisory locks.
 *
 * This provides strong concurrency guarantees:
 * - SKIP LOCKED avoids blocking on rows being claimed by other workers
 * - Advisory locks serialize per-concurrency-key to prevent double-leasing
 * - READ COMMITTED gives each statement a fresh snapshot after the advisory lock
 *
 * @param db - Kysely connection; the whole claim runs inside one transaction.
 * @param workerId - Recorded as `lease_owner` on the claimed run.
 * @param now - Timestamp string compared against `lease_expires_at` to detect
 *   expired leases (format/zone conventions defined by the caller — assumed
 *   consistent with what was written; TODO confirm).
 * @param leaseExpiresAt - Expiry timestamp written onto the new lease.
 * @param activeLeaseGuard - Caller-supplied raw SQL predicate ANDed into the
 *   candidate query; its semantics are opaque here.
 * @returns The freshly leased run, or null when no eligible candidate exists.
 */
export async function claimNextPostgres(
  db: Kysely<Database>,
  workerId: string,
  now: string,
  leaseExpiresAt: string,
  activeLeaseGuard: ReturnType<typeof sql<boolean>>,
): Promise<Run | null> {
  return await db.transaction().execute(async (trx) => {
    // Concurrency keys found to be occupied during this claim attempt.
    // Each conflicting key is added exactly once, so the loop terminates.
    const skipKeys: string[] = []

    // Loop: on concurrency-key conflict, exclude that key and retry to find
    // the next eligible candidate within the same transaction.
    for (;;) {
      // Extra WHERE fragment excluding keys we already know are busy.
      const concurrencyCondition =
        skipKeys.length > 0
          ? sql`
            AND (
              concurrency_key IS NULL
              OR concurrency_key NOT IN (${sql.join(skipKeys)})
            )
          `
          : sql``

      // Step 1: Find and row-lock a candidate. SKIP LOCKED means rows being
      // claimed concurrently by other workers are silently passed over.
      // NOTE(review): rows we lock here but later skip via `continue` stay
      // FOR UPDATE-locked until this transaction ends, hiding them from
      // other workers' SKIP LOCKED scans in the meantime — confirm this
      // transient invisibility is acceptable.
      const candidateResult = await sql<{
        id: string
        concurrency_key: string | null
      }>`
        SELECT id, concurrency_key
        FROM durably_runs
        WHERE
          (
            status = 'pending'
            OR (status = 'leased' AND lease_expires_at IS NOT NULL AND lease_expires_at <= ${now})
          )
          AND ${activeLeaseGuard}
          ${concurrencyCondition}
        ORDER BY created_at ASC, id ASC
        FOR UPDATE SKIP LOCKED
        LIMIT 1
      `.execute(trx)

      const candidate = candidateResult.rows[0]
      // No eligible run at all — nothing to claim.
      if (!candidate) return null

      // Step 2: If the candidate has a concurrency key, serialize on that key
      // via a transaction-scoped advisory lock, then re-check for a live
      // conflicting lease. Under READ COMMITTED the re-check statement sees a
      // fresh snapshot taken after the advisory lock was acquired, which is
      // what makes the check trustworthy.
      if (candidate.concurrency_key) {
        // hashtext() maps the text key onto the advisory lock's integer space.
        // NOTE(review): hashtext collisions would merely over-serialize
        // (two distinct keys sharing a lock), not corrupt leasing.
        await sql`SELECT pg_advisory_xact_lock(hashtext(${candidate.concurrency_key}))`.execute(
          trx,
        )

        // Another run with the same key currently holding an unexpired lease?
        const conflict = await sql`
          SELECT 1 FROM durably_runs
          WHERE concurrency_key = ${candidate.concurrency_key}
            AND id <> ${candidate.id}
            AND status = 'leased'
            AND lease_expires_at IS NOT NULL
            AND lease_expires_at > ${now}
          LIMIT 1
        `.execute(trx)

        if (conflict.rows.length > 0) {
          // Key is occupied — exclude it and look for the next candidate.
          skipKeys.push(candidate.concurrency_key)
          continue
        }
      }

      // Step 3: Claim the candidate. lease_generation is bumped so stale
      // holders of the previous lease can be fenced out by callers.
      const result = await sql<Database['durably_runs']>`
        UPDATE durably_runs
        SET
          status = 'leased',
          lease_owner = ${workerId},
          lease_expires_at = ${leaseExpiresAt},
          lease_generation = lease_generation + 1,
          started_at = COALESCE(started_at, ${now}),
          updated_at = ${now}
        WHERE id = ${candidate.id}
        RETURNING *
      `.execute(trx)

      const row = result.rows[0]
      // Defensive: the row was locked above, so the UPDATE should always hit.
      if (!row) return null
      return rowToRun(row)
    }
  })
}
54 changes: 54 additions & 0 deletions packages/durably/src/claim-sqlite.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { type Kysely, sql } from 'kysely'
import type { Database } from './schema'
import type { Run } from './storage'
import { rowToRun } from './transformers'

/**
 * SQLite claim implementation: one atomic UPDATE driven by a subquery.
 *
 * Single-writer — correctness relies on the process-level write mutex.
 * The subquery selects the next eligible candidate and the surrounding
 * UPDATE leases it in the same statement, so there is no check-then-act
 * window (no TOCTOU).
 *
 * @param db - Kysely connection to the SQLite database.
 * @param workerId - Recorded as `lease_owner` on the claimed run.
 * @param now - Timestamp string used to detect expired leases.
 * @param leaseExpiresAt - Expiry timestamp written onto the new lease.
 * @param activeLeaseGuard - Caller-supplied predicate ANDed into the
 *   candidate query.
 * @returns The freshly leased run, or null when nothing is claimable.
 */
export async function claimNextSqlite(
  db: Kysely<Database>,
  workerId: string,
  now: string,
  leaseExpiresAt: string,
  activeLeaseGuard: ReturnType<typeof sql<boolean>>,
): Promise<Run | null> {
  // Oldest run that is either pending or holds a lease that has expired.
  const nextEligible = db
    .selectFrom('durably_runs')
    .select('durably_runs.id')
    .where((eb) =>
      eb.or([
        eb('status', '=', 'pending'),
        eb.and([
          eb('status', '=', 'leased'),
          eb('lease_expires_at', 'is not', null),
          eb('lease_expires_at', '<=', now),
        ]),
      ]),
    )
    .where(activeLeaseGuard)
    .orderBy('created_at', 'asc')
    .orderBy('id', 'asc')
    .limit(1)

  // Lease the candidate in one statement. The candidate query is wrapped as
  // a derived table ("sub") inside the id comparison.
  const claimedRow = await db
    .updateTable('durably_runs')
    .set({
      status: 'leased',
      lease_owner: workerId,
      lease_expires_at: leaseExpiresAt,
      // Bump the generation so stale lease holders can be fenced out.
      lease_generation: sql`lease_generation + 1`,
      // Preserve the original start time across re-claims.
      started_at: sql`COALESCE(started_at, ${now})`,
      updated_at: now,
    })
    .where('id', '=', (eb) => eb.selectFrom(nextEligible.as('sub')).select('id'))
    .returningAll()
    .executeTakeFirst()

  return claimedRow ? rowToRun(claimedRow) : null
}
199 changes: 6 additions & 193 deletions packages/durably/src/storage.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { type Kysely, sql } from 'kysely'
import { monotonicFactory } from 'ulidx'
import { claimNextPostgres } from './claim-postgres'
import { claimNextSqlite } from './claim-sqlite'
import type { Database } from './schema'
import { rowToLog, rowToRun, rowToStep, validateLabels } from './transformers'

const ulid = monotonicFactory()

Expand Down Expand Up @@ -255,78 +258,6 @@ export function toClientRun<
return clientRun
}

/**
 * Validate label keys: alphanumeric, dash, underscore, dot, slash only.
 * Throws on the first offending key; returns silently otherwise.
 */
const LABEL_KEY_PATTERN = /^[a-zA-Z0-9\-_./]+$/

function validateLabels(labels: Record<string, string> | undefined): void {
  if (!labels) return
  // Find the first key (in insertion order) that violates the pattern.
  const offending = Object.keys(labels).find(
    (key) => !LABEL_KEY_PATTERN.test(key),
  )
  if (offending !== undefined) {
    throw new Error(
      `Invalid label key "${offending}": must contain only alphanumeric characters, dashes, underscores, dots, and slashes`,
    )
  }
}

/**
 * Convert database row to Run object
 *
 * JSON-text columns (`input`, `progress`, `output`, `labels`) are parsed;
 * `progress` and `output` fall back to null when empty/absent.
 */
function rowToRun(row: Database['durably_runs']): Run {
  return {
    id: row.id,
    jobName: row.job_name,
    // input and labels are always stored as JSON text, so parse unconditionally
    input: JSON.parse(row.input),
    status: row.status,
    idempotencyKey: row.idempotency_key,
    concurrencyKey: row.concurrency_key,
    currentStepIndex: row.current_step_index,
    completedStepCount: row.completed_step_count,
    // nullable JSON columns: truthiness guard means null/empty -> null
    progress: row.progress ? JSON.parse(row.progress) : null,
    output: row.output ? JSON.parse(row.output) : null,
    error: row.error,
    labels: JSON.parse(row.labels),
    leaseOwner: row.lease_owner,
    leaseExpiresAt: row.lease_expires_at,
    leaseGeneration: row.lease_generation,
    startedAt: row.started_at,
    completedAt: row.completed_at,
    createdAt: row.created_at,
    updatedAt: row.updated_at,
  }
}

/**
 * Convert database row to Step object
 */
function rowToStep(row: Database['durably_steps']): Step {
  // output is stored as JSON text; null/empty means no output recorded yet.
  const parsedOutput = row.output ? JSON.parse(row.output) : null
  return {
    id: row.id,
    runId: row.run_id,
    name: row.name,
    index: row.index,
    status: row.status,
    output: parsedOutput,
    error: row.error,
    startedAt: row.started_at,
    completedAt: row.completed_at,
  }
}

/**
 * Convert database row to Log object
 */
function rowToLog(row: Database['durably_logs']): Log {
  // data is stored as JSON text; null/empty means no structured payload.
  const parsedData = row.data ? JSON.parse(row.data) : null
  return {
    id: row.id,
    runId: row.run_id,
    stepName: row.step_name,
    level: row.level,
    message: row.message,
    data: parsedData,
    createdAt: row.created_at,
  }
}

/**
* Simple async mutex for serializing write operations.
* Prevents SQLITE_BUSY errors with libsql, which opens separate
Expand Down Expand Up @@ -692,127 +623,9 @@ export function createKyselyStore(
)
`

if (backend === 'postgres') {
return await db.transaction().execute(async (trx) => {
const skipKeys: string[] = []

// Loop: on concurrency-key conflict, exclude that key and retry
// to find the next eligible candidate in the same transaction.
for (;;) {
const concurrencyCondition =
skipKeys.length > 0
? sql`
AND (
concurrency_key IS NULL
OR concurrency_key NOT IN (${sql.join(skipKeys)})
)
`
: sql``

// Step 1: Find and lock a candidate row
const candidateResult = await sql<{
id: string
concurrency_key: string | null
}>`
SELECT id, concurrency_key
FROM durably_runs
WHERE
(
status = 'pending'
OR (status = 'leased' AND lease_expires_at IS NOT NULL AND lease_expires_at <= ${now})
)
AND ${activeLeaseGuard}
${concurrencyCondition}
ORDER BY created_at ASC, id ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
`.execute(trx)

const candidate = candidateResult.rows[0]
if (!candidate) return null

// Step 2: If the candidate has a concurrency key, serialize via
// advisory lock and re-verify with a fresh snapshot (READ COMMITTED
// gives each statement its own snapshot).
if (candidate.concurrency_key) {
await sql`SELECT pg_advisory_xact_lock(hashtext(${candidate.concurrency_key}))`.execute(
trx,
)

const conflict = await sql`
SELECT 1 FROM durably_runs
WHERE concurrency_key = ${candidate.concurrency_key}
AND id <> ${candidate.id}
AND status = 'leased'
AND lease_expires_at IS NOT NULL
AND lease_expires_at > ${now}
LIMIT 1
`.execute(trx)

if (conflict.rows.length > 0) {
// Key is occupied — exclude it and try the next candidate
skipKeys.push(candidate.concurrency_key)
continue
}
}

// Step 3: Claim the candidate (increment lease_generation)
const result = await sql<Database['durably_runs']>`
UPDATE durably_runs
SET
status = 'leased',
lease_owner = ${workerId},
lease_expires_at = ${leaseExpiresAt},
lease_generation = lease_generation + 1,
started_at = COALESCE(started_at, ${now}),
updated_at = ${now}
WHERE id = ${candidate.id}
RETURNING *
`.execute(trx)

const row = result.rows[0]
if (!row) return null
return rowToRun(row)
}
})
}

let subquery = db
.selectFrom('durably_runs')
.select('durably_runs.id')
.where((eb) =>
eb.or([
eb('status', '=', 'pending'),
eb.and([
eb('status', '=', 'leased'),
eb('lease_expires_at', 'is not', null),
eb('lease_expires_at', '<=', now),
]),
]),
)
.where(activeLeaseGuard)
.orderBy('created_at', 'asc')
.orderBy('id', 'asc')
.limit(1)

const row = await db
.updateTable('durably_runs')
.set({
status: 'leased',
lease_owner: workerId,
lease_expires_at: leaseExpiresAt,
lease_generation: sql`lease_generation + 1`,
started_at: sql`COALESCE(started_at, ${now})`,
updated_at: now,
})
.where('id', '=', (eb) =>
eb.selectFrom(subquery.as('sub')).select('id'),
)
.returningAll()
.executeTakeFirst()

if (!row) return null
return rowToRun(row)
return backend === 'postgres'
? claimNextPostgres(db, workerId, now, leaseExpiresAt, activeLeaseGuard)
: claimNextSqlite(db, workerId, now, leaseExpiresAt, activeLeaseGuard)
},

async renewLease(
Expand Down
Loading