Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 239 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[![Build Status](https://github.com/conductor-oss/javascript-sdk/actions/workflows/pull_request.yml/badge.svg)](https://github.com/conductor-oss/javascript-sdk/actions/workflows/pull_request.yml)

A comprehensive TypeScript/JavaScript client for [Conductor OSS](https://github.com/conductor-oss/conductor), enabling developers to build, orchestrate, and monitor distributed workflows with ease.
A comprehensive TypeScript/JavaScript SDK for [Conductor OSS](https://github.com/conductor-oss/conductor), enabling developers to build, orchestrate, and monitor distributed workflows with ease.

[Conductor](https://www.conductor-oss.org/) is the leading open-source orchestration platform allowing developers to build highly scalable distributed applications.

Expand Down Expand Up @@ -40,11 +40,16 @@ Show support for the Conductor OSS. Please help spread the awareness by starrin
- [Step 4: Manage and Monitor Execution](#step-4-manage-and-monitor-execution)
- [Use TaskClient to Monitor and Debug Tasks](#use-taskclient-to-monitor-and-debug-tasks)
- [Workers](#workers)
- [The TaskManager](#the-taskmanager)
- [Quick Start: Building a Worker](#quick-start-building-a-worker)
- [Step 1: Define the Worker's Logic](#step-1-define-the-workers-logic)
- [Step 2: Handle Task Outcomes and Errors](#step-2-handle-task-outcomes-and-errors)
- [Step 3: Run the Worker with TaskManager](#step-3-run-the-worker-with-taskmanager)
- [SDK-Style Worker Registration (Recommended)](#sdk-style-worker-registration-recommended)
- [Using the @worker Decorator](#using-the-worker-decorator)
- [Worker Configuration Options](#worker-configuration-options)
- [Environment Variable Configuration](#environment-variable-configuration)
- [Event Listeners for Observability](#event-listeners-for-observability)
- [NonRetryableException for Terminal Failures](#nonretryableexception-for-terminal-failures)
- [Module Imports for Side-Effect Registration](#module-imports-for-side-effect-registration)
- [Legacy TaskManager API](#legacy-taskmanager-api)
- [The TaskManager](#the-taskmanager)
- [Quick Start: Building a Worker](#quick-start-building-a-worker)
- [Worker Design Principles](#worker-design-principles)
- [Scheduling](#scheduling)
- [The SchedulerClient](#the-schedulerclient)
Expand Down Expand Up @@ -110,7 +115,6 @@ Here's a simple example to get you started:
import {
orkesConductorClient,
WorkflowExecutor,
TaskManager,
simpleTask,
workflow
} from "@io-orkes/conductor-javascript";
Expand Down Expand Up @@ -447,17 +451,236 @@ Workers are background processes that execute tasks in your workflows. Think of
Workflow → Creates Tasks → Workers Poll for Tasks → Execute Logic → Return Results → Workflow Continues
```

The `TaskManager` class in this SDK simplifies the process of creating and managing workers.
### SDK-Style Worker Registration (Recommended)

### The TaskManager
The SDK supports decorator-based worker registration. This provides a modern, declarative approach to defining workers with auto-discovery, type safety, and less boilerplate.

The `TaskManager` is the primary tool for managing workers. It handles polling, task execution, and result reporting, allowing you to run multiple workers concurrently. For a complete method reference, see the [TaskManager API Reference](docs/api-reference/task-manager.md).
**Key Benefits:**
- **Auto-discovery**: No need to manually register workers
- **Declarative**: Configuration is co-located with worker logic
- **Type-safe**: Full TypeScript support
- **Cleaner code**: Less boilerplate than the legacy API

### Quick Start: Building a Worker
> 💡 **New to Conductor?** Start here! The decorator-based approach is simpler and more maintainable than the legacy `TaskManager` API.

Building a robust worker involves defining its logic, handling outcomes, and managing its execution.
#### Using the @worker Decorator

#### Step 1: Define the Worker's Logic
Define workers using the `@worker` decorator for cleaner, more maintainable code:

```typescript
import { worker, TaskHandler, Task } from "@io-orkes/conductor-javascript";

// Define a worker with the @worker decorator
@worker({ taskDefName: "send_email", concurrency: 10, pollInterval: 100 })
async function sendEmail(task: Task) {
const { to, subject, body } = task.inputData;
await emailService.send(to, subject, body);

return {
status: "COMPLETED",
outputData: { sent: true, timestamp: new Date().toISOString() }
};
}

@worker({ taskDefName: "process_payment", domain: "payments", concurrency: 5 })
async function processPayment(task: Task) {
const { amount, customerId } = task.inputData;
const result = await paymentGateway.charge(customerId, amount);

return {
status: "COMPLETED",
outputData: { transactionId: result.id }
};
}

// Auto-discover and start all decorated workers
const handler = new TaskHandler({
client,
scanForDecorated: true, // Automatically finds @worker decorated functions
});

handler.startWorkers();
console.log(`Started ${handler.runningWorkerCount} workers`);

// Graceful shutdown
process.on("SIGTERM", async () => {
await handler.stopWorkers();
process.exit(0);
});
```

#### Worker Configuration Options

The `@worker` decorator supports comprehensive configuration:

```typescript
@worker({
taskDefName: "my_task", // Required: task name
concurrency: 5, // Max concurrent tasks (default: 1)
pollInterval: 100, // Polling interval in ms (default: 100)
domain: "production", // Task domain for multi-tenancy
workerId: "worker-123", // Unique worker identifier
pollTimeout: 100, // Server-side long poll timeout
})
async function myTask(task: Task) {
// Your logic here
}
```

#### Environment Variable Configuration

Override worker configuration using environment variables without code changes:

```bash
# Global configuration (applies to all workers)
export CONDUCTOR_WORKER_ALL_POLL_INTERVAL=500
export CONDUCTOR_WORKER_ALL_CONCURRENCY=10

# Worker-specific configuration (overrides global)
export CONDUCTOR_WORKER_SEND_EMAIL_CONCURRENCY=20
export CONDUCTOR_WORKER_PROCESS_PAYMENT_DOMAIN=payments
```

**Configuration Hierarchy** (highest to lowest priority):
1. Worker-specific environment variables
2. Global environment variables
3. Code-level decorator parameters
4. System defaults

**Supported Environment Variable Formats:**
- `CONDUCTOR_WORKER_<TASK_NAME>_<PROPERTY>` - Worker-specific (uppercase)
- `conductor.worker.<task_name>.<property>` - Worker-specific (dotted)
- `CONDUCTOR_WORKER_ALL_<PROPERTY>` - Global (uppercase)
- `conductor.worker.all.<property>` - Global (dotted)

#### Event Listeners for Observability

Monitor worker lifecycle events for metrics, logging, and debugging:

```typescript
import { TaskHandler, TaskRunnerEventsListener } from "@io-orkes/conductor-javascript";

const metricsListener: TaskRunnerEventsListener = {
onTaskExecutionCompleted(event) {
metrics.histogram("task_duration_ms", event.durationMs, {
task_type: event.taskType,
});
},

onTaskExecutionFailure(event) {
logger.error(`Task ${event.taskId} failed:`, event.cause);
metrics.counter("task_failures", 1, {
task_type: event.taskType,
});
},

onTaskUpdateFailure(event) {
// CRITICAL: Task result was lost after all retries
alertOps({
severity: "CRITICAL",
message: `Task update failed after ${event.retryCount} retries`,
taskId: event.taskId,
});
},
};

const handler = new TaskHandler({
client,
eventListeners: [metricsListener],
});
```

**Available Events:**
- `onPollStarted` - Polling begins
- `onPollCompleted` - Polling succeeds
- `onPollFailure` - Polling fails
- `onTaskExecutionStarted` - Task execution begins
- `onTaskExecutionCompleted` - Task execution succeeds
- `onTaskExecutionFailure` - Task execution fails
- `onTaskUpdateFailure` - Task update fails after all retries (CRITICAL)

#### NonRetryableException for Terminal Failures

Mark failures as terminal to prevent unnecessary retries:

```typescript
import { worker, NonRetryableException } from "@io-orkes/conductor-javascript";

@worker({ taskDefName: "validate_order" })
async function validateOrder(task: Task) {
const order = await getOrder(task.inputData.orderId);

if (!order) {
// Order doesn't exist - retry won't help
throw new NonRetryableException(`Order ${task.inputData.orderId} not found`);
}

if (order.status === "CANCELLED") {
// Business rule violation - retry won't help
throw new NonRetryableException("Cannot process cancelled order");
}

// Regular errors will be retried
if (order.amount > MAX_AMOUNT) {
throw new Error("Amount exceeds limit"); // Will retry
}

return { status: "COMPLETED", outputData: { validated: true } };
}
```

**Error Handling:**
- `throw new Error()` → Task status: `FAILED` (will retry with exponential backoff)
- `throw new NonRetryableException()` → Task status: `FAILED_WITH_TERMINAL_ERROR` (no retry)

#### Module Imports for Side-Effect Registration

Organize workers across multiple files:

```typescript
// workers/orderWorkers.ts
import { worker } from "@io-orkes/conductor-javascript";

@worker({ taskDefName: "validate_order" })
export async function validateOrder(task) { /* ... */ }

@worker({ taskDefName: "fulfill_order" })
export async function fulfillOrder(task) { /* ... */ }

// workers/paymentWorkers.ts
@worker({ taskDefName: "process_payment" })
export async function processPayment(task) { /* ... */ }

// main.ts
import { TaskHandler } from "@io-orkes/conductor-javascript";

// Use TaskHandler.create() for async module imports
const handler = await TaskHandler.create({
client,
importModules: [
"./workers/orderWorkers",
"./workers/paymentWorkers",
],
});

handler.startWorkers(); // Auto-discovers all workers from imported modules
```

### Legacy TaskManager API

The legacy `TaskManager` API continues to work with full backward compatibility. However, **new projects should use the decorator-based approach above** for better maintainability and cleaner code.

Both APIs can coexist in the same application, allowing gradual migration from legacy to decorator-based workers.

#### The TaskManager

The `TaskManager` is the legacy tool for managing workers. It handles polling, task execution, and result reporting, allowing you to run multiple workers concurrently. For a complete method reference, see the [TaskManager API Reference](docs/api-reference/task-manager.md).

#### Quick Start: Building a Worker

Building a worker with the legacy API involves defining its logic, handling outcomes, and managing its execution.

##### Step 1: Define the Worker's Logic

A worker is an object that defines a `taskDefName` (which must match the task name in your workflow) and an `execute` function containing your business logic.

Expand All @@ -484,7 +707,7 @@ const emailWorker: ConductorWorker = {
};
```

#### Step 2: Handle Task Outcomes and Errors
##### Step 2: Handle Task Outcomes and Errors

The `execute` function must return an object indicating the task's outcome.

Expand Down Expand Up @@ -512,7 +735,7 @@ try {
}
```

#### Step 3: Run the Worker with TaskManager
##### Step 3: Run the Worker with TaskManager

The `TaskManager` is responsible for polling Conductor, managing task execution, and reporting back results. You can run a single worker or multiple workers with one manager.

Expand All @@ -539,7 +762,7 @@ For a complete method reference, see the [TaskManager API Reference](docs/api-re

### Worker Design Principles

When designing workers, it's best to follow these principles:
When designing workers (with either API), it's best to follow these principles:

- **Stateless**: Workers should not rely on local state.
- **Idempotent**: The same task input should always produce the same result.
Expand Down
10 changes: 10 additions & 0 deletions jest.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,19 @@ export default {
"!src/**/*.d.ts",
"!src/**/generated/**",
"!src/**/spec/**",
"!src/**/*.test.{ts,tsx}",
"!src/**/index.ts",
"!src/**/types.ts",
"!src/**/*.types.ts",
"!src/**/exceptions/**",
],
coverageReporters: ["text", "lcov", "cobertura"],
transformIgnorePatterns: ["/node_modules/", "\\.pnp\\.[^\\/]+$"],
moduleNameMapper: {
"^@/(.*)$": "<rootDir>/src/$1",
"^@open-api/(.*)$": "<rootDir>/src/open-api/$1",
"^@test-utils/(.*)$": "<rootDir>/src/integration-tests/utils/$1",
},
transform: {
"^.+\\.tsx?$": [
"ts-jest",
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@io-orkes/conductor-javascript",
"description": "Typescript Client for Netflix Conductor",
"version": "v0.0.0",
"description": "Typescript SDK for Netflix Conductor",
"version": "v3.0.0",
"private": false,
"homepage": "https://orkes.io",
"repository": {
Expand Down
Loading