diff --git a/CLAUDE.md b/CLAUDE.md index 517d1e6..73f469e 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -134,7 +134,7 @@ When adding task dependencies: ### MCP Tools -All tools use PascalCase: `DelegateTask`, `TaskStatus`, `TaskLogs`, `CancelTask`, `ScheduleTask`, `ListSchedules`, `GetSchedule`, `CancelSchedule`, `PauseSchedule`, `ResumeSchedule` +All tools use PascalCase: `DelegateTask`, `TaskStatus`, `TaskLogs`, `CancelTask`, `ScheduleTask`, `ListSchedules`, `GetSchedule`, `CancelSchedule`, `PauseSchedule`, `ResumeSchedule`, `CreatePipeline`, `SchedulePipeline` ## File Locations @@ -154,6 +154,7 @@ Quick reference for common operations: | Schedule repository | `src/implementations/schedule-repository.ts` | | Schedule handler | `src/services/handlers/schedule-handler.ts` | | Schedule executor | `src/services/schedule-executor.ts` | +| Schedule manager | `src/services/schedule-manager.ts` | | Cron utilities | `src/utils/cron.ts` | ## Documentation Structure diff --git a/README.md b/README.md index fba6910..f53a5d0 100644 --- a/README.md +++ b/README.md @@ -79,10 +79,12 @@ Once configured, use these tools in Claude Code: | **ScheduleTask** | Schedule recurring or one-time tasks | `ScheduleTask({ prompt: "...", scheduleType: "cron", cronExpression: "0 2 * * *" })` | | **ListSchedules** | List schedules with optional status filter | `ListSchedules({ status: "active" })` | | **GetSchedule** | Get schedule details and execution history | `GetSchedule({ scheduleId })` | -| **CancelSchedule** | Cancel an active schedule | `CancelSchedule({ scheduleId, reason })` | +| **CancelSchedule** | Cancel an active schedule (optionally cancel in-flight tasks) | `CancelSchedule({ scheduleId, reason, cancelTasks? 
})` | | **PauseSchedule** | Pause a schedule (resumable) | `PauseSchedule({ scheduleId })` | | **ResumeSchedule** | Resume a paused schedule | `ResumeSchedule({ scheduleId })` | | **ResumeTask** | Resume a failed/completed task with checkpoint context | `ResumeTask({ taskId, additionalContext? })` | +| **CreatePipeline** | Create sequential task pipelines | `CreatePipeline({ steps: [...] })` | +| **SchedulePipeline** | Create recurring/one-time scheduled pipelines | `SchedulePipeline({ steps: [...], cronExpression: "0 9 * * *" })` | ### CLI Commands @@ -316,7 +318,8 @@ backbeat/ - [x] v0.3.3 - Test infrastructure and memory management - [x] v0.4.0 - Task scheduling and task resumption - [x] v0.5.0 - Multi-agent support (Claude, Codex, Gemini) -- [ ] v0.6.0 - Scheduled pipelines and loops +- [x] v0.6.0 - Scheduled pipelines +- [ ] v0.6.1 - Task/pipeline loops See **[ROADMAP.md](./docs/ROADMAP.md)** for detailed plans and timelines. diff --git a/docs/FEATURES.md b/docs/FEATURES.md index 822385a..7b5fb92 100644 --- a/docs/FEATURES.md +++ b/docs/FEATURES.md @@ -202,6 +202,7 @@ Last Updated: March 2026 - **PauseSchedule**: Pause an active schedule (can be resumed later) - **ResumeSchedule**: Resume a paused schedule - **CreatePipeline** (v0.4.1): Create sequential task pipelines with 2–20 steps, per-step delays, priority, and working directory overrides +- **SchedulePipeline** (v0.6.0): Create recurring or one-time scheduled pipelines with 2–20 steps, each trigger creates a fresh pipeline instance with linear task dependencies ### Schedule Types - **CRON**: Standard 5-field cron expressions for recurring task execution @@ -263,6 +264,29 @@ Last Updated: March 2026 - **`agent` field on DelegateTask**: Specify agent per task (e.g., `{ agent: "codex" }`) - **Fallback**: Uses default agent when no agent specified +## ✅ Scheduled Pipelines (v0.6.0) + +### Recurring & One-Time Pipelines +- **SchedulePipeline MCP Tool**: Create a single schedule that triggers a full pipeline 
(2–20 steps) on each execution +- **Cron + One-Time**: Supports both recurring cron expressions and single future execution +- **Linear Dependencies**: Each trigger creates fresh tasks wired with linear dependencies (step N depends on step N-1) +- **Per-Step Configuration**: Each step can have its own prompt, priority, working directory, and agent override (MCP only) +- **Shared Defaults**: Schedule-level agent, priority, and working directory apply to all steps unless overridden + +### Pipeline Lifecycle +- **Dependency Failure Cascade**: When a pipeline step fails, all downstream steps are automatically cancelled +- **Cancel with Tasks**: `CancelSchedule` with `cancelTasks: true` cancels in-flight pipeline tasks from current execution +- **Concurrency Tracking**: Pipeline completion tracked via tail task — prevents overlapping pipeline executions +- **`afterScheduleId` Support**: Chain pipelines after other schedules (predecessor dependency injected on step 0) + +### CLI Support +- `beat schedule create --pipeline --step "lint" --step "test" --cron "0 9 * * *"`: Create scheduled pipeline +- `beat schedule cancel --cancel-tasks`: Cancel schedule and in-flight tasks + +### Bug Fixes (v0.6.0) +- **Dependency Failure Cascade**: Failed/cancelled upstream tasks now cascade cancellation to dependents (was incorrectly unblocking them) +- **Queue Handler Race Condition**: Fast-path check prevents blocked tasks from being prematurely enqueued + ## ❌ NOT Implemented (Despite Some Documentation Claims) - **Distributed Processing**: Single-server only - **Web UI**: No dashboard interface @@ -274,6 +298,26 @@ Last Updated: March 2026 --- +## 🆕 What's New in v0.6.0 + +### Scheduled Pipelines +- **`SchedulePipeline` MCP Tool**: Create cron or one-time schedules that trigger a full pipeline (2–20 steps) on each execution +- **Linear Task Dependencies**: Each trigger creates fresh tasks with `task[i].dependsOn = [task[i-1].id]` +- **Per-Step Agent Override**: MCP tool supports 
per-step `agent` field; CLI uses shared `--agent` +- **`cancelTasks` on CancelSchedule**: Optional flag to also cancel in-flight pipeline tasks from current execution +- **ListSchedules Enhancement**: Response includes `isPipeline` and `stepCount` indicators +- **GetSchedule Enhancement**: Response includes full `pipelineSteps` when present +- **CLI**: `--pipeline --step "..." --step "..."` flags for creating scheduled pipelines + +### Bug Fixes +- **Dependency Failure Cascade**: When upstream task fails or is cancelled, dependent tasks are now cancelled instead of incorrectly unblocked (**breaking change**) +- **Queue Handler Race Condition**: Fast-path `dependencyState` check prevents blocked tasks from being enqueued before dependency rows are written to DB + +### Database +- **Migration 8**: `pipeline_steps` column on `schedules` table, `pipeline_task_ids` column on `schedule_executions` table + +--- + ## 🆕 What's New in v0.5.0 ### Multi-Agent Support diff --git a/docs/ROADMAP.md b/docs/ROADMAP.md index ae0a262..fef42dc 100644 --- a/docs/ROADMAP.md +++ b/docs/ROADMAP.md @@ -42,7 +42,7 @@ Items originally planned for v0.3.1 that were completed across v0.3.1–v0.4.0: | Remove Cycle Detection from Repository Layer | Open | — | | Consolidate Graph Caching | Open | — | | JSDoc Coverage for dependency APIs | Open | — | -| Failed/Cancelled Dependency Propagation Semantics | Open | Needs design decision | +| Failed/Cancelled Dependency Propagation Semantics | ✅ Done | v0.6.0 (cascade cancellation) | Open items are low priority — they'll be addressed opportunistically or when performance demands it. 
@@ -57,26 +57,19 @@ Agent registry with pluggable adapters (Claude, Codex, Gemini), per-task agent s --- -### v0.6.0 - Scheduled Pipelines & Loops -**Goal**: Time-triggered pipelines and condition-driven iteration -**Priority**: High — completes the orchestration story -**Issues**: [#78](https://github.com/dean0x/backbeat/issues/78), [#79](https://github.com/dean0x/backbeat/issues/79) +### v0.6.0 - Scheduled Pipelines ✅ +**Status**: **RELEASED** (2026-03-18) -#### Feature 1: Scheduled Pipelines (#78) -Run a multi-step DAG on a cron or one-time schedule. Today `beat schedule create` only supports single tasks and `CreatePipeline` only runs immediately — this connects the two. +Scheduled pipelines (`SchedulePipeline` MCP tool, `--pipeline --step` CLI), dependency failure cascade fix, queue handler race condition fix, `cancelTasks` on `CancelSchedule`. See [FEATURES.md](./FEATURES.md) for details. -```bash -beat schedule create --cron "0 9 * * *" --pipeline \ - --step "lint the codebase" \ - --step "run test suite" \ - --step "deploy to staging" -``` +--- -- Each trigger creates a fresh pipeline instance (new task IDs) -- Steps inherit the schedule's agent unless overridden -- Execution history tracks which schedule triggered which pipeline +### v0.6.1 - Task/Pipeline Loops +**Goal**: Condition-driven iteration +**Priority**: High — completes the orchestration story +**Issue**: [#79](https://github.com/dean0x/backbeat/issues/79) -#### Feature 2: Task/Pipeline Loops (#79) +#### Task/Pipeline Loops (#79) Repeat a task or pipeline until an exit condition is met — the [Ralph Wiggum Loop](https://ghuntley.com/loop/) pattern. 
```bash @@ -94,6 +87,7 @@ beat loop "implement next item from spec.md" \ - v0.4.0 schedules (cron/one-time), checkpoints, `continueFrom` - v0.4.1 pipelines (`CreatePipeline`) - v0.5.0 multi-agent per-task selection +- v0.6.0 scheduled pipelines --- @@ -224,7 +218,8 @@ beat recipe create my-workflow # interactive recipe builder | v0.3.1–3 | ✅ Released | Dependency optimizations + security | | v0.4.0 | ✅ Released | Scheduling, Resumption, Rename to Backbeat | | v0.5.0 | ✅ Released | Multi-Agent Support | -| v0.6.0 | 🎯 Next | Scheduled Pipelines + Loops | +| v0.6.0 | ✅ Released | Scheduled Pipelines | +| v0.6.1 | 🎯 Next | Task/Pipeline Loops | | v0.7.0 | 📋 Planned | Agent Failover + Smart Routing | | v0.8.0 | 📋 Planned | Workflow Recipes & Templates | | v0.9.0 | 💭 Research | Monitoring + REST API + Dashboard | diff --git a/docs/TASK-DEPENDENCIES.md b/docs/TASK-DEPENDENCIES.md index efb1bef..1a1fe10 100644 --- a/docs/TASK-DEPENDENCIES.md +++ b/docs/TASK-DEPENDENCIES.md @@ -92,15 +92,23 @@ TaskDelegated ├─ Blocked: Skip enqueueing └─ Not Blocked: Enqueue task -TaskCompleted/Failed/Cancelled +TaskCompleted │ └─▶ DependencyHandler.handleTaskCompleted() - └─▶ Resolves dependencies + └─▶ Resolves dependencies as 'completed' └─▶ Checks dependent tasks - └─▶ If unblocked: Emits TaskUnblocked + └─▶ If unblocked (all deps completed): Emits TaskUnblocked │ └─▶ QueueHandler.handleTaskUnblocked() └─▶ Enqueues unblocked task + +TaskFailed/Cancelled/Timeout + │ + └─▶ DependencyHandler.handleTaskFailed/Cancelled/Timeout() + └─▶ Resolves dependencies as 'failed' or 'cancelled' + └─▶ Checks dependent tasks + └─▶ If unblocked but has failed/cancelled dep: + Emits TaskCancellationRequested (cascade cancellation) ``` ### DAG Validation @@ -376,7 +384,7 @@ const aggregateResults = await taskManager.delegate({ ### Error Handling ```typescript -// Dependencies can fail or be cancelled +// Dependency failures cascade automatically (v0.6.0+) const dbMigration = await taskManager.delegate({ 
prompt: 'Run database migrations' @@ -389,15 +397,17 @@ const seedData = await taskManager.delegate({ // If dbMigration fails: // 1. Dependency is resolved as 'failed' -// 2. seedData is no longer blocked (isBlocked returns false) -// 3. seedData can execute (you may want to check dependency resolution states) +// 2. seedData has no more pending dependencies +// 3. DependencyHandler detects the failed dependency +// 4. seedData is automatically cancelled (cascade cancellation) +// 5. seedData never executes -// To check dependency resolution: +// To inspect dependency resolution after the fact: const deps = await dependencyRepo.getDependencies(seedData.value.id); if (deps.ok) { - const allSucceeded = deps.value.every(dep => dep.resolution === 'completed'); - if (!allSucceeded) { - console.log('Some dependencies failed - task may want to abort'); + for (const dep of deps.value) { + console.log(`Dependency ${dep.dependsOnTaskId}: ${dep.resolution}`); + // 'completed' | 'failed' | 'cancelled' } } ``` @@ -465,22 +475,28 @@ await taskManager.delegate({ ### 3. Handle Dependency Failures -#### Current Behavior (v0.3.0) +> **Breaking Change (v0.6.0):** Dependency failure behavior changed from "unblock and execute" to "cascade cancellation". Failed or cancelled upstream tasks now automatically cancel all downstream dependents. This replaced the v0.3.0 behavior where dependents were unblocked and expected to check resolution states manually. -**Important:** When a dependency fails or is cancelled, the dependent task is **unblocked** but **not automatically failed or cancelled**. +#### Current Behavior (v0.6.0+) + +**Important:** When a dependency fails, is cancelled, or times out, all dependent tasks are **automatically cancelled** via cascade cancellation. Dependents do not execute. 
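As a mental model, the cascade described above can be sketched as a breadth-first walk over the dependents graph. This is an illustrative simplification, not Backbeat's actual `DependencyHandler` implementation; all names and shapes here are hypothetical:

```typescript
// Minimal model of cascade cancellation.
// Hypothetical shapes -- not Backbeat's internal DependencyHandler types.
type TaskId = string;

// `dependents` maps a task to the tasks that depend on it.
function cascadeCancel(
  failed: TaskId,
  dependents: Map<TaskId, TaskId[]>,
): TaskId[] {
  const cancelled: TaskId[] = [];
  const queue: TaskId[] = [...(dependents.get(failed) ?? [])];
  const seen = new Set<TaskId>();
  while (queue.length > 0) {
    const id = queue.shift()!;
    // A task reachable through two failed paths is cancelled only once.
    if (seen.has(id)) continue;
    seen.add(id);
    cancelled.push(id);
    queue.push(...(dependents.get(id) ?? []));
  }
  return cancelled;
}

// Pipeline A -> B -> C: if A fails, B and C are both cancelled.
const graph = new Map<string, string[]>([
  ["A", ["B"]],
  ["B", ["C"]],
]);
console.log(cascadeCancel("A", graph)); // cancels B, then C
```

For a diamond-shaped graph (B and C both feeding D), the `seen` set ensures D is cancelled exactly once, mirroring how the recursive cascade terminates.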
**Resolution Flow:** ``` -Task A fails/is cancelled +Task A fails/is cancelled/times out ↓ Dependency resolved as 'failed' or 'cancelled' ↓ -Task B (depends on A) becomes unblocked (isBlocked = false) +Task B (depends on A) becomes unblocked (no pending deps remain) + ↓ +DependencyHandler detects a failed/cancelled dependency in Task B's resolved deps ↓ -Task B is enqueued and can execute +Emits TaskCancellationRequested for Task B ↓ -Task B should check dependency resolution states before proceeding +Task B is cancelled with reason "Dependency failed" + ↓ +If Task B has its own dependents, the cascade continues recursively ``` **Example:** @@ -499,117 +515,40 @@ const taskB = await taskManager.delegate({ // If Task A fails: // 1. Dependency is marked as resolution='failed' -// 2. Task B becomes unblocked (isBlocked returns false) -// 3. Task B is enqueued and will start executing -// 4. Task B SHOULD check dependency states before proceeding - -// Recommended pattern for Task B: -const deps = await dependencyRepo.getDependencies(taskB.id); -if (deps.ok) { - const failedDeps = deps.value.filter(d => d.resolution === 'failed'); - const cancelledDeps = deps.value.filter(d => d.resolution === 'cancelled'); - - if (failedDeps.length > 0 || cancelledDeps.length > 0) { - // Option 1: Fail the task - throw new Error(`Dependencies failed: ${failedDeps.map(d => d.dependsOnTaskId).join(', ')}`); - - // Option 2: Skip execution and log warning - console.warn('Skipping task due to failed dependencies'); - return; - - // Option 3: Continue anyway (if task can handle partial results) - console.warn('Proceeding despite failed dependencies'); - } -} +// 2. Task B has no more pending dependencies (isBlocked = false) +// 3. DependencyHandler finds a failed dependency in Task B's resolved deps +// 4. TaskCancellationRequested is emitted for Task B +// 5. Task B is cancelled — it never executes +// 6. 
If Task C depends on Task B, it will also be cascade-cancelled ``` #### Design Rationale -The current behavior (unblock but don't auto-fail) was chosen for flexibility: +Cascade cancellation (auto-cancel on upstream failure) was chosen for safety and pipeline reliability: **Advantages:** -- ✅ Tasks can inspect dependency resolution states and make decisions -- ✅ Some tasks may be able to proceed despite failed dependencies (e.g., "best effort" tasks) -- ✅ Prevents cascading failures when only partial results are needed +- Prevents executing tasks when prerequisites failed (e.g., deploying after a failed build) +- No manual resolution checking required in task code +- Recursive propagation handles deep dependency chains automatically +- Essential for scheduled pipelines where unattended execution must fail-safe -**Disadvantages:** -- ⚠️ Tasks may execute when they shouldn't if resolution checks are forgotten -- ⚠️ Requires explicit handling in each task's code +**Implementation:** +- `DependencyHandler.resolveDependencies()` checks resolved deps after unblocking +- If any dependency has `resolution === 'failed'` or `resolution === 'cancelled'`, emits `TaskCancellationRequested` +- The cancellation itself triggers further cascade for downstream dependents -#### Future Consideration (v0.4.0) +#### Cascade Cancellation Propagation -We may add configurable dependency failure strategies: +**Current Behavior:** Cancelling or failing a task automatically cascades to all dependents. ```typescript -// Proposed future API (not yet implemented) -await taskManager.delegate({ - prompt: 'Task B', - dependsOn: [taskA.id], - onDependencyFailure: 'auto-fail' // or 'auto-cancel', 'continue', 'manual' -}); -``` - -**Track this in**: [GitHub Issue #TBD - Dependency Failure Strategies] - -#### Cancelled Dependency Propagation - -**Current Behavior:** Cancelling a task does **not** automatically cancel its dependents. 
- -```typescript -// Task C depends on Task A -await taskManager.cancel(taskA.id); - -// Task C is NOT automatically cancelled -// Task C becomes unblocked (dependency resolved as 'cancelled') -// Task C will be enqueued and can execute -``` - -**Workaround for Cascading Cancellation:** - -```typescript -// Manual cascade cancellation -async function cancelWithDependents(taskId: TaskId) { - // Get all dependent tasks - const dependents = await dependencyRepo.getDependents(taskId); - - if (dependents.ok) { - // Cancel the original task - await taskManager.cancel(taskId); - - // Recursively cancel all dependents - for (const dep of dependents.value) { - await cancelWithDependents(dep.taskId); - } - } -} +// Pipeline: A → B → C +// If Task A fails: +// 1. Task B is cascade-cancelled (reason: "Dependency failed") +// 2. Task C is cascade-cancelled (reason: "Dependency cancelled") ``` -#### Recommendation - -**For production use**, always check dependency resolution states: - -```typescript -// At the start of every task that has dependencies: -async function executeTask(taskId: TaskId) { - // 1. Check dependencies - const deps = await dependencyRepo.getDependencies(taskId); - - if (deps.ok && deps.value.length > 0) { - // 2. Verify all dependencies completed successfully - const allSucceeded = deps.value.every(d => d.resolution === 'completed'); - - if (!allSucceeded) { - const failedDeps = deps.value.filter(d => d.resolution !== 'completed'); - throw new Error( - `Cannot execute: ${failedDeps.length} dependencies did not complete successfully` - ); - } - } - - // 3. Proceed with task execution - // ... -} -``` +This is handled automatically by the `DependencyHandler` — no manual cancellation code is needed. ### 4. 
Avoid Circular Dependencies

diff --git a/docs/releases/RELEASE_NOTES_v0.6.0.md b/docs/releases/RELEASE_NOTES_v0.6.0.md
new file mode 100644
index 0000000..fe4186d
--- /dev/null
+++ b/docs/releases/RELEASE_NOTES_v0.6.0.md
@@ -0,0 +1,127 @@
+# Backbeat v0.6.0 — Scheduled Pipelines
+
+Create recurring or one-time scheduled pipelines that trigger a full multi-step pipeline on each execution. v0.6.0 also fixes dependency failure handling with automatic cascade cancellation.
+
+---
+
+## New Features
+
+### SchedulePipeline MCP Tool (PR #78)
+
+Create a schedule that triggers a sequential pipeline (2-20 steps) on each execution.
+
+**Key Capabilities:**
+- **Cron + One-Time**: Supports both recurring cron expressions and single future execution
+- **Linear Dependencies**: Each trigger creates fresh tasks wired with linear dependencies (step N depends on step N-1)
+- **Per-Step Configuration**: Each step can have its own prompt, priority, working directory, and agent override
+- **Shared Defaults**: Schedule-level agent, priority, and working directory apply to all steps unless overridden
+- **Concurrency Tracking**: Pipeline completion tracked via tail task -- prevents overlapping pipeline executions
+- **`afterScheduleId` Support**: Chain pipelines after other schedules (predecessor dependency injected on step 0)
+
+**MCP Usage:**
+```typescript
+await SchedulePipeline({
+  steps: [
+    { prompt: "run linter" },
+    { prompt: "run tests" },
+    { prompt: "build and deploy", priority: "P0" }
+  ],
+  scheduleType: "cron",
+  cronExpression: "0 2 * * *",
+  timezone: "America/New_York"
+});
+```
+
+### CLI Pipeline Support
+
+Create scheduled pipelines from the CLI with `--pipeline` and `--step` flags.
+ +```bash +beat schedule create --pipeline \ + --step "lint" \ + --step "test" \ + --step "deploy" \ + --cron "0 9 * * 1-5" +``` + +### Cancel Schedule with In-Flight Tasks + +`CancelSchedule` now supports a `cancelTasks` flag to also cancel in-flight pipeline tasks from the current execution. + +**MCP:** +```typescript +await CancelSchedule({ scheduleId: "...", cancelTasks: true }); +``` + +**CLI:** +```bash +beat schedule cancel --cancel-tasks +``` + +### ListSchedules / GetSchedule Enhancements + +- `ListSchedules` response includes `isPipeline` and `stepCount` indicators +- `GetSchedule` response includes full `pipelineSteps` when present + +--- + +## Breaking Changes + +### Dependency Failure Cascade + +**Before (v0.3.0-v0.5.0):** When an upstream task failed or was cancelled, dependent tasks were unblocked and could execute. Tasks were expected to manually check dependency resolution states. + +**After (v0.6.0+):** When an upstream task fails, is cancelled, or times out, all dependent tasks are automatically cancelled via cascade cancellation. Dependents never execute. + +``` +# v0.6.0 behavior: +Task A fails → Task B (depends on A) is automatically cancelled + → Task C (depends on B) is automatically cancelled +``` + +This change was required for scheduled pipelines to fail-safe in unattended execution. The old behavior risked executing deployment steps after failed build steps. 
+ +--- + +## Bug Fixes + +- **Dependency Failure Cascade**: Failed/cancelled upstream tasks now cascade cancellation to dependents instead of incorrectly unblocking them +- **Queue Handler Race Condition**: Fast-path `dependencyState` check prevents blocked tasks from being enqueued before dependency rows are written to DB +- **Schedule Repo Validation**: Added Zod validation for `pipeline_task_ids` at repository boundary +- **MCP Adapter**: Use `null` instead of `undefined` for `nextRunAt` fallback in `handleSchedulePipeline` +- **Timing Validation**: Deduplicated timing validation logic in `createSchedule` + +--- + +## Database + +- **Migration 8**: Adds `pipeline_steps` column to `schedules` table and `pipeline_task_ids` column to `schedule_executions` table + +--- + +## Installation + +```bash +npm install -g backbeat@0.6.0 +``` + +Or use npx: +```json +{ + "mcpServers": { + "backbeat": { + "command": "npx", + "args": ["-y", "backbeat", "mcp", "start"] + } + } +} +``` + +--- + +## Links + +- NPM Package: https://www.npmjs.com/package/backbeat +- Documentation: https://github.com/dean0x/backbeat/blob/main/docs/FEATURES.md +- Issues: https://github.com/dean0x/backbeat/issues diff --git a/src/adapters/mcp-adapter.ts b/src/adapters/mcp-adapter.ts index e0e2aac..79a6ea7 100644 --- a/src/adapters/mcp-adapter.ts +++ b/src/adapters/mcp-adapter.ts @@ -21,6 +21,7 @@ import { Priority, ResumeTaskRequest, ScheduleCreateRequest, + ScheduledPipelineCreateRequest, ScheduleId, ScheduleStatus, ScheduleType, @@ -108,6 +109,11 @@ const ListSchedulesSchema = z.object({ const CancelScheduleSchema = z.object({ scheduleId: z.string().describe('Schedule ID to cancel'), reason: z.string().optional().describe('Reason for cancellation'), + cancelTasks: z + .boolean() + .optional() + .default(false) + .describe('Also cancel in-flight pipeline tasks from the current execution'), }); const GetScheduleSchema = z.object({ @@ -151,6 +157,38 @@ const CreatePipelineSchema = z.object({ 
.describe('Default agent for all steps (individual steps can override)'), }); +const SchedulePipelineSchema = z.object({ + steps: z + .array( + z.object({ + prompt: z.string().min(1).max(4000).describe('Task prompt for this step'), + priority: z.enum(['P0', 'P1', 'P2']).optional().describe('Priority override for this step'), + workingDirectory: z.string().optional().describe('Working directory override (absolute path)'), + agent: z.enum(AGENT_PROVIDERS_TUPLE).optional().describe('Agent override for this step'), + }), + ) + .min(2, 'Pipeline requires at least 2 steps') + .max(20, 'Pipeline cannot exceed 20 steps') + .describe('Ordered pipeline steps (executed sequentially on each trigger)'), + scheduleType: z.enum(['cron', 'one_time']).describe('Schedule type'), + cronExpression: z.string().optional().describe('Cron expression (5-field) for recurring pipelines'), + scheduledAt: z.string().optional().describe('ISO 8601 datetime for one-time pipelines'), + timezone: z.string().optional().default('UTC').describe('IANA timezone'), + missedRunPolicy: z.enum(['skip', 'catchup', 'fail']).optional().default('skip'), + priority: z.enum(['P0', 'P1', 'P2']).optional().describe('Default priority for all steps'), + workingDirectory: z.string().optional().describe('Default working directory for all steps'), + maxRuns: z.number().min(1).optional().describe('Maximum number of pipeline runs for cron schedules'), + expiresAt: z.string().optional().describe('ISO 8601 datetime when schedule expires'), + afterSchedule: z + .string() + .optional() + .describe("Schedule ID to chain after (step 0 depends on this schedule's latest task)"), + agent: z + .enum(AGENT_PROVIDERS_TUPLE) + .optional() + .describe('Default agent for all steps (individual steps can override)'), +}); + const ConfigureAgentSchema = z.object({ agent: z.enum(AGENT_PROVIDERS_TUPLE).describe('Agent provider to configure'), action: z @@ -246,6 +284,8 @@ export class MCPAdapter { return await 
this.handleResumeSchedule(args); case 'CreatePipeline': return await this.handleCreatePipeline(args); + case 'SchedulePipeline': + return await this.handleSchedulePipeline(args); case 'ListAgents': return this.handleListAgents(); case 'ConfigureAgent': @@ -536,7 +576,8 @@ export class MCPAdapter { }, { name: 'CancelSchedule', - description: 'Cancel an active schedule', + description: + 'Cancel an active schedule. Optionally cancel in-flight pipeline tasks from the current execution.', inputSchema: { type: 'object', properties: { @@ -546,6 +587,11 @@ export class MCPAdapter { reason: { type: 'string', }, + cancelTasks: { + type: 'boolean', + description: 'Also cancel in-flight tasks from the current pipeline execution (default: false)', + default: false, + }, }, required: ['scheduleId'], }, @@ -633,6 +679,97 @@ export class MCPAdapter { required: ['steps'], }, }, + // Scheduled pipeline (v0.6.0) + { + name: 'SchedulePipeline', + description: + 'Schedule a recurring or one-time pipeline. Each trigger creates N tasks with linear dependencies (e.g., "every day at 9am: lint → test → deploy").', + inputSchema: { + type: 'object', + properties: { + steps: { + type: 'array', + description: 'Ordered pipeline steps (executed sequentially on each trigger)', + items: { + type: 'object', + properties: { + prompt: { + type: 'string', + description: 'Task prompt for this step', + minLength: 1, + maxLength: 4000, + }, + priority: { + type: 'string', + enum: ['P0', 'P1', 'P2'], + description: 'Priority override for this step', + }, + workingDirectory: { + type: 'string', + description: 'Working directory override (absolute path)', + }, + agent: { + type: 'string', + enum: [...AGENT_PROVIDERS], + description: 'Agent override for this step', + }, + }, + required: ['prompt'], + }, + minItems: 2, + maxItems: 20, + }, + scheduleType: { + type: 'string', + enum: ['cron', 'one_time'], + description: 'cron for recurring, one_time for single execution', + }, + cronExpression: { + type: 
'string', + description: 'Cron expression (5-field: minute hour day month weekday)', + }, + scheduledAt: { + type: 'string', + description: 'ISO 8601 datetime for one-time pipelines', + }, + timezone: { + type: 'string', + description: 'IANA timezone (default: UTC)', + }, + missedRunPolicy: { + type: 'string', + enum: ['skip', 'catchup', 'fail'], + }, + priority: { + type: 'string', + enum: ['P0', 'P1', 'P2'], + description: 'Default priority for all steps', + }, + workingDirectory: { + type: 'string', + description: 'Default working directory for all steps', + }, + maxRuns: { + type: 'number', + description: 'Maximum runs for cron pipelines', + }, + expiresAt: { + type: 'string', + description: 'ISO 8601 expiration datetime', + }, + afterSchedule: { + type: 'string', + description: "Schedule ID to chain after (step 0 depends on this schedule's latest task)", + }, + agent: { + type: 'string', + enum: [...AGENT_PROVIDERS], + description: 'Default agent for all steps (individual steps can override)', + }, + }, + required: ['steps', 'scheduleType'], + }, + }, // Agent tools (v0.5.0 Multi-Agent Support) { name: 'ListAgents', @@ -1107,6 +1244,8 @@ export class MCPAdapter { nextRunAt: s.nextRunAt ? new Date(s.nextRunAt).toISOString() : null, runCount: s.runCount, maxRuns: s.maxRuns, + isPipeline: !!(s.pipelineSteps && s.pipelineSteps.length > 0), + stepCount: s.pipelineSteps?.length ?? 0, })); return { @@ -1176,6 +1315,18 @@ export class MCPAdapter { priority: schedule.taskTemplate.priority, workingDirectory: schedule.taskTemplate.workingDirectory, }, + ...(schedule.pipelineSteps && schedule.pipelineSteps.length > 0 + ? { + isPipeline: true, + pipelineSteps: schedule.pipelineSteps.map((s, i) => ({ + index: i, + prompt: s.prompt.substring(0, 100) + (s.prompt.length > 100 ? '...' 
: ''), + priority: s.priority, + workingDirectory: s.workingDirectory, + agent: s.agent, + })), + } + : {}), }, }; @@ -1218,9 +1369,9 @@ export class MCPAdapter { }; } - const { scheduleId, reason } = parseResult.data; + const { scheduleId, reason, cancelTasks } = parseResult.data; - const result = await this.scheduleService.cancelSchedule(ScheduleId(scheduleId), reason); + const result = await this.scheduleService.cancelSchedule(ScheduleId(scheduleId), reason, cancelTasks); return match(result, { ok: () => ({ @@ -1232,6 +1383,7 @@ export class MCPAdapter { success: true, message: `Schedule ${scheduleId} cancelled`, reason, + cancelTasksRequested: cancelTasks, }, null, 2, @@ -1413,6 +1565,71 @@ export class MCPAdapter { }); } + /** + * Handle SchedulePipeline tool call + * Creates a scheduled pipeline that triggers N tasks with linear dependencies on each run + */ + private async handleSchedulePipeline(args: unknown): Promise { + const parseResult = SchedulePipelineSchema.safeParse(args); + if (!parseResult.success) { + return { + content: [{ type: 'text', text: `Validation error: ${parseResult.error.message}` }], + isError: true, + }; + } + + const data = parseResult.data; + + const request: ScheduledPipelineCreateRequest = { + steps: data.steps.map((s) => ({ + prompt: s.prompt, + priority: s.priority as Priority | undefined, + workingDirectory: s.workingDirectory, + agent: s.agent as AgentProvider | undefined, + })), + scheduleType: data.scheduleType === 'cron' ? ScheduleType.CRON : ScheduleType.ONE_TIME, + cronExpression: data.cronExpression, + scheduledAt: data.scheduledAt, + timezone: data.timezone, + missedRunPolicy: toMissedRunPolicy(data.missedRunPolicy), + priority: data.priority as Priority | undefined, + workingDirectory: data.workingDirectory, + maxRuns: data.maxRuns, + expiresAt: data.expiresAt, + afterScheduleId: data.afterSchedule ? 
ScheduleId(data.afterSchedule) : undefined, + agent: data.agent as AgentProvider | undefined, + }; + + const result = await this.scheduleService.createScheduledPipeline(request); + + return match(result, { + ok: (schedule) => ({ + content: [ + { + type: 'text', + text: JSON.stringify( + { + success: true, + scheduleId: schedule.id, + stepCount: schedule.pipelineSteps?.length ?? 0, + scheduleType: schedule.scheduleType, + nextRunAt: schedule.nextRunAt ? new Date(schedule.nextRunAt).toISOString() : null, + status: schedule.status, + timezone: schedule.timezone, + }, + null, + 2, + ), + }, + ], + }), + err: (error) => ({ + content: [{ type: 'text', text: JSON.stringify({ success: false, error: error.message }, null, 2) }], + isError: true, + }), + }); + } + // ============================================================================ // AGENT HANDLERS (v0.5.0 Multi-Agent Support) // ============================================================================ diff --git a/src/cli/commands/schedule.ts b/src/cli/commands/schedule.ts index 1b077ba..281e036 100644 --- a/src/cli/commands/schedule.ts +++ b/src/cli/commands/schedule.ts @@ -1,6 +1,7 @@ import { AGENT_PROVIDERS, type AgentProvider, isAgentProvider } from '../../core/agents.js'; import { ScheduleId } from '../../core/domain.js'; import type { ScheduleService } from '../../core/interfaces.js'; +import { toMissedRunPolicy } from '../../services/schedule-manager.js'; import { validatePath } from '../../utils/validation.js'; import { withServices } from '../services.js'; import * as ui from '../ui.js'; @@ -56,6 +57,8 @@ async function scheduleCreate(service: ScheduleService, scheduleArgs: string[]) let expiresAt: string | undefined; let afterScheduleId: string | undefined; let agent: AgentProvider | undefined; + let isPipeline = false; + const pipelineSteps: string[] = []; for (let i = 0; i < scheduleArgs.length; i++) { const arg = scheduleArgs[i]; @@ -123,6 +126,11 @@ async function scheduleCreate(service: 
ScheduleService, scheduleArgs: string[]) } agent = next; i++; + } else if (arg === '--pipeline') { + isPipeline = true; + } else if (arg === '--step' && next) { + pipelineSteps.push(next); + i++; } else if (arg.startsWith('-')) { ui.error(`Unknown flag: ${arg}`); process.exit(1); @@ -131,12 +139,6 @@ async function scheduleCreate(service: ScheduleService, scheduleArgs: string[]) } } - const prompt = promptWords.join(' '); - if (!prompt) { - ui.error('Usage: beat schedule create --cron "..." | --at "..." [options]'); - process.exit(1); - } - // Infer type from --cron / --at flags if (cronExpression && scheduledAt) { ui.error('Cannot specify both --cron and --at'); @@ -153,7 +155,65 @@ async function scheduleCreate(service: ScheduleService, scheduleArgs: string[]) process.exit(1); } - const { ScheduleType, MissedRunPolicy, Priority } = await import('../../core/domain.js'); + const { ScheduleType, Priority } = await import('../../core/domain.js'); + + // Pipeline mode: --pipeline with --step flags + if (isPipeline) { + if (promptWords.length > 0) { + ui.info(`Ignoring positional prompt text in --pipeline mode: "${promptWords.join(' ')}". Use --step flags only.`); + } + if (pipelineSteps.length < 2) { + ui.error('Pipeline requires at least 2 --step flags'); + process.exit(1); + } + + const result = await service.createScheduledPipeline({ + steps: pipelineSteps.map((prompt) => ({ prompt })), + scheduleType: scheduleType === 'cron' ? ScheduleType.CRON : ScheduleType.ONE_TIME, + cronExpression, + scheduledAt, + timezone, + missedRunPolicy: missedRunPolicy ? toMissedRunPolicy(missedRunPolicy) : undefined, + priority: priority ? Priority[priority] : undefined, + workingDirectory, + maxRuns, + expiresAt, + afterScheduleId: afterScheduleId ? 
ScheduleId(afterScheduleId) : undefined, + agent, + }); + + if (result.ok) { + ui.success(`Scheduled pipeline created: ${result.value.id}`); + const details = [ + `Type: ${result.value.scheduleType}`, + `Steps: ${result.value.pipelineSteps?.length ?? 0}`, + `Status: ${result.value.status}`, + ]; + if (result.value.nextRunAt) details.push(`Next run: ${new Date(result.value.nextRunAt).toISOString()}`); + if (result.value.cronExpression) details.push(`Cron: ${result.value.cronExpression}`); + if (agent) details.push(`Agent: ${agent}`); + ui.info(details.join(' | ')); + process.exit(0); + } else { + ui.error(`Failed to create scheduled pipeline: ${result.error.message}`); + process.exit(1); + } + return; + } + + // Guard: --step without --pipeline is a user error + if (pipelineSteps.length > 0) { + ui.error('--step requires --pipeline. Did you mean: beat schedule create --pipeline --step "..." --step "..."'); + process.exit(1); + } + + // Single-task mode + const prompt = promptWords.join(' '); + if (!prompt) { + ui.error('Usage: beat schedule create --cron "..." | --at "..." [options]'); + ui.info(' Pipeline: beat schedule create --pipeline --step "lint" --step "test" --cron "0 9 * * *"'); + process.exit(1); + } const result = await service.createSchedule({ prompt, @@ -161,14 +221,7 @@ async function scheduleCreate(service: ScheduleService, scheduleArgs: string[]) cronExpression, scheduledAt, timezone, - missedRunPolicy: - missedRunPolicy === 'catchup' - ? MissedRunPolicy.CATCHUP - : missedRunPolicy === 'fail' - ? MissedRunPolicy.FAIL - : missedRunPolicy - ? MissedRunPolicy.SKIP - : undefined, + missedRunPolicy: missedRunPolicy ? toMissedRunPolicy(missedRunPolicy) : undefined, priority: priority ? 
Priority[priority] : undefined, workingDirectory, maxRuns, @@ -273,6 +326,15 @@ async function scheduleGet(service: ScheduleService, scheduleArgs: string[]) { ); if (schedule.taskTemplate.agent) lines.push(`Agent: ${schedule.taskTemplate.agent}`); + if (schedule.pipelineSteps && schedule.pipelineSteps.length > 0) { + lines.push(`Pipeline: ${schedule.pipelineSteps.length} steps`); + for (let i = 0; i < schedule.pipelineSteps.length; i++) { + const step = schedule.pipelineSteps[i]; + const stepInfo = ` Step ${i + 1}: ${step.prompt.substring(0, 60)}${step.prompt.length > 60 ? '...' : ''}`; + lines.push(stepInfo); + } + } + ui.note(lines.join('\n'), 'Schedule Details'); if (history && history.length > 0) { @@ -292,16 +354,28 @@ async function scheduleGet(service: ScheduleService, scheduleArgs: string[]) { } async function scheduleCancel(service: ScheduleService, scheduleArgs: string[]) { - const scheduleId = scheduleArgs[0]; + let cancelTasks = false; + const filteredArgs: string[] = []; + + for (const arg of scheduleArgs) { + if (arg === '--cancel-tasks') { + cancelTasks = true; + } else { + filteredArgs.push(arg); + } + } + + const scheduleId = filteredArgs[0]; if (!scheduleId) { - ui.error('Usage: beat schedule cancel [reason]'); + ui.error('Usage: beat schedule cancel [--cancel-tasks] [reason]'); process.exit(1); } - const reason = scheduleArgs.slice(1).join(' ') || undefined; + const reason = filteredArgs.slice(1).join(' ') || undefined; - const result = await service.cancelSchedule(ScheduleId(scheduleId), reason); + const result = await service.cancelSchedule(ScheduleId(scheduleId), reason, cancelTasks); if (result.ok) { ui.success(`Schedule ${scheduleId} cancelled`); + if (cancelTasks) ui.info('In-flight tasks also cancelled'); if (reason) ui.info(`Reason: ${reason}`); } else { ui.error(`Failed to cancel schedule: ${result.error.message}`); diff --git a/src/core/domain.ts b/src/core/domain.ts index b2c5922..3b927a8 100644 --- a/src/core/domain.ts +++ 
b/src/core/domain.ts @@ -261,6 +261,7 @@ export interface Schedule { readonly nextRunAt?: number; // Computed next execution time (epoch ms) readonly expiresAt?: number; // Optional expiration time (epoch ms) readonly afterScheduleId?: ScheduleId; // Chain: new tasks depend on this schedule's latest task + readonly pipelineSteps?: readonly PipelineStepRequest[]; // Pipeline: ordered steps to create on each trigger readonly createdAt: number; readonly updatedAt: number; } @@ -279,6 +280,7 @@ export interface ScheduleRequest { readonly maxRuns?: number; // Optional limit for CRON readonly expiresAt?: number; // Optional expiration readonly afterScheduleId?: ScheduleId; // Chain: block until after-schedule's latest task completes + readonly pipelineSteps?: readonly PipelineStepRequest[]; // Pipeline: ordered steps for scheduled pipeline } /** @@ -321,6 +323,7 @@ export const createSchedule = (request: ScheduleRequest): Schedule => { nextRunAt: request.scheduleType === ScheduleType.ONE_TIME ? 
request.scheduledAt : undefined, expiresAt: request.expiresAt, afterScheduleId: request.afterScheduleId, + pipelineSteps: request.pipelineSteps, createdAt: now, updatedAt: now, }); @@ -383,6 +386,26 @@ export interface PipelineCreateRequest { readonly agent?: AgentProvider; // shared default for all steps } +/** + * Request type for creating scheduled pipelines via ScheduleService + * ARCHITECTURE: Flat structure for MCP/CLI consumption + * Each trigger creates fresh tasks with linear dependencies from pipelineSteps + */ +export interface ScheduledPipelineCreateRequest { + readonly steps: readonly PipelineStepRequest[]; + readonly scheduleType: ScheduleType; + readonly cronExpression?: string; + readonly scheduledAt?: string; // ISO 8601 string (parsed by service) + readonly timezone?: string; + readonly missedRunPolicy?: MissedRunPolicy; + readonly priority?: Priority; // shared default for all steps + readonly workingDirectory?: string; // shared default for all steps + readonly maxRuns?: number; + readonly expiresAt?: string; // ISO 8601 string (parsed by service) + readonly afterScheduleId?: ScheduleId; + readonly agent?: AgentProvider; // shared default for all steps +} + export interface PipelineStep { readonly index: number; readonly scheduleId: ScheduleId; diff --git a/src/core/interfaces.ts b/src/core/interfaces.ts index 6ceca06..28e9b2a 100644 --- a/src/core/interfaces.ts +++ b/src/core/interfaces.ts @@ -10,6 +10,7 @@ import { ResumeTaskRequest, Schedule, ScheduleCreateRequest, + ScheduledPipelineCreateRequest, ScheduleId, ScheduleStatus, SystemResources, @@ -248,6 +249,7 @@ export interface ScheduleExecution { readonly executedAt?: number; // Epoch ms - when execution actually started readonly status: 'pending' | 'triggered' | 'completed' | 'failed' | 'missed' | 'skipped'; readonly errorMessage?: string; // Error details if status is 'failed' or 'missed' + readonly pipelineTaskIds?: readonly TaskId[]; // All task IDs from a pipeline trigger (v0.6.0) 
readonly createdAt: number; } @@ -403,10 +405,11 @@ export interface ScheduleService { includeHistory?: boolean, historyLimit?: number, ): Promise>; - cancelSchedule(scheduleId: ScheduleId, reason?: string): Promise>; + cancelSchedule(scheduleId: ScheduleId, reason?: string, cancelTasks?: boolean): Promise>; pauseSchedule(scheduleId: ScheduleId): Promise>; resumeSchedule(scheduleId: ScheduleId): Promise>; createPipeline(request: PipelineCreateRequest): Promise>; + createScheduledPipeline(request: ScheduledPipelineCreateRequest): Promise>; } /** diff --git a/src/implementations/database.ts b/src/implementations/database.ts index 8f549f6..05b138e 100644 --- a/src/implementations/database.ts +++ b/src/implementations/database.ts @@ -520,6 +520,16 @@ export class Database { db.exec(`ALTER TABLE tasks ADD COLUMN agent TEXT DEFAULT 'claude'`); }, }, + { + version: 8, + description: 'Add pipeline_steps to schedules and pipeline_task_ids to executions (v0.6.0)', + up: (db) => { + // Nullable JSON array of pipeline step definitions + db.exec(`ALTER TABLE schedules ADD COLUMN pipeline_steps TEXT`); + // Nullable JSON array of TaskIds created by a pipeline trigger + db.exec(`ALTER TABLE schedule_executions ADD COLUMN pipeline_task_ids TEXT`); + }, + }, ]; } diff --git a/src/implementations/schedule-repository.ts b/src/implementations/schedule-repository.ts index 185358b..1ffc373 100644 --- a/src/implementations/schedule-repository.ts +++ b/src/implementations/schedule-repository.ts @@ -10,6 +10,7 @@ import { z } from 'zod'; import { AGENT_PROVIDERS_TUPLE } from '../core/agents.js'; import { MissedRunPolicy, + type PipelineStepRequest, Schedule, ScheduleId, ScheduleStatus, @@ -41,6 +42,7 @@ const ScheduleRowSchema = z.object({ next_run_at: z.number().nullable(), expires_at: z.number().nullable(), after_schedule_id: z.string().nullable(), + pipeline_steps: z.string().nullable(), // JSON serialized PipelineStepRequest[] created_at: z.number(), updated_at: z.number(), }); @@ 
-56,6 +58,7 @@ const ScheduleExecutionRowSchema = z.object({ executed_at: z.number().nullable(), status: z.enum(['pending', 'triggered', 'completed', 'failed', 'missed', 'skipped']), error_message: z.string().nullable(), + pipeline_task_ids: z.string().nullable(), // JSON serialized TaskId[] created_at: z.number(), }); @@ -78,6 +81,28 @@ const TaskRequestSchema = z.object({ agent: z.enum(AGENT_PROVIDERS_TUPLE).optional(), }); +/** + * Zod schema for validating pipeline_task_ids JSON from database + * Pattern: Boundary validation for pipeline task ID arrays + */ +const PipelineTaskIdsSchema = z.array(z.string().min(1)).min(1); + +/** + * Zod schema for validating pipeline_steps JSON from database + * Pattern: Boundary validation for pipeline step definitions (2-20 steps) + */ +const PipelineStepsSchema = z + .array( + z.object({ + prompt: z.string().min(1), + priority: z.enum(['P0', 'P1', 'P2']).optional(), + workingDirectory: z.string().optional(), + agent: z.enum(AGENT_PROVIDERS_TUPLE).optional(), + }), + ) + .min(2) + .max(20); + /** * Database row type for schedules table * TYPE-SAFETY: Explicit typing instead of Record @@ -97,6 +122,7 @@ interface ScheduleRow { readonly next_run_at: number | null; readonly expires_at: number | null; readonly after_schedule_id: string | null; + readonly pipeline_steps: string | null; readonly created_at: number; readonly updated_at: number; } @@ -112,6 +138,7 @@ interface ScheduleExecutionRow { readonly executed_at: number | null; readonly status: string; readonly error_message: string | null; + readonly pipeline_task_ids: string | null; readonly created_at: number; } @@ -140,11 +167,11 @@ export class SQLiteScheduleRepository implements ScheduleRepository { INSERT OR REPLACE INTO schedules ( id, task_template, schedule_type, cron_expression, scheduled_at, timezone, missed_run_policy, status, max_runs, run_count, - last_run_at, next_run_at, expires_at, after_schedule_id, created_at, updated_at + last_run_at, next_run_at, 
expires_at, after_schedule_id, pipeline_steps, created_at, updated_at ) VALUES ( @id, @taskTemplate, @scheduleType, @cronExpression, @scheduledAt, @timezone, @missedRunPolicy, @status, @maxRuns, @runCount, - @lastRunAt, @nextRunAt, @expiresAt, @afterScheduleId, @createdAt, @updatedAt + @lastRunAt, @nextRunAt, @expiresAt, @afterScheduleId, @pipelineSteps, @createdAt, @updatedAt ) `); @@ -165,6 +192,7 @@ export class SQLiteScheduleRepository implements ScheduleRepository { next_run_at = @nextRunAt, expires_at = @expiresAt, after_schedule_id = @afterScheduleId, + pipeline_steps = @pipelineSteps, updated_at = @updatedAt WHERE id = @id `); @@ -199,8 +227,8 @@ export class SQLiteScheduleRepository implements ScheduleRepository { this.recordExecutionStmt = this.db.prepare(` INSERT INTO schedule_executions ( - schedule_id, task_id, scheduled_for, executed_at, status, error_message, created_at - ) VALUES (?, ?, ?, ?, ?, ?, ?) + schedule_id, task_id, scheduled_for, executed_at, status, error_message, pipeline_task_ids, created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) `); this.getExecutionByIdStmt = this.db.prepare(` @@ -239,6 +267,7 @@ export class SQLiteScheduleRepository implements ScheduleRepository { nextRunAt: schedule.nextRunAt ?? null, expiresAt: schedule.expiresAt ?? null, afterScheduleId: schedule.afterScheduleId ?? null, + pipelineSteps: schedule.pipelineSteps ? JSON.stringify(schedule.pipelineSteps) : null, createdAt: schedule.createdAt, updatedAt: schedule.updatedAt, }; @@ -294,6 +323,7 @@ export class SQLiteScheduleRepository implements ScheduleRepository { nextRunAt: updatedSchedule.nextRunAt ?? null, expiresAt: updatedSchedule.expiresAt ?? null, afterScheduleId: updatedSchedule.afterScheduleId ?? null, + pipelineSteps: updatedSchedule.pipelineSteps ? JSON.stringify(updatedSchedule.pipelineSteps) : null, updatedAt: updatedSchedule.updatedAt, }); }, @@ -422,6 +452,7 @@ export class SQLiteScheduleRepository implements ScheduleRepository { execution.executedAt ?? 
null, execution.status, execution.errorMessage ?? null, + execution.pipelineTaskIds ? JSON.stringify(execution.pipelineTaskIds) : null, + execution.createdAt, ); @@ -469,6 +500,17 @@ export class SQLiteScheduleRepository implements ScheduleRepository { throw new Error(`Invalid task_template JSON for schedule ${data.id}: ${e}`); } + // Parse pipeline_steps JSON if present + let pipelineSteps: readonly PipelineStepRequest[] | undefined; + if (data.pipeline_steps) { + try { + const parsed = JSON.parse(data.pipeline_steps); + pipelineSteps = PipelineStepsSchema.parse(parsed) as readonly PipelineStepRequest[]; + } catch (e) { + throw new Error(`Invalid pipeline_steps JSON for schedule ${data.id}: ${e}`); + } + } + return { id: ScheduleId(data.id), taskTemplate, @@ -484,6 +526,7 @@ export class SQLiteScheduleRepository implements ScheduleRepository { nextRunAt: data.next_run_at ?? undefined, expiresAt: data.expires_at ?? undefined, afterScheduleId: data.after_schedule_id ? ScheduleId(data.after_schedule_id) : undefined, + pipelineSteps, createdAt: data.created_at, updatedAt: data.updated_at, }; @@ -496,6 +539,19 @@ export class SQLiteScheduleRepository implements ScheduleRepository { private rowToExecution(row: ScheduleExecutionRow): ScheduleExecution { const data = ScheduleExecutionRowSchema.parse(row); + // Parse pipeline_task_ids JSON if present + let pipelineTaskIds: readonly TaskId[] | undefined; + if (data.pipeline_task_ids) { + try { + const parsed = JSON.parse(data.pipeline_task_ids); + const validated = PipelineTaskIdsSchema.parse(parsed); + pipelineTaskIds = validated.map((id) => TaskId(id)); + } catch { + // Non-fatal: treat malformed pipeline_task_ids as absent rather than failing the read + pipelineTaskIds = undefined; + } + } + return { id: data.id, scheduleId: ScheduleId(data.schedule_id), @@ -504,6 +560,7 @@ export class SQLiteScheduleRepository implements ScheduleRepository { executedAt: data.executed_at ?? undefined, status: data.status as ScheduleExecution['status'], errorMessage: data.error_message ??
undefined, + pipelineTaskIds, createdAt: data.created_at, }; } diff --git a/src/services/handlers/dependency-handler.ts b/src/services/handlers/dependency-handler.ts index 514fc00..ed4fec2 100644 --- a/src/services/handlers/dependency-handler.ts +++ b/src/services/handlers/dependency-handler.ts @@ -579,7 +579,32 @@ export class DependencyHandler extends BaseEventHandler { } if (!isBlockedResult.value) { - // Task is unblocked - fetch task and emit event + // Task is no longer blocked — but check if any resolved dependency failed/cancelled + // If so, cascade cancellation instead of unblocking + const depsResult = await this.dependencyRepo.getDependencies(dep.taskId); + if (!depsResult.ok) { + this.logger.warn('getDependencies failed during cascade check, skipping unblock', { + taskId: dep.taskId, + error: depsResult.error.message, + }); + continue; + } + + const failedDep = depsResult.value.find((d) => d.resolution === 'failed' || d.resolution === 'cancelled'); + if (failedDep) { + this.logger.info('Dependency resolved as failed/cancelled — cascading cancellation', { + taskId: dep.taskId, + failedDependency: failedDep.dependsOnTaskId, + failedResolution: failedDep.resolution, + }); + await this.eventBus.emit('TaskCancellationRequested', { + taskId: dep.taskId, + reason: `Dependency ${failedDep.dependsOnTaskId} ${failedDep.resolution}`, + }); + continue; + } + + // Task is unblocked with all deps completed - fetch task and emit event this.logger.info('Task unblocked', { taskId: dep.taskId }); // ARCHITECTURE: Fetch task to include in event, preventing layer violation diff --git a/src/services/handlers/queue-handler.ts b/src/services/handlers/queue-handler.ts index 27a2cb6..b3186ae 100644 --- a/src/services/handlers/queue-handler.ts +++ b/src/services/handlers/queue-handler.ts @@ -62,6 +62,16 @@ export class QueueHandler extends BaseEventHandler { */ private async handleTaskPersisted(event: TaskPersistedEvent): Promise { await this.handleEvent(event, async (event) => 
{ + // Fast-path: if task was created with dependencies, skip DB check entirely + // This eliminates the race condition where DependencyHandler hasn't written + // dependency rows yet but isBlocked() returns false + if (event.task.dependencyState === 'blocked') { + this.logger.info('Task blocked by dependencies (fast-path)', { + taskId: event.task.id, + }); + return ok(undefined); + } + // Check if task is blocked by dependencies const isBlockedResult = await this.dependencyRepo.isBlocked(event.task.id); if (!isBlockedResult.ok) { diff --git a/src/services/handlers/schedule-handler.ts b/src/services/handlers/schedule-handler.ts index 4a3c45a..390f012 100644 --- a/src/services/handlers/schedule-handler.ts +++ b/src/services/handlers/schedule-handler.ts @@ -5,8 +5,16 @@ * Rationale: Manages schedule creation, triggering, pausing, and execution tracking */ -import type { Schedule } from '../../core/domain.js'; -import { createTask, isTerminalState, ScheduleStatus, ScheduleType, updateSchedule } from '../../core/domain.js'; +import type { Schedule, ScheduleId, Task } from '../../core/domain.js'; +import { + createTask, + isTerminalState, + ScheduleStatus, + ScheduleType, + TaskId, + TaskStatus, + updateSchedule, +} from '../../core/domain.js'; import { BackbeatError, ErrorCode } from '../../core/errors.js'; import { EventBus } from '../../core/events/event-bus.js'; import { @@ -215,7 +223,7 @@ export class ScheduleHandler extends BaseEventHandler { } /** - * Handle schedule trigger - create task, record execution, update schedule + * Handle schedule trigger - dispatch to single-task or pipeline path */ private async handleScheduleTriggered(event: ScheduleTriggeredEvent): Promise { await this.handleEvent(event, async (e) => { @@ -247,158 +255,354 @@ export class ScheduleHandler extends BaseEventHandler { return ok(undefined); } - // afterScheduleId enforcement: inject dependency on chained schedule's latest task - let taskTemplate = schedule.taskTemplate; - - if 
(schedule.afterScheduleId) { - const historyResult = await this.scheduleRepo.getExecutionHistory(schedule.afterScheduleId, 1); - - if (historyResult.ok && historyResult.value.length > 0) { - const latestExecution = historyResult.value[0]; - - if (latestExecution.taskId) { - const depTaskResult = await this.taskRepo.findById(latestExecution.taskId); - - if (depTaskResult.ok && depTaskResult.value && !isTerminalState(depTaskResult.value.status)) { - taskTemplate = { - ...schedule.taskTemplate, - dependsOn: [...(schedule.taskTemplate.dependsOn ?? []), latestExecution.taskId], - }; - this.logger.info('Injected afterSchedule dependency', { - scheduleId, - afterScheduleId: schedule.afterScheduleId, - dependsOnTaskId: latestExecution.taskId, - }); - } else { - this.logger.info('afterSchedule dependency already resolved, skipping', { - scheduleId, - afterScheduleId: schedule.afterScheduleId, - taskId: latestExecution.taskId, - taskStatus: depTaskResult.ok ? (depTaskResult.value?.status ?? 'not-found') : 'lookup-failed', - }); - } - } - // No taskId on execution (failed before creating task) → skip - } - // No execution history → nothing to chain after + // Dispatch to appropriate trigger path + if (schedule.pipelineSteps && schedule.pipelineSteps.length > 0) { + return this.handlePipelineTrigger(schedule, schedule.pipelineSteps, triggeredAt); + } + return this.handleSingleTaskTrigger(schedule, triggeredAt); + }); + } + + /** + * Handle single-task trigger - existing logic extracted verbatim + */ + private async handleSingleTaskTrigger(schedule: Schedule, triggeredAt: number): Promise<Result<void>> { + const scheduleId = schedule.id; + + // afterScheduleId enforcement: inject dependency on chained schedule's latest task + const afterTaskId = await this.resolveAfterScheduleTaskId(schedule); + const dependsOn = afterTaskId + ? [...(schedule.taskTemplate.dependsOn ??
[]), afterTaskId] + : schedule.taskTemplate.dependsOn; + + // Create task from template + const task = createTask({ ...schedule.taskTemplate, dependsOn }); + const taskSaveResult = await this.taskRepo.save(task); + if (!taskSaveResult.ok) { + await this.recordFailedExecution( + scheduleId, + schedule.nextRunAt ?? triggeredAt, + triggeredAt, + `Failed to create task: ${taskSaveResult.error.message}`, + ); + return taskSaveResult; + } + + // Record successful execution + await this.recordTriggeredExecution(scheduleId, task.id, schedule.nextRunAt ?? triggeredAt, triggeredAt); + + // Update schedule state + const updateResult = await this.updateScheduleAfterTrigger(schedule, triggeredAt); + if (!updateResult.ok) return updateResult; + + // Emit TaskDelegated event for the created task + await this.eventBus.emit('TaskDelegated', { task }); + + // Emit ScheduleExecuted with the task ID (for concurrency tracking) + await this.eventBus.emit('ScheduleExecuted', { + scheduleId, + taskId: task.id, + executedAt: triggeredAt, + }); + + this.logger.info('Schedule triggered successfully', { + scheduleId, + taskId: task.id, + runCount: schedule.runCount + 1, + }); + + return ok(undefined); + } + + /** + * Handle pipeline trigger - create N tasks with linear dependencies + */ + private async handlePipelineTrigger( + schedule: Schedule, + steps: NonNullable<Schedule['pipelineSteps']>, + triggeredAt: number, + ): Promise<Result<void>> { + const scheduleId = schedule.id; + const defaults = schedule.taskTemplate; + + this.logger.info('Processing pipeline trigger', { + scheduleId, + stepCount: steps.length, + }); + + // afterScheduleId handling: resolve predecessor dependency for step 0 + const afterTaskId = await this.resolveAfterScheduleTaskId(schedule); + const step0DependsOn: TaskId[] | undefined = afterTaskId ? [afterTaskId] : undefined; + + // Create tasks for each step with linear dependencies + // TODO: Wrap in a proper async-safe transaction once better-sqlite3 async + // transaction support is available.
Current db.transaction() is synchronous + // and does not support awaited operations inside the callback. + const savedTasks: Task[] = []; + for (let i = 0; i < steps.length; i++) { + const step = steps[i]; + const dependsOn: TaskId[] = []; + + // Step 0 gets afterScheduleId dependency if present + if (i === 0 && step0DependsOn) { + dependsOn.push(...step0DependsOn); + } + + // Step i depends on step i-1 + if (i > 0) { + dependsOn.push(savedTasks[i - 1].id); } - // Create task from template - const task = createTask(taskTemplate); - const taskSaveResult = await this.taskRepo.save(task); - if (!taskSaveResult.ok) { - // Record failed execution (audit trail - log on failure but don't block) - const failedExecResult = await this.scheduleRepo.recordExecution({ + const task = createTask({ + prompt: step.prompt, + priority: step.priority ?? defaults.priority, + workingDirectory: step.workingDirectory ?? defaults.workingDirectory, + agent: step.agent ?? defaults.agent, + dependsOn: dependsOn.length > 0 ? dependsOn : undefined, + }); + + const saveResult = await this.taskRepo.save(task); + if (!saveResult.ok) { + // ARCHITECTURE EXCEPTION: Direct taskRepo.update() instead of emitting + // TaskCancellationRequested events. At this point tasks are just DB rows — + // no TaskDelegated events have been emitted and no workers have been spawned, + // so there is nothing to coordinate via the event bus. Direct cleanup is correct. + this.logger.error('Pipeline task save failed, cleaning up', saveResult.error, { scheduleId, - scheduledFor: schedule.nextRunAt ?? 
triggeredAt, - executedAt: triggeredAt, - status: 'failed', - errorMessage: `Failed to create task: ${taskSaveResult.error.message}`, - createdAt: Date.now(), + failedStep: i, + savedSteps: savedTasks.length, }); - if (!failedExecResult.ok) { - this.logger.error('Failed to record failed execution', failedExecResult.error, { scheduleId }); + + for (const savedTask of savedTasks) { + const cancelResult = await this.taskRepo.update(savedTask.id, { status: TaskStatus.CANCELLED }); + if (!cancelResult.ok) { + this.logger.error('Failed to cancel pipeline task during cleanup', cancelResult.error, { + taskId: savedTask.id, + }); + } } - return taskSaveResult; - } - // Record successful execution (audit trail - log on failure but don't block) - const execResult = await this.scheduleRepo.recordExecution({ - scheduleId, - taskId: task.id, - scheduledFor: schedule.nextRunAt ?? triggeredAt, - executedAt: triggeredAt, - status: 'triggered', - createdAt: Date.now(), - }); - if (!execResult.ok) { - this.logger.error('Failed to record triggered execution', execResult.error, { scheduleId }); + await this.recordFailedExecution( + scheduleId, + schedule.nextRunAt ?? 
triggeredAt, + triggeredAt, + `Pipeline failed at step ${i + 1}: ${saveResult.error.message}`, + ); + return saveResult; } - // Calculate update fields for schedule - const newRunCount = schedule.runCount + 1; - - // Determine new status and nextRunAt - let newStatus: ScheduleStatus | undefined; - let newNextRunAt: number | undefined; + savedTasks.push(task); + } - // Calculate next run time for CRON schedules - if (schedule.scheduleType === ScheduleType.CRON && schedule.cronExpression) { - const nextResult = getNextRunTime(schedule.cronExpression, schedule.timezone); - if (nextResult.ok) { - newNextRunAt = nextResult.value; - } else { - this.logger.error('Failed to calculate next run, pausing schedule', nextResult.error, { + const allTaskIds = savedTasks.map((t) => t.id); + const firstTaskId = savedTasks[0].id; + const lastTaskId = savedTasks[savedTasks.length - 1].id; + + // Record execution with lastTaskId — chained schedules (afterScheduleId) resolve + // the predecessor's execution.taskId to check if it's terminal. Using lastTaskId + // ensures the chain fires when the FULL pipeline completes, not just step 1. + await this.recordTriggeredExecution( + scheduleId, + lastTaskId, + schedule.nextRunAt ?? triggeredAt, + triggeredAt, + allTaskIds, + ); + + // Update schedule state + const updateResult = await this.updateScheduleAfterTrigger(schedule, triggeredAt); + if (!updateResult.ok) return updateResult; + + // Emit TaskDelegated for each task + // Step 0 failure is fatal — it's the only task that becomes runnable; all later + // steps block on it. If it's never delegated, the entire pipeline is orphaned. + // Steps 1–N failures are best-effort — they're already saved with dependencies + // and will be enqueued when their predecessor completes. 
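+ // + // Illustrative emission order for a 3-step pipeline (comment sketch, not code): + //   TaskDelegated(step 0) -> runnable immediately (or blocked only on afterScheduleId) + //   TaskDelegated(step 1) -> stays blocked until step 0 completes + //   TaskDelegated(step 2) -> stays blocked until step 1 completes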
+ for (let ti = 0; ti < savedTasks.length; ti++) { + const task = savedTasks[ti]; + const emitResult = await this.eventBus.emit('TaskDelegated', { task }); + if (!emitResult.ok) { + if (ti === 0) { + this.logger.error('Failed to emit TaskDelegated for pipeline step 0 — aborting pipeline', emitResult.error, { + taskId: task.id, scheduleId, - cronExpression: schedule.cronExpression, }); - newStatus = ScheduleStatus.PAUSED; - // newNextRunAt remains undefined -- will be explicitly set below to clear nextRunAt + for (const savedTask of savedTasks) { + await this.taskRepo.update(savedTask.id, { status: TaskStatus.CANCELLED }); + } + return emitResult; } - } else if (schedule.scheduleType === ScheduleType.ONE_TIME) { - // ONE_TIME schedules complete after single execution - newStatus = ScheduleStatus.COMPLETED; - newNextRunAt = undefined; - } - - // Check if maxRuns reached - if (schedule.maxRuns && newRunCount >= schedule.maxRuns) { - newStatus = ScheduleStatus.COMPLETED; - newNextRunAt = undefined; - this.logger.info('Schedule reached maxRuns, marking completed', { + this.logger.error('Failed to emit TaskDelegated for pipeline task', emitResult.error, { + taskId: task.id, scheduleId, - runCount: newRunCount, - maxRuns: schedule.maxRuns, + step: ti, }); } + } - // Check expiration - if (schedule.expiresAt && Date.now() >= schedule.expiresAt) { - newStatus = ScheduleStatus.EXPIRED; - newNextRunAt = undefined; - this.logger.info('Schedule expired', { scheduleId, expiresAt: schedule.expiresAt }); - } + // Emit ScheduleExecuted with lastTaskId (tail task for concurrency tracking) + await this.eventBus.emit('ScheduleExecuted', { + scheduleId, + taskId: lastTaskId, + executedAt: triggeredAt, + }); - // Build update object immutably - // IMPORTANT: Always include nextRunAt to prevent infinite retrigger when getNextRunTime fails. 
- // If newNextRunAt is undefined (e.g., cron parse failure), this clears the old past nextRunAt - // so the schedule is not returned by findDue on every tick. - const updates: Partial = { - runCount: newRunCount, - lastRunAt: triggeredAt, - nextRunAt: newNextRunAt, - ...(newStatus !== undefined ? { status: newStatus } : {}), - }; + this.logger.info('Pipeline triggered successfully', { + scheduleId, + stepCount: steps.length, + firstTaskId, + lastTaskId, + runCount: schedule.runCount + 1, + }); - // Persist updates - const updateResult = await this.scheduleRepo.update(scheduleId, updates); - if (!updateResult.ok) { - this.logger.error('Failed to update schedule after trigger', updateResult.error, { + return ok(undefined); + } + + // ============================================================================ + // SHARED HELPERS (extracted from handleScheduleTriggered decomposition) + // ============================================================================ + + /** + * Resolve afterScheduleId to the TaskId of the chained schedule's latest non-terminal task. + * Returns undefined if no dependency is needed (no afterScheduleId, no active task, etc.) + * Used by both single-task and pipeline trigger paths. 
+ */ + private async resolveAfterScheduleTaskId(schedule: Schedule): Promise<TaskId | undefined> { + if (!schedule.afterScheduleId) return undefined; + + const historyResult = await this.scheduleRepo.getExecutionHistory(schedule.afterScheduleId, 1); + if (!historyResult.ok || historyResult.value.length === 0) return undefined; + + const latestExecution = historyResult.value[0]; + if (!latestExecution.taskId) return undefined; + + const depTaskResult = await this.taskRepo.findById(latestExecution.taskId); + if (!depTaskResult.ok || !depTaskResult.value || isTerminalState(depTaskResult.value.status)) { + this.logger.info('afterSchedule dependency already resolved, skipping', { + scheduleId: schedule.id, + afterScheduleId: schedule.afterScheduleId, + taskId: latestExecution.taskId, + taskStatus: depTaskResult.ok ? (depTaskResult.value?.status ?? 'not-found') : 'lookup-failed', + }); + return undefined; + } + + this.logger.info('Resolved afterSchedule dependency', { + scheduleId: schedule.id, + afterScheduleId: schedule.afterScheduleId, + dependsOnTaskId: latestExecution.taskId, + }); + + return latestExecution.taskId; + } + + /** + * Record a failed execution in the audit trail + */ + private async recordFailedExecution( + scheduleId: ScheduleId, + scheduledFor: number, + triggeredAt: number, + errorMessage: string, + ): Promise<void> { + const result = await this.scheduleRepo.recordExecution({ + scheduleId, + scheduledFor, + executedAt: triggeredAt, + status: 'failed', + errorMessage, + createdAt: Date.now(), + }); + if (!result.ok) { + this.logger.error('Failed to record failed execution', result.error, { scheduleId }); + } + } + + /** + * Record a triggered execution in the audit trail + */ + private async recordTriggeredExecution( + scheduleId: ScheduleId, + taskId: TaskId, + scheduledFor: number, + triggeredAt: number, + pipelineTaskIds?: readonly TaskId[], + ): Promise<void> { + const result = await this.scheduleRepo.recordExecution({ + scheduleId, + taskId, + scheduledFor, + executedAt:
triggeredAt, + status: 'triggered', + pipelineTaskIds, + createdAt: Date.now(), + }); + if (!result.ok) { + this.logger.error('Failed to record triggered execution', result.error, { scheduleId }); + } + } + + /** + * Update schedule state after a trigger (runCount, lastRunAt, nextRunAt, status) + */ + private async updateScheduleAfterTrigger(schedule: Schedule, triggeredAt: number): Promise> { + const scheduleId = schedule.id; + const newRunCount = schedule.runCount + 1; + + let newStatus: ScheduleStatus | undefined; + let newNextRunAt: number | undefined; + + // Calculate next run time for CRON schedules + if (schedule.scheduleType === ScheduleType.CRON && schedule.cronExpression) { + const nextResult = getNextRunTime(schedule.cronExpression, schedule.timezone); + if (nextResult.ok) { + newNextRunAt = nextResult.value; + } else { + this.logger.error('Failed to calculate next run, pausing schedule', nextResult.error, { scheduleId, + cronExpression: schedule.cronExpression, }); - return updateResult; + newStatus = ScheduleStatus.PAUSED; } + } else if (schedule.scheduleType === ScheduleType.ONE_TIME) { + newStatus = ScheduleStatus.COMPLETED; + newNextRunAt = undefined; + } - // Emit TaskDelegated event for the created task - await this.eventBus.emit('TaskDelegated', { task }); - - // Emit ScheduleExecuted event - await this.eventBus.emit('ScheduleExecuted', { + // Check if maxRuns reached + if (schedule.maxRuns && newRunCount >= schedule.maxRuns) { + newStatus = ScheduleStatus.COMPLETED; + newNextRunAt = undefined; + this.logger.info('Schedule reached maxRuns, marking completed', { scheduleId, - taskId: task.id, - executedAt: triggeredAt, + runCount: newRunCount, + maxRuns: schedule.maxRuns, }); + } + + // Check expiration + if (schedule.expiresAt && Date.now() >= schedule.expiresAt) { + newStatus = ScheduleStatus.EXPIRED; + newNextRunAt = undefined; + this.logger.info('Schedule expired', { scheduleId, expiresAt: schedule.expiresAt }); + } - 
this.logger.info('Schedule triggered successfully', { + const updates: Partial = { + runCount: newRunCount, + lastRunAt: triggeredAt, + nextRunAt: newNextRunAt, + ...(newStatus !== undefined ? { status: newStatus } : {}), + }; + + const updateResult = await this.scheduleRepo.update(scheduleId, updates); + if (!updateResult.ok) { + this.logger.error('Failed to update schedule after trigger', updateResult.error, { scheduleId, - taskId: task.id, - runCount: newRunCount, - nextRunAt: updates.nextRunAt, - newStatus: updates.status ?? schedule.status, }); + return updateResult; + } - return ok(undefined); - }); + return ok(undefined); } /** diff --git a/src/services/schedule-manager.ts b/src/services/schedule-manager.ts index 763e605..b638b67 100644 --- a/src/services/schedule-manager.ts +++ b/src/services/schedule-manager.ts @@ -16,6 +16,7 @@ import { Priority, Schedule, ScheduleCreateRequest, + ScheduledPipelineCreateRequest, ScheduleId, ScheduleStatus, ScheduleType, @@ -61,87 +62,9 @@ export class ScheduleManagerService implements ScheduleService { } async createSchedule(request: ScheduleCreateRequest): Promise> { - // Validate schedule type requirements - if (request.scheduleType === ScheduleType.CRON && !request.cronExpression) { - return err( - new BackbeatError(ErrorCode.INVALID_INPUT, 'cronExpression is required for cron schedules', { - scheduleType: request.scheduleType, - }), - ); - } - if (request.scheduleType === ScheduleType.ONE_TIME && !request.scheduledAt) { - return err( - new BackbeatError(ErrorCode.INVALID_INPUT, 'scheduledAt is required for one-time schedules', { - scheduleType: request.scheduleType, - }), - ); - } - - // Validate cron expression - if (request.cronExpression) { - const cronResult = validateCronExpression(request.cronExpression); - if (!cronResult.ok) { - return cronResult; - } - } - - // Validate timezone - const tz = request.timezone ?? 
'UTC'; - if (!isValidTimezone(tz)) { - return err(new BackbeatError(ErrorCode.INVALID_INPUT, `Invalid timezone: ${tz}`, { timezone: tz })); - } - - // Parse scheduledAt if provided - let scheduledAtMs: number | undefined; - if (request.scheduledAt) { - scheduledAtMs = Date.parse(request.scheduledAt); - if (isNaN(scheduledAtMs)) { - return err( - new BackbeatError(ErrorCode.INVALID_INPUT, `Invalid scheduledAt datetime: ${request.scheduledAt}`, { - scheduledAt: request.scheduledAt, - }), - ); - } - if (scheduledAtMs <= Date.now()) { - return err( - new BackbeatError(ErrorCode.INVALID_INPUT, 'scheduledAt must be in the future', { - scheduledAt: request.scheduledAt, - }), - ); - } - } - - // Parse expiresAt if provided - let expiresAtMs: number | undefined; - if (request.expiresAt) { - expiresAtMs = Date.parse(request.expiresAt); - if (isNaN(expiresAtMs)) { - return err( - new BackbeatError(ErrorCode.INVALID_INPUT, `Invalid expiresAt datetime: ${request.expiresAt}`, { - expiresAt: request.expiresAt, - }), - ); - } - } - - // Calculate nextRunAt - let nextRunAt: number; - if (request.scheduleType === ScheduleType.CRON && request.cronExpression) { - const nextResult = getNextRunTime(request.cronExpression, tz); - if (!nextResult.ok) { - return nextResult; - } - nextRunAt = nextResult.value; - } else { - if (scheduledAtMs === undefined) { - return err( - new BackbeatError(ErrorCode.INVALID_INPUT, 'scheduledAt must be provided for one-time schedules', { - scheduleType: request.scheduleType, - }), - ); - } - nextRunAt = scheduledAtMs; - } + const timingResult = this.validateScheduleTiming(request); + if (!timingResult.ok) return timingResult; + const { scheduledAtMs, expiresAtMs, nextRunAt, timezone } = timingResult.value; // Validate workingDirectory let validatedWorkingDirectory: string | undefined; @@ -172,7 +95,7 @@ export class ScheduleManagerService implements ScheduleService { scheduleType: request.scheduleType, cronExpression: request.cronExpression, scheduledAt: 
scheduledAtMs, - timezone: tz, + timezone, missedRunPolicy: toMissedRunPolicy(request.missedRunPolicy), maxRuns: request.maxRuns, expiresAt: expiresAtMs, @@ -234,13 +157,13 @@ export class ScheduleManagerService implements ScheduleService { return ok({ schedule, history }); } - async cancelSchedule(scheduleId: ScheduleId, reason?: string): Promise> { + async cancelSchedule(scheduleId: ScheduleId, reason?: string, cancelTasks?: boolean): Promise> { const lookupResult = await this.fetchScheduleOrError(scheduleId); if (!lookupResult.ok) { return lookupResult; } - this.logger.info('Cancelling schedule', { scheduleId, reason }); + this.logger.info('Cancelling schedule', { scheduleId, reason, cancelTasks }); const emitResult = await this.eventBus.emit('ScheduleCancelled', { scheduleId, @@ -254,6 +177,32 @@ export class ScheduleManagerService implements ScheduleService { return err(emitResult.error); } + // Optionally cancel in-flight pipeline tasks from the latest execution + if (cancelTasks) { + const historyResult = await this.scheduleRepository.getExecutionHistory(scheduleId, 1); + if (historyResult.ok && historyResult.value.length > 0) { + const latestExecution = historyResult.value[0]; + const taskIds = latestExecution.pipelineTaskIds ?? (latestExecution.taskId ? 
[latestExecution.taskId] : []); + for (const taskId of taskIds) { + const cancelResult = await this.eventBus.emit('TaskCancellationRequested', { + taskId, + reason: `Schedule ${scheduleId} cancelled`, + }); + if (!cancelResult.ok) { + this.logger.warn('Failed to cancel pipeline task', { + taskId, + scheduleId, + error: cancelResult.error.message, + }); + } + } + this.logger.info('Cancelled in-flight pipeline tasks', { + scheduleId, + taskCount: taskIds.length, + }); + } + } + return ok(undefined); } @@ -295,6 +244,108 @@ export class ScheduleManagerService implements ScheduleService { return ok(undefined); } + async createScheduledPipeline(request: ScheduledPipelineCreateRequest): Promise> { + const { steps } = request; + + if (steps.length < 2) { + return err( + new BackbeatError(ErrorCode.INVALID_INPUT, 'Pipeline requires at least 2 steps', { + stepCount: steps.length, + }), + ); + } + + if (steps.length > 20) { + return err( + new BackbeatError(ErrorCode.INVALID_INPUT, 'Pipeline cannot exceed 20 steps', { + stepCount: steps.length, + }), + ); + } + + const timingResult = this.validateScheduleTiming(request); + if (!timingResult.ok) return timingResult; + const { scheduledAtMs, expiresAtMs, nextRunAt, timezone } = timingResult.value; + + // Validate shared workingDirectory + let validatedWorkingDirectory: string | undefined; + if (request.workingDirectory) { + const pathValidation = validatePath(request.workingDirectory); + if (!pathValidation.ok) { + return err( + new BackbeatError(ErrorCode.INVALID_DIRECTORY, `Invalid working directory: ${pathValidation.error.message}`, { + workingDirectory: request.workingDirectory, + }), + ); + } + validatedWorkingDirectory = pathValidation.value; + } + + // Validate per-step workingDirectory and build normalized steps + const normalizedSteps = [...steps]; + for (let i = 0; i < normalizedSteps.length; i++) { + const step = normalizedSteps[i]; + if (step.workingDirectory) { + const pathValidation = 
validatePath(step.workingDirectory); + if (!pathValidation.ok) { + return err( + new BackbeatError( + ErrorCode.INVALID_DIRECTORY, + `Invalid working directory for step ${i + 1}: ${pathValidation.error.message}`, + { step: i + 1, workingDirectory: step.workingDirectory }, + ), + ); + } + normalizedSteps[i] = { ...step, workingDirectory: pathValidation.value }; + } + } + + // Resolve shared agent + const agentResult = resolveDefaultAgent(request.agent, this.config.defaultAgent); + if (!agentResult.ok) return agentResult; + + // Build synthetic prompt for display/taskTemplate + const stepSummary = normalizedSteps.map((s, i) => `Step ${i + 1}: ${truncatePrompt(s.prompt, 40)}`).join(' → '); + const syntheticPrompt = `Pipeline (${normalizedSteps.length} steps): ${stepSummary}`; + + // Create schedule with pipelineSteps (using normalized paths) + const schedule = createSchedule({ + taskTemplate: { + prompt: syntheticPrompt, + priority: request.priority, + workingDirectory: validatedWorkingDirectory, + agent: agentResult.value, + }, + pipelineSteps: normalizedSteps, + scheduleType: request.scheduleType, + cronExpression: request.cronExpression, + scheduledAt: scheduledAtMs, + timezone, + missedRunPolicy: toMissedRunPolicy(request.missedRunPolicy), + maxRuns: request.maxRuns, + expiresAt: expiresAtMs, + afterScheduleId: request.afterScheduleId, + }); + + this.logger.info('Creating scheduled pipeline', { + scheduleId: schedule.id, + scheduleType: schedule.scheduleType, + stepCount: steps.length, + nextRunAt: new Date(nextRunAt).toISOString(), + }); + + // Emit event — ScheduleHandler persists with calculated nextRunAt + const emitResult = await this.eventBus.emit('ScheduleCreated', { schedule }); + if (!emitResult.ok) { + this.logger.error('Failed to emit ScheduleCreated event', emitResult.error, { + scheduleId: schedule.id, + }); + return err(emitResult.error); + } + + return ok(schedule); + } + async createPipeline(request: PipelineCreateRequest): Promise> { const { steps 
} = request; @@ -354,6 +405,94 @@ export class ScheduleManagerService implements ScheduleService { }); } + /** + * Validate schedule timing fields shared between createSchedule and createScheduledPipeline + * Returns parsed timestamps and computed nextRunAt + */ + private validateScheduleTiming( + request: Pick, + ): Result<{ scheduledAtMs?: number; expiresAtMs?: number; nextRunAt: number; timezone: string }> { + // Validate schedule type requirements + if (request.scheduleType === ScheduleType.CRON && !request.cronExpression) { + return err( + new BackbeatError(ErrorCode.INVALID_INPUT, 'cronExpression is required for cron schedules', { + scheduleType: request.scheduleType, + }), + ); + } + if (request.scheduleType === ScheduleType.ONE_TIME && !request.scheduledAt) { + return err( + new BackbeatError(ErrorCode.INVALID_INPUT, 'scheduledAt is required for one-time schedules', { + scheduleType: request.scheduleType, + }), + ); + } + + // Validate cron expression + if (request.cronExpression) { + const cronResult = validateCronExpression(request.cronExpression); + if (!cronResult.ok) return cronResult; + } + + // Validate timezone + const tz = request.timezone ?? 
'UTC'; + if (!isValidTimezone(tz)) { + return err(new BackbeatError(ErrorCode.INVALID_INPUT, `Invalid timezone: ${tz}`, { timezone: tz })); + } + + // Parse scheduledAt + let scheduledAtMs: number | undefined; + if (request.scheduledAt) { + scheduledAtMs = Date.parse(request.scheduledAt); + if (isNaN(scheduledAtMs)) { + return err( + new BackbeatError(ErrorCode.INVALID_INPUT, `Invalid scheduledAt datetime: ${request.scheduledAt}`, { + scheduledAt: request.scheduledAt, + }), + ); + } + if (scheduledAtMs <= Date.now()) { + return err( + new BackbeatError(ErrorCode.INVALID_INPUT, 'scheduledAt must be in the future', { + scheduledAt: request.scheduledAt, + }), + ); + } + } + + // Parse expiresAt + let expiresAtMs: number | undefined; + if (request.expiresAt) { + expiresAtMs = Date.parse(request.expiresAt); + if (isNaN(expiresAtMs)) { + return err( + new BackbeatError(ErrorCode.INVALID_INPUT, `Invalid expiresAt datetime: ${request.expiresAt}`, { + expiresAt: request.expiresAt, + }), + ); + } + } + + // Calculate nextRunAt + let nextRunAt: number; + if (request.scheduleType === ScheduleType.CRON && request.cronExpression) { + const nextResult = getNextRunTime(request.cronExpression, tz); + if (!nextResult.ok) return nextResult; + nextRunAt = nextResult.value; + } else { + if (scheduledAtMs === undefined) { + return err( + new BackbeatError(ErrorCode.INVALID_INPUT, 'scheduledAt must be provided for one-time schedules', { + scheduleType: request.scheduleType, + }), + ); + } + nextRunAt = scheduledAtMs; + } + + return ok({ scheduledAtMs, expiresAtMs, nextRunAt, timezone: tz }); + } + /** * Fetch a schedule by ID and optionally validate its status * Returns Result with the schedule or a typed error diff --git a/tests/unit/adapters/mcp-adapter.test.ts b/tests/unit/adapters/mcp-adapter.test.ts index e2bd927..086f4b2 100644 --- a/tests/unit/adapters/mcp-adapter.test.ts +++ b/tests/unit/adapters/mcp-adapter.test.ts @@ -13,8 +13,16 @@ import { afterEach, beforeEach, describe, 
expect, it, vi } from 'vitest'; import { MCPAdapter } from '../../../src/adapters/mcp-adapter'; -import type { PipelineCreateRequest, PipelineResult, Task, TaskRequest } from '../../../src/core/domain'; -import { Priority, ScheduleId } from '../../../src/core/domain'; +import type { + PipelineCreateRequest, + PipelineResult, + PipelineStepRequest, + Schedule, + ScheduledPipelineCreateRequest, + Task, + TaskRequest, +} from '../../../src/core/domain'; +import { MissedRunPolicy, Priority, ScheduleId, ScheduleStatus, ScheduleType } from '../../../src/core/domain'; import { BackbeatError, ErrorCode, taskNotFound } from '../../../src/core/errors'; import type { Logger, ScheduleService, TaskManager } from '../../../src/core/interfaces'; import type { Result } from '../../../src/core/result'; @@ -172,6 +180,7 @@ const stubScheduleService: ScheduleService = { pauseSchedule: vi.fn().mockResolvedValue(ok(undefined)), resumeSchedule: vi.fn().mockResolvedValue(ok(undefined)), createPipeline: vi.fn().mockResolvedValue(ok({ pipelineId: '', steps: [] })), + createScheduledPipeline: vi.fn().mockResolvedValue(ok(null)), }; describe('MCPAdapter - Protocol Compliance', () => { @@ -821,25 +830,218 @@ describe('MCPAdapter - Multi-Agent Support (v0.5.0)', () => { }); }); +describe('MCPAdapter - SchedulePipeline & Enhanced Schedule Tools', () => { + let adapter: MCPAdapter; + let mockTaskManager: MockTaskManager; + let mockLogger: MockLogger; + let mockScheduleService: MockScheduleService; + + beforeEach(() => { + mockTaskManager = new MockTaskManager(); + mockLogger = new MockLogger(); + mockScheduleService = new MockScheduleService(); + adapter = new MCPAdapter( + mockTaskManager, + mockLogger, + mockScheduleService as unknown as ScheduleService, + undefined, + testConfig, + ); + }); + + afterEach(() => { + mockTaskManager.reset(); + mockLogger.reset(); + }); + + describe('SchedulePipeline tool', () => { + it('should create scheduled pipeline with cron expression', async () => { + 
const result = await simulateSchedulePipeline(mockScheduleService, { + steps: [{ prompt: 'Build project' }, { prompt: 'Run tests' }, { prompt: 'Deploy' }], + scheduleType: 'cron', + cronExpression: '0 9 * * 1-5', + }); + + expect(result.isError).toBe(false); + const response = JSON.parse(result.content[0].text); + expect(response.success).toBe(true); + expect(response.scheduleId).toBeDefined(); + expect(response.stepCount).toBe(3); + expect(mockScheduleService.createScheduledPipelineCalls).toHaveLength(1); + expect(mockScheduleService.createScheduledPipelineCalls[0].steps).toHaveLength(3); + }); + + it('should validate minimum steps requirement', async () => { + const result = await simulateSchedulePipeline(mockScheduleService, { + steps: [{ prompt: 'only one step' }], + scheduleType: 'cron', + cronExpression: '0 9 * * *', + }); + + expect(result.isError).toBe(true); + expect(result.content[0].text).toContain('at least 2'); + expect(mockScheduleService.createScheduledPipelineCalls).toHaveLength(0); + }); + }); + + describe('CancelSchedule with cancelTasks flag', () => { + it('should pass cancelTasks flag to service', async () => { + const result = await simulateCancelSchedule(mockScheduleService, { + scheduleId: 'schedule-abc123', + reason: 'No longer needed', + cancelTasks: true, + }); + + expect(result.isError).toBe(false); + const response = JSON.parse(result.content[0].text); + expect(response.success).toBe(true); + expect(response.cancelTasksRequested).toBe(true); + expect(mockScheduleService.cancelScheduleCalls).toHaveLength(1); + expect(mockScheduleService.cancelScheduleCalls[0].cancelTasks).toBe(true); + }); + }); + + describe('ListSchedules with pipeline indicators', () => { + it('should include isPipeline indicator in response', async () => { + const now = Date.now(); + mockScheduleService.listSchedulesResult = [ + Object.freeze({ + id: ScheduleId('schedule-pipeline-1'), + taskTemplate: { prompt: 'pipeline step 1' }, + scheduleType: ScheduleType.CRON, + 
cronExpression: '0 9 * * *', + timezone: 'UTC', + missedRunPolicy: MissedRunPolicy.SKIP, + status: ScheduleStatus.ACTIVE, + runCount: 3, + nextRunAt: now + 60000, + pipelineSteps: [ + { prompt: 'Step 1' }, + { prompt: 'Step 2' }, + { prompt: 'Step 3' }, + ] as readonly PipelineStepRequest[], + createdAt: now, + updatedAt: now, + }), + ]; + + const result = await simulateListSchedules(mockScheduleService, {}); + + expect(result.isError).toBe(false); + const response = JSON.parse(result.content[0].text); + expect(response.success).toBe(true); + expect(response.schedules).toHaveLength(1); + expect(response.schedules[0].isPipeline).toBe(true); + expect(response.schedules[0].stepCount).toBe(3); + }); + }); + + describe('GetSchedule with pipelineSteps', () => { + it('should include pipelineSteps in response when present', async () => { + const now = Date.now(); + mockScheduleService.getScheduleResult = { + schedule: Object.freeze({ + id: ScheduleId('schedule-pipeline-detail'), + taskTemplate: { prompt: 'pipeline placeholder' }, + scheduleType: ScheduleType.CRON, + cronExpression: '0 */6 * * *', + timezone: 'America/New_York', + missedRunPolicy: MissedRunPolicy.SKIP, + status: ScheduleStatus.ACTIVE, + runCount: 5, + nextRunAt: now + 120000, + pipelineSteps: [ + { prompt: 'Lint codebase' }, + { prompt: 'Run unit tests', priority: 'P0' as Priority }, + ] as readonly PipelineStepRequest[], + createdAt: now, + updatedAt: now, + }), + }; + + const result = await simulateGetSchedule(mockScheduleService, { + scheduleId: 'schedule-pipeline-detail', + }); + + expect(result.isError).toBe(false); + const response = JSON.parse(result.content[0].text); + expect(response.success).toBe(true); + expect(response.schedule.isPipeline).toBe(true); + expect(response.schedule.pipelineSteps).toHaveLength(2); + expect(response.schedule.pipelineSteps[0].prompt).toBe('Lint codebase'); + expect(response.schedule.pipelineSteps[1].priority).toBe('P0'); + }); + }); +}); + /** - * Mock ScheduleService 
for CreatePipeline testing + * Mock ScheduleService for CreatePipeline / SchedulePipeline testing */ class MockScheduleService { createPipelineCalls: PipelineCreateRequest[] = []; + createScheduledPipelineCalls: ScheduledPipelineCreateRequest[] = []; + cancelScheduleCalls: Array<{ scheduleId: string; reason?: string; cancelTasks?: boolean }> = []; + listSchedulesResult: Schedule[] = []; + getScheduleResult: { + schedule: Schedule; + history?: readonly { + scheduledFor: number; + executedAt?: number; + status: string; + taskId?: string; + errorMessage?: string; + }[]; + } | null = null; shouldFailPipeline = false; + shouldFailScheduledPipeline = false; + shouldFailCancelSchedule = false; + shouldFailListSchedules = false; + shouldFailGetSchedule = false; async createSchedule() { return ok(null); } - async listSchedules() { - return ok([]); + + async listSchedules(): Promise> { + if (this.shouldFailListSchedules) { + return err(new BackbeatError(ErrorCode.SYSTEM_ERROR, 'Failed to list schedules', {})); + } + return ok(this.listSchedulesResult); } - async getSchedule() { - return ok({ schedule: null }); + + async getSchedule( + scheduleId: ScheduleId, + _includeHistory?: boolean, + _historyLimit?: number, + ): Promise< + Result<{ + schedule: Schedule; + history?: readonly { + scheduledFor: number; + executedAt?: number; + status: string; + taskId?: string; + errorMessage?: string; + }[]; + }> + > { + if (this.shouldFailGetSchedule) { + return err(new BackbeatError(ErrorCode.SYSTEM_ERROR, 'Failed to get schedule', {})); + } + if (this.getScheduleResult) { + return ok(this.getScheduleResult); + } + return err(new BackbeatError(ErrorCode.SYSTEM_ERROR, `Schedule ${scheduleId} not found`, {})); } - async cancelSchedule() { + + async cancelSchedule(scheduleId: ScheduleId, reason?: string, cancelTasks?: boolean): Promise> { + this.cancelScheduleCalls.push({ scheduleId: scheduleId as string, reason, cancelTasks }); + if (this.shouldFailCancelSchedule) { + return err(new 
BackbeatError(ErrorCode.SYSTEM_ERROR, 'Failed to cancel schedule', {})); + } return ok(undefined); } + async pauseSchedule() { return ok(undefined); } @@ -863,6 +1065,36 @@ class MockScheduleService { })), }); } + + async createScheduledPipeline(request: ScheduledPipelineCreateRequest): Promise> { + this.createScheduledPipelineCalls.push(request); + + if (this.shouldFailScheduledPipeline) { + return err(new BackbeatError(ErrorCode.SYSTEM_ERROR, 'Scheduled pipeline creation failed', {})); + } + + const now = Date.now(); + return ok( + Object.freeze({ + id: ScheduleId('schedule-mock-pipeline'), + taskTemplate: { + prompt: request.steps[0].prompt, + priority: request.priority, + workingDirectory: request.workingDirectory, + }, + scheduleType: request.scheduleType, + cronExpression: request.cronExpression, + timezone: request.timezone ?? 'UTC', + missedRunPolicy: request.missedRunPolicy ?? MissedRunPolicy.SKIP, + status: ScheduleStatus.ACTIVE, + runCount: 0, + nextRunAt: now + 60000, + pipelineSteps: request.steps, + createdAt: now, + updatedAt: now, + }), + ); + } } // ============================================================================ @@ -1178,3 +1410,222 @@ async function simulateCreatePipeline( ], }; } + +async function simulateSchedulePipeline( + scheduleService: MockScheduleService, + args: { + steps: Array<{ prompt: string; priority?: string; workingDirectory?: string }>; + scheduleType: string; + cronExpression?: string; + scheduledAt?: string; + timezone?: string; + missedRunPolicy?: string; + priority?: string; + workingDirectory?: string; + maxRuns?: number; + expiresAt?: string; + }, +): Promise { + // Validate min/max steps (mirrors SchedulePipelineSchema Zod validation) + if (args.steps.length < 2) { + return { + isError: true, + content: [{ type: 'text', text: 'Pipeline requires at least 2 steps' }], + }; + } + if (args.steps.length > 20) { + return { + isError: true, + content: [{ type: 'text', text: 'Pipeline cannot exceed 20 steps' }], + }; + 
} + + const request: ScheduledPipelineCreateRequest = { + steps: args.steps.map((s) => ({ + prompt: s.prompt, + priority: s.priority as Priority | undefined, + workingDirectory: s.workingDirectory, + })), + scheduleType: args.scheduleType === 'cron' ? ScheduleType.CRON : ScheduleType.ONE_TIME, + cronExpression: args.cronExpression, + scheduledAt: args.scheduledAt, + timezone: args.timezone ?? 'UTC', + priority: args.priority as Priority | undefined, + workingDirectory: args.workingDirectory, + maxRuns: args.maxRuns, + expiresAt: args.expiresAt, + }; + + const result = await scheduleService.createScheduledPipeline(request); + + if (!result.ok) { + return { + isError: true, + content: [{ type: 'text', text: JSON.stringify({ success: false, error: result.error.message }) }], + }; + } + + return { + isError: false, + content: [ + { + type: 'text', + text: JSON.stringify({ + success: true, + scheduleId: result.value.id, + stepCount: result.value.pipelineSteps?.length ?? 0, + scheduleType: result.value.scheduleType, + nextRunAt: result.value.nextRunAt ? 
new Date(result.value.nextRunAt).toISOString() : undefined, + status: result.value.status, + timezone: result.value.timezone, + }), + }, + ], + }; +} + +async function simulateCancelSchedule( + scheduleService: MockScheduleService, + args: { scheduleId: string; reason?: string; cancelTasks?: boolean }, +): Promise { + const { scheduleId, reason, cancelTasks } = args; + + const result = await scheduleService.cancelSchedule(ScheduleId(scheduleId), reason, cancelTasks); + + if (!result.ok) { + return { + isError: true, + content: [{ type: 'text', text: JSON.stringify({ success: false, error: result.error.message }) }], + }; + } + + return { + isError: false, + content: [ + { + type: 'text', + text: JSON.stringify({ + success: true, + message: `Schedule ${scheduleId} cancelled`, + reason, + cancelTasksRequested: cancelTasks, + }), + }, + ], + }; +} + +async function simulateListSchedules( + scheduleService: MockScheduleService, + args: { status?: string; limit?: number; offset?: number }, +): Promise { + const result = await scheduleService.listSchedules(); + + if (!result.ok) { + return { + isError: true, + content: [{ type: 'text', text: JSON.stringify({ success: false, error: result.error.message }) }], + }; + } + + const schedules = result.value; + const simplifiedSchedules = schedules.map((s) => ({ + id: s.id, + status: s.status, + scheduleType: s.scheduleType, + cronExpression: s.cronExpression, + nextRunAt: s.nextRunAt ? new Date(s.nextRunAt).toISOString() : null, + runCount: s.runCount, + maxRuns: s.maxRuns, + isPipeline: !!(s.pipelineSteps && s.pipelineSteps.length > 0), + stepCount: s.pipelineSteps?.length ?? 
0, + })); + + return { + isError: false, + content: [ + { + type: 'text', + text: JSON.stringify({ + success: true, + schedules: simplifiedSchedules, + count: simplifiedSchedules.length, + }), + }, + ], + }; +} + +async function simulateGetSchedule( + scheduleService: MockScheduleService, + args: { scheduleId: string; includeHistory?: boolean; historyLimit?: number }, +): Promise { + const result = await scheduleService.getSchedule(ScheduleId(args.scheduleId), args.includeHistory, args.historyLimit); + + if (!result.ok) { + return { + isError: true, + content: [{ type: 'text', text: JSON.stringify({ success: false, error: result.error.message }) }], + }; + } + + const { schedule, history } = result.value; + + const response: Record = { + success: true, + schedule: { + id: schedule.id, + status: schedule.status, + scheduleType: schedule.scheduleType, + cronExpression: schedule.cronExpression, + scheduledAt: schedule.scheduledAt ? new Date(schedule.scheduledAt).toISOString() : null, + timezone: schedule.timezone, + missedRunPolicy: schedule.missedRunPolicy, + maxRuns: schedule.maxRuns, + runCount: schedule.runCount, + lastRunAt: schedule.lastRunAt ? new Date(schedule.lastRunAt).toISOString() : null, + nextRunAt: schedule.nextRunAt ? new Date(schedule.nextRunAt).toISOString() : null, + expiresAt: schedule.expiresAt ? new Date(schedule.expiresAt).toISOString() : null, + createdAt: new Date(schedule.createdAt).toISOString(), + updatedAt: new Date(schedule.updatedAt).toISOString(), + taskTemplate: { + prompt: + schedule.taskTemplate.prompt.substring(0, 100) + (schedule.taskTemplate.prompt.length > 100 ? '...' : ''), + priority: schedule.taskTemplate.priority, + workingDirectory: schedule.taskTemplate.workingDirectory, + }, + ...(schedule.pipelineSteps && schedule.pipelineSteps.length > 0 + ? { + isPipeline: true, + pipelineSteps: schedule.pipelineSteps.map((s, i) => ({ + index: i, + prompt: s.prompt.substring(0, 100) + (s.prompt.length > 100 ? '...' 
: ''), + priority: s.priority, + workingDirectory: s.workingDirectory, + agent: s.agent, + })), + } + : {}), + }, + }; + + if (history) { + response.history = history.map((h) => ({ + scheduledFor: new Date(h.scheduledFor).toISOString(), + executedAt: h.executedAt ? new Date(h.executedAt).toISOString() : null, + status: h.status, + taskId: h.taskId, + errorMessage: h.errorMessage, + })); + } + + return { + isError: false, + content: [ + { + type: 'text', + text: JSON.stringify(response, null, 2), + }, + ], + }; +} diff --git a/tests/unit/cli.test.ts b/tests/unit/cli.test.ts index 3432abd..967d09f 100644 --- a/tests/unit/cli.test.ts +++ b/tests/unit/cli.test.ts @@ -13,9 +13,12 @@ import { AGENT_PROVIDERS, isAgentProvider } from '../../src/core/agents'; import { loadConfiguration } from '../../src/core/configuration'; import type { Container } from '../../src/core/container'; import type { + PipelineCreateRequest, + PipelineResult, ResumeTaskRequest, Schedule, ScheduleCreateRequest, + ScheduledPipelineCreateRequest, ScheduleExecution, Task, TaskRequest, @@ -157,7 +160,7 @@ class MockScheduleService implements ScheduleService { createCalls: ScheduleCreateRequest[] = []; listCalls: Array<{ status?: ScheduleStatus; limit?: number; offset?: number }> = []; getCalls: Array<{ scheduleId: string; includeHistory?: boolean; historyLimit?: number }> = []; - cancelCalls: Array<{ scheduleId: string; reason?: string }> = []; + cancelCalls: Array<{ scheduleId: string; reason?: string; cancelTasks?: boolean }> = []; pauseCalls: Array<{ scheduleId: string }> = []; resumeCalls: Array<{ scheduleId: string }> = []; @@ -184,6 +187,44 @@ class MockScheduleService implements ScheduleService { return ok(schedule); } + createPipelineCalls: PipelineCreateRequest[] = []; + createScheduledPipelineCalls: ScheduledPipelineCreateRequest[] = []; + + async createPipeline(request: PipelineCreateRequest): Promise> { + this.createPipelineCalls.push(request); + const steps = request.steps.map((step, 
index) => ({ + index, + scheduleId: ScheduleId(`schedule-step-${index}`), + prompt: step.prompt, + })); + return ok({ + pipelineId: ScheduleId('schedule-step-0'), + steps, + } as PipelineResult); + } + + async createScheduledPipeline(request: ScheduledPipelineCreateRequest) { + this.createScheduledPipelineCalls.push(request); + const schedule = createSchedule({ + taskTemplate: { + prompt: request.steps.map((s) => s.prompt).join(' | '), + priority: request.priority, + workingDirectory: request.workingDirectory, + }, + scheduleType: request.scheduleType, + cronExpression: request.cronExpression, + scheduledAt: request.scheduledAt ? Date.parse(request.scheduledAt) : undefined, + timezone: request.timezone ?? 'UTC', + missedRunPolicy: request.missedRunPolicy ?? MissedRunPolicy.SKIP, + maxRuns: request.maxRuns, + expiresAt: request.expiresAt ? Date.parse(request.expiresAt) : undefined, + afterScheduleId: request.afterScheduleId, + pipelineSteps: request.steps, + }); + this.scheduleStorage.set(schedule.id, schedule); + return ok(schedule); + } + async listSchedules(status?: ScheduleStatus, limit?: number, offset?: number) { this.listCalls.push({ status, limit, offset }); const all = Array.from(this.scheduleStorage.values()); @@ -203,8 +244,8 @@ class MockScheduleService implements ScheduleService { return ok({ schedule, history }); } - async cancelSchedule(scheduleId: string, reason?: string) { - this.cancelCalls.push({ scheduleId, reason }); + async cancelSchedule(scheduleId: string, reason?: string, cancelTasks?: boolean) { + this.cancelCalls.push({ scheduleId, reason, cancelTasks }); const schedule = this.scheduleStorage.get(scheduleId); if (!schedule) { return err(new BackbeatError(ErrorCode.TASK_NOT_FOUND, `Schedule ${scheduleId} not found`)); @@ -232,6 +273,8 @@ class MockScheduleService implements ScheduleService { reset() { this.createCalls = []; + this.createPipelineCalls = []; + this.createScheduledPipelineCalls = []; this.listCalls = []; this.getCalls = []; 
     this.cancelCalls = [];
@@ -928,6 +971,32 @@ describe('CLI - Schedule Commands', () => {
       expect(result.ok).toBe(true);
       expect(mockScheduleService.createCalls[0].scheduleType).toBe(ScheduleType.ONE_TIME);
     });
+
+    it('should create scheduled pipeline with --pipeline and --step flags', async () => {
+      const result = await simulateScheduleCreatePipeline(mockScheduleService, {
+        steps: ['lint', 'test'],
+        cron: '0 9 * * *',
+      });
+
+      expect(result.ok).toBe(true);
+      expect(mockScheduleService.createScheduledPipelineCalls).toHaveLength(1);
+      const call = mockScheduleService.createScheduledPipelineCalls[0];
+      expect(call.steps).toHaveLength(2);
+      expect(call.steps[0].prompt).toBe('lint');
+      expect(call.steps[1].prompt).toBe('test');
+      expect(call.scheduleType).toBe(ScheduleType.CRON);
+      expect(call.cronExpression).toBe('0 9 * * *');
+    });
+
+    it('should reject pipeline with fewer than 2 steps', () => {
+      const validation = validatePipelineCreateInput(['only-one-step']);
+
+      expect(validation.ok).toBe(false);
+      if (!validation.ok) {
+        expect(validation.error.code).toBe(ErrorCode.INVALID_INPUT);
+        expect(validation.error.message).toContain('at least 2');
+      }
+    });
   });

   describe('schedule list', () => {
@@ -997,6 +1066,25 @@ describe('CLI - Schedule Commands', () => {
       const result = await mockScheduleService.cancelSchedule(ScheduleId('non-existent'));
       expect(result.ok).toBe(false);
     });
+
+    it('should pass cancelTasks flag when --cancel-tasks is provided', async () => {
+      const createResult = await simulateScheduleCreate(mockScheduleService, {
+        prompt: 'test',
+        type: 'cron',
+        cron: '0 9 * * *',
+      });
+      expect(createResult.ok).toBe(true);
+      if (!createResult.ok) return;
+
+      const result = await simulateScheduleCancel(mockScheduleService, {
+        scheduleId: createResult.value.id,
+        cancelTasks: true,
+      });
+
+      expect(result.ok).toBe(true);
+      expect(mockScheduleService.cancelCalls).toHaveLength(1);
+      expect(mockScheduleService.cancelCalls[0].cancelTasks).toBe(true);
+    });
   });

   describe('schedule pause', () => {
@@ -1963,6 +2051,81 @@ function validatePipelineInput(steps: string[]) {
   return ok(undefined);
 }

+/**
+ * Validates pipeline create input (--pipeline --step flags).
+ * Mirrors the validation in scheduleCreate() when isPipeline is true.
+ */
+function validatePipelineCreateInput(steps: string[]) {
+  if (steps.length < 2) {
+    return err(
+      new BackbeatError(ErrorCode.INVALID_INPUT, 'Pipeline requires at least 2 --step flags', { field: 'steps' }),
+    );
+  }
+  return ok(undefined);
+}
+
+/**
+ * Simulates `beat schedule create --pipeline --step "..." --step "..." --cron "..."`
+ * Mirrors the pipeline branch in scheduleCreate().
+ */
+async function simulateScheduleCreatePipeline(
+  service: MockScheduleService,
+  options: {
+    steps: string[];
+    cron?: string;
+    at?: string;
+    timezone?: string;
+    missedRunPolicy?: string;
+    priority?: string;
+    workingDirectory?: string;
+    maxRuns?: number;
+    expiresAt?: string;
+    afterScheduleId?: string;
+    agent?: string;
+  },
+) {
+  const validation = validatePipelineCreateInput(options.steps);
+  if (!validation.ok) return validation;
+
+  const scheduleType = options.cron ? ScheduleType.CRON : ScheduleType.ONE_TIME;
+
+  return service.createScheduledPipeline({
+    steps: options.steps.map((prompt) => ({ prompt })),
+    scheduleType,
+    cronExpression: options.cron,
+    scheduledAt: options.at,
+    timezone: options.timezone,
+    missedRunPolicy:
+      options.missedRunPolicy === 'catchup'
+        ? MissedRunPolicy.CATCHUP
+        : options.missedRunPolicy === 'fail'
+          ? MissedRunPolicy.FAIL
+          : options.missedRunPolicy
+            ? MissedRunPolicy.SKIP
+            : undefined,
+    priority: options.priority,
+    workingDirectory: options.workingDirectory,
+    maxRuns: options.maxRuns,
+    expiresAt: options.expiresAt,
+    afterScheduleId: options.afterScheduleId ? ScheduleId(options.afterScheduleId) : undefined,
+  });
+}
+
+/**
+ * Simulates `beat schedule cancel [--cancel-tasks] [reason]`
+ * Mirrors the scheduleCancel() function.
+ */
+async function simulateScheduleCancel(
+  service: MockScheduleService,
+  options: {
+    scheduleId: string;
+    reason?: string;
+    cancelTasks?: boolean;
+  },
+) {
+  return service.cancelSchedule(ScheduleId(options.scheduleId), options.reason, options.cancelTasks);
+}
+
 async function simulatePipeline(service: MockScheduleService, pipelineArgs: string[]) {
   // Each arg is a pipeline step prompt
   const validation = validatePipelineInput(pipelineArgs);
diff --git a/tests/unit/implementations/schedule-repository.test.ts b/tests/unit/implementations/schedule-repository.test.ts
index a84a544..2450535 100644
--- a/tests/unit/implementations/schedule-repository.test.ts
+++ b/tests/unit/implementations/schedule-repository.test.ts
@@ -6,7 +6,14 @@
 import { afterEach, beforeEach, describe, expect, it } from 'vitest';
 import type { Schedule } from '../../../src/core/domain.js';
-import { createSchedule, MissedRunPolicy, ScheduleId, ScheduleStatus, ScheduleType } from '../../../src/core/domain.js';
+import {
+  createSchedule,
+  MissedRunPolicy,
+  ScheduleId,
+  ScheduleStatus,
+  ScheduleType,
+  TaskId,
+} from '../../../src/core/domain.js';
 import { Database } from '../../../src/implementations/database.js';
 import { SQLiteScheduleRepository } from '../../../src/implementations/schedule-repository.js';
@@ -548,4 +555,98 @@ describe('SQLiteScheduleRepository - Unit Tests', () => {
       }
     });
   });
+
+  describe('pipeline_steps round-trip', () => {
+    it('should save and retrieve schedule with pipelineSteps', async () => {
+      const schedule = createTestSchedule({
+        pipelineSteps: [{ prompt: 'lint the codebase' }, { prompt: 'run the tests' }],
+      });
+
+      await repo.save(schedule);
+      const findResult = await repo.findById(schedule.id);
+
+      expect(findResult.ok).toBe(true);
+      if (!findResult.ok) return;
+
+      const found = findResult.value!;
+      expect(found.pipelineSteps).toBeDefined();
+      expect(found.pipelineSteps).toHaveLength(2);
+      expect(found.pipelineSteps![0].prompt).toBe('lint the codebase');
+      expect(found.pipelineSteps![1].prompt).toBe('run the tests');
+    });
+
+    it('should return undefined pipelineSteps for non-pipeline schedules', async () => {
+      const schedule = createTestSchedule();
+
+      await repo.save(schedule);
+      const findResult = await repo.findById(schedule.id);
+
+      expect(findResult.ok).toBe(true);
+      if (!findResult.ok) return;
+
+      expect(findResult.value!.pipelineSteps).toBeUndefined();
+    });
+
+    it('should record execution with pipelineTaskIds', async () => {
+      const schedule = createTestSchedule();
+      await repo.save(schedule);
+
+      const now = Date.now();
+      const taskIds = [TaskId('task-aaa-111'), TaskId('task-bbb-222'), TaskId('task-ccc-333')];
+
+      const recordResult = await repo.recordExecution({
+        scheduleId: schedule.id,
+        scheduledFor: now,
+        executedAt: now,
+        status: 'triggered',
+        pipelineTaskIds: taskIds,
+        createdAt: now,
+      });
+
+      expect(recordResult.ok).toBe(true);
+      if (!recordResult.ok) return;
+
+      expect(recordResult.value.pipelineTaskIds).toBeDefined();
+      expect(recordResult.value.pipelineTaskIds).toHaveLength(3);
+      expect(recordResult.value.pipelineTaskIds![0]).toBe('task-aaa-111');
+      expect(recordResult.value.pipelineTaskIds![2]).toBe('task-ccc-333');
+
+      // Also verify via getExecutionHistory
+      const historyResult = await repo.getExecutionHistory(schedule.id);
+      expect(historyResult.ok).toBe(true);
+      if (!historyResult.ok) return;
+
+      expect(historyResult.value).toHaveLength(1);
+      expect(historyResult.value[0].pipelineTaskIds).toHaveLength(3);
+      expect(historyResult.value[0].pipelineTaskIds![1]).toBe('task-bbb-222');
+    });
+
+    it('should update schedule with pipelineSteps', async () => {
+      // Save schedule without pipeline steps
+      const schedule = createTestSchedule();
+      await repo.save(schedule);
+
+      // Verify no pipeline steps initially
+      const initialResult = await repo.findById(schedule.id);
+      expect(initialResult.ok).toBe(true);
+      if (!initialResult.ok) return;
+
+      expect(initialResult.value!.pipelineSteps).toBeUndefined();
+
+      // Update to add pipeline steps
+      const steps = [{ prompt: 'step one' }, { prompt: 'step two' }, { prompt: 'step three' }];
+      await repo.update(schedule.id, { pipelineSteps: steps });
+
+      // Verify pipeline steps persisted
+      const updatedResult = await repo.findById(schedule.id);
+      expect(updatedResult.ok).toBe(true);
+      if (!updatedResult.ok) return;
+
+      const found = updatedResult.value!;
+      expect(found.pipelineSteps).toBeDefined();
+      expect(found.pipelineSteps).toHaveLength(3);
+      expect(found.pipelineSteps![0].prompt).toBe('step one');
+      expect(found.pipelineSteps![1].prompt).toBe('step two');
+      expect(found.pipelineSteps![2].prompt).toBe('step three');
+    });
+  });
 });
diff --git a/tests/unit/services/handlers/dependency-handler.test.ts b/tests/unit/services/handlers/dependency-handler.test.ts
index 3ea3f99..15a05a2 100644
--- a/tests/unit/services/handlers/dependency-handler.test.ts
+++ b/tests/unit/services/handlers/dependency-handler.test.ts
@@ -486,6 +486,45 @@ describe('DependencyHandler - Behavioral Tests', () => {
     expect(logger.getLogsByLevel('error').length).toBeGreaterThan(0);
   });

+  it('should not unblock task when getDependencies fails during cascade check', async () => {
+    // Arrange: parent → child dependency
+    const parent = createTask({ prompt: 'parent' });
+    const child = createTask({ prompt: 'child', dependsOn: [parent.id] });
+    await taskRepo.save(parent);
+    await taskRepo.save(child);
+    await eventBus.emit('TaskDelegated', { task: child });
+
+    // Track unblock events — none should fire
+    let unblockedEventReceived = false;
+    eventBus.subscribe('TaskUnblocked', async () => {
+      unblockedEventReceived = true;
+    });
+
+    // Mock getDependencies to fail when called during cascade check
+    const originalGetDeps = dependencyRepo.getDependencies.bind(dependencyRepo);
+    const getDependenciesSpy = vi.spyOn(dependencyRepo, 'getDependencies').mockImplementation(async (taskId) => {
+      // Only fail during the cascade check (after isBlocked returns false)
+      if (taskId === child.id) {
+        const { err: mkErr } = await import('../../../../src/core/result');
+        const { BackbeatError, ErrorCode } = await import('../../../../src/core/errors');
+        return mkErr(new BackbeatError(ErrorCode.SYSTEM_ERROR, 'Simulated getDependencies failure'));
+      }
+      return originalGetDeps(taskId);
+    });
+
+    // Act: fail the parent — should trigger cascade check
+    await eventBus.emit('TaskFailed', { taskId: parent.id, error: new Error('parent failed') });
+    await flushEventLoop();
+
+    getDependenciesSpy.mockRestore();
+
+    // Assert: task should NOT be unblocked (getDependencies failed, can't confirm cascade)
+    expect(unblockedEventReceived).toBe(false);
+
+    // Assert: warning should be logged
+    expect(logger.getLogsByLevel('warn').length).toBeGreaterThan(0);
+  });
+
   it('should handle database errors during dependency creation', async () => {
     // Arrange - Create valid tasks
     const parent = createTask({ prompt: 'parent' });
@@ -1179,4 +1218,96 @@ describe('DependencyHandler - Behavioral Tests', () => {
       );
   }, 15000); // Extended timeout: handler awaits 5s checkpoint timeout internally
   });
+
+  describe('Dependency failure cascade (v0.6.0)', () => {
+    it('should cancel dependent task when upstream fails', async () => {
+      // Arrange - Create parent and child with dependency
+      const parentTask = createTask({ prompt: 'parent' });
+      await taskRepo.save(parentTask);
+      const childTask = createTask({ prompt: 'child', dependsOn: [parentTask.id] });
+      await taskRepo.save(childTask);
+
+      // Emit TaskDelegated for child to register deps in handler
+      await eventBus.emit('TaskDelegated', { task: childTask });
+      await flushEventLoop();
+
+      // Capture TaskCancellationRequested events
+      const cancellationRequestedIds: TaskId[] = [];
+      eventBus.subscribe('TaskCancellationRequested', async (event) => {
+        cancellationRequestedIds.push(event.taskId);
+      });
+
+      // Act - Fail the parent task
+      await eventBus.emit('TaskFailed', { taskId: parentTask.id, error: new Error('failed') });
+      await flushEventLoop();
+
+      // Assert - Child should receive a cancellation request
+      expect(cancellationRequestedIds).toContain(childTask.id);
+      expect(logger.hasLogContaining('cascading cancellation')).toBe(true);
+    });
+
+    it('should cascade cancellation through multi-level chain', async () => {
+      // Arrange - Create A→B→C chain
+      const taskA = createTask({ prompt: 'task A' });
+      await taskRepo.save(taskA);
+      const taskB = createTask({ prompt: 'task B', dependsOn: [taskA.id] });
+      await taskRepo.save(taskB);
+      const taskC = createTask({ prompt: 'task C', dependsOn: [taskB.id] });
+      await taskRepo.save(taskC);
+
+      // Register dependencies in the handler
+      await eventBus.emit('TaskDelegated', { task: taskB });
+      await eventBus.emit('TaskDelegated', { task: taskC });
+      await flushEventLoop();
+
+      // Capture TaskCancellationRequested events
+      const cancellationRequestedIds: TaskId[] = [];
+      eventBus.subscribe('TaskCancellationRequested', async (event) => {
+        cancellationRequestedIds.push(event.taskId);
+      });
+
+      // Act - Fail A, which should cascade cancellation to B
+      await eventBus.emit('TaskFailed', { taskId: taskA.id, error: new Error('failed') });
+      await flushEventLoop();
+
+      // B should be cancelled at this point
+      expect(cancellationRequestedIds).toContain(taskB.id);
+
+      // Simulate B being cancelled (downstream of the cancellation request) — this
+      // triggers the DependencyHandler to resolve C's dependency on B as 'cancelled',
+      // which cascades the cancellation to C
+      await eventBus.emit('TaskCancelled', { taskId: taskB.id, reason: 'dependency failed' });
+      await flushEventLoop();
+
+      // C should also receive a cancellation request (cascade)
+      expect(cancellationRequestedIds).toContain(taskC.id);
+      expect(logger.hasLogContaining('cascading cancellation')).toBe(true);
+    });
+
+    it('should cancel dependent when upstream is cancelled', async () => {
+      // Arrange - Create parent and child with dependency
+      const parentTask = createTask({ prompt: 'parent' });
+      await taskRepo.save(parentTask);
+      const childTask = createTask({ prompt: 'child', dependsOn: [parentTask.id] });
+      await taskRepo.save(childTask);
+
+      // Emit TaskDelegated for child to register deps in handler
+      await eventBus.emit('TaskDelegated', { task: childTask });
+      await flushEventLoop();
+
+      // Capture TaskCancellationRequested events
+      const cancellationRequestedIds: TaskId[] = [];
+      eventBus.subscribe('TaskCancellationRequested', async (event) => {
+        cancellationRequestedIds.push(event.taskId);
+      });
+
+      // Act - Cancel the parent task (instead of failing it)
+      await eventBus.emit('TaskCancelled', { taskId: parentTask.id, reason: 'user cancelled' });
+      await flushEventLoop();
+
+      // Assert - Child should receive a cancellation request
+      expect(cancellationRequestedIds).toContain(childTask.id);
+      expect(logger.hasLogContaining('cascading cancellation')).toBe(true);
+    });
+  });
 });
diff --git a/tests/unit/services/handlers/queue-handler.test.ts b/tests/unit/services/handlers/queue-handler.test.ts
index 405c6aa..7f26b0b 100644
--- a/tests/unit/services/handlers/queue-handler.test.ts
+++ b/tests/unit/services/handlers/queue-handler.test.ts
@@ -160,6 +160,45 @@ describe('QueueHandler', () => {
     });
   });

+  describe('Fast-path blocked task check (v0.6.0)', () => {
+    it('should not enqueue task when dependencyState is blocked', async () => {
+      // Arrange - Create a task with dependsOn set (createTask sets dependencyState: 'blocked' automatically)
+      const parentId = TaskId('parent-task-id');
+      const task = createTask({ prompt: 'blocked task', dependsOn: [parentId] });
+      // dependencyState is 'blocked' because dependsOn is set
+
+      // Act - Emit TaskPersisted with the blocked task
+      await eventBus.emit('TaskPersisted', { taskId: task.id, task });
+      await flushEventLoop();
+
+      // Assert - Task should NOT be enqueued (fast-path skip)
+      expect(queue.size()).toBe(0);
+      expect(queue.contains(task.id)).toBe(false);
+      expect(logger.hasLogContaining('fast-path')).toBe(true);
+    });
+
+    it('should still enqueue task when dependencyState is none', async () => {
+      // Arrange - Create a task with no dependencies (dependencyState: 'none')
+      const task = createTask({ prompt: 'independent task' });
+      // dependencyState is 'none' because no dependsOn
+
+      let queuedEvent: TaskQueuedEvent | undefined;
+      eventBus.on('TaskQueued', (event: TaskQueuedEvent) => {
+        queuedEvent = event;
+      });
+
+      // Act - Emit TaskPersisted with the unblocked task
+      await eventBus.emit('TaskPersisted', { taskId: task.id, task });
+      await flushEventLoop();
+
+      // Assert - Task should be enqueued normally
+      expect(queue.size()).toBe(1);
+      expect(queue.contains(task.id)).toBe(true);
+      expect(queuedEvent).toBeDefined();
+      expect(queuedEvent!.taskId).toBe(task.id);
+    });
+  });
+
   describe('TaskUnblocked', () => {
     it('should fetch fresh task from DB, enqueue, and emit TaskQueued', async () => {
       // Save task then update priority in DB — event payload has stale P2
diff --git a/tests/unit/services/handlers/schedule-handler.test.ts b/tests/unit/services/handlers/schedule-handler.test.ts
index 50cd743..9aed43b 100644
--- a/tests/unit/services/handlers/schedule-handler.test.ts
+++ b/tests/unit/services/handlers/schedule-handler.test.ts
@@ -8,14 +8,15 @@
  * state (repo, logger) rather than thrown exceptions.
  */

-import { afterEach, beforeEach, describe, expect, it } from 'vitest';
-import type { Schedule } from '../../../../src/core/domain';
+import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
+import type { PipelineStepRequest, Schedule, Task } from '../../../../src/core/domain';
 import {
   createSchedule,
   MissedRunPolicy,
   ScheduleId,
   ScheduleStatus,
   ScheduleType,
+  TaskId,
   TaskStatus,
 } from '../../../../src/core/domain';
 import { InMemoryEventBus } from '../../../../src/core/events/event-bus';
@@ -581,4 +582,350 @@ describe('ScheduleHandler - Behavioral Tests', () => {
       expect(allTasks.value[0].dependsOn ?? []).toHaveLength(0);
     });
   });
+
+  describe('Pipeline trigger (v0.6.0)', () => {
+    const pipelineSteps: readonly PipelineStepRequest[] = [
+      { prompt: 'lint code' },
+      { prompt: 'run tests' },
+      { prompt: 'deploy' },
+    ];
+
+    function createPipelineSchedule(overrides: Partial<Parameters<typeof createSchedule>[0]> = {}): Schedule {
+      return createSchedule({
+        taskTemplate: { prompt: 'Pipeline', workingDirectory: '/tmp' },
+        scheduleType: ScheduleType.ONE_TIME,
+        scheduledAt: Date.now() + 60000,
+        timezone: 'UTC',
+        missedRunPolicy: MissedRunPolicy.SKIP,
+        pipelineSteps,
+        ...overrides,
+      });
+    }
+
+    async function triggerSchedule(scheduleId: ReturnType<typeof ScheduleId>): Promise<void> {
+      await scheduleRepo.update(scheduleId, { nextRunAt: Date.now() - 1000 });
+      await eventBus.emit('ScheduleTriggered', { scheduleId, triggeredAt: Date.now() });
+      await flushEventLoop();
+    }
+
+    it('should create N tasks with linear dependencies for pipeline schedule', async () => {
+      // Arrange
+      const delegatedTasks: Task[] = [];
+      eventBus.subscribe('TaskDelegated', async (e) => {
+        delegatedTasks.push(e.task);
+      });
+
+      const schedule = createPipelineSchedule();
+      await saveSchedule({ ...schedule, status: ScheduleStatus.ACTIVE });
+
+      // Act
+      await triggerSchedule(schedule.id);
+
+      // Assert: 3 tasks created in the repo
+      const allTasksResult = await taskRepo.findAll();
+      expect(allTasksResult.ok).toBe(true);
+      if (!allTasksResult.ok) return;
+      expect(allTasksResult.value).toHaveLength(3);
+
+      // TaskDelegated events are emitted in pipeline order (step 0, 1, 2)
+      expect(delegatedTasks).toHaveLength(3);
+      expect(delegatedTasks[0].prompt).toBe('lint code');
+      expect(delegatedTasks[1].prompt).toBe('run tests');
+      expect(delegatedTasks[2].prompt).toBe('deploy');
+
+      // Assert: linear dependencies — step[1] depends on step[0], step[2] depends on step[1]
+      expect(delegatedTasks[0].dependsOn ?? []).toHaveLength(0);
+      expect(delegatedTasks[1].dependsOn).toContain(delegatedTasks[0].id);
+      expect(delegatedTasks[2].dependsOn).toContain(delegatedTasks[1].id);
+      expect(delegatedTasks[2].dependsOn).not.toContain(delegatedTasks[0].id);
+
+      // Assert: pipeline trigger logged
+      expect(logger.hasLogContaining('Pipeline triggered successfully')).toBe(true);
+    });
+
+    it('should emit ScheduleExecuted with lastTaskId for concurrency tracking', async () => {
+      // Arrange
+      const twoSteps: readonly PipelineStepRequest[] = [{ prompt: 'build' }, { prompt: 'push' }];
+      const schedule = createPipelineSchedule({ pipelineSteps: twoSteps });
+      await saveSchedule({ ...schedule, status: ScheduleStatus.ACTIVE });
+
+      const executedEvents: Array<{ scheduleId: ReturnType<typeof ScheduleId>; taskId: ReturnType<typeof TaskId> }> =
+        [];
+      eventBus.subscribe('ScheduleExecuted', async (e) => {
+        executedEvents.push({ scheduleId: e.scheduleId, taskId: e.taskId });
+      });
+
+      // Act
+      await triggerSchedule(schedule.id);
+
+      // Assert: exactly one ScheduleExecuted event
+      expect(executedEvents).toHaveLength(1);
+      expect(executedEvents[0].scheduleId).toBe(schedule.id);
+
+      // The taskId in ScheduleExecuted must be the LAST step's task ID
+      const allTasksResult = await taskRepo.findAll();
+      expect(allTasksResult.ok).toBe(true);
+      if (!allTasksResult.ok) return;
+
+      const allTasks = allTasksResult.value;
+      expect(allTasks).toHaveLength(2);
+
+      // Last task is the one with no dependents (step 1 = 'push')
+      const lastTask = allTasks.find((t) => t.prompt === 'push');
+      expect(lastTask).toBeDefined();
+      expect(executedEvents[0].taskId).toBe(lastTask!.id);
+    });
+
+    it('should inject afterScheduleId dependency on step 0', async () => {
+      // Arrange: create a predecessor schedule and trigger it to create a task
+      const predecessor = createTestSchedule();
+      await saveSchedule(predecessor);
+      await scheduleRepo.update(predecessor.id, { nextRunAt: Date.now() - 60000 });
+
+      await eventBus.emit('ScheduleTriggered', {
+        scheduleId: predecessor.id,
+        triggeredAt: Date.now(),
+      });
+      await flushEventLoop();
+
+      // Confirm predecessor task was created and is still QUEUED (non-terminal)
+      const predecessorTasksResult = await taskRepo.findAll();
+      expect(predecessorTasksResult.ok).toBe(true);
+      if (!predecessorTasksResult.ok) return;
+      expect(predecessorTasksResult.value).toHaveLength(1);
+      const predecessorTask = predecessorTasksResult.value[0];
+      expect(predecessorTask.status).toBe(TaskStatus.QUEUED);
+
+      // Capture TaskDelegated events for the pipeline
+      const delegatedTasks: Task[] = [];
+      eventBus.subscribe('TaskDelegated', async (e) => {
+        delegatedTasks.push(e.task);
+      });
+
+      // Create pipeline schedule chained after predecessor
+      const twoSteps: readonly PipelineStepRequest[] = [{ prompt: 'step-0' }, { prompt: 'step-1' }];
+      const pipelineSchedule = createPipelineSchedule({
+        afterScheduleId: predecessor.id,
+        pipelineSteps: twoSteps,
+      });
+      await saveSchedule({ ...pipelineSchedule, status: ScheduleStatus.ACTIVE });
+
+      // Act
+      await triggerSchedule(pipelineSchedule.id);
+
+      // Assert: 2 pipeline tasks created (plus the 1 predecessor task = 3 total)
+      const allTasksResult = await taskRepo.findAll();
+      expect(allTasksResult.ok).toBe(true);
+      if (!allTasksResult.ok) return;
+      expect(allTasksResult.value).toHaveLength(3);
+
+      // The 2 pipeline TaskDelegated events carry the dependency info
+      expect(delegatedTasks).toHaveLength(2);
+
+      const step0 = delegatedTasks.find((t) => t.prompt === 'step-0');
+      const step1 = delegatedTasks.find((t) => t.prompt === 'step-1');
+      expect(step0).toBeDefined();
+      expect(step1).toBeDefined();
+
+      // Step 0 depends on predecessor task (afterScheduleId injection)
+      expect(step0!.dependsOn).toContain(predecessorTask.id);
+
+      // Step 1 depends on step 0
+      expect(step1!.dependsOn).toContain(step0!.id);
+      expect(step1!.dependsOn).not.toContain(predecessorTask.id);
+    });
+
+    it('should handle partial save failure by cancelling saved tasks', async () => {
+      // Arrange: 3-step pipeline where the 3rd save will fail
+      const schedule = createPipelineSchedule();
+      await saveSchedule({ ...schedule, status: ScheduleStatus.ACTIVE });
+
+      let saveCallCount = 0;
+      const originalSave = taskRepo.save.bind(taskRepo);
+      const saveSpy = vi.spyOn(taskRepo, 'save').mockImplementation(async (task) => {
+        saveCallCount++;
+        if (saveCallCount === 3) {
+          // Simulate failure on the 3rd task save
+          const { err: mkErr } = await import('../../../../src/core/result');
+          const { BackbeatError, ErrorCode } = await import('../../../../src/core/errors');
+          return mkErr(new BackbeatError(ErrorCode.SYSTEM_ERROR, 'Simulated DB failure on step 3'));
+        }
+        return originalSave(task);
+      });
+
+      // Act
+      await triggerSchedule(schedule.id);
+
+      saveSpy.mockRestore();
+
+      // Assert: the 2 tasks that were saved should now be CANCELLED
+      const allTasksResult = await taskRepo.findAll();
+      expect(allTasksResult.ok).toBe(true);
+      if (!allTasksResult.ok) return;
+
+      const allTasks = allTasksResult.value;
+      // Only 2 tasks saved (the 3rd failed)
+      expect(allTasks).toHaveLength(2);
+      expect(allTasks.every((t) => t.status === TaskStatus.CANCELLED)).toBe(true);
+
+      // Assert: a failed execution was recorded
+      const historyResult = await scheduleRepo.getExecutionHistory(schedule.id);
+      expect(historyResult.ok).toBe(true);
+      if (!historyResult.ok) return;
+      expect(historyResult.value).toHaveLength(1);
+      expect(historyResult.value[0].status).toBe('failed');
+    });
+
+    it('should cancel all tasks when TaskDelegated fails for step 0', async () => {
+      // Arrange: 3-step pipeline where step 0 TaskDelegated emission will fail
+      const schedule = createPipelineSchedule();
+      await saveSchedule({ ...schedule, status: ScheduleStatus.ACTIVE });
+
+      const originalEmit = eventBus.emit.bind(eventBus);
+      const emitSpy = vi.spyOn(eventBus, 'emit').mockImplementation(async (event, payload) => {
+        if (event === 'TaskDelegated') {
+          // Fail on the FIRST TaskDelegated (step 0)
+          const { err: mkErr } = await import('../../../../src/core/result');
+          const { BackbeatError, ErrorCode } = await import('../../../../src/core/errors');
+          return mkErr(new BackbeatError(ErrorCode.SYSTEM_ERROR, 'Simulated emit failure'));
+        }
+        return originalEmit(event, payload);
+      });
+
+      // Act
+      await triggerSchedule(schedule.id);
+
+      emitSpy.mockRestore();
+
+      // Assert: all 3 tasks should be cancelled — step 0 failure orphans the whole pipeline
+      const allTasksResult = await taskRepo.findAll();
+      expect(allTasksResult.ok).toBe(true);
+      if (!allTasksResult.ok) return;
+      expect(allTasksResult.value).toHaveLength(3);
+      expect(allTasksResult.value.every((t) => t.status === TaskStatus.CANCELLED)).toBe(true);
+
+      // Assert: error logged for step 0
+      expect(logger.hasLogContaining('Failed to emit TaskDelegated for pipeline step 0')).toBe(true);
+    });
+
+    it('should record execution with lastTaskId for afterScheduleId chaining', async () => {
+      // Arrange: a pipeline with 3 steps
+      const schedule = createPipelineSchedule();
+      await saveSchedule({ ...schedule, status: ScheduleStatus.ACTIVE });
+
+      // Act
+      await triggerSchedule(schedule.id);
+
+      // Assert: execution record should point to the LAST task, not the first
+      const historyResult = await scheduleRepo.getExecutionHistory(schedule.id, 1);
+      expect(historyResult.ok).toBe(true);
+      if (!historyResult.ok) return;
+      expect(historyResult.value).toHaveLength(1);
+
+      const execution = historyResult.value[0];
+
+      // Get all tasks to identify first and last
+      const allTasksResult = await taskRepo.findAll();
+      expect(allTasksResult.ok).toBe(true);
+      if (!allTasksResult.ok) return;
+      expect(allTasksResult.value).toHaveLength(3);
+
+      // Last task is the 'deploy' step (step 2, no dependents)
+      const lastTask = allTasksResult.value.find((t) => t.prompt === 'deploy');
+      const firstTask = allTasksResult.value.find((t) => t.prompt === 'lint code');
+      expect(lastTask).toBeDefined();
+      expect(firstTask).toBeDefined();
+
+      // execution.taskId must be lastTaskId (for correct afterScheduleId chaining)
+      expect(execution.taskId).toBe(lastTask!.id);
+      expect(execution.taskId).not.toBe(firstTask!.id);
+    });
+
+    it('should not double-wrap error message in pipeline failure execution record', async () => {
+      // Arrange: 2-step pipeline where the 2nd save will fail
+      const twoSteps: readonly PipelineStepRequest[] = [{ prompt: 'step-a' }, { prompt: 'step-b' }];
+      const schedule = createPipelineSchedule({ pipelineSteps: twoSteps });
+      await saveSchedule({ ...schedule, status: ScheduleStatus.ACTIVE });
+
+      let saveCallCount = 0;
+      const originalSave = taskRepo.save.bind(taskRepo);
+      const saveSpy = vi.spyOn(taskRepo, 'save').mockImplementation(async (task) => {
+        saveCallCount++;
+        if (saveCallCount === 2) {
+          const { err: mkErr } = await import('../../../../src/core/result');
+          const { BackbeatError, ErrorCode } = await import('../../../../src/core/errors');
+          return mkErr(new BackbeatError(ErrorCode.SYSTEM_ERROR, 'DB write error'));
+        }
+        return originalSave(task);
+      });
+
+      // Act
+      await triggerSchedule(schedule.id);
+      saveSpy.mockRestore();
+
+      // Assert: execution error message should NOT have double prefix
+      const historyResult = await scheduleRepo.getExecutionHistory(schedule.id);
+      expect(historyResult.ok).toBe(true);
+      if (!historyResult.ok) return;
+      expect(historyResult.value).toHaveLength(1);
+
+      const errorMessage = historyResult.value[0].errorMessage;
+      expect(errorMessage).toBeDefined();
+      // Should contain "Pipeline failed at step 2" but NOT "Failed to create task: Pipeline failed"
+      expect(errorMessage).toContain('Pipeline failed at step 2');
+      expect(errorMessage).not.toContain('Failed to create task: Pipeline failed');
+    });
+
+    it('should include prefix in single-task failure execution record', async () => {
+      // Arrange: single-task schedule where save fails
+      const schedule = createTestSchedule();
+      await saveSchedule(schedule);
+      await scheduleRepo.update(schedule.id, { nextRunAt: Date.now() - 60000 });
+
+      const saveSpy = vi.spyOn(taskRepo, 'save').mockImplementation(async () => {
+        const { err: mkErr } = await import('../../../../src/core/result');
+        const { BackbeatError, ErrorCode } = await import('../../../../src/core/errors');
+        return mkErr(new BackbeatError(ErrorCode.SYSTEM_ERROR, 'DB write error'));
+      });
+
+      // Act
+      await eventBus.emit('ScheduleTriggered', { scheduleId: schedule.id, triggeredAt: Date.now() });
+      await flushEventLoop();
+      saveSpy.mockRestore();
+
+      // Assert: execution error message should include "Failed to create task:" prefix
+      const historyResult = await scheduleRepo.getExecutionHistory(schedule.id);
+      expect(historyResult.ok).toBe(true);
+      if (!historyResult.ok) return;
+      expect(historyResult.value).toHaveLength(1);
+
+      const errorMessage = historyResult.value[0].errorMessage;
+      expect(errorMessage).toBeDefined();
+      expect(errorMessage).toContain('Failed to create task:');
+    });
+
+    it('should update schedule state after pipeline trigger', async () => {
+      // Arrange: ONE_TIME pipeline schedule
+      const schedule = createPipelineSchedule({
+        scheduleType: ScheduleType.ONE_TIME,
+        scheduledAt: Date.now() - 60000,
+      });
+      await saveSchedule({ ...schedule, status: ScheduleStatus.ACTIVE, nextRunAt: Date.now() - 60000 });
+
+      // Act
+      await eventBus.emit('ScheduleTriggered', { scheduleId: schedule.id, triggeredAt: Date.now() });
+      await flushEventLoop();
+
+      // Assert: schedule is COMPLETED (ONE_TIME runs once)
+      const findResult = await scheduleRepo.findById(schedule.id);
+      expect(findResult.ok).toBe(true);
+      if (!findResult.ok) return;
+
+      const persisted = findResult.value;
+      expect(persisted).not.toBeNull();
+      expect(persisted!.status).toBe(ScheduleStatus.COMPLETED);
+      expect(persisted!.runCount).toBe(1);
+      expect(persisted!.nextRunAt).toBeUndefined();
+    });
+  });
 });
diff --git a/tests/unit/services/schedule-manager.test.ts b/tests/unit/services/schedule-manager.test.ts
index 8174ffd..e0e9aa3 100644
--- a/tests/unit/services/schedule-manager.test.ts
+++ b/tests/unit/services/schedule-manager.test.ts
@@ -5,7 +5,11 @@
  */

 import { afterEach, beforeEach, describe, expect, it } from 'vitest';
-import type { PipelineCreateRequest, ScheduleCreateRequest } from '../../../src/core/domain';
+import type {
+  PipelineCreateRequest,
+  ScheduleCreateRequest,
+  ScheduledPipelineCreateRequest,
+} from '../../../src/core/domain';
 import {
   createSchedule,
   MissedRunPolicy,
@@ -13,6 +17,7 @@ import {
   ScheduleId,
   ScheduleStatus,
   ScheduleType,
+  TaskId,
 } from '../../../src/core/domain';
 import { Database } from '../../../src/implementations/database';
 import { SQLiteScheduleRepository } from '../../../src/implementations/schedule-repository';
@@ -370,6 +375,98 @@ describe('ScheduleManagerService - Unit Tests', () => {
     expect(events[0].reason).toBe('no longer needed');
   });

+  it('should emit TaskCancellationRequested for in-flight tasks when cancelTasks=true', async () => {
+    const schedule = createSchedule({
+      taskTemplate: { prompt: 'pipeline step' },
+      scheduleType: ScheduleType.CRON,
+      cronExpression: '0 9 * * *',
+    });
+    await scheduleRepo.save(schedule);
+
+    // Record an execution with pipeline task IDs
+    const now = Date.now();
+    await scheduleRepo.recordExecution({
+      scheduleId: schedule.id,
+      scheduledFor: now,
+      executedAt: now,
+      status: 'triggered',
+      pipelineTaskIds: [TaskId('task-aaa'), TaskId('task-bbb'), TaskId('task-ccc')],
+      createdAt: now,
+    });
+
+    const result = await service.cancelSchedule(schedule.id, 'abort pipeline', true);
+
+    expect(result.ok).toBe(true);
+    expect(eventBus.hasEmitted('ScheduleCancelled')).toBe(true);
+    expect(eventBus.hasEmitted('TaskCancellationRequested')).toBe(true);
+    expect(eventBus.getEventCount('TaskCancellationRequested')).toBe(3);
+
+    const cancelEvents = eventBus.getEmittedEvents('TaskCancellationRequested');
+    const cancelledTaskIds = cancelEvents.map((e: { taskId: string }) => e.taskId);
+    expect(cancelledTaskIds).toContain('task-aaa');
+    expect(cancelledTaskIds).toContain('task-bbb');
+    expect(cancelledTaskIds).toContain('task-ccc');
+  });
+
+  it('should cancel single taskId when no pipelineTaskIds in execution', async () => {
+    const schedule = createSchedule({
+      taskTemplate: { prompt: 'single task' },
+      scheduleType: ScheduleType.CRON,
+      cronExpression: '0 9 * * *',
+    });
+    await scheduleRepo.save(schedule);
+
+    // Insert a task row to satisfy FK constraint on schedule_executions.task_id
+    const taskId = TaskId('task-single');
+    const now = Date.now();
+    db.getDatabase()
+      .prepare(`INSERT INTO tasks (id, prompt, status, priority, created_at) VALUES (?, ?, ?, ?, ?)`)
+      .run(taskId, 'single task', 'running', 'P2', now);
+
+    // Record an execution with only a single taskId (non-pipeline schedule)
+    await scheduleRepo.recordExecution({
+      scheduleId: schedule.id,
+      scheduledFor: now,
+      executedAt: now,
+      status: 'triggered',
+      taskId,
+      createdAt: now,
+    });
+
+    const result = await service.cancelSchedule(schedule.id, 'stop it', true);
+
+    expect(result.ok).toBe(true);
+    expect(eventBus.getEventCount('TaskCancellationRequested')).toBe(1);
+
+    const cancelEvents = eventBus.getEmittedEvents('TaskCancellationRequested');
+    expect(cancelEvents[0].taskId).toBe('task-single');
+  });
+
+  it('should not emit TaskCancellationRequested when cancelTasks is false', async () => {
+    const schedule = createSchedule({
+      taskTemplate: { prompt: 'test' },
+      scheduleType: ScheduleType.CRON,
+      cronExpression: '0 9 
* * *', + }); + await scheduleRepo.save(schedule); + + const now = Date.now(); + await scheduleRepo.recordExecution({ + scheduleId: schedule.id, + scheduledFor: now, + executedAt: now, + status: 'triggered', + pipelineTaskIds: [TaskId('task-x')], + createdAt: now, + }); + + const result = await service.cancelSchedule(schedule.id, 'normal cancel'); + + expect(result.ok).toBe(true); + expect(eventBus.hasEmitted('ScheduleCancelled')).toBe(true); + expect(eventBus.hasEmitted('TaskCancellationRequested')).toBe(false); + }); + it('should return error for non-existent schedule', async () => { const result = await service.cancelSchedule(ScheduleId('non-existent')); @@ -594,4 +691,119 @@ describe('ScheduleManagerService - Unit Tests', () => { expect(eventBus.getEventCount('ScheduleCreated')).toBe(3); }); }); + + describe('createScheduledPipeline()', () => { + function scheduledPipelineRequest( + overrides: Partial = {}, + ): ScheduledPipelineCreateRequest { + return { + steps: [{ prompt: 'Step one' }, { prompt: 'Step two' }, { prompt: 'Step three' }], + scheduleType: ScheduleType.CRON, + cronExpression: '0 9 * * *', + timezone: 'UTC', + ...overrides, + }; + } + + it('should create a scheduled pipeline with cron', async () => { + const request = scheduledPipelineRequest(); + + const result = await service.createScheduledPipeline(request); + + expect(result.ok).toBe(true); + if (!result.ok) return; + + const schedule = result.value; + expect(schedule.scheduleType).toBe(ScheduleType.CRON); + expect(schedule.cronExpression).toBe('0 9 * * *'); + expect(schedule.pipelineSteps).toBeDefined(); + expect(schedule.pipelineSteps).toHaveLength(3); + expect(schedule.taskTemplate.prompt).toContain('Pipeline (3 steps)'); + expect(schedule.taskTemplate.prompt).toContain('Step one'); + expect(schedule.status).toBe(ScheduleStatus.ACTIVE); + }); + + it('should create a scheduled pipeline with one_time', async () => { + const futureDate = new Date(Date.now() + 3600000); // 1 hour from now + 
const request = scheduledPipelineRequest({ + steps: [{ prompt: 'First step' }, { prompt: 'Second step' }], + scheduleType: ScheduleType.ONE_TIME, + cronExpression: undefined, + scheduledAt: futureDate.toISOString(), + }); + + const result = await service.createScheduledPipeline(request); + + expect(result.ok).toBe(true); + if (!result.ok) return; + + const schedule = result.value; + expect(schedule.scheduleType).toBe(ScheduleType.ONE_TIME); + expect(schedule.scheduledAt).toBeDefined(); + expect(schedule.pipelineSteps).toHaveLength(2); + }); + + it('should reject fewer than 2 steps', async () => { + const request = scheduledPipelineRequest({ + steps: [{ prompt: 'Only one' }], + }); + + const result = await service.createScheduledPipeline(request); + + expect(result.ok).toBe(false); + if (result.ok) return; + expect(result.error.message).toContain('at least 2 steps'); + }); + + it('should reject more than 20 steps', async () => { + const steps = Array.from({ length: 21 }, (_, i) => ({ prompt: `Step ${i + 1}` })); + const request = scheduledPipelineRequest({ steps }); + + const result = await service.createScheduledPipeline(request); + + expect(result.ok).toBe(false); + if (result.ok) return; + expect(result.error.message).toContain('exceed 20 steps'); + }); + + it('should store normalized paths for per-step workingDirectory', async () => { + const cwd = process.cwd(); + // Path with /../ segment that resolves to cwd + const unnormalizedPath = `${cwd}/src/../src`; + const normalizedPath = `${cwd}/src`; + + const request = scheduledPipelineRequest({ + steps: [{ prompt: 'Step one', workingDirectory: unnormalizedPath }, { prompt: 'Step two' }], + }); + + const result = await service.createScheduledPipeline(request); + + expect(result.ok).toBe(true); + if (!result.ok) return; + + // The schedule's pipelineSteps should contain the NORMALIZED path + expect(result.value.pipelineSteps).toBeDefined(); + 
expect(result.value.pipelineSteps![0].workingDirectory).toBe(normalizedPath); + // Step without workingDirectory should remain undefined + expect(result.value.pipelineSteps![1].workingDirectory).toBeUndefined(); + }); + + it('should emit ScheduleCreated event with pipelineSteps', async () => { + const request = scheduledPipelineRequest(); + + const result = await service.createScheduledPipeline(request); + + expect(result.ok).toBe(true); + expect(eventBus.hasEmitted('ScheduleCreated')).toBe(true); + expect(eventBus.getEventCount('ScheduleCreated')).toBe(1); + + const events = eventBus.getEmittedEvents('ScheduleCreated'); + const emittedSchedule = events[0].schedule; + expect(emittedSchedule.pipelineSteps).toBeDefined(); + expect(emittedSchedule.pipelineSteps).toHaveLength(3); + expect(emittedSchedule.pipelineSteps![0].prompt).toBe('Step one'); + expect(emittedSchedule.pipelineSteps![1].prompt).toBe('Step two'); + expect(emittedSchedule.pipelineSteps![2].prompt).toBe('Step three'); + }); + }); });