diff --git a/crates/core/src/control.rs b/crates/core/src/control.rs index 11c73b1f..f73e90dd 100644 --- a/crates/core/src/control.rs +++ b/crates/core/src/control.rs @@ -44,6 +44,67 @@ pub enum ConnectionMode { BestEffort, } +/// Strip transient sync metadata (`_sender`, `_rev`) from a params JSON value. +/// +/// The UI injects these fields for causal-consistency echo suppression. +/// Node config structs that use `#[serde(deny_unknown_fields)]` will reject +/// them during deserialization, so they must be stripped before calling +/// `serde_json::from_value`. +/// +/// Nodes that need the metadata for echo suppression (e.g. the compositor) +/// should read `_sender`/`_rev` from the raw params *before* calling this +/// function. +pub fn strip_sync_metadata(params: &mut serde_json::Value) { + if let Some(obj) = params.as_object_mut() { + obj.remove("_sender"); + obj.remove("_rev"); + } +} + +#[cfg(test)] +mod strip_sync_metadata_tests { + use super::*; + + /// Regression test: `_sender` and `_rev` must be removed so that + /// node config structs with `deny_unknown_fields` can deserialize + /// successfully. + #[test] + fn strips_sender_and_rev() { + let mut params = serde_json::json!({ + "_sender": "client-abc", + "_rev": 7, + "width": 1920, + "height": 1080 + }); + + strip_sync_metadata(&mut params); + + let obj = params.as_object().unwrap(); + assert!(!obj.contains_key("_sender"), "_sender should be stripped"); + assert!(!obj.contains_key("_rev"), "_rev should be stripped"); + assert_eq!(obj.get("width").unwrap(), 1920); + assert_eq!(obj.get("height").unwrap(), 1080); + } + + #[test] + fn no_op_without_metadata() { + let mut params = serde_json::json!({ "width": 1280 }); + + strip_sync_metadata(&mut params); + + let obj = params.as_object().unwrap(); + assert_eq!(obj.len(), 1); + assert_eq!(obj.get("width").unwrap(), 1280); + } + + #[test] + fn no_op_on_non_object() { + let mut params = serde_json::json!(42); + strip_sync_metadata(&mut params); + assert_eq!(params, serde_json::json!(42)); + } +} + /// A message sent to the central Engine actor to modify the pipeline graph itself. #[derive(Debug)] pub enum EngineControlMessage { diff --git a/crates/nodes/src/video/compositor/mod.rs b/crates/nodes/src/video/compositor/mod.rs index d516f19c..984f7ca3 100644 --- a/crates/nodes/src/video/compositor/mod.rs +++ b/crates/nodes/src/video/compositor/mod.rs @@ -800,7 +800,7 @@ impl ProcessorNode for CompositorNode { }, NodeControlMessage::UpdateParams(ref params) => { // Extract transient sync metadata before - // deserialization strips unknown fields. + // apply_update_params strips these fields. // Always overwrite (not conditionally set) so that // non-stamped UpdateParams clears stale values. self.config_sender = params.get("_sender") @@ -1320,9 +1320,15 @@ impl CompositorNode { limits: &GlobalCompositorConfig, image_overlays: &mut Arc<[Arc]>, text_overlays: &mut Arc<[Arc]>, - params: serde_json::Value, + mut params: serde_json::Value, stats_tracker: &mut NodeStatsTracker, ) { + // Strip transient sync metadata (`_sender`, `_rev`) before + // deserializing. The metadata has already been read by the + // caller for echo suppression — see the `UpdateParams` arm + // in the run loop. + streamkit_core::control::strip_sync_metadata(&mut params); + match serde_json::from_value::(params) { Ok(new_config) => { // Resource limits are enforced by the server-level diff --git a/crates/nodes/src/video/compositor/tests.rs b/crates/nodes/src/video/compositor/tests.rs index 70aa9969..125750db 100644 --- a/crates/nodes/src/video/compositor/tests.rs +++ b/crates/nodes/src/video/compositor/tests.rs @@ -2889,3 +2889,79 @@ fn test_rebuild_svg_reuses_bitmap_when_unchanged() { assert_eq!(rebuilt[0].width, initial_arc.width); assert_eq!(rebuilt[0].height, initial_arc.height); } + +// ── Regression: transient sync metadata must not break deserialization ──── + +/// `_sender` and `_rev` are injected by the UI for causal-consistency +/// tracking. `CompositorConfig` uses `deny_unknown_fields`, so +/// `apply_update_params` strips these transient fields before +/// deserialization via `strip_sync_metadata`. +/// +/// Regression test for: compositor control completely broken when the UI +/// stamps `_sender` / `_rev` into config updates. +#[test] +fn test_update_params_ignores_transient_sync_metadata() { + let limits = config::GlobalCompositorConfig::default(); + let mut config = CompositorConfig::default(); + let mut image_overlays: Arc<[Arc]> = Arc::from(vec![]); + let mut text_overlays: Arc<[Arc]> = Arc::from(vec![]); + let mut stats = NodeStatsTracker::new("test".to_string(), None); + + let original_width = config.width; + + // Params with transient metadata — must not cause deserialization failure. + let params = serde_json::json!({ + "_sender": "client-abc123", + "_rev": 42, + "width": 1920, + "height": 1080 + }); + + CompositorNode::apply_update_params( + &mut config, + &limits, + &mut image_overlays, + &mut text_overlays, + params, + &mut stats, + ); + + // Config should have been updated despite the extra metadata fields. + assert_ne!(config.width, original_width, "Config should have been updated"); + assert_eq!(config.width, 1920); + assert_eq!(config.height, 1080); +} + +/// Verify that truly unknown fields (not `_sender`/`_rev`) are still +/// rejected by `deny_unknown_fields`. +#[test] +fn test_update_params_rejects_truly_unknown_fields() { + let limits = config::GlobalCompositorConfig::default(); + let mut config = CompositorConfig::default(); + let mut image_overlays: Arc<[Arc]> = Arc::from(vec![]); + let mut text_overlays: Arc<[Arc]> = Arc::from(vec![]); + let mut stats = NodeStatsTracker::new("test".to_string(), None); + + let original_width = config.width; + + // Params with a genuinely unknown field — should be rejected. + let params = serde_json::json!({ + "width": 1920, + "totally_bogus_field": true + }); + + CompositorNode::apply_update_params( + &mut config, + &limits, + &mut image_overlays, + &mut text_overlays, + params, + &mut stats, + ); + + // Config should NOT have been updated — deserialization should fail. + assert_eq!( + config.width, original_width, + "Config must not change when unknown fields are present" + ); +} diff --git a/e2e/tests/compositor-video-output.spec.ts b/e2e/tests/compositor-video-output.spec.ts index 23183c39..245e67c3 100644 --- a/e2e/tests/compositor-video-output.spec.ts +++ b/e2e/tests/compositor-video-output.spec.ts @@ -75,10 +75,14 @@ test.describe('Compositor Video Output — Two Colorbars Pipeline', () => { if (!page.url().includes('/monitor')) { await page.goto('/monitor'); } - await expect(page.getByTestId('monitor-view')).toBeVisible({ timeout: 15_000 }); + await expect(page.getByTestId('monitor-view')).toBeVisible({ + timeout: 15_000, + }); // Wait for sessions list and click the session. - await expect(page.getByTestId('sessions-list')).toBeVisible({ timeout: 10_000 }); + await expect(page.getByTestId('sessions-list')).toBeVisible({ + timeout: 10_000, + }); const sessionItem = page.getByTestId('session-item').filter({ hasText: sessionName }).first(); await expect(sessionItem).toBeVisible({ timeout: 10_000 }); await sessionItem.click(); @@ -115,7 +119,9 @@ test.describe('Compositor Video Output — Two Colorbars Pipeline', () => { await inputLayer1.click(); // Wait for inspector to render (slider becomes visible). - await expect(compositorNode.getByRole('slider').first()).toBeVisible({ timeout: 5_000 }); + await expect(compositorNode.getByRole('slider').first()).toBeVisible({ + timeout: 5_000, + }); // Opacity section should be visible. const opacitySection = compositorNode @@ -128,7 +134,9 @@ test.describe('Compositor Video Output — Two Colorbars Pipeline', () => { // ── 7. Switch to Input 0 — verify it also works ─────────────────────── await inputLayer0.click(); - await expect(compositorNode.getByRole('slider').first()).toBeVisible({ timeout: 5_000 }); + await expect(compositorNode.getByRole('slider').first()).toBeVisible({ + timeout: 5_000, + }); // LIVE badge should still be visible (pipeline survived interaction). await expect(liveBadge).toBeVisible({ timeout: 5_000 }); @@ -151,6 +159,143 @@ test.describe('Compositor Video Output — Two Colorbars Pipeline', () => { } }); + // --------------------------------------------------------------------------- + // Regression test: compositor param changes from UI must reach the server. + // + // When the UI sends a compositor config update (e.g. opacity slider drag), + // the engine's TuneNode path strips transient sync metadata (_sender, _rev) + // before dispatching UpdateParams to the compositor node. If stripping + // fails (or the metadata is not stripped), CompositorConfig's + // deny_unknown_fields rejects the entire update — breaking all compositor + // control. This test verifies the full round-trip: UI slider → WS + // TuneNodeAsync → engine → compositor → pipeline API reflects the change. + // --------------------------------------------------------------------------- + + test('compositor param change from UI is reflected in server-side pipeline state', async ({ + page, + baseURL, + }) => { + test.setTimeout(120_000); + + // ── 1. Create compositor session via API ───────────────────────────── + + const apiContext = await request.newContext({ + baseURL: baseURL!, + extraHTTPHeaders: getAuthHeaders(), + }); + + const sessionName = `compositor-param-sync-${Date.now()}`; + const createResponse = await apiContext.post('/api/v1/sessions', { + data: { + name: sessionName, + yaml: COMPOSITOR_COLORBARS_YAML, + }, + }); + + const responseText = await createResponse.text(); + expect(createResponse.ok(), `Failed to create session: ${responseText}`).toBeTruthy(); + + const createData = JSON.parse(responseText) as { session_id: string }; + sessionId = createData.session_id; + expect(sessionId).toBeTruthy(); + + // ── 2. Navigate to monitor and open the session ───────────────────── + + await page.goto('/monitor'); + await ensureLoggedIn(page); + if (!page.url().includes('/monitor')) { + await page.goto('/monitor'); + } + await expect(page.getByTestId('monitor-view')).toBeVisible({ + timeout: 15_000, + }); + + await expect(page.getByTestId('sessions-list')).toBeVisible({ + timeout: 10_000, + }); + const sessionItem = page.getByTestId('session-item').filter({ hasText: sessionName }).first(); + await expect(sessionItem).toBeVisible({ timeout: 10_000 }); + await sessionItem.click(); + + // ── 3. Wait for compositor node LIVE ───────────────────────────────── + + const compositorNode = page.locator('.react-flow__node').filter({ hasText: 'Compositor' }); + await expect(compositorNode).toBeVisible({ timeout: 15_000 }); + await expect(compositorNode.getByText('LIVE')).toBeVisible({ + timeout: 10_000, + }); + + // ── 4. Select Input 1 and locate opacity slider ───────────────────── + + const inputLayer1 = compositorNode.getByText('Input 1', { exact: true }).first(); + await expect(inputLayer1).toBeVisible({ timeout: 5_000 }); + await inputLayer1.click(); + + const opacitySection = compositorNode + .locator('div') + .filter({ hasText: /^Opacity/ }) + .filter({ has: page.getByRole('slider') }) + .first(); + await expect(opacitySection).toBeVisible({ timeout: 5_000 }); + + // ── 5. Drag opacity slider to change value ────────────────────────── + + const thumb = opacitySection.getByRole('slider'); + await thumb.waitFor({ state: 'visible', timeout: 5_000 }); + const box = await thumb.boundingBox(); + expect(box, 'Opacity slider thumb must have a bounding box').toBeTruthy(); + + // Drag the slider significantly to the left to reduce opacity. + const startX = box!.x + box!.width / 2; + const startY = box!.y + box!.height / 2; + await page.mouse.move(startX, startY); + await page.mouse.down(); + // Move in steps to simulate a realistic drag. + for (let i = 1; i <= 10; i++) { + await page.mouse.move(startX - i * 5, startY); + } + await page.mouse.up(); + + // Wait for debounced WS message to reach the server. + await page.waitForTimeout(1_000); + + // ── 6. Verify server-side pipeline state reflects the change ──────── + + const pipelineResponse = await apiContext.get(`/api/v1/sessions/${sessionId}/pipeline`); + expect(pipelineResponse.ok(), 'Pipeline API should return OK').toBeTruthy(); + + const pipeline = (await pipelineResponse.json()) as { + nodes: Record }>; + }; + + const compositorParams = pipeline.nodes['compositor']?.params; + expect(compositorParams, 'Compositor node should have params in pipeline state').toBeTruthy(); + + // The layers object should exist and in_1's opacity should have + // changed from the initial value of 0.9 (per the fixture YAML). + const layers = compositorParams!['layers'] as Record | undefined; + expect(layers, 'Compositor params should contain layers').toBeTruthy(); + expect(layers!['in_1'], 'Layer in_1 should exist in compositor params').toBeTruthy(); + + const newOpacity = layers!['in_1']!.opacity; + expect(newOpacity, 'in_1 opacity should be defined').toBeDefined(); + expect( + newOpacity, + `in_1 opacity should have changed from initial 0.9 (got ${newOpacity}). ` + + 'If still 0.9, the UI param change did not reach the server — ' + + 'likely UpdateParams deserialization is failing (e.g. _rev/_sender not stripped).' + ).not.toBeCloseTo(0.9, 1); + + await apiContext.dispose(); + + // ── 7. Console error check ────────────────────────────────────────── + + const unexpected = collector.getUnexpected(MOQ_BENIGN_PATTERNS); + if (unexpected.length > 0) { + console.warn('Unexpected console errors (non-fatal):', unexpected); + } + }); + // ── Cleanup ───────────────────────────────────────────────────────────── test.afterEach(async ({ baseURL }) => {