Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions crates/core/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,11 @@ pub enum NodeState {
/// - Partial functionality (some features unavailable)
///
/// The node continues processing but users should be aware of reduced quality.
Degraded { reason: String },
Degraded {
reason: String,
#[ts(type = "JsonValue")]
details: Option<serde_json::Value>,
},

/// Node has encountered a fatal error and stopped processing.
/// Manual intervention is required to restart the node.
Expand Down Expand Up @@ -306,7 +310,8 @@ pub mod state_helpers {
state_tx: &mpsc::Sender<NodeStateUpdate>,
node_name: &str,
reason: impl Into<String>,
details: Option<serde_json::Value>,
) {
emit_state(state_tx, node_name, NodeState::Degraded { reason: reason.into() });
emit_state(state_tx, node_name, NodeState::Degraded { reason: reason.into(), details });
}
}
88 changes: 80 additions & 8 deletions crates/core/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//!
//! This module provides types and utilities for collecting runtime statistics
//! from nodes during pipeline execution. Statistics are throttled to prevent
//! overload (typically every 10 seconds or 1000 packets).
//! overload (typically every 2 seconds or 1000 packets).

use serde::{Deserialize, Serialize};
use std::time::SystemTime;
Expand Down Expand Up @@ -35,7 +35,7 @@ impl Default for NodeStats {
}

/// A statistics update message sent by a node to report its current metrics.
/// These updates are throttled to prevent overload (typically every 10s or 1000 packets).
/// These updates are throttled to prevent overload (typically every 2s or 1000 packets).
#[derive(Debug, Clone)]
pub struct NodeStatsUpdate {
/// The unique identifier of the node reporting the stats
Expand All @@ -47,18 +47,19 @@ pub struct NodeStatsUpdate {
}

/// Helper for tracking and throttling node statistics updates.
/// Automatically sends updates every 10 seconds or 1000 packets.
/// Automatically sends updates every 2 seconds or 1000 packets.
pub struct NodeStatsTracker {
stats: NodeStats,
start_time: std::time::Instant,
last_send: std::time::Instant,
has_sent_once: bool,
node_id: String,
stats_tx: Option<tokio::sync::mpsc::Sender<NodeStatsUpdate>>,
}

impl NodeStatsTracker {
/// Throttling configuration
const SEND_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10);
const SEND_INTERVAL: std::time::Duration = std::time::Duration::from_secs(2);
const SEND_PACKET_THRESHOLD: u64 = 1000;

/// Create a new stats tracker for a node
Expand All @@ -67,7 +68,14 @@ impl NodeStatsTracker {
stats_tx: Option<tokio::sync::mpsc::Sender<NodeStatsUpdate>>,
) -> Self {
let now = std::time::Instant::now();
Self { stats: NodeStats::default(), start_time: now, last_send: now, node_id, stats_tx }
Self {
stats: NodeStats::default(),
start_time: now,
last_send: now,
has_sent_once: false,
node_id,
stats_tx,
}
}

/// Record a received packet
Expand All @@ -88,23 +96,49 @@ impl NodeStatsTracker {
self.stats.sent += 1;
}

/// Record multiple sent packets (for batched stats reporting)
#[inline]
pub const fn sent_n(&mut self, count: u64) {
self.stats.sent += count;
}

/// Record a discarded packet
#[inline]
pub const fn discarded(&mut self) {
self.stats.discarded += 1;
}

/// Record multiple discarded packets (for batched stats reporting)
#[inline]
pub const fn discarded_n(&mut self, count: u64) {
self.stats.discarded += count;
}

/// Record an error
#[inline]
pub const fn errored(&mut self) {
self.stats.errored += 1;
}

/// Automatically send stats if threshold is met (every 10s or 1000 packets).
/// Record multiple errors (for batched stats reporting)
#[inline]
pub const fn errored_n(&mut self, count: u64) {
self.stats.errored += count;
}

/// Automatically send stats if threshold is met (every 2s or 1000 packets).
/// Call this after processing a batch of packets.
pub fn maybe_send(&mut self) {
let should_send = self.last_send.elapsed() >= Self::SEND_INTERVAL
|| self.stats.received.is_multiple_of(Self::SEND_PACKET_THRESHOLD);
// Many nodes only increment one side of the counters (e.g. pure sources only `sent`,
// pure sinks only `received`). Use the max to keep the threshold behavior consistent
// across node shapes and avoid the `0.is_multiple_of(..)` pitfall.
let packet_count = self.stats.received.max(self.stats.sent).max(self.stats.discarded);

// Always emit the first non-empty snapshot promptly so monitoring can "lock on"
// even if the node later blocks under backpressure.
let should_send = (!self.has_sent_once && packet_count > 0)
|| self.last_send.elapsed() >= Self::SEND_INTERVAL
|| (packet_count > 0 && packet_count.is_multiple_of(Self::SEND_PACKET_THRESHOLD));

if should_send {
self.force_send();
Expand All @@ -123,6 +157,44 @@ impl NodeStatsTracker {
timestamp: SystemTime::now(),
});
self.last_send = std::time::Instant::now();
self.has_sent_once = true;
}
}
}

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use tokio::sync::mpsc;

#[tokio::test]
async fn maybe_send_does_not_fire_when_empty() {
let (tx, mut rx) = mpsc::channel::<NodeStatsUpdate>(10);
let mut tracker = NodeStatsTracker::new("node".to_string(), Some(tx));

tracker.maybe_send();

assert!(rx.try_recv().is_err());
}

#[tokio::test]
async fn maybe_send_emits_on_first_activity_and_on_threshold() {
let (tx, mut rx) = mpsc::channel::<NodeStatsUpdate>(10);
let mut tracker = NodeStatsTracker::new("node".to_string(), Some(tx));

tracker.sent();
tracker.maybe_send();
let first = rx.try_recv().unwrap();
assert_eq!(first.stats.sent, 1);

for _ in 1..NodeStatsTracker::SEND_PACKET_THRESHOLD {
tracker.sent();
tracker.maybe_send();
}

let threshold = rx.try_recv().unwrap();
assert_eq!(threshold.node_id, "node");
assert_eq!(threshold.stats.sent, NodeStatsTracker::SEND_PACKET_THRESHOLD);
}
}
2 changes: 1 addition & 1 deletion crates/nodes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ default = [
# Individual features for each node.
passthrough = ["dep:schemars"]
audio_gain = ["dep:schemars"]
audio_mixer = ["dep:schemars"]
audio_mixer = ["dep:schemars", "dep:serde_json"]
audio_resampler = ["dep:schemars", "dep:rubato"]
audio_pacer = ["dep:schemars"]
file_io = ["dep:schemars"]
Expand Down
Loading
Loading