diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..e087f4b --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,51 @@ +name: Rust CI + +on: + push: + branches: [main, master] + pull_request: + branches: [main, master] + +jobs: + build-test: + name: Build & test (${{ matrix.toolchain }}) + runs-on: ubuntu-latest + + strategy: + fail-fast: false + matrix: + toolchain: [stable, beta, nightly] + + steps: + - name: Checkout sources + uses: actions/checkout@v4 + + - name: Install Rust (${{ matrix.toolchain }}) + uses: actions-rs/toolchain@v1 + with: + toolchain: ${{ matrix.toolchain }} + override: true + profile: minimal + components: clippy, rustfmt + + # Re-use cargo build cache for faster CI runs + - name: Cache cargo registry + build + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-${{ matrix.toolchain }}-${{ hashFiles('**/Cargo.lock') }} + + - name: Check formatting + run: cargo fmt --all -- --check + + - name: Clippy (deny warnings) + run: cargo clippy --all-features -- -D warnings + + - name: Run tests (default features) + run: cargo test --all + + - name: Run tests (no default features) + run: cargo test --no-default-features diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..61868e2 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,46 @@ +# Changelog + +All notable changes to this project will be documented in this file. +This project adheres to [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) +and its version numbers follow [Semantic Versioning](https://semver.org/). + +## [Unreleased] + +### Added +- GitHub Actions workflow for CI (`build`, `clippy`, `fmt`, full test-matrix, + feature power-set). +- `CHANGELOG.md` with Keep-a-Changelog layout. +- Optional `tracing` span around every child process (requires `tracing` feature). +- Cached vector of child `ProcessControlHandler`s for allocation-free broadcast. +- Architecture diagram (Mermaid) in `README.md`. +- `Custom(Box)` variant to `RuntimeControlMessage` for future + extensibility. +- **Fluent `ProcessManagerBuilder`** allowing compile-time safe setup and + configuration. +- `.name("…")` builder method and internal plumbing for custom supervisor + names. + +### Changed +- `process_handle()` now returns `Arc` (cheap cloning, + no double boxing). +- Default `process_name()` no longer allocates; returns `Cow<'static, str>`. +- `ProcessManager` constructors **deprecated** in favour of the new builder; + `new()`, `manual_cleanup()` and `auto_cleanup()` now issue warnings. +- Busy-wait loops in `RuntimeGuard` replaced with `Notify`-based signalling. +- Child panic handling now caught with `catch_unwind`, ensuring supervisor + never hangs. +- All examples, doctests and integration tests migrated to the builder API + (no more deprecation warnings in user-facing code). +- Internal channels refactored to remove extra locks (use of `OnceCell`). + +### Fixed +- Active-child counter accuracy under edge conditions (spawn panics). +- Numerous doc examples updated for new APIs. + +### Removed +- Unused aliases and imports producing compiler warnings. + +--- + +## [0.4.1] – 2024-04-19 +Removed dependency to `async_trait`. diff --git a/Cargo.toml b/Cargo.toml index 3074b35..c40ba73 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "processmanager" description = "manage process lifecycles, graceful shutdown and process faults" -version = "0.4.1" +version = "0.5.0" edition = "2024" authors = ["Marc Riegel "] license = "MIT" @@ -34,3 +34,4 @@ tokio = { version = "1", features = [ ] } log = { version = "0.4", optional = true } tracing = { version = "0.1", optional = true } +once_cell = "1" diff --git a/README.md b/README.md index fd61d64..b47ce20 100644 --- a/README.md +++ b/README.md @@ -1,62 +1,193 @@ # ProcessManager -Manage multiple running services. A ProcessManager collects impl of `Runnable` -and takes over the runtime management like starting, stopping (graceful or in -failure) of services. +`ProcessManager` is a light-weight **Tokio based supervisor** for coordinating +multiple long-running asynchronous tasks (called *processes*). +It takes care of -If one service fails, the manager initiates a graceful shutdown for all other. +* spawning all registered processes, +* forwarding *reload* / *shutdown* commands, +* propagating errors, +* initiating a **graceful shutdown** of the whole tree once any process fails, +* and optionally cleaning up completed children to avoid memory leaks. -# Examples +The crate is completely runtime-agnostic except for its dependency on Tokio +tasks behind the scenes; you are free to use any async code in your own +processes. + +--- + +## Table of Contents + +1. [Features](#features) +2. [Installation](#installation) +3. [Quick-Start](#quick-start) +4. [Built-in Helpers](#built-in-helpers) +5. [Examples](#examples) +6. [High-Level Architecture](#high-level-architecture) +7. [Crate Features](#crate-features) +8. [License](#license) + +--- + +## Features + +| Capability | Description | +| -------------------------------- | ------------------------------------------------------------------------- | +| Graceful shutdown | Propagates a single `shutdown` request to **all** children. | +| Dynamic child management | Add new `Runnable`s even while the manager is already running. | +| Error propagation | A failing child triggers a global shutdown and returns the *first* error. | +| Auto cleanup | Optionally remove finished children to keep memory usage bounded. | +| Hierarchical composition | Managers implement `Runnable` themselves → build arbitrary process trees. | +| Built-in helpers | See [`IdleProcess`](#built-in-helpers) and [`SignalReceiver`](#built-in-helpers). | + +--- + +## Installation + +Add the dependency to your `Cargo.toml`: + +```toml +[dependencies] +processmanager = "0" +``` + +Optional features are listed [below](#crate-features). + +--- + +## Quick-Start + +A minimal program that runs two workers for three seconds and then shuts down +gracefully: ```rust use processmanager::*; +use std::{sync::Arc, time::Duration}; +use tokio::time::{interval, sleep}; -#[tokio::main] -async fn main() { +struct Worker { + id: usize, + guard: Arc, +} - #[derive(Default)] - struct ExampleController { - runtime_guard: RuntimeGuard, +impl Worker { + fn new(id: usize) -> Self { + Self { id, guard: Arc::new(RuntimeGuard::default()) } } +} - impl Runnable for ExampleController { - fn process_start(&self) -> ProcFuture<'_> { - Box::pin(async { - // This can be any type of future like an async streams - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1)); - - loop { - match self.runtime_guard.tick(interval.tick()).await { - ProcessOperation::Next(_) => println!("work"), - ProcessOperation::Control(RuntimeControlMessage::Shutdown) => { - println!("shutdown"); - break - }, - ProcessOperation::Control(RuntimeControlMessage::Reload) => println!("trigger relead"), - } - } +impl Runnable for Worker { + fn process_start(&self) -> ProcFuture<'_> { + let id = self.id; + let guard = self.guard.clone(); + + Box::pin(async move { + let ticker = guard.runtime_ticker().await; + let mut beat = interval(Duration::from_secs(1)); - Ok(()) - }) - } + loop { + match ticker.tick(beat.tick()).await { + ProcessOperation::Next(_) => println!("worker-{id}: heartbeat"), + ProcessOperation::Control(RuntimeControlMessage::Shutdown) => break, + _ => continue, + } + } + Ok(()) + }) + } - fn process_handle(&self) -> Box { - Box::new(self.runtime_guard.handle()) - } + fn process_handle(&self) -> Arc { + self.guard.handle() } +} - let mut manager = ProcessManager::new(); - manager.insert(ExampleController::default()); +#[tokio::main] +async fn main() { + let manager = ProcessManagerBuilder::default() + .pre_insert(Worker::new(0)) + .pre_insert(Worker::new(1)) + .build(); let handle = manager.process_handle(); - // start all processes - let _ = tokio::spawn(async move { - manager.process_start().await.expect("service start failed"); + tokio::spawn(async move { + manager.process_start().await.expect("manager error"); }); - // Shutdown waits for all services to shutdown gracefully. + sleep(Duration::from_secs(3)).await; handle.shutdown().await; } +``` + +--- + +## Built-in Helpers +| Helper | Purpose | Feature flag | +| ---------------- | ------------------------------------------------------------------------------------------------------------------------ | ------------ | +| `IdleProcess` | Keeps an otherwise empty manager alive until an external shutdown is requested. | — | +| `SignalReceiver` | Listens for `SIGHUP`, `SIGINT`, `SIGTERM`, `SIGQUIT` and converts them into *shutdown / reload* control messages. | `signal` | + +Enable `SignalReceiver` like this: + +```toml +processmanager = { version = "0", features = ["signal"] } ``` + +--- + +## Examples + +Ready-to-run examples live in [`examples/`](examples/) and can be launched with +Cargo: + +| Command | Highlights | +| -------------------------------------------- | ------------------------------------------------------ | +| `cargo run --example simple` | Minimal setup, two workers, graceful shutdown | +| `cargo run --example dynamic_add` | Dynamically add workers while the manager is running | + +Feel free to copy or adapt the code for your own services. + +--- + +## High-Level Architecture + +```mermaid +flowchart TD + subgraph Supervisor + Mgr(ProcessManager) + end + Mgr -->|spawn| Child1[Runnable #1] + Mgr --> Child2[Runnable #2] + Child1 -- control --> Mgr + Child2 -- control --> Mgr + ExtHandle(External Handle) -- shutdown / reload --> Mgr +``` + +* Every `Runnable` gets its own Tokio task. +* A `ProcessControlHandler` allows external code to **shut down** or **reload** + a single process or the whole subtree. +* The first child that ends in `Err(_)` terminates the entire supervisor. + +--- + +## Crate Features + +| Feature | Default? | Description | +| --------- | -------- | ------------------------------------------------------------------- | +| `signal` | no | Activates `builtin::SignalReceiver` for Unix signal handling. | +| `tracing` | no | Emit structured log events via the `tracing` crate. | +| `log` | no | Use the `log` crate for textual logging if `tracing` is disabled. | + +Pick one of `tracing` **or** `log` to avoid duplicate output. + +--- + +## License + +Licensed under either of + +* Apache License, Version 2.0 +* MIT license + +at your option. diff --git a/examples/dynamic_add.rs b/examples/dynamic_add.rs index 7404659..d5f7e0e 100644 --- a/examples/dynamic_add.rs +++ b/examples/dynamic_add.rs @@ -51,14 +51,16 @@ impl Runnable for Worker { println!("worker-{id}: shutting down"); break; } + // absorb any future control messages we don't explicitly handle + ProcessOperation::Control(_) => continue, } } Ok(()) }) } - fn process_handle(&self) -> Box { - Box::new(self.guard.handle()) + fn process_handle(&self) -> Arc { + self.guard.handle() } } @@ -67,8 +69,9 @@ async fn main() { // ------------------------------------------------------------------ // 1. Manager with a single initial worker // ------------------------------------------------------------------ - let mut mgr = ProcessManager::new(); - mgr.insert(Worker::new(0)); + let mgr = ProcessManagerBuilder::default() + .pre_insert(Worker::new(0)) + .build(); // We need to keep access to the manager after starting it, therefore // wrap it in an `Arc` and clone it for the spawning task. diff --git a/examples/simple.rs b/examples/simple.rs index 49e3608..1ddd22a 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -51,14 +51,16 @@ impl Runnable for Worker { println!("worker-{id}: shutting down"); break; } + // absorb any future control messages we don't explicitly handle + ProcessOperation::Control(_) => continue, } } Ok(()) }) } - fn process_handle(&self) -> Box { - Box::new(self.guard.handle()) + fn process_handle(&self) -> Arc { + self.guard.handle() } } @@ -67,9 +69,10 @@ async fn main() { // ----------------------------------------------------------- // 1. Build a manager and register two workers // ----------------------------------------------------------- - let mut manager = ProcessManager::new(); - manager.insert(Worker::new(0)); - manager.insert(Worker::new(1)); + let manager = ProcessManagerBuilder::default() + .pre_insert(Worker::new(0)) + .pre_insert(Worker::new(1)) + .build(); let handle = manager.process_handle(); diff --git a/src/builtin/idle.rs b/src/builtin/idle.rs new file mode 100644 index 0000000..3be9cd2 --- /dev/null +++ b/src/builtin/idle.rs @@ -0,0 +1,65 @@ +//! A no-op [`Runnable`] that just idles until it receives a shutdown request. +//! +//! `IdleProcess` is handy as a “tombstone” child for a [`ProcessManager`] that +//! would otherwise stop immediately because it starts without any real +//! children. By registering an `IdleProcess`, the manager keeps running until +//! an external caller invokes [`ProcessControlHandler::shutdown`]. +//! +//! # Example +//! ```rust,ignore +//! use processmanager::*; +//! +//! // Manager without real children that should stay alive. +//! let mgr = ProcessManagerBuilder::default() +//! .pre_insert(IdleProcess::default()) +//! .build(); +//! ``` +use std::sync::Arc; + +use crate::{ + ProcFuture, ProcessControlHandler, ProcessOperation, Runnable, RuntimeControlMessage, + RuntimeGuard, +}; + +/// A no-op process that simply waits for a shutdown request. +#[derive(Debug, Default)] +pub struct IdleProcess { + runtime_guard: RuntimeGuard, +} + +impl IdleProcess { + /// Create a new idle process. + pub fn new() -> Self { + Self { + runtime_guard: RuntimeGuard::default(), + } + } +} +impl Runnable for IdleProcess { + fn process_start(&self) -> ProcFuture<'_> { + Box::pin(async { + let ticker = self.runtime_guard.runtime_ticker().await; + + loop { + // Sleep for a long time; the ticker wakes us early on control messages. + let sleep = tokio::time::sleep(std::time::Duration::from_secs(3600)); + + match ticker.tick(sleep).await { + ProcessOperation::Next(_) => continue, + ProcessOperation::Control(RuntimeControlMessage::Shutdown) => break, + ProcessOperation::Control(_) => continue, + } + } + + Ok(()) + }) + } + + fn process_handle(&self) -> Arc { + self.runtime_guard.handle() + } + + fn process_name(&self) -> std::borrow::Cow<'static, str> { + std::borrow::Cow::Borrowed("IdleProcess") + } +} diff --git a/src/builtin/mod.rs b/src/builtin/mod.rs new file mode 100644 index 0000000..da92747 --- /dev/null +++ b/src/builtin/mod.rs @@ -0,0 +1,7 @@ +mod idle; +#[cfg(feature = "signal")] +mod signal_receiver; + +pub use idle::IdleProcess; +#[cfg(feature = "signal")] +pub use signal_receiver::SignalReceiver; diff --git a/src/receiver/signal.rs b/src/builtin/signal_receiver.rs similarity index 88% rename from src/receiver/signal.rs rename to src/builtin/signal_receiver.rs index 80a7c2d..1635f3b 100644 --- a/src/receiver/signal.rs +++ b/src/builtin/signal_receiver.rs @@ -6,6 +6,7 @@ use futures::stream::StreamExt as _; use signal_hook::consts::signal::*; use signal_hook::iterator::Handle; use signal_hook_tokio::{Signals, SignalsInfo}; +use std::sync::Arc; use tokio::sync::Mutex; pub struct SignalReceiver { @@ -49,7 +50,7 @@ impl Runnable for SignalReceiver { ProcessOperation::Next(None) => continue, ProcessOperation::Next(Some(signal)) => signal, ProcessOperation::Control(RuntimeControlMessage::Shutdown) => break, - ProcessOperation::Control(RuntimeControlMessage::Reload) => continue, + ProcessOperation::Control(_) => continue, }; // tracing::warn!("Received process signal: {signal:?}"); @@ -74,7 +75,11 @@ impl Runnable for SignalReceiver { }) } - fn process_handle(&self) -> Box { - Box::new(self.runtime_guard.handle()) + fn process_handle(&self) -> Arc { + self.runtime_guard.handle() + } + + fn process_name(&self) -> std::borrow::Cow<'static, str> { + std::borrow::Cow::Borrowed("SignalReceiver") } } diff --git a/src/lib.rs b/src/lib.rs index 79407fa..3c18649 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,12 @@ +#![deny(rustdoc::broken_intra_doc_links)] +pub mod builtin; /// Manage multiple running services. A ProcessManager collects impl of `Runnable` /// and takes over the runtime management like starting, stopping (graceful or in /// failure) of services. /// /// ```rust /// use processmanager::*; +/// use std::sync::Arc; /// /// #[tokio::main] /// async fn main() { @@ -29,6 +32,7 @@ /// break /// }, /// ProcessOperation::Control(RuntimeControlMessage::Reload) => println!("trigger relead"), +/// ProcessOperation::Control(RuntimeControlMessage::Custom(_)) => println!("trigger custom action"), /// } /// } /// @@ -36,13 +40,14 @@ /// }) /// } /// -/// fn process_handle(&self) -> Box { -/// Box::new(self.runtime_guard.handle()) +/// fn process_handle(&self) -> Arc { +/// self.runtime_guard.handle() /// } /// } /// -/// let mut manager = ProcessManager::new(); -/// manager.insert(ExampleController::default()); +/// let manager = ProcessManagerBuilder::default() +/// .pre_insert(ExampleController::default()) +/// .build(); /// /// let handle = manager.process_handle(); /// @@ -58,15 +63,22 @@ /// mod error; mod process_manager; -#[cfg(feature = "signal")] -pub mod receiver; +mod process_manager_builder; mod runtime_guard; mod runtime_handle; mod runtime_process; mod runtime_ticker; +pub use builtin::*; pub use error::*; pub use process_manager::*; +pub use process_manager_builder::*; pub use runtime_guard::*; pub use runtime_handle::*; pub use runtime_process::*; pub use runtime_ticker::*; + +#[cfg(feature = "signal")] +pub mod receiver { + #[deprecated(note = "use `processmanager::builtin::SignalReceiver` instead")] + pub use crate::builtin::SignalReceiver; +} diff --git a/src/process_manager.rs b/src/process_manager.rs index 4c98d36..60ec9b7 100644 --- a/src/process_manager.rs +++ b/src/process_manager.rs @@ -19,59 +19,80 @@ //! # #[derive(Default)] struct MyService; //! # impl Runnable for MyService { //! # fn process_start(&self) -> ProcFuture<'_> { Box::pin(async { Ok(()) }) } -//! # fn process_handle(&self) -> Box { +//! # fn process_handle(&self) -> Arc { //! # unreachable!() //! # } //! # } -//! let mut root = ProcessManager::new(); -//! root.insert(MyService); // add before start +//! let root = ProcessManagerBuilder::default() +//! .pre_insert(MyService) // add before start +//! .build(); //! //! let handle = root.process_handle(); //! tokio::spawn(async move { root.process_start().await.unwrap(); }); //! //! handle.reload().await; // control a running manager //! ``` -use std::sync::{ - Arc, Mutex, - atomic::{AtomicBool, AtomicUsize, Ordering}, +use std::{ + borrow::Cow, + sync::{ + Arc, Mutex, + atomic::{AtomicBool, AtomicUsize, Ordering}, + }, + time::Duration, }; -use tokio::sync::mpsc; +use futures::FutureExt as _; +use once_cell::sync::OnceCell; +use std::panic::AssertUnwindSafe; +use tokio::{ + sync::mpsc, + task::{JoinHandle, JoinSet}, + time::Instant, +}; + +#[cfg(feature = "tracing")] +use tracing::Instrument; use crate::{CtrlFuture, ProcFuture, ProcessControlHandler, Runnable, RuntimeError}; /// Global monotonically increasing identifier for every `ProcessManager`. -static PID: std::sync::OnceLock = std::sync::OnceLock::new(); +static PID: AtomicUsize = AtomicUsize::new(0); /// Metadata kept for each child. struct Child { id: usize, #[allow(dead_code)] - proc: Arc>, + proc: Arc, handle: Arc, + join_handle: Arc>, } -type UnboundedChildCompletionReceiver = - Mutex)>>>; +type ProcessCompletionChannel = + tokio::sync::Mutex)>>; /// Shared state between the handle you pass around, the supervisor task and all /// children. struct Inner { processes: Mutex>, + /// Cached list of `ProcessControlHandler`s for fast broadcast without + /// temporary allocations. + handles: Mutex>>, running: AtomicBool, next_id: AtomicUsize, active: AtomicUsize, // supervisor RECEIVES from here, children (spawn_child) only send completion_tx: mpsc::UnboundedSender<(usize, Result<(), RuntimeError>)>, - completion_rx: UnboundedChildCompletionReceiver, + completion_rx: OnceCell, } /// Groups several [`Runnable`] instances and starts / stops them as a unit. pub struct ProcessManager { id: usize, - pre_start: Vec>>, + pre_start: Vec>, inner: Arc, - auto_cleanup: bool, + /// Optional human-readable name overriding the default `process-manager-`. + pub(crate) custom_name: Option>, + pub(crate) auto_cleanup: bool, } /* ========================================================================== */ @@ -81,9 +102,7 @@ pub struct ProcessManager { impl ProcessManager { /// New manager with auto-cleanup of finished children enabled. pub fn new() -> Self { - let id = PID - .get_or_init(|| AtomicUsize::new(0)) - .fetch_add(1, Ordering::SeqCst); + let id = PID.fetch_add(1, Ordering::SeqCst); let (tx, rx) = mpsc::unbounded_channel(); @@ -92,22 +111,22 @@ impl ProcessManager { pre_start: Vec::new(), inner: Arc::new(Inner { processes: Mutex::new(Vec::new()), + handles: Mutex::new(Vec::new()), running: AtomicBool::new(false), next_id: AtomicUsize::new(0), active: AtomicUsize::new(0), completion_tx: tx, - completion_rx: Mutex::new(Some(rx)), + completion_rx: { + let cell = OnceCell::new(); + let _ = cell.set(tokio::sync::Mutex::new(rx)); + cell + }, }), + custom_name: None, auto_cleanup: true, } } - /// Disable / enable automatic removal of successfully finished children. - pub fn with_auto_cleanup(mut self, v: bool) -> Self { - self.auto_cleanup = v; - self - } - /// Register a child **before** the supervisor is started. /// /// Panics when called after [`process_start`](Runnable::process_start). @@ -117,28 +136,24 @@ impl ProcessManager { "cannot call insert() after manager has started – use add() instead" ); self.pre_start - .push(Arc::new(Box::new(process) as Box)); + .push(Arc::from(Box::new(process) as Box)); } /// Add a child *while* the manager is already running. The child is spawned - /// immediately. Before start-up this behaves the same as [`insert`]. + /// immediately. Before start-up this behaves the same as [`crate::ProcessManager::insert`]. pub fn add(&self, process: impl Runnable) { - let proc: Arc> = Arc::new(Box::new(process) as Box); + let proc: Arc = Arc::from(Box::new(process) as Box); - // Not running yet? → queue for start-up. - if !self.inner.running.load(Ordering::SeqCst) { - let mut guard = self.inner.processes.lock().unwrap(); - guard.push(Child { - id: self.inner.next_id.fetch_add(1, Ordering::SeqCst), - handle: Arc::from(proc.process_handle()), - proc, - }); - return; - } + assert!( + self.inner.running.load(Ordering::SeqCst), + "cannot call add() before manager has started – use insert() instead" + ); // Running → register & spawn immediately. let id = self.inner.next_id.fetch_add(1, Ordering::SeqCst); - let handle = Arc::from(proc.process_handle()); + let handle = proc.process_handle(); + // cache handle immediately + self.inner.handles.lock().unwrap().push(Arc::clone(&handle)); { let mut guard = self.inner.processes.lock().unwrap(); @@ -146,10 +161,9 @@ impl ProcessManager { id, proc: Arc::clone(&proc), handle, + join_handle: Arc::new(spawn_child(id, proc, Arc::clone(&self.inner))), }); } - - spawn_child(id, proc, Arc::clone(&self.inner)); } } @@ -168,40 +182,57 @@ impl Runnable for ProcessManager { Box::pin(async move { inner.running.store(true, Ordering::SeqCst); + let name = self.process_name(); + + #[cfg(feature = "tracing")] + ::tracing::info!("Start process manager {name}"); + #[cfg(all(not(feature = "tracing"), feature = "log"))] + ::log::info!("Start process manager {name}"); + #[cfg(all(not(feature = "tracing"), not(feature = "log")))] + eprintln!("Start process manager {name}"); + /* -- spawn every child registered before start() ---------------- */ for proc in initial { let id = inner.next_id.fetch_add(1, Ordering::SeqCst); - let handle = Arc::from(proc.process_handle()); + let handle = proc.process_handle(); { let mut g = inner.processes.lock().unwrap(); g.push(Child { id, proc: Arc::clone(&proc), - handle, + handle: Arc::clone(&handle), + join_handle: Arc::new(spawn_child(id, proc, Arc::clone(&inner))), }); + inner.handles.lock().unwrap().push(handle); } - spawn_child(id, proc, Arc::clone(&inner)); } /* -- supervisor event-loop -------------------------------------- */ - let mut completion_rx = inner + let completion_rx = inner .completion_rx - .lock() - .unwrap() - .take() + .get() .expect("process_start called twice"); + let mut completion_rx = completion_rx.lock().await; let mut first_error: Option = None; loop { + #[cfg(feature = "tracing")] + { + for child in self.inner.processes.lock().unwrap().iter() { + ::tracing::info!( + "Process {}: running={:?}", + child.proc.process_name(), + !child.join_handle.is_finished() + ); + } + } + // exit criterion: no active children left if inner.active.load(Ordering::SeqCst) == 0 { inner.running.store(false, Ordering::SeqCst); - return match first_error { - Some(e) => Err(e), - None => Ok(()), - }; + break; } match completion_rx.recv().await { @@ -211,6 +242,12 @@ impl Runnable for ProcessManager { if auto_cleanup { let mut g = inner.processes.lock().unwrap(); g.retain(|c| c.id != cid); + // also remove cached handle + inner + .handles + .lock() + .unwrap() + .retain(|h| g.iter().any(|c| Arc::ptr_eq(&c.handle, h))); } } Err(err) => { @@ -230,15 +267,40 @@ impl Runnable for ProcessManager { } } } + + match first_error { + Some(error) => { + #[cfg(feature = "tracing")] + ::tracing::info!("Shutdown process manager {name} with error: {error:?}"); + #[cfg(all(not(feature = "tracing"), feature = "log"))] + ::log::info!("Shutdown process manager {name} with error: {error:?}"); + #[cfg(all(not(feature = "tracing"), not(feature = "log")))] + eprintln!("Shutdown process manager {name} with error: {error:?}"); + Err(error) + } + None => { + #[cfg(feature = "tracing")] + ::tracing::info!("Shutdown process manager {name}"); + #[cfg(all(not(feature = "tracing"), feature = "log"))] + ::log::info!("Shutdown process manager {name}"); + #[cfg(all(not(feature = "tracing"), not(feature = "log")))] + eprintln!("Shutdown process manager {name}"); + Ok(()) + } + } }) } - fn process_name(&self) -> String { - format!("process-manager-{}", self.id) + fn process_name(&self) -> Cow<'static, str> { + if let Some(ref name) = self.custom_name { + name.clone() + } else { + format!("process-manager-{}", self.id).into() + } } - fn process_handle(&self) -> Box { - Box::new(Handle { + fn process_handle(&self) -> Arc { + Arc::new(Handle { inner: Arc::clone(&self.inner), }) } @@ -262,17 +324,60 @@ impl ProcessControlHandler for Handle { fn shutdown(&self) -> CtrlFuture<'_> { let inner = Arc::clone(&self.inner); Box::pin(async move { + let mut set = JoinSet::new(); + let handles = { let guard = inner.processes.lock().unwrap(); guard .iter() - .map(|c| Arc::clone(&c.handle)) + .map(|child| { + ( + child.proc.process_name(), + child.handle.clone(), + child.join_handle.clone(), + ) + }) .collect::>() }; - for h in handles { - h.shutdown().await; + for (name, h, jh) in handles { + set.spawn(async move { + #[cfg(feature = "tracing")] + ::tracing::info!(name = %name, "Initiate shutdown"); + #[cfg(all(not(feature = "tracing"), feature = "log"))] + ::log::info!("Initiate shutdown {name}"); + #[cfg(all(not(feature = "tracing"), not(feature = "log")))] + eprintln!("Initiate shutdown {name}"); + + let dur = Duration::from_secs(30); + let now = Instant::now(); + let timeout = tokio::time::timeout(dur, h.shutdown()).await; + let _elapsed = now.elapsed(); + + match timeout { + Ok(_) => { + // Shutdown ok + #[cfg(feature = "tracing")] + ::tracing::info!(name = %name, elapsed = ?_elapsed, "Shutdown completed"); + #[cfg(all(not(feature = "tracing"), feature = "log"))] + ::log::info!("Process {name}: shutdown completed"); + #[cfg(all(not(feature = "tracing"), not(feature = "log")))] + eprintln!("Process {name}: shutdown completed"); + } + Err(_) => { + jh.abort(); + // Timed out. + #[cfg(feature = "tracing")] + ::tracing::info!(name = %name, elapsed = ?_elapsed, "Shutdown timed out"); + #[cfg(all(not(feature = "tracing"), feature = "log"))] + ::log::info!("Process {name}: Shutdown timed out after {dur:?}"); + #[cfg(all(not(feature = "tracing"), not(feature = "log")))] + eprintln!("Process {name}: Shutdown timed out after {dur:?}"); + } + } + }); } + let _ = set.join_all().await; }) } @@ -280,11 +385,8 @@ impl ProcessControlHandler for Handle { let inner = Arc::clone(&self.inner); Box::pin(async move { let handles = { - let guard = inner.processes.lock().unwrap(); - guard - .iter() - .map(|c| Arc::clone(&c.handle)) - .collect::>() + let guard = inner.handles.lock().unwrap(); + guard.clone() }; for h in handles { @@ -298,41 +400,65 @@ impl ProcessControlHandler for Handle { /* Helper – spawn a single child */ /* ========================================================================== */ -fn spawn_child(id: usize, proc: Arc>, inner: Arc) { +fn spawn_child(id: usize, proc: Arc, inner: Arc) -> JoinHandle<()> { + // increment *before* spawning the task – guarantees the counter is in sync inner.active.fetch_add(1, Ordering::SeqCst); let tx = inner.completion_tx.clone(); tokio::spawn(async move { + // Task already accounted for in the caller. let name = proc.process_name(); - #[cfg(feature = "tracing")] - ::tracing::info!("Start process {name}"); + ::tracing::info!(name = %name, "Start process"); #[cfg(all(not(feature = "tracing"), feature = "log"))] ::log::info!("Start process {name}"); #[cfg(all(not(feature = "tracing"), not(feature = "log")))] eprintln!("Start process {name}"); - let res = proc.process_start().await; + // run the child and convert a panic into an `Err` so the supervisor + // can react instead of hanging forever. + let catch_fut = AssertUnwindSafe(proc.process_start()).catch_unwind(); + + #[cfg(feature = "tracing")] + let catch_result = { + let span = ::tracing::info_span!("process", name = %name); + catch_fut.instrument(span).await + }; + #[cfg(not(feature = "tracing"))] + let catch_result = { catch_fut.await }; + + let res = catch_result.unwrap_or_else(|panic| { + let msg = if let Some(s) = panic.downcast_ref::<&str>() { + (*s).to_string() + } else if let Some(s) = panic.downcast_ref::() { + s.clone() + } else { + "unknown panic".to_string() + }; + Err(RuntimeError::Internal { + message: format!("process panicked: {msg}"), + }) + }); match &res { Ok(_) => { #[cfg(feature = "tracing")] - ::tracing::info!("Process {name} stopped"); + ::tracing::info!(name = %name, "Process stopped"); #[cfg(all(not(feature = "tracing"), feature = "log"))] - ::log::info!("Process {name} stopped"); + ::log::info!("Process {name}: stopped"); #[cfg(all(not(feature = "tracing"), not(feature = "log")))] - eprintln!("Process {name} stopped"); + eprintln!("Process {name}: stopped"); } Err(err) => { #[cfg(feature = "tracing")] - ::tracing::error!("Process {name} failed: {err:?}"); + ::tracing::error!(name = %name, "Process failed: {err:?}"); #[cfg(all(not(feature = "tracing"), feature = "log"))] - ::log::error!("Process {name} failed: {err:?}"); + ::log::error!("Process {name}: failed {err:?}"); #[cfg(all(not(feature = "tracing"), not(feature = "log")))] - eprintln!("Process {name} failed: {err:?}"); + eprintln!("Process {name}: failed {err:?}"); } } let _ = tx.send((id, res)); // ignore error if supervisor already gone - }); + }) } diff --git a/src/process_manager_builder.rs b/src/process_manager_builder.rs new file mode 100644 index 0000000..e0f1c58 --- /dev/null +++ b/src/process_manager_builder.rs @@ -0,0 +1,90 @@ +//! Fluent builder for constructing a [`ProcessManager`]. +//! +//! The main goal of this helper is to get rid of the “wrong phase” panics +//! (`insert()` vs. `add()`) by making the set-up phase explicit. All children +//! that are known at construction time are registered on the builder; +//! afterwards `build()` hands you a fully configured manager that can be +//! started immediately. +//! +//! Further configuration knobs (metrics, names, tracing options…) can be added +//! here without changing the `ProcessManager` API again. +use crate::{ProcessManager, Runnable}; +use std::borrow::Cow; + +type BoxedInitializer = Box; + +/// Build-time configuration for a [`ProcessManager`]. +/// +/// ```rust +/// # use processmanager::*; +/// # use std::sync::Arc; +/// # #[derive(Default)] struct MySvc; +/// # impl Runnable for MySvc { +/// # fn process_start(&self) -> ProcFuture<'_> { Box::pin(async { Ok(()) }) } +/// # fn process_handle(&self) -> Arc { unreachable!() } +/// # } +/// let mgr = ProcessManagerBuilder::default() +/// .auto_cleanup(true) +/// .pre_insert(MySvc) // add before start +/// .build(); +/// ``` +#[derive(Default)] +pub struct ProcessManagerBuilder { + /// Whether to clean up finished children automatically. + auto_cleanup: bool, + /// Optional custom name for the supervisor. + custom_name: Option>, + /// Deferred actions that will be executed against the manager right before + /// it is returned to the caller. + initialisers: Vec, +} + +impl ProcessManagerBuilder { + /// Create a new builder with defaults (`auto_cleanup = true`, no children). + pub fn new() -> Self { + Self::default() + } + + /// Enable / disable automatic clean-up of finished children. + pub fn auto_cleanup(mut self, enabled: bool) -> Self { + self.auto_cleanup = enabled; + self + } + + /// Set a custom human-readable name for the supervisor. + pub fn name>>(mut self, name: S) -> Self { + self.custom_name = Some(name.into()); + self + } + + /// Register a child that should be present right from the start. + /// + /// This is the compile-time safe counterpart to `ProcessManager::insert`. + pub fn pre_insert(mut self, process: impl Runnable) -> Self { + self.initialisers + .push(Box::new(move |mgr: &mut ProcessManager| { + // At this point the manager is *not* running yet so `insert` + // is always the correct method. + mgr.insert(process); + })); + self + } + + /// Finalise the configuration and return a ready-to-use [`ProcessManager`]. + pub fn build(self) -> ProcessManager { + // Construct base manager according to the chosen clean-up strategy. + let mut mgr = ProcessManager::new(); + + mgr.auto_cleanup = self.auto_cleanup; + + // Apply configuration knobs that require direct field access. + mgr.custom_name = self.custom_name; + + // Run all queued initialisers (child registrations, …). + for init in self.initialisers { + init(&mut mgr); + } + + mgr + } +} diff --git a/src/receiver/mod.rs b/src/receiver/mod.rs deleted file mode 100644 index bb39178..0000000 --- a/src/receiver/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub mod signal; - -#[cfg(feature = "signal")] -pub use signal::SignalReceiver; diff --git a/src/runtime_guard.rs b/src/runtime_guard.rs index 25f1788..143c775 100644 --- a/src/runtime_guard.rs +++ b/src/runtime_guard.rs @@ -2,8 +2,9 @@ use std::sync::Arc; use tokio::sync::Mutex; -use crate::{RuntimeControlMessage, RuntimeHandle, RuntimeTicker}; +use crate::{ProcessControlHandler, RuntimeControlMessage, RuntimeHandle, RuntimeTicker}; +#[derive(Debug, Clone)] pub struct RuntimeGuard { inner: Arc, } @@ -70,8 +71,10 @@ impl RuntimeGuard { !closed } - pub fn handle(&self) -> RuntimeHandle { - RuntimeHandle::new(Arc::clone(&self.inner.control_ch_sender)) + pub fn handle(&self) -> Arc { + Arc::new(RuntimeHandle::new(Arc::clone( + &self.inner.control_ch_sender, + ))) } /// Busy-wait helper for tests / demos. diff --git a/src/runtime_handle.rs b/src/runtime_handle.rs index 10f9b18..6cf96cb 100644 --- a/src/runtime_handle.rs +++ b/src/runtime_handle.rs @@ -4,6 +4,7 @@ use tokio::sync::Mutex; use crate::{CtrlFuture, ProcessControlHandler, RuntimeControlMessage}; +#[derive(Debug, Clone)] pub struct RuntimeHandle { control_ch: Arc>>, } diff --git a/src/runtime_process.rs b/src/runtime_process.rs index 7fbaa9c..d674399 100644 --- a/src/runtime_process.rs +++ b/src/runtime_process.rs @@ -1,6 +1,8 @@ use super::RuntimeError; +use std::borrow::Cow; use std::future::Future; use std::pin::Pin; +use std::sync::Arc; /// Boxed future returned by [`Runnable::process_start`]. pub type ProcFuture<'a> = Pin> + Send + 'a>>; @@ -15,12 +17,12 @@ where fn process_start(&self) -> ProcFuture<'_>; /// Human-readable name, used for logging only. - fn process_name(&self) -> String { - std::any::type_name::().to_string() + fn process_name(&self) -> Cow<'static, str> { + Cow::Borrowed(std::any::type_name::()) } /// Obtain a handle for shutdown / reload signalling. - fn process_handle(&self) -> Box; + fn process_handle(&self) -> Arc; } /// Boxed future returned by [`ProcessControlHandler`] control methods. @@ -41,4 +43,35 @@ pub enum ProcessOperation { pub enum RuntimeControlMessage { Reload, Shutdown, + /// User-defined messages for future extensibility. + Custom(Box), +} + +impl Clone for RuntimeControlMessage { + fn clone(&self) -> Self { + match self { + RuntimeControlMessage::Reload => RuntimeControlMessage::Reload, + RuntimeControlMessage::Shutdown => RuntimeControlMessage::Shutdown, + RuntimeControlMessage::Custom(_) => { + panic!("Cloning `Custom` control messages is not supported") + } + } + } +} + +impl Runnable for Arc +where + R: Runnable + ?Sized, +{ + fn process_start(&self) -> ProcFuture<'_> { + R::process_start(self) + } + + fn process_handle(&self) -> Arc { + R::process_handle(self) + } + + fn process_name(&self) -> Cow<'static, str> { + R::process_name(self) + } } diff --git a/tests/builder.rs b/tests/builder.rs new file mode 100644 index 0000000..1f7033f --- /dev/null +++ b/tests/builder.rs @@ -0,0 +1,52 @@ +//! Integration-style tests for the public `ProcessManagerBuilder` API. +// +//! The goal is to ensure that compile-time plumbing between the builder, the +//! resulting `ProcessManager` instance and its runtime metadata works as +//! expected. + +use std::sync::Arc; + +use processmanager::{ + CtrlFuture, ProcFuture, ProcessControlHandler, ProcessManagerBuilder, Runnable, RuntimeError, +}; + +/// A no-op service that terminates immediately and successfully. +/// +/// We mostly need this to satisfy the `pre_insert` type parameter; the process +/// never actually runs inside the test. +#[derive(Default)] +struct NoopSvc; + +impl Runnable for NoopSvc { + fn process_start(&self) -> ProcFuture<'_> { + Box::pin(async { Ok::<(), RuntimeError>(()) }) + } + + fn process_handle(&self) -> Arc { + // The test never calls control methods, so a stub handle is sufficient. + Arc::new(StubHandle) + } +} + +/// A minimal `ProcessControlHandler` implementation that does nothing. +struct StubHandle; + +impl ProcessControlHandler for StubHandle { + fn shutdown(&self) -> CtrlFuture<'_> { + Box::pin(async {}) + } + + fn reload(&self) -> CtrlFuture<'_> { + Box::pin(async {}) + } +} + +#[test] +fn builder_sets_custom_name() { + let mgr = ProcessManagerBuilder::default() + .name("my-supervisor") + .pre_insert(NoopSvc) + .build(); + + assert_eq!(mgr.process_name(), "my-supervisor"); +} diff --git a/tests/integration.rs b/tests/integration.rs index 864d547..1a7ec4f 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -3,6 +3,7 @@ use processmanager::{ RuntimeControlMessage, RuntimeError, RuntimeGuard, }; use std::ops::Add; +use std::sync::Arc; use std::time::Duration; use tokio::sync::oneshot::channel; use tokio::time::timeout; @@ -47,6 +48,8 @@ impl Runnable for ExampleController { ProcessOperation::Control(RuntimeControlMessage::Reload) => { println!("trigger reload {}", self.id) } + // absorb any future control messages we don't explicitly handle + ProcessOperation::Control(_) => continue, } } @@ -54,8 +57,8 @@ impl Runnable for ExampleController { }) } - fn process_handle(&self) -> Box { - Box::new(self.runtime_guard.handle()) + fn process_handle(&self) -> Arc { + self.runtime_guard.handle() } }