diff --git a/README.md b/README.md index 2721c95a..182accc7 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ [![Build Status](https://github.com/conductor-oss/javascript-sdk/actions/workflows/pull_request.yml/badge.svg)](https://github.com/conductor-oss/javascript-sdk/actions/workflows/pull_request.yml) -A comprehensive TypeScript/JavaScript client for [Conductor OSS](https://github.com/conductor-oss/conductor), enabling developers to build, orchestrate, and monitor distributed workflows with ease. +A comprehensive TypeScript/JavaScript SDK for [Conductor OSS](https://github.com/conductor-oss/conductor), enabling developers to build, orchestrate, and monitor distributed workflows with ease. [Conductor](https://www.conductor-oss.org/) is the leading open-source orchestration platform allowing developers to build highly scalable distributed applications. @@ -40,11 +40,16 @@ Show support for the Conductor OSS. Please help spread the awareness by starrin - [Step 4: Manage and Monitor Execution](#step-4-manage-and-monitor-execution) - [Use TaskClient to Monitor and Debug Tasks](#use-taskclient-to-monitor-and-debug-tasks) - [Workers](#workers) - - [The TaskManager](#the-taskmanager) - - [Quick Start: Building a Worker](#quick-start-building-a-worker) - - [Step 1: Define the Worker's Logic](#step-1-define-the-workers-logic) - - [Step 2: Handle Task Outcomes and Errors](#step-2-handle-task-outcomes-and-errors) - - [Step 3: Run the Worker with TaskManager](#step-3-run-the-worker-with-taskmanager) + - [SDK-Style Worker Registration (Recommended)](#sdk-style-worker-registration-recommended) + - [Using the @worker Decorator](#using-the-worker-decorator) + - [Worker Configuration Options](#worker-configuration-options) + - [Environment Variable Configuration](#environment-variable-configuration) + - [Event Listeners for Observability](#event-listeners-for-observability) + - [NonRetryableException for Terminal Failures](#nonretryableexception-for-terminal-failures) + - 
[Module Imports for Side-Effect Registration](#module-imports-for-side-effect-registration) + - [Legacy TaskManager API](#legacy-taskmanager-api) + - [The TaskManager](#the-taskmanager) + - [Quick Start: Building a Worker](#quick-start-building-a-worker) - [Worker Design Principles](#worker-design-principles) - [Scheduling](#scheduling) - [The SchedulerClient](#the-schedulerclient) @@ -110,7 +115,6 @@ Here's a simple example to get you started: import { orkesConductorClient, WorkflowExecutor, - TaskManager, simpleTask, workflow } from "@io-orkes/conductor-javascript"; @@ -447,17 +451,236 @@ Workers are background processes that execute tasks in your workflows. Think of Workflow → Creates Tasks → Workers Poll for Tasks → Execute Logic → Return Results → Workflow Continues ``` -The `TaskManager` class in this SDK simplifies the process of creating and managing workers. +### SDK-Style Worker Registration (Recommended) -### The TaskManager +The SDK supports decorator-based worker registration. This provides a modern, declarative approach to defining workers with auto-discovery, type safety, and less boilerplate. -The `TaskManager` is the primary tool for managing workers. It handles polling, task execution, and result reporting, allowing you to run multiple workers concurrently. For a complete method reference, see the [TaskManager API Reference](docs/api-reference/task-manager.md). +**Key Benefits:** +- **Auto-discovery**: No need to manually register workers +- **Declarative**: Configuration is co-located with worker logic +- **Type-safe**: Full TypeScript support +- **Cleaner code**: Less boilerplate than the legacy API -### Quick Start: Building a Worker +> 💡 **New to Conductor?** Start here! The decorator-based approach is simpler and more maintainable than the legacy `TaskManager` API. -Building a robust worker involves defining its logic, handling outcomes, and managing its execution. 
+#### Using the @worker Decorator -#### Step 1: Define the Worker's Logic +Define workers using the `@worker` decorator for cleaner, more maintainable code: + +```typescript +import { worker, TaskHandler, Task } from "@io-orkes/conductor-javascript"; + +// Define a worker with the @worker decorator +@worker({ taskDefName: "send_email", concurrency: 10, pollInterval: 100 }) +async function sendEmail(task: Task) { + const { to, subject, body } = task.inputData; + await emailService.send(to, subject, body); + + return { + status: "COMPLETED", + outputData: { sent: true, timestamp: new Date().toISOString() } + }; +} + +@worker({ taskDefName: "process_payment", domain: "payments", concurrency: 5 }) +async function processPayment(task: Task) { + const { amount, customerId } = task.inputData; + const result = await paymentGateway.charge(customerId, amount); + + return { + status: "COMPLETED", + outputData: { transactionId: result.id } + }; +} + +// Auto-discover and start all decorated workers +const handler = new TaskHandler({ + client, + scanForDecorated: true, // Automatically finds @worker decorated functions +}); + +handler.startWorkers(); +console.log(`Started ${handler.runningWorkerCount} workers`); + +// Graceful shutdown +process.on("SIGTERM", async () => { + await handler.stopWorkers(); + process.exit(0); +}); +``` + +#### Worker Configuration Options + +The `@worker` decorator supports comprehensive configuration: + +```typescript +@worker({ + taskDefName: "my_task", // Required: task name + concurrency: 5, // Max concurrent tasks (default: 1) + pollInterval: 100, // Polling interval in ms (default: 100) + domain: "production", // Task domain for multi-tenancy + workerId: "worker-123", // Unique worker identifier + pollTimeout: 100, // Server-side long poll timeout +}) +async function myTask(task: Task) { + // Your logic here +} +``` + +#### Environment Variable Configuration + +Override worker configuration using environment variables without code changes: + 
+```bash +# Global configuration (applies to all workers) +export CONDUCTOR_WORKER_ALL_POLL_INTERVAL=500 +export CONDUCTOR_WORKER_ALL_CONCURRENCY=10 + +# Worker-specific configuration (overrides global) +export CONDUCTOR_WORKER_SEND_EMAIL_CONCURRENCY=20 +export CONDUCTOR_WORKER_PROCESS_PAYMENT_DOMAIN=payments +``` + +**Configuration Hierarchy** (highest to lowest priority): +1. Worker-specific environment variables +2. Global environment variables +3. Code-level decorator parameters +4. System defaults + +**Supported Environment Variable Formats:** +- `CONDUCTOR_WORKER_<WORKER_NAME>_<PROPERTY>` - Worker-specific (uppercase) +- `conductor.worker.<worker_name>.<property>` - Worker-specific (dotted) +- `CONDUCTOR_WORKER_ALL_<PROPERTY>` - Global (uppercase) +- `conductor.worker.all.<property>` - Global (dotted) + +#### Event Listeners for Observability + +Monitor worker lifecycle events for metrics, logging, and debugging: + +```typescript +import { TaskHandler, TaskRunnerEventsListener } from "@io-orkes/conductor-javascript"; + +const metricsListener: TaskRunnerEventsListener = { + onTaskExecutionCompleted(event) { + metrics.histogram("task_duration_ms", event.durationMs, { + task_type: event.taskType, + }); + }, + + onTaskExecutionFailure(event) { + logger.error(`Task ${event.taskId} failed:`, event.cause); + metrics.counter("task_failures", 1, { + task_type: event.taskType, + }); + }, + + onTaskUpdateFailure(event) { + // CRITICAL: Task result was lost after all retries + alertOps({ + severity: "CRITICAL", + message: `Task update failed after ${event.retryCount} retries`, + taskId: event.taskId, + }); + }, +}; + +const handler = new TaskHandler({ + client, + eventListeners: [metricsListener], +}); +``` + +**Available Events:** +- `onPollStarted` - Polling begins +- `onPollCompleted` - Polling succeeds +- `onPollFailure` - Polling fails +- `onTaskExecutionStarted` - Task execution begins +- `onTaskExecutionCompleted` - Task execution succeeds +- `onTaskExecutionFailure` - Task execution fails +- `onTaskUpdateFailure` - Task update 
fails after all retries (CRITICAL) + +#### NonRetryableException for Terminal Failures + +Mark failures as terminal to prevent unnecessary retries: + +```typescript +import { worker, NonRetryableException, Task } from "@io-orkes/conductor-javascript"; + +@worker({ taskDefName: "validate_order" }) +async function validateOrder(task: Task) { + const order = await getOrder(task.inputData.orderId); + + if (!order) { + // Order doesn't exist - retry won't help + throw new NonRetryableException(`Order ${task.inputData.orderId} not found`); + } + + if (order.status === "CANCELLED") { + // Business rule violation - retry won't help + throw new NonRetryableException("Cannot process cancelled order"); + } + + // Regular errors will be retried + if (order.amount > MAX_AMOUNT) { + throw new Error("Amount exceeds limit"); // Will retry + } + + return { status: "COMPLETED", outputData: { validated: true } }; +} +``` + +**Error Handling:** +- `throw new Error()` → Task status: `FAILED` (will retry with exponential backoff) +- `throw new NonRetryableException()` → Task status: `FAILED_WITH_TERMINAL_ERROR` (no retry) + +#### Module Imports for Side-Effect Registration + +Organize workers across multiple files: + +```typescript +// workers/orderWorkers.ts +import { worker } from "@io-orkes/conductor-javascript"; + +@worker({ taskDefName: "validate_order" }) +export async function validateOrder(task) { /* ... */ } + +@worker({ taskDefName: "fulfill_order" }) +export async function fulfillOrder(task) { /* ... */ } + +// workers/paymentWorkers.ts +@worker({ taskDefName: "process_payment" }) +export async function processPayment(task) { /* ... 
*/ } + +// main.ts +import { TaskHandler } from "@io-orkes/conductor-javascript"; + +// Use TaskHandler.create() for async module imports +const handler = await TaskHandler.create({ + client, + importModules: [ + "./workers/orderWorkers", + "./workers/paymentWorkers", + ], +}); + +handler.startWorkers(); // Auto-discovers all workers from imported modules +``` + +### Legacy TaskManager API + +The legacy `TaskManager` API continues to work with full backward compatibility. However, **new projects should use the decorator-based approach above** for better maintainability and cleaner code. + +Both APIs can coexist in the same application, allowing gradual migration from legacy to decorator-based workers. + +#### The TaskManager + +The `TaskManager` is the legacy tool for managing workers. It handles polling, task execution, and result reporting, allowing you to run multiple workers concurrently. For a complete method reference, see the [TaskManager API Reference](docs/api-reference/task-manager.md). + +#### Quick Start: Building a Worker + +Building a worker with the legacy API involves defining its logic, handling outcomes, and managing its execution. + +##### Step 1: Define the Worker's Logic A worker is an object that defines a `taskDefName` (which must match the task name in your workflow) and an `execute` function containing your business logic. @@ -484,7 +707,7 @@ const emailWorker: ConductorWorker = { }; ``` -#### Step 2: Handle Task Outcomes and Errors +##### Step 2: Handle Task Outcomes and Errors The `execute` function must return an object indicating the task's outcome. @@ -512,7 +735,7 @@ try { } ``` -#### Step 3: Run the Worker with TaskManager +##### Step 3: Run the Worker with TaskManager The `TaskManager` is responsible for polling Conductor, managing task execution, and reporting back results. You can run a single worker or multiple workers with one manager. 
@@ -539,7 +762,7 @@ For a complete method reference, see the [TaskManager API Reference](docs/api-re ### Worker Design Principles -When designing workers, it's best to follow these principles: +When designing workers (with either API), it's best to follow these principles: - **Stateless**: Workers should not rely on local state. - **Idempotent**: The same task input should always produce the same result. diff --git a/jest.config.mjs b/jest.config.mjs index f98047e0..d8703a14 100644 --- a/jest.config.mjs +++ b/jest.config.mjs @@ -9,9 +9,19 @@ export default { "!src/**/*.d.ts", "!src/**/generated/**", "!src/**/spec/**", + "!src/**/*.test.{ts,tsx}", + "!src/**/index.ts", + "!src/**/types.ts", + "!src/**/*.types.ts", + "!src/**/exceptions/**", ], coverageReporters: ["text", "lcov", "cobertura"], transformIgnorePatterns: ["/node_modules/", "\\.pnp\\.[^\\/]+$"], + moduleNameMapper: { + "^@/(.*)$": "/src/$1", + "^@open-api/(.*)$": "/src/open-api/$1", + "^@test-utils/(.*)$": "/src/integration-tests/utils/$1", + }, transform: { "^.+\\.tsx?$": [ "ts-jest", diff --git a/package-lock.json b/package-lock.json index c3fe6aab..42f51a70 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@io-orkes/conductor-javascript", - "version": "v2.4.0", + "version": "v3.0.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@io-orkes/conductor-javascript", - "version": "v2.4.0", + "version": "v3.0.0", "license": "Apache-2.0", "devDependencies": { "@eslint/js": "^9.34.0", diff --git a/package.json b/package.json index bd2616c2..3121f166 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "@io-orkes/conductor-javascript", - "description": "Typescript Client for Netflix Conductor", - "version": "v0.0.0", + "description": "Typescript SDK for Netflix Conductor", + "version": "v3.0.0", "private": false, "homepage": "https://orkes.io", "repository": { diff --git a/src/integration-tests/WorkerRegistration.test.ts 
b/src/integration-tests/WorkerRegistration.test.ts new file mode 100644 index 00000000..3f3510bb --- /dev/null +++ b/src/integration-tests/WorkerRegistration.test.ts @@ -0,0 +1,648 @@ +import { afterEach, beforeAll, describe, expect, test } from "@jest/globals"; +import type { Task } from "../open-api"; +import { + NonRetryableException, + TaskHandler, + WorkflowExecutor, + clearWorkerRegistry, + getRegisteredWorkers, + orkesConductorClient, + simpleTask, + worker +} from "../sdk"; +import type { + PollCompleted, + PollStarted, + TaskExecutionCompleted, + TaskExecutionStarted, +} from "../sdk/clients/worker/events/types"; +import { waitForWorkflowStatus } from "./utils/waitForWorkflowStatus"; +import { executeWorkflowWithRetry } from "./utils/executeWorkflowWithRetry"; + +describe("SDK Worker Registration", () => { + const clientPromise = orkesConductorClient(); + let executor: WorkflowExecutor; + + beforeAll(async () => { + const client = await clientPromise; + executor = new WorkflowExecutor(client); + }); + + afterEach(() => { + // Clean up worker registry after each test to prevent conflicts + clearWorkerRegistry(); + }); + + test("worker() function registers workers in global registry", async () => { + const taskName = `sdk_test_basic_worker_${Date.now()}`; + + worker({ taskDefName: taskName })( + async function basicWorker() { + return { + status: "COMPLETED" as const, + outputData: { message: "Worker registered successfully" }, + }; + } + ); + + // Verify worker is registered + const registeredWorkers = getRegisteredWorkers(); + expect(registeredWorkers.length).toBe(1); + expect(registeredWorkers[0]?.taskDefName).toBe(taskName); + }); + + test("TaskHandler auto-discovers and executes decorated workers", async () => { + const client = await clientPromise; + const taskName = `sdk_test_auto_discover_${Date.now()}`; + const workflowName = `sdk_test_auto_discover_wf_${Date.now()}`; + + let workerExecuted = false; + + worker({ taskDefName: taskName, pollInterval: 
100, concurrency: 1 })( + async function autoDiscoverWorker(task: Task) { + workerExecuted = true; + return { + status: "COMPLETED" as const, + outputData: { + message: "Auto-discovered worker executed", + inputReceived: task.inputData, + }, + }; + } + ); + + // Create TaskHandler with auto-discovery + const handler = new TaskHandler({ + client, + scanForDecorated: true, + }); + + expect(handler.workerCount).toBe(1); + expect(handler.running).toBe(false); + + // Start workers BEFORE registering workflow + handler.startWorkers(); + + // Wait a bit for workers to start polling + await new Promise(resolve => setTimeout(resolve, 100)); + + // Register workflow - pass workflow input to task + await executor.registerWorkflow(true, { + name: workflowName, + version: 1, + ownerEmail: "developers@orkes.io", + tasks: [simpleTask(taskName, taskName, { + testData: "${workflow.input.testData}", + })], + inputParameters: ["testData"], + outputParameters: {}, + timeoutSeconds: 0, + }); + expect(handler.running).toBe(true); + expect(handler.runningWorkerCount).toBe(1); + + // Execute workflow with retry on transient failures + const { workflowId } = await executeWorkflowWithRetry( + executor, + { + name: workflowName, + version: 1, + input: { testData: "hello" }, + }, + workflowName, + 1, + `${workflowName}-id` + ); + + if (!workflowId) { + throw new Error("Workflow ID is undefined"); + } + + // Wait for workflow completion + const workflowStatus = await waitForWorkflowStatus( + executor, + workflowId, + "COMPLETED", + 60000 + ); + + expect(workflowStatus.status).toBe("COMPLETED"); + expect(workerExecuted).toBe(true); + + const [firstTask] = workflowStatus.tasks || []; + expect(firstTask?.taskType).toBe(taskName); + expect(firstTask?.status).toBe("COMPLETED"); + expect(firstTask?.outputData?.message).toBe("Auto-discovered worker executed"); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + expect((firstTask?.outputData?.inputReceived as 
any)?.testData).toBe("hello"); + + // Stop workers + await handler.stopWorkers(); + expect(handler.running).toBe(false); + expect(handler.runningWorkerCount).toBe(0); + }, 90000); + + test("worker with concurrency processes multiple tasks", async () => { + const client = await clientPromise; + const taskName = `sdk_test_concurrency_${Date.now()}`; + const workflowName = `sdk_test_concurrency_wf_${Date.now()}`; + + let executionCount = 0; + const executionTimes: number[] = []; + + worker({ taskDefName: taskName, concurrency: 3, pollInterval: 100 })( + async function concurrentWorker(task: Task) { + const startTime = Date.now(); + executionCount++; + + // Simulate some work + await new Promise(resolve => setTimeout(resolve, 100)); + + executionTimes.push(Date.now() - startTime); + + return { + status: "COMPLETED" as const, + outputData: { + executionNumber: executionCount, + taskId: task.taskId, + }, + }; + } + ); + + const handler = new TaskHandler({ + client, + scanForDecorated: true, + }); + + // Start workers BEFORE registering workflow + handler.startWorkers(); + + // Wait a bit for workers to start polling + await new Promise(resolve => setTimeout(resolve, 100)); + + // Register workflow with multiple tasks + await executor.registerWorkflow(true, { + name: workflowName, + version: 1, + ownerEmail: "developers@orkes.io", + tasks: [ + simpleTask(`${taskName}_1`, taskName, {}), + simpleTask(`${taskName}_2`, taskName, {}), + simpleTask(`${taskName}_3`, taskName, {}), + ], + inputParameters: [], + outputParameters: {}, + timeoutSeconds: 0, + }); + + // Execute workflow with retry on transient failures + const { workflowId } = await executeWorkflowWithRetry( + executor, + { + name: workflowName, + version: 1, + }, + workflowName, + 1, + `${workflowName}-id` + ); + + if (!workflowId) { + throw new Error("Workflow ID is undefined"); + } + + const workflowStatus = await waitForWorkflowStatus( + executor, + workflowId, + "COMPLETED", + 60000 + ); + + 
expect(workflowStatus.status).toBe("COMPLETED"); + expect(executionCount).toBe(3); + + await handler.stopWorkers(); + }, 90000); + + test("worker with domain isolation", async () => { + const client = await clientPromise; + const taskName = `sdk_test_domain_${Date.now()}`; + const domain = "test_domain"; + + worker({ taskDefName: taskName, domain, pollInterval: 100 })( + async function domainWorker() { + return { + status: "COMPLETED" as const, + outputData: { domain: "processed_in_domain" }, + }; + } + ); + + const handler = new TaskHandler({ + client, + scanForDecorated: true, + }); + + expect(handler.workerCount).toBe(1); + + const registeredWorkers = getRegisteredWorkers(); + expect(registeredWorkers[0]?.domain).toBe(domain); + + // Start workers and verify they start properly + handler.startWorkers(); + expect(handler.running).toBe(true); + + // Wait a bit for workers to initialize + await new Promise(resolve => setTimeout(resolve, 100)); + + await handler.stopWorkers(); + expect(handler.running).toBe(false); + }); + + test("NonRetryableException marks task as terminal failure", async () => { + const client = await clientPromise; + const taskName = `sdk_test_non_retryable_${Date.now()}`; + const workflowName = `sdk_test_non_retryable_wf_${Date.now()}`; + + worker({ taskDefName: taskName, pollInterval: 100, concurrency: 1 })( + async function nonRetryableWorker(task: Task) { + const shouldFail = task.inputData?.shouldFail; + + if (shouldFail === "terminal") { + throw new NonRetryableException("Order not found - terminal error"); + } + + return { + status: "COMPLETED" as const, + outputData: { message: "Success" }, + }; + } + ); + + const handler = new TaskHandler({ + client, + scanForDecorated: true, + }); + + // Start workers BEFORE registering workflow (important!) 
+ handler.startWorkers(); + + // Wait a bit for workers to start polling + await new Promise(resolve => setTimeout(resolve, 100)); + + // Register workflow with input parameter + await executor.registerWorkflow(true, { + name: workflowName, + version: 1, + ownerEmail: "developers@orkes.io", + tasks: [simpleTask(taskName, taskName, { + shouldFail: "${workflow.input.shouldFail}", + })], + inputParameters: ["shouldFail"], + outputParameters: {}, + timeoutSeconds: 0, + }); + + // Execute workflow with shouldFail flag and retry on transient failures + const { workflowId } = await executeWorkflowWithRetry( + executor, + { + name: workflowName, + version: 1, + input: { shouldFail: "terminal" }, + }, + workflowName, + 1, + `${workflowName}-id` + ); + + if (!workflowId) { + throw new Error("Workflow ID is undefined"); + } + + // Wait for workflow to fail + const workflowStatus = await waitForWorkflowStatus( + executor, + workflowId, + "FAILED", + 60000 + ); + + expect(workflowStatus.status).toBe("FAILED"); + + const [firstTask] = workflowStatus.tasks || []; + expect(firstTask?.status).toBe("FAILED_WITH_TERMINAL_ERROR"); + expect(firstTask?.reasonForIncompletion).toContain("Order not found - terminal error"); + + await handler.stopWorkers(); + }, 60000); + + test("event listeners receive lifecycle events", async () => { + const client = await clientPromise; + const taskName = `sdk_test_events_${Date.now()}`; + const workflowName = `sdk_test_events_wf_${Date.now()}`; + + const events: string[] = []; + + worker({ taskDefName: taskName, pollInterval: 100 })( + async function eventWorker() { + return { + status: "COMPLETED" as const, + outputData: { message: "Event test" }, + }; + } + ); + + // Define event listener + const eventListener = { + onPollStarted(event: PollStarted) { + events.push(`poll_started:${event.workerId}`); + }, + onPollCompleted(event: PollCompleted) { + events.push(`poll_completed:${event.tasksReceived}`); + }, + onTaskExecutionStarted(event: 
TaskExecutionStarted) { + events.push(`task_started:${event.taskType}`); + }, + onTaskExecutionCompleted(event: TaskExecutionCompleted) { + events.push(`task_completed:${event.taskType}`); + }, + }; + + const handler = new TaskHandler({ + client, + scanForDecorated: true, + eventListeners: [eventListener], + }); + + // Start workers BEFORE registering workflow + handler.startWorkers(); + + // Wait a bit for workers to start polling + await new Promise(resolve => setTimeout(resolve, 100)); + + // Register workflow + await executor.registerWorkflow(true, { + name: workflowName, + version: 1, + ownerEmail: "developers@orkes.io", + tasks: [simpleTask(taskName, taskName, {})], + inputParameters: [], + outputParameters: {}, + timeoutSeconds: 0, + }); + + // Execute workflow with retry on transient failures + const { workflowId } = await executeWorkflowWithRetry( + executor, + { + name: workflowName, + version: 1, + }, + workflowName, + 1, + `${workflowName}-id` + ); + + if (!workflowId) { + throw new Error("Workflow ID is undefined"); + } + + await waitForWorkflowStatus(executor, workflowId, "COMPLETED", 60000); + + // Verify events were captured + expect(events.length).toBeGreaterThan(0); + expect(events.some(e => e.startsWith("poll_started"))).toBe(true); + expect(events.some(e => e.startsWith("task_started"))).toBe(true); + expect(events.some(e => e.startsWith("task_completed"))).toBe(true); + + await handler.stopWorkers(); + }, 90000); + + test("multiple workers can be registered and executed", async () => { + const client = await clientPromise; + const taskName1 = `sdk_test_multi_1_${Date.now()}`; + const taskName2 = `sdk_test_multi_2_${Date.now()}`; + const workflowName = `sdk_test_multi_wf_${Date.now()}`; + + let worker1Executed = false; + let worker2Executed = false; + + worker({ taskDefName: taskName1, pollInterval: 100 })( + async function worker1() { + worker1Executed = true; + return { + status: "COMPLETED" as const, + outputData: { worker: "worker1" }, + }; 
+ } + ); + + worker({ taskDefName: taskName2, pollInterval: 100 })( + async function worker2() { + worker2Executed = true; + return { + status: "COMPLETED" as const, + outputData: { worker: "worker2" }, + }; + } + ); + + const handler = new TaskHandler({ + client, + scanForDecorated: true, + }); + + expect(handler.workerCount).toBe(2); + + // Start workers BEFORE registering workflow + handler.startWorkers(); + + // Wait a bit for workers to start polling + await new Promise(resolve => setTimeout(resolve, 100)); + + // Register workflow with both tasks + await executor.registerWorkflow(true, { + name: workflowName, + version: 1, + ownerEmail: "developers@orkes.io", + tasks: [ + simpleTask(`${taskName1}_ref`, taskName1, {}), + simpleTask(`${taskName2}_ref`, taskName2, {}), + ], + inputParameters: [], + outputParameters: {}, + timeoutSeconds: 0, + }); + expect(handler.runningWorkerCount).toBe(2); + + // Execute workflow with retry on transient failures + const { workflowId } = await executeWorkflowWithRetry( + executor, + { + name: workflowName, + version: 1, + }, + workflowName, + 1, + `${workflowName}-id` + ); + + if (!workflowId) { + throw new Error("Workflow ID is undefined"); + } + + const workflowStatus = await waitForWorkflowStatus( + executor, + workflowId, + "COMPLETED", + 60000 + ); + + expect(workflowStatus.status).toBe("COMPLETED"); + expect(worker1Executed).toBe(true); + expect(worker2Executed).toBe(true); + + await handler.stopWorkers(); + }, 90000); + + test("TaskHandler lifecycle - start and stop multiple times", async () => { + const client = await clientPromise; + const taskName = `sdk_test_lifecycle_${Date.now()}`; + + worker({ taskDefName: taskName, pollInterval: 100 })( + async function lifecycleWorker() { + return { + status: "COMPLETED" as const, + outputData: { message: "Lifecycle test" }, + }; + } + ); + + const handler = new TaskHandler({ + client, + scanForDecorated: true, + }); + + // Initial state + expect(handler.running).toBe(false); + 
expect(handler.runningWorkerCount).toBe(0); + + // Start workers + handler.startWorkers(); + expect(handler.running).toBe(true); + expect(handler.runningWorkerCount).toBe(1); + + // Starting again should be idempotent + handler.startWorkers(); + expect(handler.running).toBe(true); + expect(handler.runningWorkerCount).toBe(1); + + // Stop workers + await handler.stopWorkers(); + expect(handler.running).toBe(false); + expect(handler.runningWorkerCount).toBe(0); + + // Stopping again should be idempotent + await handler.stopWorkers(); + expect(handler.running).toBe(false); + expect(handler.runningWorkerCount).toBe(0); + + // Can restart after stopping + handler.startWorkers(); + expect(handler.running).toBe(true); + expect(handler.runningWorkerCount).toBe(1); + + await handler.stopWorkers(); + }); + + test("manual workers can be added alongside decorated workers", async () => { + const client = await clientPromise; + const decoratedTaskName = `sdk_test_decorated_${Date.now()}`; + const manualTaskName = `sdk_test_manual_${Date.now()}`; + + worker({ taskDefName: decoratedTaskName, pollInterval: 100 })( + async function decoratedWorker() { + return { + status: "COMPLETED" as const, + outputData: { type: "decorated" }, + }; + } + ); + + // Create handler with manual worker + const handler = new TaskHandler({ + client, + scanForDecorated: true, + workers: [ + { + taskDefName: manualTaskName, + execute: async () => { + return { + status: "COMPLETED" as const, + outputData: { type: "manual" }, + }; + }, + }, + ], + }); + + expect(handler.workerCount).toBe(2); + + handler.startWorkers(); + expect(handler.runningWorkerCount).toBe(2); + + await handler.stopWorkers(); + }); + + test("worker with custom configuration options", async () => { + const taskName = `sdk_test_custom_config_${Date.now()}`; + + worker({ + taskDefName: taskName, + concurrency: 5, + pollInterval: 200, + domain: "custom_domain", + })( + async function customConfigWorker() { + return { + status: "COMPLETED" 
as const, + outputData: { message: "Custom config test" }, + }; + } + ); + + const registeredWorkers = getRegisteredWorkers(); + expect(registeredWorkers.length).toBe(1); + + const workerConfig = registeredWorkers[0]; + expect(workerConfig?.taskDefName).toBe(taskName); + expect(workerConfig?.concurrency).toBe(5); + expect(workerConfig?.pollInterval).toBe(200); + expect(workerConfig?.domain).toBe("custom_domain"); + }); + + test("clearWorkerRegistry removes all registered workers", () => { + const taskName1 = `sdk_test_clear_1_${Date.now()}`; + const taskName2 = `sdk_test_clear_2_${Date.now()}`; + + worker({ taskDefName: taskName1 })( + async function worker1() { + return { status: "COMPLETED" as const, outputData: {} }; + } + ); + + worker({ taskDefName: taskName2 })( + async function worker2() { + return { status: "COMPLETED" as const, outputData: {} }; + } + ); + + expect(getRegisteredWorkers().length).toBe(2); + + clearWorkerRegistry(); + + expect(getRegisteredWorkers().length).toBe(0); + }); +}); diff --git a/src/integration-tests/utils/executeWorkflowWithRetry.ts b/src/integration-tests/utils/executeWorkflowWithRetry.ts new file mode 100644 index 00000000..c4ca2e9c --- /dev/null +++ b/src/integration-tests/utils/executeWorkflowWithRetry.ts @@ -0,0 +1,61 @@ +import type { WorkflowExecutor } from "../../sdk/clients/workflow/WorkflowExecutor"; +import type { StartWorkflowRequest } from "../../open-api"; + +/** + * Execute a workflow with automatic retry on transient failures. + * Useful in CI/CD environments where the Conductor server might be under load. 
+ * + * @param executor - The WorkflowExecutor instance + * @param request - The workflow execution request + * @param workflowName - Workflow name + * @param version - Workflow version + * @param correlationId - Optional correlation ID + * @param maxRetries - Maximum number of retry attempts (default: 3) + * @param initialDelayMs - Initial delay before first retry in ms (default: 500) + * @returns Workflow execution result + */ +export async function executeWorkflowWithRetry( + executor: WorkflowExecutor, + request: StartWorkflowRequest, + workflowName: string, + version: number, + correlationId?: string, + maxRetries = 3, + initialDelayMs = 500 +): Promise<{ workflowId?: string }> { + let lastError: Error | undefined; + let delayMs = initialDelayMs; + + for (let attempt = 0; attempt <= maxRetries; attempt++) { + try { + return await executor.executeWorkflow( + request, + workflowName, + version, + correlationId + ); + } catch (error: unknown) { + lastError = error as Error; + const errorMessage = lastError.message?.toLowerCase() || ""; + + // Only retry on transient network errors, not business logic errors + const isRetryable = + errorMessage.includes("fetch failed") || + errorMessage.includes("network") || + errorMessage.includes("timeout") || + errorMessage.includes("econnrefused") || + errorMessage.includes("econnreset"); + + if (!isRetryable || attempt === maxRetries) { + throw lastError; + } + + // Exponential backoff with jitter + const jitter = Math.random() * 100; + await new Promise((resolve) => setTimeout(resolve, delayMs + jitter)); + delayMs *= 2; // Exponential backoff: 500ms, 1000ms, 2000ms + } + } + + throw lastError; +} diff --git a/src/sdk/clients/worker/Poller.ts b/src/sdk/clients/worker/Poller.ts index 1acce512..aac58e88 100644 --- a/src/sdk/clients/worker/Poller.ts +++ b/src/sdk/clients/worker/Poller.ts @@ -33,6 +33,14 @@ export class Poller { this.performWorkFunction = performWorkFunction; this.options = { ...this.options, ...pollerOptions 
}; this.logger = logger || noopLogger; + + // Ensure concurrency is a valid number + if (typeof this.options.concurrency !== 'number' || isNaN(this.options.concurrency) || this.options.concurrency < 1) { + this.logger.info( + `Invalid concurrency value (${this.options.concurrency}) for poller ${pollerId}. Using default: ${DEFAULT_CONCURRENCY}` + ); + this.options.concurrency = DEFAULT_CONCURRENCY; + } } get isPolling() { @@ -77,12 +85,10 @@ export class Poller { while (this.isPolling) { try { // Concurrency could have been updated. Accounting for that - const count = Math.max( - 0, - this.options.concurrency - this._tasksInProcess - ); + const rawCount = (this.options.concurrency ?? DEFAULT_CONCURRENCY) - this._tasksInProcess; + const count = Math.max(0, Number.isFinite(rawCount) ? rawCount : DEFAULT_CONCURRENCY); - if (count === 0) { + if (count === 0 || !Number.isFinite(count)) { this.logger.debug( "Max in process reached, Will skip polling for " + this._pollerId ); diff --git a/src/sdk/clients/worker/TaskManager.ts b/src/sdk/clients/worker/TaskManager.ts index 5dd6d28b..326a6aab 100644 --- a/src/sdk/clients/worker/TaskManager.ts +++ b/src/sdk/clients/worker/TaskManager.ts @@ -14,6 +14,7 @@ import { ConductorWorker, } from "./types"; import { getWorkerId, noopErrorHandler } from "./helpers"; +import type { TaskRunnerEventsListener } from "./events"; const defaultManagerOptions: Required = { workerID: "", @@ -35,6 +36,7 @@ export class TaskManager { readonly options: Required; private polling = false; private maxRetries: number = MAX_RETRIES; + private eventListeners: TaskRunnerEventsListener[]; constructor( client: Client, @@ -51,6 +53,7 @@ export class TaskManager { this.maxRetries = config.maxRetries ?? MAX_RETRIES; this.errorHandler = config.onError ?? noopErrorHandler; this.workers = workers; + this.eventListeners = config.eventListeners ?? []; const providedOptions = config.options ?? 
{}; this.options = { ...defaultManagerOptions, @@ -133,6 +136,7 @@ export class TaskManager { logger: this.logger, onError: this.errorHandler, maxRetries: this.maxRetries, + eventListeners: this.eventListeners, }); runner.startPolling(); this.workerRunners.set(worker.taskDefName, runner); diff --git a/src/sdk/clients/worker/TaskRunner.ts b/src/sdk/clients/worker/TaskRunner.ts index 0b04c141..ed21c465 100644 --- a/src/sdk/clients/worker/TaskRunner.ts +++ b/src/sdk/clients/worker/TaskRunner.ts @@ -16,6 +16,8 @@ import { ConductorWorker, } from "./types"; import { noopErrorHandler, optionEquals } from "./helpers"; +import { EventDispatcher } from "./events/EventDispatcher"; +import { NonRetryableException } from "./exceptions"; const defaultRunnerOptions: Required = { workerID: "", @@ -42,6 +44,7 @@ export class TaskRunner { errorHandler: TaskErrorHandler; private poller: Poller; private maxRetries: number; + private eventDispatcher: EventDispatcher; constructor({ worker, @@ -50,6 +53,7 @@ export class TaskRunner { logger = noopLogger, onError: errorHandler = noopErrorHandler, maxRetries = MAX_RETRIES, + eventListeners = [], }: RunnerArgs) { this._client = client; this.maxRetries = maxRetries; @@ -57,13 +61,20 @@ export class TaskRunner { this.worker = worker; this.options = { ...defaultRunnerOptions, ...options }; this.errorHandler = errorHandler; + + // Initialize event dispatcher and register listeners + this.eventDispatcher = new EventDispatcher(); + eventListeners.forEach((listener) => { + this.eventDispatcher.register(listener); + }); + this.poller = new Poller( worker.taskDefName, this.batchPoll, this.executeTask, { - concurrency: worker.concurrency ?? options.concurrency, - pollInterval: worker.pollInterval ?? options.pollInterval, + concurrency: worker.concurrency ?? this.options.concurrency, + pollInterval: worker.pollInterval ?? 
this.options.pollInterval, }, this.logger ); @@ -112,25 +123,61 @@ export class TaskRunner { private batchPoll = async (count: number): Promise => { const { workerID } = this.options; + const startTime = Date.now(); - const { data: tasks } = await TaskResource.batchPoll({ - client: this._client, - path: { - tasktype: this.worker.taskDefName, - }, - query: { - workerid: workerID, - domain: this.worker.domain ?? this.options.domain, - count, - timeout: this.options.batchPollingTimeout ?? 100, - }, + // Publish PollStarted event + await this.eventDispatcher.publishPollStarted({ + taskType: this.worker.taskDefName, + workerId: workerID, + pollCount: count, + timestamp: new Date(), }); - return tasks; + + try { + const { data: tasks } = await TaskResource.batchPoll({ + client: this._client, + path: { + tasktype: this.worker.taskDefName, + }, + query: { + workerid: workerID, + domain: this.worker.domain ?? this.options.domain, + count, + timeout: this.options.batchPollingTimeout ?? 100, + }, + }); + + const durationMs = Date.now() - startTime; + + // Publish PollCompleted event + await this.eventDispatcher.publishPollCompleted({ + taskType: this.worker.taskDefName, + durationMs, + tasksReceived: tasks?.length ?? 
0, + timestamp: new Date(), + }); + + return tasks; + } catch (error) { + const durationMs = Date.now() - startTime; + + // Publish PollFailure event + await this.eventDispatcher.publishPollFailure({ + taskType: this.worker.taskDefName, + durationMs, + cause: error as Error, + timestamp: new Date(), + }); + + throw error; + } }; updateTaskWithRetry = async (task: Task, taskResult: TaskResult) => { const { workerID } = this.options; let retryCount = 0; + let lastError: Error | null = null; + while (retryCount < this.maxRetries) { try { await TaskResource.updateTask({ @@ -141,20 +188,39 @@ export class TaskRunner { }, }); - return; + return; // Success } catch (error: unknown) { - this.errorHandler(error as Error, task); + lastError = error as Error; + this.errorHandler(lastError, task); this.logger.error( - `Error updating task ${taskResult.taskId} on retry ${retryCount}`, + `Error updating task ${taskResult.taskId} on retry ${retryCount + 1}/${this.maxRetries}`, error ); retryCount++; - await new Promise((r) => setTimeout(() => r(true), retryCount * 10)); + + if (retryCount < this.maxRetries) { + // Exponential backoff: 10s, 20s, 30s + const delayMs = retryCount * 10 * 1000; + await new Promise((r) => setTimeout(() => r(true), delayMs)); + } } } + + // All retries exhausted - publish critical TaskUpdateFailure event this.logger.error( - `Unable to update task ${taskResult.taskId} after ${retryCount} retries` + `CRITICAL: Task update failed after ${retryCount} retries. Task result LOST for task_id=${taskResult.taskId}` ); + + await this.eventDispatcher.publishTaskUpdateFailure({ + taskType: this.worker.taskDefName, + taskId: taskResult.taskId ?? "", + workerId: workerID, + workflowInstanceId: taskResult.workflowInstanceId, + cause: lastError ?? 
new Error("Task update failed after all retries"), + retryCount, + taskResult, + timestamp: new Date(), + }); }; private executeTask = async (task: Task) => { @@ -165,8 +231,38 @@ export class TaskRunner { return; } + const { workerID } = this.options; + const startTime = Date.now(); + + // Publish TaskExecutionStarted event + await this.eventDispatcher.publishTaskExecutionStarted({ + taskType: this.worker.taskDefName, + taskId: task.taskId, + workerId: workerID, + workflowInstanceId: task.workflowInstanceId, + timestamp: new Date(), + }); + try { const result = await this.worker.execute(task); + const durationMs = Date.now() - startTime; + + // Calculate output size if possible + const outputSizeBytes = result.outputData + ? JSON.stringify(result.outputData).length + : undefined; + + // Publish TaskExecutionCompleted event + await this.eventDispatcher.publishTaskExecutionCompleted({ + taskType: this.worker.taskDefName, + taskId: task.taskId, + workerId: workerID, + workflowInstanceId: task.workflowInstanceId, + durationMs, + outputSizeBytes, + timestamp: new Date(), + }); + await this.updateTaskWithRetry(task, { ...result, workflowInstanceId: task.workflowInstanceId, @@ -174,15 +270,39 @@ export class TaskRunner { }); this.logger.debug(`Task has executed successfully ${task.taskId}`); } catch (error: unknown) { + const durationMs = Date.now() - startTime; + const err = error as Error; + + // Publish TaskExecutionFailure event + await this.eventDispatcher.publishTaskExecutionFailure({ + taskType: this.worker.taskDefName, + taskId: task.taskId, + workerId: workerID, + workflowInstanceId: task.workflowInstanceId, + cause: err, + durationMs, + timestamp: new Date(), + }); + + // Determine task status based on exception type + const isNonRetryable = err instanceof NonRetryableException; + const status = isNonRetryable ? 
"FAILED_WITH_TERMINAL_ERROR" : "FAILED"; + + if (isNonRetryable) { + this.logger.error( + `Task ${task.taskId} failed with terminal error (no retry): ${err.message}` + ); + } + await this.updateTaskWithRetry(task, { workflowInstanceId: task.workflowInstanceId, taskId: task.taskId, reasonForIncompletion: (error as Record)?.message ?? DEFAULT_ERROR_MESSAGE, - status: "FAILED", + status, outputData: {}, }); - this.errorHandler(error as Error, task); + this.errorHandler(err, task); this.logger.error(`Error executing ${task.taskId}`, error); } }; @@ -190,11 +310,15 @@ export class TaskRunner { handleUnknownError = (unknownError: unknown) => { let message = ""; let stack: string | undefined = ""; - if ((unknownError as Error).stack) { - stack = (unknownError as Error).stack; - } - if ((unknownError as Error).message) { - message = (unknownError as Error).message; + if (unknownError && typeof unknownError === "object") { + if ("stack" in unknownError) { + stack = (unknownError as Error).stack; + } + if ("message" in unknownError) { + message = (unknownError as Error).message; + } + } else if (typeof unknownError === "string") { + message = unknownError; } this.logger.error( `Error for ${this.worker.taskDefName}: error: ${message}, stack: ${stack}` diff --git a/src/sdk/clients/worker/__tests__/TaskRunner.test.ts b/src/sdk/clients/worker/__tests__/TaskRunner.test.ts index 6ff44d78..ed19cc29 100644 --- a/src/sdk/clients/worker/__tests__/TaskRunner.test.ts +++ b/src/sdk/clients/worker/__tests__/TaskRunner.test.ts @@ -1,21 +1,71 @@ -import { jest, test, expect } from "@jest/globals"; -import { TaskRunner } from "../TaskRunner"; -import { RunnerArgs } from "../types"; -import { mockLogger } from "../../../../integration-tests/utils/mockLogger"; -import { TaskResource } from "../../../../open-api/generated"; -import { TaskResultStatusEnum } from "../../../../open-api"; -import type { Client } from "../../../../open-api/generated/client/types.gen"; -import type { Task } from 
"../../../../open-api"; - -jest.mock("../../../../open-api/generated", () => ({ +import type { TaskRunnerEventsListener } from "@/sdk/clients/worker/events"; +import type { + PollCompleted, + PollFailure, + PollStarted, + TaskExecutionCompleted, + TaskExecutionFailure, + TaskExecutionStarted, + TaskUpdateFailure, +} from "@/sdk/clients/worker/events/types"; +import { NonRetryableException } from "@/sdk/clients/worker/exceptions"; +import { TaskRunner } from "@/sdk/clients/worker/TaskRunner"; +import { RunnerArgs } from "@/sdk/clients/worker/types"; +import { afterEach, describe, expect, jest, test } from "@jest/globals"; +import { TaskResource } from "@open-api/generated"; +import type { Client } from "@open-api/generated/client/types.gen"; +import type { Task, TaskResult } from "@open-api/index"; +import { TaskResultStatusEnum } from "@open-api/index"; +import { mockLogger } from "@test-utils/mockLogger"; + +jest.mock("@open-api/generated", () => ({ TaskResource: { batchPoll: jest.fn(), updateTask: jest.fn(), }, })); +// Create a proper mock client with all required methods +const createMockClient = (): Client => { + const mockFn = jest.fn<() => Promise<{ data: null }>>().mockResolvedValue({ data: null }); + return { + buildUrl: jest.fn(), + getConfig: jest.fn(), + request: jest.fn(), + setConfig: jest.fn(), + get: mockFn, + post: mockFn, + put: mockFn, + patch: mockFn, + delete: mockFn, + options: mockFn, + head: mockFn, + interceptors: { + request: { use: jest.fn(), eject: jest.fn() }, + response: { use: jest.fn(), eject: jest.fn() }, + error: { use: jest.fn(), eject: jest.fn() }, + }, + } as unknown as Client; +}; + +// Track runners for cleanup +const activeRunners: TaskRunner[] = []; + +afterEach(async () => { + // Stop all runners + for (const runner of activeRunners) { + runner.stopPolling(); + } + activeRunners.length = 0; + + // Wait for async operations to complete + await new Promise(resolve => setTimeout(resolve, 50)); + + jest.clearAllMocks(); +}); + 
test("polls tasks", async () => { - const mockClient = {} as Client; + const mockClient = createMockClient(); const workerID = "worker-id"; const args: RunnerArgs = { worker: { @@ -60,8 +110,11 @@ test("polls tasks", async () => { } as Awaited>); const runner = new TaskRunner(args); + activeRunners.push(runner); runner.startPolling(); - await new Promise((r) => setTimeout(() => r(true), 10)); + + // Wait for polling to occur + await new Promise((r) => setTimeout(() => r(true), 100)); runner.stopPolling(); const expected = { @@ -81,7 +134,7 @@ test("polls tasks", async () => { }); test("Should set the task as failed if the task has an error", async () => { - const mockClient = {} as Client; + const mockClient = createMockClient(); const workerID = "worker-id"; const args: RunnerArgs = { worker: { @@ -120,8 +173,11 @@ test("Should set the task as failed if the task has an error", async () => { } as Awaited>); const runner = new TaskRunner(args); + activeRunners.push(runner); runner.startPolling(); - await new Promise((r) => setTimeout(() => r(true), 10)); + + // Wait for polling to occur + await new Promise((r) => setTimeout(() => r(true), 100)); runner.stopPolling(); const expected = { @@ -137,3 +193,969 @@ test("Should set the task as failed if the task has an error", async () => { body: expected, }); }); + +describe("NonRetryableException handling", () => { + test("Should mark task as FAILED_WITH_TERMINAL_ERROR when NonRetryableException is thrown", async () => { + const mockClient = createMockClient(); + const workerID = "worker-id"; + const args: RunnerArgs = { + worker: { + taskDefName: "test", + execute: async () => { + throw new NonRetryableException("Business validation failed"); + }, + }, + options: { + pollInterval: 10, + domain: "", + concurrency: 1, + workerID, + }, + logger: mockLogger, + client: mockClient, + }; + + const workflowInstanceId = "fake-workflow-id"; + const taskId = "fake-task-id"; + + const mockTask: Task = { + taskId, + 
workflowInstanceId, + status: "IN_PROGRESS", + reasonForIncompletion: undefined, + inputData: {}, + }; + + const mockBatchPoll = TaskResource.batchPoll as jest.MockedFunction< + typeof TaskResource.batchPoll + >; + mockBatchPoll.mockResolvedValue({ + data: [mockTask], + } as Awaited>); + + const runner = new TaskRunner(args); + activeRunners.push(runner); + runner.startPolling(); + + await new Promise((r) => setTimeout(() => r(true), 100)); + runner.stopPolling(); + + const expected = { + taskId, + workflowInstanceId, + workerId: workerID, + status: "FAILED_WITH_TERMINAL_ERROR" as const, + outputData: {}, + reasonForIncompletion: "Business validation failed", + }; + expect(TaskResource.updateTask).toHaveBeenCalledWith({ + client: mockClient, + body: expected, + }); + }); +}); + +describe("Task update retry logic", () => { + test("Should retry failed task updates with exponential backoff", async () => { + const mockClient = createMockClient(); + const workerID = "worker-id"; + const onError = jest.fn(); + + const args: RunnerArgs = { + worker: { + taskDefName: "test", + execute: async ({ inputData }) => { + return { + outputData: inputData, + status: "COMPLETED", + }; + }, + }, + options: { + pollInterval: 10, + domain: "", + concurrency: 1, + workerID, + }, + logger: mockLogger, + client: mockClient, + onError, + maxRetries: 2, + }; + + const workflowInstanceId = "fake-workflow-id"; + const taskId = "fake-task-id"; + + const mockTask: Task = { + taskId, + workflowInstanceId, + status: "IN_PROGRESS", + inputData: {}, + }; + + const mockBatchPoll = TaskResource.batchPoll as jest.MockedFunction< + typeof TaskResource.batchPoll + >; + // Return task once, then return empty array + mockBatchPoll + .mockResolvedValueOnce({ + data: [mockTask], + request: {} as Request, + response: {} as Response, + } as Awaited>) + .mockResolvedValue({ + data: [], + request: {} as Request, + response: {} as Response, + } as Awaited>); + + const mockUpdateTask = TaskResource.updateTask as 
jest.MockedFunction< + typeof TaskResource.updateTask + >; + + // Fail first attempt, succeed on second + mockUpdateTask + .mockRejectedValueOnce(new Error("Network error")) + .mockResolvedValueOnce({} as Awaited>); + + const runner = new TaskRunner(args); + activeRunners.push(runner); + runner.startPolling(); + + await new Promise((r) => setTimeout(() => r(true), 15000)); + runner.stopPolling(); + + // Should have been called twice (1 failure + 1 success) + expect(mockUpdateTask).toHaveBeenCalledTimes(2); + expect(onError).toHaveBeenCalledTimes(1); + }, 20000); + + test("Should publish TaskUpdateFailure event after all retries exhausted", async () => { + const mockClient = createMockClient(); + const workerID = "worker-id"; + const onTaskUpdateFailure = jest.fn<(event: TaskUpdateFailure) => void>(); + const eventListener: TaskRunnerEventsListener = { + onTaskUpdateFailure, + }; + + const args: RunnerArgs = { + worker: { + taskDefName: "test", + execute: async ({ inputData }) => { + return { + outputData: inputData, + status: "COMPLETED", + }; + }, + }, + options: { + pollInterval: 10, + domain: "", + concurrency: 1, + workerID, + }, + logger: mockLogger, + client: mockClient, + maxRetries: 2, + eventListeners: [eventListener], + }; + + const workflowInstanceId = "fake-workflow-id"; + const taskId = "fake-task-id"; + + const mockTask: Task = { + taskId, + workflowInstanceId, + status: "IN_PROGRESS", + inputData: {}, + }; + + const mockBatchPoll = TaskResource.batchPoll as jest.MockedFunction< + typeof TaskResource.batchPoll + >; + // Return task once, then return empty array + mockBatchPoll + .mockResolvedValueOnce({ + data: [mockTask], + request: {} as Request, + response: {} as Response, + } as Awaited>) + .mockResolvedValue({ + data: [], + request: {} as Request, + response: {} as Response, + } as Awaited>); + + const mockUpdateTask = TaskResource.updateTask as jest.MockedFunction< + typeof TaskResource.updateTask + >; + + // Fail all attempts + 
mockUpdateTask.mockRejectedValue(new Error("Persistent network error")); + + const runner = new TaskRunner(args); + activeRunners.push(runner); + runner.startPolling(); + + await new Promise((r) => setTimeout(() => r(true), 25000)); + runner.stopPolling(); + + // Should have tried maxRetries times + expect(mockUpdateTask).toHaveBeenCalledTimes(2); + expect(onTaskUpdateFailure).toHaveBeenCalledWith( + expect.objectContaining({ + taskId, + workflowInstanceId, + retryCount: 2, + }) + ); + }, 30000); +}); + +describe("Multiple tasks handling", () => { + test("Should process multiple tasks from batch poll", async () => { + const mockClient = createMockClient(); + const workerID = "worker-id"; + const executeCount = jest.fn(); + + const args: RunnerArgs = { + worker: { + taskDefName: "test", + execute: async ({ inputData }) => { + executeCount(); + return { + outputData: inputData, + status: "COMPLETED", + }; + }, + }, + options: { + pollInterval: 10, + domain: "", + concurrency: 3, + workerID, + }, + logger: mockLogger, + client: mockClient, + }; + + const mockTasks: Task[] = [ + { + taskId: "task-1", + workflowInstanceId: "workflow-1", + status: "IN_PROGRESS", + inputData: { value: 1 }, + }, + { + taskId: "task-2", + workflowInstanceId: "workflow-1", + status: "IN_PROGRESS", + inputData: { value: 2 }, + }, + { + taskId: "task-3", + workflowInstanceId: "workflow-1", + status: "IN_PROGRESS", + inputData: { value: 3 }, + }, + ]; + + const mockBatchPoll = TaskResource.batchPoll as jest.MockedFunction< + typeof TaskResource.batchPoll + >; + mockBatchPoll.mockResolvedValue({ + data: mockTasks, + } as Awaited>); + + const runner = new TaskRunner(args); + activeRunners.push(runner); + runner.startPolling(); + + await new Promise((r) => setTimeout(() => r(true), 200)); + runner.stopPolling(); + + expect(executeCount).toHaveBeenCalledTimes(3); + expect(TaskResource.updateTask).toHaveBeenCalledTimes(3); + }); +}); + +describe("updateOptions", () => { + test("Should update 
concurrency and pollInterval", async () => { + const mockClient = createMockClient(); + const args: RunnerArgs = { + worker: { + taskDefName: "test", + execute: async ({ inputData }) => ({ + outputData: inputData, + status: "COMPLETED", + }), + }, + options: { + pollInterval: 100, + domain: "", + concurrency: 1, + workerID: "worker-id", + }, + logger: mockLogger, + client: mockClient, + }; + + const runner = new TaskRunner(args); + + expect(runner.getOptions.concurrency).toBe(1); + expect(runner.getOptions.pollInterval).toBe(100); + + runner.updateOptions({ concurrency: 5, pollInterval: 50 }); + + expect(runner.getOptions.concurrency).toBe(5); + expect(runner.getOptions.pollInterval).toBe(50); + }); + + test("Should not trigger update if options are unchanged", async () => { + const mockClient = createMockClient(); + const loggerSpy = { + ...mockLogger, + info: jest.fn(), + error: jest.fn(), + debug: jest.fn(), + }; + + const args: RunnerArgs = { + worker: { + taskDefName: "test", + execute: async ({ inputData }) => ({ + outputData: inputData, + status: "COMPLETED", + }), + }, + options: { + pollInterval: 100, + domain: "", + concurrency: 1, + workerID: "worker-id", + }, + logger: loggerSpy, + client: mockClient, + }; + + const runner = new TaskRunner(args); + + // Clear initial logging + loggerSpy.info.mockClear(); + + // Update with same values + runner.updateOptions({ concurrency: 1, pollInterval: 100 }); + + // Should not log configuration update + const configUpdateCalls = loggerSpy.info.mock.calls.filter((call) => + (call[0] as string)?.includes("configuration updated") + ); + expect(configUpdateCalls).toHaveLength(0); + }); +}); + +describe("Task validation", () => { + test("Should skip task execution if taskId is missing", async () => { + const mockClient = createMockClient(); + const workerID = "worker-id"; + const executeFn = jest.fn<(task: Task) => Promise>>(); + + const args: RunnerArgs = { + worker: { + taskDefName: "test", + execute: executeFn, + }, + 
options: { + pollInterval: 10, + domain: "", + concurrency: 1, + workerID, + }, + logger: mockLogger, + client: mockClient, + }; + + const mockTask: Task = { + workflowInstanceId: "workflow-1", + status: "IN_PROGRESS", + inputData: {}, + // taskId is missing + }; + + const mockBatchPoll = TaskResource.batchPoll as jest.MockedFunction< + typeof TaskResource.batchPoll + >; + mockBatchPoll.mockResolvedValue({ + data: [mockTask], + } as Awaited>); + + const runner = new TaskRunner(args); + activeRunners.push(runner); + runner.startPolling(); + + await new Promise((r) => setTimeout(() => r(true), 100)); + runner.stopPolling(); + + expect(executeFn).not.toHaveBeenCalled(); + expect(TaskResource.updateTask).not.toHaveBeenCalled(); + }); + + test("Should skip task execution if workflowInstanceId is missing", async () => { + const mockClient = createMockClient(); + const workerID = "worker-id"; + const executeFn = jest.fn<(task: Task) => Promise>>(); + + const args: RunnerArgs = { + worker: { + taskDefName: "test", + execute: executeFn, + }, + options: { + pollInterval: 10, + domain: "", + concurrency: 1, + workerID, + }, + logger: mockLogger, + client: mockClient, + }; + + const mockTask: Task = { + taskId: "task-1", + status: "IN_PROGRESS", + inputData: {}, + // workflowInstanceId is missing + }; + + const mockBatchPoll = TaskResource.batchPoll as jest.MockedFunction< + typeof TaskResource.batchPoll + >; + mockBatchPoll.mockResolvedValue({ + data: [mockTask], + } as Awaited>); + + const runner = new TaskRunner(args); + activeRunners.push(runner); + runner.startPolling(); + + await new Promise((r) => setTimeout(() => r(true), 100)); + runner.stopPolling(); + + expect(executeFn).not.toHaveBeenCalled(); + expect(TaskResource.updateTask).not.toHaveBeenCalled(); + }); +}); + +describe("Event listeners", () => { + test("Should publish poll events", async () => { + const mockClient = createMockClient(); + const onPollStarted = jest.fn<(event: PollStarted) => void>(); + const 
onPollCompleted = jest.fn<(event: PollCompleted) => void>(); + const eventListener: TaskRunnerEventsListener = { + onPollStarted, + onPollCompleted, + }; + + const args: RunnerArgs = { + worker: { + taskDefName: "test", + execute: async ({ inputData }) => ({ + outputData: inputData, + status: "COMPLETED", + }), + }, + options: { + pollInterval: 10, + domain: "", + concurrency: 1, + workerID: "worker-id", + }, + logger: mockLogger, + client: mockClient, + eventListeners: [eventListener], + }; + + const mockBatchPoll = TaskResource.batchPoll as jest.MockedFunction< + typeof TaskResource.batchPoll + >; + mockBatchPoll.mockResolvedValue({ + data: [], + request: {} as Request, + response: {} as Response, + } as Awaited>); + + const runner = new TaskRunner(args); + activeRunners.push(runner); + runner.startPolling(); + + await new Promise((r) => setTimeout(() => r(true), 100)); + runner.stopPolling(); + + expect(onPollStarted).toHaveBeenCalled(); + expect(onPollCompleted).toHaveBeenCalledWith( + expect.objectContaining({ + tasksReceived: 0, + }) + ); + }); + + test("Should publish task execution events", async () => { + const mockClient = createMockClient(); + const onTaskExecutionStarted = jest.fn<(event: TaskExecutionStarted) => void>(); + const onTaskExecutionCompleted = jest.fn<(event: TaskExecutionCompleted) => void>(); + const eventListener: TaskRunnerEventsListener = { + onTaskExecutionStarted, + onTaskExecutionCompleted, + }; + + const args: RunnerArgs = { + worker: { + taskDefName: "test", + execute: async () => ({ + outputData: { result: "done" }, + status: "COMPLETED", + }), + }, + options: { + pollInterval: 10, + domain: "", + concurrency: 1, + workerID: "worker-id", + }, + logger: mockLogger, + client: mockClient, + eventListeners: [eventListener], + }; + + const mockTask: Task = { + taskId: "task-1", + workflowInstanceId: "workflow-1", + status: "IN_PROGRESS", + inputData: {}, + }; + + const mockBatchPoll = TaskResource.batchPoll as jest.MockedFunction< + 
typeof TaskResource.batchPoll + >; + mockBatchPoll.mockResolvedValue({ + data: [mockTask], + } as Awaited>); + + const runner = new TaskRunner(args); + activeRunners.push(runner); + runner.startPolling(); + + await new Promise((r) => setTimeout(() => r(true), 100)); + runner.stopPolling(); + + expect(onTaskExecutionStarted).toHaveBeenCalledWith( + expect.objectContaining({ + taskId: "task-1", + workflowInstanceId: "workflow-1", + }) + ); + expect(onTaskExecutionCompleted).toHaveBeenCalledWith( + expect.objectContaining({ + taskId: "task-1", + workflowInstanceId: "workflow-1", + }) + ); + }); + + test("Should publish poll failure event", async () => { + const mockClient = createMockClient(); + const onPollFailure = jest.fn<(event: PollFailure) => void>(); + const eventListener: TaskRunnerEventsListener = { + onPollFailure, + }; + + const args: RunnerArgs = { + worker: { + taskDefName: "test", + execute: async ({ inputData }) => ({ + outputData: inputData, + status: "COMPLETED", + }), + }, + options: { + pollInterval: 10, + domain: "", + concurrency: 1, + workerID: "worker-id", + }, + logger: mockLogger, + client: mockClient, + eventListeners: [eventListener], + }; + + const mockBatchPoll = TaskResource.batchPoll as jest.MockedFunction< + typeof TaskResource.batchPoll + >; + mockBatchPoll.mockRejectedValue(new Error("Poll service unavailable")); + + const runner = new TaskRunner(args); + activeRunners.push(runner); + runner.startPolling(); + + await new Promise((r) => setTimeout(() => r(true), 100)); + runner.stopPolling(); + + expect(onPollFailure).toHaveBeenCalledWith( + expect.objectContaining({ + cause: expect.any(Error), + }) + ); + }); + + test("Should publish task execution failure event", async () => { + const mockClient = createMockClient(); + const onTaskExecutionFailure = jest.fn<(event: TaskExecutionFailure) => void>(); + const eventListener: TaskRunnerEventsListener = { + onTaskExecutionFailure, + }; + + const args: RunnerArgs = { + worker: { + 
taskDefName: "test", + execute: async () => { + throw new Error("Task execution failed"); + }, + }, + options: { + pollInterval: 10, + domain: "", + concurrency: 1, + workerID: "worker-id", + }, + logger: mockLogger, + client: mockClient, + eventListeners: [eventListener], + }; + + const mockTask: Task = { + taskId: "task-1", + workflowInstanceId: "workflow-1", + status: "IN_PROGRESS", + inputData: {}, + }; + + const mockBatchPoll = TaskResource.batchPoll as jest.MockedFunction< + typeof TaskResource.batchPoll + >; + mockBatchPoll.mockResolvedValue({ + data: [mockTask], + } as Awaited>); + + const runner = new TaskRunner(args); + activeRunners.push(runner); + runner.startPolling(); + + await new Promise((r) => setTimeout(() => r(true), 100)); + runner.stopPolling(); + + expect(onTaskExecutionFailure).toHaveBeenCalledWith( + expect.objectContaining({ + taskId: "task-1", + workflowInstanceId: "workflow-1", + cause: expect.any(Error), + }) + ); + }); +}); + +describe("Worker-specific configuration", () => { + test("Should use worker-specific concurrency when provided", async () => { + const mockClient = createMockClient(); + const args: RunnerArgs = { + worker: { + taskDefName: "test", + execute: async ({ inputData }) => ({ + outputData: inputData, + status: "COMPLETED", + }), + concurrency: 10, // Worker-specific + }, + options: { + pollInterval: 100, + domain: "", + concurrency: 1, // Default + workerID: "worker-id", + }, + logger: mockLogger, + client: mockClient, + }; + + const runner = new TaskRunner(args); + + // Should use worker-specific concurrency + expect(runner.getOptions.concurrency).toBe(1); // Options remain unchanged + // The poller uses worker.concurrency internally + }); + + test("Should use worker-specific domain when provided", async () => { + const mockClient = createMockClient(); + const args: RunnerArgs = { + worker: { + taskDefName: "test", + execute: async ({ inputData }) => ({ + outputData: inputData, + status: "COMPLETED", + }), + domain: 
"custom-domain", + }, + options: { + pollInterval: 10, + domain: "default-domain", + concurrency: 1, + workerID: "worker-id", + }, + logger: mockLogger, + client: mockClient, + }; + + const mockBatchPoll = TaskResource.batchPoll as jest.MockedFunction< + typeof TaskResource.batchPoll + >; + mockBatchPoll.mockResolvedValue({ + data: [], + request: {} as Request, + response: {} as Response, + } as Awaited>); + + const runner = new TaskRunner(args); + activeRunners.push(runner); + runner.startPolling(); + + await new Promise((r) => setTimeout(() => r(true), 100)); + runner.stopPolling(); + + // Should have called batchPoll with worker-specific domain + expect(mockBatchPoll).toHaveBeenCalledWith( + expect.objectContaining({ + query: expect.objectContaining({ + domain: "custom-domain", + }), + }) + ); + }); +}); + +describe("Error handling", () => { + test("Should call error handler when task execution fails", async () => { + // Clear all mocks to ensure clean slate + jest.clearAllMocks(); + + const mockClient = createMockClient(); + const onError = jest.fn(); + const testError = new Error("Task failed"); + + const args: RunnerArgs = { + worker: { + taskDefName: "test-error-handler", + execute: async () => { + throw testError; + }, + }, + options: { + pollInterval: 10, + domain: "", + concurrency: 1, + workerID: "worker-id", + }, + logger: mockLogger, + client: mockClient, + onError, + }; + + const mockTask: Task = { + taskId: "task-error-1", + workflowInstanceId: "workflow-error-1", + status: "IN_PROGRESS", + inputData: {}, + }; + + const mockBatchPoll = TaskResource.batchPoll as jest.MockedFunction< + typeof TaskResource.batchPoll + >; + mockBatchPoll + .mockResolvedValueOnce({ + data: [mockTask], + request: {} as Request, + response: {} as Response, + } as Awaited>) + .mockResolvedValue({ + data: [], + request: {} as Request, + response: {} as Response, + } as Awaited>); + + const mockUpdateTask = TaskResource.updateTask as jest.MockedFunction< + typeof 
TaskResource.updateTask + >; + // Ensure updateTask succeeds for this test + mockUpdateTask.mockResolvedValue({} as Awaited>); + + const runner = new TaskRunner(args); + activeRunners.push(runner); + runner.startPolling(); + + await new Promise((r) => setTimeout(() => r(true), 200)); + runner.stopPolling(); + + // Error handler should be called twice: once for task execution error, once if update fails + expect(onError).toHaveBeenCalledWith(testError, mockTask); + }); + + test("Should handle errors without message or stack gracefully", async () => { + const mockClient = createMockClient(); + const runner = new TaskRunner({ + worker: { + taskDefName: "test", + execute: async ({ inputData }) => ({ + outputData: inputData, + status: "COMPLETED", + }), + }, + options: { + pollInterval: 10, + domain: "", + concurrency: 1, + workerID: "worker-id", + }, + logger: mockLogger, + client: mockClient, + }); + + // Test handleUnknownError with non-Error object + expect(() => { + runner.handleUnknownError({ someProperty: "value" }); + }).not.toThrow(); + + // Test with string + expect(() => { + runner.handleUnknownError("string error"); + }).not.toThrow(); + + // Test with undefined + expect(() => { + runner.handleUnknownError(undefined); + }).not.toThrow(); + }); +}); + +describe("Polling state", () => { + test("Should report correct polling state", async () => { + const mockClient = createMockClient(); + const args: RunnerArgs = { + worker: { + taskDefName: "test", + execute: async ({ inputData }) => ({ + outputData: inputData, + status: "COMPLETED", + }), + }, + options: { + pollInterval: 10, + domain: "", + concurrency: 1, + workerID: "worker-id", + }, + logger: mockLogger, + client: mockClient, + }; + + const mockBatchPoll = TaskResource.batchPoll as jest.MockedFunction< + typeof TaskResource.batchPoll + >; + mockBatchPoll.mockResolvedValue({ + data: [], + request: {} as Request, + response: {} as Response, + } as Awaited>); + + const runner = new TaskRunner(args); + + 
expect(runner.isPolling).toBe(false); + + runner.startPolling(); + expect(runner.isPolling).toBe(true); + + await new Promise((r) => setTimeout(() => r(true), 50)); + + runner.stopPolling(); + await new Promise((r) => setTimeout(() => r(true), 50)); + + expect(runner.isPolling).toBe(false); + }); + + test("handles undefined concurrency by using default", async () => { + const mockClient = createMockClient(); + + // Create worker with undefined concurrency + const args: RunnerArgs = { + worker: { + taskDefName: "test-undefined-concurrency", + execute: async () => ({ + status: "COMPLETED", + outputData: { result: "ok" }, + }), + // concurrency is intentionally undefined + }, + options: { + pollInterval: 10, + workerID: "test-worker", + // concurrency is also undefined here + }, + logger: mockLogger, + client: mockClient, + }; + + const mockTask: Task = { + taskId: "task-1", + workflowInstanceId: "workflow-1", + status: "IN_PROGRESS", + taskType: "test-undefined-concurrency", + inputData: {}, + } as Task; + + (TaskResource.batchPoll as jest.MockedFunction) + .mockResolvedValueOnce({ + data: [mockTask], + request: {} as Request, + response: {} as Response, + } as Awaited>) + .mockResolvedValue({ + data: [], + request: {} as Request, + response: {} as Response, + } as Awaited>); + + (TaskResource.updateTask as jest.MockedFunction) + .mockResolvedValue({ + data: null, + request: {} as Request, + response: {} as Response, + } as Awaited>); + + const runner = new TaskRunner(args); + activeRunners.push(runner); + + runner.startPolling(); + + // Wait for polling cycle + await new Promise(resolve => setTimeout(resolve, 50)); + + await runner.stopPolling(); + + // Verify batchPoll was called with a valid count (should be 1, the default concurrency) + expect(TaskResource.batchPoll).toHaveBeenCalled(); + const batchPollCall = (TaskResource.batchPoll as jest.MockedFunction).mock.calls[0]; + const queryParams = batchPollCall?.[0]?.query; + + // Verify count is a valid number, not 
/**
 * Listener contract for task runner lifecycle events.
 *
 * Every hook is optional - implement only the events you need to handle.
 * A hook may be synchronous or return a promise. Failures raised by a hook
 * are isolated by the dispatcher and do not affect task execution.
 */
export interface TaskRunnerEventsListener {
  /** Called when task polling begins. */
  onPollStarted?(event: PollStarted): void | Promise<void>;

  /** Called when task polling completes successfully. */
  onPollCompleted?(event: PollCompleted): void | Promise<void>;

  /** Called when task polling fails. */
  onPollFailure?(event: PollFailure): void | Promise<void>;

  /** Called when task execution begins. */
  onTaskExecutionStarted?(event: TaskExecutionStarted): void | Promise<void>;

  /** Called when task execution completes successfully. */
  onTaskExecutionCompleted?(
    event: TaskExecutionCompleted
  ): void | Promise<void>;

  /** Called when task execution fails. */
  onTaskExecutionFailure?(event: TaskExecutionFailure): void | Promise<void>;

  /**
   * Called when task update fails after all retry attempts.
   * This is a CRITICAL event that may require operational intervention.
   */
  onTaskUpdateFailure?(event: TaskUpdateFailure): void | Promise<void>;
}
/**
 * Event dispatcher for task runner events.
 *
 * Provides a decoupled event system for observability and metrics collection.
 * Each `publish*` call resolves once every registered listener has settled.
 * A listener that throws or rejects is logged with `console.error` and never
 * causes the publish call itself to reject.
 */
export class EventDispatcher {
  private listeners: TaskRunnerEventsListener[] = [];

  /**
   * Register an event listener.
   *
   * @param listener - The listener to register
   */
  register(listener: TaskRunnerEventsListener): void {
    this.listeners.push(listener);
  }

  /**
   * Unregister an event listener. Does nothing when the listener is not
   * currently registered.
   *
   * @param listener - The listener to unregister
   */
  unregister(listener: TaskRunnerEventsListener): void {
    const index = this.listeners.indexOf(listener);
    if (index !== -1) {
      this.listeners.splice(index, 1);
    }
  }

  /** Publish a PollStarted event. */
  async publishPollStarted(event: PollStarted): Promise<void> {
    await this.publishEvent("onPollStarted", event);
  }

  /** Publish a PollCompleted event. */
  async publishPollCompleted(event: PollCompleted): Promise<void> {
    await this.publishEvent("onPollCompleted", event);
  }

  /** Publish a PollFailure event. */
  async publishPollFailure(event: PollFailure): Promise<void> {
    await this.publishEvent("onPollFailure", event);
  }

  /** Publish a TaskExecutionStarted event. */
  async publishTaskExecutionStarted(
    event: TaskExecutionStarted
  ): Promise<void> {
    await this.publishEvent("onTaskExecutionStarted", event);
  }

  /** Publish a TaskExecutionCompleted event. */
  async publishTaskExecutionCompleted(
    event: TaskExecutionCompleted
  ): Promise<void> {
    await this.publishEvent("onTaskExecutionCompleted", event);
  }

  /** Publish a TaskExecutionFailure event. */
  async publishTaskExecutionFailure(
    event: TaskExecutionFailure
  ): Promise<void> {
    await this.publishEvent("onTaskExecutionFailure", event);
  }

  /** Publish a TaskUpdateFailure event. */
  async publishTaskUpdateFailure(event: TaskUpdateFailure): Promise<void> {
    await this.publishEvent("onTaskUpdateFailure", event);
  }

  /**
   * Deliver `event` to the `method` hook of every registered listener.
   *
   * Listeners that do not implement the hook are skipped. Failures are caught
   * per listener and logged, so one bad listener cannot prevent delivery to
   * the others or surface an error to the caller.
   *
   * @param method - Name of the listener hook to invoke
   * @param event - Event payload passed to the hook
   */
  private async publishEvent<K extends keyof TaskRunnerEventsListener>(
    method: K,
    event: TaskRunnerEvent
  ): Promise<void> {
    // Fast path: nothing to notify.
    if (this.listeners.length === 0) {
      return;
    }

    // `map` works on the array as it is right now, so a listener that
    // registers/unregisters during dispatch does not disturb this round.
    const deliveries = this.listeners.map(async (listener) => {
      try {
        // The lookup sits inside the try so even a throwing accessor is isolated.
        const handler = listener[method];
        if (!handler) {
          return;
        }
        // `call` keeps `this` bound to the listener object for class-based listeners.
        await handler.call(listener, event as never);
      } catch (error) {
        // Isolate listener failures - don't affect task execution
        console.error(
          `Event listener failed for ${method}:`,
          error instanceof Error ? error.message : error
        );
      }
    });

    // Wait for all listeners to complete (or fail)
    await Promise.allSettled(deliveries);
  }
}
await dispatcher.publishPollStarted(pollStartedEvent); + + expect(mockListener.onPollStarted).toHaveBeenCalledWith(pollStartedEvent); + expect(mockListener.onPollCompleted).not.toHaveBeenCalled(); + }); + + test("should call multiple listeners", async () => { + const listener1: TaskRunnerEventsListener = { + onPollStarted: jest.fn<() => void>(), + }; + const listener2: TaskRunnerEventsListener = { + onPollStarted: jest.fn<() => void>(), + }; + + dispatcher.register(listener1); + dispatcher.register(listener2); + + const event: PollStarted = { + taskType: "test-task", + workerId: "worker-1", + pollCount: 5, + timestamp: new Date(), + }; + + await dispatcher.publishPollStarted(event); + + expect(listener1.onPollStarted).toHaveBeenCalledWith(event); + expect(listener2.onPollStarted).toHaveBeenCalledWith(event); + }); + + test("should unregister listeners", async () => { + const listener: TaskRunnerEventsListener = { + onPollStarted: jest.fn<() => void>(), + }; + + dispatcher.register(listener); + dispatcher.unregister(listener); + + const event: PollStarted = { + taskType: "test-task", + workerId: "worker-1", + pollCount: 5, + timestamp: new Date(), + }; + + await dispatcher.publishPollStarted(event); + + expect(listener.onPollStarted).not.toHaveBeenCalled(); + }); + + test("should isolate listener failures", async () => { + const failingListener: TaskRunnerEventsListener = { + onPollStarted: jest.fn<() => Promise>().mockRejectedValue(new Error("Listener error")), + }; + const workingListener: TaskRunnerEventsListener = { + onPollStarted: jest.fn<() => void>(), + }; + + dispatcher.register(failingListener); + dispatcher.register(workingListener); + + const event: PollStarted = { + taskType: "test-task", + workerId: "worker-1", + pollCount: 5, + timestamp: new Date(), + }; + + // Should not throw despite listener failure + await expect( + dispatcher.publishPollStarted(event) + ).resolves.toBeUndefined(); + + // Working listener should still be called + 
expect(workingListener.onPollStarted).toHaveBeenCalledWith(event); + }); + + test("should handle all event types", async () => { + const listener: TaskRunnerEventsListener = { + onPollStarted: jest.fn<() => void>(), + onPollCompleted: jest.fn<() => void>(), + onPollFailure: jest.fn<() => void>(), + onTaskExecutionStarted: jest.fn<() => void>(), + onTaskExecutionCompleted: jest.fn<() => void>(), + onTaskExecutionFailure: jest.fn<() => void>(), + onTaskUpdateFailure: jest.fn<() => void>(), + }; + + dispatcher.register(listener); + + // Test PollCompleted + const pollCompleted: PollCompleted = { + taskType: "test-task", + durationMs: 100, + tasksReceived: 3, + timestamp: new Date(), + }; + await dispatcher.publishPollCompleted(pollCompleted); + expect(listener.onPollCompleted).toHaveBeenCalledWith(pollCompleted); + + // Test PollFailure + const pollFailure: PollFailure = { + taskType: "test-task", + durationMs: 50, + cause: new Error("Poll failed"), + timestamp: new Date(), + }; + await dispatcher.publishPollFailure(pollFailure); + expect(listener.onPollFailure).toHaveBeenCalledWith(pollFailure); + + // Test TaskExecutionStarted + const execStarted: TaskExecutionStarted = { + taskType: "test-task", + taskId: "task-1", + workerId: "worker-1", + workflowInstanceId: "workflow-1", + timestamp: new Date(), + }; + await dispatcher.publishTaskExecutionStarted(execStarted); + expect(listener.onTaskExecutionStarted).toHaveBeenCalledWith(execStarted); + + // Test TaskExecutionCompleted + const execCompleted: TaskExecutionCompleted = { + taskType: "test-task", + taskId: "task-1", + workerId: "worker-1", + workflowInstanceId: "workflow-1", + durationMs: 200, + outputSizeBytes: 1024, + timestamp: new Date(), + }; + await dispatcher.publishTaskExecutionCompleted(execCompleted); + expect(listener.onTaskExecutionCompleted).toHaveBeenCalledWith( + execCompleted + ); + + // Test TaskExecutionFailure + const execFailure: TaskExecutionFailure = { + taskType: "test-task", + taskId: 
"task-1", + workerId: "worker-1", + workflowInstanceId: "workflow-1", + cause: new Error("Execution failed"), + durationMs: 150, + timestamp: new Date(), + }; + await dispatcher.publishTaskExecutionFailure(execFailure); + expect(listener.onTaskExecutionFailure).toHaveBeenCalledWith(execFailure); + + // Test TaskUpdateFailure + const updateFailure: TaskUpdateFailure = { + taskType: "test-task", + taskId: "task-1", + workerId: "worker-1", + workflowInstanceId: "workflow-1", + cause: new Error("Update failed"), + retryCount: 4, + taskResult: { status: "COMPLETED" }, + timestamp: new Date(), + }; + await dispatcher.publishTaskUpdateFailure(updateFailure); + expect(listener.onTaskUpdateFailure).toHaveBeenCalledWith(updateFailure); + }); + + test("should have zero overhead when no listeners registered", async () => { + // No listeners registered + const event: PollStarted = { + taskType: "test-task", + workerId: "worker-1", + pollCount: 5, + timestamp: new Date(), + }; + + // Should complete quickly with no listeners + const start = Date.now(); + await dispatcher.publishPollStarted(event); + const duration = Date.now() - start; + + // Should be nearly instant (< 10ms) + expect(duration).toBeLessThan(10); + }); + + test("should support async listeners", async () => { + let callbackExecuted = false; + + const asyncListener: TaskRunnerEventsListener = { + onPollStarted: async () => { + await new Promise((resolve) => setTimeout(resolve, 10)); + callbackExecuted = true; + }, + }; + + dispatcher.register(asyncListener); + + const event: PollStarted = { + taskType: "test-task", + workerId: "worker-1", + pollCount: 5, + timestamp: new Date(), + }; + + await dispatcher.publishPollStarted(event); + + expect(callbackExecuted).toBe(true); + }); +}); diff --git a/src/sdk/clients/worker/events/index.ts b/src/sdk/clients/worker/events/index.ts new file mode 100644 index 00000000..d005c392 --- /dev/null +++ b/src/sdk/clients/worker/events/index.ts @@ -0,0 +1,2 @@ +export * from 
"./types"; +export * from "./EventDispatcher"; diff --git a/src/sdk/clients/worker/events/types.ts b/src/sdk/clients/worker/events/types.ts new file mode 100644 index 00000000..aa374114 --- /dev/null +++ b/src/sdk/clients/worker/events/types.ts @@ -0,0 +1,133 @@ +/** + * Event types for task runner lifecycle events. + * + * These events provide observability into the worker polling and execution lifecycle, + * matching the Python SDK's event system architecture. + */ + +/** + * Base interface for all task runner events. + */ +export interface TaskRunnerEvent { + /** The task definition name */ + taskType: string; + /** UTC timestamp when the event was created */ + timestamp: Date; +} + +/** + * Event published when task polling begins. + */ +export interface PollStarted extends TaskRunnerEvent { + /** Identifier of the worker polling for tasks */ + workerId: string; + /** Number of tasks requested in this poll */ + pollCount: number; +} + +/** + * Event published when task polling completes successfully. + */ +export interface PollCompleted extends TaskRunnerEvent { + /** Time taken for the poll operation in milliseconds */ + durationMs: number; + /** Number of tasks received from the poll */ + tasksReceived: number; +} + +/** + * Event published when task polling fails. + */ +export interface PollFailure extends TaskRunnerEvent { + /** Time taken before the poll failed in milliseconds */ + durationMs: number; + /** The error that caused the failure */ + cause: Error; +} + +/** + * Event published when task execution begins. + */ +export interface TaskExecutionStarted extends TaskRunnerEvent { + /** Unique identifier of the task instance */ + taskId: string; + /** Identifier of the worker executing the task */ + workerId: string; + /** ID of the workflow instance this task belongs to */ + workflowInstanceId?: string; +} + +/** + * Event published when task execution completes successfully. 
+ */ +export interface TaskExecutionCompleted extends TaskRunnerEvent { + /** Unique identifier of the task instance */ + taskId: string; + /** Identifier of the worker that executed the task */ + workerId: string; + /** ID of the workflow instance this task belongs to */ + workflowInstanceId?: string; + /** Time taken for task execution in milliseconds */ + durationMs: number; + /** Size of the task output in bytes (if available) */ + outputSizeBytes?: number; +} + +/** + * Event published when task execution fails. + */ +export interface TaskExecutionFailure extends TaskRunnerEvent { + /** Unique identifier of the task instance */ + taskId: string; + /** Identifier of the worker that attempted execution */ + workerId: string; + /** ID of the workflow instance this task belongs to */ + workflowInstanceId?: string; + /** The error that caused the failure */ + cause: Error; + /** Time taken before failure in milliseconds */ + durationMs: number; +} + +/** + * Event published when task update fails after all retry attempts. + * + * This is a CRITICAL event indicating that the worker successfully executed a task + * but failed to communicate the result back to Conductor after multiple retries. + * + * The task result is lost from Conductor's perspective, and external intervention + * may be required to reconcile the state. 
+ * + * Use Cases: + * - Alert operations team of critical update failures + * - Log failed task results to external storage for recovery + * - Implement custom retry logic with different backoff strategies + * - Track update reliability metrics + * - Trigger incident response workflows + */ +export interface TaskUpdateFailure extends TaskRunnerEvent { + /** Unique identifier of the task instance */ + taskId: string; + /** Identifier of the worker that executed the task */ + workerId: string; + /** ID of the workflow instance this task belongs to */ + workflowInstanceId?: string; + /** The error that caused the final update failure */ + cause: Error; + /** Number of retry attempts made */ + retryCount: number; + /** The TaskResult object that failed to update (for recovery/logging) */ + taskResult: unknown; // Using unknown to avoid circular dependency with TaskResult type +} + +/** + * Union type of all task runner events. + */ +export type TaskRunnerEventType = + | PollStarted + | PollCompleted + | PollFailure + | TaskExecutionStarted + | TaskExecutionCompleted + | TaskExecutionFailure + | TaskUpdateFailure; diff --git a/src/sdk/clients/worker/exceptions/Exceptions.ts b/src/sdk/clients/worker/exceptions/Exceptions.ts new file mode 100644 index 00000000..235bbe62 --- /dev/null +++ b/src/sdk/clients/worker/exceptions/Exceptions.ts @@ -0,0 +1,49 @@ +/** + * Custom exception types for worker error handling. + */ + +/** + * Exception indicating a non-retryable task failure. + * + * When thrown from a worker's execute function, the task will be marked as + * FAILED_WITH_TERMINAL_ERROR and will NOT be retried, regardless of the + * task definition's retry_count setting. 
/**
 * Exception indicating a non-retryable task failure.
 *
 * When thrown from a worker's execute function, the task will be marked as
 * FAILED_WITH_TERMINAL_ERROR and will NOT be retried, regardless of the
 * task definition's retry_count setting.
 *
 * Use this for permanent failures where retry would produce the same result:
 * - Business validation failures (invalid data format)
 * - Authorization failures (user lacks permission)
 * - Resource not found (entity doesn't exist)
 * - Configuration errors (missing required config)
 * - Data integrity violations (constraint violations)
 * - Unsupported operations (feature not available)
 *
 * @example
 * ```typescript
 * async function validateOrder(task: Task) {
 *   const order = await getOrder(task.inputData.orderId);
 *
 *   if (!order) {
 *     // Order doesn't exist - retry won't help
 *     throw new NonRetryableException(`Order ${task.inputData.orderId} not found`);
 *   }
 *
 *   if (order.status === 'CANCELLED') {
 *     // Business rule - retry won't help
 *     throw new NonRetryableException('Cannot process cancelled order');
 *   }
 *
 *   return { status: 'COMPLETED', outputData: { validated: true } };
 * }
 * ```
 */
export class NonRetryableException extends Error {
  /**
   * @param message - Human-readable reason for the terminal failure
   */
  constructor(message: string) {
    super(message);

    // When compiled down to ES5, calling `super()` on a built-in returns a
    // plain Error and drops the subclass prototype; restoring it keeps
    // `instanceof NonRetryableException` reliable on every compile target.
    Object.setPrototypeOf(this, new.target.prototype);

    this.name = "NonRetryableException";

    // Maintains proper stack trace for where our error was thrown (only available on V8)
    if (typeof Error.captureStackTrace === "function") {
      Error.captureStackTrace(this, NonRetryableException);
    }
  }
}
a/src/sdk/clients/worker/types.ts +++ b/src/sdk/clients/worker/types.ts @@ -1,6 +1,7 @@ import type { ConductorLogger } from "../../helpers/logger"; import type { Task, TaskResult } from "../../../open-api"; import type { Client } from "../../../open-api/generated/client/types.gen"; +import type { TaskRunnerEventsListener } from "./events"; export type TaskErrorHandler = (error: Error, task?: Task) => void; @@ -33,6 +34,7 @@ export interface RunnerArgs { onError?: TaskErrorHandler; concurrency?: number; maxRetries?: number; + eventListeners?: TaskRunnerEventsListener[]; } export interface PollerOptions { @@ -48,6 +50,7 @@ export interface TaskManagerConfig { options?: Partial; onError?: TaskErrorHandler; maxRetries?: number; + eventListeners?: TaskRunnerEventsListener[]; } export type OptionEntries = [ diff --git a/src/sdk/index.ts b/src/sdk/index.ts index a254036c..1f352250 100644 --- a/src/sdk/index.ts +++ b/src/sdk/index.ts @@ -8,6 +8,8 @@ export * from "./generators"; export * from "./types"; +export * from "./worker"; + export { DefaultLogger, noopLogger } from "./helpers/logger"; export type { ConductorLogger } from "./helpers/logger"; export type { ConductorSdkError } from "./helpers/errors"; diff --git a/src/sdk/worker/config/WorkerConfig.ts b/src/sdk/worker/config/WorkerConfig.ts new file mode 100644 index 00000000..fe80ccdc --- /dev/null +++ b/src/sdk/worker/config/WorkerConfig.ts @@ -0,0 +1,345 @@ +import type { ConductorLogger } from "../../helpers/logger"; + +/** + * Worker configuration properties that can be overridden via environment variables. 
/**
 * Worker configuration properties that can be overridden via environment variables.
 */
export interface WorkerConfig {
  /** Polling interval in milliseconds */
  pollInterval?: number;

  /** Task domain for multi-tenancy */
  domain?: string;

  /** Unique worker identifier */
  workerId?: string;

  /** Maximum concurrent tasks */
  concurrency?: number;

  /** Auto-register task definition on startup */
  registerTaskDef?: boolean;

  /** Server-side long poll timeout in milliseconds */
  pollTimeout?: number;

  /** Whether worker is paused */
  paused?: boolean;

  /** Overwrite existing task definitions */
  overwriteTaskDef?: boolean;

  /** Enforce strict JSON schema validation */
  strictSchema?: boolean;
}

/**
 * Type mapping for configuration properties.
 *
 * This table is the single source of truth for which properties are
 * configurable: `Record<keyof WorkerConfig, …>` makes the compiler reject a
 * missing or misspelled key.
 */
const PROPERTY_TYPES: Record<
  keyof WorkerConfig,
  "number" | "string" | "boolean"
> = {
  pollInterval: "number",
  domain: "string",
  workerId: "string",
  concurrency: "number",
  registerTaskDef: "boolean",
  pollTimeout: "number",
  paused: "boolean",
  overwriteTaskDef: "boolean",
  strictSchema: "boolean",
};

/**
 * Configurable property names, in the insertion order of `PROPERTY_TYPES`.
 * Derived from that table so the two cannot drift apart.
 */
const CONFIGURABLE_PROPERTIES = Object.keys(
  PROPERTY_TYPES
) as (keyof WorkerConfig)[];

/**
 * Default values for configuration properties.
 * Properties without an entry (`domain`, `workerId`) have no system default.
 */
const DEFAULT_VALUES: Partial<WorkerConfig> = {
  pollInterval: 100,
  concurrency: 1,
  registerTaskDef: false,
  pollTimeout: 100,
  paused: false,
  overwriteTaskDef: true,
  strictSchema: false,
};
/** Lower-cased spellings that an environment variable may use for boolean `true`. */
const TRUTHY_ENV_VALUES: readonly string[] = ["true", "1", "yes", "on"];

/**
 * Parse environment variable value to the expected type.
 *
 * - boolean: `true`, `1`, `yes`, `on` (case-insensitive, surrounding
 *   whitespace ignored) yield `true`; any other text yields `false`.
 * - number: must be a finite number. Blank strings, non-numeric text and
 *   `Infinity` are rejected (logged at info level) and yield `undefined`.
 * - string: returned unchanged.
 *
 * @param value - Raw environment variable value
 * @param expectedType - Type the value should be converted to
 * @param logger - Optional logger, told about values that cannot be converted
 * @returns The converted value, or `undefined` when it cannot be converted
 */
function parseEnvValue(
  value: string,
  expectedType: "number" | "string" | "boolean",
  logger?: ConductorLogger
): number | string | boolean | undefined {
  if (value === undefined || value === null) {
    return undefined;
  }

  switch (expectedType) {
    case "boolean":
      return TRUTHY_ENV_VALUES.includes(value.trim().toLowerCase());

    case "number": {
      const trimmed = value.trim();
      // Number("") is 0, which would silently turn an empty variable into a
      // zero poll interval / concurrency, so blank input is invalid here.
      const parsed = trimmed === "" ? Number.NaN : Number(trimmed);
      if (!Number.isFinite(parsed)) {
        logger?.info(`Cannot convert '${value}' to number, ignoring invalid value`);
        return undefined;
      }
      return parsed;
    }

    default:
      return value;
  }
}

/**
 * Convert camelCase property name to snake_case for environment variables.
 *
 * Examples:
 * - pollInterval → poll_interval
 * - workerId → worker_id
 * - registerTaskDef → register_task_def
 */
function toSnakeCase(camelCase: string): string {
  return camelCase.replace(/[A-Z]/g, (letter) => `_${letter.toLowerCase()}`);
}

/**
 * Get configuration value from environment variables with hierarchical lookup.
 *
 * Priority order (highest to lowest):
 * 1. CONDUCTOR_WORKER_<WORKER_NAME>_<PROPERTY> - Worker-specific (uppercase)
 * 2. conductor.worker.<worker_name>.<property> - Worker-specific (dotted)
 * 3. CONDUCTOR_WORKER_ALL_<PROPERTY> - Global (uppercase)
 * 4. conductor.worker.all.<property> - Global (dotted)
 *
 * A variable that is set but cannot be converted to `expectedType` is
 * ignored, and the lookup continues with the next entry in the list.
 *
 * @param workerName - Task definition name
 * @param propertyName - Property name in camelCase (e.g., "pollInterval")
 * @param expectedType - Expected type for parsing
 * @param logger - Optional logger for debug messages
 * @returns The first usable value, or `undefined` when none is configured
 */
function getEnvValue(
  workerName: string,
  propertyName: keyof WorkerConfig,
  expectedType: "number" | "string" | "boolean",
  logger?: ConductorLogger
): number | string | boolean | undefined {
  const snakeCase = toSnakeCase(propertyName);
  const upperSnakeCase = snakeCase.toUpperCase();

  // Candidate variable names, highest priority first.
  const candidates: { name: string; scope: string }[] = [
    {
      name: `CONDUCTOR_WORKER_${workerName.toUpperCase()}_${upperSnakeCase}`,
      scope: "worker-specific",
    },
    {
      name: `conductor.worker.${workerName}.${snakeCase}`,
      scope: "worker-specific",
    },
    { name: `CONDUCTOR_WORKER_ALL_${upperSnakeCase}`, scope: "global worker" },
    { name: `conductor.worker.all.${snakeCase}`, scope: "global worker" },
  ];

  for (const { name, scope } of candidates) {
    const raw = process.env[name];
    if (raw === undefined) {
      continue;
    }

    const parsed = parseEnvValue(raw, expectedType, logger);
    if (parsed !== undefined) {
      logger?.debug(`Using ${scope} config: ${name}=${raw}`);
      return parsed;
    }
    // Set but unusable: fall through so a valid lower-priority variable
    // still applies instead of being masked by the broken one.
  }

  return undefined;
}
/**
 * Build the effective configuration for one worker.
 *
 * For every configurable property the first defined value wins, checked in
 * this order:
 * 1. Environment variables (worker-specific before global)
 * 2. Code-level defaults (decorator/function parameters)
 * 3. System defaults
 *
 * A property with no value from any source is left out of the result.
 *
 * @param workerName - Task definition name
 * @param codeDefaults - Configuration from code (decorator parameters)
 * @param _logger - Optional logger, forwarded to the environment lookup
 * @returns The resolved configuration
 *
 * @example
 * ```typescript
 * // Code has: pollInterval: 1000
 * // Env has: CONDUCTOR_WORKER_ALL_POLL_INTERVAL=500
 * // Result: pollInterval=500
 *
 * const config = resolveWorkerConfig("process_order", {
 *   pollInterval: 1000,
 *   domain: "dev",
 * });
 * // config = { pollInterval: 500, domain: "dev", ... }
 * ```
 */
export function resolveWorkerConfig(
  workerName: string,
  codeDefaults: Partial<WorkerConfig> = {},
  _logger?: ConductorLogger
): WorkerConfig {
  const effective: Record<string, unknown> = {};

  CONFIGURABLE_PROPERTIES.forEach((name) => {
    // Sources listed from highest to lowest priority.
    const sources: unknown[] = [
      getEnvValue(workerName, name, PROPERTY_TYPES[name], _logger),
      codeDefaults[name],
      DEFAULT_VALUES[name],
    ];

    const winner = sources.find((candidate) => candidate !== undefined);
    if (winner !== undefined) {
      effective[name] = winner;
    }
  });

  return effective as WorkerConfig;
}
/**
 * Render a multi-line, human-readable report of a worker's configuration.
 *
 * The first line names the worker. Each following line shows one property
 * of `resolvedConfig` that is neither `undefined` nor `null`, followed by its
 * source. The source is the highest-priority environment variable that is
 * currently set for that property, or "from code" when none is set.
 *
 * @param workerName - Task definition name
 * @param resolvedConfig - Resolved configuration
 * @returns The report, lines joined with `\n`
 *
 * @example
 * ```typescript
 * const summary = getWorkerConfigSummary("process_order", config);
 * console.log(summary);
 * // Worker 'process_order' configuration:
 * //  pollInterval: 500 (from CONDUCTOR_WORKER_ALL_POLL_INTERVAL)
 * //  domain: production (from CONDUCTOR_WORKER_PROCESS_ORDER_DOMAIN)
 * //  concurrency: 5 (from code)
 * ```
 */
export function getWorkerConfigSummary(
  workerName: string,
  resolvedConfig: WorkerConfig
): string {
  const heading = `Worker '${workerName}' configuration:`;

  const rows = Object.entries(resolvedConfig)
    .filter(([, setting]) => setting !== undefined && setting !== null)
    .map(([name, setting]) => {
      const snake = toSnakeCase(name);
      const upperSnake = snake.toUpperCase();

      // Same precedence the resolver uses: worker-specific before global,
      // upper-case spelling before dotted spelling.
      const envNames = [
        `CONDUCTOR_WORKER_${workerName.toUpperCase()}_${upperSnake}`,
        `conductor.worker.${workerName}.${snake}`,
        `CONDUCTOR_WORKER_ALL_${upperSnake}`,
        `conductor.worker.all.${snake}`,
      ];
      const hit = envNames.find((envName) => process.env[envName] !== undefined);
      const origin = hit === undefined ? "from code" : `from ${hit}`;

      return ` ${name}: ${setting} (${origin})`;
    });

  return [heading, ...rows].join("\n");
}
/**
 * Render a worker's configuration as a single compact line.
 *
 * The line always contains the worker name, the current process id and a
 * status of "paused" or "active" (a missing `paused` counts as active).
 * `pollInterval`, `domain`, `concurrency`, `pollTimeout` and `registerTaskDef`
 * follow in that order, each only when it is not `undefined`.
 *
 * @param workerName - Task definition name
 * @param resolvedConfig - Resolved configuration
 * @returns The formatted line
 *
 * @example
 * ```typescript
 * const summary = getWorkerConfigOneline("process_order", config);
 * console.log(summary);
 * // Conductor Worker[name=process_order, pid=12345, status=active, poll_interval=500ms, domain=production, concurrency=5]
 * ```
 */
export function getWorkerConfigOneline(
  workerName: string,
  resolvedConfig: WorkerConfig
): string {
  const {
    paused,
    pollInterval,
    domain,
    concurrency,
    pollTimeout,
    registerTaskDef,
  } = resolvedConfig;

  // [label, value, unit suffix] in output order.
  const optionalFields: [label: string, value: unknown, unit: string][] = [
    ["poll_interval", pollInterval, "ms"],
    ["domain", domain, ""],
    ["concurrency", concurrency, ""],
    ["poll_timeout", pollTimeout, "ms"],
    ["register_task_def", registerTaskDef, ""],
  ];

  const status = (paused ?? false) ? "paused" : "active";

  const fields = [
    `name=${workerName}`,
    `pid=${process.pid}`,
    `status=${status}`,
    ...optionalFields
      .filter(([, value]) => value !== undefined)
      .map(([label, value, unit]) => `${label}=${value}${unit}`),
  ];

  return `Conductor Worker[${fields.join(", ")}]`;
}
each test + process.env = { ...originalEnv }; + // Remove all CONDUCTOR_WORKER_* variables + for (const key of Object.keys(process.env)) { + if (key.startsWith("CONDUCTOR_WORKER") || key.startsWith("conductor.worker")) { + // eslint-disable-next-line @typescript-eslint/no-dynamic-delete + delete process.env[key]; + } + } + }); + + afterEach(() => { + process.env = originalEnv; + }); + + describe("resolveWorkerConfig", () => { + test("should use code defaults when no env vars set", () => { + const config = resolveWorkerConfig("test_worker", { + pollInterval: 1000, + domain: "dev", + concurrency: 5, + }); + + expect(config.pollInterval).toBe(1000); + expect(config.domain).toBe("dev"); + expect(config.concurrency).toBe(5); + }); + + test("should use system defaults when no code defaults or env vars", () => { + const config = resolveWorkerConfig("test_worker", {}); + + expect(config.pollInterval).toBe(100); // System default + expect(config.concurrency).toBe(1); // System default + expect(config.registerTaskDef).toBe(false); // System default + expect(config.overwriteTaskDef).toBe(true); // System default + expect(config.strictSchema).toBe(false); // System default + }); + + test("should override with worker-specific env var (uppercase)", () => { + process.env.CONDUCTOR_WORKER_TEST_WORKER_POLL_INTERVAL = "2000"; + process.env.CONDUCTOR_WORKER_TEST_WORKER_DOMAIN = "production"; + + const config = resolveWorkerConfig("test_worker", { + pollInterval: 1000, + domain: "dev", + }); + + expect(config.pollInterval).toBe(2000); + expect(config.domain).toBe("production"); + }); + + test("should override with worker-specific env var (dotted)", () => { + process.env["conductor.worker.test_worker.poll_interval"] = "3000"; + process.env["conductor.worker.test_worker.domain"] = "staging"; + + const config = resolveWorkerConfig("test_worker", { + pollInterval: 1000, + domain: "dev", + }); + + expect(config.pollInterval).toBe(3000); + expect(config.domain).toBe("staging"); + }); + + 
test("should override with global env var (uppercase)", () => { + process.env.CONDUCTOR_WORKER_ALL_POLL_INTERVAL = "500"; + process.env.CONDUCTOR_WORKER_ALL_CONCURRENCY = "10"; + + const config = resolveWorkerConfig("test_worker", { + pollInterval: 1000, + concurrency: 5, + }); + + expect(config.pollInterval).toBe(500); + expect(config.concurrency).toBe(10); + }); + + test("should override with global env var (dotted)", () => { + process.env["conductor.worker.all.poll_interval"] = "600"; + process.env["conductor.worker.all.concurrency"] = "15"; + + const config = resolveWorkerConfig("test_worker", { + pollInterval: 1000, + concurrency: 5, + }); + + expect(config.pollInterval).toBe(600); + expect(config.concurrency).toBe(15); + }); + + test("should prioritize worker-specific over global", () => { + process.env.CONDUCTOR_WORKER_ALL_POLL_INTERVAL = "500"; + process.env.CONDUCTOR_WORKER_TEST_WORKER_POLL_INTERVAL = "2000"; + + const config = resolveWorkerConfig("test_worker", { + pollInterval: 1000, + }); + + expect(config.pollInterval).toBe(2000); // Worker-specific wins + }); + + test("should prioritize uppercase over dotted for same level", () => { + process.env.CONDUCTOR_WORKER_TEST_WORKER_POLL_INTERVAL = "2000"; + process.env["conductor.worker.test_worker.poll_interval"] = "3000"; + + const config = resolveWorkerConfig("test_worker", { + pollInterval: 1000, + }); + + expect(config.pollInterval).toBe(2000); // Uppercase checked first + }); + + test("should handle boolean values", () => { + process.env.CONDUCTOR_WORKER_TEST_WORKER_REGISTER_TASK_DEF = "true"; + process.env.CONDUCTOR_WORKER_TEST_WORKER_PAUSED = "false"; + process.env.CONDUCTOR_WORKER_ALL_OVERWRITE_TASK_DEF = "false"; + process.env.CONDUCTOR_WORKER_ALL_STRICT_SCHEMA = "true"; + + const config = resolveWorkerConfig("test_worker", {}); + + expect(config.registerTaskDef).toBe(true); + expect(config.paused).toBe(false); + expect(config.overwriteTaskDef).toBe(false); + expect(config.strictSchema).toBe(true); 
+ }); + + test("should handle boolean values with various formats", () => { + process.env.CONDUCTOR_WORKER_W1_REGISTER_TASK_DEF = "1"; + process.env.CONDUCTOR_WORKER_W2_REGISTER_TASK_DEF = "yes"; + process.env.CONDUCTOR_WORKER_W3_REGISTER_TASK_DEF = "on"; + process.env.CONDUCTOR_WORKER_W4_REGISTER_TASK_DEF = "TRUE"; + + expect(resolveWorkerConfig("w1", {}).registerTaskDef).toBe(true); + expect(resolveWorkerConfig("w2", {}).registerTaskDef).toBe(true); + expect(resolveWorkerConfig("w3", {}).registerTaskDef).toBe(true); + expect(resolveWorkerConfig("w4", {}).registerTaskDef).toBe(true); + }); + + test("should handle number values", () => { + process.env.CONDUCTOR_WORKER_TEST_WORKER_POLL_INTERVAL = "2500"; + process.env.CONDUCTOR_WORKER_TEST_WORKER_CONCURRENCY = "20"; + process.env.CONDUCTOR_WORKER_TEST_WORKER_POLL_TIMEOUT = "300"; + + const config = resolveWorkerConfig("test_worker", {}); + + expect(config.pollInterval).toBe(2500); + expect(config.concurrency).toBe(20); + expect(config.pollTimeout).toBe(300); + }); + + test("should handle invalid number values gracefully", () => { + process.env.CONDUCTOR_WORKER_TEST_WORKER_POLL_INTERVAL = "invalid"; + + const config = resolveWorkerConfig("test_worker", { + pollInterval: 1000, + }); + + // Should fall back to code default + expect(config.pollInterval).toBe(1000); + }); + + test("should handle string values", () => { + process.env.CONDUCTOR_WORKER_TEST_WORKER_DOMAIN = "production"; + process.env.CONDUCTOR_WORKER_TEST_WORKER_WORKER_ID = "worker-123"; + + const config = resolveWorkerConfig("test_worker", {}); + + expect(config.domain).toBe("production"); + expect(config.workerId).toBe("worker-123"); + }); + + test("should handle all properties", () => { + process.env.CONDUCTOR_WORKER_ALL_POLL_INTERVAL = "500"; + process.env.CONDUCTOR_WORKER_ALL_DOMAIN = "global-domain"; + process.env.CONDUCTOR_WORKER_ALL_WORKER_ID = "global-worker"; + process.env.CONDUCTOR_WORKER_ALL_CONCURRENCY = "10"; + 
process.env.CONDUCTOR_WORKER_ALL_REGISTER_TASK_DEF = "true"; + process.env.CONDUCTOR_WORKER_ALL_POLL_TIMEOUT = "200"; + process.env.CONDUCTOR_WORKER_ALL_PAUSED = "false"; + process.env.CONDUCTOR_WORKER_ALL_OVERWRITE_TASK_DEF = "false"; + process.env.CONDUCTOR_WORKER_ALL_STRICT_SCHEMA = "true"; + + const config = resolveWorkerConfig("test_worker", {}); + + expect(config.pollInterval).toBe(500); + expect(config.domain).toBe("global-domain"); + expect(config.workerId).toBe("global-worker"); + expect(config.concurrency).toBe(10); + expect(config.registerTaskDef).toBe(true); + expect(config.pollTimeout).toBe(200); + expect(config.paused).toBe(false); + expect(config.overwriteTaskDef).toBe(false); + expect(config.strictSchema).toBe(true); + }); + + test("should handle mixed sources", () => { + // Global env var + process.env.CONDUCTOR_WORKER_ALL_POLL_INTERVAL = "500"; + + // Worker-specific env var + process.env.CONDUCTOR_WORKER_TEST_WORKER_CONCURRENCY = "20"; + + const config = resolveWorkerConfig("test_worker", { + pollInterval: 1000, // Overridden by global + concurrency: 5, // Overridden by worker-specific + domain: "dev", // From code + }); + + expect(config.pollInterval).toBe(500); + expect(config.concurrency).toBe(20); + expect(config.domain).toBe("dev"); + }); + }); + + describe("getWorkerConfigSummary", () => { + test("should generate summary with code defaults", () => { + const config = resolveWorkerConfig("test_worker", { + pollInterval: 1000, + domain: "dev", + }); + + const summary = getWorkerConfigSummary("test_worker", config); + + expect(summary).toContain("Worker 'test_worker' configuration:"); + expect(summary).toContain("pollInterval: 1000 (from code)"); + expect(summary).toContain("domain: dev (from code)"); + }); + + test("should generate summary with env var sources", () => { + process.env.CONDUCTOR_WORKER_ALL_POLL_INTERVAL = "500"; + process.env.CONDUCTOR_WORKER_TEST_WORKER_DOMAIN = "production"; + + const config = 
resolveWorkerConfig("test_worker", {}); + const summary = getWorkerConfigSummary("test_worker", config); + + expect(summary).toContain("pollInterval: 500 (from CONDUCTOR_WORKER_ALL_POLL_INTERVAL)"); + expect(summary).toContain("domain: production (from CONDUCTOR_WORKER_TEST_WORKER_DOMAIN)"); + }); + + test("should skip undefined values", () => { + const config = resolveWorkerConfig("test_worker", { + pollInterval: 1000, + // domain is undefined + }); + + const summary = getWorkerConfigSummary("test_worker", config); + + expect(summary).toContain("pollInterval"); + expect(summary).not.toContain("domain: undefined"); + }); + }); + + describe("getWorkerConfigOneline", () => { + test("should generate compact one-line summary", () => { + const config = resolveWorkerConfig("test_worker", { + pollInterval: 1000, + domain: "production", + concurrency: 5, + }); + + const oneline = getWorkerConfigOneline("test_worker", config); + + expect(oneline).toContain("Conductor Worker["); + expect(oneline).toContain("name=test_worker"); + expect(oneline).toContain(`pid=${process.pid}`); + expect(oneline).toContain("status=active"); + expect(oneline).toContain("poll_interval=1000ms"); + expect(oneline).toContain("domain=production"); + expect(oneline).toContain("concurrency=5"); + }); + + test("should show paused status", () => { + const config = resolveWorkerConfig("test_worker", { + paused: true, + }); + + const oneline = getWorkerConfigOneline("test_worker", config); + + expect(oneline).toContain("status=paused"); + }); + + test("should handle minimal config", () => { + const config = resolveWorkerConfig("test_worker", {}); + const oneline = getWorkerConfigOneline("test_worker", config); + + expect(oneline).toContain("Conductor Worker["); + expect(oneline).toContain("name=test_worker"); + expect(oneline).toContain("status=active"); + }); + }); + + describe("camelCase to snake_case conversion", () => { + test("should handle various property names", () => { + 
import type { Client } from "../../../open-api";
import type { ConductorLogger } from "../../helpers/logger";
import { DefaultLogger } from "../../helpers/logger";
import type { TaskRunnerEventsListener } from "../../clients/worker/events";
import type { ConductorWorker } from "../../clients/worker/types";
import { TaskRunner } from "../../clients/worker/TaskRunner";
import { getRegisteredWorkers, type RegisteredWorker } from "../decorators/registry";

/**
 * Configuration for TaskHandler.
 *
 * Only `client` is required; every other field is optional.
 */
export interface TaskHandlerConfig {
  /**
   * Conductor client instance.
   * Required for communicating with Conductor server.
   * Handed to every TaskRunner that TaskHandler creates.
   */
  client: Client;

  /**
   * Additional workers to register manually.
   * These are appended after the auto-discovered decorated workers.
   * Default: [] (omitting the field and passing an empty array behave the same)
   */
  workers?: ConductorWorker[];

  /**
   * Whether to scan for @worker decorated functions.
   * Discovery runs unless this is explicitly `false`; it picks up whatever is
   * in the worker registry at the moment the TaskHandler is constructed.
   * Default: true
   */
  scanForDecorated?: boolean;

  /**
   * Modules to import for side-effect registration.
   * Importing these modules will trigger @worker decorator execution.
   * Useful when workers are defined in separate files.
   *
   * Only `TaskHandler.create()` performs these imports; the plain constructor
   * does not read this field. Each entry is passed unchanged to a dynamic
   * `import()`, and a failed import makes `create()` reject.
   * NOTE(review): relative specifiers are presumably resolved against the SDK
   * module that performs the import rather than the caller's file — confirm
   * for the target runtime/bundler and prefer absolute paths if so.
   *
   * Example: ['./workers/orderWorkers', './workers/paymentWorkers']
   * Default: []
   */
  importModules?: string[];

  /**
   * Event listeners for worker lifecycle events.
   * The same array is passed to every TaskRunner started by the handler.
   * Default: []
   */
  eventListeners?: TaskRunnerEventsListener[];

  /**
   * Logger instance for TaskHandler.
   * Also passed on to the TaskRunners it starts.
   * Default: a new DefaultLogger
   */
  logger?: ConductorLogger;
}
/**
 * TaskHandler orchestrates worker lifecycle and auto-discovery.
 *
 * This is the main entry point for the SDK-style worker framework.
 *
 * Features:
 * - Auto-discovers workers decorated with @worker
 * - Manages worker lifecycle (start/stop)
 * - Supports both decorated and manual worker registration
 * - Module import for side-effect registration (via `TaskHandler.create()`)
 * - Event listener support
 *
 * @example
 * Basic usage with auto-discovery:
 * ```typescript
 * import { TaskHandler } from "@io-orkes/conductor-javascript/worker";
 *
 * // Workers defined elsewhere with @worker decorator
 * const handler = new TaskHandler({
 *   client: conductorClient,
 *   scanForDecorated: true,
 * });
 *
 * handler.startWorkers();
 *
 * // Later...
 * await handler.stopWorkers();
 * ```
 *
 * @example
 * With module imports (must go through `create()`; the constructor does not
 * import anything):
 * ```typescript
 * const handler = await TaskHandler.create({
 *   client: conductorClient,
 *   importModules: [
 *     './workers/orderWorkers',
 *     './workers/paymentWorkers',
 *   ],
 * });
 *
 * handler.startWorkers();
 * ```
 *
 * @example
 * Mixed approach (decorated + manual):
 * ```typescript
 * const handler = new TaskHandler({
 *   client: conductorClient,
 *   scanForDecorated: true,
 *   workers: [
 *     {
 *       taskDefName: 'dynamic_task',
 *       execute: async (task) => ({ status: 'COMPLETED', outputData: {} }),
 *     },
 *   ],
 * });
 *
 * handler.startWorkers();
 * ```
 *
 * @example
 * With event listeners:
 * ```typescript
 * const metricsListener = {
 *   onTaskExecutionCompleted(event) {
 *     console.log(`Task ${event.taskId} completed in ${event.durationMs}ms`);
 *   },
 * };
 *
 * const handler = new TaskHandler({
 *   client: conductorClient,
 *   eventListeners: [metricsListener],
 * });
 *
 * handler.startWorkers();
 * ```
 */
export class TaskHandler {
  private readonly client: Client;
  private readonly workers: ConductorWorker[] = [];
  private taskRunners: TaskRunner[] = [];
  private readonly config: TaskHandlerConfig;
  private readonly logger: ConductorLogger;
  private isRunning = false;

  /**
   * Create a TaskHandler instance with async module imports.
   * Use this instead of `new TaskHandler()` when using `importModules`:
   * the modules are imported one after another, then the handler is
   * constructed so that discovery sees the workers they registered.
   *
   * @param config - Handler configuration
   * @returns The constructed handler
   * @throws Error if any module in `importModules` fails to import
   *
   * @example
   * ```typescript
   * const handler = await TaskHandler.create({
   *   client,
   *   importModules: ["./workers/orderWorkers", "./workers/paymentWorkers"],
   * });
   * ```
   */
  static async create(config: TaskHandlerConfig): Promise<TaskHandler> {
    const logger = config.logger ?? new DefaultLogger();

    // Import modules for side-effect registration
    if (config.importModules && config.importModules.length > 0) {
      logger.info(
        `Importing ${config.importModules.length} module(s) for worker discovery...`
      );

      for (const modulePath of config.importModules) {
        try {
          logger.debug(`Importing module: ${modulePath}`);
          await import(modulePath); // Async ES module import
          logger.debug(`Successfully imported: ${modulePath}`);
        } catch (error) {
          logger.error(
            `Failed to import module ${modulePath}:`,
            error instanceof Error ? error.message : error
          );
          throw new Error(
            `Failed to import worker module "${modulePath}": ${
              error instanceof Error ? error.message : String(error)
            }`
          );
        }
      }
    }

    // Now create the handler - workers are already registered via decorators
    return new TaskHandler(config);
  }

  /**
   * Collect the workers this handler will run. Nothing is started here.
   *
   * Decorated workers are read from the registry unless
   * `config.scanForDecorated` is `false`; `config.workers` are appended
   * afterwards. `config.importModules` is NOT processed by the constructor;
   * use `TaskHandler.create()` for that.
   *
   * @param config - Handler configuration
   */
  constructor(config: TaskHandlerConfig) {
    this.config = config;
    this.client = config.client;
    this.logger = config.logger ?? new DefaultLogger();

    // Auto-discover decorated workers
    if (config.scanForDecorated !== false) {
      const decoratedWorkers = getRegisteredWorkers();
      this.logger.info(
        `Discovered ${decoratedWorkers.length} worker(s) via @worker decorator`
      );

      // Convert RegisteredWorker to ConductorWorker
      for (const registered of decoratedWorkers) {
        this.workers.push(this.convertToConductorWorker(registered));
        this.logger.debug(
          `Registered worker: ${registered.taskDefName}${
            registered.domain ? ` (domain: ${registered.domain})` : ""
          }`
        );
      }
    }

    // Add manually provided workers
    if (config.workers && config.workers.length > 0) {
      this.logger.info(
        `Adding ${config.workers.length} manually registered worker(s)`
      );
      this.workers.push(...config.workers);
    }

    if (this.workers.length === 0) {
      this.logger.info(
        "No workers registered. Did you forget to use @worker decorator or provide workers manually?"
      );
    } else {
      this.logger.info(`TaskHandler initialized with ${this.workers.length} worker(s)`);
    }
  }

  /**
   * Convert RegisteredWorker to ConductorWorker format.
   *
   * Only the fields copied below are forwarded; any other registration
   * options on the registered worker are not mapped here.
   */
  private convertToConductorWorker(registered: RegisteredWorker): ConductorWorker {
    return {
      taskDefName: registered.taskDefName,
      execute: registered.executeFunction,
      concurrency: registered.concurrency,
      pollInterval: registered.pollInterval,
      domain: registered.domain,
    };
  }

  /**
   * Start all registered workers.
   *
   * Creates a TaskRunner for each worker and calls `startPolling()` on it.
   * Calling this while workers are already running, or when there are no
   * workers, logs a message and does nothing.
   *
   * Start-up is all-or-nothing: if any worker fails to start, the runners
   * that were already started are asked to stop, the handler stays in the
   * "not running" state, and the original error is rethrown.
   *
   * @throws Whatever error the TaskRunner constructor or `startPolling()` threw
   */
  startWorkers(): void {
    if (this.isRunning) {
      this.logger.info("Workers are already running. Ignoring startWorkers() call.");
      return;
    }

    if (this.workers.length === 0) {
      this.logger.info("No workers to start.");
      return;
    }

    this.logger.info(`Starting ${this.workers.length} worker(s)...`);

    for (const worker of this.workers) {
      try {
        const runner = new TaskRunner({
          worker,
          client: this.client,
          options: {
            // Left empty here; presumably TaskRunner generates an id — TODO confirm
            workerID: "",
            domain: worker.domain,
            pollInterval: worker.pollInterval,
            concurrency: worker.concurrency,
          },
          logger: this.logger,
          eventListeners: this.config.eventListeners,
        });

        runner.startPolling();
        this.taskRunners.push(runner);

        this.logger.info(
          `Started worker: ${worker.taskDefName}${
            worker.domain ? ` (domain: ${worker.domain})` : ""
          }`
        );
      } catch (error) {
        this.logger.error(
          `Failed to start worker ${worker.taskDefName}:`,
          error instanceof Error ? error.message : error
        );
        // Without this, earlier runners would keep polling while
        // stopWorkers() refuses to act because isRunning is still false.
        this.rollbackStartedRunners();
        throw error;
      }
    }

    this.isRunning = true;
    this.logger.info("All workers started successfully");
  }

  /**
   * Stop every runner started so far by a failed `startWorkers()` call and
   * forget them. `stopPolling()` is asynchronous and `startWorkers()` is not,
   * so the stops are fired without being awaited; failures are only logged.
   */
  private rollbackStartedRunners(): void {
    const started = this.taskRunners;
    this.taskRunners = [];

    for (const runner of started) {
      void (async () => {
        try {
          await runner.stopPolling();
        } catch (error) {
          this.logger.error(
            "Error stopping worker during startup rollback:",
            error instanceof Error ? error.message : error
          );
        }
      })();
    }
  }

  /**
   * Stop all running workers.
   *
   * Awaits `stopPolling()` on every runner in parallel. A runner that fails
   * to stop is logged and does not prevent the others from being stopped or
   * the handler from returning to the "not running" state.
   * Calling this while no workers are running logs a message and does nothing.
   */
  async stopWorkers(): Promise<void> {
    if (!this.isRunning) {
      this.logger.info("Workers are not running. Ignoring stopWorkers() call.");
      return;
    }

    this.logger.info(`Stopping ${this.taskRunners.length} worker(s)...`);

    const stopPromises = this.taskRunners.map(async (runner, index) => {
      try {
        await runner.stopPolling();
        this.logger.debug(`Stopped worker ${index + 1}/${this.taskRunners.length}`);
      } catch (error) {
        this.logger.error(
          `Error stopping worker ${index + 1}:`,
          error instanceof Error ? error.message : error
        );
      }
    });

    await Promise.all(stopPromises);

    this.taskRunners = [];
    this.isRunning = false;
    this.logger.info("All workers stopped");
  }

  /**
   * Get the number of registered workers.
   */
  get workerCount(): number {
    return this.workers.length;
  }

  /**
   * Get the number of running workers.
   */
  get runningWorkerCount(): number {
    return this.taskRunners.length;
  }

  /**
   * Check if workers are currently running.
   */
  get running(): boolean {
    return this.isRunning;
  }

  /**
   * Context manager support (for TypeScript 5.2+ using keyword).
   * Automatically stops workers when disposed.
   */
  async [Symbol.asyncDispose](): Promise<void> {
    await this.stopWorkers();
  }
}
status: "COMPLETED" as const, outputData: {} }; + } + + async function testWorker2(task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + worker({ taskDefName: "task1" })(testWorker1); + worker({ taskDefName: "task2" })(testWorker2); + + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: true, + }); + + expect(handler.workerCount).toBe(2); + expect(handler.running).toBe(false); + }); + + test("should create TaskHandler without auto-discovery", () => { + // Register workers via decorator + async function testWorker(task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + worker({ taskDefName: "ignored_task" })(testWorker); + + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: false, // Disable auto-discovery + }); + + expect(handler.workerCount).toBe(0); + }); + + test("should add manual workers", () => { + const manualWorker = { + taskDefName: "manual_task", + execute: async (task: Task) => ({ + status: "COMPLETED" as const, + outputData: {}, + }), + }; + + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: false, + workers: [manualWorker], + }); + + expect(handler.workerCount).toBe(1); + }); + + test("should combine decorated and manual workers", () => { + async function decoratedWorker(task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + worker({ taskDefName: "decorated_task" })(decoratedWorker); + + const manualWorker = { + taskDefName: "manual_task", + execute: async (task: Task) => ({ + status: "COMPLETED" as const, + outputData: {}, + }), + }; + + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: true, + workers: [manualWorker], + }); + + expect(handler.workerCount).toBe(2); + }); + + test("should handle no workers registered", () => { + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: true, + }); + + 
expect(handler.workerCount).toBe(0); + expect(handler.running).toBe(false); + }); + + test("should start and stop workers", async () => { + async function testWorker(task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + worker({ taskDefName: "test_task" })(testWorker); + + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: true, + }); + activeHandlers.push(handler); + + expect(handler.running).toBe(false); + expect(handler.runningWorkerCount).toBe(0); + + handler.startWorkers(); + + expect(handler.running).toBe(true); + expect(handler.runningWorkerCount).toBe(1); + + await handler.stopWorkers(); + + expect(handler.running).toBe(false); + expect(handler.runningWorkerCount).toBe(0); + }); + + test("should be idempotent for startWorkers", () => { + async function testWorker(task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + worker({ taskDefName: "test_task" })(testWorker); + + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: true, + }); + activeHandlers.push(handler); + + handler.startWorkers(); + expect(handler.runningWorkerCount).toBe(1); + + // Call again - should be no-op + handler.startWorkers(); + expect(handler.runningWorkerCount).toBe(1); + }); + + test("should be idempotent for stopWorkers", async () => { + async function testWorker(task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + worker({ taskDefName: "test_task" })(testWorker); + + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: true, + }); + activeHandlers.push(handler); + + handler.startWorkers(); + await handler.stopWorkers(); + expect(handler.running).toBe(false); + + // Call again - should be no-op + await handler.stopWorkers(); + expect(handler.running).toBe(false); + }); + + test("should handle workers with different configurations", () => { + async function worker1(task: Task) { + return { status: "COMPLETED" 
as const, outputData: {} }; + } + + async function worker2(task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + async function worker3(task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + worker({ taskDefName: "task1", concurrency: 5 })(worker1); + worker({ taskDefName: "task2", pollInterval: 200, domain: "test" })(worker2); + worker({ taskDefName: "task3" })(worker3); + + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: true, + }); + + expect(handler.workerCount).toBe(3); + }); + + test("should support async dispose (context manager)", async () => { + async function testWorker(task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + worker({ taskDefName: "test_task" })(testWorker); + + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: true, + }); + activeHandlers.push(handler); + + handler.startWorkers(); + expect(handler.running).toBe(true); + + // Simulate using keyword (TypeScript 5.2+) + await handler[Symbol.asyncDispose](); + + expect(handler.running).toBe(false); + }); + + test("should handle event listeners", () => { + async function testWorker(_task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + worker({ taskDefName: "test_task" })(testWorker); + + const mockListener = { + onTaskExecutionCompleted: jest.fn<() => void>(), + }; + + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: true, + eventListeners: [mockListener], + }); + + expect(handler.workerCount).toBe(1); + // Event listeners are passed to TaskRunner (tested separately) + }); + + test("should handle empty workers array gracefully", () => { + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: false, + workers: [], + }); + + expect(handler.workerCount).toBe(0); + + handler.startWorkers(); // Should not throw + expect(handler.running).toBe(false); + }); + + 
test("should convert RegisteredWorker to ConductorWorker", () => { + async function testWorker(task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + worker({ + taskDefName: "test_task", + concurrency: 10, + pollInterval: 200, + domain: "production", + })(testWorker); + + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: true, + }); + + expect(handler.workerCount).toBe(1); + // Internal conversion tested via successful worker creation + }); +}); + +describe("TaskHandler - Module Imports", () => { + const activeHandlers: TaskHandler[] = []; + + beforeEach(() => { + clearWorkerRegistry(); + }); + + afterEach(async () => { + // Stop all handlers + for (const handler of activeHandlers) { + await handler.stopWorkers(); + } + activeHandlers.length = 0; + clearWorkerRegistry(); + + // Wait for async cleanup + await new Promise(resolve => setTimeout(resolve, 50)); + }); + + test("should throw error for invalid module path", async () => { + await expect(TaskHandler.create({ + client: createMockClient(), + importModules: ["./nonexistent/module"], + })).rejects.toThrow("Failed to import worker module"); + }); + + test("should handle empty importModules array", async () => { + const handler = await TaskHandler.create({ + client: createMockClient(), + importModules: [], + scanForDecorated: false, + }); + + expect(handler.workerCount).toBe(0); + }); + + // Note: Testing actual module imports would require creating test files + // which is complex in a unit test environment. This is better suited + // for integration tests. 
+}); + +describe("TaskHandler - Error Handling", () => { + const activeHandlers: TaskHandler[] = []; + + beforeEach(() => { + clearWorkerRegistry(); + }); + + afterEach(async () => { + // Stop all handlers + for (const handler of activeHandlers) { + await handler.stopWorkers(); + } + activeHandlers.length = 0; + clearWorkerRegistry(); + + // Wait for async cleanup + await new Promise(resolve => setTimeout(resolve, 50)); + }); + + test("should handle errors during worker stopping", async () => { + async function testWorker(task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + worker({ taskDefName: "test_task" })(testWorker); + + const mockLogger: ConductorLogger = { + info: jest.fn(), + debug: jest.fn(), + error: jest.fn(), + }; + + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: true, + logger: mockLogger, + }); + activeHandlers.push(handler); + + handler.startWorkers(); + + // Mock one of the runners to throw an error on stop + const runnerToFail = handler["taskRunners"][0]; + const originalStop = runnerToFail.stopPolling; + runnerToFail.stopPolling = jest.fn<() => Promise>().mockRejectedValue(new Error("Stop failed")); + + await handler.stopWorkers(); + + // Should log error but not throw + expect(mockLogger.error).toHaveBeenCalledWith( + expect.stringContaining("Error stopping worker"), + expect.anything() + ); + expect(handler.running).toBe(false); + + // Restore original method + runnerToFail.stopPolling = originalStop; + }); + + test("should handle multiple workers with some failing to start", () => { + async function worker1(task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + worker({ taskDefName: "worker1" })(worker1); + + const mockLogger: ConductorLogger = { + info: jest.fn(), + debug: jest.fn(), + error: jest.fn(), + }; + + // Create a worker that will fail + const failingWorker = { + taskDefName: "failing_worker", + execute: async (task: Task) => ({ + status: 
"COMPLETED" as const, + outputData: {}, + }), + }; + + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: true, + workers: [failingWorker], + logger: mockLogger, + }); + + expect(handler.workerCount).toBe(2); + }); +}); + +describe("TaskHandler - Custom Logger", () => { + const activeHandlers: TaskHandler[] = []; + + beforeEach(() => { + clearWorkerRegistry(); + }); + + afterEach(async () => { + // Stop all handlers + for (const handler of activeHandlers) { + await handler.stopWorkers(); + } + activeHandlers.length = 0; + clearWorkerRegistry(); + + // Wait for async cleanup + await new Promise(resolve => setTimeout(resolve, 50)); + }); + + test("should use custom logger", () => { + async function testWorker(task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + worker({ taskDefName: "test_task" })(testWorker); + + const mockLogger: ConductorLogger = { + info: jest.fn(), + debug: jest.fn(), + error: jest.fn(), + }; + + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: true, + logger: mockLogger, + }); + + expect(mockLogger.info).toHaveBeenCalledWith( + expect.stringContaining("Discovered 1 worker(s)") + ); + expect(mockLogger.info).toHaveBeenCalledWith( + expect.stringContaining("TaskHandler initialized") + ); + }); + + test("should log debug messages for worker registration", () => { + async function testWorker(task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + worker({ taskDefName: "test_task", domain: "test-domain" })(testWorker); + + const mockLogger: ConductorLogger = { + info: jest.fn(), + debug: jest.fn(), + error: jest.fn(), + }; + + new TaskHandler({ + client: createMockClient(), + scanForDecorated: true, + logger: mockLogger, + }); + + expect(mockLogger.debug).toHaveBeenCalledWith( + expect.stringContaining("test_task") + ); + expect(mockLogger.debug).toHaveBeenCalledWith( + expect.stringContaining("test-domain") + ); + }); + + 
test("should log when starting and stopping workers", async () => { + async function testWorker(task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + worker({ taskDefName: "test_task" })(testWorker); + + const infoMock = jest.fn(); + const mockLogger: ConductorLogger = { + info: infoMock, + debug: jest.fn(), + error: jest.fn(), + }; + + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: true, + logger: mockLogger, + }); + activeHandlers.push(handler); + + infoMock.mockClear(); + + handler.startWorkers(); + + expect(infoMock).toHaveBeenCalledWith( + expect.stringContaining("Starting 1 worker(s)") + ); + expect(infoMock).toHaveBeenCalledWith( + expect.stringContaining("All workers started successfully") + ); + + infoMock.mockClear(); + + await handler.stopWorkers(); + + expect(infoMock).toHaveBeenCalledWith( + expect.stringContaining("Stopping 1 worker(s)") + ); + expect(infoMock).toHaveBeenCalledWith( + expect.stringContaining("All workers stopped") + ); + }); +}); + +describe("TaskHandler - Worker Configuration", () => { + const activeHandlers: TaskHandler[] = []; + + beforeEach(() => { + clearWorkerRegistry(); + }); + + afterEach(async () => { + // Stop all handlers + for (const handler of activeHandlers) { + await handler.stopWorkers(); + } + activeHandlers.length = 0; + clearWorkerRegistry(); + + // Wait for async cleanup + await new Promise(resolve => setTimeout(resolve, 50)); + }); + + test("should handle workers with only taskDefName", () => { + const worker1 = { + taskDefName: "minimal_worker", + execute: async (task: Task) => ({ + status: "COMPLETED" as const, + outputData: {}, + }), + }; + + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: false, + workers: [worker1], + }); + + expect(handler.workerCount).toBe(1); + }); + + test("should handle workers with full configuration", () => { + const worker1 = { + taskDefName: "full_config_worker", + execute: async 
(task: Task) => ({ + status: "COMPLETED" as const, + outputData: {}, + }), + concurrency: 10, + pollInterval: 500, + domain: "production", + }; + + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: false, + workers: [worker1], + }); + + expect(handler.workerCount).toBe(1); + }); + + test("should handle mixed worker configurations", () => { + async function decoratedWorker(task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + worker({ + taskDefName: "decorated_worker", + concurrency: 5, + pollInterval: 100, + })(decoratedWorker); + + const manualWorker1 = { + taskDefName: "manual_worker_1", + execute: async (task: Task) => ({ + status: "COMPLETED" as const, + outputData: {}, + }), + }; + + const manualWorker2 = { + taskDefName: "manual_worker_2", + execute: async (task: Task) => ({ + status: "COMPLETED" as const, + outputData: {}, + }), + domain: "test", + }; + + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: true, + workers: [manualWorker1, manualWorker2], + }); + + expect(handler.workerCount).toBe(3); + }); +}); + +describe("TaskHandler - Multiple Workers Lifecycle", () => { + const activeHandlers: TaskHandler[] = []; + + beforeEach(() => { + clearWorkerRegistry(); + }); + + afterEach(async () => { + // Stop all handlers + for (const handler of activeHandlers) { + await handler.stopWorkers(); + } + activeHandlers.length = 0; + clearWorkerRegistry(); + + // Wait for async cleanup + await new Promise(resolve => setTimeout(resolve, 50)); + }); + + test("should start and stop multiple workers correctly", async () => { + async function worker1(task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + async function worker2(task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + async function worker3(task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + worker({ taskDefName: "task1" })(worker1); + 
worker({ taskDefName: "task2" })(worker2); + worker({ taskDefName: "task3" })(worker3); + + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: true, + }); + activeHandlers.push(handler); + + expect(handler.workerCount).toBe(3); + expect(handler.runningWorkerCount).toBe(0); + expect(handler.running).toBe(false); + + handler.startWorkers(); + + expect(handler.runningWorkerCount).toBe(3); + expect(handler.running).toBe(true); + + await handler.stopWorkers(); + + expect(handler.runningWorkerCount).toBe(0); + expect(handler.running).toBe(false); + }); + + test("should handle starting workers when none are registered", () => { + const mockLogger: ConductorLogger = { + info: jest.fn(), + debug: jest.fn(), + error: jest.fn(), + }; + + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: false, + logger: mockLogger, + }); + + handler.startWorkers(); + + expect(mockLogger.info).toHaveBeenCalledWith( + expect.stringContaining("No workers to start") + ); + expect(handler.running).toBe(false); + }); + + test("should handle stopping workers when not running", async () => { + async function testWorker(task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + worker({ taskDefName: "test_task" })(testWorker); + + const infoMock = jest.fn(); + const mockLogger: ConductorLogger = { + info: infoMock, + debug: jest.fn(), + error: jest.fn(), + }; + + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: true, + logger: mockLogger, + }); + + infoMock.mockClear(); + + await handler.stopWorkers(); + + expect(infoMock).toHaveBeenCalledWith( + expect.stringContaining("Workers are not running") + ); + }); +}); + +describe("TaskHandler - Edge Cases", () => { + const activeHandlers: TaskHandler[] = []; + + beforeEach(() => { + clearWorkerRegistry(); + }); + + afterEach(async () => { + // Stop all handlers + for (const handler of activeHandlers) { + await handler.stopWorkers(); + 
} + activeHandlers.length = 0; + clearWorkerRegistry(); + + // Wait for async cleanup + await new Promise(resolve => setTimeout(resolve, 50)); + }); + + test("should handle workers with undefined optional fields", () => { + const worker1 = { + taskDefName: "worker_with_undefined", + execute: async (task: Task) => ({ + status: "COMPLETED" as const, + outputData: {}, + }), + concurrency: undefined, + pollInterval: undefined, + domain: undefined, + }; + + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: false, + workers: [worker1], + }); + + expect(handler.workerCount).toBe(1); + }); + + test("should handle create with undefined importModules", async () => { + const handler = await TaskHandler.create({ + client: createMockClient(), + scanForDecorated: false, + }); + + expect(handler.workerCount).toBe(0); + }); + + test("should handle workers with domain configuration", async () => { + async function testWorker(task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + worker({ taskDefName: "domain_worker", domain: "production" })(testWorker); + + const mockLogger: ConductorLogger = { + info: jest.fn(), + debug: jest.fn(), + error: jest.fn(), + }; + + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: true, + logger: mockLogger, + }); + activeHandlers.push(handler); + + handler.startWorkers(); + + expect(mockLogger.info).toHaveBeenCalledWith( + expect.stringContaining("domain_worker") + ); + expect(mockLogger.info).toHaveBeenCalledWith( + expect.stringContaining("production") + ); + }); + + test("should report correct worker counts throughout lifecycle", async () => { + async function worker1(task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + async function worker2(task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + worker({ taskDefName: "task1" })(worker1); + worker({ taskDefName: "task2" })(worker2); + + const handler = 
new TaskHandler({ + client: createMockClient(), + scanForDecorated: true, + }); + activeHandlers.push(handler); + + // Before starting + expect(handler.workerCount).toBe(2); + expect(handler.runningWorkerCount).toBe(0); + expect(handler.running).toBe(false); + + // After starting + handler.startWorkers(); + expect(handler.workerCount).toBe(2); + expect(handler.runningWorkerCount).toBe(2); + expect(handler.running).toBe(true); + + // After stopping + await handler.stopWorkers(); + expect(handler.workerCount).toBe(2); + expect(handler.runningWorkerCount).toBe(0); + expect(handler.running).toBe(false); + }); + + test("should handle async dispose multiple times", async () => { + async function testWorker(task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + worker({ taskDefName: "test_task" })(testWorker); + + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: true, + }); + + handler.startWorkers(); + expect(handler.running).toBe(true); + + await handler[Symbol.asyncDispose](); + expect(handler.running).toBe(false); + + // Calling dispose again should be safe + await handler[Symbol.asyncDispose](); + expect(handler.running).toBe(false); + }); + + test("should handle workers with empty taskDefName gracefully", () => { + const worker1 = { + taskDefName: "", + execute: async (task: Task) => ({ + status: "COMPLETED" as const, + outputData: {}, + }), + }; + + const handler = new TaskHandler({ + client: createMockClient(), + scanForDecorated: false, + workers: [worker1], + }); + + expect(handler.workerCount).toBe(1); + }); +}); diff --git a/src/sdk/worker/core/index.ts b/src/sdk/worker/core/index.ts new file mode 100644 index 00000000..ce1a73a0 --- /dev/null +++ b/src/sdk/worker/core/index.ts @@ -0,0 +1 @@ +export { TaskHandler, type TaskHandlerConfig } from "./TaskHandler"; diff --git a/src/sdk/worker/decorators/__tests__/worker.test.ts b/src/sdk/worker/decorators/__tests__/worker.test.ts new file mode 100644 index 
00000000..48cea860 --- /dev/null +++ b/src/sdk/worker/decorators/__tests__/worker.test.ts @@ -0,0 +1,285 @@ +/* eslint-disable @typescript-eslint/no-unused-vars */ +import { afterEach, beforeEach, describe, expect, jest, test } from "@jest/globals"; +import type { Task } from "../../../../open-api"; +import { + clearWorkerRegistry, + getRegisteredWorker, + getRegisteredWorkers, + getWorkerCount, + registerWorker, +} from "../registry"; +import { worker } from "../worker"; + +describe("@worker decorator", () => { + beforeEach(() => { + clearWorkerRegistry(); + }); + + afterEach(() => { + clearWorkerRegistry(); + }); + + test("should register a decorated function", () => { + // Define function first, then apply decorator + async function testWorker(_task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + // Apply decorator manually (equivalent to @worker) + worker({ taskDefName: "test_task" })(testWorker); + + const workers = getRegisteredWorkers(); + expect(workers).toHaveLength(1); + expect(workers[0].taskDefName).toBe("test_task"); + expect(workers[0].executeFunction).toBe(testWorker); + }); + + test("should register with all options", () => { + const taskDef = { + name: "complex_task", + retryCount: 3, + timeoutSeconds: 300, + totalTimeoutSeconds: 3600, + }; + + async function complexWorker(_task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + worker({ + taskDefName: "complex_task", + concurrency: 10, + pollInterval: 200, + domain: "production", + workerId: "worker-123", + registerTaskDef: true, + pollTimeout: 500, + taskDef, + overwriteTaskDef: false, + strictSchema: true, + })(complexWorker); + + const registered = getRegisteredWorker("complex_task", "production"); + expect(registered).toBeDefined(); + expect(registered?.concurrency).toBe(10); + expect(registered?.pollInterval).toBe(200); + expect(registered?.domain).toBe("production"); + expect(registered?.workerId).toBe("worker-123"); + 
expect(registered?.registerTaskDef).toBe(true); + expect(registered?.pollTimeout).toBe(500); + expect(registered?.taskDef).toBe(taskDef); + expect(registered?.overwriteTaskDef).toBe(false); + expect(registered?.strictSchema).toBe(true); + }); + + test("should register multiple workers", () => { + async function worker1(_task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + async function worker2(_task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + async function worker3(_task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + worker({ taskDefName: "task1" })(worker1); + worker({ taskDefName: "task2" })(worker2); + worker({ taskDefName: "task3", domain: "test" })(worker3); + + expect(getWorkerCount()).toBe(3); + expect(getRegisteredWorker("task1")).toBeDefined(); + expect(getRegisteredWorker("task2")).toBeDefined(); + expect(getRegisteredWorker("task3", "test")).toBeDefined(); + }); + + test("should handle workers with same name but different domains", () => { + async function worker1(_task: Task) { + return { status: "COMPLETED" as const, outputData: { domain: 1 } }; + } + + async function worker2(_task: Task) { + return { status: "COMPLETED" as const, outputData: { domain: 2 } }; + } + + async function worker3(_task: Task) { + return { status: "COMPLETED" as const, outputData: { domain: 3 } }; + } + + worker({ taskDefName: "shared_task", domain: "domain1" })(worker1); + worker({ taskDefName: "shared_task", domain: "domain2" })(worker2); + worker({ taskDefName: "shared_task" })(worker3); // No domain + + expect(getWorkerCount()).toBe(3); + + const w1 = getRegisteredWorker("shared_task", "domain1"); + const w2 = getRegisteredWorker("shared_task", "domain2"); + const w3 = getRegisteredWorker("shared_task"); + + expect(w1?.executeFunction).toBe(worker1); + expect(w2?.executeFunction).toBe(worker2); + expect(w3?.executeFunction).toBe(worker3); + }); + + test("should throw error if 
taskDefName is missing", () => { + async function invalidWorker(_task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + expect(() => { + // @ts-expect-error - Testing missing required field + worker({})(invalidWorker); + }).toThrow("requires 'taskDefName'"); + }); + + test("should throw error if applied to non-function", () => { + expect(() => { + const notAFunction = "invalid"; + worker({ taskDefName: "test" })(notAFunction as never); + }).toThrow("can only be applied to functions"); + }); + + test("should allow decorated function to be called normally", async () => { + async function callableWorker(task: Task) { + return { + status: "COMPLETED" as const, + outputData: { result: (task.inputData as Record).value * 2 }, + }; + } + + worker({ taskDefName: "callable_task" })(callableWorker); + + // Function should still be callable + const result = await callableWorker({ + inputData: { value: 5 }, + } as Task); + + expect(result.status).toBe("COMPLETED"); + expect(result.outputData.result).toBe(10); + }); + + test("should warn when registering duplicate worker", () => { + const consoleSpy = jest.spyOn(console, "warn").mockImplementation(() => undefined); + + async function worker1(_task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + async function worker2(_task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + worker({ taskDefName: "duplicate_task" })(worker1); + worker({ taskDefName: "duplicate_task" })(worker2); + + expect(consoleSpy).toHaveBeenCalledWith( + expect.stringContaining("already registered") + ); + expect(getWorkerCount()).toBe(1); // Second overwrites first + + const registered = getRegisteredWorker("duplicate_task"); + expect(registered?.executeFunction).toBe(worker2); // Latest wins + + consoleSpy.mockRestore(); + }); + + test("should support class method decoration", () => { + class WorkerClass { + async processTask(_task: Task) { + return { status: "COMPLETED" as const, 
outputData: {} }; + } + } + + const instance = new WorkerClass(); + worker({ taskDefName: "class_method_task" })(instance.processTask.bind(instance)); + + const workers = getRegisteredWorkers(); + expect(workers).toHaveLength(1); + expect(workers[0].taskDefName).toBe("class_method_task"); + }); + + test("should clear registry", () => { + async function worker1(_task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + async function worker2(_task: Task) { + return { status: "COMPLETED" as const, outputData: {} }; + } + + worker({ taskDefName: "task1" })(worker1); + worker({ taskDefName: "task2" })(worker2); + + expect(getWorkerCount()).toBe(2); + + clearWorkerRegistry(); + + expect(getWorkerCount()).toBe(0); + expect(getRegisteredWorkers()).toHaveLength(0); + }); +}); + +describe("Worker Registry", () => { + beforeEach(() => { + clearWorkerRegistry(); + }); + + afterEach(() => { + clearWorkerRegistry(); + }); + + test("should register worker manually", () => { + const executeFunction = async (_task: Task) => ({ + status: "COMPLETED" as const, + outputData: {}, + }); + + registerWorker({ + taskDefName: "manual_task", + executeFunction, + concurrency: 5, + }); + + const registered = getRegisteredWorker("manual_task"); + expect(registered).toBeDefined(); + expect(registered?.taskDefName).toBe("manual_task"); + expect(registered?.concurrency).toBe(5); + }); + + test("should get all registered workers", () => { + const worker1 = async (_task: Task) => ({ + status: "COMPLETED" as const, + outputData: {}, + }); + const worker2 = async (_task: Task) => ({ + status: "COMPLETED" as const, + outputData: {}, + }); + + registerWorker({ taskDefName: "task1", executeFunction: worker1 }); + registerWorker({ taskDefName: "task2", executeFunction: worker2 }); + + const all = getRegisteredWorkers(); + expect(all).toHaveLength(2); + expect(all.map((w) => w.taskDefName).sort()).toEqual(["task1", "task2"]); + }); + + test("should get worker by name and domain", () 
=> { + const executeFunction = async (_task: Task) => ({ + status: "COMPLETED" as const, + outputData: {}, + }); + + registerWorker({ + taskDefName: "domain_task", + domain: "production", + executeFunction, + }); + + expect(getRegisteredWorker("domain_task", "production")).toBeDefined(); + expect(getRegisteredWorker("domain_task")).toBeUndefined(); + expect(getRegisteredWorker("domain_task", "staging")).toBeUndefined(); + }); +}); diff --git a/src/sdk/worker/decorators/index.ts b/src/sdk/worker/decorators/index.ts new file mode 100644 index 00000000..207bae4d --- /dev/null +++ b/src/sdk/worker/decorators/index.ts @@ -0,0 +1,2 @@ +export * from "./worker"; +export * from "./registry"; diff --git a/src/sdk/worker/decorators/registry.ts b/src/sdk/worker/decorators/registry.ts new file mode 100644 index 00000000..5c8b2d53 --- /dev/null +++ b/src/sdk/worker/decorators/registry.ts @@ -0,0 +1,157 @@ +import type { Task, TaskResult } from "../../../open-api"; +import type { TaskDef } from "../../../open-api/generated"; + +/** + * Registered worker metadata stored in the global registry. 
/**
 * Metadata for one worker stored in the global registry.
 *
 * `taskDefName` and `executeFunction` are required; the remaining fields are
 * optional settings that the registry stores untouched.
 */
export interface RegisteredWorker {
  /** Task definition name (must match workflow task name) */
  taskDefName: string;

  /**
   * Worker execution function. The resolved value is a task result without
   * `workflowInstanceId` and `taskId`.
   */
  executeFunction: (
    task: Task
  ) => Promise<Omit<TaskResult, "workflowInstanceId" | "taskId">>;

  /** Maximum concurrent tasks (default: 1) */
  concurrency?: number;

  /** Polling interval in milliseconds (default: 100) */
  pollInterval?: number;

  /** Task domain for multi-tenancy (default: undefined) */
  domain?: string;

  /** Unique worker identifier (default: auto-generated) */
  workerId?: string;

  /** Auto-register task definition on startup (default: false) */
  registerTaskDef?: boolean;

  /** Server-side long poll timeout in milliseconds (default: 100) */
  pollTimeout?: number;

  /** Task definition template for registration (optional) */
  taskDef?: TaskDef;

  /** Overwrite existing task definitions (default: true) */
  overwriteTaskDef?: boolean;

  /** Enforce strict JSON schema validation (default: false) */
  strictSchema?: boolean;
}

/**
 * Global worker registry for auto-discovery.
 * Workers registered via the @worker decorator are stored here, keyed by the
 * pair (taskDefName, domain).
 */
class WorkerRegistry {
  private readonly workers = new Map<string, RegisteredWorker>();

  /**
   * Build the map key for a (taskDefName, domain) pair.
   *
   * The pair is serialized as a JSON array instead of being joined with ":"
   * so that a name or domain containing ":" cannot collide with a different
   * pair (e.g. ("a", "b:c") vs ("a:b", "c")). A missing domain and an
   * empty-string domain map to the same key.
   *
   * @param taskDefName - Task definition name
   * @param domain - Optional domain
   * @returns Key that is unique per (taskDefName, domain) pair
   */
  private static keyFor(taskDefName: string, domain?: string): string {
    return JSON.stringify([taskDefName, domain ?? ""]);
  }

  /**
   * Register a worker in the global registry.
   *
   * If a worker with the same task definition name and domain is already
   * present, a warning is written with `console.warn` and the previous entry
   * is replaced.
   *
   * @param worker - Worker metadata to register
   */
  register(worker: RegisteredWorker): void {
    const key = WorkerRegistry.keyFor(worker.taskDefName, worker.domain);

    if (this.workers.has(key)) {
      console.warn(
        `Worker "${worker.taskDefName}" with domain "${worker.domain || "default"}" ` +
          `is already registered. Overwriting previous registration.`
      );
    }

    this.workers.set(key, worker);
  }

  /**
   * Get all registered workers.
   *
   * @returns A new array of the registered workers, in insertion order
   */
  getAll(): RegisteredWorker[] {
    return [...this.workers.values()];
  }

  /**
   * Get a specific worker by task definition name and domain.
   *
   * @param taskDefName - Task definition name
   * @param domain - Optional domain
   * @returns Registered worker or undefined
   */
  get(taskDefName: string, domain?: string): RegisteredWorker | undefined {
    return this.workers.get(WorkerRegistry.keyFor(taskDefName, domain));
  }

  /**
   * Clear all registered workers.
   * Useful for testing.
   */
  clear(): void {
    this.workers.clear();
  }

  /**
   * Get count of registered workers.
   */
  get size(): number {
    return this.workers.size;
  }
}

/**
 * Global singleton registry instance.
 */
export const workerRegistry = new WorkerRegistry();

/**
 * Register a worker in the global registry.
 * Used internally by the @worker decorator.
 *
 * @param worker - Worker metadata to register
 */
export function registerWorker(worker: RegisteredWorker): void {
  workerRegistry.register(worker);
}

/**
 * Get all registered workers from the global registry.
 * Used by TaskHandler for auto-discovery.
 *
 * @returns Array of all registered workers
 */
export function getRegisteredWorkers(): RegisteredWorker[] {
  return workerRegistry.getAll();
}

/**
 * Get a specific registered worker.
 *
 * @param taskDefName - Task definition name
 * @param domain - Optional domain
 * @returns Registered worker or undefined
 */
export function getRegisteredWorker(
  taskDefName: string,
  domain?: string
): RegisteredWorker | undefined {
  return workerRegistry.get(taskDefName, domain);
}

/**
 * Clear all registered workers.
 * Primarily for testing purposes.
 */
export function clearWorkerRegistry(): void {
  workerRegistry.clear();
}
+ */ +export function getWorkerCount(): number { + return workerRegistry.size; +} diff --git a/src/sdk/worker/decorators/worker.ts b/src/sdk/worker/decorators/worker.ts new file mode 100644 index 00000000..7f8c396c --- /dev/null +++ b/src/sdk/worker/decorators/worker.ts @@ -0,0 +1,226 @@ +import type { Task, TaskResult } from "../../../open-api"; +import type { TaskDef } from "../../../open-api/generated"; +import { registerWorker, type RegisteredWorker } from "./registry"; + +/** + * Options for the @worker decorator. + */ +export interface WorkerOptions { + /** + * Task definition name (must match workflow task name). + * This is the only required parameter. + */ + taskDefName: string; + + /** + * Maximum concurrent tasks this worker can execute. + * - Default: 1 + * - Controls concurrency level for task execution + * - Choose based on workload: + * * CPU-bound: 1-4 + * * I/O-bound: 10-50 + * * Mixed: 5-20 + */ + concurrency?: number; + + /** + * Polling interval in milliseconds. + * - Default: 100ms + * - Lower values = more responsive but higher server load + * - Higher values = less server load but slower task pickup + * - Recommended: 100-500ms for most use cases + */ + pollInterval?: number; + + /** + * Task domain for multi-tenancy. + * - Default: undefined (no domain isolation) + * - Use when you need to partition tasks across different environments/tenants + */ + domain?: string; + + /** + * Unique worker identifier. + * - Default: undefined (auto-generated) + * - Useful for debugging and tracking which worker executed which task + */ + workerId?: string; + + /** + * Auto-register task definition on startup. + * - Default: false + * - When true: Task definition is created/updated on worker startup + * - When false: Task definition must exist in Conductor already + * - Recommended: false for production (manage task definitions separately) + */ + registerTaskDef?: boolean; + + /** + * Server-side long poll timeout in milliseconds. 
+ * - Default: 100ms + * - How long the server will wait for a task before returning empty response + * - Higher values reduce polling frequency when no tasks available + * - Recommended: 100-500ms + */ + pollTimeout?: number; + + /** + * Task definition template for registration. + * - Default: undefined + * - Only used when registerTaskDef=true + * - Allows specifying retry policies, timeouts, rate limits, etc. + * - The taskDefName parameter takes precedence for the name field + */ + taskDef?: TaskDef; + + /** + * Overwrite existing task definitions on server. + * - Default: true + * - When true: Always updates task definition + * - When false: Only creates if doesn't exist + * - Can be overridden via env: CONDUCTOR_WORKER__OVERWRITE_TASK_DEF=false + */ + overwriteTaskDef?: boolean; + + /** + * Enforce strict JSON schema validation. + * - Default: false + * - When false: additionalProperties=true (allows extra fields) + * - When true: additionalProperties=false (strict validation) + * - Can be overridden via env: CONDUCTOR_WORKER__STRICT_SCHEMA=true + */ + strictSchema?: boolean; +} + +/** + * Decorator to register a function as a Conductor worker. + * + * This decorator enables SDK-style worker registration with auto-discovery, + * matching the Python SDK's @worker_task pattern. 
 + * + * @param options - Worker configuration options + * + * @example + * Basic usage: + * ```typescript + * @worker({ taskDefName: "process_order" }) + * async function processOrder(task: Task): Promise<TaskResult> { + * const orderId = task.inputData.orderId; + * // Process order logic + * return { + * status: "COMPLETED", + * outputData: { orderId, processed: true }, + * }; + * } + * ``` + * + * @example + * With concurrency: + * ```typescript + * @worker({ taskDefName: "send_email", concurrency: 10 }) + * async function sendEmail(task: Task): Promise<TaskResult> { + * const { to, subject, body } = task.inputData; + * await emailService.send(to, subject, body); + * return { status: "COMPLETED", outputData: { sent: true } }; + * } + * ``` + * + * @example + * With domain and custom polling: + * ```typescript + * @worker({ + * taskDefName: "validate_payment", + * domain: "payments", + * concurrency: 5, + * pollInterval: 200, + * }) + * async function validatePayment(task: Task): Promise<TaskResult> { + * // Validation logic + * return { status: "COMPLETED", outputData: { valid: true } }; + * } + * ``` + * + * @example + * With task definition registration: + * ```typescript + * @worker({ + * taskDefName: "complex_task", + * registerTaskDef: true, + * taskDef: { + * retryCount: 3, + * retryLogic: "EXPONENTIAL_BACKOFF", + * timeoutSeconds: 300, + * }, + * }) + * async function complexTask(task: Task): Promise<TaskResult> { + * // Complex logic + * return { status: "COMPLETED", outputData: { result: "..." 
} }; + * } + * ``` + * + * @example + * Non-retryable errors: + * ```typescript + * import { worker, NonRetryableException } from "@io-orkes/conductor-javascript/worker"; + * + * @worker({ taskDefName: "validate_order" }) + * async function validateOrder(task: Task): Promise { + * const order = await getOrder(task.inputData.orderId); + * + * if (!order) { + * // Order doesn't exist - retry won't help + * throw new NonRetryableException(`Order ${task.inputData.orderId} not found`); + * } + * + * return { status: "COMPLETED", outputData: { validated: true } }; + * } + * ``` + */ +export function worker(options: WorkerOptions) { + return function ( + target: unknown, + propertyKey?: string, + descriptor?: PropertyDescriptor + ) { + // Extract the function to register + const executeFunction = descriptor?.value || target; + + // Validate that we have a function + if (typeof executeFunction !== "function") { + throw new Error( + `@worker decorator can only be applied to functions. ` + + `Received: ${typeof executeFunction}` + ); + } + + // Validate required options + if (!options.taskDefName) { + throw new Error( + `@worker decorator requires 'taskDefName' option. 
` + + `Example: @worker({ taskDefName: "my_task" })` + ); + } + + // Create registered worker metadata + const registeredWorker: RegisteredWorker = { + taskDefName: options.taskDefName, + executeFunction: executeFunction as (task: Task) => Promise>, + concurrency: options.concurrency, + pollInterval: options.pollInterval, + domain: options.domain, + workerId: options.workerId, + registerTaskDef: options.registerTaskDef, + pollTimeout: options.pollTimeout, + taskDef: options.taskDef, + overwriteTaskDef: options.overwriteTaskDef, + strictSchema: options.strictSchema, + }; + + // Register in global registry for auto-discovery + registerWorker(registeredWorker); + + // Return original descriptor/target unchanged + // This allows the function to be called normally + return descriptor || target; + }; +} diff --git a/src/sdk/worker/index.ts b/src/sdk/worker/index.ts new file mode 100644 index 00000000..c56f8b49 --- /dev/null +++ b/src/sdk/worker/index.ts @@ -0,0 +1,21 @@ +// Core SDK +export { TaskHandler, type TaskHandlerConfig } from "./core"; + +// Decorators +export { worker, type WorkerOptions } from "./decorators/worker"; +export { + getRegisteredWorkers, + getRegisteredWorker, + clearWorkerRegistry, + getWorkerCount, + type RegisteredWorker, +} from "./decorators/registry"; + +// Events (re-export from clients/worker for now) +export * from "../clients/worker/events"; + +// Exceptions (re-export from clients/worker for now) +export * from "../clients/worker/exceptions"; + +// Types +export type { ConductorWorker } from "../clients/worker/types"; diff --git a/tsconfig.json b/tsconfig.json index dad262af..03434af5 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -7,7 +7,15 @@ "dom" ], "noEmit": true, - "isolatedModules": true + "isolatedModules": true, + "experimentalDecorators": true, + "emitDecoratorMetadata": true, + "baseUrl": ".", + "paths": { + "@/*": ["src/*"], + "@open-api/*": ["src/open-api/*"], + "@test-utils/*": ["src/integration-tests/utils/*"] + } }, 
"exclude": [ "dist",