From 0040d6d287427025f511020517b34c07ddc700c0 Mon Sep 17 00:00:00 2001 From: Miguel Prieto Date: Fri, 12 Dec 2025 15:45:36 -0300 Subject: [PATCH] Renamed "generic workers" to "stdio workers" + implemented verbose flag. --- README.md | 18 +++++----- WORKER_EXEC.md => WORKER_STDIO.md | 21 +++++++----- cmd/worker.go | 57 +++++++++++++++++++++---------- 3 files changed, 61 insertions(+), 35 deletions(-) rename WORKER_EXEC.md => WORKER_STDIO.md (89%) diff --git a/README.md b/README.md index af7f991..fcefac9 100644 --- a/README.md +++ b/README.md @@ -557,34 +557,36 @@ orkes api-gateway route delete } ``` -## Task Workers +## Workers ⚠️ **EXPERIMENTAL FEATURES** -The CLI supports two types of task workers for processing Conductor tasks: +The CLI supports two types of workers for processing Conductor tasks: -### Generic Workers (Any Language) +### Stdio Workers -Execute tasks using external programs written in **any language** (Python, Node.js, Go, Rust, shell scripts, etc.). The CLI polls for tasks and passes them to your worker via stdin/stdout. +Execute tasks using external programs written in **any language** (Python, Node.js, Go, Rust, shell scripts, etc.). + +The CLI polls for tasks and passes them to your worker via stdin/stdout. **Best for:** Complex logic, heavy dependencies, full language ecosystem access -👉 **[Complete Generic Worker Documentation →](WORKER_EXEC.md)** +👉 **[Complete Stdio Worker Documentation →](WORKER_STDIO.md)** **Quick example:** ```bash # Run a Python worker (continuous polling with parallel execution) -orkes worker exec --type greet_task python3 worker.py +orkes worker stdio --type greet_task python3 worker.py # Poll multiple tasks per batch for higher throughput -orkes worker exec --type greet_task python3 worker.py --count 5 +orkes worker stdio --type greet_task python3 worker.py --count 5 ``` ### JavaScript Workers (Built-in) Execute tasks using **JavaScript** scripts with built-in utilities (HTTP, crypto, string functions). No external dependencies needed. -**Best for:** Lightweight tasks, quick scripts, HTTP integrations +**Best for:** Prototyping, Lightweight tasks, quick scripts, HTTP integrations 👉 **[Complete JavaScript Worker Documentation →](WORKER_JS.md)** diff --git a/WORKER_EXEC.md b/WORKER_STDIO.md similarity index 89% rename from WORKER_EXEC.md rename to WORKER_STDIO.md index b5474a7..91aaa94 100644 --- a/WORKER_EXEC.md +++ b/WORKER_STDIO.md @@ -1,10 +1,12 @@ -# Generic Task Workers +# STDIO Workers -The CLI supports executing tasks using external worker programs written in any language. This allows you to implement task workers in Python, Node.js, shell scripts, or any executable. +The CLI supports executing tasks using external worker programs written in any language. + +This allows you to implement task workers in Python, Node.js, shell scripts, or any executable. ## How it Works -The `worker exec` command continuously polls for tasks and executes them in parallel goroutines: +The `worker stdio` command continuously polls for tasks and executes them in parallel goroutines: 1. **Continuously polls** for tasks of the specified type 2. **Passes** the full task JSON to your worker via **stdin** @@ -16,7 +18,7 @@ The `worker exec` command continuously polls for tasks and executes them in para ## Usage ```bash -orkes worker exec --type [args...] +orkes worker stdio --type [args...] ``` **Flags:** @@ -26,6 +28,7 @@ orkes worker exec --type [args...] - `--poll-timeout`: Poll timeout in milliseconds (default: 100) - `--exec-timeout`: Worker execution timeout in seconds (0 = no timeout) - `--count`: Number of tasks to poll in each batch (default: 1) +- `--verbose`: Print task and result JSON to stdout ## Worker Contract @@ -93,16 +96,16 @@ Run the worker: ```bash # Start worker for 'greet_task' -orkes worker exec --type greet_task python3 worker.py +orkes worker stdio --type greet_task python3 worker.py # Poll multiple tasks per batch (poll 5 tasks at a time) -orkes worker exec --type greet_task python3 worker.py --count 5 +orkes worker stdio --type greet_task python3 worker.py --count 5 # With worker ID and domain -orkes worker exec --type greet_task python3 worker.py --worker-id worker-1 --domain production +orkes worker stdio --type greet_task python3 worker.py --worker-id worker-1 --domain production # With execution timeout (30 seconds per task) -orkes worker exec --type greet_task python3 worker.py --exec-timeout 30 +orkes worker stdio --type greet_task python3 worker.py --exec-timeout 30 ``` ## Example: Shell Script Worker @@ -143,7 +146,7 @@ The worker automatically runs in continuous mode: ```bash # Poll 10 tasks at a time and process them in parallel -orkes worker exec --type greet_task python3 worker.py --count 10 +orkes worker stdio --type greet_task python3 worker.py --count 10 ``` ## Error Handling diff --git a/cmd/worker.go b/cmd/worker.go index 9e6f130..92eccec 100644 --- a/cmd/worker.go +++ b/cmd/worker.go @@ -48,13 +48,13 @@ var ( Example: "orkes worker js --type my_task worker.js", } - workerExecCmd = &cobra.Command{ - Use: "exec [args...]", - Short: "Poll and execute tasks using an external command", - Long: `Continuously poll for tasks and execute them using an external command. + workerStdioCmd = &cobra.Command{ + Use: "stdio [args...]", + Short: "Poll tasks and execute command via stdin/stdout", + Long: `CLI polls tasks and executes the command. The task is passed in the standard input and the result is expected in the standard output. The worker runs in continuous mode, polling for tasks and executing them in -parallel goroutines (similar to JavaScript workers). +parallel goroutines. The task JSON is passed to the command via stdin. The command should read the task from stdin and write a result JSON to stdout. @@ -78,7 +78,7 @@ Exit codes: non-zero: Failure (task marked as FAILED)`, RunE: execWorker, SilenceUsage: true, - Example: "worker exec --type greet_task python worker.py\nworker exec --type greet_task python worker.py --count 5\nworker exec --type greet_task ./worker.sh --verbose", + Example: "worker stdio --type greet_task python worker.py\nworker stdio --type greet_task python worker.py --count 5\nworker stdio --type greet_task ./worker.sh --verbose", } workerRemoteCmd = &cobra.Command{ @@ -463,6 +463,7 @@ func execWorker(cmd *cobra.Command, args []string) error { pollTimeout, _ := cmd.Flags().GetInt32("poll-timeout") execTimeout, _ := cmd.Flags().GetInt32("exec-timeout") count, _ := cmd.Flags().GetInt32("count") + verbose, _ := cmd.Flags().GetBool("verbose") taskClient := internal.GetTaskClient() @@ -506,7 +507,7 @@ func execWorker(cmd *cobra.Command, args []string) error { wg.Add(1) go func(t model.Task) { defer wg.Done() - executeExternalWorker(t, workerCmd, workerArgs, workerId, domain, execTimeout, taskClient) + executeExternalWorker(t, workerCmd, workerArgs, workerId, domain, execTimeout, verbose, taskClient) }(task) } @@ -514,7 +515,7 @@ func execWorker(cmd *cobra.Command, args []string) error { } } -func executeExternalWorker(task model.Task, workerCmd string, workerArgs []string, workerId, domain string, execTimeout int32, taskClient *client.TaskResourceApiService) { +func executeExternalWorker(task model.Task, workerCmd string, workerArgs []string, workerId, domain string, execTimeout int32, verbose bool, taskClient *client.TaskResourceApiService) { log.Infof("Processing task: %s (workflow: %s)", task.TaskId, task.WorkflowInstanceId) taskJSON, err := json.Marshal(task) @@ -524,6 +525,12 @@ func executeExternalWorker(task model.Task, workerCmd string, workerArgs []strin return } + if verbose { + fmt.Println("=== Task Input ===") + fmt.Println(string(taskJSON)) + fmt.Println("==================") + } + ctx := context.Background() if execTimeout > 0 { var cancel context.CancelFunc @@ -583,7 +590,7 @@ func executeExternalWorker(task model.Task, workerCmd string, workerArgs []strin result = WorkerResult{ Status: "FAILED", - Reason: fmt.Sprintf("worker exec failed: %v", execErr), + Reason: fmt.Sprintf("worker execution failed: %v", execErr), Logs: []string{stderrOutput}, } } else { @@ -600,6 +607,19 @@ func executeExternalWorker(task model.Task, workerCmd string, workerArgs []strin } } + if verbose { + resultJSON, _ := json.MarshalIndent(result, "", " ") + if result.Status == "FAILED" { + fmt.Println("=== Task Result (Error) ===") + fmt.Println(string(resultJSON)) + fmt.Println("===========================") + } else { + fmt.Println("=== Task Result ===") + fmt.Println(string(resultJSON)) + fmt.Println("===================") + } + } + switch result.Status { case "COMPLETED", "FAILED", "IN_PROGRESS": default: @@ -1151,7 +1171,7 @@ func executePythonWorkerFromFile(cmd *cobra.Command, workerFile, taskType string wg.Add(1) go func(t model.Task) { defer wg.Done() - executeExternalWorker(t, pythonCmd, []string{workerFile}, workerId, domain, execTimeout, taskClient) + executeExternalWorker(t, pythonCmd, []string{workerFile}, workerId, domain, execTimeout, false, taskClient) }(task) } @@ -1167,13 +1187,14 @@ func init() { workerJsCmd.Flags().String("domain", "", "Domain") workerJsCmd.Flags().Int32("timeout", 100, "Timeout in milliseconds") - workerExecCmd.Flags().String("type", "", "Task type to poll for (required)") - workerExecCmd.MarkFlagRequired("type") - workerExecCmd.Flags().String("worker-id", "", "Worker ID") - workerExecCmd.Flags().String("domain", "", "Domain") - workerExecCmd.Flags().Int32("poll-timeout", 100, "Poll timeout in milliseconds") - workerExecCmd.Flags().Int32("exec-timeout", 0, "Execution timeout in seconds (0 = no timeout)") - workerExecCmd.Flags().Int32("count", 1, "Number of tasks to poll in each batch") + workerStdioCmd.Flags().String("type", "", "Task type to poll for (required)") + workerStdioCmd.MarkFlagRequired("type") + workerStdioCmd.Flags().String("worker-id", "", "Worker ID") + workerStdioCmd.Flags().String("domain", "", "Domain") + workerStdioCmd.Flags().Int32("poll-timeout", 100, "Poll timeout in milliseconds") + workerStdioCmd.Flags().Int32("exec-timeout", 0, "Execution timeout in seconds (0 = no timeout)") + workerStdioCmd.Flags().Int32("count", 1, "Number of tasks to poll in each batch") + workerStdioCmd.Flags().Bool("verbose", false, "Print task and result JSON to stdout") workerRemoteCmd.Flags().String("type", "", "Task type to poll for (required)") workerRemoteCmd.MarkFlagRequired("type") @@ -1186,7 +1207,7 @@ func init() { workerListRemoteCmd.Flags().String("namespace", "default", "Namespace to list workers from") workerCmd.AddCommand(workerJsCmd) - workerCmd.AddCommand(workerExecCmd) + workerCmd.AddCommand(workerStdioCmd) workerCmd.AddCommand(workerRemoteCmd) workerCmd.AddCommand(workerListRemoteCmd) rootCmd.AddCommand(workerCmd)