From 009c02549ef0f8c4027f01d97d704fe4ca1c804b Mon Sep 17 00:00:00 2001 From: Marc Riegel Date: Wed, 9 Jul 2025 10:58:09 +0200 Subject: [PATCH 1/2] Refactor runtime and process manager internals - Improve documentation for `IdleProcess` and `SignalReceiver` - Add auto cleanup of finished children in `ProcessManager` - Clarify `ProcessManagerBuilder` docs and fix typo in field name - Expand docs for `RuntimeGuard`, `RuntimeHandle` and core traits - Enhance tracing of signal reception in `SignalReceiver` - Implement `RuntimeControlMessage` as non-exhaustive enum - Minor code style and comment refinements throughout --- src/builtin/idle.rs | 31 ++++++++++-------- src/builtin/signal_receiver.rs | 24 ++++++++++++-- src/process_manager.rs | 50 +++++++++++++++++++++++++---- src/process_manager_builder.rs | 23 ++++++++------ src/runtime_guard.rs | 58 ++++++++++++++++++++++++++++++++-- src/runtime_handle.rs | 13 ++++++++ src/runtime_process.rs | 39 +++++++++++++++++++++-- src/runtime_ticker.rs | 19 +++++++++-- 8 files changed, 217 insertions(+), 40 deletions(-) diff --git a/src/builtin/idle.rs b/src/builtin/idle.rs index 3be9cd2..aea215e 100644 --- a/src/builtin/idle.rs +++ b/src/builtin/idle.rs @@ -1,34 +1,37 @@ -//! A no-op [`Runnable`] that just idles until it receives a shutdown request. +//! A no-op [`Runnable`] that idles until it receives a shutdown request. //! -//! `IdleProcess` is handy as a “tombstone” child for a [`ProcessManager`] that -//! would otherwise stop immediately because it starts without any real -//! children. By registering an `IdleProcess`, the manager keeps running until -//! an external caller invokes [`ProcessControlHandler::shutdown`]. +//! `IdleProcess` is useful as a *tombstone* child for a [`ProcessManager`] that +//! would otherwise terminate immediately because it has no real children. +//! Registering an `IdleProcess` keeps the supervisor alive until an external +//! caller broadcasts [`ProcessControlHandler::shutdown`]. //! //! # Example -//! ```rust,ignore -//! use processmanager::*; +//! ```no_run +//! use processmanager::{IdleProcess, ProcessManagerBuilder}; //! -//! // Manager without real children that should stay alive. -//! let mgr = ProcessManagerBuilder::default() +//! // Supervisor without real children that should stay alive. +//! let _mgr = ProcessManagerBuilder::default() //! .pre_insert(IdleProcess::default()) //! .build(); //! ``` -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use crate::{ ProcFuture, ProcessControlHandler, ProcessOperation, Runnable, RuntimeControlMessage, RuntimeGuard, }; -/// A no-op process that simply waits for a shutdown request. +/// A *tombstone* process that simply waits for a shutdown request. +/// +/// It performs no work and exists solely to keep its parent +/// [`ProcessManager`] alive until an explicit shutdown is broadcast. #[derive(Debug, Default)] pub struct IdleProcess { runtime_guard: RuntimeGuard, } impl IdleProcess { - /// Create a new idle process. + /// Construct a fresh idle process. Equivalent to [`Default::default`]. pub fn new() -> Self { Self { runtime_guard: RuntimeGuard::default(), @@ -41,8 +44,8 @@ impl Runnable for IdleProcess { let ticker = self.runtime_guard.runtime_ticker().await; loop { - // Sleep for a long time; the ticker wakes us early on control messages. - let sleep = tokio::time::sleep(std::time::Duration::from_secs(3600)); + // Sleep for a long time; the ticker wakes us early when a control message arrives. + let sleep = tokio::time::sleep(Duration::from_secs(3600)); match ticker.tick(sleep).await { ProcessOperation::Next(_) => continue, diff --git a/src/builtin/signal_receiver.rs b/src/builtin/signal_receiver.rs index 1635f3b..49c1c33 100644 --- a/src/builtin/signal_receiver.rs +++ b/src/builtin/signal_receiver.rs @@ -1,3 +1,20 @@ +//! Unix signal integration. +//! +//! `SignalReceiver` listens for `SIGHUP`, `SIGINT`, `SIGTERM`, and `SIGQUIT` and +//! converts them into runtime-level control events so a [`ProcessManager`] can +//! perform a graceful shutdown or reload. +//! +//! This module is compiled only when the crate’s **`signal`** feature is +//! enabled. +//! +//! ```no_run +//! use processmanager::{builtin::SignalReceiver, ProcessManagerBuilder}; +//! +//! let mgr = ProcessManagerBuilder::default() +//! .pre_insert(SignalReceiver::default()) +//! .build(); +//! ``` +//! use crate::{ ProcFuture, ProcessControlHandler, ProcessOperation, Runnable, RuntimeControlMessage, RuntimeError, RuntimeGuard, @@ -9,6 +26,8 @@ use signal_hook_tokio::{Signals, SignalsInfo}; use std::sync::Arc; use tokio::sync::Mutex; +/// Built-in [`Runnable`] that converts POSIX signals into shutdown / reload +/// requests and propagates them through the process-manager runtime. pub struct SignalReceiver { signals: Mutex, signal_handle: Handle, @@ -19,7 +38,7 @@ impl SignalReceiver { pub fn new() -> Self { let signals = Signals::new([SIGHUP, SIGINT, SIGTERM, SIGQUIT]) .map_err(|err| RuntimeError::Internal { - message: format!("register process signals: {err:?}"), + message: format!("register signal handler: {err:?}"), }) .expect("signals to register"); @@ -53,7 +72,8 @@ impl Runnable for SignalReceiver { ProcessOperation::Control(_) => continue, }; - // tracing::warn!("Received process signal: {signal:?}"); + #[cfg(feature = "tracing")] + ::tracing::warn!(signal = ?signal, "Received process signal"); match signal { SIGHUP => { diff --git a/src/process_manager.rs b/src/process_manager.rs index 60ec9b7..afc7f85 100644 --- a/src/process_manager.rs +++ b/src/process_manager.rs @@ -1,4 +1,4 @@ -//! Dynamic supervisor for asynchronous `Runnable`s. +//! Dynamic supervisor for asynchronous [`Runnable`] implementations. // //! A `ProcessManager` can //! @@ -90,8 +90,14 @@ pub struct ProcessManager { id: usize, pre_start: Vec>, inner: Arc, - /// Optional human-readable name overriding the default `process-manager-`. + /// Optional human-readable name overriding the default + /// `"process-manager-"`. + /// + /// If `None`, [`ProcessManager::process_name`] falls back to the automatic + /// naming scheme. pub(crate) custom_name: Option>, + /// When `true`, children that finish *successfully* are removed from the + /// internal lists so that long-running supervisors do not leak memory. pub(crate) auto_cleanup: bool, } @@ -100,7 +106,15 @@ pub struct ProcessManager { /* ========================================================================== */ impl ProcessManager { - /// New manager with auto-cleanup of finished children enabled. + /// Creates a fresh supervisor. + /// + /// * A unique *process-manager id* is assigned automatically. + /// * [`auto_cleanup`](ProcessManager::auto_cleanup) is **enabled** by + /// default so that finished children are removed from the internal + /// bookkeeping lists. + /// + /// The manager may be configured further with [`insert`] **before** it is + /// started or with [`add`] **after** it is running. pub fn new() -> Self { let id = PID.fetch_add(1, Ordering::SeqCst); @@ -127,9 +141,10 @@ impl ProcessManager { } } - /// Register a child **before** the supervisor is started. + /// Registers a child **before** the supervisor itself is started. /// - /// Panics when called after [`process_start`](Runnable::process_start). + /// # Panics + /// Panics if the manager is already running. Use [`add`] in that case. pub fn insert(&mut self, process: impl Runnable) { assert!( !self.inner.running.load(Ordering::SeqCst), @@ -139,8 +154,10 @@ impl ProcessManager { .push(Arc::from(Box::new(process) as Box)); } - /// Add a child *while* the manager is already running. The child is spawned - /// immediately. Before start-up this behaves the same as [`crate::ProcessManager::insert`]. + /// Adds a child **while the manager is already running**. + /// + /// The new `Runnable` is spawned immediately in its own Tokio task. + /// Calling this method **before** start-up is equivalent to [`insert`]. pub fn add(&self, process: impl Runnable) { let proc: Arc = Arc::from(Box::new(process) as Box); @@ -291,6 +308,11 @@ impl Runnable for ProcessManager { }) } + /// Returns the supervisor’s public name. + /// + /// If [`custom_name`](ProcessManager::custom_name) is `Some`, that value is + /// returned verbatim; otherwise the default pattern + /// `"process-manager-"` is used. fn process_name(&self) -> Cow<'static, str> { if let Some(ref name) = self.custom_name { name.clone() @@ -299,6 +321,10 @@ impl Runnable for ProcessManager { } } + /// Returns a handle that can control *all* currently running children of + /// this manager. + /// + /// The handle can be cloned freely and used from any async context. fn process_handle(&self) -> Arc { Arc::new(Handle { inner: Arc::clone(&self.inner), @@ -321,6 +347,8 @@ struct Handle { } impl ProcessControlHandler for Handle { + /// Broadcasts [`ProcessControlHandler::shutdown`] to every currently active + /// child and waits for them to complete. fn shutdown(&self) -> CtrlFuture<'_> { let inner = Arc::clone(&self.inner); Box::pin(async move { @@ -381,6 +409,9 @@ impl ProcessControlHandler for Handle { }) } + /// Broadcasts [`ProcessControlHandler::reload`] to every active child. + /// The reload operations are executed in parallel and awaited before the + /// future completes. fn reload(&self) -> CtrlFuture<'_> { let inner = Arc::clone(&self.inner); Box::pin(async move { @@ -399,6 +430,11 @@ impl ProcessControlHandler for Handle { /* ========================================================================== */ /* Helper – spawn a single child */ /* ========================================================================== */ +/// Spawns one child task, converts panics into `RuntimeError`s and notifies the +/// supervisor through the *completion channel*. +/// +/// Accounting with [`Inner::active`] is done **before** the task is actually +/// spawned so the supervisor has an accurate count even if the spawn fails. fn spawn_child(id: usize, proc: Arc, inner: Arc) -> JoinHandle<()> { // increment *before* spawning the task – guarantees the counter is in sync diff --git a/src/process_manager_builder.rs b/src/process_manager_builder.rs index e0f1c58..eb71c10 100644 --- a/src/process_manager_builder.rs +++ b/src/process_manager_builder.rs @@ -1,13 +1,16 @@ //! Fluent builder for constructing a [`ProcessManager`]. //! -//! The main goal of this helper is to get rid of the “wrong phase” panics -//! (`insert()` vs. `add()`) by making the set-up phase explicit. All children -//! that are known at construction time are registered on the builder; -//! afterwards `build()` hands you a fully configured manager that can be -//! started immediately. +//! Building a supervisor is a *setup-time* activity, whereas actually running +//! it is a *runtime* concern. Mixing the two often leads to the familiar +//! “wrong phase” panic when someone calls [`ProcessManager::insert`] **after** +//! the manager has already started. //! -//! Further configuration knobs (metrics, names, tracing options…) can be added -//! here without changing the `ProcessManager` API again. +//! `ProcessManagerBuilder` makes the configuration phase explicit: every child, +//! option or tweak is registered **before** [`Self::build`] returns a +//! ready-to-run supervisor. +//! +//! Additional knobs (metrics, tracing, names, …) can be added here in the +//! future **without** touching the public API of [`ProcessManager`]. use crate::{ProcessManager, Runnable}; use std::borrow::Cow; @@ -36,7 +39,7 @@ pub struct ProcessManagerBuilder { custom_name: Option>, /// Deferred actions that will be executed against the manager right before /// it is returned to the caller. - initialisers: Vec, + initializers: Vec, } impl ProcessManagerBuilder { @@ -61,7 +64,7 @@ impl ProcessManagerBuilder { /// /// This is the compile-time safe counterpart to `ProcessManager::insert`. pub fn pre_insert(mut self, process: impl Runnable) -> Self { - self.initialisers + self.initializers .push(Box::new(move |mgr: &mut ProcessManager| { // At this point the manager is *not* running yet so `insert` // is always the correct method. @@ -81,7 +84,7 @@ impl ProcessManagerBuilder { mgr.custom_name = self.custom_name; // Run all queued initialisers (child registrations, …). - for init in self.initialisers { + for init in self.initializers { init(&mut mgr); } diff --git a/src/runtime_guard.rs b/src/runtime_guard.rs index 143c775..d287005 100644 --- a/src/runtime_guard.rs +++ b/src/runtime_guard.rs @@ -1,3 +1,43 @@ +//! Central runtime control hub. +//! +//! `RuntimeGuard` owns two *async* channels: a **global control** channel that +//! forwards messages from any number of [`RuntimeHandle`]s and the +//! **ticker channel** consumed by exactly one [`RuntimeTicker`]. +//! +//! The guard exposes three high-level operations: +//! +//! 1. [`runtime_ticker`](RuntimeGuard::runtime_ticker) – creates the sole ticker +//! and connects it to the control fan-out. +//! 2. [`handle`](RuntimeGuard::handle) – returns a cheap, clonable +//! [`ProcessControlHandler`] that broadcasts control messages. +//! 3. [`is_running`](RuntimeGuard::is_running) / +//! [`block_until_shutdown`](RuntimeGuard::block_until_shutdown) – helpers +//! for observing runtime state in tests and demos. +//! +//! Dropping the ticker closes the channel which in turn lets [`is_running`] +//! report `false`. Constructing a second ticker is prohibited and will panic. +//! +//! Internally the guard spawns a “fan-out” task that waits for control messages +//! and forwards them to the ticker once it exists. +//! +//! # Concurrency & safety +//! +//! All interior mutability is protected by `tokio::sync::Mutex`; therefore +//! `RuntimeGuard` is `Send + Sync`. +//! +//! --- +//! +//! ```no_run +//! # use processmanager::*; +//! # async fn demo() { +//! let guard = RuntimeGuard::new(); +//! let ticker = guard.runtime_ticker().await; +//! let handle = guard.handle(); +//! +//! handle.reload().await; // broadcast control instruction +//! /* ... */ +//! # } +//! ``` use std::sync::Arc; use tokio::sync::Mutex; @@ -21,6 +61,10 @@ unsafe impl Send for RuntimeGuard {} unsafe impl Sync for RuntimeGuard {} impl RuntimeGuard { + /// Create a fresh guard and spawn the internal *fan-out* task. + /// + /// The returned instance is ready for immediate use; you typically call + /// [`runtime_ticker`](Self::runtime_ticker) right after construction. pub fn new() -> Self { let (sender, mut receiver) = tokio::sync::mpsc::channel(1); @@ -52,7 +96,11 @@ impl RuntimeGuard { } } - /// Create a ticker for the caller and connect it to the control fan-out. + /// Create the **single** [`RuntimeTicker`] and connect it to the fan-out. + /// + /// # Panics + /// Panics if a ticker has already been created – i.e. the runtime is + /// considered “running”. pub async fn runtime_ticker(&self) -> RuntimeTicker { assert!( !self.is_running().await, @@ -65,19 +113,25 @@ impl RuntimeGuard { ticker } + /// Returns `true` while the ticker (and therefore the runtime) is alive. pub async fn is_running(&self) -> bool { let lock = self.inner.runtime_ticker_ch_sender.lock().await; let closed = lock.as_ref().map(|s| s.is_closed()).unwrap_or(true); !closed } + /// Obtain a clonable [`ProcessControlHandler`] that broadcasts control + /// messages to the ticker. pub fn handle(&self) -> Arc { Arc::new(RuntimeHandle::new(Arc::clone( &self.inner.control_ch_sender, ))) } - /// Busy-wait helper for tests / demos. + /// **Busy-wait** helper for tests and demos. + /// + /// Polls [`is_running`](Self::is_running) once every 10 ms until it + /// returns `false`. pub async fn block_until_shutdown(&self) { while self.is_running().await { tokio::time::sleep(std::time::Duration::from_millis(10)).await; diff --git a/src/runtime_handle.rs b/src/runtime_handle.rs index 6cf96cb..303bd1b 100644 --- a/src/runtime_handle.rs +++ b/src/runtime_handle.rs @@ -1,9 +1,22 @@ +//! [`ProcessControlHandler`] implementation that forwards runtime‐level control +//! messages (`Shutdown`, `Reload`, …) to a shared `tokio::mpsc::Sender`. +//! +//! Cloning a `RuntimeHandle` is cheap – all clones share the same underlying +//! channel. The async [`shutdown`](ProcessControlHandler::shutdown) and +//! [`reload`](ProcessControlHandler::reload) methods enqueue the requested +//! operation and return immediately without waiting for it to be executed. +// use std::sync::Arc; use tokio::sync::Mutex; use crate::{CtrlFuture, ProcessControlHandler, RuntimeControlMessage}; +/// Handle produced by [`RuntimeGuard::handle`](crate::RuntimeGuard::handle). +/// +/// Acts as a concrete [`ProcessControlHandler`]: every instruction is simply +/// forwarded to the runtime’s central control channel. The handle can be +/// cloned and sent across tasks at will. #[derive(Debug, Clone)] pub struct RuntimeHandle { control_ch: Arc>>, diff --git a/src/runtime_process.rs b/src/runtime_process.rs index d674399..3b5060d 100644 --- a/src/runtime_process.rs +++ b/src/runtime_process.rs @@ -1,3 +1,12 @@ +/// Runtime-level traits and helper types. +/// +/// This module defines: +/// • [`Runnable`] – abstraction for long-running async components supervised by +/// a `ProcessManager`. +/// • [`ProcessControlHandler`] – fire-and-forget interface for broadcasting +/// control messages. +/// • [`RuntimeControlMessage`] – set of built-in control messages understood by +/// the runtime helpers (`RuntimeGuard`, `RuntimeTicker`, …). use super::RuntimeError; use std::borrow::Cow; use std::future::Future; @@ -7,7 +16,14 @@ use std::sync::Arc; /// Boxed future returned by [`Runnable::process_start`]. pub type ProcFuture<'a> = Pin> + Send + 'a>>; -/// A long-running asynchronous component managed by the `ProcessManager`. +/// Trait implemented by every long-running asynchronous component that should +/// be supervised by a [`ProcessManager`]. +/// +/// The trait is **object-safe** and requires `Send + Sync + 'static`, allowing +/// implementors to be moved across tasks and shared between threads. +/// +/// The lifetime parameter on [`process_start`](Runnable::process_start) lets an +/// implementor return a future that borrows from `self` if desired. pub trait Runnable where Self: Send + Sync + 'static, @@ -28,7 +44,11 @@ where /// Boxed future returned by [`ProcessControlHandler`] control methods. pub type CtrlFuture<'a> = Pin + Send + 'a>>; -/// Handle that allows external code to control a running [`Runnable`]. +/// Minimal handle that external code can use to *control* a running +/// [`Runnable`]. +/// +/// All methods are **fire-and-forget**: they enqueue the requested control +/// instruction and return once the message has been sent. pub trait ProcessControlHandler: Send + Sync { fn shutdown(&self) -> CtrlFuture<'_>; fn reload(&self) -> CtrlFuture<'_>; @@ -39,15 +59,28 @@ pub enum ProcessOperation { Control(RuntimeControlMessage), } +/// Built-in control messages understood by runtime helpers such as +/// [`RuntimeTicker`] and [`RuntimeGuard`]. +/// Built-in control messages understood by runtime helpers such as +/// [`RuntimeGuard`](crate::RuntimeGuard) and [`RuntimeTicker`](crate::RuntimeTicker). +/// +/// The enum is marked `#[non_exhaustive]`, requiring downstream crates to add a +/// wildcard arm (`_`) when pattern-matching so that new variants introduced in +/// future releases do not break compilation. +#[non_exhaustive] #[derive(Debug)] pub enum RuntimeControlMessage { + /// Trigger a *hot reload*. Reload, + /// Request a *graceful shutdown*. Shutdown, - /// User-defined messages for future extensibility. + /// User-defined message for custom extensions. Custom(Box), } impl Clone for RuntimeControlMessage { + /// Manual implementation is required because the enum is + /// `#[non_exhaustive]`; remember to update this when adding new variants. fn clone(&self) -> Self { match self { RuntimeControlMessage::Reload => RuntimeControlMessage::Reload, diff --git a/src/runtime_ticker.rs b/src/runtime_ticker.rs index f68822e..b36444c 100644 --- a/src/runtime_ticker.rs +++ b/src/runtime_ticker.rs @@ -1,3 +1,14 @@ +//! Cooperative work / control multiplexer. +//! +//! A `RuntimeTicker` receives **control messages** from the runtime and +//! concurrently drives a caller-supplied *work future*. Call +//! [`tick`](Self::tick) with the future that represents *one unit of user +//! work*; the method races it against incoming [`RuntimeControlMessage`]s and +//! returns a [`ProcessOperation`] that tells the caller what happened first. +//! +//! The ticker is created internally by [`RuntimeGuard::runtime_ticker`]. Only +//! one ticker may exist at any time. +use std::future::Future; use std::sync::Arc; use tokio::{select, sync::Mutex}; @@ -22,8 +33,12 @@ impl RuntimeTicker { ) } - /// Await `fut` and the control channel concurrently, returning whichever - /// completes first. + /// Race `fut` against the next control message and return the winner. + /// + /// Only one mutex lock is taken per call to inspect the control channel. + /// The returned [`ProcessOperation`] describes what completed first: + /// * `ProcessOperation::Next(res)` – the work future finished and yielded `res`. + /// * `ProcessOperation::Control(msg)` – a control instruction (`Reload`, `Shutdown`, …) was received. pub async fn tick(&self, fut: Fut) -> ProcessOperation where Fut: Future, From 0cd21ec5b268d7420f91bca7721cec7c23b96372 Mon Sep 17 00:00:00 2001 From: Marc Riegel Date: Wed, 9 Jul 2025 11:12:35 +0200 Subject: [PATCH 2/2] Remove unused import and add match arm placeholder for control message --- examples/dynamic_add.rs | 2 -- src/lib.rs | 1 + src/process_manager.rs | 1 - 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/examples/dynamic_add.rs b/examples/dynamic_add.rs index d5f7e0e..41caee6 100644 --- a/examples/dynamic_add.rs +++ b/examples/dynamic_add.rs @@ -11,8 +11,6 @@ //! ```bash //! cargo run --example dynamic_add //! ``` -mod simple; - use processmanager::*; use std::{sync::Arc, time::Duration}; use tokio::time::{interval, sleep}; diff --git a/src/lib.rs b/src/lib.rs index 3c18649..a144d83 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,6 +33,7 @@ pub mod builtin; /// }, /// ProcessOperation::Control(RuntimeControlMessage::Reload) => println!("trigger relead"), /// ProcessOperation::Control(RuntimeControlMessage::Custom(_)) => println!("trigger custom action"), +/// ProcessOperation::Control(_) => unimplemented!(), /// } /// } /// diff --git a/src/process_manager.rs b/src/process_manager.rs index afc7f85..a370e39 100644 --- a/src/process_manager.rs +++ b/src/process_manager.rs @@ -435,7 +435,6 @@ impl ProcessControlHandler for Handle { /// /// Accounting with [`Inner::active`] is done **before** the task is actually /// spawned so the supervisor has an accurate count even if the spawn fails. - fn spawn_child(id: usize, proc: Arc, inner: Arc) -> JoinHandle<()> { // increment *before* spawning the task – guarantees the counter is in sync inner.active.fetch_add(1, Ordering::SeqCst);