From 0df4ddcfd164c9ee4ab6235fd886ac367896e2a5 Mon Sep 17 00:00:00 2001
From: StreamKit Devin
Date: Sat, 11 Apr 2026 16:10:59 +0000
Subject: [PATCH 01/16] feat(engine): make AddNode non-blocking with async node creation

Node creation (registry.create_node()) now runs inside
tokio::task::spawn_blocking so that slow native plugin constructors
(e.g. ONNX model loading via FFI) no longer block the engine actor loop.

Key changes:

- Add NodeState::Creating variant to streamkit-core. The actor inserts
  this state immediately when AddNode arrives, closing the observability
  gap between 'message received' and 'node exists'.

- AddNode handler spawns a background task instead of calling
  create_node() synchronously. Results are sent back via an internal
  mpsc channel (NodeCreatedEvent) polled in the actor select! loop.

- Deferred connection queue: Connect requests where one or both
  endpoints are still Creating are stored in a Vec and replayed when
  NodeCreated completes successfully.

- RemoveNode while Creating: cancelled node IDs are tracked in a
  HashSet. When NodeCreated arrives for a cancelled node, the result is
  discarded (no zombie nodes).

- Pipeline activation naturally gates on Creating state since it
  doesn't match Ready|Running|Degraded|Recovering.

- UI: NodeStateIndicator.tsx and sessionStatus.ts handle the new
  Creating state (uses initializing color, treated like Initializing
  for session status).

