diff --git a/package.json b/package.json index e9b921c..70c2065 100644 --- a/package.json +++ b/package.json @@ -17,7 +17,7 @@ "pretest": "rm -rf test-db test-logs", "test": "echo '\n⚠️ WARNING: Running full test suite crashes Claude Code instances!\n\n✅ Safe commands (run these from Claude Code):\n npm run test:core\n npm run test:handlers\n npm run test:repositories\n npm run test:adapters\n npm run test:implementations\n npm run test:services\n npm run test:cli\n npm run test:integration\n\n❌ Full suite: Use npm run test:all (only in local terminal/CI)\n' && exit 1", "test:all": "npm run test:core && npm run test:handlers && npm run test:services && npm run test:repositories && npm run test:adapters && npm run test:implementations && npm run test:cli && npm run test:scheduling && npm run test:checkpoints && npm run test:error-scenarios && npm run test:integration", - "test:services": "NODE_OPTIONS='--max-old-space-size=2048' vitest run tests/unit/services/task-manager.test.ts tests/unit/services/recovery-manager.test.ts tests/unit/services/autoscaling-manager.test.ts tests/unit/services/process-connector.test.ts --no-file-parallelism", + "test:services": "NODE_OPTIONS='--max-old-space-size=2048' vitest run tests/unit/services/task-manager.test.ts tests/unit/services/recovery-manager.test.ts tests/unit/services/autoscaling-manager.test.ts tests/unit/services/process-connector.test.ts tests/unit/services/handler-setup.test.ts --no-file-parallelism", "test:full": "npm run test:all && npm run test:worker-handler", "test:unit": "NODE_OPTIONS='--max-old-space-size=2048' vitest run --no-file-parallelism", "test:core": "NODE_OPTIONS='--max-old-space-size=2048' vitest run tests/unit/core --no-file-parallelism", diff --git a/src/core/interfaces.ts b/src/core/interfaces.ts index 28e9b2a..4c96b39 100644 --- a/src/core/interfaces.ts +++ b/src/core/interfaces.ts @@ -130,7 +130,6 @@ export interface TaskRepository { findByStatus(status: string): Promise>; delete(taskId: TaskId): Promise>; cleanupOldTasks(olderThanMs: number): Promise>; - transaction(fn: (repo: TaskRepository) => Promise>): Promise>; } /** @@ -412,6 +411,37 @@ export interface ScheduleService { createScheduledPipeline(request: ScheduledPipelineCreateRequest): Promise>; } +/** + * Synchronous task operations for use inside Database.runInTransaction(). + * These methods throw on error (the transaction wrapper catches and converts to Result). + * ARCHITECTURE: Narrow interface — only the operations needed inside transactions. + */ +export interface SyncTaskOperations { + saveSync(task: Task): void; + updateSync(taskId: TaskId, update: Partial): void; + findByIdSync(taskId: TaskId): Task | null; +} + +/** + * Synchronous schedule operations for use inside Database.runInTransaction(). + * These methods throw on error (the transaction wrapper catches and converts to Result). + * ARCHITECTURE: Narrow interface — only the operations needed inside transactions. + */ +export interface SyncScheduleOperations { + updateSync(id: ScheduleId, update: Partial, existing?: Schedule): void; + recordExecutionSync(execution: Omit): ScheduleExecution; + findByIdSync(id: ScheduleId): Schedule | null; +} + +/** + * Synchronous transaction runner for atomic multi-step DB operations. + * ARCHITECTURE: Abstraction over Database — handlers depend on this interface, not concrete Database. + * Pattern: Dependency Inversion Principle — service layer depends on abstraction. + */ +export interface TransactionRunner { + runInTransaction(fn: () => T): Result; +} + /** * Checkpoint persistence for task resumption * ARCHITECTURE: Stores task state snapshots for "smart retry" enrichment diff --git a/src/implementations/database.ts b/src/implementations/database.ts index 05b138e..1b36173 100644 --- a/src/implementations/database.ts +++ b/src/implementations/database.ts @@ -7,7 +7,9 @@ import SQLite from 'better-sqlite3'; import fs from 'fs'; import os from 'os'; import path from 'path'; -import { Logger } from '../core/interfaces.js'; +import { BackbeatError, ErrorCode } from '../core/errors.js'; +import { Logger, TransactionRunner } from '../core/interfaces.js'; +import { Result, tryCatch } from '../core/result.js'; /** * Silent no-op logger for when no logger is provided @@ -43,7 +45,7 @@ const noOpLogger: Logger = { * const testDb = new Database(); * ``` */ -export class Database { +export class Database implements TransactionRunner { private db: SQLite.Database; private readonly dbPath: string; private readonly logger: Logger; @@ -533,6 +535,28 @@ export class Database { ]; } + /** + * Run a synchronous function inside a SQLite transaction. + * If the function throws, the transaction is rolled back and an err Result is returned. + * If the function returns, the transaction is committed and the return value is wrapped in ok. + * + * ARCHITECTURE: Uses better-sqlite3's synchronous transaction API. + * All operations inside fn must be synchronous (use *Sync repo methods). + * BackbeatErrors thrown inside fn are preserved; other errors are wrapped. + */ + runInTransaction(fn: () => T): Result { + return tryCatch( + () => this.db.transaction(fn)(), + (error) => + error instanceof BackbeatError + ? error + : new BackbeatError( + ErrorCode.SYSTEM_ERROR, + `Transaction failed: ${error instanceof Error ? error.message : String(error)}`, + ), + ); + } + /** * Get current schema version (public method for monitoring/debugging) */ diff --git a/src/implementations/dependency-repository.ts b/src/implementations/dependency-repository.ts index 7fc504c..d6c9a75 100644 --- a/src/implementations/dependency-repository.ts +++ b/src/implementations/dependency-repository.ts @@ -10,7 +10,7 @@ import { z } from 'zod'; import { TaskId } from '../core/domain.js'; import { BackbeatError, ErrorCode, operationErrorHandler } from '../core/errors.js'; import { DependencyRepository, TaskDependency } from '../core/interfaces.js'; -import { err, ok, Result, tryCatch, tryCatchAsync } from '../core/result.js'; +import { err, ok, Result, tryCatchAsync } from '../core/result.js'; import { Database } from './database.js'; /** @@ -47,6 +47,7 @@ export class SQLiteDependencyRepository implements DependencyRepository { /** Default pagination limit for findAll() */ private static readonly DEFAULT_LIMIT = 100; + private readonly database: Database; private readonly db: SQLite.Database; private readonly addDependencyStmt: SQLite.Statement; private readonly getDependenciesStmt: SQLite.Statement; @@ -64,6 +65,7 @@ export class SQLiteDependencyRepository implements DependencyRepository { private readonly findAllPaginatedStmt: SQLite.Statement; constructor(database: Database) { + this.database = database; this.db = database.getDatabase(); // Prepare statements for better performance @@ -210,9 +212,9 @@ export class SQLiteDependencyRepository implements DependencyRepository { ); } - // SECURITY: TOCTOU Fix - Use synchronous .transaction() for true atomicity + // SECURITY: TOCTOU Fix - Use runInTransaction() for true atomicity // All validation and insertion happens within single atomic transaction - const addDependenciesTransaction = this.db.transaction((taskId: TaskId, dependsOn: readonly TaskId[]) => { + return this.database.runInTransaction(() => { // ALL operations below are synchronous - no await, no yielding to event loop // VALIDATION: Check dependent task exists @@ -265,29 +267,6 @@ export class SQLiteDependencyRepository implements DependencyRepository { return createdDependencies; }); - - // Execute the transaction and wrap result - return tryCatch( - () => addDependenciesTransaction(taskId, dependsOn), - (error) => { - // Preserve semantic BackbeatError types - if (error instanceof BackbeatError) { - return error; - } - - // Handle UNIQUE constraint violation - if (error instanceof Error && error.message.includes('UNIQUE constraint')) { - return new BackbeatError( - ErrorCode.INVALID_OPERATION, - `One or more dependencies already exist for task: ${taskId}`, - { taskId, dependsOn }, - ); - } - - // Unknown errors become SYSTEM_ERROR - return new BackbeatError(ErrorCode.SYSTEM_ERROR, `Failed to add dependencies: ${error}`, { taskId, dependsOn }); - }, - ); } /** diff --git a/src/implementations/schedule-repository.ts b/src/implementations/schedule-repository.ts index 1ffc373..af2b523 100644 --- a/src/implementations/schedule-repository.ts +++ b/src/implementations/schedule-repository.ts @@ -19,7 +19,7 @@ import { TaskRequest, } from '../core/domain.js'; import { BackbeatError, ErrorCode, operationErrorHandler } from '../core/errors.js'; -import { ScheduleExecution, ScheduleRepository } from '../core/interfaces.js'; +import { ScheduleExecution, ScheduleRepository, SyncScheduleOperations } from '../core/interfaces.js'; import { err, ok, Result, tryCatchAsync } from '../core/result.js'; import { Database } from './database.js'; @@ -142,7 +142,7 @@ interface ScheduleExecutionRow { readonly created_at: number; } -export class SQLiteScheduleRepository implements ScheduleRepository { +export class SQLiteScheduleRepository implements ScheduleRepository, SyncScheduleOperations { /** Default pagination limit for findAll() */ private static readonly DEFAULT_LIMIT = 100; @@ -243,36 +243,40 @@ export class SQLiteScheduleRepository implements ScheduleRepository { `); } + /** + * Convert Schedule domain object to database parameter format. + * Shared by both async (save/update) and sync (updateSync) methods. + * Includes createdAt — better-sqlite3 ignores named params not referenced by the statement. + */ + private toDbFormat(schedule: Schedule): Record { + return { + id: schedule.id, + taskTemplate: JSON.stringify(schedule.taskTemplate), + scheduleType: schedule.scheduleType, + cronExpression: schedule.cronExpression ?? null, + scheduledAt: schedule.scheduledAt ?? null, + timezone: schedule.timezone, + missedRunPolicy: schedule.missedRunPolicy, + status: schedule.status, + maxRuns: schedule.maxRuns ?? null, + runCount: schedule.runCount, + lastRunAt: schedule.lastRunAt ?? null, + nextRunAt: schedule.nextRunAt ?? null, + expiresAt: schedule.expiresAt ?? null, + afterScheduleId: schedule.afterScheduleId ?? null, + pipelineSteps: schedule.pipelineSteps ? JSON.stringify(schedule.pipelineSteps) : null, + createdAt: schedule.createdAt, + updatedAt: schedule.updatedAt, + }; + } + /** * Save a new schedule - * - * @param schedule - The schedule to save - * @returns Result indicating success or error */ async save(schedule: Schedule): Promise> { return tryCatchAsync( async () => { - const dbSchedule = { - id: schedule.id, - taskTemplate: JSON.stringify(schedule.taskTemplate), - scheduleType: schedule.scheduleType, - cronExpression: schedule.cronExpression ?? null, - scheduledAt: schedule.scheduledAt ?? null, - timezone: schedule.timezone, - missedRunPolicy: schedule.missedRunPolicy, - status: schedule.status, - maxRuns: schedule.maxRuns ?? null, - runCount: schedule.runCount, - lastRunAt: schedule.lastRunAt ?? null, - nextRunAt: schedule.nextRunAt ?? null, - expiresAt: schedule.expiresAt ?? null, - afterScheduleId: schedule.afterScheduleId ?? null, - pipelineSteps: schedule.pipelineSteps ? JSON.stringify(schedule.pipelineSteps) : null, - createdAt: schedule.createdAt, - updatedAt: schedule.updatedAt, - }; - - this.saveStmt.run(dbSchedule); + this.saveStmt.run(this.toDbFormat(schedule)); }, operationErrorHandler('save schedule', { scheduleId: schedule.id }), ); @@ -280,13 +284,8 @@ export class SQLiteScheduleRepository implements ScheduleRepository { /** * Update an existing schedule - * - * @param id - The schedule ID to update - * @param update - Partial schedule fields to update - * @returns Result indicating success or error */ async update(id: ScheduleId, update: Partial): Promise> { - // First get the existing schedule const existingResult = await this.findById(id); if (!existingResult.ok) { @@ -297,40 +296,66 @@ export class SQLiteScheduleRepository implements ScheduleRepository { return err(new BackbeatError(ErrorCode.TASK_NOT_FOUND, `Schedule ${id} not found`)); } - // Merge updates with existing schedule const updatedSchedule: Schedule = { ...existingResult.value, ...update, updatedAt: Date.now(), }; - // Use UPDATE (not INSERT OR REPLACE) to preserve child rows - // INSERT OR REPLACE deletes the old row first, triggering ON DELETE CASCADE return tryCatchAsync( async () => { - this.updateStmt.run({ - id: updatedSchedule.id, - taskTemplate: JSON.stringify(updatedSchedule.taskTemplate), - scheduleType: updatedSchedule.scheduleType, - cronExpression: updatedSchedule.cronExpression ?? null, - scheduledAt: updatedSchedule.scheduledAt ?? null, - timezone: updatedSchedule.timezone, - missedRunPolicy: updatedSchedule.missedRunPolicy, - status: updatedSchedule.status, - maxRuns: updatedSchedule.maxRuns ?? null, - runCount: updatedSchedule.runCount, - lastRunAt: updatedSchedule.lastRunAt ?? null, - nextRunAt: updatedSchedule.nextRunAt ?? null, - expiresAt: updatedSchedule.expiresAt ?? null, - afterScheduleId: updatedSchedule.afterScheduleId ?? null, - pipelineSteps: updatedSchedule.pipelineSteps ? JSON.stringify(updatedSchedule.pipelineSteps) : null, - updatedAt: updatedSchedule.updatedAt, - }); + this.updateStmt.run(this.toDbFormat(updatedSchedule)); }, operationErrorHandler('update schedule', { scheduleId: id }), ); } + // ============================================================================ + // SYNC METHODS (for use inside Database.runInTransaction()) + // These throw on error — the transaction wrapper catches and converts to Result. + // ============================================================================ + + findByIdSync(id: ScheduleId): Schedule | null { + const row = this.findByIdStmt.get(id) as ScheduleRow | undefined; + if (!row) return null; + return this.rowToSchedule(row); + } + + updateSync(id: ScheduleId, update: Partial, existing?: Schedule): void { + const base = existing ?? this.findByIdSync(id); + if (!base) { + throw new BackbeatError(ErrorCode.TASK_NOT_FOUND, `Schedule ${id} not found`); + } + const updatedSchedule: Schedule = { + ...base, + ...update, + updatedAt: Date.now(), + }; + this.updateStmt.run(this.toDbFormat(updatedSchedule)); + } + + recordExecutionSync(execution: Omit): ScheduleExecution { + const result = this.recordExecutionStmt.run( + execution.scheduleId, + execution.taskId ?? null, + execution.scheduledFor, + execution.executedAt ?? null, + execution.status, + execution.errorMessage ?? null, + execution.pipelineTaskIds ? JSON.stringify(execution.pipelineTaskIds) : null, + execution.createdAt, + ); + + const row = this.getExecutionByIdStmt.get(result.lastInsertRowid) as ScheduleExecutionRow | undefined; + if (!row) { + throw new BackbeatError( + ErrorCode.SYSTEM_ERROR, + `Failed to retrieve execution record after insert (rowid: ${result.lastInsertRowid})`, + ); + } + return this.rowToExecution(row); + } + /** * Find schedule by ID * @@ -456,7 +481,13 @@ export class SQLiteScheduleRepository implements ScheduleRepository { execution.createdAt, ); - const row = this.getExecutionByIdStmt.get(result.lastInsertRowid) as ScheduleExecutionRow; + const row = this.getExecutionByIdStmt.get(result.lastInsertRowid) as ScheduleExecutionRow | undefined; + if (!row) { + throw new BackbeatError( + ErrorCode.SYSTEM_ERROR, + `Failed to retrieve execution record after insert (rowid: ${result.lastInsertRowid})`, + ); + } return this.rowToExecution(row); }, operationErrorHandler('record schedule execution', { scheduleId: execution.scheduleId }), diff --git a/src/implementations/task-repository.ts b/src/implementations/task-repository.ts index f08e74e..6ddc2a6 100644 --- a/src/implementations/task-repository.ts +++ b/src/implementations/task-repository.ts @@ -8,7 +8,7 @@ import { z } from 'zod'; import { AGENT_PROVIDERS_TUPLE, AgentProvider } from '../core/agents.js'; import { Priority, Task, TaskId, TaskStatus, WorkerId } from '../core/domain.js'; import { BackbeatError, ErrorCode, operationErrorHandler } from '../core/errors.js'; -import { TaskRepository } from '../core/interfaces.js'; +import { SyncTaskOperations, TaskRepository } from '../core/interfaces.js'; import { err, ok, Result, tryCatchAsync } from '../core/result.js'; import { Database } from './database.js'; @@ -62,7 +62,7 @@ interface TaskRow { readonly agent: string | null; } -export class SQLiteTaskRepository implements TaskRepository { +export class SQLiteTaskRepository implements TaskRepository, SyncTaskOperations { private readonly db: SQLite.Database; private readonly saveStmt: SQLite.Statement; private readonly updateStmt: SQLite.Statement; @@ -165,39 +165,44 @@ export class SQLiteTaskRepository implements TaskRepository { `); } + /** + * Convert Task domain object to database parameter format. + * Shared by both async (save/update) and sync (saveSync/updateSync) methods. + * Includes createdAt — better-sqlite3 ignores named params not referenced by the statement. + */ + private toDbFormat(task: Task): Record { + return { + id: task.id, + prompt: task.prompt, + status: task.status, + priority: task.priority, + workingDirectory: task.workingDirectory || null, + timeout: task.timeout ?? null, + maxOutputBuffer: task.maxOutputBuffer ?? null, + createdAt: task.createdAt, + startedAt: task.startedAt || null, + completedAt: task.completedAt || null, + workerId: task.workerId || null, + exitCode: task.exitCode ?? null, + dependencies: null, // Dependencies stored in task_dependencies table + parentTaskId: task.parentTaskId || null, + retryCount: task.retryCount ?? null, + retryOf: task.retryOf || null, + continueFrom: task.continueFrom || null, + agent: task.agent || null, + }; + } + async save(task: Task): Promise> { return tryCatchAsync( async () => { - // Convert task to database format - const dbTask = { - id: task.id, - prompt: task.prompt, - status: task.status, - priority: task.priority, - workingDirectory: task.workingDirectory || null, - timeout: task.timeout || null, - maxOutputBuffer: task.maxOutputBuffer || null, - createdAt: task.createdAt, - startedAt: task.startedAt || null, - completedAt: task.completedAt || null, - workerId: task.workerId || null, - exitCode: task.exitCode ?? null, - dependencies: null, // Dependencies stored in task_dependencies table - parentTaskId: task.parentTaskId || null, - retryCount: task.retryCount || null, - retryOf: task.retryOf || null, - continueFrom: task.continueFrom || null, - agent: task.agent || null, - }; - - this.saveStmt.run(dbTask); + this.saveStmt.run(this.toDbFormat(task)); }, operationErrorHandler('save task', { taskId: task.id }), ); } async update(taskId: TaskId, update: Partial): Promise> { - // First get the existing task const existingResult = await this.findById(taskId); if (!existingResult.ok) { @@ -208,36 +213,40 @@ export class SQLiteTaskRepository implements TaskRepository { return err(new BackbeatError(ErrorCode.TASK_NOT_FOUND, `Task ${taskId} not found`)); } - // Merge updates with existing task const updatedTask = { ...existingResult.value, ...update }; - // Use UPDATE (not INSERT OR REPLACE) to preserve child rows return tryCatchAsync( async () => { - this.updateStmt.run({ - id: updatedTask.id, - prompt: updatedTask.prompt, - status: updatedTask.status, - priority: updatedTask.priority, - workingDirectory: updatedTask.workingDirectory || null, - timeout: updatedTask.timeout || null, - maxOutputBuffer: updatedTask.maxOutputBuffer || null, - startedAt: updatedTask.startedAt || null, - completedAt: updatedTask.completedAt || null, - workerId: updatedTask.workerId || null, - exitCode: updatedTask.exitCode ?? null, - dependencies: null, - parentTaskId: updatedTask.parentTaskId || null, - retryCount: updatedTask.retryCount || null, - retryOf: updatedTask.retryOf || null, - continueFrom: updatedTask.continueFrom || null, - agent: updatedTask.agent || null, - }); + this.updateStmt.run(this.toDbFormat(updatedTask)); }, operationErrorHandler('update task', { taskId }), ); } + // ============================================================================ + // SYNC METHODS (for use inside Database.runInTransaction()) + // These throw on error — the transaction wrapper catches and converts to Result. + // ============================================================================ + + saveSync(task: Task): void { + this.saveStmt.run(this.toDbFormat(task)); + } + + findByIdSync(taskId: TaskId): Task | null { + const row = this.findByIdStmt.get(taskId) as TaskRow | undefined; + if (!row) return null; + return this.rowToTask(row); + } + + updateSync(taskId: TaskId, update: Partial): void { + const existing = this.findByIdSync(taskId); + if (!existing) { + throw new BackbeatError(ErrorCode.TASK_NOT_FOUND, `Task ${taskId} not found`); + } + const updatedTask = { ...existing, ...update }; + this.updateStmt.run(this.toDbFormat(updatedTask)); + } + async findById(taskId: TaskId): Promise> { return tryCatchAsync( async () => { @@ -304,21 +313,6 @@ export class SQLiteTaskRepository implements TaskRepository { }, operationErrorHandler('cleanup old tasks')); } - async transaction(fn: (repo: TaskRepository) => Promise>): Promise> { - try { - const transactionFn = this.db.transaction(async () => { - // Create a transaction-wrapped repository - const txRepo = new TransactionTaskRepository(this); - return await fn(txRepo); - }); - - // Execute the transaction and return the result - return await transactionFn(); - } catch (error) { - return err(new BackbeatError(ErrorCode.SYSTEM_ERROR, `Transaction failed: ${error}`)); - } - } - /** * Convert database row to Task domain object * Pattern: Validate at boundary - ensures data integrity from database @@ -333,10 +327,10 @@ export class SQLiteTaskRepository implements TaskRepository { status: data.status as TaskStatus, priority: data.priority as Priority, workingDirectory: data.working_directory || undefined, - timeout: data.timeout || undefined, - maxOutputBuffer: data.max_output_buffer || undefined, + timeout: data.timeout ?? undefined, + maxOutputBuffer: data.max_output_buffer ?? undefined, parentTaskId: data.parent_task_id ? (data.parent_task_id as TaskId) : undefined, - retryCount: data.retry_count || undefined, + retryCount: data.retry_count ?? undefined, retryOf: data.retry_of ? (data.retry_of as TaskId) : undefined, continueFrom: data.continue_from ? (data.continue_from as TaskId) : undefined, agent: data.agent ?? undefined, @@ -348,52 +342,3 @@ export class SQLiteTaskRepository implements TaskRepository { }; } } - -/** - * Transaction-wrapped repository that delegates to the main repository - * All operations run within the same SQLite transaction - */ -class TransactionTaskRepository implements TaskRepository { - constructor(private readonly mainRepo: SQLiteTaskRepository) {} - - async save(task: Task): Promise> { - return this.mainRepo.save(task); - } - - async update(taskId: TaskId, update: Partial): Promise> { - return this.mainRepo.update(taskId, update); - } - - async findById(taskId: TaskId): Promise> { - return this.mainRepo.findById(taskId); - } - - async findAll(limit?: number, offset?: number): Promise> { - return this.mainRepo.findAll(limit, offset); - } - - async findAllUnbounded(): Promise> { - return this.mainRepo.findAllUnbounded(); - } - - async count(): Promise> { - return this.mainRepo.count(); - } - - async findByStatus(status: string): Promise> { - return this.mainRepo.findByStatus(status); - } - - async delete(taskId: TaskId): Promise> { - return this.mainRepo.delete(taskId); - } - - async cleanupOldTasks(olderThanMs: number): Promise> { - return this.mainRepo.cleanupOldTasks(olderThanMs); - } - - async transaction(fn: (repo: TaskRepository) => Promise>): Promise> { - // Nested transactions not supported - just execute the function - return fn(this); - } -} diff --git a/src/services/handler-setup.ts b/src/services/handler-setup.ts index a05ec83..e6f39e1 100644 --- a/src/services/handler-setup.ts +++ b/src/services/handler-setup.ts @@ -16,15 +16,17 @@ import { OutputCapture, ResourceMonitor, ScheduleRepository, + SyncScheduleOperations, + SyncTaskOperations, TaskQueue, TaskRepository, + TransactionRunner, WorkerPool, } from '../core/interfaces.js'; import { err, ok, Result } from '../core/result.js'; import { CheckpointHandler } from './handlers/checkpoint-handler.js'; import { DependencyHandler } from './handlers/dependency-handler.js'; import { OutputHandler } from './handlers/output-handler.js'; -// Event Handlers import { PersistenceHandler } from './handlers/persistence-handler.js'; import { QueryHandler } from './handlers/query-handler.js'; import { QueueHandler } from './handlers/queue-handler.js'; @@ -39,13 +41,14 @@ export interface HandlerDependencies { readonly config: Configuration; readonly logger: Logger; readonly eventBus: EventBus; - readonly taskRepository: TaskRepository; + readonly database: TransactionRunner; + readonly taskRepository: TaskRepository & SyncTaskOperations; readonly outputCapture: OutputCapture; readonly taskQueue: TaskQueue; readonly dependencyRepository: DependencyRepository; readonly workerPool: WorkerPool; readonly resourceMonitor: ResourceMonitor; - readonly scheduleRepository: ScheduleRepository; + readonly scheduleRepository: ScheduleRepository & SyncScheduleOperations; readonly checkpointRepository: CheckpointRepository; } @@ -99,7 +102,7 @@ function getDependency(container: Container, key: string): Result { * ``` */ export function extractHandlerDependencies(container: Container): Result { - // Extract all 11 dependencies - fail fast on any missing + // Extract all 12 dependencies - fail fast on any missing const configResult = getDependency(container, 'config'); if (!configResult.ok) return configResult; @@ -109,7 +112,10 @@ export function extractHandlerDependencies(container: Container): Result(container, 'eventBus'); if (!eventBusResult.ok) return eventBusResult; - const taskRepositoryResult = getDependency(container, 'taskRepository'); + const databaseResult = getDependency(container, 'database'); + if (!databaseResult.ok) return databaseResult; + + const taskRepositoryResult = getDependency(container, 'taskRepository'); if (!taskRepositoryResult.ok) return taskRepositoryResult; const outputCaptureResult = getDependency(container, 'outputCapture'); @@ -127,7 +133,10 @@ export function extractHandlerDependencies(container: Container): Result(container, 'resourceMonitor'); if (!resourceMonitorResult.ok) return resourceMonitorResult; - const scheduleRepositoryResult = getDependency(container, 'scheduleRepository'); + const scheduleRepositoryResult = getDependency( + container, + 'scheduleRepository', + ); if (!scheduleRepositoryResult.ok) return scheduleRepositoryResult; const checkpointRepositoryResult = getDependency(container, 'checkpointRepository'); @@ -137,6 +146,7 @@ export function extractHandlerDependencies(container: Container): Result> { const handlerLogger = logger.child ? logger.child({ module: 'ScheduleHandler' }) : logger; // Create handler - const handler = new ScheduleHandler(scheduleRepo, taskRepo, eventBus, handlerLogger, options); + const handler = new ScheduleHandler(scheduleRepo, taskRepo, eventBus, database, handlerLogger, options); // Subscribe to events const subscribeResult = handler.subscribeToEvents(); @@ -264,41 +266,64 @@ export class ScheduleHandler extends BaseEventHandler { } /** - * Handle single-task trigger - existing logic extracted verbatim + * Handle single-task trigger — atomic via Database.runInTransaction() */ private async handleSingleTaskTrigger(schedule: Schedule, triggeredAt: number): Promise> { const scheduleId = schedule.id; // afterScheduleId enforcement: inject dependency on chained schedule's latest task + // (reads only, needs async for history lookup — stays OUTSIDE transaction) const afterTaskId = await this.resolveAfterScheduleTaskId(schedule); const dependsOn = afterTaskId ? [...(schedule.taskTemplate.dependsOn ?? []), afterTaskId] : schedule.taskTemplate.dependsOn; - // Create task from template + // Create task domain object (pure computation — OUTSIDE transaction) const task = createTask({ ...schedule.taskTemplate, dependsOn }); - const taskSaveResult = await this.taskRepo.save(task); - if (!taskSaveResult.ok) { + + // Pure computation OUTSIDE transaction + const scheduleUpdates = this.computeScheduleUpdates(schedule, triggeredAt); + + // Atomic: save task + record execution + update schedule + const txResult = this.database.runInTransaction(() => { + try { + this.taskRepo.saveSync(task); + } catch (error) { + throw new BackbeatError( + ErrorCode.SYSTEM_ERROR, + `Schedule trigger failed: ${error instanceof Error ? error.message : String(error)}`, + ); + } + + this.scheduleRepo.recordExecutionSync({ + scheduleId, + taskId: task.id, + scheduledFor: schedule.nextRunAt ?? triggeredAt, + executedAt: triggeredAt, + status: 'triggered', + createdAt: Date.now(), + }); + + this.scheduleRepo.updateSync(schedule.id, scheduleUpdates, schedule); + }); + + if (!txResult.ok) { + // Transaction rolled back — best-effort audit trail await this.recordFailedExecution( scheduleId, schedule.nextRunAt ?? triggeredAt, triggeredAt, - `Failed to create task: ${taskSaveResult.error.message}`, + txResult.error.message, ); - return taskSaveResult; + return txResult; } - // Record successful execution - await this.recordTriggeredExecution(scheduleId, task.id, schedule.nextRunAt ?? triggeredAt, triggeredAt); + // Post-commit logging + this.logScheduleTransition(schedule, scheduleUpdates); - // Update schedule state - const updateResult = await this.updateScheduleAfterTrigger(schedule, triggeredAt); - if (!updateResult.ok) return updateResult; - - // Emit TaskDelegated event for the created task + // Events emitted AFTER transaction commit (never emit for uncommitted data) await this.eventBus.emit('TaskDelegated', { task }); - // Emit ScheduleExecuted with the task ID (for concurrency tracking) await this.eventBus.emit('ScheduleExecuted', { scheduleId, taskId: task.id, @@ -315,7 +340,8 @@ export class ScheduleHandler extends BaseEventHandler { } /** - * Handle pipeline trigger - create N tasks with linear dependencies + * Handle pipeline trigger — atomic via Database.runInTransaction(). + * If any task save throws → automatic rollback → zero tasks persisted → no cleanup needed. */ private async handlePipelineTrigger( schedule: Schedule, @@ -331,95 +357,89 @@ export class ScheduleHandler extends BaseEventHandler { }); // afterScheduleId handling: resolve predecessor dependency for step 0 + // (reads only, needs async for history lookup — stays OUTSIDE transaction) const afterTaskId = await this.resolveAfterScheduleTaskId(schedule); const step0DependsOn: TaskId[] | undefined = afterTaskId ? [afterTaskId] : undefined; - // Create tasks for each step with linear dependencies - // TODO: Wrap in a proper async-safe transaction once better-sqlite3 async - // transaction support is available. Current db.transaction() is synchronous - // and does not support awaited operations inside the callback. - const savedTasks: Task[] = []; + // Pre-create ALL task domain objects OUTSIDE transaction (pure computation) + const tasks: Task[] = []; for (let i = 0; i < steps.length; i++) { const step = steps[i]; const dependsOn: TaskId[] = []; - // Step 0 gets afterScheduleId dependency if present if (i === 0 && step0DependsOn) { dependsOn.push(...step0DependsOn); } - // Step i depends on step i-1 if (i > 0) { - dependsOn.push(savedTasks[i - 1].id); + dependsOn.push(tasks[i - 1].id); } - const task = createTask({ - prompt: step.prompt, - priority: step.priority ?? defaults.priority, - workingDirectory: step.workingDirectory ?? defaults.workingDirectory, - agent: step.agent ?? defaults.agent, - dependsOn: dependsOn.length > 0 ? dependsOn : undefined, - }); - - const saveResult = await this.taskRepo.save(task); - if (!saveResult.ok) { - // ARCHITECTURE EXCEPTION: Direct taskRepo.update() instead of emitting - // TaskCancellationRequested events. At this point tasks are just DB rows — - // no TaskDelegated events have been emitted and no workers have been spawned, - // so there is nothing to coordinate via the event bus. Direct cleanup is correct. - this.logger.error('Pipeline task save failed, cleaning up', saveResult.error, { - scheduleId, - failedStep: i, - savedSteps: savedTasks.length, - }); + tasks.push( + createTask({ + prompt: step.prompt, + priority: step.priority ?? defaults.priority, + workingDirectory: step.workingDirectory ?? defaults.workingDirectory, + agent: step.agent ?? defaults.agent, + dependsOn: dependsOn.length > 0 ? dependsOn : undefined, + }), + ); + } - for (const savedTask of savedTasks) { - const cancelResult = await this.taskRepo.update(savedTask.id, { status: TaskStatus.CANCELLED }); - if (!cancelResult.ok) { - this.logger.error('Failed to cancel pipeline task during cleanup', cancelResult.error, { - taskId: savedTask.id, - }); - } + const allTaskIds = tasks.map((t) => t.id); + const firstTaskId = tasks[0].id; + const lastTaskId = tasks[tasks.length - 1].id; + + // Pure computation OUTSIDE transaction + const scheduleUpdates = this.computeScheduleUpdates(schedule, triggeredAt); + + // Atomic: save N tasks + record execution + update schedule + const txResult = this.database.runInTransaction(() => { + for (let i = 0; i < tasks.length; i++) { + try { + this.taskRepo.saveSync(tasks[i]); + } catch (error) { + throw new BackbeatError( + ErrorCode.SYSTEM_ERROR, + `Pipeline failed at step ${i + 1}: ${error instanceof Error ? error.message : String(error)}`, + ); } - - await this.recordFailedExecution( - scheduleId, - schedule.nextRunAt ?? triggeredAt, - triggeredAt, - `Pipeline failed at step ${i + 1}: ${saveResult.error.message}`, - ); - return saveResult; } - savedTasks.push(task); + // Record execution with lastTaskId for afterScheduleId chaining + this.scheduleRepo.recordExecutionSync({ + scheduleId, + taskId: lastTaskId, + scheduledFor: schedule.nextRunAt ?? triggeredAt, + executedAt: triggeredAt, + status: 'triggered', + pipelineTaskIds: allTaskIds, + createdAt: Date.now(), + }); + + this.scheduleRepo.updateSync(schedule.id, scheduleUpdates, schedule); + }); + + if (!txResult.ok) { + // Transaction rolled back — zero tasks exist, no cleanup needed. + // Best-effort audit trail for the failure. + await this.recordFailedExecution( + scheduleId, + schedule.nextRunAt ?? triggeredAt, + triggeredAt, + txResult.error.message, + ); + return txResult; } - const allTaskIds = savedTasks.map((t) => t.id); - const firstTaskId = savedTasks[0].id; - const lastTaskId = savedTasks[savedTasks.length - 1].id; + // Post-commit logging + this.logScheduleTransition(schedule, scheduleUpdates); - // Record execution with lastTaskId — chained schedules (afterScheduleId) resolve - // the predecessor's execution.taskId to check if it's terminal. Using lastTaskId - // ensures the chain fires when the FULL pipeline completes, not just step 1. - await this.recordTriggeredExecution( - scheduleId, - lastTaskId, - schedule.nextRunAt ?? triggeredAt, - triggeredAt, - allTaskIds, - ); - - // Update schedule state - const updateResult = await this.updateScheduleAfterTrigger(schedule, triggeredAt); - if (!updateResult.ok) return updateResult; - - // Emit TaskDelegated for each task - // Step 0 failure is fatal — it's the only task that becomes runnable; all later - // steps block on it. If it's never delegated, the entire pipeline is orphaned. - // Steps 1–N failures are best-effort — they're already saved with dependencies - // and will be enqueued when their predecessor completes. - for (let ti = 0; ti < savedTasks.length; ti++) { - const task = savedTasks[ti]; + // Events emitted AFTER transaction commit (never emit for uncommitted data) + // Step 0 failure is fatal — it's the only task that becomes runnable. + // Steps 1–N failures are best-effort. + for (let ti = 0; ti < tasks.length; ti++) { + const task = tasks[ti]; const emitResult = await this.eventBus.emit('TaskDelegated', { task }); if (!emitResult.ok) { if (ti === 0) { @@ -427,7 +447,8 @@ export class ScheduleHandler extends BaseEventHandler { taskId: task.id, scheduleId, }); - for (const savedTask of savedTasks) { + // Post-commit cancellation — tasks are committed but event pipeline failed + for (const savedTask of tasks) { await this.taskRepo.update(savedTask.id, { status: TaskStatus.CANCELLED }); } return emitResult; @@ -519,36 +540,11 @@ export class ScheduleHandler extends BaseEventHandler { } /** - * Record a triggered execution in the audit trail + * Compute schedule update fields after a trigger (runCount, lastRunAt, nextRunAt, status). + * Pure computation — no side effects. */ - private async recordTriggeredExecution( - scheduleId: ScheduleId, - taskId: TaskId, - scheduledFor: number, - triggeredAt: number, - pipelineTaskIds?: readonly TaskId[], - ): Promise { - const result = await this.scheduleRepo.recordExecution({ - scheduleId, - taskId, - scheduledFor, - executedAt: triggeredAt, - status: 'triggered', - pipelineTaskIds, - createdAt: Date.now(), - }); - if (!result.ok) { - this.logger.error('Failed to record triggered execution', result.error, { scheduleId }); - } - } - - /** - * Update schedule state after a trigger (runCount, lastRunAt, nextRunAt, status) - */ - private async updateScheduleAfterTrigger(schedule: Schedule, triggeredAt: number): Promise> { - const scheduleId = schedule.id; + private computeScheduleUpdates(schedule: Schedule, triggeredAt: number): Partial { const newRunCount = schedule.runCount + 1; - let newStatus: ScheduleStatus | undefined; let newNextRunAt: number | undefined; @@ -558,10 +554,6 @@ export class ScheduleHandler extends BaseEventHandler { if (nextResult.ok) { newNextRunAt = nextResult.value; } else { - this.logger.error('Failed to calculate next run, pausing schedule', nextResult.error, { - scheduleId, - cronExpression: schedule.cronExpression, - }); newStatus = ScheduleStatus.PAUSED; } } else if (schedule.scheduleType === ScheduleType.ONE_TIME) { @@ -573,36 +565,46 @@ export class ScheduleHandler extends BaseEventHandler { if (schedule.maxRuns && newRunCount >= schedule.maxRuns) { newStatus = ScheduleStatus.COMPLETED; newNextRunAt = undefined; - this.logger.info('Schedule reached maxRuns, marking completed', { - scheduleId, - runCount: newRunCount, - maxRuns: schedule.maxRuns, - }); } // Check expiration if (schedule.expiresAt && Date.now() >= schedule.expiresAt) { newStatus = ScheduleStatus.EXPIRED; newNextRunAt = undefined; - this.logger.info('Schedule expired', { scheduleId, expiresAt: schedule.expiresAt }); } - const updates: Partial = { + return { runCount: newRunCount, lastRunAt: triggeredAt, nextRunAt: newNextRunAt, ...(newStatus !== undefined ? { status: newStatus } : {}), }; + } - const updateResult = await this.scheduleRepo.update(scheduleId, updates); - if (!updateResult.ok) { - this.logger.error('Failed to update schedule after trigger', updateResult.error, { - scheduleId, + /** + * Log schedule state transitions after successful commit. + * Post-commit only — never called inside a transaction. + */ + private logScheduleTransition(schedule: Schedule, updates: Partial): void { + if (updates.status === ScheduleStatus.PAUSED) { + this.logger.warn('Failed to calculate next run, pausing schedule', { + scheduleId: schedule.id, + cronExpression: schedule.cronExpression, + }); + } + if (updates.status === ScheduleStatus.COMPLETED && schedule.maxRuns) { + this.logger.info('Schedule reached maxRuns, marking completed', { + scheduleId: schedule.id, + runCount: updates.runCount, + maxRuns: schedule.maxRuns, + }); + } + if (updates.status === ScheduleStatus.EXPIRED) { + this.logger.info('Schedule expired', { + scheduleId: schedule.id, + expiresAt: schedule.expiresAt, }); - return updateResult; } - - return ok(undefined); } /** diff --git a/tests/fixtures/test-doubles.ts b/tests/fixtures/test-doubles.ts index 1b12bb7..95a99da 100644 --- a/tests/fixtures/test-doubles.ts +++ b/tests/fixtures/test-doubles.ts @@ -446,11 +446,6 @@ export class TestTaskRepository implements TaskRepository { return ok(deletedCount); } - async transaction(fn: (repo: TaskRepository) => Promise>): Promise> { - // In-memory implementation - just execute the function - return fn(this); - } - async deleteAll(): Promise> { this.tasks.clear(); return ok(undefined); diff --git a/tests/helpers/test-factories.ts b/tests/helpers/test-factories.ts index 9720b51..d81d49d 100644 --- a/tests/helpers/test-factories.ts +++ b/tests/helpers/test-factories.ts @@ -237,7 +237,6 @@ export const MockFactory = { delete: vi.fn().mockResolvedValue(ok(undefined)), findAll: vi.fn().mockResolvedValue(ok([])), cleanupOldTasks: vi.fn().mockResolvedValue(ok(0)), - transaction: vi.fn().mockImplementation(async (fn) => await fn({} as TaskRepository)), } as TaskRepository; }, diff --git a/tests/unit/implementations/database.test.ts b/tests/unit/implementations/database.test.ts index 0f47164..1d8a1f3 100644 --- a/tests/unit/implementations/database.test.ts +++ b/tests/unit/implementations/database.test.ts @@ -1,5 +1,6 @@ import { afterEach, beforeEach, describe, expect, it } from 'vitest'; import type { TaskId } from '../../../src/core/domain'; +import { BackbeatError, ErrorCode } from '../../../src/core/errors'; import { Database } from '../../../src/implementations/database'; import { TEST_COUNTS } from '../../constants'; import { TaskFactory } from '../../fixtures/factories'; @@ -293,6 +294,71 @@ describe('Database - REAL Database Operations (In-Memory)', () => { }); }); + describe('runInTransaction', () => { + it('should return ok with callback return value on success', () => { + const result = db.runInTransaction(() => { + const sqliteDb = db.getDatabase(); + sqliteDb + .prepare(`INSERT INTO tasks (id, prompt, status, priority, created_at) VALUES (?, ?, ?, ?, ?)`) + .run('tx-1', 'prompt', 'queued', 'P1', Date.now()); + return 42; + }); + + expect(result.ok).toBe(true); + if (!result.ok) return; + expect(result.value).toBe(42); + + // Verify write was committed + const row = db.getDatabase().prepare('SELECT * FROM tasks WHERE id = ?').get('tx-1'); + expect(row).toBeDefined(); + }); + + it('should return err when callback throws generic error', () => { + const result = db.runInTransaction(() => { + throw new Error('boom'); + }); + + expect(result.ok).toBe(false); + if (result.ok) return; + expect(result.error.message).toContain('Transaction failed: boom'); + expect(result.error.code).toBe(ErrorCode.SYSTEM_ERROR); + }); + + it('should preserve BackbeatError types thrown inside', () => { + const result = db.runInTransaction(() => { + throw new BackbeatError(ErrorCode.TASK_NOT_FOUND, 'Task xyz not found'); + }); + + expect(result.ok).toBe(false); + if (result.ok) return; + expect(result.error).toBeInstanceOf(BackbeatError); + expect(result.error.code).toBe(ErrorCode.TASK_NOT_FOUND); + expect(result.error.message).toBe('Task xyz not found'); + }); + + it('should rollback all writes on error', () => { + const sqliteDb = db.getDatabase(); + + const result = db.runInTransaction(() => { + sqliteDb + .prepare(`INSERT INTO tasks (id, prompt, status, priority, created_at) VALUES (?, ?, ?, ?, ?)`) + .run('rollback-1', 'prompt', 'queued', 'P1', Date.now()); + sqliteDb + .prepare(`INSERT INTO tasks (id, prompt, status, priority, created_at) VALUES (?, ?, ?, ?, ?)`) + .run('rollback-2', 'prompt', 'queued', 'P1', Date.now()); + + // This write should be rolled back too + throw new Error('fail after 2 inserts'); + }); + + expect(result.ok).toBe(false); + + // Verify nothing was committed + const count = sqliteDb.prepare('SELECT COUNT(*) as count FROM tasks').get() as { count: number }; + expect(count.count).toBe(0); + }); + }); + describe('Default path handling', () => { it('should use in-memory database for tests', () => { // In tests, we should always use in-memory databases diff --git a/tests/unit/implementations/schedule-repository.test.ts b/tests/unit/implementations/schedule-repository.test.ts index 2450535..940ded5 100644 --- a/tests/unit/implementations/schedule-repository.test.ts +++ b/tests/unit/implementations/schedule-repository.test.ts @@ -14,6 +14,7 @@ import { ScheduleType, TaskId, } from '../../../src/core/domain.js'; +import { BackbeatError, ErrorCode } from '../../../src/core/errors.js'; import { Database } from '../../../src/implementations/database.js'; import { SQLiteScheduleRepository } from '../../../src/implementations/schedule-repository.js'; @@ -649,4 +650,120 @@ describe('SQLiteScheduleRepository - Unit Tests', () => { expect(found.pipelineSteps![2].prompt).toBe('step three'); }); }); + + describe('Sync methods (for transactions)', () => { + it('findByIdSync should return schedule when found', async () => { + const schedule = createTestSchedule(); + await repo.save(schedule); + + const found = repo.findByIdSync(schedule.id); + expect(found).not.toBeNull(); + expect(found!.id).toBe(schedule.id); + }); + + it('findByIdSync should return null when not found', () => { + const found = repo.findByIdSync(ScheduleId('no-such-schedule')); + expect(found).toBeNull(); + }); + + it('updateSync should merge fields', async () => { + const schedule = createTestSchedule(); + await repo.save(schedule); + + repo.updateSync(schedule.id, { status: ScheduleStatus.PAUSED, runCount: 3 }); + + const found = repo.findByIdSync(schedule.id); + expect(found).not.toBeNull(); + expect(found!.status).toBe(ScheduleStatus.PAUSED); + expect(found!.runCount).toBe(3); + }); + + it('updateSync should throw BackbeatError for non-existent schedule', () => { + expect(() => { + repo.updateSync(ScheduleId('no-such-schedule'), { status: ScheduleStatus.PAUSED }); + }).toThrow(BackbeatError); + }); + + it('recordExecutionSync should record and return execution with ID', async () => { + const schedule = createTestSchedule(); + await repo.save(schedule); + + const now = Date.now(); + const execution = repo.recordExecutionSync({ + scheduleId: schedule.id, + scheduledFor: now, + executedAt: now, + status: 'triggered', + createdAt: now, + }); + + expect(execution.id).toBeDefined(); + expect(execution.scheduleId).toBe(schedule.id); + expect(execution.status).toBe('triggered'); + }); + + it('recordExecutionSync should record execution with pipelineTaskIds', async () => { + const schedule = createTestSchedule(); + await repo.save(schedule); + + const now = Date.now(); + const taskIds = [TaskId('task-a'), TaskId('task-b')]; + const execution = repo.recordExecutionSync({ + scheduleId: schedule.id, + scheduledFor: now, + executedAt: now, + status: 'triggered', + pipelineTaskIds: taskIds, + createdAt: now, + }); + + expect(execution.pipelineTaskIds).toBeDefined(); + expect(execution.pipelineTaskIds).toHaveLength(2); + expect(execution.pipelineTaskIds![0]).toBe('task-a'); + }); + + it('should work correctly inside Database.runInTransaction', async () => { + const schedule = createTestSchedule(); + await repo.save(schedule); + + const now = Date.now(); + const result = db.runInTransaction(() => { + repo.updateSync(schedule.id, { runCount: 1, lastRunAt: now }); + repo.recordExecutionSync({ + scheduleId: schedule.id, + scheduledFor: now, + executedAt: now, + status: 'triggered', + createdAt: now, + }); + }); + + expect(result.ok).toBe(true); + + // Verify both operations committed + const found = repo.findByIdSync(schedule.id); + expect(found!.runCount).toBe(1); + + const history = await repo.getExecutionHistory(schedule.id); + expect(history.ok).toBe(true); + if (!history.ok) return; + expect(history.value).toHaveLength(1); + }); + + it('should rollback all operations when transaction fails', async () => { + const schedule = createTestSchedule(); + await repo.save(schedule); + + const result = db.runInTransaction(() => { + repo.updateSync(schedule.id, { runCount: 99 }); + throw new Error('simulated failure'); + }); + + expect(result.ok).toBe(false); + + // runCount should not have changed + const found = repo.findByIdSync(schedule.id); + expect(found!.runCount).toBe(0); + }); + }); }); diff --git a/tests/unit/implementations/task-repository.test.ts b/tests/unit/implementations/task-repository.test.ts index 5728ad0..f455d33 100644 --- a/tests/unit/implementations/task-repository.test.ts +++ b/tests/unit/implementations/task-repository.test.ts @@ -6,7 +6,8 @@ */ import { afterEach, beforeEach, describe, expect, it } from 'vitest'; -import { type Task, TaskId } from '../../../src/core/domain.js'; +import { type Task, TaskId, TaskStatus } from '../../../src/core/domain.js'; +import { BackbeatError, ErrorCode } from '../../../src/core/errors.js'; import { Database } from '../../../src/implementations/database.js'; import { SQLiteTaskRepository } from '../../../src/implementations/task-repository.js'; import { createTestTask } from '../../fixtures/test-data.js'; @@ -211,8 +212,6 @@ describe('SQLiteTaskRepository', () => { }); it('should apply migration v6 correctly (column exists)', async () => { - // The Database constructor applies all migrations automatically - // Verify the column exists by saving/retrieving a task with continueFrom const task = createTestTask({ id: 'task-migration-test', continueFrom: 'task-parent-migration', @@ -227,4 +226,79 @@ describe('SQLiteTaskRepository', () => { } }); }); + + describe('Sync methods (for transactions)', () => { + it('saveSync should persist a task', () => { + const task = createTestTask({ id: 'sync-save-1' }); + repo.saveSync(task); + + const found = repo.findByIdSync(TaskId('sync-save-1')); + expect(found).not.toBeNull(); + expect(found!.prompt).toBe(task.prompt); + }); + + it('findByIdSync should return null for non-existent task', () => { + const found = repo.findByIdSync(TaskId('no-such-task')); + expect(found).toBeNull(); + }); + + it('updateSync should merge fields', () => { + const task = createTestTask({ id: 'sync-update-1' }); + repo.saveSync(task); + + repo.updateSync(TaskId('sync-update-1'), { status: TaskStatus.RUNNING }); + + const found = repo.findByIdSync(TaskId('sync-update-1')); + expect(found).not.toBeNull(); + expect(found!.status).toBe(TaskStatus.RUNNING); + expect(found!.prompt).toBe(task.prompt); // Other fields preserved + }); + + it('updateSync should throw BackbeatError for non-existent task', () => { + expect(() => { + repo.updateSync(TaskId('no-such-task'), { status: TaskStatus.CANCELLED }); + }).toThrow(BackbeatError); + + try { + repo.updateSync(TaskId('no-such-task'), { status: TaskStatus.CANCELLED }); + } catch (e) { + expect((e as BackbeatError).code).toBe(ErrorCode.TASK_NOT_FOUND); + } + }); + + it('should work correctly inside Database.runInTransaction', () => { + const task1 = createTestTask({ id: 'tx-task-1' }); + const task2 = createTestTask({ id: 'tx-task-2' }); + + const result = database.runInTransaction(() => { + repo.saveSync(task1); + repo.saveSync(task2); + repo.updateSync(TaskId('tx-task-1'), { status: TaskStatus.RUNNING }); + }); + + expect(result.ok).toBe(true); + + // Both tasks committed + const found1 = repo.findByIdSync(TaskId('tx-task-1')); + const found2 = repo.findByIdSync(TaskId('tx-task-2')); + expect(found1).not.toBeNull(); + expect(found1!.status).toBe(TaskStatus.RUNNING); + expect(found2).not.toBeNull(); + }); + + it('should rollback all saves when transaction fails', () => { + const task1 = createTestTask({ id: 'tx-rollback-1' }); + + const result = database.runInTransaction(() => { + repo.saveSync(task1); + throw new Error('simulated failure'); + }); + + expect(result.ok).toBe(false); + + // Task should not exist + const found = repo.findByIdSync(TaskId('tx-rollback-1')); + expect(found).toBeNull(); + }); + }); }); diff --git a/tests/unit/services/handler-setup.test.ts b/tests/unit/services/handler-setup.test.ts index 720eb83..c2498bd 100644 --- a/tests/unit/services/handler-setup.test.ts +++ b/tests/unit/services/handler-setup.test.ts @@ -10,12 +10,14 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { Container } from '../../../src/core/container'; import { InMemoryEventBus } from '../../../src/core/events/event-bus'; import { InMemoryAgentRegistry } from '../../../src/implementations/agent-registry'; +import { SQLiteCheckpointRepository } from '../../../src/implementations/checkpoint-repository'; import { Database } from '../../../src/implementations/database'; import { SQLiteDependencyRepository } from '../../../src/implementations/dependency-repository'; import { EventDrivenWorkerPool } from '../../../src/implementations/event-driven-worker-pool'; import { BufferedOutputCapture } from '../../../src/implementations/output-capture'; import { ProcessSpawnerAdapter } from '../../../src/implementations/process-spawner-adapter'; import { SystemResourceMonitor } from '../../../src/implementations/resource-monitor'; +import { SQLiteScheduleRepository } from '../../../src/implementations/schedule-repository'; import { PriorityTaskQueue } from '../../../src/implementations/task-queue'; import { SQLiteTaskRepository } from '../../../src/implementations/task-repository'; import { @@ -69,6 +71,11 @@ describe('handler-setup', () => { new BufferedOutputCapture(config.maxOutputBuffer, eventBus), ); container.registerValue('workerPool', workerPool); + + // Repositories added in v0.4.0+ (scheduleRepository, checkpointRepository, database) + container.registerValue('database', database); + container.registerValue('scheduleRepository', new SQLiteScheduleRepository(database)); + container.registerValue('checkpointRepository', new SQLiteCheckpointRepository(database)); }); afterEach(async () => { @@ -131,11 +138,26 @@ describe('handler-setup', () => { } }); + it('should fail with clear error when database missing', () => { + const partialContainer = new Container(logger); + partialContainer.registerValue('config', config); + partialContainer.registerValue('logger', logger); + partialContainer.registerValue('eventBus', eventBus); + + const result = extractHandlerDependencies(partialContainer); + + expect(result.ok).toBe(false); + if (!result.ok) { + expect(result.error.message).toContain('database'); + } + }); + it('should fail with clear error when taskRepository missing', () => { const partialContainer = new Container(logger); partialContainer.registerValue('config', config); partialContainer.registerValue('logger', logger); partialContainer.registerValue('eventBus', eventBus); + partialContainer.registerValue('database', database); const result = extractHandlerDependencies(partialContainer); diff --git a/tests/unit/services/handlers/schedule-handler.test.ts b/tests/unit/services/handlers/schedule-handler.test.ts index 9aed43b..fa0a5aa 100644 --- a/tests/unit/services/handlers/schedule-handler.test.ts +++ b/tests/unit/services/handlers/schedule-handler.test.ts @@ -19,6 +19,7 @@ import { TaskId, TaskStatus, } from '../../../../src/core/domain'; +import { BackbeatError, ErrorCode } from '../../../../src/core/errors'; import { InMemoryEventBus } from '../../../../src/core/events/event-bus'; import { Database } from '../../../../src/implementations/database'; import { SQLiteScheduleRepository } from '../../../../src/implementations/schedule-repository'; @@ -45,7 +46,7 @@ describe('ScheduleHandler - Behavioral Tests', () => { scheduleRepo = new SQLiteScheduleRepository(database); taskRepo = new SQLiteTaskRepository(database); - const handlerResult = await ScheduleHandler.create(scheduleRepo, taskRepo, eventBus, logger); + const handlerResult = await ScheduleHandler.create(scheduleRepo, taskRepo, eventBus, database, logger); if (!handlerResult.ok) { throw new Error(`Failed to create ScheduleHandler: ${handlerResult.error.message}`); } @@ -82,12 +83,22 @@ describe('ScheduleHandler - Behavioral Tests', () => { const freshEventBus = new InMemoryEventBus(createTestConfiguration(), new TestLogger()); const freshLogger = new TestLogger(); - const result = await ScheduleHandler.create(scheduleRepo, taskRepo, freshEventBus, freshLogger); + const freshDb = new Database(':memory:'); + const freshScheduleRepo = new SQLiteScheduleRepository(freshDb); + const freshTaskRepo = new SQLiteTaskRepository(freshDb); + const result = await ScheduleHandler.create( + freshScheduleRepo, + freshTaskRepo, + freshEventBus, + freshDb, + freshLogger, + ); expect(result.ok).toBe(true); expect(freshLogger.hasLogContaining('ScheduleHandler initialized')).toBe(true); freshEventBus.dispose(); + freshDb.close(); }); }); @@ -735,22 +746,20 @@ describe('ScheduleHandler - Behavioral Tests', () => { expect(step1!.dependsOn).not.toContain(predecessorTask.id); }); - it('should handle partial save failure by cancelling saved tasks', async () => { - // Arrange: 3-step pipeline where the 3rd save will fail + it('should rollback all tasks on partial save failure (transaction atomicity)', async () => { + // Arrange: 3-step pipeline where the 3rd saveSync will throw const schedule = createPipelineSchedule(); await saveSchedule({ ...schedule, status: ScheduleStatus.ACTIVE }); let saveCallCount = 0; - const originalSave = taskRepo.save.bind(taskRepo); - const saveSpy = vi.spyOn(taskRepo, 'save').mockImplementation(async (task) => { + const originalSaveSync = taskRepo.saveSync.bind(taskRepo); + const saveSpy = vi.spyOn(taskRepo, 'saveSync').mockImplementation((task) => { saveCallCount++; if (saveCallCount === 3) { - // Simulate failure on the 3rd task save - const { err: mkErr } = await import('../../../../src/core/result'); - const { BackbeatError, ErrorCode } = await import('../../../../src/core/errors'); - return mkErr(new BackbeatError(ErrorCode.SYSTEM_ERROR, 'Simulated DB failure on step 3')); + // Sync methods throw on error (caught by transaction wrapper) + throw new BackbeatError(ErrorCode.SYSTEM_ERROR, 'Simulated DB failure on step 3'); } - return originalSave(task); + return originalSaveSync(task); }); // Act @@ -758,22 +767,19 @@ describe('ScheduleHandler - Behavioral Tests', () => { saveSpy.mockRestore(); - // Assert: the 2 tasks that were saved should now be CANCELLED + // Assert: 0 tasks exist — transaction rolled back ALL saves const allTasksResult = await taskRepo.findAll(); expect(allTasksResult.ok).toBe(true); if (!allTasksResult.ok) return; + expect(allTasksResult.value).toHaveLength(0); - const allTasks = allTasksResult.value; - // Only 2 tasks saved (the 3rd failed) - expect(allTasks).toHaveLength(2); - expect(allTasks.every((t) => t.status === TaskStatus.CANCELLED)).toBe(true); - - // Assert: a failed execution was recorded + // Assert: a failed execution was recorded (best-effort, outside transaction) const historyResult = await scheduleRepo.getExecutionHistory(schedule.id); expect(historyResult.ok).toBe(true); if (!historyResult.ok) return; expect(historyResult.value).toHaveLength(1); expect(historyResult.value[0].status).toBe('failed'); + expect(historyResult.value[0].errorMessage).toContain('Pipeline failed at step 3'); }); it('should cancel all tasks when TaskDelegated fails for step 0', async () => { @@ -842,21 +848,19 @@ describe('ScheduleHandler - Behavioral Tests', () => { }); it('should not double-wrap error message in pipeline failure execution record', async () => { - // Arrange: 2-step pipeline where the 2nd save will fail + // Arrange: 2-step pipeline where the 2nd saveSync will throw const twoSteps: readonly PipelineStepRequest[] = [{ prompt: 'step-a' }, { prompt: 'step-b' }]; const schedule = createPipelineSchedule({ pipelineSteps: twoSteps }); await saveSchedule({ ...schedule, status: ScheduleStatus.ACTIVE }); let saveCallCount = 0; - const originalSave = taskRepo.save.bind(taskRepo); - const saveSpy = vi.spyOn(taskRepo, 'save').mockImplementation(async (task) => { + const originalSaveSync = taskRepo.saveSync.bind(taskRepo); + const saveSpy = vi.spyOn(taskRepo, 'saveSync').mockImplementation((task) => { saveCallCount++; if (saveCallCount === 2) { - const { err: mkErr } = await import('../../../../src/core/result'); - const { BackbeatError, ErrorCode } = await import('../../../../src/core/errors'); - return mkErr(new BackbeatError(ErrorCode.SYSTEM_ERROR, 'DB write error')); + throw new BackbeatError(ErrorCode.SYSTEM_ERROR, 'DB write error'); } - return originalSave(task); + return originalSaveSync(task); }); // Act @@ -871,21 +875,19 @@ describe('ScheduleHandler - Behavioral Tests', () => { const errorMessage = historyResult.value[0].errorMessage; expect(errorMessage).toBeDefined(); - // Should contain "Pipeline failed at step 2" but NOT "Failed to create task: Pipeline failed" + // Should contain "Pipeline failed at step 2" but NOT "Schedule trigger failed: Pipeline failed" expect(errorMessage).toContain('Pipeline failed at step 2'); - expect(errorMessage).not.toContain('Failed to create task: Pipeline failed'); + expect(errorMessage).not.toContain('Schedule trigger failed: Pipeline failed'); }); it('should include prefix in single-task failure execution record', async () => { - // Arrange: single-task schedule where save fails + // Arrange: single-task schedule where saveSync throws const schedule = createTestSchedule(); await saveSchedule(schedule); await scheduleRepo.update(schedule.id, { nextRunAt: Date.now() - 60000 }); - const saveSpy = vi.spyOn(taskRepo, 'save').mockImplementation(async () => { - const { err: mkErr } = await import('../../../../src/core/result'); - const { BackbeatError, ErrorCode } = await import('../../../../src/core/errors'); - return mkErr(new BackbeatError(ErrorCode.SYSTEM_ERROR, 'DB write error')); + const saveSpy = vi.spyOn(taskRepo, 'saveSync').mockImplementation(() => { + throw new BackbeatError(ErrorCode.SYSTEM_ERROR, 'DB write error'); }); // Act @@ -893,7 +895,7 @@ describe('ScheduleHandler - Behavioral Tests', () => { await flushEventLoop(); saveSpy.mockRestore(); - // Assert: execution error message should include "Failed to create task:" prefix + // Assert: execution error message should include "Schedule trigger failed:" prefix const historyResult = await scheduleRepo.getExecutionHistory(schedule.id); expect(historyResult.ok).toBe(true); if (!historyResult.ok) return; @@ -901,7 +903,58 @@ describe('ScheduleHandler - Behavioral Tests', () => { const errorMessage = historyResult.value[0].errorMessage; expect(errorMessage).toBeDefined(); - expect(errorMessage).toContain('Failed to create task:'); + expect(errorMessage).toContain('Schedule trigger failed:'); + }); + + it('should rollback task on single-task recordExecutionSync failure', async () => { + // Arrange: single-task schedule where recordExecutionSync throws + const schedule = createTestSchedule(); + await saveSchedule(schedule); + await scheduleRepo.update(schedule.id, { nextRunAt: Date.now() - 60000 }); + + const spy = vi.spyOn(scheduleRepo, 'recordExecutionSync').mockImplementation(() => { + throw new BackbeatError(ErrorCode.SYSTEM_ERROR, 'Execution record failed'); + }); + + // Act + await eventBus.emit('ScheduleTriggered', { scheduleId: schedule.id, triggeredAt: Date.now() }); + await flushEventLoop(); + spy.mockRestore(); + + // Assert: no task was saved (transaction rolled back) + const allTasks = await taskRepo.findAll(); + expect(allTasks.ok).toBe(true); + if (!allTasks.ok) return; + expect(allTasks.value).toHaveLength(0); + }); + + it('should commit all pipeline tasks + execution atomically on success', async () => { + // Arrange + const schedule = createPipelineSchedule(); + await saveSchedule({ ...schedule, status: ScheduleStatus.ACTIVE }); + + // Act + await triggerSchedule(schedule.id); + + // Assert: all 3 tasks committed + const allTasks = await taskRepo.findAll(); + expect(allTasks.ok).toBe(true); + if (!allTasks.ok) return; + expect(allTasks.value).toHaveLength(3); + + // Assert: execution record committed + const history = await scheduleRepo.getExecutionHistory(schedule.id); + expect(history.ok).toBe(true); + if (!history.ok) return; + expect(history.value).toHaveLength(1); + expect(history.value[0].status).toBe('triggered'); + expect(history.value[0].pipelineTaskIds).toHaveLength(3); + + // Assert: schedule updated + const updated = await scheduleRepo.findById(schedule.id); + expect(updated.ok).toBe(true); + if (!updated.ok) return; + expect(updated.value!.runCount).toBe(1); }); it('should update schedule state after pipeline trigger', async () => {