Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -557,34 +557,36 @@ orkes api-gateway route delete <service_id> <http_method> <route_path>
}
```

## Task Workers
## Workers

⚠️ **EXPERIMENTAL FEATURES**

The CLI supports two types of task workers for processing Conductor tasks:
The CLI supports two types of workers for processing Conductor tasks:

### Generic Workers (Any Language)
### Stdio Workers

Execute tasks using external programs written in **any language** (Python, Node.js, Go, Rust, shell scripts, etc.). The CLI polls for tasks and passes them to your worker via stdin/stdout.
Execute tasks using external programs written in **any language** (Python, Node.js, Go, Rust, shell scripts, etc.).

The CLI polls for tasks and passes them to your worker via stdin/stdout.

**Best for:** Complex logic, heavy dependencies, full language ecosystem access

👉 **[Complete Generic Worker Documentation →](WORKER_EXEC.md)**
👉 **[Complete Stdio Worker Documentation →](WORKER_STDIO.md)**

**Quick example:**
```bash
# Run a Python worker (continuous polling with parallel execution)
orkes worker exec --type greet_task python3 worker.py
orkes worker stdio --type greet_task python3 worker.py

# Poll multiple tasks per batch for higher throughput
orkes worker exec --type greet_task python3 worker.py --count 5
orkes worker stdio --type greet_task python3 worker.py --count 5
```

### JavaScript Workers (Built-in)

Execute tasks using **JavaScript** scripts with built-in utilities (HTTP, crypto, string functions). No external dependencies needed.

**Best for:** Lightweight tasks, quick scripts, HTTP integrations
**Best for:** Prototyping, Lightweight tasks, quick scripts, HTTP integrations

👉 **[Complete JavaScript Worker Documentation →](WORKER_JS.md)**

Expand Down
21 changes: 12 additions & 9 deletions WORKER_EXEC.md → WORKER_STDIO.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
# Generic Task Workers
# STDIO Workers

The CLI supports executing tasks using external worker programs written in any language. This allows you to implement task workers in Python, Node.js, shell scripts, or any executable.
The CLI supports executing tasks using external worker programs written in any language.

This allows you to implement task workers in Python, Node.js, shell scripts, or any executable.

## How it Works

The `worker exec` command continuously polls for tasks and executes them in parallel goroutines:
The `worker stdio` command continuously polls for tasks and executes them in parallel goroutines:

1. **Continuously polls** for tasks of the specified type
2. **Passes** the full task JSON to your worker via **stdin**
Expand All @@ -16,7 +18,7 @@ The `worker exec` command continuously polls for tasks and executes them in para
## Usage

```bash
orkes worker exec --type <task_type> <command> [args...]
orkes worker stdio --type <task_type> <command> [args...]
```

**Flags:**
Expand All @@ -26,6 +28,7 @@ orkes worker exec --type <task_type> <command> [args...]
- `--poll-timeout`: Poll timeout in milliseconds (default: 100)
- `--exec-timeout`: Worker execution timeout in seconds (0 = no timeout)
- `--count`: Number of tasks to poll in each batch (default: 1)
- `--verbose`: Print task and result JSON to stdout

## Worker Contract

Expand Down Expand Up @@ -93,16 +96,16 @@ Run the worker:

```bash
# Start worker for 'greet_task'
orkes worker exec --type greet_task python3 worker.py
orkes worker stdio --type greet_task python3 worker.py

# Poll multiple tasks per batch (poll 5 tasks at a time)
orkes worker exec --type greet_task python3 worker.py --count 5
orkes worker stdio --type greet_task python3 worker.py --count 5

# With worker ID and domain
orkes worker exec --type greet_task python3 worker.py --worker-id worker-1 --domain production
orkes worker stdio --type greet_task python3 worker.py --worker-id worker-1 --domain production

# With execution timeout (30 seconds per task)
orkes worker exec --type greet_task python3 worker.py --exec-timeout 30
orkes worker stdio --type greet_task python3 worker.py --exec-timeout 30
```

## Example: Shell Script Worker
Expand Down Expand Up @@ -143,7 +146,7 @@ The worker automatically runs in continuous mode:

```bash
# Poll 10 tasks at a time and process them in parallel
orkes worker exec --type greet_task python3 worker.py --count 10
orkes worker stdio --type greet_task python3 worker.py --count 10
```

## Error Handling
Expand Down
57 changes: 39 additions & 18 deletions cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ var (
Example: "orkes worker js --type my_task worker.js",
}

workerExecCmd = &cobra.Command{
Use: "exec <command> [args...]",
Short: "Poll and execute tasks using an external command",
Long: `Continuously poll for tasks and execute them using an external command.
workerStdioCmd = &cobra.Command{
Use: "stdio <command> [args...]",
Short: "Poll tasks and execute command via stdin/stdout",
Long: `CLI polls tasks and executes the command. The task is passed in the standard input and the result is expected in the standard output.

The worker runs in continuous mode, polling for tasks and executing them in
parallel goroutines (similar to JavaScript workers).
parallel goroutines.

The task JSON is passed to the command via stdin. The command should read the task
from stdin and write a result JSON to stdout.
Expand All @@ -78,7 +78,7 @@ Exit codes:
non-zero: Failure (task marked as FAILED)`,
RunE: execWorker,
SilenceUsage: true,
Example: "worker exec --type greet_task python worker.py\nworker exec --type greet_task python worker.py --count 5\nworker exec --type greet_task ./worker.sh --verbose",
Example: "worker stdio --type greet_task python worker.py\nworker stdio --type greet_task python worker.py --count 5\nworker stdio --type greet_task ./worker.sh --verbose",
}