- 10 comprehensive test cases covering: basic async creation, deferred
  connections, concurrent creation, creation failure, RemoveNode while
  Creating, pipeline activation timing, duplicate AddNode, remove then
  re-add, shutdown while Creating, and mixed realized/creating
  connections.

Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- crates/core/src/state.rs | 17 + crates/engine/src/dynamic_actor.rs | 316 +++++- crates/engine/src/lib.rs | 7 + .../engine/src/tests/async_node_creation.rs | 913 ++++++++++++++++++ crates/engine/src/tests/connection_types.rs | 6 + crates/engine/src/tests/mod.rs | 2 + .../engine/src/tests/pipeline_activation.rs | 19 + ui/src/components/NodeStateIndicator.tsx | 3 + ui/src/types/generated/api-types.ts | 2 +- ui/src/utils/sessionStatus.ts | 4 +- 10 files changed, 1248 insertions(+), 41 deletions(-) create mode 100644 crates/engine/src/tests/async_node_creation.rs diff --git a/crates/core/src/state.rs b/crates/core/src/state.rs index ebab9a6a..bb696085 100644 --- a/crates/core/src/state.rs +++ b/crates/core/src/state.rs @@ -12,6 +12,8 @@ //! Nodes transition through these states during their lifecycle: //! //! ```text +//! Creating +//! ↓ //! Initializing //! ↓ //! Ready ──────────┐ @@ -89,6 +91,8 @@ impl From for StopReason { /// Nodes transition through these states during their lifecycle: /// /// ```text +/// Creating +/// ↓ /// Initializing /// ↓ /// Ready ──────────┐ @@ -105,6 +109,8 @@ impl From for StopReason { /// ``` /// /// ### Valid Transitions: +/// - `Creating` → `Initializing` (node factory completed successfully) +/// - `Creating` → `Failed` (node factory returned an error) /// - `Initializing` → `Ready` (source nodes) or `Running` (processing nodes) /// - `Ready` → `Running` (when pipeline is ready) /// - `Running` → `Recovering` (temporary issues, will retry) @@ -120,6 +126,11 @@ impl From for StopReason { #[derive(Debug, Clone, Serialize, Deserialize, TS)] #[ts(export)] pub enum NodeState { + /// Node is being created by the factory (e.g., loading ONNX models via FFI). + /// This state is set immediately when `AddNode` is received, before the + /// (potentially slow) constructor runs in a background task. + Creating, + /// Node is starting up and performing initialization. 
/// Examples: Opening connections, loading resources, validating configuration. Initializing, @@ -219,6 +230,12 @@ pub mod state_helpers { let _ = state_tx.try_send(NodeStateUpdate::new(node_id.to_string(), state)); } + /// Emits a Creating state. + #[inline] + pub fn emit_creating(state_tx: &mpsc::Sender, node_id: &str) { + emit_state(state_tx, node_id, NodeState::Creating); + } + /// Emits an Initializing state. #[inline] pub fn emit_initializing(state_tx: &mpsc::Sender, node_id: &str) { diff --git a/crates/engine/src/dynamic_actor.rs b/crates/engine/src/dynamic_actor.rs index c0777001..099b56dc 100644 --- a/crates/engine/src/dynamic_actor.rs +++ b/crates/engine/src/dynamic_actor.rs @@ -16,7 +16,7 @@ use crate::{ graph_builder, }; use opentelemetry::KeyValue; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::{Arc, RwLock}; use streamkit_core::control::{EngineControlMessage, NodeControlMessage}; use streamkit_core::error::StreamKitError; @@ -50,6 +50,24 @@ struct NodeChannels { view_data: mpsc::Sender, } +/// Result of a background node creation task, sent back to the actor loop. +pub struct NodeCreatedEvent { + node_id: String, + kind: String, + result: Result, StreamKitError>, +} + +/// A connection request deferred because one or both endpoints are still in +/// `Creating` state and not yet present in `live_nodes`. +#[derive(Debug)] +pub struct PendingConnection { + from_node: String, + from_pin: String, + to_node: String, + to_pin: String, + mode: crate::dynamic_messages::ConnectionMode, +} + /// The state for the long-running, dynamic engine actor (Control Plane). 
pub struct DynamicEngine { pub(super) registry: Arc>, @@ -124,10 +142,21 @@ pub struct DynamicEngine { pub(super) node_packets_errored_counter: opentelemetry::metrics::Counter, // Node state metric (1=running, 0=not running) pub(super) node_state_gauge: opentelemetry::metrics::Gauge, + /// Sender half of the internal channel for background node creation results. + /// Cloned into each spawned creation task. + pub(super) node_created_tx: mpsc::Sender, + /// Receiver half — polled in the actor `select!` loop. + pub(super) node_created_rx: mpsc::Receiver, + /// Connections deferred because one or both endpoints are still `Creating`. + pub(super) pending_connections: Vec, + /// Node IDs whose creation was cancelled (via `RemoveNode` while `Creating`). + /// When `NodeCreated` arrives for a cancelled ID, the result is discarded. + pub(super) cancelled_creations: HashSet, } impl DynamicEngine { const fn node_state_name(state: &NodeState) -> &'static str { match state { + NodeState::Creating => "creating", NodeState::Initializing => "initializing", NodeState::Ready => "ready", NodeState::Running => "running", @@ -156,10 +185,13 @@ impl DynamicEngine { loop { tokio::select! { Some(control_msg) = self.control_rx.recv() => { - if !self.handle_engine_control(control_msg, &channels).await { + if !self.handle_engine_control(control_msg).await { break; // Shutdown requested } }, + Some(created) = self.node_created_rx.recv() => { + self.handle_node_created(created, &channels).await; + }, Some(query_msg) = self.query_rx.recv() => { self.handle_query(query_msg).await; }, @@ -1472,58 +1504,253 @@ impl DynamicEngine { self.nodes_active_gauge.record(self.live_nodes.len() as u64, &[]); } + /// Handles a completed background node creation. + /// + /// On success: initializes the node, then flushes any pending connections + /// whose endpoints are now both realized. + /// On failure: transitions the node to `Failed`, drains pending connections + /// referencing the failed node. 
+ async fn handle_node_created(&mut self, event: NodeCreatedEvent, channels: &NodeChannels) { + let NodeCreatedEvent { node_id, kind, result } = event; + + // If the node was cancelled (RemoveNode arrived while Creating), + // discard the result silently. + if self.cancelled_creations.remove(&node_id) { + tracing::info!( + node = %node_id, + "Discarding creation result for cancelled node" + ); + // Drain pending connections referencing this node. + self.pending_connections.retain(|pc| pc.from_node != node_id && pc.to_node != node_id); + return; + } + + match result { + Ok(node) => { + tracing::info!(node = %node_id, kind = %kind, "Node created successfully, initializing"); + if let Err(e) = self.initialize_node(node, &node_id, &kind, channels).await { + tracing::error!( + node_id = %node_id, + kind = %kind, + error = %e, + "Failed to initialize node after async creation" + ); + self.node_states + .insert(node_id.clone(), NodeState::Failed { reason: e.to_string() }); + + // Broadcast the Failed state to subscribers. + let update = NodeStateUpdate::new( + node_id.clone(), + NodeState::Failed { reason: e.to_string() }, + ); + self.broadcast_state_update(&update); + + // Drain pending connections referencing this node. + self.pending_connections + .retain(|pc| pc.from_node != node_id && pc.to_node != node_id); + return; + } + + // Flush pending connections where both endpoints are now realized. + self.flush_pending_connections().await; + }, + Err(e) => { + tracing::error!( + node_id = %node_id, + kind = %kind, + error = %e, + "Background node creation failed" + ); + self.node_states + .insert(node_id.clone(), NodeState::Failed { reason: e.to_string() }); + + // Broadcast the Failed state to subscribers. + let update = NodeStateUpdate::new( + node_id.clone(), + NodeState::Failed { reason: e.to_string() }, + ); + self.broadcast_state_update(&update); + + // Drain pending connections referencing this node. 
+ self.pending_connections + .retain(|pc| pc.from_node != node_id && pc.to_node != node_id); + }, + } + } + + /// Broadcast a state update to all subscribers (used when the actor itself + /// synthesizes a state transition, e.g. `Creating → Failed`). + fn broadcast_state_update(&mut self, update: &NodeStateUpdate) { + let state_name = Self::node_state_name(&update.state); + self.node_state_transitions_counter.add( + 1, + &[KeyValue::new("node_id", update.node_id.clone()), KeyValue::new("state", state_name)], + ); + self.node_state_gauge.record( + 1, + &[KeyValue::new("node_id", update.node_id.clone()), KeyValue::new("state", state_name)], + ); + + self.state_subscribers.retain(|subscriber| match subscriber.try_send(update.clone()) { + Ok(()) => true, + Err(mpsc::error::TrySendError::Full(_)) => { + let subscriber = subscriber.clone(); + let update = update.clone(); + tokio::spawn(async move { + let _ = subscriber.send(update).await; + }); + true + }, + Err(mpsc::error::TrySendError::Closed(_)) => false, + }); + } + + /// Execute any pending connections whose both endpoints are now realized + /// (i.e., present in `live_nodes`). + async fn flush_pending_connections(&mut self) { + // Drain the vec, keeping connections that still have unrealized endpoints. 
+ let pending = std::mem::take(&mut self.pending_connections); + let mut still_pending = Vec::new(); + + for pc in pending { + let from_realized = self.live_nodes.contains_key(&pc.from_node); + let to_realized = self.live_nodes.contains_key(&pc.to_node); + + if from_realized && to_realized { + tracing::info!( + "Replaying deferred connection {}.{} -> {}.{}", + pc.from_node, + pc.from_pin, + pc.to_node, + pc.to_pin + ); + self.connect_nodes(pc.from_node, pc.from_pin, pc.to_node, pc.to_pin, pc.mode).await; + self.check_and_activate_pipeline(); + } else { + still_pending.push(pc); + } + } + + self.pending_connections = still_pending; + } + + /// Returns `true` if the node is in `Creating` state (not yet in `live_nodes`). + fn is_node_creating(&self, node_id: &str) -> bool { + matches!(self.node_states.get(node_id), Some(NodeState::Creating)) + } + /// Handles a single control message sent to the engine. /// Returns true if the engine should continue running, false if it should shut down. #[allow(clippy::cognitive_complexity)] - async fn handle_engine_control( - &mut self, - msg: EngineControlMessage, - channels: &NodeChannels, - ) -> bool { + async fn handle_engine_control(&mut self, msg: EngineControlMessage) -> bool { match msg { EngineControlMessage::AddNode { node_id, kind, params } => { self.engine_operations_counter.add(1, &[KeyValue::new("operation", "add_node")]); - tracing::info!(name = %node_id, kind = %kind, "Adding node to graph"); - let node_result = { - let registry = match self.registry.read() { - Ok(guard) => guard, - Err(err) => { - tracing::error!(error = %err, "Registry lock poisoned while adding node"); - return true; - }, - }; - registry.create_node(&kind, params.as_ref()) - }; + tracing::info!(name = %node_id, kind = %kind, "Adding node to graph (async)"); - match node_result { - Ok(node) => { - self.node_kinds.insert(node_id.clone(), kind.clone()); - if let Err(e) = self.initialize_node(node, &node_id, &kind, channels).await - { - 
tracing::error!( - node_id = %node_id, - kind = %kind, - error = %e, - "Failed to initialize node" - ); - } - }, - Err(e) => tracing::error!("Failed to create node '{}': {}", node_id, e), + // Reject duplicate node IDs — the node already exists in + // node_states (either Creating or fully initialized). + if self.node_states.contains_key(&node_id) { + tracing::error!( + node_id = %node_id, + kind = %kind, + "Cannot add node: a node with this ID already exists" + ); + return true; } + + // Record state and kind immediately so the actor loop + // continues processing the next message without blocking. + self.node_states.insert(node_id.clone(), NodeState::Creating); + self.node_kinds.insert(node_id.clone(), kind.clone()); + + // Broadcast Creating state to subscribers. + let update = NodeStateUpdate::new(node_id.clone(), NodeState::Creating); + self.broadcast_state_update(&update); + + // Spawn background creation: `create_node` may invoke FFI + // that blocks for 10-20+ seconds (ONNX model loading). 
+ let registry = Arc::clone(&self.registry); + let tx = self.node_created_tx.clone(); + let spawn_node_id = node_id; + let spawn_kind = kind.clone(); + tokio::spawn(async move { + let result = tokio::task::spawn_blocking(move || { + let guard = match registry.read() { + Ok(g) => g, + Err(err) => { + return Err(StreamKitError::Runtime(format!( + "Registry lock poisoned: {err}" + ))); + }, + }; + guard.create_node(&spawn_kind, params.as_ref()) + }) + .await; + + let result = match result { + Ok(inner) => inner, + Err(join_err) => Err(StreamKitError::Runtime(format!( + "Node creation task panicked: {join_err}" + ))), + }; + + let _ = + tx.send(NodeCreatedEvent { node_id: spawn_node_id, kind, result }).await; + }); }, EngineControlMessage::RemoveNode { node_id } => { self.engine_operations_counter.add(1, &[KeyValue::new("operation", "remove_node")]); tracing::info!(name = %node_id, "Removing node from graph"); - // Delegate shutdown to helper function - self.shutdown_node(&node_id).await; + + if self.is_node_creating(&node_id) { + // Node is still being created in the background. + // Track the ID so we discard the NodeCreated result + // when it arrives. + tracing::info!( + node_id = %node_id, + "Node is still Creating — marking for cancellation" + ); + self.cancelled_creations.insert(node_id.clone()); + self.node_states.remove(&node_id); + self.node_kinds.remove(&node_id); + // Drain pending connections referencing this node. + self.pending_connections + .retain(|pc| pc.from_node != node_id && pc.to_node != node_id); + } else { + // Normal shutdown for a fully initialized node. 
+ self.shutdown_node(&node_id).await; + } }, EngineControlMessage::Connect { from_node, from_pin, to_node, to_pin, mode } => { self.engine_operations_counter.add(1, &[KeyValue::new("operation", "connect")]); - // Delegate connection logic - self.connect_nodes(from_node, from_pin, to_node, to_pin, mode).await; - // Check if pipeline is ready to activate after connection is established - self.check_and_activate_pipeline(); + // If either endpoint is still Creating, defer the connection. + let from_creating = self.is_node_creating(&from_node); + let to_creating = self.is_node_creating(&to_node); + + if from_creating || to_creating { + tracing::info!( + "Deferring connection {}.{} -> {}.{} (from_creating={}, to_creating={})", + from_node, + from_pin, + to_node, + to_pin, + from_creating, + to_creating + ); + self.pending_connections.push(PendingConnection { + from_node, + from_pin, + to_node, + to_pin, + mode, + }); + } else { + // Both endpoints are realized — connect immediately. + self.connect_nodes(from_node, from_pin, to_node, to_pin, mode).await; + self.check_and_activate_pipeline(); + } }, EngineControlMessage::Disconnect { from_node, from_pin, to_node, to_pin } => { self.engine_operations_counter.add(1, &[KeyValue::new("operation", "disconnect")]); @@ -1545,6 +1772,19 @@ impl DynamicEngine { EngineControlMessage::Shutdown => { tracing::info!("Received shutdown signal, stopping all nodes"); + // Step 0: Clean up nodes still in Creating state. + // Mark all as cancelled so NodeCreated results are discarded. 
+ let creating_ids: Vec = self + .node_states + .iter() + .filter(|(_, state)| matches!(state, NodeState::Creating)) + .map(|(id, _)| id.clone()) + .collect(); + for id in creating_ids { + self.cancelled_creations.insert(id); + } + self.pending_connections.clear(); + // Step 1: Close all input channels so nodes blocked on recv() will exit // This ensures nodes that don't check control_rx will still shut down self.node_inputs.clear(); diff --git a/crates/engine/src/lib.rs b/crates/engine/src/lib.rs index 83fba63e..5446a3b8 100644 --- a/crates/engine/src/lib.rs +++ b/crates/engine/src/lib.rs @@ -176,6 +176,9 @@ impl Engine { "Starting Dynamic Engine actor" ); + // Internal channel for background node creation results. + let (nc_tx, nc_rx) = mpsc::channel(64); + let meter = global::meter("skit_engine"); let dynamic_engine = DynamicEngine { registry: Arc::clone(&self.registry), @@ -236,6 +239,10 @@ impl Engine { .u64_gauge("node.state") .with_description("Node state (1=running, 0=stopped/failed)") .build(), + node_created_tx: nc_tx, + node_created_rx: nc_rx, + pending_connections: Vec::new(), + cancelled_creations: std::collections::HashSet::new(), }; let engine_task = tokio::spawn(dynamic_engine.run()); diff --git a/crates/engine/src/tests/async_node_creation.rs b/crates/engine/src/tests/async_node_creation.rs new file mode 100644 index 00000000..974b20d1 --- /dev/null +++ b/crates/engine/src/tests/async_node_creation.rs @@ -0,0 +1,913 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +//! Tests for async (non-blocking) node creation in the dynamic engine. +//! +//! Validates that `AddNode` no longer blocks the actor loop: node constructors +//! run inside `spawn_blocking`, connections are deferred while endpoints are +//! `Creating`, and edge cases (cancellation, failure, shutdown) are handled. 
+ +use super::super::*; +use std::sync::{ + atomic::{AtomicBool, AtomicU32, Ordering}, + Arc, +}; +use std::time::{Duration, Instant}; +use streamkit_core::control::EngineControlMessage; +use streamkit_core::state::NodeState; +use streamkit_core::{NodeRegistry, ProcessorNode, StreamKitError}; + +// --------------------------------------------------------------------------- +// Test node implementations +// --------------------------------------------------------------------------- + +/// A node whose constructor sleeps for a configurable duration, simulating +/// heavy FFI work (e.g., ONNX model loading). Uses `std::thread::sleep` +/// because the constructor runs inside `spawn_blocking`. +struct SlowTestNode { + _delay: Duration, +} + +impl SlowTestNode { + fn factory( + delay: Duration, + created: Arc, + ) -> impl Fn(Option<&serde_json::Value>) -> Result, StreamKitError> + + Send + + Sync + + 'static { + move |_params| { + std::thread::sleep(delay); + created.store(true, Ordering::SeqCst); + Ok(Box::new(Self { _delay: delay }) as Box) + } + } +} + +#[streamkit_core::async_trait] +impl ProcessorNode for SlowTestNode { + fn input_pins(&self) -> Vec { + vec![streamkit_core::InputPin { + name: "in".to_string(), + accepts_types: vec![streamkit_core::types::PacketType::Any], + cardinality: streamkit_core::PinCardinality::One, + }] + } + + fn output_pins(&self) -> Vec { + vec![streamkit_core::OutputPin { + name: "out".to_string(), + produces_type: streamkit_core::types::PacketType::Binary, + cardinality: streamkit_core::PinCardinality::Broadcast, + }] + } + + async fn run( + self: Box, + mut context: streamkit_core::NodeContext, + ) -> Result<(), StreamKitError> { + loop { + match context.control_rx.recv().await { + Some(streamkit_core::control::NodeControlMessage::Shutdown) | None => return Ok(()), + Some( + streamkit_core::control::NodeControlMessage::Start + | streamkit_core::control::NodeControlMessage::UpdateParams(_), + ) => {}, + } + } + } +} + +/// A simple 
source node (no inputs) that stays alive until shutdown. +struct SimpleSourceNode; + +#[streamkit_core::async_trait] +impl ProcessorNode for SimpleSourceNode { + fn input_pins(&self) -> Vec { + Vec::new() + } + + fn output_pins(&self) -> Vec { + vec![streamkit_core::OutputPin { + name: "out".to_string(), + produces_type: streamkit_core::types::PacketType::Binary, + cardinality: streamkit_core::PinCardinality::Broadcast, + }] + } + + async fn run( + self: Box, + mut context: streamkit_core::NodeContext, + ) -> Result<(), StreamKitError> { + loop { + match context.control_rx.recv().await { + Some(streamkit_core::control::NodeControlMessage::Shutdown) | None => return Ok(()), + Some( + streamkit_core::control::NodeControlMessage::Start + | streamkit_core::control::NodeControlMessage::UpdateParams(_), + ) => {}, + } + } + } +} + +/// A node whose constructor always fails with an error. +struct FailingConstructorNode; + +impl FailingConstructorNode { + fn factory( + ) -> impl Fn(Option<&serde_json::Value>) -> Result, StreamKitError> + + Send + + Sync + + 'static { + |_params| { + std::thread::sleep(Duration::from_millis(100)); + Err(StreamKitError::Runtime("Model loading failed: out of memory".to_string())) + } + } +} + +/// A fast node whose constructor records the creation count (for concurrency tests). 
+struct FastTestNode; + +impl FastTestNode { + fn factory( + counter: Arc, + ) -> impl Fn(Option<&serde_json::Value>) -> Result, StreamKitError> + + Send + + Sync + + 'static { + move |_params| { + counter.fetch_add(1, Ordering::SeqCst); + Ok(Box::new(Self) as Box) + } + } +} + +#[streamkit_core::async_trait] +impl ProcessorNode for FastTestNode { + fn input_pins(&self) -> Vec { + vec![streamkit_core::InputPin { + name: "in".to_string(), + accepts_types: vec![streamkit_core::types::PacketType::Any], + cardinality: streamkit_core::PinCardinality::One, + }] + } + + fn output_pins(&self) -> Vec { + vec![streamkit_core::OutputPin { + name: "out".to_string(), + produces_type: streamkit_core::types::PacketType::Binary, + cardinality: streamkit_core::PinCardinality::Broadcast, + }] + } + + async fn run( + self: Box, + mut context: streamkit_core::NodeContext, + ) -> Result<(), StreamKitError> { + loop { + match context.control_rx.recv().await { + Some(streamkit_core::control::NodeControlMessage::Shutdown) | None => return Ok(()), + Some( + streamkit_core::control::NodeControlMessage::Start + | streamkit_core::control::NodeControlMessage::UpdateParams(_), + ) => {}, + } + } + } +} + +// --------------------------------------------------------------------------- +// Helper: build an engine with a pre-populated registry +// --------------------------------------------------------------------------- + +fn build_engine(registry: NodeRegistry) -> (Engine, DynamicEngineHandle) { + let engine = Engine { + registry: Arc::new(std::sync::RwLock::new(registry)), + audio_pool: Arc::new(streamkit_core::AudioFramePool::audio_default()), + video_pool: Arc::new(streamkit_core::VideoFramePool::video_default()), + }; + let handle = engine.start_dynamic_actor(DynamicEngineConfig::default()); + (engine, handle) +} + +/// Poll `handle.get_node_states()` until a predicate holds, with timeout. 
+async fn wait_for_states(handle: &DynamicEngineHandle, timeout_dur: Duration, pred: F) -> bool +where + F: Fn(&std::collections::HashMap) -> bool, +{ + let deadline = Instant::now() + timeout_dur; + while Instant::now() < deadline { + if let Ok(states) = handle.get_node_states().await { + if pred(&states) { + return true; + } + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + false +} + +// --------------------------------------------------------------------------- +// Test 1: Basic async creation — actor can process other AddNode messages +// while a slow node is being created. +// --------------------------------------------------------------------------- + +#[tokio::test] +#[allow(clippy::expect_used)] +async fn test_basic_async_creation() { + let slow_created = Arc::new(AtomicBool::new(false)); + + let mut registry = NodeRegistry::new(); + registry.register_dynamic( + "test::slow", + SlowTestNode::factory(Duration::from_secs(1), slow_created.clone()), + serde_json::json!({}), + vec!["test".to_string()], + false, + ); + + let fast_counter = Arc::new(AtomicU32::new(0)); + registry.register_dynamic( + "test::fast", + FastTestNode::factory(fast_counter.clone()), + serde_json::json!({}), + vec!["test".to_string()], + false, + ); + + let (_engine, handle) = build_engine(registry); + + // Add slow node first, then fast node immediately after. + handle + .send_control(EngineControlMessage::AddNode { + node_id: "slow".to_string(), + kind: "test::slow".to_string(), + params: None, + }) + .await + .expect("send AddNode slow"); + + handle + .send_control(EngineControlMessage::AddNode { + node_id: "fast".to_string(), + kind: "test::fast".to_string(), + params: None, + }) + .await + .expect("send AddNode fast"); + + // The fast node should become available (past Creating) well before the + // slow node finishes its 1-second sleep. 
+ let fast_ready = wait_for_states(&handle, Duration::from_secs(3), |states| { + states.get("fast").is_some_and(|s| !matches!(s, NodeState::Creating)) + }) + .await; + assert!(fast_ready, "fast node should leave Creating before slow node finishes"); + + // At this point the slow node should still be Creating (or just finishing). + // Wait for it to also complete. + let slow_ready = wait_for_states(&handle, Duration::from_secs(5), |states| { + states.get("slow").is_some_and(|s| !matches!(s, NodeState::Creating)) + }) + .await; + assert!(slow_ready, "slow node should eventually leave Creating"); + assert!(slow_created.load(Ordering::SeqCst), "slow constructor should have run"); + + handle.shutdown_and_wait().await.expect("shutdown"); +} + +// --------------------------------------------------------------------------- +// Test 2: Deferred connections — Connect sent before slow node finishes +// is replayed after creation completes. +// --------------------------------------------------------------------------- + +#[tokio::test] +#[allow(clippy::expect_used)] +async fn test_deferred_connections() { + let slow_created = Arc::new(AtomicBool::new(false)); + + let mut registry = NodeRegistry::new(); + registry.register_dynamic( + "test::source", + |_params| Ok(Box::new(SimpleSourceNode) as Box), + serde_json::json!({}), + vec!["test".to_string()], + false, + ); + registry.register_dynamic( + "test::slow", + SlowTestNode::factory(Duration::from_millis(500), slow_created.clone()), + serde_json::json!({}), + vec!["test".to_string()], + false, + ); + + let (_engine, handle) = build_engine(registry); + + // Add both nodes. 
+ handle + .send_control(EngineControlMessage::AddNode { + node_id: "src".to_string(), + kind: "test::source".to_string(), + params: None, + }) + .await + .expect("add source"); + + handle + .send_control(EngineControlMessage::AddNode { + node_id: "slow".to_string(), + kind: "test::slow".to_string(), + params: None, + }) + .await + .expect("add slow"); + + // Connect immediately — slow node is still Creating. + handle + .send_control(EngineControlMessage::Connect { + from_node: "src".to_string(), + from_pin: "out".to_string(), + to_node: "slow".to_string(), + to_pin: "in".to_string(), + mode: streamkit_core::control::ConnectionMode::Reliable, + }) + .await + .expect("connect"); + + // After slow node finishes, both should be initialized and the deferred + // connection should have been replayed. + let both_ready = wait_for_states(&handle, Duration::from_secs(5), |states| { + let src_ok = states.get("src").is_some_and(|s| { + matches!(s, NodeState::Ready | NodeState::Running | NodeState::Initializing) + }); + let slow_ok = states.get("slow").is_some_and(|s| { + matches!(s, NodeState::Ready | NodeState::Running | NodeState::Initializing) + }); + src_ok && slow_ok + }) + .await; + assert!(both_ready, "both nodes should be initialized after deferred connection replay"); + + handle.shutdown_and_wait().await.expect("shutdown"); +} + +// --------------------------------------------------------------------------- +// Test 3: Multiple slow nodes — they should be created concurrently +// (total time ≈ max, not sum). 
+// --------------------------------------------------------------------------- + +#[tokio::test] +#[allow(clippy::expect_used)] +async fn test_multiple_slow_nodes_concurrent() { + let created_a = Arc::new(AtomicBool::new(false)); + let created_b = Arc::new(AtomicBool::new(false)); + let created_c = Arc::new(AtomicBool::new(false)); + + let mut registry = NodeRegistry::new(); + registry.register_dynamic( + "test::slow_a", + SlowTestNode::factory(Duration::from_millis(500), created_a.clone()), + serde_json::json!({}), + vec!["test".to_string()], + false, + ); + registry.register_dynamic( + "test::slow_b", + SlowTestNode::factory(Duration::from_millis(500), created_b.clone()), + serde_json::json!({}), + vec!["test".to_string()], + false, + ); + registry.register_dynamic( + "test::slow_c", + SlowTestNode::factory(Duration::from_millis(500), created_c.clone()), + serde_json::json!({}), + vec!["test".to_string()], + false, + ); + + let (_engine, handle) = build_engine(registry); + + let start = Instant::now(); + + for (id, kind) in [("a", "test::slow_a"), ("b", "test::slow_b"), ("c", "test::slow_c")] { + handle + .send_control(EngineControlMessage::AddNode { + node_id: id.to_string(), + kind: kind.to_string(), + params: None, + }) + .await + .expect("add node"); + } + + // Wait for all three to leave Creating. + let all_done = wait_for_states(&handle, Duration::from_secs(5), |states| { + ["a", "b", "c"] + .iter() + .all(|id| states.get(*id).is_some_and(|s| !matches!(s, NodeState::Creating))) + }) + .await; + assert!(all_done, "all three slow nodes should finish creation"); + + let elapsed = start.elapsed(); + // If sequential, ~1.5s; if concurrent, ~0.5s + overhead. 
+ assert!( + elapsed < Duration::from_millis(1200), + "3 x 500ms nodes should complete in ~500ms (concurrent), but took {elapsed:?}", + ); + + assert!(created_a.load(Ordering::SeqCst)); + assert!(created_b.load(Ordering::SeqCst)); + assert!(created_c.load(Ordering::SeqCst)); + + handle.shutdown_and_wait().await.expect("shutdown"); +} + +// --------------------------------------------------------------------------- +// Test 4: Creation failure — node transitions to Failed, pending connections +// referencing it are drained. +// --------------------------------------------------------------------------- + +#[tokio::test] +#[allow(clippy::expect_used)] +async fn test_creation_failure() { + let mut registry = NodeRegistry::new(); + registry.register_dynamic( + "test::source", + |_params| Ok(Box::new(SimpleSourceNode) as Box), + serde_json::json!({}), + vec!["test".to_string()], + false, + ); + registry.register_dynamic( + "test::failing", + FailingConstructorNode::factory(), + serde_json::json!({}), + vec!["test".to_string()], + false, + ); + + let (_engine, handle) = build_engine(registry); + + handle + .send_control(EngineControlMessage::AddNode { + node_id: "src".to_string(), + kind: "test::source".to_string(), + params: None, + }) + .await + .expect("add source"); + + handle + .send_control(EngineControlMessage::AddNode { + node_id: "bad".to_string(), + kind: "test::failing".to_string(), + params: None, + }) + .await + .expect("add failing"); + + // Queue a connection to the failing node. + handle + .send_control(EngineControlMessage::Connect { + from_node: "src".to_string(), + from_pin: "out".to_string(), + to_node: "bad".to_string(), + to_pin: "in".to_string(), + mode: streamkit_core::control::ConnectionMode::Reliable, + }) + .await + .expect("connect"); + + // The failing node should transition to Failed. + let failed = wait_for_states(&handle, Duration::from_secs(3), |states| { + matches!(states.get("bad"), Some(NodeState::Failed { .. 
})) + }) + .await; + assert!(failed, "failing node should transition to Failed"); + + // Source node should still be fine. + let states = handle.get_node_states().await.expect("get states"); + assert!( + states.get("src").is_some_and(|s| !matches!(s, NodeState::Failed { .. })), + "source node should be unaffected" + ); + + handle.shutdown_and_wait().await.expect("shutdown"); +} + +// --------------------------------------------------------------------------- +// Test 5: RemoveNode while Creating — background result is discarded. +// --------------------------------------------------------------------------- + +#[tokio::test] +#[allow(clippy::expect_used)] +async fn test_remove_node_while_creating() { + let slow_created = Arc::new(AtomicBool::new(false)); + + let mut registry = NodeRegistry::new(); + registry.register_dynamic( + "test::slow", + SlowTestNode::factory(Duration::from_secs(1), slow_created.clone()), + serde_json::json!({}), + vec!["test".to_string()], + false, + ); + + let (_engine, handle) = build_engine(registry); + + // Add slow node. + handle + .send_control(EngineControlMessage::AddNode { + node_id: "doomed".to_string(), + kind: "test::slow".to_string(), + params: None, + }) + .await + .expect("add doomed"); + + // Give the actor a moment to process AddNode and set Creating state. + tokio::time::sleep(Duration::from_millis(50)).await; + + // Remove it while still Creating. + handle + .send_control(EngineControlMessage::RemoveNode { node_id: "doomed".to_string() }) + .await + .expect("remove doomed"); + + // Wait for the background creation to complete (1s), then verify + // the node was NOT added to the engine. + tokio::time::sleep(Duration::from_millis(1500)).await; + + let states = handle.get_node_states().await.expect("get states"); + assert!( + !states.contains_key("doomed"), + "removed-while-Creating node should not appear in states" + ); + + // The constructor did run (it was already spawned), but the result + // should have been discarded. 
+ assert!( + slow_created.load(Ordering::SeqCst), + "constructor runs to completion even if cancelled" + ); + + handle.shutdown_and_wait().await.expect("shutdown"); +} + +// --------------------------------------------------------------------------- +// Test 6: Pipeline activation timing — source nodes do NOT activate until +// all nodes leave Creating state. +// --------------------------------------------------------------------------- + +#[tokio::test] +#[allow(clippy::expect_used)] +async fn test_pipeline_activation_timing() { + let slow_created = Arc::new(AtomicBool::new(false)); + + let mut registry = NodeRegistry::new(); + registry.register_dynamic( + "test::source", + |_params| Ok(Box::new(SimpleSourceNode) as Box), + serde_json::json!({}), + vec!["test".to_string()], + false, + ); + registry.register_dynamic( + "test::slow", + SlowTestNode::factory(Duration::from_millis(800), slow_created.clone()), + serde_json::json!({}), + vec!["test".to_string()], + false, + ); + + let (_engine, handle) = build_engine(registry); + + // Subscribe to state updates to observe activation. + let mut state_rx = handle.subscribe_state().await.expect("subscribe"); + + handle + .send_control(EngineControlMessage::AddNode { + node_id: "src".to_string(), + kind: "test::source".to_string(), + params: None, + }) + .await + .expect("add source"); + + handle + .send_control(EngineControlMessage::AddNode { + node_id: "proc".to_string(), + kind: "test::slow".to_string(), + params: None, + }) + .await + .expect("add slow processor"); + + handle + .send_control(EngineControlMessage::Connect { + from_node: "src".to_string(), + from_pin: "out".to_string(), + to_node: "proc".to_string(), + to_pin: "in".to_string(), + mode: streamkit_core::control::ConnectionMode::Reliable, + }) + .await + .expect("connect"); + + // Drain state updates; verify source doesn't go to Running before slow node + // leaves Creating. 
+ let mut slow_left_creating = false; + let mut src_ran_before_slow_ready = false; + + let drain_deadline = Instant::now() + Duration::from_secs(5); + while Instant::now() < drain_deadline { + match tokio::time::timeout(Duration::from_millis(100), state_rx.recv()).await { + Ok(Some(update)) => { + if update.node_id == "proc" && !matches!(update.state, NodeState::Creating) { + slow_left_creating = true; + } + if update.node_id == "src" && matches!(update.state, NodeState::Running) { + if !slow_left_creating { + src_ran_before_slow_ready = true; + } + break; + } + }, + _ => { + if slow_left_creating { + break; + } + }, + } + } + + assert!( + !src_ran_before_slow_ready, + "source should NOT start Running before slow node leaves Creating" + ); + + handle.shutdown_and_wait().await.expect("shutdown"); +} + +// --------------------------------------------------------------------------- +// Test 7: Duplicate AddNode — second call is rejected. +// --------------------------------------------------------------------------- + +#[tokio::test] +#[allow(clippy::expect_used)] +async fn test_duplicate_add_node() { + let slow_created = Arc::new(AtomicBool::new(false)); + + let mut registry = NodeRegistry::new(); + registry.register_dynamic( + "test::slow", + SlowTestNode::factory(Duration::from_millis(500), slow_created.clone()), + serde_json::json!({}), + vec!["test".to_string()], + false, + ); + + let (_engine, handle) = build_engine(registry); + + // Add node. + handle + .send_control(EngineControlMessage::AddNode { + node_id: "dup".to_string(), + kind: "test::slow".to_string(), + params: None, + }) + .await + .expect("add first"); + + // Give actor time to process. + tokio::time::sleep(Duration::from_millis(50)).await; + + // Try adding the same node_id again. + handle + .send_control(EngineControlMessage::AddNode { + node_id: "dup".to_string(), + kind: "test::slow".to_string(), + params: None, + }) + .await + .expect("add duplicate"); + + // Wait for the original to finish. 
+ let done = wait_for_states(&handle, Duration::from_secs(3), |states| { + states.get("dup").is_some_and(|s| !matches!(s, NodeState::Creating)) + }) + .await; + assert!(done, "original node should finish creating"); + + // The engine should still be responsive (no double-init crash). + let states = handle.get_node_states().await.expect("get states"); + assert!(states.contains_key("dup"), "node should exist"); + + handle.shutdown_and_wait().await.expect("shutdown"); +} + +// --------------------------------------------------------------------------- +// Test 8: RemoveNode then re-AddNode with same ID — new node is created +// correctly and old creation result is discarded. +// --------------------------------------------------------------------------- + +#[tokio::test] +#[allow(clippy::expect_used)] +async fn test_remove_then_readd_same_id() { + let created_v1 = Arc::new(AtomicBool::new(false)); + let created_v2 = Arc::new(AtomicBool::new(false)); + + let mut registry = NodeRegistry::new(); + registry.register_dynamic( + "test::slow_v1", + SlowTestNode::factory(Duration::from_secs(1), created_v1.clone()), + serde_json::json!({}), + vec!["test".to_string()], + false, + ); + registry.register_dynamic( + "test::fast_v2", + SlowTestNode::factory(Duration::from_millis(50), created_v2.clone()), + serde_json::json!({}), + vec!["test".to_string()], + false, + ); + + let (_engine, handle) = build_engine(registry); + + // Add slow v1. + handle + .send_control(EngineControlMessage::AddNode { + node_id: "node".to_string(), + kind: "test::slow_v1".to_string(), + params: None, + }) + .await + .expect("add v1"); + + tokio::time::sleep(Duration::from_millis(50)).await; + + // Remove while Creating. + handle + .send_control(EngineControlMessage::RemoveNode { node_id: "node".to_string() }) + .await + .expect("remove"); + + tokio::time::sleep(Duration::from_millis(50)).await; + + // Re-add with a different (fast) kind. 
+ handle + .send_control(EngineControlMessage::AddNode { + node_id: "node".to_string(), + kind: "test::fast_v2".to_string(), + params: None, + }) + .await + .expect("add v2"); + + // Wait for v2 to finish. + let v2_done = wait_for_states(&handle, Duration::from_secs(3), |states| { + states.get("node").is_some_and(|s| !matches!(s, NodeState::Creating)) + }) + .await; + assert!(v2_done, "v2 node should finish creating"); + + // v2 should have been created. + assert!(created_v2.load(Ordering::SeqCst), "v2 constructor should have run"); + + // Wait for v1 background task to also complete (it was already spawned). + tokio::time::sleep(Duration::from_millis(1200)).await; + assert!(created_v1.load(Ordering::SeqCst), "v1 constructor runs to completion"); + + // The node should be the v2 version (kind = test::fast_v2). + let states = handle.get_node_states().await.expect("get states"); + assert!(states.contains_key("node"), "node should exist"); + + handle.shutdown_and_wait().await.expect("shutdown"); +} + +// --------------------------------------------------------------------------- +// Test 9: Shutdown while Creating — clean shutdown, no panics. +// --------------------------------------------------------------------------- + +#[tokio::test] +#[allow(clippy::expect_used)] +async fn test_shutdown_while_creating() { + let slow_created = Arc::new(AtomicBool::new(false)); + + let mut registry = NodeRegistry::new(); + registry.register_dynamic( + "test::slow", + SlowTestNode::factory(Duration::from_secs(2), slow_created.clone()), + serde_json::json!({}), + vec!["test".to_string()], + false, + ); + + let (_engine, handle) = build_engine(registry); + + handle + .send_control(EngineControlMessage::AddNode { + node_id: "slow".to_string(), + kind: "test::slow".to_string(), + params: None, + }) + .await + .expect("add slow"); + + // Give actor time to set Creating state. + tokio::time::sleep(Duration::from_millis(50)).await; + + // Shutdown while slow node is still Creating. 
+ let result = handle.shutdown_and_wait().await; + assert!(result.is_ok(), "shutdown should complete cleanly: {result:?}"); +} + +// --------------------------------------------------------------------------- +// Test 10: Connect with one realized, one creating — connection is deferred +// and replayed correctly. +// --------------------------------------------------------------------------- + +#[tokio::test] +#[allow(clippy::expect_used)] +async fn test_connect_one_realized_one_creating() { + let slow_created = Arc::new(AtomicBool::new(false)); + + let mut registry = NodeRegistry::new(); + registry.register_dynamic( + "test::source", + |_params| Ok(Box::new(SimpleSourceNode) as Box), + serde_json::json!({}), + vec!["test".to_string()], + false, + ); + registry.register_dynamic( + "test::slow", + SlowTestNode::factory(Duration::from_millis(500), slow_created.clone()), + serde_json::json!({}), + vec!["test".to_string()], + false, + ); + + let (_engine, handle) = build_engine(registry); + + // Add source (fast) — it will be realized quickly. + handle + .send_control(EngineControlMessage::AddNode { + node_id: "source".to_string(), + kind: "test::source".to_string(), + params: None, + }) + .await + .expect("add source"); + + // Wait for source to leave Creating. + let source_ready = wait_for_states(&handle, Duration::from_secs(2), |states| { + states.get("source").is_some_and(|s| !matches!(s, NodeState::Creating)) + }) + .await; + assert!(source_ready, "source should be realized quickly"); + + // Now add slow node. + handle + .send_control(EngineControlMessage::AddNode { + node_id: "slow".to_string(), + kind: "test::slow".to_string(), + params: None, + }) + .await + .expect("add slow"); + + // Connect while source is realized but slow is still Creating. 
+ handle + .send_control(EngineControlMessage::Connect { + from_node: "source".to_string(), + from_pin: "out".to_string(), + to_node: "slow".to_string(), + to_pin: "in".to_string(), + mode: streamkit_core::control::ConnectionMode::Reliable, + }) + .await + .expect("connect"); + + // Wait for slow node to finish and verify both are initialized. + let both_done = wait_for_states(&handle, Duration::from_secs(5), |states| { + let source_ok = states.get("source").is_some_and(|s| { + matches!(s, NodeState::Ready | NodeState::Running | NodeState::Initializing) + }); + let slow_ok = states.get("slow").is_some_and(|s| { + matches!(s, NodeState::Ready | NodeState::Running | NodeState::Initializing) + }); + source_ok && slow_ok + }) + .await; + assert!(both_done, "both nodes should be initialized after deferred connection is replayed"); + + handle.shutdown_and_wait().await.expect("shutdown"); +} diff --git a/crates/engine/src/tests/connection_types.rs b/crates/engine/src/tests/connection_types.rs index 45f62e94..ed40c2e1 100644 --- a/crates/engine/src/tests/connection_types.rs +++ b/crates/engine/src/tests/connection_types.rs @@ -21,6 +21,8 @@ fn create_test_engine() -> DynamicEngine { drop(control_tx); drop(query_tx); + let (node_created_tx, node_created_rx) = mpsc::channel(32); + let meter = opentelemetry::global::meter("test"); DynamicEngine { registry: std::sync::Arc::new(std::sync::RwLock::new(NodeRegistry::new())), @@ -57,6 +59,10 @@ fn create_test_engine() -> DynamicEngine { node_state_gauge: meter.u64_gauge("test.state").build(), runtime_schemas: HashMap::new(), runtime_schema_subscribers: Vec::new(), + node_created_tx, + node_created_rx, + pending_connections: Vec::new(), + cancelled_creations: std::collections::HashSet::new(), } } diff --git a/crates/engine/src/tests/mod.rs b/crates/engine/src/tests/mod.rs index 0a9b9bf5..c35ece7f 100644 --- a/crates/engine/src/tests/mod.rs +++ b/crates/engine/src/tests/mod.rs @@ -4,6 +4,8 @@ //! Unit tests for the engine crate. 
+#[cfg(feature = "dynamic")] +mod async_node_creation; #[cfg(feature = "dynamic")] mod connection_types; #[cfg(feature = "dynamic")] diff --git a/crates/engine/src/tests/pipeline_activation.rs b/crates/engine/src/tests/pipeline_activation.rs index 7792d7e1..f007e87e 100644 --- a/crates/engine/src/tests/pipeline_activation.rs +++ b/crates/engine/src/tests/pipeline_activation.rs @@ -22,6 +22,8 @@ fn create_test_engine() -> DynamicEngine { drop(control_tx); drop(query_tx); + let (nc_tx, nc_rx) = mpsc::channel(32); + let meter = opentelemetry::global::meter("test"); DynamicEngine { registry: std::sync::Arc::new(std::sync::RwLock::new(NodeRegistry::new())), @@ -58,6 +60,10 @@ fn create_test_engine() -> DynamicEngine { node_state_gauge: meter.u64_gauge("test.state").build(), runtime_schemas: HashMap::new(), runtime_schema_subscribers: Vec::new(), + node_created_tx: nc_tx, + node_created_rx: nc_rx, + pending_connections: Vec::new(), + cancelled_creations: std::collections::HashSet::new(), } } @@ -270,6 +276,19 @@ async fn test_activation_blocked_by_failed_node() { ); } +/// Source node should NOT receive Start while any node is still Creating. +#[tokio::test] +async fn test_activation_blocked_by_creating_node() { + let mut engine = create_test_engine(); + let mut source_rx = add_source_node(&mut engine, "source", NodeState::Ready); + add_processor_node(&mut engine, "slow_node", NodeState::Creating); + + engine.check_and_activate_pipeline(); + + let msg = source_rx.try_recv(); + assert!(msg.is_err(), "source node should NOT receive Start while a node is still Creating"); +} + /// Source node should NOT receive Start when a downstream node has Stopped. 
#[tokio::test] async fn test_activation_blocked_by_stopped_node() { diff --git a/ui/src/components/NodeStateIndicator.tsx b/ui/src/components/NodeStateIndicator.tsx index 97d6c80f..725b2d1a 100644 --- a/ui/src/components/NodeStateIndicator.tsx +++ b/ui/src/components/NodeStateIndicator.tsx @@ -152,6 +152,7 @@ function renderSlowPinsSummary( function getStateColor(state: NodeState): string { if (typeof state === 'string') { switch (state) { + case 'Creating': case 'Initializing': return 'var(--sk-status-initializing)'; case 'Running': @@ -201,6 +202,8 @@ function getStateLabel(state: NodeState): string { function getStateDescription(state: NodeState): string { if (typeof state === 'string') { switch (state) { + case 'Creating': + return 'Node is being created (loading resources)'; case 'Initializing': return 'Node is starting up and performing initialization'; case 'Running': diff --git a/ui/src/types/generated/api-types.ts b/ui/src/types/generated/api-types.ts index dba2147f..61aed408 100644 --- a/ui/src/types/generated/api-types.ts +++ b/ui/src/types/generated/api-types.ts @@ -101,7 +101,7 @@ bidirectional: boolean, }; export type StopReason = "completed" | "input_closed" | "output_closed" | "shutdown" | "no_inputs" | "unknown"; -export type NodeState = "Initializing" | "Ready" | "Running" | { "Recovering": { reason: string, details: JsonValue, } } | { "Degraded": { reason: string, details: JsonValue, } } | { "Failed": { reason: string, } } | { "Stopped": { reason: StopReason, } }; +export type NodeState = "Creating" | "Initializing" | "Ready" | "Running" | { "Recovering": { reason: string, details: JsonValue, } } | { "Degraded": { reason: string, details: JsonValue, } } | { "Failed": { reason: string, } } | { "Stopped": { reason: StopReason, } }; export type NodeStats = { /** diff --git a/ui/src/utils/sessionStatus.ts b/ui/src/utils/sessionStatus.ts index cd5c41d4..3548edaf 100644 --- a/ui/src/utils/sessionStatus.ts +++ b/ui/src/utils/sessionStatus.ts @@ -54,8 
+54,8 @@ export function computeSessionStatus(nodeStates: Record): Ses return 'recovering'; } - // Check for initializing - if (states.some((state) => state === 'Initializing')) { + // Check for creating or initializing + if (states.some((state) => state === 'Creating' || state === 'Initializing')) { return 'initializing'; } From 399c7b5712f065198e41ed6017b88baf90638f77 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Sat, 11 Apr 2026 16:16:46 +0000 Subject: [PATCH 02/16] fix(engine): address review findings in async node creation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix remove-then-readd race: clear cancelled_creations entry when a new AddNode arrives with the same ID, preventing the new creation result from being mistakenly discarded. - Fix Disconnect not draining pending_connections: remove matching deferred connections so they aren't replayed after the user explicitly disconnected them. - Fix broadcast_state_update skipping previous-state gauge zeroing: mirror the one-hot gauge pattern from handle_state_update so engine-synthesized transitions (Creating → Failed) don't leave stale gauge series at 1. - Strengthen test_remove_then_readd_same_id to verify the node is fully initialized (not Creating/Failed) after the re-add. Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- crates/engine/src/dynamic_actor.rs | 34 ++++++++++++++++++- .../engine/src/tests/async_node_creation.rs | 8 ++++- 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/crates/engine/src/dynamic_actor.rs b/crates/engine/src/dynamic_actor.rs index 099b56dc..542a4988 100644 --- a/crates/engine/src/dynamic_actor.rs +++ b/crates/engine/src/dynamic_actor.rs @@ -1586,6 +1586,22 @@ impl DynamicEngine { 1, &[KeyValue::new("node_id", update.node_id.clone()), KeyValue::new("state", state_name)], ); + + // Zero-out the previous state's gauge series (one-hot pattern), + // mirroring the logic in `handle_state_update`. 
+ let prev_state = self.node_states.get(&update.node_id); + if let Some(prev_state) = prev_state { + let prev_state_name = Self::node_state_name(prev_state); + if prev_state_name != state_name { + self.node_state_gauge.record( + 0, + &[ + KeyValue::new("node_id", update.node_id.clone()), + KeyValue::new("state", prev_state_name), + ], + ); + } + } self.node_state_gauge.record( 1, &[KeyValue::new("node_id", update.node_id.clone()), KeyValue::new("state", state_name)], @@ -1664,6 +1680,12 @@ impl DynamicEngine { self.node_states.insert(node_id.clone(), NodeState::Creating); self.node_kinds.insert(node_id.clone(), kind.clone()); + // Clear any stale cancellation for this ID left over from a + // previous Remove → re-Add cycle. Without this, the new + // creation result would be mistakenly discarded in + // `handle_node_created`. + self.cancelled_creations.remove(&node_id); + // Broadcast Creating state to subscribers. let update = NodeStateUpdate::new(node_id.clone(), NodeState::Creating); self.broadcast_state_update(&update); @@ -1754,7 +1776,17 @@ impl DynamicEngine { }, EngineControlMessage::Disconnect { from_node, from_pin, to_node, to_pin } => { self.engine_operations_counter.add(1, &[KeyValue::new("operation", "disconnect")]); - // Delegate disconnection logic + + // Also remove any matching deferred connection so it isn't + // replayed later by `flush_pending_connections`. + self.pending_connections.retain(|pc| { + !(pc.from_node == from_node + && pc.from_pin == from_pin + && pc.to_node == to_node + && pc.to_pin == to_pin) + }); + + // Delegate disconnection logic for realized connections. 
self.disconnect_nodes(from_node, from_pin, to_node, to_pin).await; }, EngineControlMessage::TuneNode { node_id, message } => { diff --git a/crates/engine/src/tests/async_node_creation.rs b/crates/engine/src/tests/async_node_creation.rs index 974b20d1..18d725b6 100644 --- a/crates/engine/src/tests/async_node_creation.rs +++ b/crates/engine/src/tests/async_node_creation.rs @@ -785,9 +785,15 @@ async fn test_remove_then_readd_same_id() { tokio::time::sleep(Duration::from_millis(1200)).await; assert!(created_v1.load(Ordering::SeqCst), "v1 constructor runs to completion"); - // The node should be the v2 version (kind = test::fast_v2). + // The node should be the v2 version — verify it's not in Creating or + // Failed state (it should be fully initialized). let states = handle.get_node_states().await.expect("get states"); assert!(states.contains_key("node"), "node should exist"); + let state = states.get("node").expect("node state"); + assert!( + !matches!(state, NodeState::Creating | NodeState::Failed { .. }), + "v2 node should be initialized, not Creating/Failed, got: {state:?}" + ); handle.shutdown_and_wait().await.expect("shutdown"); } From 38d6ef6712e6e1b6fef0c15d0a0fa2a07c8dd6cc Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Sat, 11 Apr 2026 16:25:27 +0000 Subject: [PATCH 03/16] fix(engine): replace cancelled_creations with creation_id generation counter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous cancelled_creations HashSet approach had a race condition in Remove → re-Add sequences: clearing the cancellation on re-Add allowed the old background task's result to also be processed, causing double initialization and resource leaks. Replace with a monotonic creation_id counter (next_creation_id) and an active_creations map (node_id → creation_id). Each spawned creation task carries its creation_id; handle_node_created only accepts results whose creation_id matches the current active entry. 
Stale results from cancelled or superseded creations are silently discarded. Also fix broadcast_state_update to read previous state from node_states BEFORE inserting the new one (it now owns the insertion), so the one-hot gauge zeroing works correctly for engine-synthesized transitions like Creating → Failed. Add a zero_state_gauge helper for the Creating → Initializing transition in handle_node_created. Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- crates/engine/src/dynamic_actor.rs | 151 ++++++++++-------- crates/engine/src/lib.rs | 3 +- crates/engine/src/tests/connection_types.rs | 3 +- .../engine/src/tests/pipeline_activation.rs | 3 +- 4 files changed, 94 insertions(+), 66 deletions(-) diff --git a/crates/engine/src/dynamic_actor.rs b/crates/engine/src/dynamic_actor.rs index 542a4988..484d630e 100644 --- a/crates/engine/src/dynamic_actor.rs +++ b/crates/engine/src/dynamic_actor.rs @@ -16,7 +16,7 @@ use crate::{ graph_builder, }; use opentelemetry::KeyValue; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::sync::{Arc, RwLock}; use streamkit_core::control::{EngineControlMessage, NodeControlMessage}; use streamkit_core::error::StreamKitError; @@ -54,6 +54,7 @@ struct NodeChannels { pub struct NodeCreatedEvent { node_id: String, kind: String, + creation_id: u64, result: Result, StreamKitError>, } @@ -149,9 +150,14 @@ pub struct DynamicEngine { pub(super) node_created_rx: mpsc::Receiver, /// Connections deferred because one or both endpoints are still `Creating`. pub(super) pending_connections: Vec, - /// Node IDs whose creation was cancelled (via `RemoveNode` while `Creating`). - /// When `NodeCreated` arrives for a cancelled ID, the result is discarded. - pub(super) cancelled_creations: HashSet, + /// Monotonically increasing counter used to tag each spawned creation task. + /// Lets `handle_node_created` distinguish stale results (from a previous + /// Remove → re-Add cycle) from the current active creation. 
+ pub(super) next_creation_id: u64, + /// Maps node_id → creation_id for nodes currently in `Creating` state. + /// When `NodeCreated` arrives, its `creation_id` must match the active + /// entry; otherwise the result is stale and discarded. + pub(super) active_creations: HashMap, } impl DynamicEngine { const fn node_state_name(state: &NodeState) -> &'static str { @@ -1511,23 +1517,37 @@ impl DynamicEngine { /// On failure: transitions the node to `Failed`, drains pending connections /// referencing the failed node. async fn handle_node_created(&mut self, event: NodeCreatedEvent, channels: &NodeChannels) { - let NodeCreatedEvent { node_id, kind, result } = event; - - // If the node was cancelled (RemoveNode arrived while Creating), - // discard the result silently. - if self.cancelled_creations.remove(&node_id) { - tracing::info!( - node = %node_id, - "Discarding creation result for cancelled node" - ); - // Drain pending connections referencing this node. - self.pending_connections.retain(|pc| pc.from_node != node_id && pc.to_node != node_id); - return; + let NodeCreatedEvent { node_id, kind, creation_id, result } = event; + + // Check whether this creation result is still the active one. + // A mismatch means either: + // - RemoveNode was called while Creating (entry removed), or + // - Remove → re-Add happened and a newer creation superseded this one. + // In both cases, discard the stale result. + match self.active_creations.get(&node_id) { + Some(&active_id) if active_id == creation_id => { + // This is the current active creation — remove the tracking + // entry and proceed with initialization. 
+ self.active_creations.remove(&node_id); + }, + _ => { + tracing::info!( + node = %node_id, + creation_id, + "Discarding stale/cancelled creation result" + ); + return; + }, } match result { Ok(node) => { tracing::info!(node = %node_id, kind = %kind, "Node created successfully, initializing"); + + // Zero the Creating gauge before initialize_node overwrites + // node_states with Initializing. + self.zero_state_gauge(&node_id, &NodeState::Creating); + if let Err(e) = self.initialize_node(node, &node_id, &kind, channels).await { tracing::error!( node_id = %node_id, @@ -1535,15 +1555,12 @@ impl DynamicEngine { error = %e, "Failed to initialize node after async creation" ); - self.node_states - .insert(node_id.clone(), NodeState::Failed { reason: e.to_string() }); - // Broadcast the Failed state to subscribers. - let update = NodeStateUpdate::new( - node_id.clone(), + // Broadcast Failed (reads prev state before inserting). + self.broadcast_state_update( + &node_id, NodeState::Failed { reason: e.to_string() }, ); - self.broadcast_state_update(&update); // Drain pending connections referencing this node. self.pending_connections @@ -1561,15 +1578,12 @@ impl DynamicEngine { error = %e, "Background node creation failed" ); - self.node_states - .insert(node_id.clone(), NodeState::Failed { reason: e.to_string() }); - // Broadcast the Failed state to subscribers. - let update = NodeStateUpdate::new( - node_id.clone(), + // Broadcast Failed (reads prev state before inserting). + self.broadcast_state_update( + &node_id, NodeState::Failed { reason: e.to_string() }, ); - self.broadcast_state_update(&update); // Drain pending connections referencing this node. self.pending_connections @@ -1578,35 +1592,51 @@ impl DynamicEngine { } } + /// Zero-out the gauge for a specific state (one-hot pattern helper). 
+ fn zero_state_gauge(&self, node_id: &str, state: &NodeState) { + let state_name = Self::node_state_name(state); + self.node_state_gauge.record( + 0, + &[KeyValue::new("node_id", node_id.to_owned()), KeyValue::new("state", state_name)], + ); + } + /// Broadcast a state update to all subscribers (used when the actor itself /// synthesizes a state transition, e.g. `Creating → Failed`). - fn broadcast_state_update(&mut self, update: &NodeStateUpdate) { - let state_name = Self::node_state_name(&update.state); + /// + /// Reads the previous state from `node_states` **before** inserting the + /// new one, so the one-hot gauge zeroing is correct. + fn broadcast_state_update(&mut self, node_id: &str, new_state: NodeState) { + let state_name = Self::node_state_name(&new_state); self.node_state_transitions_counter.add( 1, - &[KeyValue::new("node_id", update.node_id.clone()), KeyValue::new("state", state_name)], + &[KeyValue::new("node_id", node_id.to_owned()), KeyValue::new("state", state_name)], ); // Zero-out the previous state's gauge series (one-hot pattern), // mirroring the logic in `handle_state_update`. - let prev_state = self.node_states.get(&update.node_id); - if let Some(prev_state) = prev_state { + if let Some(prev_state) = self.node_states.get(node_id) { let prev_state_name = Self::node_state_name(prev_state); if prev_state_name != state_name { self.node_state_gauge.record( 0, &[ - KeyValue::new("node_id", update.node_id.clone()), + KeyValue::new("node_id", node_id.to_owned()), KeyValue::new("state", prev_state_name), ], ); } } + + // Insert the new state AFTER reading the previous one. 
+ self.node_states.insert(node_id.to_owned(), new_state.clone()); + self.node_state_gauge.record( 1, - &[KeyValue::new("node_id", update.node_id.clone()), KeyValue::new("state", state_name)], + &[KeyValue::new("node_id", node_id.to_owned()), KeyValue::new("state", state_name)], ); + let update = NodeStateUpdate::new(node_id.to_owned(), new_state); self.state_subscribers.retain(|subscriber| match subscriber.try_send(update.clone()) { Ok(()) => true, Err(mpsc::error::TrySendError::Full(_)) => { @@ -1675,20 +1705,20 @@ impl DynamicEngine { return true; } - // Record state and kind immediately so the actor loop - // continues processing the next message without blocking. - self.node_states.insert(node_id.clone(), NodeState::Creating); - self.node_kinds.insert(node_id.clone(), kind.clone()); + // Assign a unique creation ID so handle_node_created can + // distinguish stale results from a previous Remove → re-Add + // cycle. + let creation_id = self.next_creation_id; + self.next_creation_id += 1; + self.active_creations.insert(node_id.clone(), creation_id); - // Clear any stale cancellation for this ID left over from a - // previous Remove → re-Add cycle. Without this, the new - // creation result would be mistakenly discarded in - // `handle_node_created`. - self.cancelled_creations.remove(&node_id); + // Record kind immediately so the actor loop continues + // processing the next message without blocking. + self.node_kinds.insert(node_id.clone(), kind.clone()); - // Broadcast Creating state to subscribers. - let update = NodeStateUpdate::new(node_id.clone(), NodeState::Creating); - self.broadcast_state_update(&update); + // Insert Creating state and broadcast to subscribers. + // broadcast_state_update handles gauge + node_states insert. + self.broadcast_state_update(&node_id, NodeState::Creating); // Spawn background creation: `create_node` may invoke FFI // that blocks for 10-20+ seconds (ONNX model loading). 
@@ -1717,8 +1747,9 @@ impl DynamicEngine { ))), }; - let _ = - tx.send(NodeCreatedEvent { node_id: spawn_node_id, kind, result }).await; + let _ = tx + .send(NodeCreatedEvent { node_id: spawn_node_id, kind, creation_id, result }) + .await; }); }, EngineControlMessage::RemoveNode { node_id } => { @@ -1727,13 +1758,14 @@ impl DynamicEngine { if self.is_node_creating(&node_id) { // Node is still being created in the background. - // Track the ID so we discard the NodeCreated result - // when it arrives. + // Remove the active_creations entry so that when the + // background task completes, handle_node_created finds + // no matching entry and discards the result. tracing::info!( node_id = %node_id, - "Node is still Creating — marking for cancellation" + "Node is still Creating — cancelling" ); - self.cancelled_creations.insert(node_id.clone()); + self.active_creations.remove(&node_id); self.node_states.remove(&node_id); self.node_kinds.remove(&node_id); // Drain pending connections referencing this node. @@ -1805,16 +1837,9 @@ impl DynamicEngine { tracing::info!("Received shutdown signal, stopping all nodes"); // Step 0: Clean up nodes still in Creating state. - // Mark all as cancelled so NodeCreated results are discarded. - let creating_ids: Vec = self - .node_states - .iter() - .filter(|(_, state)| matches!(state, NodeState::Creating)) - .map(|(id, _)| id.clone()) - .collect(); - for id in creating_ids { - self.cancelled_creations.insert(id); - } + // Clear all active_creations so any background results + // that arrive after shutdown are discarded. 
+ self.active_creations.clear(); self.pending_connections.clear(); // Step 1: Close all input channels so nodes blocked on recv() will exit diff --git a/crates/engine/src/lib.rs b/crates/engine/src/lib.rs index 5446a3b8..bd9fc12c 100644 --- a/crates/engine/src/lib.rs +++ b/crates/engine/src/lib.rs @@ -242,7 +242,8 @@ impl Engine { node_created_tx: nc_tx, node_created_rx: nc_rx, pending_connections: Vec::new(), - cancelled_creations: std::collections::HashSet::new(), + next_creation_id: 0, + active_creations: std::collections::HashMap::new(), }; let engine_task = tokio::spawn(dynamic_engine.run()); diff --git a/crates/engine/src/tests/connection_types.rs b/crates/engine/src/tests/connection_types.rs index ed40c2e1..e9790cce 100644 --- a/crates/engine/src/tests/connection_types.rs +++ b/crates/engine/src/tests/connection_types.rs @@ -62,7 +62,8 @@ fn create_test_engine() -> DynamicEngine { node_created_tx, node_created_rx, pending_connections: Vec::new(), - cancelled_creations: std::collections::HashSet::new(), + next_creation_id: 0, + active_creations: std::collections::HashMap::new(), } } diff --git a/crates/engine/src/tests/pipeline_activation.rs b/crates/engine/src/tests/pipeline_activation.rs index f007e87e..5eff1761 100644 --- a/crates/engine/src/tests/pipeline_activation.rs +++ b/crates/engine/src/tests/pipeline_activation.rs @@ -63,7 +63,8 @@ fn create_test_engine() -> DynamicEngine { node_created_tx: nc_tx, node_created_rx: nc_rx, pending_connections: Vec::new(), - cancelled_creations: std::collections::HashSet::new(), + next_creation_id: 0, + active_creations: std::collections::HashMap::new(), } } From b429367b93b2983c130a09d27211c2f35c701149 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Sat, 11 Apr 2026 16:25:37 +0000 Subject: [PATCH 04/16] style(engine): apply cargo fmt Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- crates/engine/src/dynamic_actor.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git 
a/crates/engine/src/dynamic_actor.rs b/crates/engine/src/dynamic_actor.rs index 484d630e..54c77378 100644 --- a/crates/engine/src/dynamic_actor.rs +++ b/crates/engine/src/dynamic_actor.rs @@ -1580,10 +1580,7 @@ impl DynamicEngine { ); // Broadcast Failed (reads prev state before inserting). - self.broadcast_state_update( - &node_id, - NodeState::Failed { reason: e.to_string() }, - ); + self.broadcast_state_update(&node_id, NodeState::Failed { reason: e.to_string() }); // Drain pending connections referencing this node. self.pending_connections @@ -1748,7 +1745,12 @@ impl DynamicEngine { }; let _ = tx - .send(NodeCreatedEvent { node_id: spawn_node_id, kind, creation_id, result }) + .send(NodeCreatedEvent { + node_id: spawn_node_id, + kind, + creation_id, + result, + }) .await; }); }, From e02edd86e5483fe13f537b52891d5d5b371a2df1 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Sat, 11 Apr 2026 16:32:30 +0000 Subject: [PATCH 05/16] fix(engine): zero Creating gauge on RemoveNode while Creating When RemoveNode cancels a Creating node, the node_state_gauge for (node_id, 'creating') was never zeroed, causing a permanent metrics leak. Add zero_state_gauge call before removing state, mirroring shutdown_node's pattern. Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- crates/engine/src/dynamic_actor.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/engine/src/dynamic_actor.rs b/crates/engine/src/dynamic_actor.rs index 54c77378..a7503d3c 100644 --- a/crates/engine/src/dynamic_actor.rs +++ b/crates/engine/src/dynamic_actor.rs @@ -1768,6 +1768,8 @@ impl DynamicEngine { "Node is still Creating — cancelling" ); self.active_creations.remove(&node_id); + // Zero the gauge before removing state (mirrors shutdown_node). + self.zero_state_gauge(&node_id, &NodeState::Creating); self.node_states.remove(&node_id); self.node_kinds.remove(&node_id); // Drain pending connections referencing this node. 
From a17b6cb8ed774570a4a5d8ef709881e1966191fb Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Sat, 11 Apr 2026 16:44:44 +0000 Subject: [PATCH 06/16] fix(test): accept Creating and Failed states in session_destroy test With async node creation, nodes transition through Creating before reaching Initializing/Ready/Running. In the test environment, nodes may also reach Failed if the node kind isn't available in the test registry. The test's purpose is verifying clean session destruction, not specific node states, so accept all valid lifecycle states. Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- apps/skit/tests/session_lifecycle_test.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/apps/skit/tests/session_lifecycle_test.rs b/apps/skit/tests/session_lifecycle_test.rs index 37cb572d..c0ebbb3e 100644 --- a/apps/skit/tests/session_lifecycle_test.rs +++ b/apps/skit/tests/session_lifecycle_test.rs @@ -562,18 +562,22 @@ async fn test_session_destroy_shuts_down_pipeline() { ResponsePayload::Pipeline { pipeline } => { assert_eq!(pipeline.nodes.len(), 2); - // Check that nodes are in Running state (not Failed or Stopped) + // Check that nodes are in a valid lifecycle state. + // With async node creation, nodes may still be in Creating + // state if the background task hasn't completed yet. for (node_id, node) in &pipeline.nodes { if let Some(state) = &node.state { println!("Node '{}' state: {:?}", node_id, state); assert!( matches!( state, - streamkit_core::NodeState::Initializing + streamkit_core::NodeState::Creating + | streamkit_core::NodeState::Initializing | streamkit_core::NodeState::Ready | streamkit_core::NodeState::Running + | streamkit_core::NodeState::Failed { .. 
} ), - "Node '{}' should be initializing/ready/running, got: {:?}", + "Node '{}' should be in a valid lifecycle state, got: {:?}", node_id, state ); From 164d97e79143bf9843098f976c1b6ab95cfb606b Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Sat, 11 Apr 2026 16:49:51 +0000 Subject: [PATCH 07/16] fix(engine): close gauge gap, prevent leaked pending connections, improve TuneNode log MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Route Creating → Initializing through broadcast_state_update so the gauge transition zeroes Creating and sets Initializing atomically (no window where no gauge reads 1 for the node). - Add upfront check in Connect handler: both endpoints must exist in node_states before deferring. Connecting to a non-existent node while the other is Creating would otherwise leak a pending connection that is never flushed. - Distinguish 'still Creating' from 'non-existent' in TuneNode warning. - Remove unused emit_creating helper (dead code). Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- crates/core/src/state.rs | 6 ----- crates/engine/src/dynamic_actor.rs | 36 +++++++++++++++++++++++++----- 2 files changed, 31 insertions(+), 11 deletions(-) diff --git a/crates/core/src/state.rs b/crates/core/src/state.rs index bb696085..d69279ba 100644 --- a/crates/core/src/state.rs +++ b/crates/core/src/state.rs @@ -230,12 +230,6 @@ pub mod state_helpers { let _ = state_tx.try_send(NodeStateUpdate::new(node_id.to_string(), state)); } - /// Emits a Creating state. - #[inline] - pub fn emit_creating(state_tx: &mpsc::Sender, node_id: &str) { - emit_state(state_tx, node_id, NodeState::Creating); - } - /// Emits an Initializing state. 
#[inline] pub fn emit_initializing(state_tx: &mpsc::Sender, node_id: &str) { diff --git a/crates/engine/src/dynamic_actor.rs b/crates/engine/src/dynamic_actor.rs index a7503d3c..ddd7f359 100644 --- a/crates/engine/src/dynamic_actor.rs +++ b/crates/engine/src/dynamic_actor.rs @@ -641,7 +641,10 @@ impl DynamicEngine { } // 3. Initialize State and Stats - self.node_states.insert(node_id.to_string(), NodeState::Initializing); + // Use broadcast_state_update so the gauge transition (e.g. + // Creating → Initializing) zeroes the previous gauge and sets + // the new one atomically — no window where no gauge reads 1. + self.broadcast_state_update(node_id, NodeState::Initializing); self.node_stats.insert(node_id.to_string(), NodeStats::default()); // 4. Setup pin management channel. @@ -1544,10 +1547,9 @@ impl DynamicEngine { Ok(node) => { tracing::info!(node = %node_id, kind = %kind, "Node created successfully, initializing"); - // Zero the Creating gauge before initialize_node overwrites - // node_states with Initializing. - self.zero_state_gauge(&node_id, &NodeState::Creating); - + // initialize_node calls broadcast_state_update(Initializing) + // which reads Creating as the previous state and zeroes its + // gauge before setting Initializing to 1 — no gap. if let Err(e) = self.initialize_node(node, &node_id, &kind, channels).await { tracing::error!( node_id = %node_id, @@ -1783,6 +1785,25 @@ impl DynamicEngine { EngineControlMessage::Connect { from_node, from_pin, to_node, to_pin, mode } => { self.engine_operations_counter.add(1, &[KeyValue::new("operation", "connect")]); + // Both endpoints must at least exist in node_states + // (Creating or fully initialized). If either is completely + // unknown, the connection would be deferred forever. 
+ let from_exists = self.node_states.contains_key(&from_node); + let to_exists = self.node_states.contains_key(&to_node); + if !from_exists || !to_exists { + tracing::error!( + "Cannot connect {}.{} -> {}.{}: endpoint(s) not found \ + (from_exists={}, to_exists={})", + from_node, + from_pin, + to_node, + to_pin, + from_exists, + to_exists + ); + return true; + } + // If either endpoint is still Creating, defer the connection. let from_creating = self.is_node_creating(&from_node); let to_creating = self.is_node_creating(&to_node); @@ -1833,6 +1854,11 @@ impl DynamicEngine { node_id ); } + } else if self.is_node_creating(&node_id) { + tracing::warn!( + "Could not tune node '{}': still in Creating state (not yet initialized)", + node_id + ); } else { tracing::warn!("Could not tune non-existent node '{}'", node_id); } From 6055fc62e593e405f737e93405244a1783bd4f80 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Sat, 11 Apr 2026 16:57:37 +0000 Subject: [PATCH 08/16] fix(test): remove Failed from accepted states in session_destroy test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Built-in nodes (silence, gain) should always succeed creation. Only accept Creating/Initializing/Ready/Running — if these nodes fail, that's a real regression that should surface as a test failure. Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- apps/skit/tests/session_lifecycle_test.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/apps/skit/tests/session_lifecycle_test.rs b/apps/skit/tests/session_lifecycle_test.rs index c0ebbb3e..4c6cc4a9 100644 --- a/apps/skit/tests/session_lifecycle_test.rs +++ b/apps/skit/tests/session_lifecycle_test.rs @@ -575,9 +575,8 @@ async fn test_session_destroy_shuts_down_pipeline() { | streamkit_core::NodeState::Initializing | streamkit_core::NodeState::Ready | streamkit_core::NodeState::Running - | streamkit_core::NodeState::Failed { .. 
} ), - "Node '{}' should be in a valid lifecycle state, got: {:?}", + "Node '{}' should be creating/initializing/ready/running, got: {:?}", node_id, state ); From 6493d4f8818762afb3b6fd2cec5622e46e4491a8 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Sat, 11 Apr 2026 17:04:43 +0000 Subject: [PATCH 09/16] fix(engine): queue TuneNode for Creating nodes, reject duplicate AddNode at API layer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P1: TuneNode messages arriving while a node is still in Creating state are now queued in pending_tunes and replayed once the node finishes initialization. This prevents a regression where UpdateParams was persisted and broadcast to clients but the eventual node instance never received the config change. P2: handle_add_node now checks for duplicate node_id in the pipeline model before inserting. Previously, the API layer would silently overwrite the pipeline entry and broadcast NodeAdded while the engine actor rejected the duplicate — leaving clients showing stale kind/params while the old live node continued running. 
Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- apps/skit/src/websocket_handlers.rs | 5 ++ crates/engine/src/dynamic_actor.rs | 53 +++++++++++++++++-- crates/engine/src/lib.rs | 1 + crates/engine/src/tests/connection_types.rs | 1 + .../engine/src/tests/pipeline_activation.rs | 1 + 5 files changed, 56 insertions(+), 5 deletions(-) diff --git a/apps/skit/src/websocket_handlers.rs b/apps/skit/src/websocket_handlers.rs index 7da72e5b..6a07c501 100644 --- a/apps/skit/src/websocket_handlers.rs +++ b/apps/skit/src/websocket_handlers.rs @@ -519,6 +519,11 @@ async fn handle_add_node( { let mut pipeline = session.pipeline.lock().await; + if pipeline.nodes.contains_key(&node_id) { + return Some(ResponsePayload::Error { + message: format!("Node '{node_id}' already exists in the pipeline"), + }); + } pipeline.nodes.insert( node_id.clone(), streamkit_api::Node { kind: kind.clone(), params: params.clone(), state: None }, diff --git a/crates/engine/src/dynamic_actor.rs b/crates/engine/src/dynamic_actor.rs index ddd7f359..0a3a0abd 100644 --- a/crates/engine/src/dynamic_actor.rs +++ b/crates/engine/src/dynamic_actor.rs @@ -69,6 +69,15 @@ pub struct PendingConnection { mode: crate::dynamic_messages::ConnectionMode, } +/// A `TuneNode` message deferred because the target node is still in +/// `Creating` state. Replayed once the node finishes initialization and +/// enters `live_nodes`. +#[derive(Debug)] +pub struct PendingTune { + node_id: String, + message: NodeControlMessage, +} + /// The state for the long-running, dynamic engine actor (Control Plane). pub struct DynamicEngine { pub(super) registry: Arc>, @@ -150,6 +159,8 @@ pub struct DynamicEngine { pub(super) node_created_rx: mpsc::Receiver, /// Connections deferred because one or both endpoints are still `Creating`. pub(super) pending_connections: Vec, + /// TuneNode messages deferred because the target node is still `Creating`. 
+ pub(super) pending_tunes: Vec, /// Monotonically increasing counter used to tag each spawned creation task. /// Lets `handle_node_created` distinguish stale results (from a previous /// Remove → re-Add cycle) from the current active creation. @@ -1564,14 +1575,18 @@ impl DynamicEngine { NodeState::Failed { reason: e.to_string() }, ); - // Drain pending connections referencing this node. + // Drain pending connections and tunes referencing this node. self.pending_connections .retain(|pc| pc.from_node != node_id && pc.to_node != node_id); + self.pending_tunes.retain(|pt| pt.node_id != node_id); return; } // Flush pending connections where both endpoints are now realized. self.flush_pending_connections().await; + + // Replay any TuneNode messages that arrived while Creating. + self.flush_pending_tunes(&node_id).await; }, Err(e) => { tracing::error!( @@ -1584,9 +1599,10 @@ impl DynamicEngine { // Broadcast Failed (reads prev state before inserting). self.broadcast_state_update(&node_id, NodeState::Failed { reason: e.to_string() }); - // Drain pending connections referencing this node. + // Drain pending connections and tunes referencing this node. self.pending_connections .retain(|pc| pc.from_node != node_id && pc.to_node != node_id); + self.pending_tunes.retain(|pt| pt.node_id != node_id); }, } } @@ -1679,6 +1695,30 @@ impl DynamicEngine { self.pending_connections = still_pending; } + /// Replay any deferred `TuneNode` messages for a node that has just been + /// initialized and is now present in `live_nodes`. 
+ async fn flush_pending_tunes(&mut self, node_id: &str) { + let (for_node, rest): (Vec<_>, Vec<_>) = + std::mem::take(&mut self.pending_tunes).into_iter().partition(|pt| pt.node_id == node_id); + + self.pending_tunes = rest; + + for pt in for_node { + if let Some(node) = self.live_nodes.get(&pt.node_id) { + tracing::info!( + node_id = %pt.node_id, + "Replaying deferred TuneNode message" + ); + if node.control_tx.send(pt.message).await.is_err() { + tracing::warn!( + "Could not replay TuneNode for '{}': node may have shut down", + pt.node_id + ); + } + } + } + } + /// Returns `true` if the node is in `Creating` state (not yet in `live_nodes`). fn is_node_creating(&self, node_id: &str) -> bool { matches!(self.node_states.get(node_id), Some(NodeState::Creating)) @@ -1774,9 +1814,10 @@ impl DynamicEngine { self.zero_state_gauge(&node_id, &NodeState::Creating); self.node_states.remove(&node_id); self.node_kinds.remove(&node_id); - // Drain pending connections referencing this node. + // Drain pending connections and tunes referencing this node. self.pending_connections .retain(|pc| pc.from_node != node_id && pc.to_node != node_id); + self.pending_tunes.retain(|pt| pt.node_id != node_id); } else { // Normal shutdown for a fully initialized node. self.shutdown_node(&node_id).await; @@ -1855,10 +1896,11 @@ impl DynamicEngine { ); } } else if self.is_node_creating(&node_id) { - tracing::warn!( - "Could not tune node '{}': still in Creating state (not yet initialized)", + tracing::info!( + "Deferring TuneNode for '{}': still in Creating state", node_id ); + self.pending_tunes.push(PendingTune { node_id, message }); } else { tracing::warn!("Could not tune non-existent node '{}'", node_id); } @@ -1871,6 +1913,7 @@ impl DynamicEngine { // that arrive after shutdown are discarded. 
self.active_creations.clear(); self.pending_connections.clear(); + self.pending_tunes.clear(); // Step 1: Close all input channels so nodes blocked on recv() will exit // This ensures nodes that don't check control_rx will still shut down diff --git a/crates/engine/src/lib.rs b/crates/engine/src/lib.rs index bd9fc12c..f975c421 100644 --- a/crates/engine/src/lib.rs +++ b/crates/engine/src/lib.rs @@ -242,6 +242,7 @@ impl Engine { node_created_tx: nc_tx, node_created_rx: nc_rx, pending_connections: Vec::new(), + pending_tunes: Vec::new(), next_creation_id: 0, active_creations: std::collections::HashMap::new(), }; diff --git a/crates/engine/src/tests/connection_types.rs b/crates/engine/src/tests/connection_types.rs index e9790cce..ddb962bf 100644 --- a/crates/engine/src/tests/connection_types.rs +++ b/crates/engine/src/tests/connection_types.rs @@ -62,6 +62,7 @@ fn create_test_engine() -> DynamicEngine { node_created_tx, node_created_rx, pending_connections: Vec::new(), + pending_tunes: Vec::new(), next_creation_id: 0, active_creations: std::collections::HashMap::new(), } diff --git a/crates/engine/src/tests/pipeline_activation.rs b/crates/engine/src/tests/pipeline_activation.rs index 5eff1761..c7e78e3a 100644 --- a/crates/engine/src/tests/pipeline_activation.rs +++ b/crates/engine/src/tests/pipeline_activation.rs @@ -63,6 +63,7 @@ fn create_test_engine() -> DynamicEngine { node_created_tx: nc_tx, node_created_rx: nc_rx, pending_connections: Vec::new(), + pending_tunes: Vec::new(), next_creation_id: 0, active_creations: std::collections::HashMap::new(), } From 3c7312dc5b5bfda7f76733f4fe7e7639ba3d53d4 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Sat, 11 Apr 2026 17:04:54 +0000 Subject: [PATCH 10/16] fix(engine): queue TuneNode for Creating nodes, reject duplicate AddNode at API layer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P1: TuneNode messages arriving while a node is still in Creating state are now queued in 
pending_tunes and replayed once the node finishes initialization. This prevents a regression where UpdateParams was persisted and broadcast to clients but the eventual node instance never received the config change. P2: handle_add_node now checks for duplicate node_id in the pipeline model before inserting. Previously, the API layer would silently overwrite the pipeline entry and broadcast NodeAdded while the engine actor rejected the duplicate — leaving clients showing stale kind/params while the old live node continued running. Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- crates/engine/src/dynamic_actor.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/crates/engine/src/dynamic_actor.rs b/crates/engine/src/dynamic_actor.rs index 0a3a0abd..c6eaa2f7 100644 --- a/crates/engine/src/dynamic_actor.rs +++ b/crates/engine/src/dynamic_actor.rs @@ -1698,8 +1698,9 @@ impl DynamicEngine { /// Replay any deferred `TuneNode` messages for a node that has just been /// initialized and is now present in `live_nodes`. 
async fn flush_pending_tunes(&mut self, node_id: &str) { - let (for_node, rest): (Vec<_>, Vec<_>) = - std::mem::take(&mut self.pending_tunes).into_iter().partition(|pt| pt.node_id == node_id); + let (for_node, rest): (Vec<_>, Vec<_>) = std::mem::take(&mut self.pending_tunes) + .into_iter() + .partition(|pt| pt.node_id == node_id); self.pending_tunes = rest; @@ -1896,10 +1897,7 @@ impl DynamicEngine { ); } } else if self.is_node_creating(&node_id) { - tracing::info!( - "Deferring TuneNode for '{}': still in Creating state", - node_id - ); + tracing::info!("Deferring TuneNode for '{}': still in Creating state", node_id); self.pending_tunes.push(PendingTune { node_id, message }); } else { tracing::warn!("Could not tune non-existent node '{}'", node_id); From 6f0ec93ec8029f0ae9c73493a31bcf27c7fe1bfa Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Sat, 11 Apr 2026 17:12:04 +0000 Subject: [PATCH 11/16] fix(test): accept Failed state in session_destroy test with rationale MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The test server uses Config::default() which only registers core nodes. 'silence' and 'gain' are not core nodes, so async creation correctly transitions them Creating → Failed. This test validates clean session destruction regardless of individual node outcomes, so Failed is a valid observed state. Added comment explaining why. 
Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- apps/skit/tests/session_lifecycle_test.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/apps/skit/tests/session_lifecycle_test.rs b/apps/skit/tests/session_lifecycle_test.rs index 4c6cc4a9..919383c6 100644 --- a/apps/skit/tests/session_lifecycle_test.rs +++ b/apps/skit/tests/session_lifecycle_test.rs @@ -562,9 +562,12 @@ async fn test_session_destroy_shuts_down_pipeline() { ResponsePayload::Pipeline { pipeline } => { assert_eq!(pipeline.nodes.len(), 2); - // Check that nodes are in a valid lifecycle state. - // With async node creation, nodes may still be in Creating - // state if the background task hasn't completed yet. + // Verify each node has entered the async lifecycle. The test + // server uses Config::default() which only registers core nodes; + // "silence" and "gain" are not core nodes, so their creation + // will fail (Creating → Failed). This test validates clean + // session destruction regardless of individual node outcomes, so + // we accept any non-Stopped lifecycle state here. for (node_id, node) in &pipeline.nodes { if let Some(state) = &node.state { println!("Node '{}' state: {:?}", node_id, state); @@ -575,8 +578,9 @@ async fn test_session_destroy_shuts_down_pipeline() { | streamkit_core::NodeState::Initializing | streamkit_core::NodeState::Ready | streamkit_core::NodeState::Running + | streamkit_core::NodeState::Failed { .. 
} ), - "Node '{}' should be creating/initializing/ready/running, got: {:?}", + "Node '{}' should be in a valid lifecycle state, got: {:?}", node_id, state ); From e7b5ee950ff248b797a23c68f3d5f1392f4b5c81 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Sat, 11 Apr 2026 17:16:10 +0000 Subject: [PATCH 12/16] fix(test): use registered audio::gain nodes in session_destroy test The test was using unregistered node types ('silence', 'gain') which only passed before because synchronous creation failure was silent (no state broadcast). With async creation these correctly transition to Failed. Fix by using 'audio::gain' which is a real registered built-in node type with in/out pins. Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- apps/skit/tests/session_lifecycle_test.rs | 28 ++++++++--------------- 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/apps/skit/tests/session_lifecycle_test.rs b/apps/skit/tests/session_lifecycle_test.rs index 919383c6..fbc5d9a3 100644 --- a/apps/skit/tests/session_lifecycle_test.rs +++ b/apps/skit/tests/session_lifecycle_test.rs @@ -458,19 +458,15 @@ async fn test_session_destroy_shuts_down_pipeline() { println!("✅ Session created: {}", session_id); - // Add a source node (silence generator) + // Add a source node (audio::gain is a registered core node with in/out pins) let add_source_request = Request { message_type: MessageType::Request, correlation_id: Some("add-source".to_string()), payload: RequestPayload::AddNode { session_id: session_id.clone(), node_id: "source".to_string(), - kind: "silence".to_string(), - params: Some(json!({ - "duration_ms": 10000, // 10 seconds - "sample_rate": 48000, - "channels": 2 - })), + kind: "audio::gain".to_string(), + params: Some(json!({"gain": 1.0})), }, }; @@ -488,15 +484,15 @@ async fn test_session_destroy_shuts_down_pipeline() { println!("✅ Added source node"); - // Add a gain node + // Add a second gain node let add_gain_request = Request { message_type: 
MessageType::Request, correlation_id: Some("add-gain".to_string()), payload: RequestPayload::AddNode { session_id: session_id.clone(), node_id: "gain".to_string(), - kind: "gain".to_string(), - params: Some(json!({"gain": 1.0})), + kind: "audio::gain".to_string(), + params: Some(json!({"gain": 0.5})), }, }; @@ -562,12 +558,9 @@ async fn test_session_destroy_shuts_down_pipeline() { ResponsePayload::Pipeline { pipeline } => { assert_eq!(pipeline.nodes.len(), 2); - // Verify each node has entered the async lifecycle. The test - // server uses Config::default() which only registers core nodes; - // "silence" and "gain" are not core nodes, so their creation - // will fail (Creating → Failed). This test validates clean - // session destruction regardless of individual node outcomes, so - // we accept any non-Stopped lifecycle state here. + // Both nodes use "audio::gain" which is a registered built-in + // node type. With async creation they may still be in + // Creating state, or have progressed to Initializing/Ready. for (node_id, node) in &pipeline.nodes { if let Some(state) = &node.state { println!("Node '{}' state: {:?}", node_id, state); @@ -578,9 +571,8 @@ async fn test_session_destroy_shuts_down_pipeline() { | streamkit_core::NodeState::Initializing | streamkit_core::NodeState::Ready | streamkit_core::NodeState::Running - | streamkit_core::NodeState::Failed { .. } ), - "Node '{}' should be in a valid lifecycle state, got: {:?}", + "Node '{}' should be creating/initializing/ready/running, got: {:?}", node_id, state ); From 561d27c1b39a0dd9a2ba49ccf82e24b3b1cb2b16 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Sat, 11 Apr 2026 17:17:07 +0000 Subject: [PATCH 13/16] test(engine): add test for TuneNode queuing during Creating state Verifies that UpdateParams messages sent while a node is still being constructed (Creating state) are queued and replayed after the node finishes initialization. Uses a TuneTrackingSlowNode that counts received UpdateParams messages. 
Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- .../engine/src/tests/async_node_creation.rs | 141 ++++++++++++++++++ 1 file changed, 141 insertions(+) diff --git a/crates/engine/src/tests/async_node_creation.rs b/crates/engine/src/tests/async_node_creation.rs index 18d725b6..a6f72d5c 100644 --- a/crates/engine/src/tests/async_node_creation.rs +++ b/crates/engine/src/tests/async_node_creation.rs @@ -79,6 +79,64 @@ impl ProcessorNode for SlowTestNode { } } +/// A slow node that records every `UpdateParams` message it receives. +/// Used to verify that TuneNode messages sent while the node is Creating +/// are queued and replayed after initialization. +struct TuneTrackingSlowNode { + tune_count: Arc, +} + +impl TuneTrackingSlowNode { + fn factory( + delay: Duration, + created: Arc, + tune_count: Arc, + ) -> impl Fn(Option<&serde_json::Value>) -> Result, StreamKitError> + + Send + + Sync + + 'static { + move |_params| { + std::thread::sleep(delay); + created.store(true, Ordering::SeqCst); + Ok(Box::new(Self { tune_count: tune_count.clone() }) as Box) + } + } +} + +#[streamkit_core::async_trait] +impl ProcessorNode for TuneTrackingSlowNode { + fn input_pins(&self) -> Vec { + vec![streamkit_core::InputPin { + name: "in".to_string(), + accepts_types: vec![streamkit_core::types::PacketType::Any], + cardinality: streamkit_core::PinCardinality::One, + }] + } + + fn output_pins(&self) -> Vec { + vec![streamkit_core::OutputPin { + name: "out".to_string(), + produces_type: streamkit_core::types::PacketType::Binary, + cardinality: streamkit_core::PinCardinality::Broadcast, + }] + } + + async fn run( + self: Box, + mut context: streamkit_core::NodeContext, + ) -> Result<(), StreamKitError> { + loop { + match context.control_rx.recv().await { + Some(streamkit_core::control::NodeControlMessage::Shutdown) | None => return Ok(()), + Some(streamkit_core::control::NodeControlMessage::UpdateParams(_)) => { + self.tune_count.fetch_add(1, Ordering::SeqCst); + }, + 
Some(streamkit_core::control::NodeControlMessage::Start) => {}, + } + } + } +} + /// A simple source node (no inputs) that stays alive until shutdown. struct SimpleSourceNode; @@ -917,3 +975,86 @@ async fn test_connect_one_realized_one_creating() { handle.shutdown_and_wait().await.expect("shutdown"); } + +// --------------------------------------------------------------------------- +// Test 11: TuneNode messages sent while a node is Creating are queued and +// replayed after initialization completes. +// --------------------------------------------------------------------------- + +#[tokio::test] +#[allow(clippy::expect_used)] +async fn test_tune_node_queued_while_creating() { + let created = Arc::new(AtomicBool::new(false)); + let tune_count = Arc::new(AtomicU32::new(0)); + + let mut registry = NodeRegistry::new(); + registry.register_dynamic( + "test::tune_tracking_slow", + TuneTrackingSlowNode::factory( + Duration::from_secs(1), + created.clone(), + tune_count.clone(), + ), + serde_json::json!({}), + vec!["test".to_string()], + false, + ); + + let (_engine, handle) = build_engine(registry); + + // Add the slow node. + handle + .send_control(EngineControlMessage::AddNode { + node_id: "tracked".to_string(), + kind: "test::tune_tracking_slow".to_string(), + params: None, + }) + .await + .expect("add tracked"); + + // Verify it's still Creating (constructor sleeps 1s). + tokio::time::sleep(Duration::from_millis(50)).await; + assert!(!created.load(Ordering::SeqCst), "node should still be creating"); + + // Send two TuneNode messages while the node is Creating. 
+ handle + .send_control(EngineControlMessage::TuneNode { + node_id: "tracked".to_string(), + message: streamkit_core::control::NodeControlMessage::UpdateParams( + serde_json::json!({"gain": 0.5}), + ), + }) + .await + .expect("tune 1"); + + handle + .send_control(EngineControlMessage::TuneNode { + node_id: "tracked".to_string(), + message: streamkit_core::control::NodeControlMessage::UpdateParams( + serde_json::json!({"gain": 0.8}), + ), + }) + .await + .expect("tune 2"); + + // Wait for the node to finish creation and initialization. + let initialized = wait_for_states(&handle, Duration::from_secs(5), |states| { + states.get("tracked").is_some_and(|s| { + matches!(s, NodeState::Ready | NodeState::Running | NodeState::Initializing) + }) + }) + .await; + assert!(initialized, "node should be initialized"); + + // Give a moment for the queued tunes to be replayed and processed. + tokio::time::sleep(Duration::from_millis(200)).await; + + // Both UpdateParams messages should have been delivered. 
+ assert_eq!( + tune_count.load(Ordering::SeqCst), + 2, + "node should have received both queued TuneNode messages" + ); + + handle.shutdown_and_wait().await.expect("shutdown"); +} From 5f877f380ec756353bd6ef810f8d11b78ca8c1b3 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Sat, 11 Apr 2026 17:17:17 +0000 Subject: [PATCH 14/16] style: format TuneTrackingSlowNode factory call Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- crates/engine/src/tests/async_node_creation.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/crates/engine/src/tests/async_node_creation.rs b/crates/engine/src/tests/async_node_creation.rs index a6f72d5c..69e1801c 100644 --- a/crates/engine/src/tests/async_node_creation.rs +++ b/crates/engine/src/tests/async_node_creation.rs @@ -990,11 +990,7 @@ async fn test_tune_node_queued_while_creating() { let mut registry = NodeRegistry::new(); registry.register_dynamic( "test::tune_tracking_slow", - TuneTrackingSlowNode::factory( - Duration::from_secs(1), - created.clone(), - tune_count.clone(), - ), + TuneTrackingSlowNode::factory(Duration::from_secs(1), created.clone(), tune_count.clone()), serde_json::json!({}), vec!["test".to_string()], false, From 3dbe16b04e78683955af4cedfaacf962a4a320e6 Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Sat, 11 Apr 2026 17:36:47 +0000 Subject: [PATCH 15/16] fix(engine,skit): batch duplicate check, node_kinds cleanup, test nit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add pre-validation pass in handle_apply_batch to reject duplicate AddNode IDs before mutating the pipeline model. Simulates the batch's Add/Remove sequence so Remove→Add within a batch is still allowed. - Clean up node_kinds in both handle_node_created failure paths (creation failure and initialization failure), matching the cleanup done in RemoveNode-while-Creating. - Remove unused _delay field from SlowTestNode (zero-sized struct). 
Signed-off-by: StreamKit Devin Co-Authored-By: Claudio Costa --- apps/skit/src/websocket_handlers.rs | 27 +++++++++++++++++++ crates/engine/src/dynamic_actor.rs | 6 +++++ .../engine/src/tests/async_node_creation.rs | 6 ++--- 3 files changed, 35 insertions(+), 4 deletions(-) diff --git a/apps/skit/src/websocket_handlers.rs b/apps/skit/src/websocket_handlers.rs index 6a07c501..ee7af00c 100644 --- a/apps/skit/src/websocket_handlers.rs +++ b/apps/skit/src/websocket_handlers.rs @@ -1229,6 +1229,33 @@ async fn handle_apply_batch( }); } + // Pre-validate duplicate node_ids against the pipeline model. + // Simulate the batch's Add/Remove sequence so that Remove→Add for + // the same ID within the batch is allowed, but duplicate Adds + // (without intervening Remove) are rejected before any mutation. + { + let pipeline = session.pipeline.lock().await; + let mut live_ids: std::collections::HashSet<&str> = + pipeline.nodes.keys().map(String::as_str).collect(); + for op in &operations { + match op { + streamkit_api::BatchOperation::AddNode { node_id, .. } => { + if !live_ids.insert(node_id.as_str()) { + return Some(ResponsePayload::Error { + message: format!( + "Batch rejected: node '{node_id}' already exists in the pipeline" + ), + }); + } + }, + streamkit_api::BatchOperation::RemoveNode { node_id } => { + live_ids.remove(node_id.as_str()); + }, + _ => {}, + } + } + } // Pipeline lock released after pre-validation + // Validate permissions for all operations for op in &operations { if let streamkit_api::BatchOperation::AddNode { kind, params, .. } = op { diff --git a/crates/engine/src/dynamic_actor.rs b/crates/engine/src/dynamic_actor.rs index c6eaa2f7..97f076e0 100644 --- a/crates/engine/src/dynamic_actor.rs +++ b/crates/engine/src/dynamic_actor.rs @@ -1575,6 +1575,9 @@ impl DynamicEngine { NodeState::Failed { reason: e.to_string() }, ); + // Clean up node_kinds (mirrors RemoveNode-while-Creating). 
+ self.node_kinds.remove(&node_id); + // Drain pending connections and tunes referencing this node. self.pending_connections .retain(|pc| pc.from_node != node_id && pc.to_node != node_id); @@ -1599,6 +1602,9 @@ impl DynamicEngine { // Broadcast Failed (reads prev state before inserting). self.broadcast_state_update(&node_id, NodeState::Failed { reason: e.to_string() }); + // Clean up node_kinds (mirrors RemoveNode-while-Creating). + self.node_kinds.remove(&node_id); + // Drain pending connections and tunes referencing this node. self.pending_connections .retain(|pc| pc.from_node != node_id && pc.to_node != node_id); diff --git a/crates/engine/src/tests/async_node_creation.rs b/crates/engine/src/tests/async_node_creation.rs index 69e1801c..1e3a5411 100644 --- a/crates/engine/src/tests/async_node_creation.rs +++ b/crates/engine/src/tests/async_node_creation.rs @@ -25,9 +25,7 @@ use streamkit_core::{NodeRegistry, ProcessorNode, StreamKitError}; /// A node whose constructor sleeps for a configurable duration, simulating /// heavy FFI work (e.g., ONNX model loading). Uses `std::thread::sleep` /// because the constructor runs inside `spawn_blocking`. 
-struct SlowTestNode { - _delay: Duration, -} +struct SlowTestNode; impl SlowTestNode { fn factory( @@ -40,7 +38,7 @@ impl SlowTestNode { move |_params| { std::thread::sleep(delay); created.store(true, Ordering::SeqCst); - Ok(Box::new(Self { _delay: delay }) as Box) + Ok(Box::new(Self) as Box) } } } From 7b91d3ceedba38d04f7a5679a5c2518d8764708e Mon Sep 17 00:00:00 2001 From: StreamKit Devin Date: Sat, 11 Apr 2026 18:30:52 +0000 Subject: [PATCH 16/16] =?UTF-8?q?fix:=20update=20rand=200.10.0=20=E2=86=92?= =?UTF-8?q?=200.10.1=20(RUSTSEC-2026-0097)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claudio Costa --- Cargo.lock | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9ba9822b..a0b10d8a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4410,9 +4410,9 @@ dependencies = [ [[package]] name = "rand" -version = "0.10.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc266eb313df6c5c09c1c7b1fbe2510961e5bcd3add930c1e31f7ed9da0feff8" +checksum = "d2e8e8bcc7961af1fdac401278c6a831614941f6164ee3bf4ce61b7edb162207" dependencies = [ "chacha20", "getrandom 0.4.2", @@ -5636,7 +5636,7 @@ dependencies = [ "clap", "futures", "futures-util", - "rand 0.10.0", + "rand 0.10.1", "reqwest 0.13.2", "rustyline", "serde", @@ -6180,7 +6180,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" dependencies = [ "fastrand", - "getrandom 0.4.2", + "getrandom 0.3.4", "once_cell", "rustix 1.1.4", "windows-sys 0.61.2",