Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions examples/dynamic_add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
//! ```bash
//! cargo run --example dynamic_add
//! ```
mod simple;

use processmanager::*;
use std::{sync::Arc, time::Duration};
use tokio::time::{interval, sleep};
Expand Down
31 changes: 17 additions & 14 deletions src/builtin/idle.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,37 @@
//! A no-op [`Runnable`] that just idles until it receives a shutdown request.
//! A no-op [`Runnable`] that idles until it receives a shutdown request.
//!
//! `IdleProcess` is handy as a tombstone child for a [`ProcessManager`] that
//! would otherwise stop immediately because it starts without any real
//! children. By registering an `IdleProcess`, the manager keeps running until
//! an external caller invokes [`ProcessControlHandler::shutdown`].
//! `IdleProcess` is useful as a *tombstone* child for a [`ProcessManager`] that
//! would otherwise terminate immediately because it has no real children.
//! Registering an `IdleProcess` keeps the supervisor alive until an external
//! caller broadcasts [`ProcessControlHandler::shutdown`].
//!
//! # Example
//! ```rust,ignore
//! use processmanager::*;
//! ```no_run
//! use processmanager::{IdleProcess, ProcessManagerBuilder};
//!
//! // Manager without real children that should stay alive.
//! let mgr = ProcessManagerBuilder::default()
//! // Supervisor without real children that should stay alive.
//! let _mgr = ProcessManagerBuilder::default()
//! .pre_insert(IdleProcess::default())
//! .build();
//! ```
use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use crate::{
ProcFuture, ProcessControlHandler, ProcessOperation, Runnable, RuntimeControlMessage,
RuntimeGuard,
};

/// A no-op process that simply waits for a shutdown request.
/// A *tombstone* process that simply waits for a shutdown request.
///
/// It performs no work and exists solely to keep its parent
/// [`ProcessManager`] alive until an explicit shutdown is broadcast.
#[derive(Debug, Default)]
pub struct IdleProcess {
runtime_guard: RuntimeGuard,
}

