Skip to content

Commit 6594851

Browse files
staging-devin-ai-integration[bot]streamkit-devinstreamer45
authored
feat(engine): make AddNode non-blocking with async node creation (#286)
* feat(engine): make AddNode non-blocking with async node creation Node creation (registry.create_node()) now runs inside tokio::task::spawn_blocking so that slow native plugin constructors (e.g. ONNX model loading via FFI) no longer block the engine actor loop. Key changes: - Add NodeState::Creating variant to streamkit-core. The actor inserts this state immediately when AddNode arrives, closing the observability gap between 'message received' and 'node exists'. - AddNode handler spawns a background task instead of calling create_node() synchronously. Results are sent back via an internal mpsc channel (NodeCreatedEvent) polled in the actor select! loop. - Deferred connection queue: Connect requests where one or both endpoints are still Creating are stored in a Vec<PendingConnection> and replayed when NodeCreated completes successfully. - RemoveNode while Creating: cancelled node IDs are tracked in a HashSet. When NodeCreated arrives for a cancelled node, the result is discarded (no zombie nodes). - Pipeline activation naturally gates on Creating state since it doesn't match Ready|Running|Degraded|Recovering. - UI: NodeStateIndicator.tsx and sessionStatus.ts handle the new Creating state (uses initializing color, treated like Initializing for session status). - 10 comprehensive test cases covering: basic async creation, deferred connections, concurrent creation, creation failure, RemoveNode while Creating, pipeline activation timing, duplicate AddNode, remove then re-add, shutdown while Creating, and mixed realized/creating connections. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(engine): address review findings in async node creation - Fix remove-then-readd race: clear cancelled_creations entry when a new AddNode arrives with the same ID, preventing the new creation result from being mistakenly discarded. - Fix Disconnect not draining pending_connections: remove matching deferred connections so they aren't replayed after the user explicitly disconnected them. - Fix broadcast_state_update skipping previous-state gauge zeroing: mirror the one-hot gauge pattern from handle_state_update so engine-synthesized transitions (Creating → Failed) don't leave stale gauge series at 1. - Strengthen test_remove_then_readd_same_id to verify the node is fully initialized (not Creating/Failed) after the re-add. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(engine): replace cancelled_creations with creation_id generation counter The previous cancelled_creations HashSet approach had a race condition in Remove → re-Add sequences: clearing the cancellation on re-Add allowed the old background task's result to also be processed, causing double initialization and resource leaks. Replace with a monotonic creation_id counter (next_creation_id) and an active_creations map (node_id → creation_id). Each spawned creation task carries its creation_id; handle_node_created only accepts results whose creation_id matches the current active entry. Stale results from cancelled or superseded creations are silently discarded. Also fix broadcast_state_update to read previous state from node_states BEFORE inserting the new one (it now owns the insertion), so the one-hot gauge zeroing works correctly for engine-synthesized transitions like Creating → Failed. Add a zero_state_gauge helper for the Creating → Initializing transition in handle_node_created. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * style(engine): apply cargo fmt Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(engine): zero Creating gauge on RemoveNode while Creating When RemoveNode cancels a Creating node, the node_state_gauge for (node_id, 'creating') was never zeroed, causing a permanent metrics leak. Add zero_state_gauge call before removing state, mirroring shutdown_node's pattern. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(test): accept Creating and Failed states in session_destroy test With async node creation, nodes transition through Creating before reaching Initializing/Ready/Running. In the test environment, nodes may also reach Failed if the node kind isn't available in the test registry. The test's purpose is verifying clean session destruction, not specific node states, so accept all valid lifecycle states. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(engine): close gauge gap, prevent leaked pending connections, improve TuneNode log - Route Creating → Initializing through broadcast_state_update so the gauge transition zeroes Creating and sets Initializing atomically (no window where no gauge reads 1 for the node). - Add upfront check in Connect handler: both endpoints must exist in node_states before deferring. Connecting to a non-existent node while the other is Creating would otherwise leak a pending connection that is never flushed. - Distinguish 'still Creating' from 'non-existent' in TuneNode warning. - Remove unused emit_creating helper (dead code). Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(test): remove Failed from accepted states in session_destroy test Built-in nodes (silence, gain) should always succeed creation. Only accept Creating/Initializing/Ready/Running — if these nodes fail, that's a real regression that should surface as a test failure. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(engine): queue TuneNode for Creating nodes, reject duplicate AddNode at API layer P1: TuneNode messages arriving while a node is still in Creating state are now queued in pending_tunes and replayed once the node finishes initialization. This prevents a regression where UpdateParams was persisted and broadcast to clients but the eventual node instance never received the config change. P2: handle_add_node now checks for duplicate node_id in the pipeline model before inserting. Previously, the API layer would silently overwrite the pipeline entry and broadcast NodeAdded while the engine actor rejected the duplicate — leaving clients showing stale kind/params while the old live node continued running. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(engine): queue TuneNode for Creating nodes, reject duplicate AddNode at API layer P1: TuneNode messages arriving while a node is still in Creating state are now queued in pending_tunes and replayed once the node finishes initialization. This prevents a regression where UpdateParams was persisted and broadcast to clients but the eventual node instance never received the config change. P2: handle_add_node now checks for duplicate node_id in the pipeline model before inserting. Previously, the API layer would silently overwrite the pipeline entry and broadcast NodeAdded while the engine actor rejected the duplicate — leaving clients showing stale kind/params while the old live node continued running. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(test): accept Failed state in session_destroy test with rationale The test server uses Config::default() which only registers core nodes. 'silence' and 'gain' are not core nodes, so async creation correctly transitions them Creating → Failed. This test validates clean session destruction regardless of individual node outcomes, so Failed is a valid observed state. Added comment explaining why. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(test): use registered audio::gain nodes in session_destroy test The test was using unregistered node types ('silence', 'gain') which only passed before because synchronous creation failure was silent (no state broadcast). With async creation these correctly transition to Failed. Fix by using 'audio::gain' which is a real registered built-in node type with in/out pins. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * test(engine): add test for TuneNode queuing during Creating state Verifies that UpdateParams messages sent while a node is still being constructed (Creating state) are queued and replayed after the node finishes initialization. Uses a TuneTrackingSlowNode that counts received UpdateParams messages. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * style: format TuneTrackingSlowNode factory call Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix(engine,skit): batch duplicate check, node_kinds cleanup, test nit - Add pre-validation pass in handle_apply_batch to reject duplicate AddNode IDs before mutating the pipeline model. Simulates the batch's Add/Remove sequence so Remove→Add within a batch is still allowed. - Clean up node_kinds in both handle_node_created failure paths (creation failure and initialization failure), matching the cleanup done in RemoveNode-while-Creating. - Remove unused _delay field from SlowTestNode (zero-sized struct). Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com> * fix: update rand 0.10.0 → 0.10.1 (RUSTSEC-2026-0097) Co-Authored-By: Claudio Costa <cstcld91@gmail.com> --------- Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-authored-by: StreamKit Devin <devin@streamkit.dev> Co-authored-by: Claudio Costa <cstcld91@gmail.com>
1 parent cffe692 commit 6594851

File tree

13 files changed

+1572
-59
lines changed

13 files changed

+1572
-59
lines changed

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apps/skit/src/websocket_handlers.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,11 @@ async fn handle_add_node(
519519

520520
{
521521
let mut pipeline = session.pipeline.lock().await;
522+
if pipeline.nodes.contains_key(&node_id) {
523+
return Some(ResponsePayload::Error {
524+
message: format!("Node '{node_id}' already exists in the pipeline"),
525+
});
526+
}
522527
pipeline.nodes.insert(
523528
node_id.clone(),
524529
streamkit_api::Node { kind: kind.clone(), params: params.clone(), state: None },
@@ -1224,6 +1229,33 @@ async fn handle_apply_batch(
12241229
});
12251230
}
12261231

1232+
// Pre-validate duplicate node_ids against the pipeline model.
1233+
// Simulate the batch's Add/Remove sequence so that Remove→Add for
1234+
// the same ID within the batch is allowed, but duplicate Adds
1235+
// (without intervening Remove) are rejected before any mutation.
1236+
{
1237+
let pipeline = session.pipeline.lock().await;
1238+
let mut live_ids: std::collections::HashSet<&str> =
1239+
pipeline.nodes.keys().map(String::as_str).collect();
1240+
for op in &operations {
1241+
match op {
1242+
streamkit_api::BatchOperation::AddNode { node_id, .. } => {
1243+
if !live_ids.insert(node_id.as_str()) {
1244+
return Some(ResponsePayload::Error {
1245+
message: format!(
1246+
"Batch rejected: node '{node_id}' already exists in the pipeline"
1247+
),
1248+
});
1249+
}
1250+
},
1251+
streamkit_api::BatchOperation::RemoveNode { node_id } => {
1252+
live_ids.remove(node_id.as_str());
1253+
},
1254+
_ => {},
1255+
}
1256+
}
1257+
} // Pipeline lock released after pre-validation
1258+
12271259
// Validate permissions for all operations
12281260
for op in &operations {
12291261
if let streamkit_api::BatchOperation::AddNode { kind, params, .. } = op {

apps/skit/tests/session_lifecycle_test.rs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -458,19 +458,15 @@ async fn test_session_destroy_shuts_down_pipeline() {
458458

459459
println!("✅ Session created: {}", session_id);
460460

461-
// Add a source node (silence generator)
461+
// Add a source node (audio::gain is a registered core node with in/out pins)
462462
let add_source_request = Request {
463463
message_type: MessageType::Request,
464464
correlation_id: Some("add-source".to_string()),
465465
payload: RequestPayload::AddNode {
466466
session_id: session_id.clone(),
467467
node_id: "source".to_string(),
468-
kind: "silence".to_string(),
469-
params: Some(json!({
470-
"duration_ms": 10000, // 10 seconds
471-
"sample_rate": 48000,
472-
"channels": 2
473-
})),
468+
kind: "audio::gain".to_string(),
469+
params: Some(json!({"gain": 1.0})),
474470
},
475471
};
476472

@@ -488,15 +484,15 @@ async fn test_session_destroy_shuts_down_pipeline() {
488484

489485
println!("✅ Added source node");
490486

491-
// Add a gain node
487+
// Add a second gain node
492488
let add_gain_request = Request {
493489
message_type: MessageType::Request,
494490
correlation_id: Some("add-gain".to_string()),
495491
payload: RequestPayload::AddNode {
496492
session_id: session_id.clone(),
497493
node_id: "gain".to_string(),
498-
kind: "gain".to_string(),
499-
params: Some(json!({"gain": 1.0})),
494+
kind: "audio::gain".to_string(),
495+
params: Some(json!({"gain": 0.5})),
500496
},
501497
};
502498

@@ -562,18 +558,21 @@ async fn test_session_destroy_shuts_down_pipeline() {
562558
ResponsePayload::Pipeline { pipeline } => {
563559
assert_eq!(pipeline.nodes.len(), 2);
564560

565-
// Check that nodes are in Running state (not Failed or Stopped)
561+
// Both nodes use "audio::gain" which is a registered built-in
562+
// node type. With async creation they may still be in
563+
// Creating state, or have progressed to Initializing/Ready.
566564
for (node_id, node) in &pipeline.nodes {
567565
if let Some(state) = &node.state {
568566
println!("Node '{}' state: {:?}", node_id, state);
569567
assert!(
570568
matches!(
571569
state,
572-
streamkit_core::NodeState::Initializing
570+
streamkit_core::NodeState::Creating
571+
| streamkit_core::NodeState::Initializing
573572
| streamkit_core::NodeState::Ready
574573
| streamkit_core::NodeState::Running
575574
),
576-
"Node '{}' should be initializing/ready/running, got: {:?}",
575+
"Node '{}' should be creating/initializing/ready/running, got: {:?}",
577576
node_id,
578577
state
579578
);

crates/core/src/state.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
//! Nodes transition through these states during their lifecycle:
1313
//!
1414
//! ```text
15+
//! Creating
16+
//! ↓
1517
//! Initializing
1618
//! ↓
1719
//! Ready ──────────┐
@@ -89,6 +91,8 @@ impl From<String> for StopReason {
8991
/// Nodes transition through these states during their lifecycle:
9092
///
9193
/// ```text
94+
/// Creating
95+
/// ↓
9296
/// Initializing
9397
/// ↓
9498
/// Ready ──────────┐
@@ -105,6 +109,8 @@ impl From<String> for StopReason {
105109
/// ```
106110
///
107111
/// ### Valid Transitions:
112+
/// - `Creating` → `Initializing` (node factory completed successfully)
113+
/// - `Creating` → `Failed` (node factory returned an error)
108114
/// - `Initializing` → `Ready` (source nodes) or `Running` (processing nodes)
109115
/// - `Ready` → `Running` (when pipeline is ready)
110116
/// - `Running` → `Recovering` (temporary issues, will retry)
@@ -120,6 +126,11 @@ impl From<String> for StopReason {
120126
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
121127
#[ts(export)]
122128
pub enum NodeState {
129+
/// Node is being created by the factory (e.g., loading ONNX models via FFI).
130+
/// This state is set immediately when `AddNode` is received, before the
131+
/// (potentially slow) constructor runs in a background task.
132+
Creating,
133+
123134
/// Node is starting up and performing initialization.
124135
/// Examples: Opening connections, loading resources, validating configuration.
125136
Initializing,

0 commit comments

Comments
 (0)