From c17f371863e5f3b2f84b248a9faad9abb7edf328 Mon Sep 17 00:00:00 2001 From: Paulo Suzart Date: Mon, 13 Apr 2026 19:51:09 -0500 Subject: [PATCH 1/2] doc(cli): Create cli example to display cli usage --- Cargo.lock | 16 +++ Cargo.toml | 1 + Justfile | 6 + examples/cli-demo/Cargo.toml | 21 +++ examples/cli-demo/README.md | 153 ++++++++++++++++++++ examples/cli-demo/demo.sh | 161 +++++++++++++++++++++ examples/cli-demo/src/main.rs | 259 ++++++++++++++++++++++++++++++++++ 7 files changed, 617 insertions(+) create mode 100644 examples/cli-demo/Cargo.toml create mode 100644 examples/cli-demo/README.md create mode 100755 examples/cli-demo/demo.sh create mode 100644 examples/cli-demo/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 09bc95c..ee5bce9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -646,6 +646,22 @@ dependencies = [ "zart", ] +[[package]] +name = "example-cli-demo" +version = "0.1.0" +dependencies = [ + "async-trait", + "scheduler", + "serde", + "serde_json", + "sqlx", + "tokio", + "tracing", + "tracing-subscriber", + "uuid", + "zart", +] + [[package]] name = "fastrand" version = "2.4.0" diff --git a/Cargo.toml b/Cargo.toml index 4db24ff..17f7816 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "crates/zart-macros", "examples", "examples/admin-demo", + "examples/cli-demo", ] resolver = "2" diff --git a/Justfile b/Justfile index 5d84137..c2208c4 100644 --- a/Justfile +++ b/Justfile @@ -197,6 +197,12 @@ example-transactions db_url='postgres://zart:zart@localhost:5432/zart': just migrate RUST_LOG=${RUST_LOG:-off} DATABASE_URL={{db_url}} cargo run -p zart-examples --bin example-transactions +# Run the cli-demo example (long-running execution + interactive CLI admin commands) +# Usage: just example-cli-demo [DATABASE_URL] +example-cli-demo db_url='postgres://zart:zart@localhost:5432/zart': + just migrate + RUST_LOG=${RUST_LOG:-off} DATABASE_URL={{db_url}} bash examples/cli-demo/demo.sh + # Run the admin-demo example (wait_completion, start_and_wait_for, restart, retry_step, rerun, pause/resume) # Usage: just example-admin-demo [DATABASE_URL] example-admin-demo db_url='postgres://zart:zart@localhost:5432/zart': diff --git a/examples/cli-demo/Cargo.toml b/examples/cli-demo/Cargo.toml new file mode 100644 index 0000000..4376d1e --- /dev/null +++ b/examples/cli-demo/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "example-cli-demo" +version = "0.1.0" +edition = "2024" +publish = false + +[[bin]] +name = "example-cli-demo" +path = "src/main.rs" + +[dependencies] +zart = { path = "../../crates/zart" } +scheduler = { path = "../../crates/scheduler", features = ["postgres"] } +tokio = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +async-trait = { workspace = true } +uuid = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +sqlx = { workspace = true } diff --git a/examples/cli-demo/README.md b/examples/cli-demo/README.md new file mode 100644 index 0000000..0ef5b9c --- /dev/null +++ b/examples/cli-demo/README.md @@ -0,0 +1,153 @@ +# CLI Demo Example + +Demonstrates **Zart CLI admin commands** interacting with a long-running durable execution. This example shows how to use the `zart` CLI to monitor, pause, resume, restart, and inspect durable executions while they run. + +## Features Demonstrated + +- **`zart status `** — Check execution status in real-time +- **`zart pause --execution-id `** — Create pause rules to temporarily halt execution +- **`zart resume --execution-id `** — Resume execution by soft-deleting pause rules +- **`zart pause-list`** — List all pause rules (active and deleted) +- **`zart restart `** — Full restart with history preservation +- **`zart runs `** — List all past runs from the run history + +## Flow + +The `demo.sh` script handles everything automatically: + +1. Generates a unique execution ID (e.g., `cli-demo-add3f0e5-728f`) +2. Starts the Rust durable execution in the background +3. Waits for the execution to initialize in the database +4. Runs through a sequence of CLI commands demonstrating each admin feature +5. Cleans up the background process on exit + +The durable execution follows this timeline: + +``` +┌─────────────────────────────────────────────────────────┐ +│ Rust Process (background, ~2 minutes total) │ +│ ┌──────────┐ ┌──────┐ ┌──────────┐ ┌──────┐ │ +│ │ prepare │→│sleep │→│ process │→│sleep │→│finalize│ +│ │ (2s) │ │(30s) │ │ (2s) │ │(30s) │ │(2s) │ +│ └──────────┘ └──────┘ └──────────┘ └──────┘ │ +│ ↑ ↑ ↑ │ +│ └─ CLI ─────┴─── CLI commands ───────┘ │ +└─────────────────────────────────────────────────────────┘ +``` + +CLI commands execute during the sleep windows, demonstrating pause/resume, restart, and run history. + +## Running + +```bash +# Ensure PostgreSQL is running +just up + +# Run migrations +just migrate + +# Run the CLI demo (fully automated) +just example-cli-demo +``` + +## What You'll See + +``` +╔══════════════════════════════════════════════════════════╗ +║ Zart CLI Admin Commands — Interactive Demo ║ +╚══════════════════════════════════════════════════════════╝ + +Execution ID: cli-demo-add3f0e5-728f-47cd +Database: postgres://zart:zart@localhost:5432/zart + +Starting durable execution in background... +✓ Rust process started (PID: 70065) + +Waiting for execution to initialize... +✓ Execution initialized (2 seconds) + +▶ Check execution status + $ cargo run -q -p zart-cli -- --database-url postgres://zart:zart@localhost:5432/zart status cli-demo-add3f0e5-728f-47cd + +execution_id : cli-demo-add3f0e5-728f-47cd +task_name : zart::cli_demo::CliDemoTask +status : Running +scheduled_at : 2026-04-14T00:45:00.000000+00:00 + +───────────────────────────────────────────────────────── + +Creating pause rule... + +▶ Create pause rule for this execution + $ cargo run -q -p zart-cli -- --database-url postgres://zart:zart@localhost:5432/zart pause --execution-id cli-demo-add3f0e5-728f-47cd --triggered-by demo-script + +Created pause rule 'rule-abc123' (...) + +───────────────────────────────────────────────────────── + +▶ List active pause rules + $ cargo run -q -p zart-cli -- --database-url postgres://zart:zart@localhost:5432/zart pause-list + +rule-abc123 PauseScope { execution_id: Some("cli-demo-..."), ... } + +───────────────────────────────────────────────────────── + +... (continues with resume, restart, runs, etc.) + +╔══════════════════════════════════════════════════════════╗ +║ CLI Demo Commands Complete ║ +╚══════════════════════════════════════════════════════════╝ + +Commands demonstrated: + • zart status — Check execution status + • zart pause — Create pause rules + • zart resume — Soft-delete pause rules + • zart pause-list — List all pause rules + • zart restart — Full restart with history preservation + • zart runs — List run history + +Background process will be cleaned up automatically. +``` + +## Key Concepts + +### Pause Rules +Pause rules are **scheduling-time controls** — they prevent new tasks from being scheduled without affecting in-flight work. When you run `zart pause`, a rule is inserted into `zart_pause_rules`. The worker checks this table before scheduling the next task and silently stops if a rule matches. + +`zart resume` soft-deletes the rules (sets `deleted_at`), allowing scheduling to continue. The execution replays naturally from `zart_steps` — no state loss occurs. + +### Run History +Every restart creates a new run row in `zart_execution_runs`. The original run is preserved with its status, and a new run begins with `trigger = 'restart'`. Run history is append-only and fully auditable. + +### CLI vs Embedded API +This example demonstrates the **CLI tier** of admin access. The same operations are available programmatically via `DurableScheduler` methods (see `admin-demo` example) or via HTTP through `zart-api`'s `admin_router()`. + +## Manual Exploration + +While the demo script runs automatically, you can also interact manually: + +```bash +# In one terminal: start the long-running execution +DATABASE_URL=postgres://zart:zart@localhost:5432/zart \ +cargo run -p example-cli-demo + +# Note the execution ID from the output, then in another terminal: +export DATABASE_URL=postgres://zart:zart@localhost:5432/zart +export EXECUTION_ID= + +# Check status +cargo run -q -p zart-cli -- status $EXECUTION_ID + +# Pause execution +cargo run -q -p zart-cli -- pause --execution-id $EXECUTION_ID --triggered-by me + +# List pause rules +cargo run -q -p zart-cli -- pause-list + +# Resume +cargo run -q -p zart-cli -- resume --execution-id $EXECUTION_ID --triggered-by me + +# Restart and check runs +cargo run -q -p zart-cli -- restart $EXECUTION_ID --payload '{"fail_step":false}' +cargo run -q -p zart-cli -- runs $EXECUTION_ID +``` diff --git a/examples/cli-demo/demo.sh b/examples/cli-demo/demo.sh new file mode 100755 index 0000000..0335361 --- /dev/null +++ b/examples/cli-demo/demo.sh @@ -0,0 +1,161 @@ +#!/usr/bin/env bash +# CLI Demo — self-contained orchestration script +# +# This script: +# 1. Generates a unique execution ID +# 2. Starts the Rust durable execution in background +# 3. Runs CLI admin commands against it +# 4. Cleans up the background process on exit +# +# Usage: just example-cli-demo +# or: DATABASE_URL=postgres://... bash examples/cli-demo/demo.sh + +set -euo pipefail + +# ── Colors ──────────────────────────────────────────────────────────────────── +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +BLUE='\033[0;34m' +CYAN='\033[0;36m' +NC='\033[0m' + +DATABASE_URL="${DATABASE_URL:-postgres://zart:zart@localhost:5432/zart}" +EXECUTION_ID="cli-demo-$(uuidgen | tr '[:upper:]' '[:lower:]' | cut -d'-' -f1-3)" +export EXECUTION_ID + +# Silence all cargo/rust logging — only our echo output should appear +export RUST_LOG=off + +ZART="cargo run -q -p zart-cli -- --database-url ${DATABASE_URL}" + +# ── Cleanup on exit ─────────────────────────────────────────────────────────── +RUST_PID="" +cleanup() { + if [ -n "$RUST_PID" ] && kill -0 "$RUST_PID" 2>/dev/null; then + echo "" + echo -e "${YELLOW}Stopping background Rust process (PID: $RUST_PID)...${NC}" + kill "$RUST_PID" 2>/dev/null || true + wait "$RUST_PID" 2>/dev/null || true + fi +} +trap cleanup EXIT + +# ── Header ──────────────────────────────────────────────────────────────────── +echo "" +echo -e "${BLUE}╔══════════════════════════════════════════════════════════╗${NC}" +echo -e "${BLUE}║ Zart CLI Admin Commands — Interactive Demo ║${NC}" +echo -e "${BLUE}╚══════════════════════════════════════════════════════════╝${NC}" +echo "" +echo -e "${CYAN}Execution ID: ${EXECUTION_ID}${NC}" +echo -e "${CYAN}Database: ${DATABASE_URL}${NC}" +echo "" + +# ── Start Rust process in background ───────────────────────────────────────── +echo -e "${GREEN}Starting durable execution in background...${NC}" +DATABASE_URL="${DATABASE_URL}" EXECUTION_ID="${EXECUTION_ID}" \ + cargo run -p example-cli-demo & +RUST_PID=$! +echo -e "${GREEN}✓ Rust process started (PID: $RUST_PID)${NC}" +echo "" + +# ── Wait for execution to initialize in DB ──────────────────────────────────── +echo -e "${GREEN}Waiting for execution to initialize...${NC}" +for i in {1..15}; do + if cargo run -q -p zart-cli -- --database-url "${DATABASE_URL}" status "${EXECUTION_ID}" >/dev/null 2>&1; then + echo -e "${GREEN}✓ Execution initialized ($i seconds)${NC}" + break + fi + if [ $i -eq 15 ]; then + echo -e "${RED}✗ ERROR: Execution did not initialize within 15 seconds${NC}" + exit 1 + fi + sleep 1 +done +echo "" + +# ── Helper: run a CLI command (fails hard on error) ────────────────────────── +run_cli() { + local description="$1" + shift + + echo -e "${YELLOW}▶ ${description}${NC}" + echo -e "${BLUE} $ ${*}${NC}" + echo "" + "$@" 2>&1 + echo "" + echo -e "${BLUE}─────────────────────────────────────────────────────────${NC}" + echo "" +} + +# ── 1. Status ───────────────────────────────────────────────────────────────── +run_cli "Check execution status" \ + ${ZART} status "${EXECUTION_ID}" + +# ── 2. Pause ────────────────────────────────────────────────────────────────── +echo -e "${GREEN}Creating pause rule...${NC}" +run_cli "Create pause rule for this execution" \ + ${ZART} pause --execution-id "${EXECUTION_ID}" --triggered-by demo-script + +# ── 3. List pause rules ─────────────────────────────────────────────────────── +run_cli "List active pause rules" \ + ${ZART} pause-list + +# Brief pause +echo -e "${GREEN}Waiting 3 seconds...${NC}" +sleep 3 + +# ── 4. Resume ───────────────────────────────────────────────────────────────── +echo -e "${GREEN}Resuming execution...${NC}" +run_cli "Resume by soft-deleting pause rules" \ + ${ZART} resume --execution-id "${EXECUTION_ID}" --triggered-by demo-script + +# ── 5. Verify rules deleted ─────────────────────────────────────────────────── +run_cli "Verify pause rules are deleted" \ + ${ZART} pause-list + +# Wait for execution to progress +echo -e "${GREEN}Execution resumed. Waiting 8 seconds...${NC}" +sleep 8 + +# ── 6. Status after resume ──────────────────────────────────────────────────── +run_cli "Check status after resume" \ + ${ZART} status "${EXECUTION_ID}" + +# ── 7. Restart ──────────────────────────────────────────────────────────────── +echo -e "${GREEN}Demonstrating full restart...${NC}" +run_cli "Restart execution with new payload" \ + ${ZART} restart "${EXECUTION_ID}" --payload '{"fail_step":false}' --triggered-by demo-script + +# ── 8. List runs ────────────────────────────────────────────────────────────── +run_cli "List all runs (original + restart)" \ + ${ZART} runs "${EXECUTION_ID}" + +# Wait for restarted execution +echo -e "${GREEN}Waiting 10 seconds for restarted execution...${NC}" +sleep 10 + +# ── 9. Final status ─────────────────────────────────────────────────────────── +run_cli "Final status check" \ + ${ZART} status "${EXECUTION_ID}" + +# ── 10. Final run history ───────────────────────────────────────────────────── +run_cli "Final run history" \ + ${ZART} runs "${EXECUTION_ID}" + +# ── Summary ─────────────────────────────────────────────────────────────────── +echo "" +echo -e "${GREEN}╔══════════════════════════════════════════════════════════╗${NC}" +echo -e "${GREEN}║ CLI Demo Commands Complete ║${NC}" +echo -e "${GREEN}╚══════════════════════════════════════════════════════════╝${NC}" +echo "" +echo -e "${BLUE}Commands demonstrated:${NC}" +echo " • zart status — Check execution status" +echo " • zart pause — Create pause rules" +echo " • zart resume — Soft-delete pause rules" +echo " • zart pause-list — List all pause rules" +echo " • zart restart — Full restart with history preservation" +echo " • zart runs — List run history" +echo "" +echo -e "${YELLOW}Background process will be cleaned up automatically.${NC}" +echo "" diff --git a/examples/cli-demo/src/main.rs b/examples/cli-demo/src/main.rs new file mode 100644 index 0000000..54147f8 --- /dev/null +++ b/examples/cli-demo/src/main.rs @@ -0,0 +1,259 @@ +//! CLI Interaction Demo — Long-running durable execution for CLI admin commands demonstration. +//! +//! This example starts a durable execution with multiple steps and sleeps to provide +//! enough time for interactive CLI admin command demonstrations. +//! +//! The execution flow: +//! 1. Step: "prepare-data" — completes quickly +//! 2. Sleep: 30 seconds (gives time for CLI commands) +//! 3. Step: "process-results" — completes after sleep +//! 4. Sleep: 30 seconds (more time for CLI commands) +//! 5. Step: "finalize" — final step +//! +//! Run this example, then in another terminal use the zart CLI to: +//! - `zart status ` — check execution status +//! - `zart pause --execution-id ` — pause the execution +//! - `zart resume --execution-id ` — resume it +//! - `zart pause-list` — list pause rules +//! - `zart restart ` — restart from scratch +//! - `zart runs ` — list all runs + +use scheduler::PostgresScheduler; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; +use std::time::Duration; +use zart::prelude::*; + +// ── Handler ────────────────────────────────────────────────────────────────── + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct CliDemoInput { + /// Optional: make a step fail for retry demonstration. + fail_step: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct CliDemoOutput { + prepared: String, + processed: String, + finalized: String, +} + +struct CliDemoTask; + +#[async_trait::async_trait] +impl DurableExecution for CliDemoTask { + type Data = CliDemoInput; + type Output = CliDemoOutput; + + async fn run(&self, data: Self::Data) -> Result { + tracing::info!("[cli-demo] === Execution started ==="); + + // Step 1: Prepare data (completes quickly) + tracing::info!("[cli-demo] Step 1: Preparing data..."); + let prepared = zart::require(PrepareData).await?; + tracing::info!("[cli-demo] Step 1 completed: {}", prepared); + + // Sleep 1: 30 seconds — time for CLI commands + tracing::info!("[cli-demo] Sleeping 30 seconds (try CLI commands now)..."); + zart::sleep("first-sleep", Duration::from_secs(30)).await?; + tracing::info!("[cli-demo] First sleep completed"); + + // Step 2: Process results + tracing::info!("[cli-demo] Step 2: Processing results..."); + let processed = zart::require(ProcessResults { + fail: data.fail_step, + }) + .await?; + tracing::info!("[cli-demo] Step 2 completed: {}", processed); + + // Sleep 2: 30 seconds — more time for CLI commands + tracing::info!("[cli-demo] Sleeping 30 seconds (more CLI commands)..."); + zart::sleep("second-sleep", Duration::from_secs(30)).await?; + tracing::info!("[cli-demo] Second sleep completed"); + + // Step 3: Finalize + tracing::info!("[cli-demo] Step 3: Finalizing..."); + let finalized = zart::require(Finalize).await?; + tracing::info!("[cli-demo] Step 3 completed: {}", finalized); + + tracing::info!("[cli-demo] === Execution completed successfully ==="); + + Ok(CliDemoOutput { + prepared, + processed, + finalized, + }) + } + + fn max_retries(&self) -> usize { + 0 // No retries — we want controlled failures for demo + } +} + +// ── Steps ──────────────────────────────────────────────────────────────────── + +struct PrepareData; + +#[async_trait::async_trait] +impl ZartStep for PrepareData { + type Output = String; + type Error = CliDemoStepError; + + fn step_name(&self) -> std::borrow::Cow<'static, str> { + std::borrow::Cow::Borrowed("prepare-data") + } + + async fn run(&self) -> Result { + tracing::info!("[prepare-data] Running preparation logic..."); + // Simulate some work + tokio::time::sleep(Duration::from_secs(2)).await; + Ok("data-prepared-successfully".to_string()) + } +} + +struct ProcessResults { + fail: bool, +} + +#[async_trait::async_trait] +impl ZartStep for ProcessResults { + type Output = String; + type Error = CliDemoStepError; + + fn step_name(&self) -> std::borrow::Cow<'static, str> { + std::borrow::Cow::Borrowed("process-results") + } + + async fn run(&self) -> Result { + if self.fail { + tracing::warn!("[process-results] INTENTIONAL FAILURE for retry demo"); + return Err(CliDemoStepError("intentional failure for demo".into())); + } + tracing::info!("[process-results] Processing results..."); + tokio::time::sleep(Duration::from_secs(2)).await; + Ok("results-processed-successfully".to_string()) + } +} + +struct Finalize; + +#[async_trait::async_trait] +impl ZartStep for Finalize { + type Output = String; + type Error = CliDemoStepError; + + fn step_name(&self) -> std::borrow::Cow<'static, str> { + std::borrow::Cow::Borrowed("finalize") + } + + async fn run(&self) -> Result { + tracing::info!("[finalize] Running finalization..."); + tokio::time::sleep(Duration::from_secs(2)).await; + Ok("execution-finalized-successfully".to_string()) + } +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +struct CliDemoStepError(String); + +impl std::fmt::Display for CliDemoStepError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl std::error::Error for CliDemoStepError {} + +// ── Main ───────────────────────────────────────────────────────────────────── + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Simple output-focused logging + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), + ) + .with_target(false) + .with_thread_ids(false) + .init(); + + let db_url = std::env::var("DATABASE_URL") + .unwrap_or_else(|_| "postgres://zart:zart@localhost:5432/zart".to_string()); + + let execution_id = std::env::var("EXECUTION_ID") + .unwrap_or_else(|_| format!("cli-demo-{}", uuid::Uuid::new_v4())); + + let fail_step = std::env::var("FAIL_STEP") + .map(|v| v.eq_ignore_ascii_case("true")) + .unwrap_or(false); + + println!("=== Zart CLI Demo — Long-Running Execution ===\n"); + println!("Execution ID: {}", execution_id); + println!("Database: {}", db_url); + println!("Fail step: {}", fail_step); + println!(); + println!("This execution will run for ~2 minutes with sleeps between steps."); + println!("During the sleeps, you can use the zart CLI to interact:"); + println!(" zart status {}", execution_id); + println!(" zart pause --execution-id {}", execution_id); + println!(" zart resume --execution-id {}", execution_id); + println!(" zart pause-list"); + println!(" zart restart {}", execution_id); + println!(" zart runs {}", execution_id); + println!(); + + let pool = sqlx::PgPool::connect(&db_url).await?; + let sched = Arc::new(PostgresScheduler::new(pool.clone())); + sched.run_migrations().await?; + + let durable = Arc::new(DurableScheduler::with_pause(sched.clone(), sched.clone())); + + // Start the durable execution + durable + .start_for::( + &execution_id, + "zart::cli_demo::CliDemoTask", + &CliDemoInput { fail_step }, + ) + .await?; + + println!("✓ Execution started with ID: {}", execution_id); + println!(" Task: zart::cli_demo::CliDemoTask"); + println!(); + + // Register and run the handler + let mut registry = TaskRegistry::new(); + registry.register("zart::cli_demo::CliDemoTask", CliDemoTask); + let registry = Arc::new(registry); + + let config = zart::WorkerConfig { + poll_interval: Duration::from_millis(500), + max_tasks_per_poll: 10, + max_concurrent_tasks: 4, + shutdown_timeout: Duration::from_secs(10), + orphan_timeout: Duration::from_secs(30), + ..Default::default() + }; + + let worker = zart::Worker::new(sched.clone(), registry, config); + + println!("Worker starting... (press Ctrl+C to stop)"); + println!(); + + // Run the worker — it will execute the full flow + worker.run().await; + + println!(); + println!("=== Worker completed ==="); + + // Show final status + let record = durable.status(&execution_id).await?; + println!("Final execution status: {}", record.status); + if let Some(result) = &record.result { + println!("Result: {}", result); + } + + Ok(()) +} From d748f0dc050804765b011059eee635168111675f Mon Sep 17 00:00:00 2001 From: Paulo Suzart Date: Mon, 13 Apr 2026 21:21:57 -0500 Subject: [PATCH 2/2] Trying to fix completion group --- crates/zart/src/context/task_context.rs | 15 +++++++-- crates/zart/src/step_types/body.rs | 41 ++++++++++++++----------- webpage/src/pages/index.astro | 3 +- 3 files changed, 37 insertions(+), 22 deletions(-) diff --git a/crates/zart/src/context/task_context.rs b/crates/zart/src/context/task_context.rs index af2168c..96ff1a1 100644 --- a/crates/zart/src/context/task_context.rs +++ b/crates/zart/src/context/task_context.rs @@ -691,11 +691,22 @@ impl TaskContext { { match &self.execution_mode { ExecutionMode::Body => { - let group_step_name = format!("__wg__all__{}", uuid::Uuid::new_v4()); - // Extract names before awaits; avoid borrowing non-Sync handles across await points. let step_names: Vec = handles.iter().map(|h| h.step_name.clone()).collect(); + // Generate a deterministic group step name based on the sorted child step names. + // This ensures the same wait_group is used across body task replays, preventing + // orphaned children and lost completion counts. + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; + let mut sorted_names = step_names.clone(); + sorted_names.sort(); + let names_joined = sorted_names.join("|"); + let mut hasher = DefaultHasher::new(); + names_joined.hash(&mut hasher); + let hash_value = hasher.finish(); + let group_step_name = format!("__wg__all__{:x}", hash_value); + let req = StepRequest::new_wait_group_barrier(&group_step_name, &step_names, 0); let json = crate::step_types::dispatch::step_internal::( diff --git a/crates/zart/src/step_types/body.rs b/crates/zart/src/step_types/body.rs index 997e857..0945b31 100644 --- a/crates/zart/src/step_types/body.rs +++ b/crates/zart/src/step_types/body.rs @@ -327,6 +327,29 @@ impl BodyBehavior for LookupOrScheduleWaitGroupBarrier { } }; + let total = i32::try_from(child_names.len()).map_err(|_| StepError::Failed { + step: group_step_name.to_string(), + reason: "too many wait-group handles to fit i32".to_string(), + })?; + + // Upsert the wait-group row BEFORE scheduling any child tasks so that + // `complete_wait_group_child` always finds a row to decrement. If the + // row were created after the children are committed a fast worker could + // pick up and complete a child before the row exists, turning the + // decrement into a no-op and permanently losing that count. + ctx.scheduler + .upsert_wait_group_step(UpsertWaitGroupStepParams { + run_id: ctx.run_id().to_string(), + group_step_name: group_step_name.to_string(), + total, + threshold, + }) + .await + .map_err(|e| StepError::Failed { + step: group_step_name.to_string(), + reason: e.to_string(), + })?; + let mut all_completed = true; for child_name in child_names { @@ -369,24 +392,6 @@ impl BodyBehavior for LookupOrScheduleWaitGroupBarrier { } } - let total = i32::try_from(child_names.len()).map_err(|_| StepError::Failed { - step: group_step_name.to_string(), - reason: "too many wait-group handles to fit i32".to_string(), - })?; - - ctx.scheduler - .upsert_wait_group_step(UpsertWaitGroupStepParams { - run_id: ctx.run_id().to_string(), - group_step_name: group_step_name.to_string(), - total, - threshold, - }) - .await - .map_err(|e| StepError::Failed { - step: group_step_name.to_string(), - reason: e.to_string(), - })?; - if all_completed { let mut results = Vec::with_capacity(child_names.len()); for child_name in child_names { diff --git a/webpage/src/pages/index.astro b/webpage/src/pages/index.astro index 6e5805f..63bb055 100644 --- a/webpage/src/pages/index.astro +++ b/webpage/src/pages/index.astro @@ -1229,11 +1229,10 @@ - +