Skip to content
Merged
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"pretest": "rm -rf test-db test-logs",
"test": "echo '\n⚠️ WARNING: Running full test suite crashes Claude Code instances!\n\n✅ Safe commands (run these from Claude Code):\n npm run test:core\n npm run test:handlers\n npm run test:repositories\n npm run test:adapters\n npm run test:implementations\n npm run test:services\n npm run test:cli\n npm run test:integration\n\n❌ Full suite: Use npm run test:all (only in local terminal/CI)\n' && exit 1",
"test:all": "npm run test:core && npm run test:handlers && npm run test:services && npm run test:repositories && npm run test:adapters && npm run test:implementations && npm run test:cli && npm run test:scheduling && npm run test:checkpoints && npm run test:error-scenarios && npm run test:integration",
"test:services": "NODE_OPTIONS='--max-old-space-size=2048' vitest run tests/unit/services/task-manager.test.ts tests/unit/services/recovery-manager.test.ts tests/unit/services/autoscaling-manager.test.ts tests/unit/services/process-connector.test.ts --no-file-parallelism",
"test:services": "NODE_OPTIONS='--max-old-space-size=2048' vitest run tests/unit/services/task-manager.test.ts tests/unit/services/recovery-manager.test.ts tests/unit/services/autoscaling-manager.test.ts tests/unit/services/process-connector.test.ts tests/unit/services/handler-setup.test.ts --no-file-parallelism",
"test:full": "npm run test:all && npm run test:worker-handler",
"test:unit": "NODE_OPTIONS='--max-old-space-size=2048' vitest run --no-file-parallelism",
"test:core": "NODE_OPTIONS='--max-old-space-size=2048' vitest run tests/unit/core --no-file-parallelism",
Expand Down
32 changes: 31 additions & 1 deletion src/core/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ export interface TaskRepository {
findByStatus(status: string): Promise<Result<readonly Task[]>>;
delete(taskId: TaskId): Promise<Result<void>>;
cleanupOldTasks(olderThanMs: number): Promise<Result<number>>;
transaction<T>(fn: (repo: TaskRepository) => Promise<Result<T>>): Promise<Result<T>>;
}

/**
Expand Down Expand Up @@ -412,6 +411,37 @@ export interface ScheduleService {
createScheduledPipeline(request: ScheduledPipelineCreateRequest): Promise<Result<Schedule>>;
}

/**
* Synchronous task operations for use inside Database.runInTransaction().
* These methods throw on error (the transaction wrapper catches and converts to Result).
* ARCHITECTURE: Narrow interface — only the operations needed inside transactions.
*/
export interface SyncTaskOperations {
saveSync(task: Task): void;
updateSync(taskId: TaskId, update: Partial<Task>): void;
findByIdSync(taskId: TaskId): Task | null;
}

/**
* Synchronous schedule operations for use inside Database.runInTransaction().
* These methods throw on error (the transaction wrapper catches and converts to Result).
* ARCHITECTURE: Narrow interface — only the operations needed inside transactions.
*/
export interface SyncScheduleOperations {
updateSync(id: ScheduleId, update: Partial<Schedule>, existing?: Schedule): void;
recordExecutionSync(execution: Omit<ScheduleExecution, 'id'>): ScheduleExecution;
findByIdSync(id: ScheduleId): Schedule | null;
}

/**
* Synchronous transaction runner for atomic multi-step DB operations.
* ARCHITECTURE: Abstraction over Database — handlers depend on this interface, not concrete Database.
* Pattern: Dependency Inversion Principle — service layer depends on abstraction.
*/
export interface TransactionRunner {
runInTransaction<T>(fn: () => T): Result<T>;
}

