Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ members = [
"crates/zart-macros",
"examples",
"examples/admin-demo",
"examples/cli-demo",
]
resolver = "2"

Expand Down
6 changes: 6 additions & 0 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@ example-transactions db_url='postgres://zart:zart@localhost:5432/zart':
just migrate
RUST_LOG=${RUST_LOG:-off} DATABASE_URL={{db_url}} cargo run -p zart-examples --bin example-transactions

# Run the cli-demo example (long-running execution + interactive CLI admin commands)
# Usage: just example-cli-demo [DATABASE_URL]
example-cli-demo db_url='postgres://zart:zart@localhost:5432/zart':
just migrate
RUST_LOG=${RUST_LOG:-off} DATABASE_URL={{db_url}} bash examples/cli-demo/demo.sh

# Run the admin-demo example (wait_completion, start_and_wait_for, restart, retry_step, rerun, pause/resume)
# Usage: just example-admin-demo [DATABASE_URL]
example-admin-demo db_url='postgres://zart:zart@localhost:5432/zart':
Expand Down
15 changes: 13 additions & 2 deletions crates/zart/src/context/task_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,11 +691,22 @@ impl TaskContext {
{
match &self.execution_mode {
ExecutionMode::Body => {
let group_step_name = format!("__wg__all__{}", uuid::Uuid::new_v4());

// Extract names before awaits; avoid borrowing non-Sync handles across await points.
let step_names: Vec<String> = handles.iter().map(|h| h.step_name.clone()).collect();

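// Generate a deterministic group step name based on the sorted child step names.
// This ensures the same wait_group is used across body task replays, preventing
// orphaned children and lost completion counts.
// NOTE(review): `DefaultHasher`'s algorithm is explicitly unspecified by std and may
// change between Rust releases, so the name is only guaranteed stable for binaries
// built with the same toolchain. If replays must survive a compiler upgrade, switch
// to a fixed, documented hash. Also note that joining with "|" is ambiguous when a
// step name itself contains "|".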
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut sorted_names = step_names.clone();
sorted_names.sort();
let names_joined = sorted_names.join("|");
let mut hasher = DefaultHasher::new();
names_joined.hash(&mut hasher);
let hash_value = hasher.finish();
let group_step_name = format!("__wg__all__{:x}", hash_value);

let req = StepRequest::new_wait_group_barrier(&group_step_name, &step_names, 0);

let json = crate::step_types::dispatch::step_internal::<serde_json::Value>(
Expand Down
41 changes: 23 additions & 18 deletions crates/zart/src/step_types/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,29 @@ impl BodyBehavior for LookupOrScheduleWaitGroupBarrier {
}
};

let total = i32::try_from(child_names.len()).map_err(|_| StepError::Failed {
step: group_step_name.to_string(),
reason: "too many wait-group handles to fit i32".to_string(),
})?;

// Upsert the wait-group row BEFORE scheduling any child tasks so that
// `complete_wait_group_child` always finds a row to decrement. If the
// row were created after the children are committed a fast worker could
// pick up and complete a child before the row exists, turning the
// decrement into a no-op and permanently losing that count.
ctx.scheduler
.upsert_wait_group_step(UpsertWaitGroupStepParams {
run_id: ctx.run_id().to_string(),
group_step_name: group_step_name.to_string(),
total,
threshold,
})
.await
.map_err(|e| StepError::Failed {
step: group_step_name.to_string(),
reason: e.to_string(),
})?;

let mut all_completed = true;

for child_name in child_names {
Expand Down Expand Up @@ -369,24 +392,6 @@ impl BodyBehavior for LookupOrScheduleWaitGroupBarrier {
}
}

let total = i32::try_from(child_names.len()).map_err(|_| StepError::Failed {
step: group_step_name.to_string(),
reason: "too many wait-group handles to fit i32".to_string(),
})?;

ctx.scheduler
.upsert_wait_group_step(UpsertWaitGroupStepParams {
run_id: ctx.run_id().to_string(),
group_step_name: group_step_name.to_string(),
total,
threshold,
})
.await
.map_err(|e| StepError::Failed {
step: group_step_name.to_string(),
reason: e.to_string(),
})?;

if all_completed {
let mut results = Vec::with_capacity(child_names.len());
for child_name in child_names {
Expand Down
21 changes: 21 additions & 0 deletions examples/cli-demo/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[package]
name = "example-cli-demo"
version = "0.1.0"
edition = "2024"
publish = false

[[bin]]
name = "example-cli-demo"
path = "src/main.rs"

[dependencies]
zart = { path = "../../crates/zart" }
scheduler = { path = "../../crates/scheduler", features = ["postgres"] }
tokio = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
async-trait = { workspace = true }
uuid = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
sqlx = { workspace = true }
153 changes: 153 additions & 0 deletions examples/cli-demo/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
# CLI Demo Example

Demonstrates **Zart CLI admin commands** interacting with a long-running durable execution. This example shows how to use the `zart` CLI to monitor, pause, resume, restart, and inspect durable executions while they run.

## Features Demonstrated

- **`zart status <execution_id>`** — Check execution status in real-time
- **`zart pause --execution-id <id>`** — Create pause rules to temporarily halt execution
- **`zart resume --execution-id <id>`** — Resume execution by soft-deleting pause rules
- **`zart pause-list`** — List all pause rules (active and deleted)
- **`zart restart <execution_id>`** — Full restart with history preservation
- **`zart runs <execution_id>`** — List all past runs from the run history

## Flow

The `demo.sh` script handles everything automatically:

1. Generates a unique execution ID (e.g., `cli-demo-add3f0e5-728f-47cd`)
2. Starts the Rust durable execution in the background
3. Waits for the execution to initialize in the database
4. Runs through a sequence of CLI commands demonstrating each admin feature
5. Cleans up the background process on exit

The durable execution follows this timeline:

```
┌────────────────────────────────────────────────────────────────────┐
│ Rust Process (background, ~2 minutes total)                        │
│ ┌──────────┐   ┌───────┐   ┌──────────┐   ┌───────┐   ┌──────────┐ │
│ │ prepare  │ → │ sleep │ → │ process  │ → │ sleep │ → │ finalize │ │
│ │   (2s)   │   │ (30s) │   │   (2s)   │   │ (30s) │   │   (2s)   │ │
│ └──────────┘   └───────┘   └──────────┘   └───────┘   └──────────┘ │
│                    ↑            ↑             ↑                    │
│                    └── CLI ─────┴─ commands ──┘                    │
└────────────────────────────────────────────────────────────────────┘
```

CLI commands execute during the sleep windows, demonstrating pause/resume, restart, and run history.

## Running

```bash
# Ensure PostgreSQL is running
just up

# Run migrations
just migrate

# Run the CLI demo (fully automated)
just example-cli-demo
```

## What You'll See

```
╔══════════════════════════════════════════════════════════╗
║ Zart CLI Admin Commands — Interactive Demo ║
╚══════════════════════════════════════════════════════════╝

Execution ID: cli-demo-add3f0e5-728f-47cd
Database: postgres://zart:zart@localhost:5432/zart

Starting durable execution in background...
✓ Rust process started (PID: 70065)

Waiting for execution to initialize...
✓ Execution initialized (2 seconds)

▶ Check execution status
$ cargo run -q -p zart-cli -- --database-url postgres://zart:zart@localhost:5432/zart status cli-demo-add3f0e5-728f-47cd

execution_id : cli-demo-add3f0e5-728f-47cd
task_name : zart::cli_demo::CliDemoTask
status : Running
scheduled_at : 2026-04-14T00:45:00.000000+00:00

─────────────────────────────────────────────────────────

Creating pause rule...

▶ Create pause rule for this execution
$ cargo run -q -p zart-cli -- --database-url postgres://zart:zart@localhost:5432/zart pause --execution-id cli-demo-add3f0e5-728f-47cd --triggered-by demo-script

Created pause rule 'rule-abc123' (...)

─────────────────────────────────────────────────────────

▶ List active pause rules
$ cargo run -q -p zart-cli -- --database-url postgres://zart:zart@localhost:5432/zart pause-list

rule-abc123 PauseScope { execution_id: Some("cli-demo-..."), ... }

─────────────────────────────────────────────────────────

... (continues with resume, restart, runs, etc.)

╔══════════════════════════════════════════════════════════╗
║ CLI Demo Commands Complete ║
╚══════════════════════════════════════════════════════════╝

Commands demonstrated:
• zart status — Check execution status
• zart pause — Create pause rules
• zart resume — Soft-delete pause rules
• zart pause-list — List all pause rules
• zart restart — Full restart with history preservation
• zart runs — List run history

Background process will be cleaned up automatically.
```

## Key Concepts

### Pause Rules
Pause rules are **scheduling-time controls** — they prevent new tasks from being scheduled without affecting in-flight work. When you run `zart pause`, a rule is inserted into `zart_pause_rules`. The worker checks this table before scheduling the next task and silently stops if a rule matches.

`zart resume` soft-deletes the rules (sets `deleted_at`), allowing scheduling to continue. The execution replays naturally from `zart_steps` — no state loss occurs.

### Run History
Every restart creates a new run row in `zart_execution_runs`. The original run is preserved with its status, and a new run begins with `trigger = 'restart'`. Run history is append-only and fully auditable.

### CLI vs Embedded API
This example demonstrates the **CLI tier** of admin access. The same operations are available programmatically via `DurableScheduler` methods (see `admin-demo` example) or via HTTP through `zart-api`'s `admin_router()`.

## Manual Exploration

While the demo script runs automatically, you can also interact manually:

```bash
# In one terminal: start the long-running execution
DATABASE_URL=postgres://zart:zart@localhost:5432/zart \
cargo run -p example-cli-demo

# Note the execution ID from the output, then in another terminal:
export DATABASE_URL=postgres://zart:zart@localhost:5432/zart
export EXECUTION_ID=<the-id-from-above>

# Check status
cargo run -q -p zart-cli -- status $EXECUTION_ID

# Pause execution
cargo run -q -p zart-cli -- pause --execution-id $EXECUTION_ID --triggered-by me

# List pause rules
cargo run -q -p zart-cli -- pause-list

# Resume
cargo run -q -p zart-cli -- resume --execution-id $EXECUTION_ID --triggered-by me

# Restart and check runs
cargo run -q -p zart-cli -- restart $EXECUTION_ID --payload '{"fail_step":false}'
cargo run -q -p zart-cli -- runs $EXECUTION_ID
```
Loading
Loading