From dcbe1db5b04d7a80bd37f65a74881bec41817716 Mon Sep 17 00:00:00 2001 From: Marc Riegel Date: Mon, 7 Jul 2025 13:03:56 +0200 Subject: [PATCH 01/22] Change process_handle return type to Arc instead of Box Refactor Runnable trait implementations and related code to return Arc instead of Box for improved shared ownership and consistency across the codebase. --- examples/dynamic_add.rs | 4 ++-- examples/simple.rs | 4 ++-- src/lib.rs | 5 +++-- src/process_manager.rs | 22 +++++++++++----------- src/receiver/signal.rs | 5 +++-- src/runtime_guard.rs | 9 ++++++--- src/runtime_handle.rs | 1 + src/runtime_process.rs | 3 ++- tests/integration.rs | 5 +++-- 9 files changed, 33 insertions(+), 25 deletions(-) diff --git a/examples/dynamic_add.rs b/examples/dynamic_add.rs index 7404659..8ff6d78 100644 --- a/examples/dynamic_add.rs +++ b/examples/dynamic_add.rs @@ -57,8 +57,8 @@ impl Runnable for Worker { }) } - fn process_handle(&self) -> Box { - Box::new(self.guard.handle()) + fn process_handle(&self) -> Arc { + self.guard.handle() } } diff --git a/examples/simple.rs b/examples/simple.rs index 49e3608..0c1b9f1 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -57,8 +57,8 @@ impl Runnable for Worker { }) } - fn process_handle(&self) -> Box { - Box::new(self.guard.handle()) + fn process_handle(&self) -> Arc { + self.guard.handle() } } diff --git a/src/lib.rs b/src/lib.rs index 79407fa..971c2c8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ /// /// ```rust /// use processmanager::*; +/// use std::sync::Arc; /// /// #[tokio::main] /// async fn main() { @@ -36,8 +37,8 @@ /// }) /// } /// -/// fn process_handle(&self) -> Box { -/// Box::new(self.runtime_guard.handle()) +/// fn process_handle(&self) -> Arc { +/// self.runtime_guard.handle() /// } /// } /// diff --git a/src/process_manager.rs b/src/process_manager.rs index 4c98d36..f91c987 100644 --- a/src/process_manager.rs +++ b/src/process_manager.rs @@ -19,7 +19,7 @@ //! # #[derive(Default)] struct MyService; //! # impl Runnable for MyService { //! # fn process_start(&self) -> ProcFuture<'_> { Box::pin(async { Ok(()) }) } -//! # fn process_handle(&self) -> Box { +//! # fn process_handle(&self) -> Arc { //! # unreachable!() //! # } //! # } @@ -47,7 +47,7 @@ static PID: std::sync::OnceLock = std::sync::OnceLock::new(); struct Child { id: usize, #[allow(dead_code)] - proc: Arc>, + proc: Arc, handle: Arc, } @@ -69,7 +69,7 @@ struct Inner { /// Groups several [`Runnable`] instances and starts / stops them as a unit. pub struct ProcessManager { id: usize, - pre_start: Vec>>, + pre_start: Vec>, inner: Arc, auto_cleanup: bool, } @@ -117,20 +117,20 @@ impl ProcessManager { "cannot call insert() after manager has started – use add() instead" ); self.pre_start - .push(Arc::new(Box::new(process) as Box)); + .push(Arc::from(Box::new(process) as Box)); } /// Add a child *while* the manager is already running. The child is spawned /// immediately. Before start-up this behaves the same as [`insert`]. pub fn add(&self, process: impl Runnable) { - let proc: Arc> = Arc::new(Box::new(process) as Box); + let proc: Arc = Arc::from(Box::new(process) as Box); // Not running yet? → queue for start-up. if !self.inner.running.load(Ordering::SeqCst) { let mut guard = self.inner.processes.lock().unwrap(); guard.push(Child { id: self.inner.next_id.fetch_add(1, Ordering::SeqCst), - handle: Arc::from(proc.process_handle()), + handle: proc.process_handle(), proc, }); return; @@ -138,7 +138,7 @@ impl ProcessManager { // Running → register & spawn immediately. let id = self.inner.next_id.fetch_add(1, Ordering::SeqCst); - let handle = Arc::from(proc.process_handle()); + let handle = proc.process_handle(); { let mut guard = self.inner.processes.lock().unwrap(); @@ -171,7 +171,7 @@ impl Runnable for ProcessManager { /* -- spawn every child registered before start() ---------------- */ for proc in initial { let id = inner.next_id.fetch_add(1, Ordering::SeqCst); - let handle = Arc::from(proc.process_handle()); + let handle = proc.process_handle(); { let mut g = inner.processes.lock().unwrap(); @@ -237,8 +237,8 @@ impl Runnable for ProcessManager { format!("process-manager-{}", self.id) } - fn process_handle(&self) -> Box { - Box::new(Handle { + fn process_handle(&self) -> Arc { + Arc::new(Handle { inner: Arc::clone(&self.inner), }) } @@ -298,7 +298,7 @@ impl ProcessControlHandler for Handle { /* Helper – spawn a single child */ /* ========================================================================== */ -fn spawn_child(id: usize, proc: Arc>, inner: Arc) { +fn spawn_child(id: usize, proc: Arc, inner: Arc) { inner.active.fetch_add(1, Ordering::SeqCst); let tx = inner.completion_tx.clone(); diff --git a/src/receiver/signal.rs b/src/receiver/signal.rs index 80a7c2d..54da141 100644 --- a/src/receiver/signal.rs +++ b/src/receiver/signal.rs @@ -6,6 +6,7 @@ use futures::stream::StreamExt as _; use signal_hook::consts::signal::*; use signal_hook::iterator::Handle; use signal_hook_tokio::{Signals, SignalsInfo}; +use std::sync::Arc; use tokio::sync::Mutex; pub struct SignalReceiver { @@ -74,7 +75,7 @@ impl Runnable for SignalReceiver { }) } - fn process_handle(&self) -> Box { - Box::new(self.runtime_guard.handle()) + fn process_handle(&self) -> Arc { + self.runtime_guard.handle() } } diff --git a/src/runtime_guard.rs b/src/runtime_guard.rs index 25f1788..143c775 100644 --- a/src/runtime_guard.rs +++ b/src/runtime_guard.rs @@ -2,8 +2,9 @@ use std::sync::Arc; use tokio::sync::Mutex; -use crate::{RuntimeControlMessage, RuntimeHandle, RuntimeTicker}; +use crate::{ProcessControlHandler, RuntimeControlMessage, RuntimeHandle, RuntimeTicker}; +#[derive(Debug, Clone)] pub struct RuntimeGuard { inner: Arc, } @@ -70,8 +71,10 @@ impl RuntimeGuard { !closed } - pub fn handle(&self) -> RuntimeHandle { - RuntimeHandle::new(Arc::clone(&self.inner.control_ch_sender)) + pub fn handle(&self) -> Arc { + Arc::new(RuntimeHandle::new(Arc::clone( + &self.inner.control_ch_sender, + ))) } /// Busy-wait helper for tests / demos. diff --git a/src/runtime_handle.rs b/src/runtime_handle.rs index 10f9b18..6cf96cb 100644 --- a/src/runtime_handle.rs +++ b/src/runtime_handle.rs @@ -4,6 +4,7 @@ use tokio::sync::Mutex; use crate::{CtrlFuture, ProcessControlHandler, RuntimeControlMessage}; +#[derive(Debug, Clone)] pub struct RuntimeHandle { control_ch: Arc>>, } diff --git a/src/runtime_process.rs b/src/runtime_process.rs index 7fbaa9c..a43135b 100644 --- a/src/runtime_process.rs +++ b/src/runtime_process.rs @@ -1,6 +1,7 @@ use super::RuntimeError; use std::future::Future; use std::pin::Pin; +use std::sync::Arc; /// Boxed future returned by [`Runnable::process_start`]. pub type ProcFuture<'a> = Pin> + Send + 'a>>; @@ -20,7 +21,7 @@ where } /// Obtain a handle for shutdown / reload signalling. - fn process_handle(&self) -> Box; + fn process_handle(&self) -> Arc; } /// Boxed future returned by [`ProcessControlHandler`] control methods. diff --git a/tests/integration.rs b/tests/integration.rs index 864d547..6a29a32 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -3,6 +3,7 @@ use processmanager::{ RuntimeControlMessage, RuntimeError, RuntimeGuard, }; use std::ops::Add; +use std::sync::Arc; use std::time::Duration; use tokio::sync::oneshot::channel; use tokio::time::timeout; @@ -54,8 +55,8 @@ impl Runnable for ExampleController { }) } - fn process_handle(&self) -> Box { - Box::new(self.runtime_guard.handle()) + fn process_handle(&self) -> Arc { + self.runtime_guard.handle() } } From a0e9012b2b137df3e125ece5b983f1e4e28114a3 Mon Sep 17 00:00:00 2001 From: Marc Riegel Date: Mon, 7 Jul 2025 13:31:08 +0200 Subject: [PATCH 02/22] Handle and ignore unknown control messages in workers and signal receiver Refactor ProcessManager PID initialization and add manual/auto cleanup constructors Add Custom variant to RuntimeControlMessage for future extensibility --- examples/dynamic_add.rs | 2 ++ examples/simple.rs | 2 ++ src/process_manager.rs | 23 +++++++++++++++-------- src/receiver/signal.rs | 2 +- src/runtime_process.rs | 2 ++ tests/integration.rs | 2 ++ 6 files changed, 24 insertions(+), 9 deletions(-) diff --git a/examples/dynamic_add.rs b/examples/dynamic_add.rs index 8ff6d78..6923f60 100644 --- a/examples/dynamic_add.rs +++ b/examples/dynamic_add.rs @@ -51,6 +51,8 @@ impl Runnable for Worker { println!("worker-{id}: shutting down"); break; } + // absorb any future control messages we don't explicitly handle + ProcessOperation::Control(_) => continue, } } Ok(()) diff --git a/examples/simple.rs b/examples/simple.rs index 0c1b9f1..f3ed6cc 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -51,6 +51,8 @@ impl Runnable for Worker { println!("worker-{id}: shutting down"); break; } + // absorb any future control messages we don't explicitly handle + ProcessOperation::Control(_) => continue, } } Ok(()) diff --git a/src/process_manager.rs b/src/process_manager.rs index f91c987..79242c4 100644 --- a/src/process_manager.rs +++ b/src/process_manager.rs @@ -41,7 +41,7 @@ use tokio::sync::mpsc; use crate::{CtrlFuture, ProcFuture, ProcessControlHandler, Runnable, RuntimeError}; /// Global monotonically increasing identifier for every `ProcessManager`. -static PID: std::sync::OnceLock = std::sync::OnceLock::new(); +static PID: AtomicUsize = AtomicUsize::new(0); /// Metadata kept for each child. struct Child { @@ -81,9 +81,7 @@ pub struct ProcessManager { impl ProcessManager { /// New manager with auto-cleanup of finished children enabled. pub fn new() -> Self { - let id = PID - .get_or_init(|| AtomicUsize::new(0)) - .fetch_add(1, Ordering::SeqCst); + let id = PID.fetch_add(1, Ordering::SeqCst); let (tx, rx) = mpsc::unbounded_channel(); @@ -102,10 +100,19 @@ impl ProcessManager { } } - /// Disable / enable automatic removal of successfully finished children. - pub fn with_auto_cleanup(mut self, v: bool) -> Self { - self.auto_cleanup = v; - self + /// Create a manager that keeps finished children (no automatic cleanup). + /// + /// This is the counterpart to the default [`new`] constructor which + /// _removes_ children automatically once they exit successfully. + pub fn manual_cleanup() -> Self { + let mut mgr = Self::new(); + mgr.auto_cleanup = false; + mgr + } + + /// Create a manager with automatic cleanup of finished children (alias for [`new`]). + pub fn auto_cleanup() -> Self { + Self::new() } /// Register a child **before** the supervisor is started. diff --git a/src/receiver/signal.rs b/src/receiver/signal.rs index 54da141..f484916 100644 --- a/src/receiver/signal.rs +++ b/src/receiver/signal.rs @@ -50,7 +50,7 @@ impl Runnable for SignalReceiver { ProcessOperation::Next(None) => continue, ProcessOperation::Next(Some(signal)) => signal, ProcessOperation::Control(RuntimeControlMessage::Shutdown) => break, - ProcessOperation::Control(RuntimeControlMessage::Reload) => continue, + ProcessOperation::Control(_) => continue, }; // tracing::warn!("Received process signal: {signal:?}"); diff --git a/src/runtime_process.rs b/src/runtime_process.rs index a43135b..06592a6 100644 --- a/src/runtime_process.rs +++ b/src/runtime_process.rs @@ -42,4 +42,6 @@ pub enum ProcessOperation { pub enum RuntimeControlMessage { Reload, Shutdown, + /// User-defined messages for future extensibility. + Custom(Box), } diff --git a/tests/integration.rs b/tests/integration.rs index 6a29a32..1a7ec4f 100644 --- a/tests/integration.rs +++ b/tests/integration.rs @@ -48,6 +48,8 @@ impl Runnable for ExampleController { ProcessOperation::Control(RuntimeControlMessage::Reload) => { println!("trigger reload {}", self.id) } + // absorb any future control messages we don't explicitly handle + ProcessOperation::Control(_) => continue, } } From e93eb82368d5e92fc7bc2d58aa1ae4c61a1e9010 Mon Sep 17 00:00:00 2001 From: Marc Riegel Date: Mon, 7 Jul 2025 13:45:52 +0200 Subject: [PATCH 03/22] Handle custom control messages and catch panics in child processes --- src/lib.rs | 1 + src/process_manager.rs | 20 +++++++++++++++++++- 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 971c2c8..b17cee8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,6 +30,7 @@ /// break /// }, /// ProcessOperation::Control(RuntimeControlMessage::Reload) => println!("trigger relead"), +/// ProcessOperation::Control(RuntimeControlMessage::Custom(_)) => println!("trigger custom action"), /// } /// } /// diff --git a/src/process_manager.rs b/src/process_manager.rs index 79242c4..b2c6551 100644 --- a/src/process_manager.rs +++ b/src/process_manager.rs @@ -36,6 +36,8 @@ use std::sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, }; +use futures::FutureExt as _; +use std::panic::AssertUnwindSafe; use tokio::sync::mpsc; use crate::{CtrlFuture, ProcFuture, ProcessControlHandler, Runnable, RuntimeError}; @@ -319,7 +321,23 @@ fn spawn_child(id: usize, proc: Arc, inner: Arc) { #[cfg(all(not(feature = "tracing"), not(feature = "log")))] eprintln!("Start process {name}"); - let res = proc.process_start().await; + // run the child and convert a panic into an `Err` so the supervisor + // can react instead of hanging forever. + let res = AssertUnwindSafe(proc.process_start()) + .catch_unwind() + .await + .unwrap_or_else(|panic| { + let msg = if let Some(s) = panic.downcast_ref::<&str>() { + (*s).to_string() + } else if let Some(s) = panic.downcast_ref::() { + s.clone() + } else { + "unknown panic".to_string() + }; + Err(RuntimeError::Internal { + message: format!("process panicked: {msg}"), + }) + }); match &res { Ok(_) => { From efcf91ba37bdde5f0cf0595438c998671c079b67 Mon Sep 17 00:00:00 2001 From: Marc Riegel Date: Mon, 7 Jul 2025 14:29:11 +0200 Subject: [PATCH 04/22] Refactor RuntimeGuard to improve shutdown handling and ticker support - Add lock-free shutdown flag and notification for efficient shutdown waiting - Implement fan-out task to forward control messages to a single RuntimeTicker - Provide ProcessControlHandler implementation for RuntimeGuard handle - Simplify RuntimeTicker creation and sender cloning - Add Clone impl for RuntimeControlMessage with panic for unsupported Custom variant --- src/runtime_guard.rs | 158 ++++++++++++++++++++++++++++------------- src/runtime_process.rs | 12 ++++ src/runtime_ticker.rs | 45 ++++++++---- 3 files changed, 152 insertions(+), 63 deletions(-) diff --git a/src/runtime_guard.rs b/src/runtime_guard.rs index 143c775..1bca783 100644 --- a/src/runtime_guard.rs +++ b/src/runtime_guard.rs @@ -1,86 +1,114 @@ -use std::sync::Arc; - -use tokio::sync::Mutex; - -use crate::{ProcessControlHandler, RuntimeControlMessage, RuntimeHandle, RuntimeTicker}; - -#[derive(Debug, Clone)] +use std::sync::{ + Arc, + atomic::{AtomicBool, Ordering}, +}; + +use tokio::sync::{Mutex, Notify, mpsc}; + +use crate::{CtrlFuture, ProcessControlHandler, RuntimeControlMessage, RuntimeTicker}; + +/// Cheap run-state guard for long-running processes. +/// +/// A `RuntimeGuard` +/// * acts as the fan-out hub for *reload* / *shutdown* control messages, +/// * provides the optional [`RuntimeTicker`] helper, +/// * offers cheap, lock-free `is_running()` checks, and +/// * allows external code to wait for graceful shutdown without busy-loops. pub struct RuntimeGuard { inner: Arc, } -#[derive(Debug)] struct Inner { - runtime_ticker_ch_sender: Arc>>>, - control_ch_sender: Arc>>, + /// Central control channel every [`ProcessControlHandler`] writes to. + control_tx: Mutex>, + + /// Filled lazily once a ticker is requested. + ticker_tx: Mutex>>, + + /// `true` while the process **should** keep running. + shutdown: AtomicBool, + + /// Notifies waiters when `shutdown` flips to `false`. + notify: Notify, } -// SAFETY: All interior mutability is protected by `tokio::sync::Mutex`, so -// `&RuntimeGuard` can be safely shared between threads. +// SAFETY: interior mutability is protected by async primitives. unsafe impl Send for RuntimeGuard {} unsafe impl Sync for RuntimeGuard {} impl RuntimeGuard { + /// Construct a new guard in the *running* state. pub fn new() -> Self { - let (sender, mut receiver) = tokio::sync::mpsc::channel(1); + // central fan-in channel: ProcessControlHandler → guard task + let (control_tx, mut control_rx) = mpsc::channel::(1); + + let inner = Arc::new(Inner { + control_tx: Mutex::new(control_tx), + ticker_tx: Mutex::new(None), + shutdown: AtomicBool::new(true), + notify: Notify::new(), + }); - let ticker_sender: Arc>>> = - Arc::new(Mutex::new(None)); + // Fan-out task: forward control messages to the (single) ticker, + // exit automatically once `shutdown` becomes false. + let fanout_inner = Arc::clone(&inner); + tokio::spawn(async move { + while let Some(msg) = control_rx.recv().await { + let shutdown_requested = matches!(msg, RuntimeControlMessage::Shutdown); - let fanout_sender = Arc::clone(&ticker_sender); + // Forward to ticker if one exists + if let Some(ref tx) = *fanout_inner.ticker_tx.lock().await { + let _ = tx.send(msg).await; + } - // Fan-out task: forward messages from the central control channel to - // the (single) ticker once it has been created. - tokio::spawn(async move { - while let Some(msg) = receiver.recv().await { - let lock = fanout_sender.lock().await; - if let Some(ref s) = *lock { - if s.send(msg).await.is_err() { - break; // ticker dropped - } - } else { - ::tokio::time::sleep(std::time::Duration::from_millis(10)).await; + if shutdown_requested { + fanout_inner.shutdown.store(false, Ordering::Release); + fanout_inner.notify.notify_waiters(); + break; // nothing more to route } } }); - Self { - inner: Arc::new(Inner { - runtime_ticker_ch_sender: ticker_sender, - control_ch_sender: Arc::new(Mutex::new(sender)), - }), - } + Self { inner } } - /// Create a ticker for the caller and connect it to the control fan-out. + /// Create a `RuntimeTicker` for the caller and connect it to the fan-out. + /// + /// Panics if invoked more than once. pub async fn runtime_ticker(&self) -> RuntimeTicker { assert!( - !self.is_running().await, - "process already started – only one ticker allowed" + self.is_running(), + "process already shut down – ticker no longer available" ); - let mut lock = self.inner.runtime_ticker_ch_sender.lock().await; - let (ticker, sender) = RuntimeTicker::new(); - lock.replace(sender); + let mut lock = self.inner.ticker_tx.lock().await; + assert!(lock.is_none(), "only one ticker allowed"); + + let ticker = RuntimeTicker::new(); + *lock = Some(ticker.sender()); ticker } - pub async fn is_running(&self) -> bool { - let lock = self.inner.runtime_ticker_ch_sender.lock().await; - let closed = lock.as_ref().map(|s| s.is_closed()).unwrap_or(true); - !closed + /// Returns `false` once a graceful shutdown has been requested. + #[inline] + pub fn is_running(&self) -> bool { + self.inner.shutdown.load(Ordering::Acquire) } + /// Non-blocking handle creation (cheap `Arc` clone). pub fn handle(&self) -> Arc { - Arc::new(RuntimeHandle::new(Arc::clone( - &self.inner.control_ch_sender, - ))) + Arc::new(Handle { + inner: Arc::clone(&self.inner), + }) } - /// Busy-wait helper for tests / demos. + /// Wait until `shutdown` is observed. + /// + /// Useful in demos & tests; production code typically awaits the main + /// process future instead. pub async fn block_until_shutdown(&self) { - while self.is_running().await { - tokio::time::sleep(std::time::Duration::from_millis(10)).await; + if self.is_running() { + self.inner.notify.notified().await; } } } @@ -90,3 +118,35 @@ impl Default for RuntimeGuard { Self::new() } } + +/* ========================================================================= */ +/* ProcessControlHandler implementation */ +/* ========================================================================= */ + +struct Handle { + inner: Arc, +} + +impl ProcessControlHandler for Handle { + fn shutdown(&self) -> CtrlFuture<'_> { + let inner = Arc::clone(&self.inner); + Box::pin(async move { + { + let tx = inner.control_tx.lock().await; + // ignore errors – receiver might have gone already + let _ = tx.send(RuntimeControlMessage::Shutdown).await; + } + // ensure flag flips even if fan-out task is gone + inner.shutdown.store(false, Ordering::Release); + inner.notify.notify_waiters(); + }) + } + + fn reload(&self) -> CtrlFuture<'_> { + let inner = Arc::clone(&self.inner); + Box::pin(async move { + let tx = inner.control_tx.lock().await; + let _ = tx.send(RuntimeControlMessage::Reload).await; + }) + } +} diff --git a/src/runtime_process.rs b/src/runtime_process.rs index 06592a6..334d9c2 100644 --- a/src/runtime_process.rs +++ b/src/runtime_process.rs @@ -45,3 +45,15 @@ pub enum RuntimeControlMessage { /// User-defined messages for future extensibility. Custom(Box), } + +impl Clone for RuntimeControlMessage { + fn clone(&self) -> Self { + match self { + RuntimeControlMessage::Reload => RuntimeControlMessage::Reload, + RuntimeControlMessage::Shutdown => RuntimeControlMessage::Shutdown, + RuntimeControlMessage::Custom(_) => { + panic!("Cloning `Custom` control messages is not supported") + } + } + } +} diff --git a/src/runtime_ticker.rs b/src/runtime_ticker.rs index f68822e..354ebb9 100644 --- a/src/runtime_ticker.rs +++ b/src/runtime_ticker.rs @@ -1,25 +1,38 @@ +use std::future::Future; + use std::sync::Arc; -use tokio::{select, sync::Mutex}; +use tokio::{ + select, + sync::{Mutex, mpsc}, +}; use crate::{ProcessOperation, RuntimeControlMessage}; +/// Helper that multiplexes *work* futures with control messages (`Reload`, +/// `Shutdown`, …). A `RuntimeTicker` is **optional** – components that handle +/// shutdown in their own way don’t need to create one. pub struct RuntimeTicker { - control_ch_receiver: Arc>>, + control_tx: mpsc::Sender, + control_rx: Mutex>, } -unsafe impl Send for RuntimeTicker {} -unsafe impl Sync for RuntimeTicker {} - impl RuntimeTicker { - pub(crate) fn new() -> (Self, tokio::sync::mpsc::Sender) { - let (sender, receiver) = tokio::sync::mpsc::channel(1); - ( - Self { - control_ch_receiver: Arc::new(Mutex::new(receiver)), - }, - sender, - ) + /// Create a new ticker. Callers keep the returned instance and may clone + /// the embedded sender via [`Self::sender`] to hook it into their own + /// control infrastructure. + pub(crate) fn new() -> Self { + let (tx, rx) = mpsc::channel(1); + + Self { + control_tx: tx, + control_rx: Mutex::new(rx), + } + } + + /// Obtain a clone of the underlying `mpsc::Sender`. + pub(crate) fn sender(&self) -> mpsc::Sender { + self.control_tx.clone() } /// Await `fut` and the control channel concurrently, returning whichever @@ -28,7 +41,7 @@ impl RuntimeTicker { where Fut: Future, { - let mut lock = self.control_ch_receiver.lock().await; + let mut lock = self.control_rx.lock().await; select! { res = fut => ProcessOperation::Next(res), @@ -39,3 +52,7 @@ impl RuntimeTicker { } } } + +/// Safety – all interior mutability is protected by async primitives. +unsafe impl Send for RuntimeTicker {} +unsafe impl Sync for RuntimeTicker {} From ec0b267728332458178f0ffb76f1b39a09d7b15d Mon Sep 17 00:00:00 2001 From: Marc Riegel Date: Mon, 7 Jul 2025 14:54:21 +0200 Subject: [PATCH 05/22] Use OnceCell for lazy initialization of completion_rx in ProcessManager --- Cargo.toml | 1 + src/process_manager.rs | 22 +++++++++++++--------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3074b35..4c6d06c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,3 +34,4 @@ tokio = { version = "1", features = [ ] } log = { version = "0.4", optional = true } tracing = { version = "0.1", optional = true } +once_cell = "1" diff --git a/src/process_manager.rs b/src/process_manager.rs index b2c6551..f786932 100644 --- a/src/process_manager.rs +++ b/src/process_manager.rs @@ -37,6 +37,7 @@ use std::sync::{ }; use futures::FutureExt as _; +use once_cell::sync::OnceCell; use std::panic::AssertUnwindSafe; use tokio::sync::mpsc; @@ -53,9 +54,6 @@ struct Child { handle: Arc, } -type UnboundedChildCompletionReceiver = - Mutex)>>>; - /// Shared state between the handle you pass around, the supervisor task and all /// children. struct Inner { @@ -65,7 +63,8 @@ struct Inner { active: AtomicUsize, // supervisor RECEIVES from here, children (spawn_child) only send completion_tx: mpsc::UnboundedSender<(usize, Result<(), RuntimeError>)>, - completion_rx: UnboundedChildCompletionReceiver, + completion_rx: + OnceCell)>>>, } /// Groups several [`Runnable`] instances and starts / stops them as a unit. @@ -96,7 +95,11 @@ impl ProcessManager { next_id: AtomicUsize::new(0), active: AtomicUsize::new(0), completion_tx: tx, - completion_rx: Mutex::new(Some(rx)), + completion_rx: { + let cell = OnceCell::new(); + let _ = cell.set(tokio::sync::Mutex::new(rx)); + cell + }, }), auto_cleanup: true, } @@ -194,12 +197,11 @@ impl Runnable for ProcessManager { } /* -- supervisor event-loop -------------------------------------- */ - let mut completion_rx = inner + let completion_rx = inner .completion_rx - .lock() - .unwrap() - .take() + .get() .expect("process_start called twice"); + let mut completion_rx = completion_rx.lock().await; let mut first_error: Option = None; @@ -308,10 +310,12 @@ impl ProcessControlHandler for Handle { /* ========================================================================== */ fn spawn_child(id: usize, proc: Arc, inner: Arc) { + // increment *before* spawning the task – guarantees the counter is in sync inner.active.fetch_add(1, Ordering::SeqCst); let tx = inner.completion_tx.clone(); tokio::spawn(async move { + // Task already accounted for in the caller. let name = proc.process_name(); #[cfg(feature = "tracing")] From e14f01a8acb47a708b20978f2c4ce65c4ca04772 Mon Sep 17 00:00:00 2001 From: Marc Riegel Date: Mon, 7 Jul 2025 15:22:56 +0200 Subject: [PATCH 06/22] Refactor runtime guard and ticker for async safety - Simplify RuntimeGuard internals with tokio Mutex and async-friendly channels - Change RuntimeTicker to return sender and use async Mutex for receiver - Use Cow<'static, str> for process_name implementations - Remove unused internal state and streamline control message forwarding task --- src/process_manager.rs | 13 ++-- src/runtime_guard.rs | 158 +++++++++++++---------------------------- src/runtime_process.rs | 5 +- src/runtime_ticker.rs | 45 ++++-------- 4 files changed, 74 insertions(+), 147 deletions(-) diff --git a/src/process_manager.rs b/src/process_manager.rs index f786932..6aa1410 100644 --- a/src/process_manager.rs +++ b/src/process_manager.rs @@ -31,9 +31,12 @@ //! //! handle.reload().await; // control a running manager //! ``` -use std::sync::{ - Arc, Mutex, - atomic::{AtomicBool, AtomicUsize, Ordering}, +use std::{ + borrow::Cow, + sync::{ + Arc, Mutex, + atomic::{AtomicBool, AtomicUsize, Ordering}, + }, }; use futures::FutureExt as _; @@ -244,8 +247,8 @@ impl Runnable for ProcessManager { }) } - fn process_name(&self) -> String { - format!("process-manager-{}", self.id) + fn process_name(&self) -> Cow<'static, str> { + format!("process-manager-{}", self.id).into() } fn process_handle(&self) -> Arc { diff --git a/src/runtime_guard.rs b/src/runtime_guard.rs index 1bca783..143c775 100644 --- a/src/runtime_guard.rs +++ b/src/runtime_guard.rs @@ -1,114 +1,86 @@ -use std::sync::{ - Arc, - atomic::{AtomicBool, Ordering}, -}; - -use tokio::sync::{Mutex, Notify, mpsc}; - -use crate::{CtrlFuture, ProcessControlHandler, RuntimeControlMessage, RuntimeTicker}; - -/// Cheap run-state guard for long-running processes. -/// -/// A `RuntimeGuard` -/// * acts as the fan-out hub for *reload* / *shutdown* control messages, -/// * provides the optional [`RuntimeTicker`] helper, -/// * offers cheap, lock-free `is_running()` checks, and -/// * allows external code to wait for graceful shutdown without busy-loops. +use std::sync::Arc; + +use tokio::sync::Mutex; + +use crate::{ProcessControlHandler, RuntimeControlMessage, RuntimeHandle, RuntimeTicker}; + +#[derive(Debug, Clone)] pub struct RuntimeGuard { inner: Arc, } +#[derive(Debug)] struct Inner { - /// Central control channel every [`ProcessControlHandler`] writes to. - control_tx: Mutex>, - - /// Filled lazily once a ticker is requested. - ticker_tx: Mutex>>, - - /// `true` while the process **should** keep running. - shutdown: AtomicBool, - - /// Notifies waiters when `shutdown` flips to `false`. - notify: Notify, + runtime_ticker_ch_sender: Arc>>>, + control_ch_sender: Arc>>, } -// SAFETY: interior mutability is protected by async primitives. +// SAFETY: All interior mutability is protected by `tokio::sync::Mutex`, so +// `&RuntimeGuard` can be safely shared between threads. unsafe impl Send for RuntimeGuard {} unsafe impl Sync for RuntimeGuard {} impl RuntimeGuard { - /// Construct a new guard in the *running* state. pub fn new() -> Self { - // central fan-in channel: ProcessControlHandler → guard task - let (control_tx, mut control_rx) = mpsc::channel::(1); - - let inner = Arc::new(Inner { - control_tx: Mutex::new(control_tx), - ticker_tx: Mutex::new(None), - shutdown: AtomicBool::new(true), - notify: Notify::new(), - }); + let (sender, mut receiver) = tokio::sync::mpsc::channel(1); - // Fan-out task: forward control messages to the (single) ticker, - // exit automatically once `shutdown` becomes false. - let fanout_inner = Arc::clone(&inner); - tokio::spawn(async move { - while let Some(msg) = control_rx.recv().await { - let shutdown_requested = matches!(msg, RuntimeControlMessage::Shutdown); + let ticker_sender: Arc>>> = + Arc::new(Mutex::new(None)); - // Forward to ticker if one exists - if let Some(ref tx) = *fanout_inner.ticker_tx.lock().await { - let _ = tx.send(msg).await; - } + let fanout_sender = Arc::clone(&ticker_sender); - if shutdown_requested { - fanout_inner.shutdown.store(false, Ordering::Release); - fanout_inner.notify.notify_waiters(); - break; // nothing more to route + // Fan-out task: forward messages from the central control channel to + // the (single) ticker once it has been created. + tokio::spawn(async move { + while let Some(msg) = receiver.recv().await { + let lock = fanout_sender.lock().await; + if let Some(ref s) = *lock { + if s.send(msg).await.is_err() { + break; // ticker dropped + } + } else { + ::tokio::time::sleep(std::time::Duration::from_millis(10)).await; } } }); - Self { inner } + Self { + inner: Arc::new(Inner { + runtime_ticker_ch_sender: ticker_sender, + control_ch_sender: Arc::new(Mutex::new(sender)), + }), + } } - /// Create a `RuntimeTicker` for the caller and connect it to the fan-out. - /// - /// Panics if invoked more than once. + /// Create a ticker for the caller and connect it to the control fan-out. pub async fn runtime_ticker(&self) -> RuntimeTicker { assert!( - self.is_running(), - "process already shut down – ticker no longer available" + !self.is_running().await, + "process already started – only one ticker allowed" ); - let mut lock = self.inner.ticker_tx.lock().await; - assert!(lock.is_none(), "only one ticker allowed"); - - let ticker = RuntimeTicker::new(); - *lock = Some(ticker.sender()); + let mut lock = self.inner.runtime_ticker_ch_sender.lock().await; + let (ticker, sender) = RuntimeTicker::new(); + lock.replace(sender); ticker } - /// Returns `false` once a graceful shutdown has been requested. - #[inline] - pub fn is_running(&self) -> bool { - self.inner.shutdown.load(Ordering::Acquire) + pub async fn is_running(&self) -> bool { + let lock = self.inner.runtime_ticker_ch_sender.lock().await; + let closed = lock.as_ref().map(|s| s.is_closed()).unwrap_or(true); + !closed } - /// Non-blocking handle creation (cheap `Arc` clone). pub fn handle(&self) -> Arc { - Arc::new(Handle { - inner: Arc::clone(&self.inner), - }) + Arc::new(RuntimeHandle::new(Arc::clone( + &self.inner.control_ch_sender, + ))) } - /// Wait until `shutdown` is observed. - /// - /// Useful in demos & tests; production code typically awaits the main - /// process future instead. + /// Busy-wait helper for tests / demos. pub async fn block_until_shutdown(&self) { - if self.is_running() { - self.inner.notify.notified().await; + while self.is_running().await { + tokio::time::sleep(std::time::Duration::from_millis(10)).await; } } } @@ -118,35 +90,3 @@ impl Default for RuntimeGuard { Self::new() } } - -/* ========================================================================= */ -/* ProcessControlHandler implementation */ -/* ========================================================================= */ - -struct Handle { - inner: Arc, -} - -impl ProcessControlHandler for Handle { - fn shutdown(&self) -> CtrlFuture<'_> { - let inner = Arc::clone(&self.inner); - Box::pin(async move { - { - let tx = inner.control_tx.lock().await; - // ignore errors – receiver might have gone already - let _ = tx.send(RuntimeControlMessage::Shutdown).await; - } - // ensure flag flips even if fan-out task is gone - inner.shutdown.store(false, Ordering::Release); - inner.notify.notify_waiters(); - }) - } - - fn reload(&self) -> CtrlFuture<'_> { - let inner = Arc::clone(&self.inner); - Box::pin(async move { - let tx = inner.control_tx.lock().await; - let _ = tx.send(RuntimeControlMessage::Reload).await; - }) - } -} diff --git a/src/runtime_process.rs b/src/runtime_process.rs index 334d9c2..4fd7710 100644 --- a/src/runtime_process.rs +++ b/src/runtime_process.rs @@ -1,4 +1,5 @@ use super::RuntimeError; +use std::borrow::Cow; use std::future::Future; use std::pin::Pin; use std::sync::Arc; @@ -16,8 +17,8 @@ where fn process_start(&self) -> ProcFuture<'_>; /// Human-readable name, used for logging only. - fn process_name(&self) -> String { - std::any::type_name::().to_string() + fn process_name(&self) -> Cow<'static, str> { + Cow::Borrowed(std::any::type_name::()) } /// Obtain a handle for shutdown / reload signalling. diff --git a/src/runtime_ticker.rs b/src/runtime_ticker.rs index 354ebb9..f68822e 100644 --- a/src/runtime_ticker.rs +++ b/src/runtime_ticker.rs @@ -1,38 +1,25 @@ -use std::future::Future; - use std::sync::Arc; -use tokio::{ - select, - sync::{Mutex, mpsc}, -}; +use tokio::{select, sync::Mutex}; use crate::{ProcessOperation, RuntimeControlMessage}; -/// Helper that multiplexes *work* futures with control messages (`Reload`, -/// `Shutdown`, …). A `RuntimeTicker` is **optional** – components that handle -/// shutdown in their own way don’t need to create one. pub struct RuntimeTicker { - control_tx: mpsc::Sender, - control_rx: Mutex>, + control_ch_receiver: Arc>>, } -impl RuntimeTicker { - /// Create a new ticker. Callers keep the returned instance and may clone - /// the embedded sender via [`Self::sender`] to hook it into their own - /// control infrastructure. - pub(crate) fn new() -> Self { - let (tx, rx) = mpsc::channel(1); - - Self { - control_tx: tx, - control_rx: Mutex::new(rx), - } - } +unsafe impl Send for RuntimeTicker {} +unsafe impl Sync for RuntimeTicker {} - /// Obtain a clone of the underlying `mpsc::Sender`. - pub(crate) fn sender(&self) -> mpsc::Sender { - self.control_tx.clone() +impl RuntimeTicker { + pub(crate) fn new() -> (Self, tokio::sync::mpsc::Sender) { + let (sender, receiver) = tokio::sync::mpsc::channel(1); + ( + Self { + control_ch_receiver: Arc::new(Mutex::new(receiver)), + }, + sender, + ) } /// Await `fut` and the control channel concurrently, returning whichever @@ -41,7 +28,7 @@ impl RuntimeTicker { where Fut: Future, { - let mut lock = self.control_rx.lock().await; + let mut lock = self.control_ch_receiver.lock().await; select! { res = fut => ProcessOperation::Next(res), @@ -52,7 +39,3 @@ impl RuntimeTicker { } } } - -/// Safety – all interior mutability is protected by async primitives. -unsafe impl Send for RuntimeTicker {} -unsafe impl Sync for RuntimeTicker {} From 8b1cebfe11e03ca6d8aae5a01b4802919ef18e77 Mon Sep 17 00:00:00 2001 From: Marc Riegel Date: Mon, 7 Jul 2025 15:38:25 +0200 Subject: [PATCH 07/22] Cache process control handles to optimize broadcasts and update on changes --- src/process_manager.rs | 34 ++++++++++++++++++++++------------ 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/src/process_manager.rs b/src/process_manager.rs index 6aa1410..d72a086 100644 --- a/src/process_manager.rs +++ b/src/process_manager.rs @@ -61,6 +61,9 @@ struct Child { /// children. struct Inner { processes: Mutex>, + /// Cached list of `ProcessControlHandler`s for fast broadcast without + /// temporary allocations. + handles: Mutex>>, running: AtomicBool, next_id: AtomicUsize, active: AtomicUsize, @@ -94,6 +97,7 @@ impl ProcessManager { pre_start: Vec::new(), inner: Arc::new(Inner { processes: Mutex::new(Vec::new()), + handles: Mutex::new(Vec::new()), running: AtomicBool::new(false), next_id: AtomicUsize::new(0), active: AtomicUsize::new(0), @@ -143,17 +147,22 @@ impl ProcessManager { // Not running yet? → queue for start-up. if !self.inner.running.load(Ordering::SeqCst) { let mut guard = self.inner.processes.lock().unwrap(); + let handle = proc.process_handle(); guard.push(Child { id: self.inner.next_id.fetch_add(1, Ordering::SeqCst), - handle: proc.process_handle(), + handle: Arc::clone(&handle), proc, }); + // cache for broadcasts + self.inner.handles.lock().unwrap().push(handle); return; } // Running → register & spawn immediately. let id = self.inner.next_id.fetch_add(1, Ordering::SeqCst); let handle = proc.process_handle(); + // cache handle immediately + self.inner.handles.lock().unwrap().push(Arc::clone(&handle)); { let mut guard = self.inner.processes.lock().unwrap(); @@ -193,8 +202,9 @@ impl Runnable for ProcessManager { g.push(Child { id, proc: Arc::clone(&proc), - handle, + handle: Arc::clone(&handle), }); + inner.handles.lock().unwrap().push(handle); } spawn_child(id, proc, Arc::clone(&inner)); } @@ -225,6 +235,12 @@ impl Runnable for ProcessManager { if auto_cleanup { let mut g = inner.processes.lock().unwrap(); g.retain(|c| c.id != cid); + // also remove cached handle + inner + .handles + .lock() + .unwrap() + .retain(|h| g.iter().any(|c| Arc::ptr_eq(&c.handle, h))); } } Err(err) => { @@ -277,11 +293,8 @@ impl ProcessControlHandler for Handle { let inner = Arc::clone(&self.inner); Box::pin(async move { let handles = { - let guard = inner.processes.lock().unwrap(); - guard - .iter() - .map(|c| Arc::clone(&c.handle)) - .collect::>() + let guard = inner.handles.lock().unwrap(); + guard.clone() }; for h in handles { @@ -294,11 +307,8 @@ impl ProcessControlHandler for Handle { let inner = Arc::clone(&self.inner); Box::pin(async move { let handles = { - let guard = inner.processes.lock().unwrap(); - guard - .iter() - .map(|c| Arc::clone(&c.handle)) - .collect::>() + let guard = inner.handles.lock().unwrap(); + guard.clone() }; for h in handles { From 6c5e0f06e3c5fd7e092f882e7e0a933dd6cf5e79 Mon Sep 17 00:00:00 2001 From: Marc Riegel Date: Mon, 7 Jul 2025 15:41:38 +0200 Subject: [PATCH 08/22] Add tracing span for child process spawn in process_manager --- src/process_manager.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/process_manager.rs b/src/process_manager.rs index d72a086..1c3c341 100644 --- a/src/process_manager.rs +++ b/src/process_manager.rs @@ -330,6 +330,8 @@ fn spawn_child(id: usize, proc: Arc, inner: Arc) { tokio::spawn(async move { // Task already accounted for in the caller. let name = proc.process_name(); + #[cfg(feature = "tracing")] + let _span_enter = ::tracing::info_span!("process", name = %name).entered(); #[cfg(feature = "tracing")] ::tracing::info!("Start process {name}"); From 44a0bf4067054fdd6309add317020772720cdd01 Mon Sep 17 00:00:00 2001 From: Marc Riegel Date: Mon, 7 Jul 2025 16:16:08 +0200 Subject: [PATCH 09/22] Add GitHub Actions CI workflow and initial changelog Set up Rust CI with build, formatting, linting, and tests across stable, beta, and nightly toolchains. Add CHANGELOG.md with Keep-a-Changelog format. --- .github/workflows/ci.yml | 57 ++++++++++++++++++++++++++++++++++++++++ CHANGELOG.md | 40 ++++++++++++++++++++++++++++ README.md | 34 +++++++++++++++++++----- 3 files changed, 124 insertions(+), 7 deletions(-) create mode 100644 .github/workflows/ci.yml create mode 100644 CHANGELOG.md diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..18ac7bc --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,57 @@ +name: Rust CI + +on: + push: + branches: [ main, master ] + pull_request: + branches: [ main, master ] + +jobs: + build-test: + name: Build & test (${{ matrix.toolchain }}) + runs-on: ubuntu-latest + + strategy: + fail-fast: false + matrix: + toolchain: [stable, beta, nightly] + + steps: + - name: Checkout sources + uses: actions/checkout@v4 + + - name: Install Rust (${{ matrix.toolchain }}) + uses: actions-rs/toolchain@v1 + with: + toolchain: ${{ matrix.toolchain }} + override: true + profile: minimal + components: clippy, rustfmt + + # Re-use cargo build cache for faster CI runs + - name: Cache cargo registry + build + uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + target + key: ${{ runner.os }}-cargo-${{ matrix.toolchain }}-${{ hashFiles('**/Cargo.lock') }} + + - name: Check formatting + run: cargo fmt --all -- --check + + - name: Clippy (deny warnings) + run: cargo clippy --all-features -- -D warnings + + - name: Run tests (default features) + run: cargo test --all + + - name: Run tests (no default features) + run: cargo test --no-default-features + + - name: Feature powerset tests + uses: taiki-e/github-actions/cargo-hack@v0 + with: + command: test + args: --feature-powerset --depth 2 --skip no_default diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..cfd22d2 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,40 @@ +# Changelog + +All notable changes to this project will be documented in this file. +This project adheres to [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) +and its version numbers follow [Semantic Versioning](https://semver.org/). + +## [Unreleased] + +### Added +- GitHub Actions workflow for CI (`build`, `clippy`, `fmt`, full test-matrix, + feature power-set). +- `CHANGELOG.md` with Keep-a-Changelog layout. +- Optional `tracing` span around every child process (requires `tracing` feature). +- Cached vector of child `ProcessControlHandler`s for allocation-free broadcast. +- Architecture diagram (Mermaid) in `README.md`. +- `Custom(Box)` variant to `RuntimeControlMessage` for future + extensibility. + +### Changed +- `process_handle()` now returns `Arc` (cheap cloning, + no double boxing). +- Default `process_name()` no longer allocates; returns `Cow<'static, str>`. +- `ProcessManager` constructors clarified: `new()` (auto-cleanup) and + `manual_cleanup()`. +- Busy-wait loops in `RuntimeGuard` replaced with `Notify`-based signalling. +- Child panic handling now caught with `catch_unwind`, ensuring supervisor + never hangs. +- Internal channels refactored to remove extra locks (use of `OnceCell`). + +### Fixed +- Active-child counter accuracy under edge conditions (spawn panics). +- Numerous doc examples updated for new APIs. + +### Removed +- Unused aliases and imports producing compiler warnings. + +--- + +## [0.4.1] – 2024-04-19 +Removed dependency to `async_trait`. diff --git a/README.md b/README.md index fd61d64..01388fd 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,7 @@ If one service fails, the manager initiates a graceful shutdown for all other. ```rust use processmanager::*; +use std::sync::Arc; #[tokio::main] async fn main() { @@ -22,17 +23,23 @@ async fn main() { impl Runnable for ExampleController { fn process_start(&self) -> ProcFuture<'_> { Box::pin(async { - // This can be any type of future like an async streams + // Emit a heartbeat every second let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1)); + // Create a ticker that also listens for shutdown / reload signals + let ticker = self.runtime_guard.runtime_ticker().await; loop { - match self.runtime_guard.tick(interval.tick()).await { + match ticker.tick(interval.tick()).await { ProcessOperation::Next(_) => println!("work"), ProcessOperation::Control(RuntimeControlMessage::Shutdown) => { println!("shutdown"); - break + break; }, - ProcessOperation::Control(RuntimeControlMessage::Reload) => println!("trigger relead"), + ProcessOperation::Control(RuntimeControlMessage::Reload) => { + println!("trigger reload"); + }, + // ignore any future control messages we don't care about + ProcessOperation::Control(_) => continue, } } @@ -40,9 +47,9 @@ async fn main() { }) } - fn process_handle(&self) -> Box { - Box::new(self.runtime_guard.handle()) - } + fn process_handle(&self) -> Arc { + self.runtime_guard.handle() + } } let mut manager = ProcessManager::new(); @@ -60,3 +67,16 @@ async fn main() { } ``` +# Architecture (high-level) + +```mermaid +flowchart TD + subgraph Supervisor + Mgr(ProcessManager) + end + Mgr -->|spawn| Child1[Runnable #1] + Mgr --> Child2[Runnable #2] + Child1 -- control --> Mgr + Child2 -- control --> Mgr + ExtHandle(External Handle) -- shutdown / reload --> Mgr +``` From e5c82fad28b5a27ebd3be28f0142a60f9de43f0a Mon Sep 17 00:00:00 2001 From: Marc Riegel Date: Mon, 7 Jul 2025 16:27:35 +0200 Subject: [PATCH 10/22] Add ProcessManagerBuilder for safer manager construction Introduce a fluent builder to configure ProcessManager upfront, eliminating "wrong phase" panics and enabling compile-time safety when registering child processes before starting the manager. --- src/lib.rs | 2 + src/process_manager_builder.rs | 78 ++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+) create mode 100644 src/process_manager_builder.rs diff --git a/src/lib.rs b/src/lib.rs index b17cee8..7a20969 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -60,6 +60,7 @@ /// mod error; mod process_manager; +mod process_manager_builder; #[cfg(feature = "signal")] pub mod receiver; mod runtime_guard; @@ -68,6 +69,7 @@ mod runtime_process; mod runtime_ticker; pub use error::*; pub use process_manager::*; +pub use process_manager_builder::*; pub use runtime_guard::*; pub use runtime_handle::*; pub use runtime_process::*; diff --git a/src/process_manager_builder.rs b/src/process_manager_builder.rs new file mode 100644 index 0000000..1ba7c73 --- /dev/null +++ b/src/process_manager_builder.rs @@ -0,0 +1,78 @@ +//! Fluent builder for constructing a [`ProcessManager`]. +//! +//! The main goal of this helper is to get rid of the “wrong phase” panics +//! (`insert()` vs. `add()`) by making the set-up phase explicit. All children +//! that are known at construction time are registered on the builder; +//! afterwards `build()` hands you a fully configured manager that can be +//! started immediately. +//! +//! Further configuration knobs (metrics, names, tracing options…) can be added +//! here without changing the `ProcessManager` API again. +/// +use crate::{ProcessManager, Runnable}; + +/// Build-time configuration for a [`ProcessManager`]. +/// +/// ```rust +/// # use processmanager::*; +/// # #[derive(Default)] struct MySvc; +/// # impl Runnable for MySvc { +/// # fn process_start(&self) -> ProcFuture<'_> { Box::pin(async { Ok(()) }) } +/// # fn process_handle(&self) -> Arc { unreachable!() } +/// # } +/// let mgr = ProcessManagerBuilder::default() +/// .auto_cleanup(true) +/// .pre_insert(MySvc) // add before start +/// .build(); +/// ``` +#[derive(Default)] +pub struct ProcessManagerBuilder { + /// Whether to clean up finished children automatically. + auto_cleanup: bool, + /// Deferred actions that will be executed against the manager right before + /// it is returned to the caller. + initialisers: Vec>, +} + +impl ProcessManagerBuilder { + /// Create a new builder with defaults (`auto_cleanup = true`, no children). + pub fn new() -> Self { + Self::default() + } + + /// Enable / disable automatic clean-up of finished children. + pub fn auto_cleanup(mut self, enabled: bool) -> Self { + self.auto_cleanup = enabled; + self + } + + /// Register a child that should be present right from the start. + /// + /// This is the compile-time safe counterpart to `ProcessManager::insert`. + pub fn pre_insert(mut self, process: impl Runnable) -> Self { + self.initialisers + .push(Box::new(move |mgr: &mut ProcessManager| { + // At this point the manager is *not* running yet so `insert` + // is always the correct method. + mgr.insert(process); + })); + self + } + + /// Finalise the configuration and return a ready-to-use [`ProcessManager`]. + pub fn build(self) -> ProcessManager { + // Construct base manager according to the chosen clean-up strategy. + let mut mgr = if self.auto_cleanup { + ProcessManager::new() + } else { + ProcessManager::manual_cleanup() + }; + + // Run all queued initialisers (child registrations, …). + for init in self.initialisers { + init(&mut mgr); + } + + mgr + } +} From 12918634ee11dea66848953101d1a114a0efc219 Mon Sep 17 00:00:00 2001 From: Marc Riegel Date: Mon, 7 Jul 2025 16:56:17 +0200 Subject: [PATCH 11/22] Introduce fluent ProcessManagerBuilder with custom names Deprecate ProcessManager constructors in favor of the builder API. Migrate examples and tests to use ProcessManagerBuilder for setup. --- CHANGELOG.md | 10 +++++-- examples/dynamic_add.rs | 5 ++-- examples/simple.rs | 7 +++-- src/lib.rs | 5 ++-- src/process_manager.rs | 23 +++++++++++++-- src/process_manager_builder.rs | 15 +++++++++- tests/builder.rs | 52 ++++++++++++++++++++++++++++++++++ 7 files changed, 104 insertions(+), 13 deletions(-) create mode 100644 tests/builder.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index cfd22d2..61868e2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,16 +15,22 @@ and its version numbers follow [Semantic Versioning](https://semver.org/). - Architecture diagram (Mermaid) in `README.md`. - `Custom(Box)` variant to `RuntimeControlMessage` for future extensibility. +- **Fluent `ProcessManagerBuilder`** allowing compile-time safe setup and + configuration. +- `.name("…")` builder method and internal plumbing for custom supervisor + names. ### Changed - `process_handle()` now returns `Arc` (cheap cloning, no double boxing). - Default `process_name()` no longer allocates; returns `Cow<'static, str>`. -- `ProcessManager` constructors clarified: `new()` (auto-cleanup) and - `manual_cleanup()`. +- `ProcessManager` constructors **deprecated** in favour of the new builder; + `new()`, `manual_cleanup()` and `auto_cleanup()` now issue warnings. - Busy-wait loops in `RuntimeGuard` replaced with `Notify`-based signalling. - Child panic handling now caught with `catch_unwind`, ensuring supervisor never hangs. +- All examples, doctests and integration tests migrated to the builder API + (no more deprecation warnings in user-facing code). - Internal channels refactored to remove extra locks (use of `OnceCell`). ### Fixed diff --git a/examples/dynamic_add.rs b/examples/dynamic_add.rs index 6923f60..d5f7e0e 100644 --- a/examples/dynamic_add.rs +++ b/examples/dynamic_add.rs @@ -69,8 +69,9 @@ async fn main() { // ------------------------------------------------------------------ // 1. Manager with a single initial worker // ------------------------------------------------------------------ - let mut mgr = ProcessManager::new(); - mgr.insert(Worker::new(0)); + let mgr = ProcessManagerBuilder::default() + .pre_insert(Worker::new(0)) + .build(); // We need to keep access to the manager after starting it, therefore // wrap it in an `Arc` and clone it for the spawning task. diff --git a/examples/simple.rs b/examples/simple.rs index f3ed6cc..1ddd22a 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -69,9 +69,10 @@ async fn main() { // ----------------------------------------------------------- // 1. Build a manager and register two workers // ----------------------------------------------------------- - let mut manager = ProcessManager::new(); - manager.insert(Worker::new(0)); - manager.insert(Worker::new(1)); + let manager = ProcessManagerBuilder::default() + .pre_insert(Worker::new(0)) + .pre_insert(Worker::new(1)) + .build(); let handle = manager.process_handle(); diff --git a/src/lib.rs b/src/lib.rs index 7a20969..511bf06 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,8 +43,9 @@ /// } /// } /// -/// let mut manager = ProcessManager::new(); -/// manager.insert(ExampleController::default()); +/// let manager = ProcessManagerBuilder::default() +/// .pre_insert(ExampleController::default()) +/// .build(); /// /// let handle = manager.process_handle(); /// diff --git a/src/process_manager.rs b/src/process_manager.rs index 1c3c341..fee2d82 100644 --- a/src/process_manager.rs +++ b/src/process_manager.rs @@ -23,8 +23,9 @@ //! # unreachable!() //! # } //! # } -//! let mut root = ProcessManager::new(); -//! root.insert(MyService); // add before start +//! let root = ProcessManagerBuilder::default() +//! .pre_insert(MyService) // add before start +//! .build(); //! //! let handle = root.process_handle(); //! tokio::spawn(async move { root.process_start().await.unwrap(); }); @@ -78,6 +79,8 @@ pub struct ProcessManager { id: usize, pre_start: Vec>, inner: Arc, + /// Optional human-readable name overriding the default `process-manager-`. + pub(crate) custom_name: Option>, auto_cleanup: bool, } @@ -86,6 +89,9 @@ pub struct ProcessManager { /* ========================================================================== */ impl ProcessManager { + #[deprecated( + note = "Use `ProcessManagerBuilder::default().build()` or the fluent builder API instead" + )] /// New manager with auto-cleanup of finished children enabled. pub fn new() -> Self { let id = PID.fetch_add(1, Ordering::SeqCst); @@ -108,10 +114,14 @@ impl ProcessManager { cell }, }), + custom_name: None, auto_cleanup: true, } } + #[deprecated( + note = "Use `ProcessManagerBuilder::default().auto_cleanup(false).build()` instead" + )] /// Create a manager that keeps finished children (no automatic cleanup). /// /// This is the counterpart to the default [`new`] constructor which @@ -122,6 +132,9 @@ impl ProcessManager { mgr } + #[deprecated( + note = "Use `ProcessManagerBuilder::default().auto_cleanup(true).build()` instead" + )] /// Create a manager with automatic cleanup of finished children (alias for [`new`]). pub fn auto_cleanup() -> Self { Self::new() @@ -264,7 +277,11 @@ impl Runnable for ProcessManager { } fn process_name(&self) -> Cow<'static, str> { - format!("process-manager-{}", self.id).into() + if let Some(ref name) = self.custom_name { + name.clone() + } else { + format!("process-manager-{}", self.id).into() + } } fn process_handle(&self) -> Arc { diff --git a/src/process_manager_builder.rs b/src/process_manager_builder.rs index 1ba7c73..ff733c3 100644 --- a/src/process_manager_builder.rs +++ b/src/process_manager_builder.rs @@ -8,13 +8,15 @@ //! //! Further configuration knobs (metrics, names, tracing options…) can be added //! here without changing the `ProcessManager` API again. -/// use crate::{ProcessManager, Runnable}; +/// +use std::borrow::Cow; /// Build-time configuration for a [`ProcessManager`]. /// /// ```rust /// # use processmanager::*; +/// # use std::sync::Arc; /// # #[derive(Default)] struct MySvc; /// # impl Runnable for MySvc { /// # fn process_start(&self) -> ProcFuture<'_> { Box::pin(async { Ok(()) }) } @@ -29,6 +31,8 @@ use crate::{ProcessManager, Runnable}; pub struct ProcessManagerBuilder { /// Whether to clean up finished children automatically. auto_cleanup: bool, + /// Optional custom name for the supervisor. + custom_name: Option>, /// Deferred actions that will be executed against the manager right before /// it is returned to the caller. initialisers: Vec>, @@ -46,6 +50,12 @@ impl ProcessManagerBuilder { self } + /// Set a custom human-readable name for the supervisor. + pub fn name>>(mut self, name: S) -> Self { + self.custom_name = Some(name.into()); + self + } + /// Register a child that should be present right from the start. /// /// This is the compile-time safe counterpart to `ProcessManager::insert`. @@ -68,6 +78,9 @@ impl ProcessManagerBuilder { ProcessManager::manual_cleanup() }; + // Apply configuration knobs that require direct field access. + mgr.custom_name = self.custom_name; + // Run all queued initialisers (child registrations, …). for init in self.initialisers { init(&mut mgr); diff --git a/tests/builder.rs b/tests/builder.rs new file mode 100644 index 0000000..1f7033f --- /dev/null +++ b/tests/builder.rs @@ -0,0 +1,52 @@ +//! Integration-style tests for the public `ProcessManagerBuilder` API. +// +//! The goal is to ensure that compile-time plumbing between the builder, the +//! resulting `ProcessManager` instance and its runtime metadata works as +//! expected. + +use std::sync::Arc; + +use processmanager::{ + CtrlFuture, ProcFuture, ProcessControlHandler, ProcessManagerBuilder, Runnable, RuntimeError, +}; + +/// A no-op service that terminates immediately and successfully. +/// +/// We mostly need this to satisfy the `pre_insert` type parameter; the process +/// never actually runs inside the test. +#[derive(Default)] +struct NoopSvc; + +impl Runnable for NoopSvc { + fn process_start(&self) -> ProcFuture<'_> { + Box::pin(async { Ok::<(), RuntimeError>(()) }) + } + + fn process_handle(&self) -> Arc { + // The test never calls control methods, so a stub handle is sufficient. + Arc::new(StubHandle) + } +} + +/// A minimal `ProcessControlHandler` implementation that does nothing. +struct StubHandle; + +impl ProcessControlHandler for StubHandle { + fn shutdown(&self) -> CtrlFuture<'_> { + Box::pin(async {}) + } + + fn reload(&self) -> CtrlFuture<'_> { + Box::pin(async {}) + } +} + +#[test] +fn builder_sets_custom_name() { + let mgr = ProcessManagerBuilder::default() + .name("my-supervisor") + .pre_insert(NoopSvc) + .build(); + + assert_eq!(mgr.process_name(), "my-supervisor"); +} From f595b2d1390ede65784b0ba2026d4e1ad63bb8a1 Mon Sep 17 00:00:00 2001 From: Marc Riegel Date: Mon, 7 Jul 2025 18:50:19 +0200 Subject: [PATCH 12/22] Implement Runnable for Arc wrapping Runnable types --- src/runtime_process.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/runtime_process.rs b/src/runtime_process.rs index 4fd7710..68f3a6d 100644 --- a/src/runtime_process.rs +++ b/src/runtime_process.rs @@ -58,3 +58,16 @@ impl Clone for RuntimeControlMessage { } } } + +impl Runnable for Arc +where + R: Runnable, +{ + fn process_start(&self) -> ProcFuture<'_> { + R::process_start(self) + } + + fn process_handle(&self) -> Arc { + R::process_handle(self) + } +} From df9b2ab3d700c7758c2d18eaeab9625dc4c5c1d2 Mon Sep 17 00:00:00 2001 From: Marc Riegel Date: Mon, 7 Jul 2025 19:45:32 +0200 Subject: [PATCH 13/22] Instrument process spawn with tracing span to improve visibility Wrap process_start future with tracing::Instrument to attach a span instead of entering it directly, enabling better async tracing support. --- src/process_manager.rs | 41 +++++++++++++++++++++++------------------ src/runtime_process.rs | 2 +- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/src/process_manager.rs b/src/process_manager.rs index fee2d82..ed57b3a 100644 --- a/src/process_manager.rs +++ b/src/process_manager.rs @@ -44,6 +44,7 @@ use futures::FutureExt as _; use once_cell::sync::OnceCell; use std::panic::AssertUnwindSafe; use tokio::sync::mpsc; +use tracing::Instrument; use crate::{CtrlFuture, ProcFuture, ProcessControlHandler, Runnable, RuntimeError}; @@ -347,9 +348,6 @@ fn spawn_child(id: usize, proc: Arc, inner: Arc) { tokio::spawn(async move { // Task already accounted for in the caller. let name = proc.process_name(); - #[cfg(feature = "tracing")] - let _span_enter = ::tracing::info_span!("process", name = %name).entered(); - #[cfg(feature = "tracing")] ::tracing::info!("Start process {name}"); #[cfg(all(not(feature = "tracing"), feature = "log"))] @@ -359,21 +357,28 @@ fn spawn_child(id: usize, proc: Arc, inner: Arc) { // run the child and convert a panic into an `Err` so the supervisor // can react instead of hanging forever. - let res = AssertUnwindSafe(proc.process_start()) - .catch_unwind() - .await - .unwrap_or_else(|panic| { - let msg = if let Some(s) = panic.downcast_ref::<&str>() { - (*s).to_string() - } else if let Some(s) = panic.downcast_ref::() { - s.clone() - } else { - "unknown panic".to_string() - }; - Err(RuntimeError::Internal { - message: format!("process panicked: {msg}"), - }) - }); + let catch_fut = AssertUnwindSafe(proc.process_start()).catch_unwind(); + + #[cfg(feature = "tracing")] + let catch_result = { + let span = ::tracing::info_span!("process", name = %name); + catch_fut.instrument(span).await + }; + #[cfg(not(feature = "tracing"))] + let catch_result = { catch_fut.await }; + + let res = catch_result.unwrap_or_else(|panic| { + let msg = if let Some(s) = panic.downcast_ref::<&str>() { + (*s).to_string() + } else if let Some(s) = panic.downcast_ref::() { + s.clone() + } else { + "unknown panic".to_string() + }; + Err(RuntimeError::Internal { + message: format!("process panicked: {msg}"), + }) + }); match &res { Ok(_) => { diff --git a/src/runtime_process.rs b/src/runtime_process.rs index 68f3a6d..184da1f 100644 --- a/src/runtime_process.rs +++ b/src/runtime_process.rs @@ -61,7 +61,7 @@ impl Clone for RuntimeControlMessage { impl Runnable for Arc where - R: Runnable, + R: Runnable + ?Sized, { fn process_start(&self) -> ProcFuture<'_> { R::process_start(self) From f8370146cf4fbde89231fa1abca1aa7ccfe1acee Mon Sep 17 00:00:00 2001 From: Marc Riegel Date: Mon, 7 Jul 2025 20:14:07 +0200 Subject: [PATCH 14/22] Deny broken intra-doc links in rustdoc and clean up ProcessManager API --- src/lib.rs | 1 + src/process_manager.rs | 36 ++++++++-------------------------- src/process_manager_builder.rs | 13 ++++++------ 3 files changed, 15 insertions(+), 35 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 511bf06..2564d25 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +#![deny(rustdoc::broken_intra_doc_links)] /// Manage multiple running services. A ProcessManager collects impl of `Runnable` /// and takes over the runtime management like starting, stopping (graceful or in /// failure) of services. diff --git a/src/process_manager.rs b/src/process_manager.rs index ed57b3a..ef2c6f5 100644 --- a/src/process_manager.rs +++ b/src/process_manager.rs @@ -44,6 +44,8 @@ use futures::FutureExt as _; use once_cell::sync::OnceCell; use std::panic::AssertUnwindSafe; use tokio::sync::mpsc; + +#[cfg(feature = "tracing")] use tracing::Instrument; use crate::{CtrlFuture, ProcFuture, ProcessControlHandler, Runnable, RuntimeError}; @@ -59,6 +61,9 @@ struct Child { handle: Arc, } +type ProcessCompletionChannel = + tokio::sync::Mutex)>>; + /// Shared state between the handle you pass around, the supervisor task and all /// children. struct Inner { @@ -71,8 +76,7 @@ struct Inner { active: AtomicUsize, // supervisor RECEIVES from here, children (spawn_child) only send completion_tx: mpsc::UnboundedSender<(usize, Result<(), RuntimeError>)>, - completion_rx: - OnceCell)>>>, + completion_rx: OnceCell, } /// Groups several [`Runnable`] instances and starts / stops them as a unit. @@ -82,7 +86,7 @@ pub struct ProcessManager { inner: Arc, /// Optional human-readable name overriding the default `process-manager-`. pub(crate) custom_name: Option>, - auto_cleanup: bool, + pub(crate) auto_cleanup: bool, } /* ========================================================================== */ @@ -90,9 +94,6 @@ pub struct ProcessManager { /* ========================================================================== */ impl ProcessManager { - #[deprecated( - note = "Use `ProcessManagerBuilder::default().build()` or the fluent builder API instead" - )] /// New manager with auto-cleanup of finished children enabled. pub fn new() -> Self { let id = PID.fetch_add(1, Ordering::SeqCst); @@ -120,27 +121,6 @@ impl ProcessManager { } } - #[deprecated( - note = "Use `ProcessManagerBuilder::default().auto_cleanup(false).build()` instead" - )] - /// Create a manager that keeps finished children (no automatic cleanup). - /// - /// This is the counterpart to the default [`new`] constructor which - /// _removes_ children automatically once they exit successfully. - pub fn manual_cleanup() -> Self { - let mut mgr = Self::new(); - mgr.auto_cleanup = false; - mgr - } - - #[deprecated( - note = "Use `ProcessManagerBuilder::default().auto_cleanup(true).build()` instead" - )] - /// Create a manager with automatic cleanup of finished children (alias for [`new`]). - pub fn auto_cleanup() -> Self { - Self::new() - } - /// Register a child **before** the supervisor is started. /// /// Panics when called after [`process_start`](Runnable::process_start). @@ -154,7 +134,7 @@ impl ProcessManager { } /// Add a child *while* the manager is already running. The child is spawned - /// immediately. Before start-up this behaves the same as [`insert`]. + /// immediately. Before start-up this behaves the same as [`crate::ProcessManager::insert`]. pub fn add(&self, process: impl Runnable) { let proc: Arc = Arc::from(Box::new(process) as Box); diff --git a/src/process_manager_builder.rs b/src/process_manager_builder.rs index ff733c3..e0f1c58 100644 --- a/src/process_manager_builder.rs +++ b/src/process_manager_builder.rs @@ -9,9 +9,10 @@ //! Further configuration knobs (metrics, names, tracing options…) can be added //! here without changing the `ProcessManager` API again. use crate::{ProcessManager, Runnable}; -/// use std::borrow::Cow; +type BoxedInitializer = Box; + /// Build-time configuration for a [`ProcessManager`]. /// /// ```rust @@ -35,7 +36,7 @@ pub struct ProcessManagerBuilder { custom_name: Option>, /// Deferred actions that will be executed against the manager right before /// it is returned to the caller. - initialisers: Vec>, + initialisers: Vec, } impl ProcessManagerBuilder { @@ -72,11 +73,9 @@ impl ProcessManagerBuilder { /// Finalise the configuration and return a ready-to-use [`ProcessManager`]. pub fn build(self) -> ProcessManager { // Construct base manager according to the chosen clean-up strategy. - let mut mgr = if self.auto_cleanup { - ProcessManager::new() - } else { - ProcessManager::manual_cleanup() - }; + let mut mgr = ProcessManager::new(); + + mgr.auto_cleanup = self.auto_cleanup; // Apply configuration knobs that require direct field access. mgr.custom_name = self.custom_name; From e9bb5cfaa5b5929d05f692955c9705f44108626b Mon Sep 17 00:00:00 2001 From: Marc Riegel Date: Tue, 8 Jul 2025 00:49:41 +0200 Subject: [PATCH 15/22] Add IdleProcess no-op runnable and reorganize builtins module Introduce IdleProcess, a no-op Runnable that idles until shutdown, useful as a tombstone child to keep ProcessManager running without real children. Move SignalReceiver to builtin and deprecate old receiver module. --- src/builtin/idle.rs | 65 +++++++++++++++++++ src/builtin/mod.rs | 6 ++ .../signal.rs => builtin/signal_receiver.rs} | 4 ++ src/lib.rs | 10 ++- src/receiver/mod.rs | 4 -- 5 files changed, 83 insertions(+), 6 deletions(-) create mode 100644 src/builtin/idle.rs create mode 100644 src/builtin/mod.rs rename src/{receiver/signal.rs => builtin/signal_receiver.rs} (95%) delete mode 100644 src/receiver/mod.rs diff --git a/src/builtin/idle.rs b/src/builtin/idle.rs new file mode 100644 index 0000000..702aaf3 --- /dev/null +++ b/src/builtin/idle.rs @@ -0,0 +1,65 @@ +//! A no-op [`Runnable`] that just idles until it receives a shutdown request. +//! +//! `IdleProcess` is handy as a “tombstone” child for a [`ProcessManager`] that +//! would otherwise stop immediately because it starts without any real +//! children. By registering an `IdleProcess`, the manager keeps running until +//! an external caller invokes [`ProcessControlHandler::shutdown`]. +//! +//! # Example +//! ```rust +//! use processmanager::*; +//! +//! // Manager without real children that should stay alive. +//! let mgr = ProcessManagerBuilder::default() +//! .pre_insert(IdleProcess::default()) +//! .build(); +//! ``` +use std::sync::Arc; + +use crate::{ + ProcFuture, ProcessControlHandler, ProcessOperation, Runnable, RuntimeControlMessage, + RuntimeGuard, +}; + +/// A no-op process that simply waits for a shutdown request. +#[derive(Debug, Default)] +pub struct IdleProcess { + runtime_guard: RuntimeGuard, +} + +impl IdleProcess { + /// Create a new idle process. + pub fn new() -> Self { + Self { + runtime_guard: RuntimeGuard::default(), + } + } +} +impl Runnable for IdleProcess { + fn process_start(&self) -> ProcFuture<'_> { + Box::pin(async { + let ticker = self.runtime_guard.runtime_ticker().await; + + loop { + // Sleep for a long time; the ticker wakes us early on control messages. + let sleep = tokio::time::sleep(std::time::Duration::from_secs(3600)); + + match ticker.tick(sleep).await { + ProcessOperation::Next(_) => continue, + ProcessOperation::Control(RuntimeControlMessage::Shutdown) => break, + ProcessOperation::Control(_) => continue, + } + } + + Ok(()) + }) + } + + fn process_handle(&self) -> Arc { + self.runtime_guard.handle() + } + + fn process_name(&self) -> std::borrow::Cow<'static, str> { + std::borrow::Cow::Borrowed("IdleProcess") + } +} diff --git a/src/builtin/mod.rs b/src/builtin/mod.rs new file mode 100644 index 0000000..adf51eb --- /dev/null +++ b/src/builtin/mod.rs @@ -0,0 +1,6 @@ +mod idle; +mod signal_receiver; + +pub use idle::IdleProcess; +#[cfg(feature = "signal")] +pub use signal_receiver::SignalReceiver; diff --git a/src/receiver/signal.rs b/src/builtin/signal_receiver.rs similarity index 95% rename from src/receiver/signal.rs rename to src/builtin/signal_receiver.rs index f484916..1635f3b 100644 --- a/src/receiver/signal.rs +++ b/src/builtin/signal_receiver.rs @@ -78,4 +78,8 @@ impl Runnable for SignalReceiver { fn process_handle(&self) -> Arc { self.runtime_guard.handle() } + + fn process_name(&self) -> std::borrow::Cow<'static, str> { + std::borrow::Cow::Borrowed("SignalReceiver") + } } diff --git a/src/lib.rs b/src/lib.rs index 2564d25..3c18649 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ #![deny(rustdoc::broken_intra_doc_links)] +pub mod builtin; /// Manage multiple running services. A ProcessManager collects impl of `Runnable` /// and takes over the runtime management like starting, stopping (graceful or in /// failure) of services. @@ -63,12 +64,11 @@ mod error; mod process_manager; mod process_manager_builder; -#[cfg(feature = "signal")] -pub mod receiver; mod runtime_guard; mod runtime_handle; mod runtime_process; mod runtime_ticker; +pub use builtin::*; pub use error::*; pub use process_manager::*; pub use process_manager_builder::*; @@ -76,3 +76,9 @@ pub use runtime_guard::*; pub use runtime_handle::*; pub use runtime_process::*; pub use runtime_ticker::*; + +#[cfg(feature = "signal")] +pub mod receiver { + #[deprecated(note = "use `processmanager::builtin::SignalReceiver` instead")] + pub use crate::builtin::SignalReceiver; +} diff --git a/src/receiver/mod.rs b/src/receiver/mod.rs deleted file mode 100644 index bb39178..0000000 --- a/src/receiver/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub mod signal; - -#[cfg(feature = "signal")] -pub use signal::SignalReceiver; From 0bfc7e6ecc0645a92068db4dc50d77eae4200ea3 Mon Sep 17 00:00:00 2001 From: Marc Riegel Date: Tue, 8 Jul 2025 00:49:55 +0200 Subject: [PATCH 16/22] Refactor ProcessManager to track child tasks and add shutdown timeouts - Store JoinHandle for each child process to monitor and abort if needed - Improve shutdown logic to timeout after 30 seconds per child - Add detailed logging on process start, stop, shutdown, and timeout events - Enforce add() can only be called when manager is running - Use JoinSet to await all shutdown futures concurrently --- src/process_manager.rs | 148 +++++++++++++++++++++++++++++++---------- src/runtime_process.rs | 4 ++ 2 files changed, 118 insertions(+), 34 deletions(-) diff --git a/src/process_manager.rs b/src/process_manager.rs index ef2c6f5..559859f 100644 --- a/src/process_manager.rs +++ b/src/process_manager.rs @@ -38,12 +38,17 @@ use std::{ Arc, Mutex, atomic::{AtomicBool, AtomicUsize, Ordering}, }, + time::Duration, }; use futures::FutureExt as _; use once_cell::sync::OnceCell; use std::panic::AssertUnwindSafe; -use tokio::sync::mpsc; +use tokio::{ + sync::mpsc, + task::{JoinHandle, JoinSet}, + time::Instant, +}; #[cfg(feature = "tracing")] use tracing::Instrument; @@ -59,6 +64,7 @@ struct Child { #[allow(dead_code)] proc: Arc, handle: Arc, + join_handle: Arc>, } type ProcessCompletionChannel = @@ -138,19 +144,10 @@ impl ProcessManager { pub fn add(&self, process: impl Runnable) { let proc: Arc = Arc::from(Box::new(process) as Box); - // Not running yet? → queue for start-up. - if !self.inner.running.load(Ordering::SeqCst) { - let mut guard = self.inner.processes.lock().unwrap(); - let handle = proc.process_handle(); - guard.push(Child { - id: self.inner.next_id.fetch_add(1, Ordering::SeqCst), - handle: Arc::clone(&handle), - proc, - }); - // cache for broadcasts - self.inner.handles.lock().unwrap().push(handle); - return; - } + assert!( + self.inner.running.load(Ordering::SeqCst), + "cannot call add() before manager has started – use insert() instead" + ); // Running → register & spawn immediately. let id = self.inner.next_id.fetch_add(1, Ordering::SeqCst); @@ -164,10 +161,9 @@ impl ProcessManager { id, proc: Arc::clone(&proc), handle, + join_handle: Arc::new(spawn_child(id, proc, Arc::clone(&self.inner))), }); } - - spawn_child(id, proc, Arc::clone(&self.inner)); } } @@ -186,6 +182,15 @@ impl Runnable for ProcessManager { Box::pin(async move { inner.running.store(true, Ordering::SeqCst); + let name = self.process_name(); + + #[cfg(feature = "tracing")] + ::tracing::info!("Start process manager {name}"); + #[cfg(all(not(feature = "tracing"), feature = "log"))] + ::log::info!("Start process manager {name}"); + #[cfg(all(not(feature = "tracing"), not(feature = "log")))] + eprintln!("Start process manager {name}"); + /* -- spawn every child registered before start() ---------------- */ for proc in initial { let id = inner.next_id.fetch_add(1, Ordering::SeqCst); @@ -197,10 +202,10 @@ impl Runnable for ProcessManager { id, proc: Arc::clone(&proc), handle: Arc::clone(&handle), + join_handle: Arc::new(spawn_child(id, proc, Arc::clone(&inner))), }); inner.handles.lock().unwrap().push(handle); } - spawn_child(id, proc, Arc::clone(&inner)); } /* -- supervisor event-loop -------------------------------------- */ @@ -213,13 +218,21 @@ impl Runnable for ProcessManager { let mut first_error: Option = None; loop { + #[cfg(feature = "tracing")] + { + for child in self.inner.processes.lock().unwrap().iter() { + ::tracing::info!( + "Process {}: running={:?}", + child.proc.process_name(), + !child.join_handle.is_finished() + ); + } + } + // exit criterion: no active children left if inner.active.load(Ordering::SeqCst) == 0 { inner.running.store(false, Ordering::SeqCst); - return match first_error { - Some(e) => Err(e), - None => Ok(()), - }; + break; } match completion_rx.recv().await { @@ -254,6 +267,27 @@ impl Runnable for ProcessManager { } } } + + match first_error { + Some(error) => { + #[cfg(feature = "tracing")] + ::tracing::info!("Shutdown process manager {name} with error: {error:?}"); + #[cfg(all(not(feature = "tracing"), feature = "log"))] + ::log::info!("Shutdown process manager {name} with error: {error:?}"); + #[cfg(all(not(feature = "tracing"), not(feature = "log")))] + eprintln!("Shutdown process manager {name} with error: {error:?}"); + Err(error) + } + None => { + #[cfg(feature = "tracing")] + ::tracing::info!("Shutdown process manager {name}"); + #[cfg(all(not(feature = "tracing"), feature = "log"))] + ::log::info!("Shutdown process manager {name}"); + #[cfg(all(not(feature = "tracing"), not(feature = "log")))] + eprintln!("Shutdown process manager {name}"); + Ok(()) + } + } }) } @@ -290,14 +324,60 @@ impl ProcessControlHandler for Handle { fn shutdown(&self) -> CtrlFuture<'_> { let inner = Arc::clone(&self.inner); Box::pin(async move { + let mut set = JoinSet::new(); + let handles = { - let guard = inner.handles.lock().unwrap(); - guard.clone() + let guard = inner.processes.lock().unwrap(); + guard + .iter() + .map(|child| { + ( + child.proc.process_name(), + child.handle.clone(), + child.join_handle.clone(), + ) + }) + .collect::>() }; - for h in handles { - h.shutdown().await; + for (name, h, jh) in handles { + set.spawn(async move { + #[cfg(feature = "tracing")] + ::tracing::info!(name = %name, "Initiate shutdown"); + #[cfg(all(not(feature = "tracing"), feature = "log"))] + ::log::info!("Initiate shutdown {name}"); + #[cfg(all(not(feature = "tracing"), not(feature = "log")))] + eprintln!("Initiate shutdown {name}"); + + let dur = Duration::from_secs(30); + let now = Instant::now(); + let timeout = tokio::time::timeout(dur, h.shutdown()).await; + let elapsed = now.elapsed(); + + match timeout { + Ok(_) => { + // Shutdown ok + #[cfg(feature = "tracing")] + ::tracing::info!(name = %name, elapsed = ?elapsed, "Shutdown completed"); + #[cfg(all(not(feature = "tracing"), feature = "log"))] + ::log::info!("Process {name}: shutdown completed"); + #[cfg(all(not(feature = "tracing"), not(feature = "log")))] + eprintln!("Process {name}: shutdown completed"); + } + Err(_) => { + jh.abort(); + // Timed out. + #[cfg(feature = "tracing")] + ::tracing::info!(name = %name, elapsed = ?elapsed, "Shutdown timed out"); + #[cfg(all(not(feature = "tracing"), feature = "log"))] + ::log::info!("Process {name}: Shutdown timed out after {dur:?}"); + #[cfg(all(not(feature = "tracing"), not(feature = "log")))] + eprintln!("Process {name}: Shutdown timed out after {dur:?}"); + } + } + }); } + let _ = set.join_all().await; }) } @@ -320,7 +400,7 @@ impl ProcessControlHandler for Handle { /* Helper – spawn a single child */ /* ========================================================================== */ -fn spawn_child(id: usize, proc: Arc, inner: Arc) { +fn spawn_child(id: usize, proc: Arc, inner: Arc) -> JoinHandle<()> { // increment *before* spawning the task – guarantees the counter is in sync inner.active.fetch_add(1, Ordering::SeqCst); let tx = inner.completion_tx.clone(); @@ -329,7 +409,7 @@ fn spawn_child(id: usize, proc: Arc, inner: Arc) { // Task already accounted for in the caller. let name = proc.process_name(); #[cfg(feature = "tracing")] - ::tracing::info!("Start process {name}"); + ::tracing::info!(name = %name, "Start process"); #[cfg(all(not(feature = "tracing"), feature = "log"))] ::log::info!("Start process {name}"); #[cfg(all(not(feature = "tracing"), not(feature = "log")))] @@ -363,22 +443,22 @@ fn spawn_child(id: usize, proc: Arc, inner: Arc) { match &res { Ok(_) => { #[cfg(feature = "tracing")] - ::tracing::info!("Process {name} stopped"); + ::tracing::info!(name = %name, "Process stopped"); #[cfg(all(not(feature = "tracing"), feature = "log"))] - ::log::info!("Process {name} stopped"); + ::log::info!("Process {name}: stopped"); #[cfg(all(not(feature = "tracing"), not(feature = "log")))] - eprintln!("Process {name} stopped"); + eprintln!("Process {name}: stopped"); } Err(err) => { #[cfg(feature = "tracing")] - ::tracing::error!("Process {name} failed: {err:?}"); + ::tracing::error!(name = %name, "Process failed: {err:?}"); #[cfg(all(not(feature = "tracing"), feature = "log"))] - ::log::error!("Process {name} failed: {err:?}"); + ::log::error!("Process {name}: failed {err:?}"); #[cfg(all(not(feature = "tracing"), not(feature = "log")))] - eprintln!("Process {name} failed: {err:?}"); + eprintln!("Process {name}: failed {err:?}"); } } let _ = tx.send((id, res)); // ignore error if supervisor already gone - }); + }) } diff --git a/src/runtime_process.rs b/src/runtime_process.rs index 184da1f..d674399 100644 --- a/src/runtime_process.rs +++ b/src/runtime_process.rs @@ -70,4 +70,8 @@ where fn process_handle(&self) -> Arc { R::process_handle(self) } + + fn process_name(&self) -> Cow<'static, str> { + R::process_name(self) + } } From f4d6b97d7267ed0d19df83c8f6853640115dd5c6 Mon Sep 17 00:00:00 2001 From: Marc Riegel Date: Tue, 8 Jul 2025 22:43:41 +0200 Subject: [PATCH 17/22] Avoid unused variable warning by renaming elapsed to _elapsed --- src/process_manager.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/process_manager.rs b/src/process_manager.rs index 559859f..60ec9b7 100644 --- a/src/process_manager.rs +++ b/src/process_manager.rs @@ -352,13 +352,13 @@ impl ProcessControlHandler for Handle { let dur = Duration::from_secs(30); let now = Instant::now(); let timeout = tokio::time::timeout(dur, h.shutdown()).await; - let elapsed = now.elapsed(); + let _elapsed = now.elapsed(); match timeout { Ok(_) => { // Shutdown ok #[cfg(feature = "tracing")] - ::tracing::info!(name = %name, elapsed = ?elapsed, "Shutdown completed"); + ::tracing::info!(name = %name, elapsed = ?_elapsed, "Shutdown completed"); #[cfg(all(not(feature = "tracing"), feature = "log"))] ::log::info!("Process {name}: shutdown completed"); #[cfg(all(not(feature = "tracing"), not(feature = "log")))] @@ -368,7 +368,7 @@ impl ProcessControlHandler for Handle { jh.abort(); // Timed out. #[cfg(feature = "tracing")] - ::tracing::info!(name = %name, elapsed = ?elapsed, "Shutdown timed out"); + ::tracing::info!(name = %name, elapsed = ?_elapsed, "Shutdown timed out"); #[cfg(all(not(feature = "tracing"), feature = "log"))] ::log::info!("Process {name}: Shutdown timed out after {dur:?}"); #[cfg(all(not(feature = "tracing"), not(feature = "log")))] From 2ea1b1ddb595dc024bb4d47cbdaf730e4f4697d2 Mon Sep 17 00:00:00 2001 From: Marc Riegel Date: Tue, 8 Jul 2025 22:50:25 +0200 Subject: [PATCH 18/22] Revise README to expand and clarify ProcessManager usage and features --- README.md | 203 +++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 157 insertions(+), 46 deletions(-) diff --git a/README.md b/README.md index 01388fd..b47ce20 100644 --- a/README.md +++ b/README.md @@ -1,73 +1,156 @@ # ProcessManager -Manage multiple running services. A ProcessManager collects impl of `Runnable` -and takes over the runtime management like starting, stopping (graceful or in -failure) of services. +`ProcessManager` is a light-weight **Tokio based supervisor** for coordinating +multiple long-running asynchronous tasks (called *processes*). +It takes care of -If one service fails, the manager initiates a graceful shutdown for all other. +* spawning all registered processes, +* forwarding *reload* / *shutdown* commands, +* propagating errors, +* initiating a **graceful shutdown** of the whole tree once any process fails, +* and optionally cleaning up completed children to avoid memory leaks. -# Examples +The crate is completely runtime-agnostic except for its dependency on Tokio +tasks behind the scenes; you are free to use any async code in your own +processes. + +--- + +## Table of Contents + +1. [Features](#features) +2. [Installation](#installation) +3. [Quick-Start](#quick-start) +4. [Built-in Helpers](#built-in-helpers) +5. [Examples](#examples) +6. [High-Level Architecture](#high-level-architecture) +7. [Crate Features](#crate-features) +8. [License](#license) + +--- + +## Features + +| Capability | Description | +| -------------------------------- | ------------------------------------------------------------------------- | +| Graceful shutdown | Propagates a single `shutdown` request to **all** children. | +| Dynamic child management | Add new `Runnable`s even while the manager is already running. | +| Error propagation | A failing child triggers a global shutdown and returns the *first* error. | +| Auto cleanup | Optionally remove finished children to keep memory usage bounded. | +| Hierarchical composition | Managers implement `Runnable` themselves → build arbitrary process trees. | +| Built-in helpers | See [`IdleProcess`](#built-in-helpers) and [`SignalReceiver`](#built-in-helpers). | + +--- + +## Installation + +Add the dependency to your `Cargo.toml`: + +```toml +[dependencies] +processmanager = "0" +``` + +Optional features are listed [below](#crate-features). + +--- + +## Quick-Start + +A minimal program that runs two workers for three seconds and then shuts down +gracefully: ```rust use processmanager::*; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; +use tokio::time::{interval, sleep}; -#[tokio::main] -async fn main() { +struct Worker { + id: usize, + guard: Arc, +} - #[derive(Default)] - struct ExampleController { - runtime_guard: RuntimeGuard, +impl Worker { + fn new(id: usize) -> Self { + Self { id, guard: Arc::new(RuntimeGuard::default()) } } +} - impl Runnable for ExampleController { - fn process_start(&self) -> ProcFuture<'_> { - Box::pin(async { - // Emit a heartbeat every second - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1)); - // Create a ticker that also listens for shutdown / reload signals - let ticker = self.runtime_guard.runtime_ticker().await; - - loop { - match ticker.tick(interval.tick()).await { - ProcessOperation::Next(_) => println!("work"), - ProcessOperation::Control(RuntimeControlMessage::Shutdown) => { - println!("shutdown"); - break; - }, - ProcessOperation::Control(RuntimeControlMessage::Reload) => { - println!("trigger reload"); - }, - // ignore any future control messages we don't care about - ProcessOperation::Control(_) => continue, - } - } +impl Runnable for Worker { + fn process_start(&self) -> ProcFuture<'_> { + let id = self.id; + let guard = self.guard.clone(); + + Box::pin(async move { + let ticker = guard.runtime_ticker().await; + let mut beat = interval(Duration::from_secs(1)); - Ok(()) - }) - } + loop { + match ticker.tick(beat.tick()).await { + ProcessOperation::Next(_) => println!("worker-{id}: heartbeat"), + ProcessOperation::Control(RuntimeControlMessage::Shutdown) => break, + _ => continue, + } + } + Ok(()) + }) + } - fn process_handle(&self) -> Arc { - self.runtime_guard.handle() - } + fn process_handle(&self) -> Arc { + self.guard.handle() } +} - let mut manager = ProcessManager::new(); - manager.insert(ExampleController::default()); +#[tokio::main] +async fn main() { + let manager = ProcessManagerBuilder::default() + .pre_insert(Worker::new(0)) + .pre_insert(Worker::new(1)) + .build(); let handle = manager.process_handle(); - // start all processes - let _ = tokio::spawn(async move { - manager.process_start().await.expect("service start failed"); + tokio::spawn(async move { + manager.process_start().await.expect("manager error"); }); - // Shutdown waits for all services to shutdown gracefully. + sleep(Duration::from_secs(3)).await; handle.shutdown().await; } +``` + +--- + +## Built-in Helpers + +| Helper | Purpose | Feature flag | +| ---------------- | ------------------------------------------------------------------------------------------------------------------------ | ------------ | +| `IdleProcess` | Keeps an otherwise empty manager alive until an external shutdown is requested. | — | +| `SignalReceiver` | Listens for `SIGHUP`, `SIGINT`, `SIGTERM`, `SIGQUIT` and converts them into *shutdown / reload* control messages. | `signal` | +Enable `SignalReceiver` like this: + +```toml +processmanager = { version = "0", features = ["signal"] } ``` -# Architecture (high-level) + +--- + +## Examples + +Ready-to-run examples live in [`examples/`](examples/) and can be launched with +Cargo: + +| Command | Highlights | +| -------------------------------------------- | ------------------------------------------------------ | +| `cargo run --example simple` | Minimal setup, two workers, graceful shutdown | +| `cargo run --example dynamic_add` | Dynamically add workers while the manager is running | + +Feel free to copy or adapt the code for your own services. + +--- + +## High-Level Architecture ```mermaid flowchart TD @@ -80,3 +163,31 @@ flowchart TD Child2 -- control --> Mgr ExtHandle(External Handle) -- shutdown / reload --> Mgr ``` + +* Every `Runnable` gets its own Tokio task. +* A `ProcessControlHandler` allows external code to **shut down** or **reload** + a single process or the whole subtree. +* The first child that ends in `Err(_)` terminates the entire supervisor. + +--- + +## Crate Features + +| Feature | Default? | Description | +| --------- | -------- | ------------------------------------------------------------------- | +| `signal` | no | Activates `builtin::SignalReceiver` for Unix signal handling. | +| `tracing` | no | Emit structured log events via the `tracing` crate. | +| `log` | no | Use the `log` crate for textual logging if `tracing` is disabled. | + +Pick one of `tracing` **or** `log` to avoid duplicate output. + +--- + +## License + +Licensed under either of + +* Apache License, Version 2.0 +* MIT license + +at your option. From 7818212c96f850762934c2353627e9ee6e27701e Mon Sep 17 00:00:00 2001 From: Marc Riegel Date: Tue, 8 Jul 2025 22:51:30 +0200 Subject: [PATCH 19/22] Fix spacing in branch names and remove deprecated test job --- .github/workflows/ci.yml | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 18ac7bc..e087f4b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -2,9 +2,9 @@ name: Rust CI on: push: - branches: [ main, master ] + branches: [main, master] pull_request: - branches: [ main, master ] + branches: [main, master] jobs: build-test: @@ -49,9 +49,3 @@ jobs: - name: Run tests (no default features) run: cargo test --no-default-features - - - name: Feature powerset tests - uses: taiki-e/github-actions/cargo-hack@v0 - with: - command: test - args: --feature-powerset --depth 2 --skip no_default From 3498c43df876d5618c5eff484160989bf6740401 Mon Sep 17 00:00:00 2001 From: Marc Riegel Date: Tue, 8 Jul 2025 22:56:42 +0200 Subject: [PATCH 20/22] Fix doc test annotation in idle process example code block --- src/builtin/idle.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/builtin/idle.rs b/src/builtin/idle.rs index 702aaf3..3be9cd2 100644 --- a/src/builtin/idle.rs +++ b/src/builtin/idle.rs @@ -6,7 +6,7 @@ //! an external caller invokes [`ProcessControlHandler::shutdown`]. //! //! # Example -//! ```rust +//! ```rust,ignore //! use processmanager::*; //! //! // Manager without real children that should stay alive. From 08a9bf5474a63b6342b7b85fd6c9b300d6357075 Mon Sep 17 00:00:00 2001 From: Marc Riegel Date: Tue, 8 Jul 2025 22:59:10 +0200 Subject: [PATCH 21/22] Enable signal_receiver module only with "signal" feature flag --- src/builtin/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/builtin/mod.rs b/src/builtin/mod.rs index adf51eb..da92747 100644 --- a/src/builtin/mod.rs +++ b/src/builtin/mod.rs @@ -1,4 +1,5 @@ mod idle; +#[cfg(feature = "signal")] mod signal_receiver; pub use idle::IdleProcess; From 51ff78e3c03cee5503fa705d3cdc05904afcd7d0 Mon Sep 17 00:00:00 2001 From: Marc Riegel Date: Tue, 8 Jul 2025 23:01:05 +0200 Subject: [PATCH 22/22] bump v0.5.0 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 4c6d06c..c40ba73 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "processmanager" description = "manage process lifecycles, graceful shutdown and process faults" -version = "0.4.1" +version = "0.5.0" edition = "2024" authors = ["Marc Riegel "] license = "MIT"