/**
* Checkpoint persistence for task resumption
* ARCHITECTURE: Stores task state snapshots for "smart retry" enrichment
Expand Down
28 changes: 26 additions & 2 deletions src/implementations/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import SQLite from 'better-sqlite3';
import fs from 'fs';
import os from 'os';
import path from 'path';
import { Logger } from '../core/interfaces.js';
import { BackbeatError, ErrorCode } from '../core/errors.js';
import { Logger, TransactionRunner } from '../core/interfaces.js';
import { Result, tryCatch } from '../core/result.js';

/**
* Silent no-op logger for when no logger is provided
Expand Down Expand Up @@ -43,7 +45,7 @@ const noOpLogger: Logger = {
* const testDb = new Database();
* ```
*/
export class Database {
export class Database implements TransactionRunner {
private db: SQLite.Database;
private readonly dbPath: string;
private readonly logger: Logger;
Expand Down Expand Up @@ -533,6 +535,28 @@ export class Database {
];
}

/**
* Run a synchronous function inside a SQLite transaction.
* If the function throws, the transaction is rolled back and an err Result is returned.
* If the function returns, the transaction is committed and the return value is wrapped in ok.
*
* ARCHITECTURE: Uses better-sqlite3's synchronous transaction API.
* All operations inside fn must be synchronous (use *Sync repo methods).
* BackbeatErrors thrown inside fn are preserved; other errors are wrapped.
*/
runInTransaction<T>(fn: () => T): Result<T> {
return tryCatch(
() => this.db.transaction(fn)(),
(error) =>
error instanceof BackbeatError
? error
: new BackbeatError(
ErrorCode.SYSTEM_ERROR,
`Transaction failed: ${error instanceof Error ? error.message : String(error)}`,
),
);
}

/**
* Get current schema version (public method for monitoring/debugging)
*/
Expand Down
31 changes: 5 additions & 26 deletions src/implementations/dependency-repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { z } from 'zod';
import { TaskId } from '../core/domain.js';
import { BackbeatError, ErrorCode, operationErrorHandler } from '../core/errors.js';
import { DependencyRepository, TaskDependency } from '../core/interfaces.js';
import { err, ok, Result, tryCatch, tryCatchAsync } from '../core/result.js';
import { err, ok, Result, tryCatchAsync } from '../core/result.js';
import { Database } from './database.js';

/**
Expand Down Expand Up @@ -47,6 +47,7 @@ export class SQLiteDependencyRepository implements DependencyRepository {
/** Default pagination limit for findAll() */
private static readonly DEFAULT_LIMIT = 100;

private readonly database: Database;
private readonly db: SQLite.Database;
private readonly addDependencyStmt: SQLite.Statement;
private readonly getDependenciesStmt: SQLite.Statement;
Expand All @@ -64,6 +65,7 @@ export class SQLiteDependencyRepository implements DependencyRepository {
private readonly findAllPaginatedStmt: SQLite.Statement;

constructor(database: Database) {
this.database = database;
this.db = database.getDatabase();

// Prepare statements for better performance
Expand Down Expand Up @@ -210,9 +212,9 @@ export class SQLiteDependencyRepository implements DependencyRepository {
);
}

// SECURITY: TOCTOU Fix - Use synchronous .transaction() for true atomicity
// SECURITY: TOCTOU Fix - Use runInTransaction() for true atomicity
// All validation and insertion happens within single atomic transaction
const addDependenciesTransaction = this.db.transaction((taskId: TaskId, dependsOn: readonly TaskId[]) => {
return this.database.runInTransaction(() => {
// ALL operations below are synchronous - no await, no yielding to event loop

// VALIDATION: Check dependent task exists
Expand Down Expand Up @@ -265,29 +267,6 @@ export class SQLiteDependencyRepository implements DependencyRepository {

return createdDependencies;
});

// Execute the transaction and wrap result
return tryCatch(
() => addDependenciesTransaction(taskId, dependsOn),
(error) => {
// Preserve semantic BackbeatError types
if (error instanceof BackbeatError) {
return error;
}

// Handle UNIQUE constraint violation
if (error instanceof Error && error.message.includes('UNIQUE constraint')) {
return new BackbeatError(
ErrorCode.INVALID_OPERATION,
`One or more dependencies already exist for task: ${taskId}`,
{ taskId, dependsOn },
);
}

// Unknown errors become SYSTEM_ERROR
return new BackbeatError(ErrorCode.SYSTEM_ERROR, `Failed to add dependencies: ${error}`, { taskId, dependsOn });
},
);
}

/**
Expand Down
137 changes: 84 additions & 53 deletions src/implementations/schedule-repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {
TaskRequest,
} from '../core/domain.js';
import { BackbeatError, ErrorCode, operationErrorHandler } from '../core/errors.js';
import { ScheduleExecution, ScheduleRepository } from '../core/interfaces.js';
import { ScheduleExecution, ScheduleRepository, SyncScheduleOperations } from '../core/interfaces.js';
import { err, ok, Result, tryCatchAsync } from '../core/result.js';
import { Database } from './database.js';

Expand Down Expand Up @@ -142,7 +142,7 @@ interface ScheduleExecutionRow {
readonly created_at: number;
}

export class SQLiteScheduleRepository implements ScheduleRepository {
export class SQLiteScheduleRepository implements ScheduleRepository, SyncScheduleOperations {
/** Default pagination limit for findAll() */
private static readonly DEFAULT_LIMIT = 100;

Expand Down Expand Up @@ -243,50 +243,49 @@ export class SQLiteScheduleRepository implements ScheduleRepository {
`);
}

/**
* Convert Schedule domain object to database parameter format.
* Shared by both async (save/update) and sync (updateSync) methods.
* Includes createdAt — better-sqlite3 ignores named params not referenced by the statement.
*/
private toDbFormat(schedule: Schedule): Record<string, unknown> {
return {
id: schedule.id,
taskTemplate: JSON.stringify(schedule.taskTemplate),
scheduleType: schedule.scheduleType,
cronExpression: schedule.cronExpression ?? null,
scheduledAt: schedule.scheduledAt ?? null,
timezone: schedule.timezone,
missedRunPolicy: schedule.missedRunPolicy,
status: schedule.status,
maxRuns: schedule.maxRuns ?? null,
runCount: schedule.runCount,
lastRunAt: schedule.lastRunAt ?? null,
nextRunAt: schedule.nextRunAt ?? null,
expiresAt: schedule.expiresAt ?? null,
afterScheduleId: schedule.afterScheduleId ?? null,
pipelineSteps: schedule.pipelineSteps ? JSON.stringify(schedule.pipelineSteps) : null,
createdAt: schedule.createdAt,
updatedAt: schedule.updatedAt,
};
}

/**
* Save a new schedule
*
* @param schedule - The schedule to save
* @returns Result indicating success or error
*/
async save(schedule: Schedule): Promise<Result<void>> {
return tryCatchAsync(
async () => {
const dbSchedule = {
id: schedule.id,
taskTemplate: JSON.stringify(schedule.taskTemplate),
scheduleType: schedule.scheduleType,
cronExpression: schedule.cronExpression ?? null,
scheduledAt: schedule.scheduledAt ?? null,
timezone: schedule.timezone,
missedRunPolicy: schedule.missedRunPolicy,
status: schedule.status,
maxRuns: schedule.maxRuns ?? null,
runCount: schedule.runCount,
lastRunAt: schedule.lastRunAt ?? null,
nextRunAt: schedule.nextRunAt ?? null,
expiresAt: schedule.expiresAt ?? null,
afterScheduleId: schedule.afterScheduleId ?? null,
pipelineSteps: schedule.pipelineSteps ? JSON.stringify(schedule.pipelineSteps) : null,
createdAt: schedule.createdAt,
updatedAt: schedule.updatedAt,
};

this.saveStmt.run(dbSchedule);
this.saveStmt.run(this.toDbFormat(schedule));
},
operationErrorHandler('save schedule', { scheduleId: schedule.id }),
);
}

/**
* Update an existing schedule
*
* @param id - The schedule ID to update
* @param update - Partial schedule fields to update
* @returns Result indicating success or error
*/
async update(id: ScheduleId, update: Partial<Schedule>): Promise<Result<void>> {
// First get the existing schedule
const existingResult = await this.findById(id);

if (!existingResult.ok) {
Expand All @@ -297,40 +296,66 @@ export class SQLiteScheduleRepository implements ScheduleRepository {
return err(new BackbeatError(ErrorCode.TASK_NOT_FOUND, `Schedule ${id} not found`));
}

// Merge updates with existing schedule
const updatedSchedule: Schedule = {
...existingResult.value,
...update,
updatedAt: Date.now(),
};

// Use UPDATE (not INSERT OR REPLACE) to preserve child rows
// INSERT OR REPLACE deletes the old row first, triggering ON DELETE CASCADE
return tryCatchAsync(
async () => {
this.updateStmt.run({
id: updatedSchedule.id,
taskTemplate: JSON.stringify(updatedSchedule.taskTemplate),
scheduleType: updatedSchedule.scheduleType,
cronExpression: updatedSchedule.cronExpression ?? null,
scheduledAt: updatedSchedule.scheduledAt ?? null,
timezone: updatedSchedule.timezone,
missedRunPolicy: updatedSchedule.missedRunPolicy,
status: updatedSchedule.status,
maxRuns: updatedSchedule.maxRuns ?? null,
runCount: updatedSchedule.runCount,
lastRunAt: updatedSchedule.lastRunAt ?? null,
nextRunAt: updatedSchedule.nextRunAt ?? null,
expiresAt: updatedSchedule.expiresAt ?? null,
afterScheduleId: updatedSchedule.afterScheduleId ?? null,
pipelineSteps: updatedSchedule.pipelineSteps ? JSON.stringify(updatedSchedule.pipelineSteps) : null,
updatedAt: updatedSchedule.updatedAt,
});
this.updateStmt.run(this.toDbFormat(updatedSchedule));
},
operationErrorHandler('update schedule', { scheduleId: id }),
);
}

// ============================================================================
// SYNC METHODS (for use inside Database.runInTransaction())
// These throw on error — the transaction wrapper catches and converts to Result.
// ============================================================================

findByIdSync(id: ScheduleId): Schedule | null {
const row = this.findByIdStmt.get(id) as ScheduleRow | undefined;
if (!row) return null;
return this.rowToSchedule(row);
}

updateSync(id: ScheduleId, update: Partial<Schedule>, existing?: Schedule): void {
const base = existing ?? this.findByIdSync(id);
if (!base) {
throw new BackbeatError(ErrorCode.TASK_NOT_FOUND, `Schedule ${id} not found`);
}
const updatedSchedule: Schedule = {
...base,
...update,
updatedAt: Date.now(),
};
this.updateStmt.run(this.toDbFormat(updatedSchedule));
}

recordExecutionSync(execution: Omit<ScheduleExecution, 'id'>): ScheduleExecution {
const result = this.recordExecutionStmt.run(
execution.scheduleId,
execution.taskId ?? null,
execution.scheduledFor,
execution.executedAt ?? null,
execution.status,
execution.errorMessage ?? null,
execution.pipelineTaskIds ? JSON.stringify(execution.pipelineTaskIds) : null,
execution.createdAt,
);

const row = this.getExecutionByIdStmt.get(result.lastInsertRowid) as ScheduleExecutionRow | undefined;
if (!row) {
throw new BackbeatError(
ErrorCode.SYSTEM_ERROR,
`Failed to retrieve execution record after insert (rowid: ${result.lastInsertRowid})`,
);
}
return this.rowToExecution(row);
}

/**
* Find schedule by ID
*
Expand Down Expand Up @@ -456,7 +481,13 @@ export class SQLiteScheduleRepository implements ScheduleRepository {
execution.createdAt,
);

const row = this.getExecutionByIdStmt.get(result.lastInsertRowid) as ScheduleExecutionRow;
const row = this.getExecutionByIdStmt.get(result.lastInsertRowid) as ScheduleExecutionRow | undefined;
if (!row) {
throw new BackbeatError(
ErrorCode.SYSTEM_ERROR,
`Failed to retrieve execution record after insert (rowid: ${result.lastInsertRowid})`,
);
}
return this.rowToExecution(row);
},
operationErrorHandler('record schedule execution', { scheduleId: execution.scheduleId }),
Expand Down
Loading
Loading