impl IdleProcess {
/// Create a new idle process.
/// Construct a fresh idle process. Equivalent to [`Default::default`].
pub fn new() -> Self {
Self {
runtime_guard: RuntimeGuard::default(),
Expand All @@ -41,8 +44,8 @@ impl Runnable for IdleProcess {
let ticker = self.runtime_guard.runtime_ticker().await;

loop {
// Sleep for a long time; the ticker wakes us early on control messages.
let sleep = tokio::time::sleep(std::time::Duration::from_secs(3600));
// Sleep for a long time; the ticker wakes us early when a control message arrives.
let sleep = tokio::time::sleep(Duration::from_secs(3600));

match ticker.tick(sleep).await {
ProcessOperation::Next(_) => continue,
Expand Down
24 changes: 22 additions & 2 deletions src/builtin/signal_receiver.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
//! Unix signal integration.
//!
//! `SignalReceiver` listens for `SIGHUP`, `SIGINT`, `SIGTERM`, and `SIGQUIT` and
//! converts them into runtime-level control events so a [`ProcessManager`] can
//! perform a graceful shutdown or reload.
//!
//! This module is compiled only when the crate’s **`signal`** feature is
//! enabled.
//!
//! ```no_run
//! use processmanager::{builtin::SignalReceiver, ProcessManagerBuilder};
//!
//! let mgr = ProcessManagerBuilder::default()
//! .pre_insert(SignalReceiver::default())
//! .build();
//! ```
//!
use crate::{
ProcFuture, ProcessControlHandler, ProcessOperation, Runnable, RuntimeControlMessage,
RuntimeError, RuntimeGuard,
Expand All @@ -9,6 +26,8 @@ use signal_hook_tokio::{Signals, SignalsInfo};
use std::sync::Arc;
use tokio::sync::Mutex;

/// Built-in [`Runnable`] that converts POSIX signals into shutdown / reload
/// requests and propagates them through the process-manager runtime.
pub struct SignalReceiver {
signals: Mutex<SignalsInfo>,
signal_handle: Handle,
Expand All @@ -19,7 +38,7 @@ impl SignalReceiver {
pub fn new() -> Self {
let signals = Signals::new([SIGHUP, SIGINT, SIGTERM, SIGQUIT])
.map_err(|err| RuntimeError::Internal {
message: format!("register process signals: {err:?}"),
message: format!("register signal handler: {err:?}"),
})
.expect("signals to register");

Expand Down Expand Up @@ -53,7 +72,8 @@ impl Runnable for SignalReceiver {
ProcessOperation::Control(_) => continue,
};

// tracing::warn!("Received process signal: {signal:?}");
#[cfg(feature = "tracing")]
::tracing::warn!(signal = ?signal, "Received process signal");

match signal {
SIGHUP => {
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub mod builtin;
/// },
/// ProcessOperation::Control(RuntimeControlMessage::Reload) => println!("trigger relead"),
/// ProcessOperation::Control(RuntimeControlMessage::Custom(_)) => println!("trigger custom action"),
/// ProcessOperation::Control(_) => unimplemented!(),
/// }
/// }
///
Expand Down
50 changes: 43 additions & 7 deletions src/process_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! Dynamic supervisor for asynchronous `Runnable`s.
//! Dynamic supervisor for asynchronous [`Runnable`] implementations.
//
//! A `ProcessManager` can
//!
Expand Down Expand Up @@ -90,13 +90,27 @@ pub struct ProcessManager {
id: usize,
pre_start: Vec<Arc<dyn Runnable>>,
inner: Arc<Inner>,
/// Optional human-readable name overriding the default `process-manager-<id>`.
/// Optional human-readable name overriding the default
/// `"process-manager-<id>"`.
///
/// If `None`, [`ProcessManager::process_name`] falls back to the automatic
/// naming scheme.
pub(crate) custom_name: Option<Cow<'static, str>>,
/// When `true`, children that finish *successfully* are removed from the
/// internal lists so that long-running supervisors do not leak memory.
pub(crate) auto_cleanup: bool,
}

impl ProcessManager {
/// New manager with auto-cleanup of finished children enabled.
/// Creates a fresh supervisor.
///
/// * A unique *process-manager id* is assigned automatically.
/// * [`auto_cleanup`](ProcessManager::auto_cleanup) is **enabled** by
/// default so that finished children are removed from the internal
/// bookkeeping lists.
///
/// The manager may be configured further with [`insert`] **before** it is
/// started or with [`add`] **after** it is running.
pub fn new() -> Self {
let id = PID.fetch_add(1, Ordering::SeqCst);

Expand All @@ -123,9 +137,10 @@ impl ProcessManager {
}
}

/// Register a child **before** the supervisor is started.
/// Registers a child **before** the supervisor itself is started.
///
/// Panics when called after [`process_start`](Runnable::process_start).
/// # Panics
/// Panics if the manager is already running. Use [`add`] in that case.
pub fn insert(&mut self, process: impl Runnable) {
assert!(
!self.inner.running.load(Ordering::SeqCst),
Expand All @@ -135,8 +150,10 @@ impl ProcessManager {
.push(Arc::from(Box::new(process) as Box<dyn Runnable>));
}

/// Add a child *while* the manager is already running. The child is spawned
/// immediately. Before start-up this behaves the same as [`crate::ProcessManager::insert`].
/// Adds a child **while the manager is already running**.
///
/// The new `Runnable` is spawned immediately in its own Tokio task.
/// Calling this method **before** start-up is equivalent to [`insert`].
pub fn add(&self, process: impl Runnable) {
let proc: Arc<dyn Runnable> = Arc::from(Box::new(process) as Box<dyn Runnable>);

Expand Down Expand Up @@ -283,6 +300,11 @@ impl Runnable for ProcessManager {
})
}

/// Returns the supervisor’s public name.
///
/// If [`custom_name`](ProcessManager::custom_name) is `Some`, that value is
/// returned verbatim; otherwise the default pattern
/// `"process-manager-<id>"` is used.
fn process_name(&self) -> Cow<'static, str> {
if let Some(ref name) = self.custom_name {
name.clone()
Expand All @@ -291,6 +313,10 @@ impl Runnable for ProcessManager {
}
}

/// Returns a handle that can control *all* currently running children of
/// this manager.
///
/// The handle can be cloned freely and used from any async context.
fn process_handle(&self) -> Arc<dyn ProcessControlHandler> {
Arc::new(Handle {
inner: Arc::clone(&self.inner),
Expand All @@ -309,6 +335,8 @@ struct Handle {
}

impl ProcessControlHandler for Handle {
/// Broadcasts [`ProcessControlHandler::shutdown`] to every currently active
/// child and waits for them to complete.
fn shutdown(&self) -> CtrlFuture<'_> {
let inner = Arc::clone(&self.inner);
Box::pin(async move {
Expand Down Expand Up @@ -369,6 +397,9 @@ impl ProcessControlHandler for Handle {
})
}

/// Broadcasts [`ProcessControlHandler::reload`] to every active child.
/// The reload operations are executed in parallel and awaited before the
/// future completes.
fn reload(&self) -> CtrlFuture<'_> {
let inner = Arc::clone(&self.inner);
Box::pin(async move {
Expand All @@ -384,6 +415,11 @@ impl ProcessControlHandler for Handle {
}
}

/// Spawns one child task, converts panics into `RuntimeError`s and notifies the
/// supervisor through the *completion channel*.
///
/// Accounting with [`Inner::active`] is done **before** the task is actually
/// spawned so the supervisor has an accurate count even if the spawn fails.
fn spawn_child(id: usize, proc: Arc<dyn Runnable>, inner: Arc<Inner>) -> JoinHandle<()> {
// increment *before* spawning the task – guarantees the counter is in sync
inner.active.fetch_add(1, Ordering::SeqCst);
Expand Down
23 changes: 13 additions & 10 deletions src/process_manager_builder.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
//! Fluent builder for constructing a [`ProcessManager`].
//!
//! The main goal of this helper is to get rid of the “wrong phase” panics
//! (`insert()` vs. `add()`) by making the set-up phase explicit. All children
//! that are known at construction time are registered on the builder;
//! afterwards `build()` hands you a fully configured manager that can be
//! started immediately.
//! Building a supervisor is a *setup-time* activity, whereas actually running
//! it is a *runtime* concern. Mixing the two often leads to the familiar
//! “wrong phase” panic when someone calls [`ProcessManager::insert`] **after**
//! the manager has already started.
//!
//! Further configuration knobs (metrics, names, tracing options…) can be added
//! here without changing the `ProcessManager` API again.
//! `ProcessManagerBuilder` makes the configuration phase explicit: every child,
//! option or tweak is registered **before** [`Self::build`] returns a
//! ready-to-run supervisor.
//!
//! Additional knobs (metrics, tracing, names, …) can be added here in the
//! future **without** touching the public API of [`ProcessManager`].
use crate::{ProcessManager, Runnable};
use std::borrow::Cow;

Expand Down Expand Up @@ -36,7 +39,7 @@ pub struct ProcessManagerBuilder {
custom_name: Option<Cow<'static, str>>,
/// Deferred actions that will be executed against the manager right before
/// it is returned to the caller.
initialisers: Vec<BoxedInitializer>,
initializers: Vec<BoxedInitializer>,
}

impl ProcessManagerBuilder {
Expand All @@ -61,7 +64,7 @@ impl ProcessManagerBuilder {
///
/// This is the compile-time safe counterpart to `ProcessManager::insert`.
pub fn pre_insert(mut self, process: impl Runnable) -> Self {
self.initialisers
self.initializers
.push(Box::new(move |mgr: &mut ProcessManager| {
// At this point the manager is *not* running yet so `insert`
// is always the correct method.
Expand All @@ -81,7 +84,7 @@ impl ProcessManagerBuilder {
mgr.custom_name = self.custom_name;

// Run all queued initialisers (child registrations, …).
for init in self.initialisers {
for init in self.initializers {
init(&mut mgr);
}

Expand Down
58 changes: 56 additions & 2 deletions src/runtime_guard.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,43 @@
//! Central runtime control hub.
//!
//! `RuntimeGuard` owns two *async* channels: a **global control** channel that
//! forwards messages from any number of [`RuntimeHandle`]s and the
//! **ticker channel** consumed by exactly one [`RuntimeTicker`].
//!
//! The guard exposes three high-level operations:
//!
//! 1. [`runtime_ticker`](RuntimeGuard::runtime_ticker) – creates the sole ticker
//! and connects it to the control fan-out.
//! 2. [`handle`](RuntimeGuard::handle) – returns a cheap, clonable
//! [`ProcessControlHandler`] that broadcasts control messages.
//! 3. [`is_running`](RuntimeGuard::is_running) /
//! [`block_until_shutdown`](RuntimeGuard::block_until_shutdown) – helpers
//! for observing runtime state in tests and demos.
//!
//! Dropping the ticker closes the channel which in turn lets [`is_running`]
//! report `false`. Constructing a second ticker is prohibited and will panic.
//!
//! Internally the guard spawns a “fan-out” task that waits for control messages
//! and forwards them to the ticker once it exists.
//!
//! # Concurrency & safety
//!
//! All interior mutability is protected by `tokio::sync::Mutex`; therefore
//! `RuntimeGuard` is `Send + Sync`.
//!
//! ---
//!
//! ```no_run
//! # use processmanager::*;
//! # async fn demo() {
//! let guard = RuntimeGuard::new();
//! let ticker = guard.runtime_ticker().await;
//! let handle = guard.handle();
//!
//! handle.reload().await; // broadcast control instruction
//! /* ... */
//! # }
//! ```
use std::sync::Arc;

use tokio::sync::Mutex;
Expand All @@ -16,6 +56,10 @@ struct Inner {
}

impl RuntimeGuard {
/// Create a fresh guard and spawn the internal *fan-out* task.
///
/// The returned instance is ready for immediate use; you typically call
/// [`runtime_ticker`](Self::runtime_ticker) right after construction.
pub fn new() -> Self {
let (sender, mut receiver) = tokio::sync::mpsc::channel(1);

Expand Down Expand Up @@ -47,7 +91,11 @@ impl RuntimeGuard {
}
}

/// Create a ticker for the caller and connect it to the control fan-out.
/// Create the **single** [`RuntimeTicker`] and connect it to the fan-out.
///
/// # Panics
/// Panics if a ticker has already been created – i.e. the runtime is
/// considered “running”.
pub async fn runtime_ticker(&self) -> RuntimeTicker {
assert!(
!self.is_running().await,
Expand All @@ -60,19 +108,25 @@ impl RuntimeGuard {
ticker
}

/// Returns `true` while the ticker (and therefore the runtime) is alive.
pub async fn is_running(&self) -> bool {
let lock = self.inner.runtime_ticker_ch_sender.lock().await;
let closed = lock.as_ref().map(|s| s.is_closed()).unwrap_or(true);
!closed
}

/// Obtain a clonable [`ProcessControlHandler`] that broadcasts control
/// messages to the ticker.
pub fn handle(&self) -> Arc<dyn ProcessControlHandler> {
Arc::new(RuntimeHandle::new(Arc::clone(
&self.inner.control_ch_sender,
)))
}

/// Busy-wait helper for tests / demos.
/// **Busy-wait** helper for tests and demos.
///
/// Polls [`is_running`](Self::is_running) once every 10 ms until it
/// returns `false`.
pub async fn block_until_shutdown(&self) {
while self.is_running().await {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
Expand Down
Loading
Loading