diff --git a/crates/core/src/state.rs b/crates/core/src/state.rs index e7c7f0f3..ebab9a6a 100644 --- a/crates/core/src/state.rs +++ b/crates/core/src/state.rs @@ -160,7 +160,11 @@ pub enum NodeState { /// - Partial functionality (some features unavailable) /// /// The node continues processing but users should be aware of reduced quality. - Degraded { reason: String }, + Degraded { + reason: String, + #[ts(type = "JsonValue")] + details: Option, + }, /// Node has encountered a fatal error and stopped processing. /// Manual intervention is required to restart the node. @@ -306,7 +310,8 @@ pub mod state_helpers { state_tx: &mpsc::Sender, node_name: &str, reason: impl Into, + details: Option, ) { - emit_state(state_tx, node_name, NodeState::Degraded { reason: reason.into() }); + emit_state(state_tx, node_name, NodeState::Degraded { reason: reason.into(), details }); } } diff --git a/crates/core/src/stats.rs b/crates/core/src/stats.rs index 3afed2e1..b58cd2e3 100644 --- a/crates/core/src/stats.rs +++ b/crates/core/src/stats.rs @@ -6,7 +6,7 @@ //! //! This module provides types and utilities for collecting runtime statistics //! from nodes during pipeline execution. Statistics are throttled to prevent -//! overload (typically every 10 seconds or 1000 packets). +//! overload (typically every 2 seconds or 1000 packets). use serde::{Deserialize, Serialize}; use std::time::SystemTime; @@ -35,7 +35,7 @@ impl Default for NodeStats { } /// A statistics update message sent by a node to report its current metrics. -/// These updates are throttled to prevent overload (typically every 10s or 1000 packets). +/// These updates are throttled to prevent overload (typically every 2s or 1000 packets). #[derive(Debug, Clone)] pub struct NodeStatsUpdate { /// The unique identifier of the node reporting the stats @@ -47,18 +47,19 @@ pub struct NodeStatsUpdate { } /// Helper for tracking and throttling node statistics updates. -/// Automatically sends updates every 10 seconds or 1000 packets. +/// Automatically sends updates every 2 seconds or 1000 packets. pub struct NodeStatsTracker { stats: NodeStats, start_time: std::time::Instant, last_send: std::time::Instant, + has_sent_once: bool, node_id: String, stats_tx: Option>, } impl NodeStatsTracker { /// Throttling configuration - const SEND_INTERVAL: std::time::Duration = std::time::Duration::from_secs(10); + const SEND_INTERVAL: std::time::Duration = std::time::Duration::from_secs(2); const SEND_PACKET_THRESHOLD: u64 = 1000; /// Create a new stats tracker for a node @@ -67,7 +68,14 @@ impl NodeStatsTracker { stats_tx: Option>, ) -> Self { let now = std::time::Instant::now(); - Self { stats: NodeStats::default(), start_time: now, last_send: now, node_id, stats_tx } + Self { + stats: NodeStats::default(), + start_time: now, + last_send: now, + has_sent_once: false, + node_id, + stats_tx, + } } /// Record a received packet @@ -88,23 +96,49 @@ impl NodeStatsTracker { self.stats.sent += 1; } + /// Record multiple sent packets (for batched stats reporting) + #[inline] + pub const fn sent_n(&mut self, count: u64) { + self.stats.sent += count; + } + /// Record a discarded packet #[inline] pub const fn discarded(&mut self) { self.stats.discarded += 1; } + /// Record multiple discarded packets (for batched stats reporting) + #[inline] + pub const fn discarded_n(&mut self, count: u64) { + self.stats.discarded += count; + } + /// Record an error #[inline] pub const fn errored(&mut self) { self.stats.errored += 1; } - /// Automatically send stats if threshold is met (every 10s or 1000 packets). + /// Record multiple errors (for batched stats reporting) + #[inline] + pub const fn errored_n(&mut self, count: u64) { + self.stats.errored += count; + } + + /// Automatically send stats if threshold is met (every 2s or 1000 packets). /// Call this after processing a batch of packets. pub fn maybe_send(&mut self) { - let should_send = self.last_send.elapsed() >= Self::SEND_INTERVAL - || self.stats.received.is_multiple_of(Self::SEND_PACKET_THRESHOLD); + // Many nodes only increment one side of the counters (e.g. pure sources only `sent`, + // pure sinks only `received`). Use the max to keep the threshold behavior consistent + // across node shapes and avoid the `0.is_multiple_of(..)` pitfall. + let packet_count = self.stats.received.max(self.stats.sent).max(self.stats.discarded); + + // Always emit the first non-empty snapshot promptly so monitoring can "lock on" + // even if the node later blocks under backpressure. + let should_send = (!self.has_sent_once && packet_count > 0) + || self.last_send.elapsed() >= Self::SEND_INTERVAL + || (packet_count > 0 && packet_count.is_multiple_of(Self::SEND_PACKET_THRESHOLD)); if should_send { self.force_send(); @@ -123,6 +157,44 @@ impl NodeStatsTracker { timestamp: SystemTime::now(), }); self.last_send = std::time::Instant::now(); + self.has_sent_once = true; } } } + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod tests { + use super::*; + use tokio::sync::mpsc; + + #[tokio::test] + async fn maybe_send_does_not_fire_when_empty() { + let (tx, mut rx) = mpsc::channel::(10); + let mut tracker = NodeStatsTracker::new("node".to_string(), Some(tx)); + + tracker.maybe_send(); + + assert!(rx.try_recv().is_err()); + } + + #[tokio::test] + async fn maybe_send_emits_on_first_activity_and_on_threshold() { + let (tx, mut rx) = mpsc::channel::(10); + let mut tracker = NodeStatsTracker::new("node".to_string(), Some(tx)); + + tracker.sent(); + tracker.maybe_send(); + let first = rx.try_recv().unwrap(); + assert_eq!(first.stats.sent, 1); + + for _ in 1..NodeStatsTracker::SEND_PACKET_THRESHOLD { + tracker.sent(); + tracker.maybe_send(); + } + + let threshold = rx.try_recv().unwrap(); + assert_eq!(threshold.node_id, "node"); + assert_eq!(threshold.stats.sent, NodeStatsTracker::SEND_PACKET_THRESHOLD); + } +} diff --git a/crates/nodes/Cargo.toml b/crates/nodes/Cargo.toml index ff99735e..8fac3341 100644 --- a/crates/nodes/Cargo.toml +++ b/crates/nodes/Cargo.toml @@ -97,7 +97,7 @@ default = [ # Individual features for each node. passthrough = ["dep:schemars"] audio_gain = ["dep:schemars"] -audio_mixer = ["dep:schemars"] +audio_mixer = ["dep:schemars", "dep:serde_json"] audio_resampler = ["dep:schemars", "dep:rubato"] audio_pacer = ["dep:schemars"] file_io = ["dep:schemars"] diff --git a/crates/nodes/src/audio/filters/mixer.rs b/crates/nodes/src/audio/filters/mixer.rs index a2d4d037..11cec368 100644 --- a/crates/nodes/src/audio/filters/mixer.rs +++ b/crates/nodes/src/audio/filters/mixer.rs @@ -477,6 +477,45 @@ impl AudioMixerNode { // Track last mix time for timeout detection let mut waiting_since: Option = None; let mut has_warned_slow = false; + let mut last_reported_slow_pins: Vec = Vec::new(); + let sync_timeout_ms = self.config.sync_timeout_ms; + let state_tx = context.state_tx.clone(); + let node_name_for_state = node_name.clone(); + + let mut sync_slow_state = |slots: &[InputSlot], newly_slow_pins: Option>| { + let mut slow_pins: Vec = + slots.iter().filter(|s| s.slow).map(|s| s.name.as_ref().to_string()).collect(); + slow_pins.sort(); + slow_pins.dedup(); + + if slow_pins.is_empty() { + if has_warned_slow { + state_helpers::emit_running(&state_tx, &node_name_for_state); + has_warned_slow = false; + last_reported_slow_pins.clear(); + } + return; + } + + if slow_pins == last_reported_slow_pins { + return; + } + + let details = serde_json::json!({ + "slow_pins": slow_pins.clone(), + "newly_slow_pins": newly_slow_pins, + "sync_timeout_ms": sync_timeout_ms, + }); + + state_helpers::emit_degraded( + &state_tx, + &node_name_for_state, + "slow_input_timeout", + Some(details), + ); + has_warned_slow = true; + last_reported_slow_pins = slow_pins; + }; // Track mixed frames sent (for debugging) let mut mixed_frame_count: u64 = 0; @@ -591,26 +630,23 @@ impl AudioMixerNode { continue; } - let missing_names: Vec<&str> = - missing_idxs.iter().map(|idx| slots[*idx].name.as_ref()).collect(); + let missing_names: Vec = missing_idxs + .iter() + .map(|idx| slots[*idx].name.as_ref().to_string()) + .collect(); tracing::warn!( "Mixer: timeout waiting for pins {:?}, mixing with silence", missing_names ); - if !has_warned_slow { - state_helpers::emit_degraded( - &context.state_tx, - &node_name, - "slow_input_timeout" - ); - has_warned_slow = true; - } - - for idx in missing_idxs { - slots[idx].slow = true; + for idx in &missing_idxs { + slots[*idx].slow = true; } waiting_since = None; + sync_slow_state( + &slots, + Some(missing_names), + ); if let Err(e) = self.mix_and_send( &mut slots, @@ -656,7 +692,6 @@ impl AudioMixerNode { let expected_count = slots.iter().filter(|s| !s.slow).count(); if had_no_frames && expected_count > 1 { waiting_since = Some(std::time::Instant::now()); - has_warned_slow = false; } // Keep the latest frame per pin. @@ -681,10 +716,7 @@ impl AudioMixerNode { s.slow = false; } } - if slots.iter().all(|s| !s.slow) && has_warned_slow { - state_helpers::emit_running(&context.state_tx, &node_name); - has_warned_slow = false; - } + sync_slow_state(&slots, None); } if let Err(e) = self.mix_and_send( @@ -707,27 +739,19 @@ impl AudioMixerNode { if cold_start_complete { if let Some(start) = waiting_since { if start.elapsed() >= timeout { - let missing_names: Vec<&str> = slots + let missing_names: Vec = slots .iter() .filter(|s| !s.slow && s.frame.is_none()) - .map(|s| s.name.as_ref()) + .map(|s| s.name.as_ref().to_string()) .collect(); if !missing_names.is_empty() { - if !has_warned_slow { - tracing::warn!( - "Mixer sync timeout ({}ms) expired. Missing frames from: {:?}. \ - Marking as slow and will continue mixing without waiting.", - timeout.as_millis(), - missing_names - ); - state_helpers::emit_degraded( - &context.state_tx, - &node_name, - "slow_input_timeout" - ); - has_warned_slow = true; - } + tracing::warn!( + "Mixer sync timeout ({}ms) expired. Missing frames from: {:?}. \ + Marking as slow and will continue mixing without waiting.", + timeout.as_millis(), + missing_names + ); for s in &mut slots { if !s.slow && s.frame.is_none() { @@ -736,6 +760,10 @@ impl AudioMixerNode { } waiting_since = None; + sync_slow_state( + &slots, + Some(missing_names), + ); if let Err(e) = self.mix_and_send( &mut slots, &mut mix_frames, @@ -787,11 +815,7 @@ impl AudioMixerNode { round_robin_idx %= slots.len(); } waiting_since = None; - - if slots.iter().all(|s| !s.slow) && has_warned_slow { - state_helpers::emit_running(&context.state_tx, &node_name); - has_warned_slow = false; - } + sync_slow_state(&slots, None); // If we have frames buffered and remaining active pins, mix now. if !slots.is_empty() && slots.iter().any(|s| s.frame.is_some()) { @@ -1161,6 +1185,7 @@ fn run_clocked_audio_thread(config: &ClockedThreadConfig) { let mut max_output_channels_seen: u16 = 0; let mut has_warned_slow = false; + let mut last_reported_slow_pins: Vec = Vec::new(); let mut next_tick = std::time::Instant::now() + config.tick_duration; let tick_us = (config.frame_samples_per_channel as u64).saturating_mul(1_000_000) @@ -1190,9 +1215,12 @@ fn run_clocked_audio_thread(config: &ClockedThreadConfig) { }, AudioThreadCommand::RemoveInput { name } => { inputs.retain(|i| i.name != name); - if inputs.is_empty() && has_warned_slow { - state_helpers::emit_running(&config.state_tx, &config.node_name); + if inputs.is_empty() { + if has_warned_slow { + state_helpers::emit_running(&config.state_tx, &config.node_name); + } has_warned_slow = false; + last_reported_slow_pins.clear(); } }, AudioThreadCommand::Shutdown => break, @@ -1274,17 +1302,33 @@ fn run_clocked_audio_thread(config: &ClockedThreadConfig) { } } - let any_slow = inputs.iter().any(|i| i.slow); - if any_slow && !has_warned_slow { + let mut slow_pins: Vec = + inputs.iter().filter(|i| i.slow).map(|i| i.name.as_ref().to_string()).collect(); + slow_pins.sort(); + slow_pins.dedup(); + + if slow_pins.is_empty() { + if has_warned_slow { + state_helpers::emit_running(&config.state_tx, &config.node_name); + } + has_warned_slow = false; + last_reported_slow_pins.clear(); + } else if slow_pins != last_reported_slow_pins { + let details = serde_json::json!({ + "slow_pins": slow_pins.clone(), + "newly_slow_pins": serde_json::Value::Null, + "sync_timeout_ms": config + .sync_timeout + .and_then(|d| u64::try_from(d.as_millis()).ok()), + }); state_helpers::emit_degraded( &config.state_tx, &config.node_name, "slow_input_timeout", + Some(details), ); has_warned_slow = true; - } else if !any_slow && has_warned_slow { - state_helpers::emit_running(&config.state_tx, &config.node_name); - has_warned_slow = false; + last_reported_slow_pins = slow_pins; } if !any_input_had_frame && !config.generate_silence { diff --git a/crates/nodes/src/transport/moq/peer.rs b/crates/nodes/src/transport/moq/peer.rs index 3cd17c3e..202226a9 100644 --- a/crates/nodes/src/transport/moq/peer.rs +++ b/crates/nodes/src/transport/moq/peer.rs @@ -26,6 +26,14 @@ use tokio::sync::{broadcast, mpsc, OwnedSemaphorePermit, Semaphore}; /// Capacity for the broadcast channel (subscribers) const SUBSCRIBER_BROADCAST_CAPACITY: usize = 256; +#[derive(Clone, Copy, Debug, Default)] +struct NodeStatsDelta { + received: u64, + sent: u64, + discarded: u64, + errored: u64, +} + #[derive(Clone, Debug)] struct BroadcastFrame { data: bytes::Bytes, @@ -59,6 +67,17 @@ struct BidirectionalTaskConfig { subscriber_count: Arc, output_group_duration_ms: u64, output_initial_delay_ms: u64, + stats_delta_tx: mpsc::Sender, +} + +struct PublisherReceiveLoopWithSlotConfig { + subscribe: moq_lite::OriginConsumer, + broadcast_name: String, + output_sender: streamkit_core::OutputSender, + publisher_slot: Arc, + publisher_events: mpsc::UnboundedSender, + publisher_path: String, + stats_delta_tx: mpsc::Sender, } fn normalize_gateway_path(path: &str) -> String { @@ -223,6 +242,7 @@ impl ProcessorNode for MoqPeerNode { // Stats tracking let mut stats_tracker = NodeStatsTracker::new(node_name.clone(), context.stats_tx.clone()); + let (stats_delta_tx, mut stats_delta_rx) = mpsc::channel::(1024); // Subscriber count for logging let subscriber_count = Arc::new(AtomicU64::new(0)); @@ -265,6 +285,7 @@ impl ProcessorNode for MoqPeerNode { subscriber_count: sub_count, output_group_duration_ms: self.config.output_group_duration_ms, output_initial_delay_ms: self.config.output_initial_delay_ms, + stats_delta_tx: stats_delta_tx.clone(), }, ).await { Ok(_handle) => { @@ -298,6 +319,7 @@ impl ProcessorNode for MoqPeerNode { context.output_sender.clone(), shutdown_tx.subscribe(), publisher_events_tx.clone(), + stats_delta_tx.clone(), ).await { Ok(_handle) => { tracing::info!("Publisher connected and streaming"); @@ -338,10 +360,11 @@ impl ProcessorNode for MoqPeerNode { packet = pipeline_input_rx.recv() => { if let Some(packet) = packet { if let Packet::Binary { data, metadata, .. } = packet { - stats_tracker.sent(); + stats_tracker.received(); // Broadcast to all subscribers (ignore if no receivers) let duration_us = super::constants::packet_duration_us(metadata.as_ref()); let _ = subscriber_broadcast_tx.send(BroadcastFrame { data, duration_us }); + stats_tracker.sent(); stats_tracker.maybe_send(); } } else { @@ -350,6 +373,22 @@ impl ProcessorNode for MoqPeerNode { } } + Some(delta) = stats_delta_rx.recv() => { + if delta.received > 0 { + stats_tracker.received_n(delta.received); + } + if delta.sent > 0 { + stats_tracker.sent_n(delta.sent); + } + if delta.discarded > 0 { + stats_tracker.discarded_n(delta.discarded); + } + if delta.errored > 0 { + stats_tracker.errored_n(delta.errored); + } + stats_tracker.maybe_send(); + } + // Publisher lifecycle events (from both /input and base-path peers) Some(event) = publisher_events_rx.recv() => { match event { @@ -425,6 +464,7 @@ impl MoqPeerNode { output_sender: streamkit_core::OutputSender, mut shutdown_rx: broadcast::Receiver<()>, publisher_events: mpsc::UnboundedSender, + stats_delta_tx: mpsc::Sender, ) -> Result>, StreamKitError> { let path = moq_connection.path.clone(); @@ -463,6 +503,7 @@ impl MoqPeerNode { input_broadcast, output_sender, &mut shutdown_rx, + stats_delta_tx, ) .await; @@ -519,12 +560,15 @@ impl MoqPeerNode { let publisher_fut = async { Self::publisher_receive_loop_with_slot( - receive_origin, - config.input_broadcast, - config.output_sender, - config.publisher_slot, - config.publisher_events, - path.clone(), + PublisherReceiveLoopWithSlotConfig { + subscribe: receive_origin, + broadcast_name: config.input_broadcast, + output_sender: config.output_sender, + publisher_slot: config.publisher_slot, + publisher_events: config.publisher_events, + publisher_path: path.clone(), + stats_delta_tx: config.stats_delta_tx.clone(), + }, &mut publisher_shutdown_rx, ) .await @@ -561,35 +605,36 @@ impl MoqPeerNode { } async fn publisher_receive_loop_with_slot( - subscribe: moq_lite::OriginConsumer, - broadcast_name: String, - output_sender: streamkit_core::OutputSender, - publisher_slot: Arc, - publisher_events: mpsc::UnboundedSender, - publisher_path: String, + config: PublisherReceiveLoopWithSlotConfig, shutdown_rx: &mut broadcast::Receiver<()>, ) -> Result<(), StreamKitError> { tracing::info!( - path = %publisher_path, + path = %config.publisher_path, "Waiting for peer publisher to announce broadcast: {}", - broadcast_name + config.broadcast_name ); - let Some(broadcast_consumer) = - Self::wait_for_broadcast_announcement(subscribe, &broadcast_name, shutdown_rx).await? + let Some(broadcast_consumer) = Self::wait_for_broadcast_announcement( + config.subscribe, + &config.broadcast_name, + shutdown_rx, + ) + .await? else { return Ok(()); }; - let Ok(permit) = publisher_slot.try_acquire_owned() else { + let Ok(permit) = config.publisher_slot.try_acquire_owned() else { tracing::warn!( - path = %publisher_path, + path = %config.publisher_path, "Ignoring peer publisher broadcast - publisher already connected" ); return Ok(()); }; - let _ = publisher_events.send(PublisherEvent::Connected { path: publisher_path.clone() }); + let _ = config + .publisher_events + .send(PublisherEvent::Connected { path: config.publisher_path.clone() }); let result = async { let Some((audio_track_name, audio_priority)) = @@ -599,7 +644,7 @@ impl MoqPeerNode { }; tracing::info!( - path = %publisher_path, + path = %config.publisher_path, "Subscribing to peer publisher audio track: {}", audio_track_name ); @@ -609,13 +654,19 @@ impl MoqPeerNode { priority: audio_priority, }); - Self::process_publisher_frames(track_consumer, output_sender, shutdown_rx).await + Self::process_publisher_frames( + track_consumer, + config.output_sender, + shutdown_rx, + &config.stats_delta_tx, + ) + .await } .await; drop(permit); - let _ = publisher_events.send(PublisherEvent::Disconnected { - path: publisher_path, + let _ = config.publisher_events.send(PublisherEvent::Disconnected { + path: config.publisher_path, error: result.as_ref().err().map(std::string::ToString::to_string), }); @@ -628,6 +679,7 @@ impl MoqPeerNode { broadcast_name: String, output_sender: streamkit_core::OutputSender, shutdown_rx: &mut broadcast::Receiver<()>, + stats_delta_tx: mpsc::Sender, ) -> Result<(), StreamKitError> { tracing::info!("Waiting for publisher to announce broadcast: {}", broadcast_name); @@ -651,7 +703,8 @@ impl MoqPeerNode { .subscribe_track(&moq_lite::Track { name: audio_track_name, priority: audio_priority }); // Process incoming frames - Self::process_publisher_frames(track_consumer, output_sender, shutdown_rx).await + Self::process_publisher_frames(track_consumer, output_sender, shutdown_rx, &stats_delta_tx) + .await } /// Wait for the publisher to announce the expected broadcast @@ -725,6 +778,7 @@ impl MoqPeerNode { mut track_consumer: moq_lite::TrackConsumer, mut output_sender: streamkit_core::OutputSender, shutdown_rx: &mut broadcast::Receiver<()>, + stats_delta_tx: &mpsc::Sender, ) -> Result<(), StreamKitError> { let mut frame_count = 0u64; let mut last_log = std::time::Instant::now(); @@ -747,6 +801,7 @@ impl MoqPeerNode { &mut frame_count, &mut last_log, shutdown_rx, + stats_delta_tx, ) .await? { @@ -789,6 +844,7 @@ impl MoqPeerNode { frame_count: &mut u64, last_log: &mut std::time::Instant, shutdown_rx: &mut broadcast::Receiver<()>, + stats_delta_tx: &mpsc::Sender, ) -> Result { tokio::select! { biased; @@ -807,6 +863,8 @@ impl MoqPeerNode { // The hang protocol encodes timestamp at the start of each frame if let Err(e) = u64::decode(&mut payload, moq_lite::lite::Version::Draft02) { tracing::warn!("Failed to decode frame timestamp: {e}"); + let _ = stats_delta_tx + .try_send(NodeStatsDelta { received: 1, discarded: 1, ..Default::default() }); return Ok(FrameResult::Continue); } @@ -819,13 +877,17 @@ impl MoqPeerNode { if output_sender.send("out", packet).await.is_err() { tracing::debug!("Output channel closed"); + let _ = stats_delta_tx + .try_send(NodeStatsDelta { received: 1, ..Default::default() }); return Ok(FrameResult::Shutdown); } + let _ = stats_delta_tx.try_send(NodeStatsDelta { received: 1, sent: 1, ..Default::default() }); Ok(FrameResult::Continue) } Ok(None) => Ok(FrameResult::GroupExhausted), Err(e) => { tracing::warn!("Error reading frame: {e}"); + let _ = stats_delta_tx.try_send(NodeStatsDelta { errored: 1, ..Default::default() }); Ok(FrameResult::GroupExhausted) } } diff --git a/ui/src/components/NodeStateIndicator.tsx b/ui/src/components/NodeStateIndicator.tsx index 191fa705..5914bd89 100644 --- a/ui/src/components/NodeStateIndicator.tsx +++ b/ui/src/components/NodeStateIndicator.tsx @@ -6,7 +6,7 @@ import styled from '@emotion/styled'; import React from 'react'; import { useSessionStore } from '@/stores/sessionStore'; -import type { NodeState, NodeStats } from '@/types/types'; +import type { NodeState, NodeStats, Pipeline } from '@/types/types'; import { SKTooltip } from './Tooltip'; @@ -31,6 +31,46 @@ function formatNumber(num: number | bigint): string { return num.toLocaleString(); } +function isRecord(value: unknown): value is Record { + return ( + value !== null && value !== undefined && typeof value === 'object' && !Array.isArray(value) + ); +} + +function asStringArray(value: unknown): string[] | null { + if (!Array.isArray(value)) return null; + if (!value.every((v) => typeof v === 'string')) return null; + return value; +} + +type SlowInputSource = { + slowPin: string; + fromNode: string; + fromPin: string; +}; + +function deriveSlowInputSources( + pipeline: Pipeline | null | undefined, + nodeId: string, + slowPins: string[] +): SlowInputSource[] { + if (!pipeline || slowPins.length === 0) return []; + + const slowPinSet = new Set(slowPins); + const sources: SlowInputSource[] = []; + + for (const c of pipeline.connections) { + if (c.to_node !== nodeId) continue; + if (!slowPinSet.has(c.to_pin)) continue; + sources.push({ slowPin: c.to_pin, fromNode: c.from_node, fromPin: c.from_pin }); + } + + sources.sort( + (a, b) => a.slowPin.localeCompare(b.slowPin) || a.fromNode.localeCompare(b.fromNode) + ); + return sources; +} + function getStateColor(state: NodeState): string { if (typeof state === 'string') { switch (state) { @@ -108,154 +148,177 @@ function getStateDescription(state: NodeState): string { return ''; } -function getStateDetails(state: NodeState, stats?: NodeStats): React.ReactNode { - if (typeof state === 'string') { - return ( -
-
{state}
-
{getStateDescription(state)}
- {stats && - (() => { - // Calculate rates (packets per second) - const duration = stats.duration_secs > 0 ? stats.duration_secs : 1; - const receivedPps = Math.round(Number(stats.received) / duration); - const sentPps = Math.round(Number(stats.sent) / duration); - - return ( -
-
- Packet Statistics -
-
-
- In:{' '} - {formatNumber(stats.received)} pkt ({receivedPps} pps) - Out:{' '} - {formatNumber(stats.sent)} pkt ({sentPps} pps) -
- {(stats.discarded > 0 || stats.errored > 0) && ( -
- {stats.discarded > 0 && `⚠ Discarded: ${formatNumber(stats.discarded)} pkt`} - {stats.discarded > 0 && stats.errored > 0 && ' | '} - {stats.errored > 0 && `❌ Errors: ${formatNumber(stats.errored)} pkt`} -
- )} -
-
- ); - })()} -
- ); - } +const renderPacketStats = (stats?: NodeStats): React.ReactNode => { + if (!stats) return null; - const renderStats = (stats?: NodeStats) => { - if (!stats) return null; + const duration = stats.duration_secs > 0 ? stats.duration_secs : 1; + const receivedPps = Math.round(Number(stats.received) / duration); + const sentPps = Math.round(Number(stats.sent) / duration); - // Calculate rates (packets per second) - const duration = stats.duration_secs > 0 ? stats.duration_secs : 1; - const receivedPps = Math.round(Number(stats.received) / duration); - const sentPps = Math.round(Number(stats.sent) / duration); - - return ( -
-
- Packet Statistics -
-
-
- In:{' '} - {formatNumber(stats.received)} pkt ({receivedPps} pps) - Out:{' '} - {formatNumber(stats.sent)} pkt ({sentPps} pps) -
- {(stats.discarded > 0 || stats.errored > 0) && ( -
- {stats.discarded > 0 && `⚠ Discarded: ${formatNumber(stats.discarded)} pkt`} - {stats.discarded > 0 && stats.errored > 0 && ' | '} - {stats.errored > 0 && `❌ Errors: ${formatNumber(stats.errored)} pkt`} -
- )} -
+ return ( +
+
+ Packet Statistics
- ); - }; - - if ('Recovering' in state) { - const details = state.Recovering.details; - const hasDetails = details !== null && details !== undefined && typeof details === 'object'; - - return ( -
-
Recovering
-
- {getStateDescription(state)} +
+
+ In: {formatNumber(stats.received)}{' '} + pkt ({receivedPps} pps) + Out:{' '} + {formatNumber(stats.sent)} pkt ({sentPps} pps)
-
{state.Recovering.reason}
- {hasDetails && ( -
-            {JSON.stringify(details, null, 2)}
-          
+ {(stats.discarded > 0 || stats.errored > 0) && ( +
+ {stats.discarded > 0 && `⚠ Discarded: ${formatNumber(stats.discarded)} pkt`} + {stats.discarded > 0 && stats.errored > 0 && ' | '} + {stats.errored > 0 && `❌ Errors: ${formatNumber(stats.errored)} pkt`} +
)} - {renderStats(stats)}
- ); - } +
+ ); +}; - if ('Degraded' in state) { - return ( -
-
Degraded
-
- {getStateDescription(state)} -
-
{state.Degraded.reason}
- {renderStats(stats)} +const renderStringStateDetails = ( + state: Extract, + stats?: NodeStats +): React.ReactNode => { + return ( +
+
{state}
+
{getStateDescription(state)}
+ {renderPacketStats(stats)} +
+ ); +}; + +const renderRecoveringDetails = ( + state: Extract, + stats?: NodeStats +): React.ReactNode => { + const details = state.Recovering.details; + const hasDetails = details !== null && details !== undefined && typeof details === 'object'; + + return ( +
+
Recovering
+
+ {getStateDescription(state)}
- ); - } +
{state.Recovering.reason}
+ {hasDetails && ( +
+          {JSON.stringify(details, null, 2)}
+        
+ )} + {renderPacketStats(stats)} +
+ ); +}; - if ('Failed' in state) { - return ( -
-
Failed
-
- {getStateDescription(state)} +const renderDegradedDetails = ( + state: Extract, + stats?: NodeStats, + context?: { pipeline?: Pipeline | null; nodeId?: string } +): React.ReactNode => { + const detailsObj = isRecord(state.Degraded.details) ? state.Degraded.details : null; + const slowPins = detailsObj ? asStringArray(detailsObj['slow_pins']) : null; + const newlySlowPins = detailsObj ? asStringArray(detailsObj['newly_slow_pins']) : null; + const slowSources = + slowPins && context?.pipeline && context?.nodeId + ? deriveSlowInputSources(context.pipeline, context.nodeId, slowPins) + : []; + + return ( +
+
Degraded
+
+ {getStateDescription(state)} +
+
{state.Degraded.reason}
+ {(slowPins || newlySlowPins) && ( +
+ {slowSources.length > 0 ? ( +
+ Slow inputs:{' '} + {slowSources.map((s) => `${s.fromNode}.${s.fromPin} → ${s.slowPin}`).join(', ')} +
+ ) : ( + slowPins && + slowPins.length > 0 && ( +
+ Slow pins: {slowPins.join(', ')} +
+ ) + )} + {slowPins && slowPins.length > 0 && slowSources.length > 0 && ( +
+ Pins: {slowPins.join(', ')} +
+ )} + {newlySlowPins && newlySlowPins.length > 0 && ( +
+ Newly slow: {newlySlowPins.join(', ')} +
+ )}
-
{state.Failed.reason}
- {renderStats(stats)} + )} + {renderPacketStats(stats)} +
+ ); +}; + +const renderFailedDetails = ( + state: Extract, + stats?: NodeStats +): React.ReactNode => { + return ( +
+
Failed
+
+ {getStateDescription(state)}
- ); - } +
{state.Failed.reason}
+ {renderPacketStats(stats)} +
+ ); +}; - if ('Stopped' in state) { - return ( -
-
Stopped
-
- {getStateDescription(state)} -
-
{state.Stopped.reason}
- {renderStats(stats)} +const renderStoppedDetails = ( + state: Extract, + stats?: NodeStats +): React.ReactNode => { + return ( +
+
Stopped
+
+ {getStateDescription(state)}
- ); - } +
{String(state.Stopped.reason)}
+ {renderPacketStats(stats)} +
+ ); +}; +function getStateDetails(state: NodeState, stats?: NodeStats): React.ReactNode { + if (typeof state === 'string') return renderStringStateDetails(state, stats); + if ('Recovering' in state) return renderRecoveringDetails(state, stats); + if ('Degraded' in state) return renderDegradedDetails(state, stats); + if ('Failed' in state) return renderFailedDetails(state, stats); + if ('Stopped' in state) return renderStoppedDetails(state, stats); return null; } @@ -278,6 +341,13 @@ const LiveNodeStateTooltipContent = React.memo( const liveStats = useSessionStore( React.useCallback((s) => s.sessions.get(sessionId)?.nodeStats[nodeId], [nodeId, sessionId]) ); + const pipeline = useSessionStore( + React.useCallback((s) => s.sessions.get(sessionId)?.pipeline, [sessionId]) + ); + + if (typeof state === 'object' && 'Degraded' in state) { + return renderDegradedDetails(state, liveStats ?? fallbackStats, { pipeline, nodeId }); + } return getStateDetails(state, liveStats ?? fallbackStats); } diff --git a/ui/src/components/TypedEdge.tsx b/ui/src/components/TypedEdge.tsx index 33023e43..5aa8b4c2 100644 --- a/ui/src/components/TypedEdge.tsx +++ b/ui/src/components/TypedEdge.tsx @@ -2,14 +2,23 @@ // // SPDX-License-Identifier: MPL-2.0 -import { BaseEdge, getBezierPath, type EdgeProps } from '@xyflow/react'; +import { BaseEdge, EdgeLabelRenderer, getBezierPath, type EdgeProps } from '@xyflow/react'; import React from 'react'; +import { SKTooltip } from '@/components/Tooltip'; import type { PacketType } from '@/types/types'; import { getPacketTypeColor } from '@/utils/packetTypes'; export type TypedEdgeData = { resolvedType?: PacketType; + alert?: { + kind: string; + severity: 'warning' | 'error'; + tooltip?: { + title: string; + lines: string[]; + }; + }; [key: string]: unknown; }; @@ -23,7 +32,7 @@ const TypedEdge: React.FC = ({ style = {}, data, }) => { - const [edgePath] = getBezierPath({ + const [edgePath, labelX, labelY] = getBezierPath({ sourceX, sourceY, sourcePosition, @@ -33,17 +42,80 @@ const TypedEdge: React.FC = ({ }); const resolvedType = (data as TypedEdgeData | undefined)?.resolvedType; + const alert = (data as TypedEdgeData | undefined)?.alert; // Get color based on resolved type const typeColor = resolvedType ? getPacketTypeColor(resolvedType) : 'var(--sk-primary)'; + const alertColor = + alert?.severity === 'error' + ? 'var(--sk-danger)' + : alert?.severity === 'warning' + ? 'var(--sk-warning)' + : null; + // Override style with type-specific color const edgeStyle: React.CSSProperties = { ...(style || {}), - stroke: typeColor, + stroke: alertColor ?? typeColor, + strokeDasharray: alertColor + ? '6, 4' + : (style as React.CSSProperties | undefined)?.strokeDasharray, + strokeWidth: alertColor ? 3 : (style as React.CSSProperties | undefined)?.strokeWidth, }; - return ; + const badgeIcon = + alert?.severity === 'error' + ? '❌' + : alert?.severity === 'warning' && alert?.kind === 'slow_input_timeout' + ? '⏱️' + : alert + ? '⚠️' + : null; + + const tooltipContent = + alert?.tooltip && badgeIcon ? ( +
+
+ {badgeIcon} {alert.tooltip.title} +
+ {alert.tooltip.lines.map((line) => ( +
+ {line} +
+ ))} +
+ ) : null; + + return ( + <> + + {alertColor && badgeIcon && ( + + +
+ {badgeIcon} +
+
+
+ )} + + ); }; export default TypedEdge; diff --git a/ui/src/stores/sessionStore.edge-cases.test.ts b/ui/src/stores/sessionStore.edge-cases.test.ts index 93ec0001..b372e3b8 100644 --- a/ui/src/stores/sessionStore.edge-cases.test.ts +++ b/ui/src/stores/sessionStore.edge-cases.test.ts @@ -74,7 +74,7 @@ describe('sessionStore edge cases', () => { const states: NodeState[] = [ 'Initializing', 'Running', - { Degraded: { reason: 'test degradation' } }, + { Degraded: { reason: 'test degradation', details: null } }, 'Running', ]; diff --git a/ui/src/types/generated/api-types.ts b/ui/src/types/generated/api-types.ts index aaae9d78..d5206690 100644 --- a/ui/src/types/generated/api-types.ts +++ b/ui/src/types/generated/api-types.ts @@ -83,7 +83,7 @@ bidirectional: boolean, }; export type StopReason = "completed" | "input_closed" | "output_closed" | "shutdown" | "no_inputs" | "unknown"; -export type NodeState = "Initializing" | "Ready" | "Running" | { "Recovering": { reason: string, details: JsonValue, } } | { "Degraded": { reason: string, } } | { "Failed": { reason: string, } } | { "Stopped": { reason: StopReason, } }; +export type NodeState = "Initializing" | "Ready" | "Running" | { "Recovering": { reason: string, details: JsonValue, } } | { "Degraded": { reason: string, details: JsonValue, } } | { "Failed": { reason: string, } } | { "Stopped": { reason: StopReason, } }; export type NodeStats = { /** @@ -345,7 +345,7 @@ is_fragment: boolean, }; export type AudioAsset = { /** - * Unique identifier (filename without extension) + * Unique identifier (filename, including extension) */ id: string, /** @@ -353,7 +353,7 @@ id: string, */ name: string, /** - * Absolute path on the server + * Server-relative path suitable for `core::file_reader` (e.g., `samples/audio/system/foo.wav`) */ path: string, /** diff --git a/ui/src/views/MonitorView.tsx b/ui/src/views/MonitorView.tsx index f452d0ed..c50ff0f4 100644 --- a/ui/src/views/MonitorView.tsx +++ b/ui/src/views/MonitorView.tsx @@ -58,6 +58,7 @@ import type { NodeDefinition, Connection, Node, + NodeState, Pipeline, MessageType, BatchOperation, @@ -985,6 +986,52 @@ const buildEdgesFromConnections = (connections: Connection[], nodes: RFNode[]): })); }; +const isRecord = (value: unknown): value is Record => + value !== null && value !== undefined && typeof value === 'object' && !Array.isArray(value); + +type SlowTimeoutDetails = { + slowPins: string[]; + newlySlowPins: string[]; + syncTimeoutMs: number | null; +}; + +const extractSlowTimeoutDetailsFromNodeState = ( + state: NodeState | null | undefined +): SlowTimeoutDetails | null => { + if (!state || typeof state === 'string') return null; + if (!('Degraded' in state)) return null; + if (state.Degraded.reason !== 'slow_input_timeout') return null; + + const details = state.Degraded.details; + if (!isRecord(details)) return null; + + const slowPinsRaw = details['slow_pins']; + const newlySlowPinsRaw = details['newly_slow_pins']; + const syncTimeoutRaw = details['sync_timeout_ms']; + + const slowPins = Array.isArray(slowPinsRaw) + ? slowPinsRaw.filter((p): p is string => typeof p === 'string') + : []; + const newlySlowPins = Array.isArray(newlySlowPinsRaw) + ? newlySlowPinsRaw.filter((p): p is string => typeof p === 'string') + : []; + const syncTimeoutMs = typeof syncTimeoutRaw === 'number' ? syncTimeoutRaw : null; + + return { slowPins, newlySlowPins, syncTimeoutMs }; +}; + +const describeSlowInputs = (pipeline: Pipeline, nodeId: string, slowPins: string[]): string[] => { + if (slowPins.length === 0) return []; + const slowPinSet = new Set(slowPins); + + const sources = pipeline.connections + .filter((c) => c.to_node === nodeId && slowPinSet.has(c.to_pin)) + .map((c) => `${c.from_node}.${c.from_pin} → ${c.to_pin}`); + + sources.sort(); + return sources; +}; + /** * Generate YAML representation of the pipeline ordered by topological sort. */ @@ -2442,6 +2489,85 @@ const MonitorViewContent: React.FC = () => { // Note: setNodes, tuneNode, updateStagedNodeParams are stable and don't need to be dependencies ]); + // Lightweight patch: update edge alerts based on node degraded details. + useEffect(() => { + if (!pipeline) return; + + const slowPinsByNode = new Map>(); + const slowDetailsByNode = new Map(); + for (const [nodeId, apiNode] of Object.entries(pipeline.nodes)) { + const state = (nodeStates as Record)[nodeId] ?? apiNode.state ?? null; + const details = extractSlowTimeoutDetailsFromNodeState(state); + const slowPins = details?.slowPins ?? []; + if (slowPins.length > 0) { + slowPinsByNode.set(nodeId, new Set(slowPins)); + } + if (details) { + slowDetailsByNode.set(nodeId, details); + } + } + + React.startTransition(() => { + setEdges((prev) => { + let changed = false; + + const next = prev.map((edge) => { + const targetPin = edge.targetHandle ?? ''; + const shouldWarn = slowPinsByNode.get(edge.target)?.has(targetPin) ?? false; + const currentAlert = isRecord(edge.data) ? edge.data['alert'] : undefined; + const currentAlertKind = + isRecord(currentAlert) && typeof currentAlert['kind'] === 'string' + ? currentAlert['kind'] + : null; + const isCurrentlyWarned = currentAlertKind === 'slow_input_timeout'; + + if (shouldWarn === isCurrentlyWarned) return edge; + + changed = true; + const nextData: Record = { ...(edge.data || {}) }; + + if (shouldWarn) { + const details = slowDetailsByNode.get(edge.target); + const slowPins = details?.slowPins ?? []; + const slowInputs = pipeline ? describeSlowInputs(pipeline, edge.target, slowPins) : []; + + const lines: string[] = []; + if (slowInputs.length > 0) { + lines.push(`Slow inputs: ${slowInputs.join(', ')}`); + } else if (slowPins.length > 0) { + lines.push(`Slow pins: ${slowPins.join(', ')}`); + } + + const sourceHandle = edge.sourceHandle ?? ''; + lines.push(`This: ${edge.source}.${sourceHandle} → ${edge.targetHandle ?? ''}`); + + if (details?.newlySlowPins && details.newlySlowPins.length > 0) { + lines.push(`Newly slow: ${details.newlySlowPins.join(', ')}`); + } + if (details?.syncTimeoutMs !== null && details?.syncTimeoutMs !== undefined) { + lines.push(`Timeout: ${details.syncTimeoutMs}ms`); + } + + nextData.alert = { + kind: 'slow_input_timeout', + severity: 'warning', + tooltip: { + title: `${edge.target} degraded`, + lines, + }, + }; + } else if (isCurrentlyWarned) { + delete nextData.alert; + } + + return { ...edge, data: nextData }; + }); + + return changed ? next : prev; + }); + }); + }, [pipeline, nodeStates, setEdges]); + // Create a stable callback that handles both staged and live param changes // This avoids recreating callbacks for each node, which would break React.memo const stableOnParamChange = useCallback(