workerRemoteCmd = &cobra.Command{
Expand Down Expand Up @@ -463,6 +463,7 @@ func execWorker(cmd *cobra.Command, args []string) error {
pollTimeout, _ := cmd.Flags().GetInt32("poll-timeout")
execTimeout, _ := cmd.Flags().GetInt32("exec-timeout")
count, _ := cmd.Flags().GetInt32("count")
verbose, _ := cmd.Flags().GetBool("verbose")

taskClient := internal.GetTaskClient()

Expand Down Expand Up @@ -506,15 +507,15 @@ func execWorker(cmd *cobra.Command, args []string) error {
wg.Add(1)
go func(t model.Task) {
defer wg.Done()
executeExternalWorker(t, workerCmd, workerArgs, workerId, domain, execTimeout, taskClient)
executeExternalWorker(t, workerCmd, workerArgs, workerId, domain, execTimeout, verbose, taskClient)
}(task)
}

wg.Wait()
}
}

func executeExternalWorker(task model.Task, workerCmd string, workerArgs []string, workerId, domain string, execTimeout int32, taskClient *client.TaskResourceApiService) {
func executeExternalWorker(task model.Task, workerCmd string, workerArgs []string, workerId, domain string, execTimeout int32, verbose bool, taskClient *client.TaskResourceApiService) {
log.Infof("Processing task: %s (workflow: %s)", task.TaskId, task.WorkflowInstanceId)

taskJSON, err := json.Marshal(task)
Expand All @@ -524,6 +525,12 @@ func executeExternalWorker(task model.Task, workerCmd string, workerArgs []strin
return
}

if verbose {
fmt.Println("=== Task Input ===")
fmt.Println(string(taskJSON))
fmt.Println("==================")
}

ctx := context.Background()
if execTimeout > 0 {
var cancel context.CancelFunc
Expand Down Expand Up @@ -583,7 +590,7 @@ func executeExternalWorker(task model.Task, workerCmd string, workerArgs []strin

result = WorkerResult{
Status: "FAILED",
Reason: fmt.Sprintf("worker exec failed: %v", execErr),
Reason: fmt.Sprintf("worker execution failed: %v", execErr),
Logs: []string{stderrOutput},
}
} else {
Expand All @@ -600,6 +607,19 @@ func executeExternalWorker(task model.Task, workerCmd string, workerArgs []strin
}
}

if verbose {
resultJSON, _ := json.MarshalIndent(result, "", " ")
if result.Status == "FAILED" {
fmt.Println("=== Task Result (Error) ===")
fmt.Println(string(resultJSON))
fmt.Println("===========================")
} else {
fmt.Println("=== Task Result ===")
fmt.Println(string(resultJSON))
fmt.Println("===================")
}
}

switch result.Status {
case "COMPLETED", "FAILED", "IN_PROGRESS":
default:
Expand Down Expand Up @@ -1151,7 +1171,7 @@ func executePythonWorkerFromFile(cmd *cobra.Command, workerFile, taskType string
wg.Add(1)
go func(t model.Task) {
defer wg.Done()
executeExternalWorker(t, pythonCmd, []string{workerFile}, workerId, domain, execTimeout, taskClient)
executeExternalWorker(t, pythonCmd, []string{workerFile}, workerId, domain, execTimeout, false, taskClient)
}(task)
}

Expand All @@ -1167,13 +1187,14 @@ func init() {
workerJsCmd.Flags().String("domain", "", "Domain")
workerJsCmd.Flags().Int32("timeout", 100, "Timeout in milliseconds")

workerExecCmd.Flags().String("type", "", "Task type to poll for (required)")
workerExecCmd.MarkFlagRequired("type")
workerExecCmd.Flags().String("worker-id", "", "Worker ID")
workerExecCmd.Flags().String("domain", "", "Domain")
workerExecCmd.Flags().Int32("poll-timeout", 100, "Poll timeout in milliseconds")
workerExecCmd.Flags().Int32("exec-timeout", 0, "Execution timeout in seconds (0 = no timeout)")
workerExecCmd.Flags().Int32("count", 1, "Number of tasks to poll in each batch")
workerStdioCmd.Flags().String("type", "", "Task type to poll for (required)")
workerStdioCmd.MarkFlagRequired("type")
workerStdioCmd.Flags().String("worker-id", "", "Worker ID")
workerStdioCmd.Flags().String("domain", "", "Domain")
workerStdioCmd.Flags().Int32("poll-timeout", 100, "Poll timeout in milliseconds")
workerStdioCmd.Flags().Int32("exec-timeout", 0, "Execution timeout in seconds (0 = no timeout)")
workerStdioCmd.Flags().Int32("count", 1, "Number of tasks to poll in each batch")
workerStdioCmd.Flags().Bool("verbose", false, "Print task and result JSON to stdout")

workerRemoteCmd.Flags().String("type", "", "Task type to poll for (required)")
workerRemoteCmd.MarkFlagRequired("type")
Expand All @@ -1186,7 +1207,7 @@ func init() {
workerListRemoteCmd.Flags().String("namespace", "default", "Namespace to list workers from")

workerCmd.AddCommand(workerJsCmd)
workerCmd.AddCommand(workerExecCmd)
workerCmd.AddCommand(workerStdioCmd)
workerCmd.AddCommand(workerRemoteCmd)
workerCmd.AddCommand(workerListRemoteCmd)
rootCmd.AddCommand(workerCmd)
Expand Down
Loading