Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions crates/core/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,67 @@ pub enum ConnectionMode {
BestEffort,
}

/// Strip transient sync metadata (`_sender`, `_rev`) from a params JSON value.
///
/// The UI injects these fields for causal-consistency echo suppression.
/// Node config structs that use `#[serde(deny_unknown_fields)]` will reject
/// them during deserialization, so they must be stripped before calling
/// `serde_json::from_value`.
///
/// Nodes that need the metadata for echo suppression (e.g. the compositor)
/// should read `_sender`/`_rev` from the raw params *before* calling this
/// function.
pub fn strip_sync_metadata(params: &mut serde_json::Value) {
if let Some(obj) = params.as_object_mut() {
obj.remove("_sender");
obj.remove("_rev");
}
}

#[cfg(test)]
mod strip_sync_metadata_tests {
use super::*;

/// Regression test: `_sender` and `_rev` must be removed so that
/// node config structs with `deny_unknown_fields` can deserialize
/// successfully.
#[test]
fn strips_sender_and_rev() {
let mut params = serde_json::json!({
"_sender": "client-abc",
"_rev": 7,
"width": 1920,
"height": 1080
});

strip_sync_metadata(&mut params);

let obj = params.as_object().unwrap();
assert!(!obj.contains_key("_sender"), "_sender should be stripped");
assert!(!obj.contains_key("_rev"), "_rev should be stripped");
assert_eq!(obj.get("width").unwrap(), 1920);
assert_eq!(obj.get("height").unwrap(), 1080);
}

#[test]
fn no_op_without_metadata() {
let mut params = serde_json::json!({ "width": 1280 });

strip_sync_metadata(&mut params);

let obj = params.as_object().unwrap();
assert_eq!(obj.len(), 1);
assert_eq!(obj.get("width").unwrap(), 1280);
}

#[test]
fn no_op_on_non_object() {
let mut params = serde_json::json!(42);
strip_sync_metadata(&mut params);
assert_eq!(params, serde_json::json!(42));
}
}

/// A message sent to the central Engine actor to modify the pipeline graph itself.
#[derive(Debug)]
pub enum EngineControlMessage {
Expand Down
10 changes: 8 additions & 2 deletions crates/nodes/src/video/compositor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,7 @@ impl ProcessorNode for CompositorNode {
},
NodeControlMessage::UpdateParams(ref params) => {
// Extract transient sync metadata before
// deserialization strips unknown fields.
// apply_update_params strips these fields.
// Always overwrite (not conditionally set) so that
// non-stamped UpdateParams clears stale values.
self.config_sender = params.get("_sender")
Expand Down Expand Up @@ -1320,9 +1320,15 @@ impl CompositorNode {
limits: &GlobalCompositorConfig,
image_overlays: &mut Arc<[Arc<DecodedOverlay>]>,
text_overlays: &mut Arc<[Arc<DecodedOverlay>]>,
params: serde_json::Value,
mut params: serde_json::Value,
stats_tracker: &mut NodeStatsTracker,
) {
// Strip transient sync metadata (`_sender`, `_rev`) before
// deserializing. The metadata has already been read by the
// caller for echo suppression — see the `UpdateParams` arm
// in the run loop.
streamkit_core::control::strip_sync_metadata(&mut params);

match serde_json::from_value::<CompositorConfig>(params) {
Ok(new_config) => {
// Resource limits are enforced by the server-level
Expand Down
76 changes: 76 additions & 0 deletions crates/nodes/src/video/compositor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2889,3 +2889,79 @@ fn test_rebuild_svg_reuses_bitmap_when_unchanged() {
assert_eq!(rebuilt[0].width, initial_arc.width);
assert_eq!(rebuilt[0].height, initial_arc.height);
}

// ── Regression: transient sync metadata must not break deserialization ────

/// `_sender` and `_rev` are injected by the UI for causal-consistency
/// tracking. `CompositorConfig` uses `deny_unknown_fields`, so
/// `apply_update_params` strips these transient fields before
/// deserialization via `strip_sync_metadata`.
///
/// Regression test for: compositor control completely broken when the UI
/// stamps `_sender` / `_rev` into config updates.
#[test]
fn test_update_params_ignores_transient_sync_metadata() {
let limits = config::GlobalCompositorConfig::default();
let mut config = CompositorConfig::default();
let mut image_overlays: Arc<[Arc<DecodedOverlay>]> = Arc::from(vec![]);
let mut text_overlays: Arc<[Arc<DecodedOverlay>]> = Arc::from(vec![]);
let mut stats = NodeStatsTracker::new("test".to_string(), None);

let original_width = config.width;

// Params with transient metadata — must not cause deserialization failure.
let params = serde_json::json!({
"_sender": "client-abc123",
"_rev": 42,
"width": 1920,
"height": 1080
});

CompositorNode::apply_update_params(
&mut config,
&limits,
&mut image_overlays,
&mut text_overlays,
params,
&mut stats,
);

// Config should have been updated despite the extra metadata fields.
assert_ne!(config.width, original_width, "Config should have been updated");
assert_eq!(config.width, 1920);
assert_eq!(config.height, 1080);
}

/// Verify that truly unknown fields (not `_sender`/`_rev`) are still
/// rejected by `deny_unknown_fields`.
#[test]
fn test_update_params_rejects_truly_unknown_fields() {
let limits = config::GlobalCompositorConfig::default();
let mut config = CompositorConfig::default();
let mut image_overlays: Arc<[Arc<DecodedOverlay>]> = Arc::from(vec![]);
let mut text_overlays: Arc<[Arc<DecodedOverlay>]> = Arc::from(vec![]);
let mut stats = NodeStatsTracker::new("test".to_string(), None);

let original_width = config.width;

// Params with a genuinely unknown field — should be rejected.
let params = serde_json::json!({
"width": 1920,
"totally_bogus_field": true
});

CompositorNode::apply_update_params(
&mut config,
&limits,
&mut image_overlays,
&mut text_overlays,
params,
&mut stats,
);

// Config should NOT have been updated — deserialization should fail.
assert_eq!(
config.width, original_width,
"Config must not change when unknown fields are present"
);
}
153 changes: 149 additions & 4 deletions e2e/tests/compositor-video-output.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,14 @@ test.describe('Compositor Video Output — Two Colorbars Pipeline', () => {
if (!page.url().includes('/monitor')) {
await page.goto('/monitor');
}
await expect(page.getByTestId('monitor-view')).toBeVisible({ timeout: 15_000 });
await expect(page.getByTestId('monitor-view')).toBeVisible({
timeout: 15_000,
});

// Wait for sessions list and click the session.
await expect(page.getByTestId('sessions-list')).toBeVisible({ timeout: 10_000 });
await expect(page.getByTestId('sessions-list')).toBeVisible({
timeout: 10_000,
});
const sessionItem = page.getByTestId('session-item').filter({ hasText: sessionName }).first();
await expect(sessionItem).toBeVisible({ timeout: 10_000 });
await sessionItem.click();
Expand Down Expand Up @@ -115,7 +119,9 @@ test.describe('Compositor Video Output — Two Colorbars Pipeline', () => {
await inputLayer1.click();

// Wait for inspector to render (slider becomes visible).
await expect(compositorNode.getByRole('slider').first()).toBeVisible({ timeout: 5_000 });
await expect(compositorNode.getByRole('slider').first()).toBeVisible({
timeout: 5_000,
});

// Opacity section should be visible.
const opacitySection = compositorNode
Expand All @@ -128,7 +134,9 @@ test.describe('Compositor Video Output — Two Colorbars Pipeline', () => {
// ── 7. Switch to Input 0 — verify it also works ───────────────────────

await inputLayer0.click();
await expect(compositorNode.getByRole('slider').first()).toBeVisible({ timeout: 5_000 });
await expect(compositorNode.getByRole('slider').first()).toBeVisible({
timeout: 5_000,
});

// LIVE badge should still be visible (pipeline survived interaction).
await expect(liveBadge).toBeVisible({ timeout: 5_000 });
Expand All @@ -151,6 +159,143 @@ test.describe('Compositor Video Output — Two Colorbars Pipeline', () => {
}
});

// ---------------------------------------------------------------------------
// Regression test: compositor param changes from UI must reach the server.
//
// When the UI sends a compositor config update (e.g. opacity slider drag),
// the engine's TuneNode path strips transient sync metadata (_sender, _rev)
// before dispatching UpdateParams to the compositor node. If stripping
// fails (or the metadata is not stripped), CompositorConfig's
// deny_unknown_fields rejects the entire update — breaking all compositor
// control. This test verifies the full round-trip: UI slider → WS
// TuneNodeAsync → engine → compositor → pipeline API reflects the change.
// ---------------------------------------------------------------------------

test('compositor param change from UI is reflected in server-side pipeline state', async ({
page,
baseURL,
}) => {
test.setTimeout(120_000);

// ── 1. Create compositor session via API ─────────────────────────────

const apiContext = await request.newContext({
baseURL: baseURL!,
extraHTTPHeaders: getAuthHeaders(),
});

const sessionName = `compositor-param-sync-${Date.now()}`;
const createResponse = await apiContext.post('/api/v1/sessions', {
data: {
name: sessionName,
yaml: COMPOSITOR_COLORBARS_YAML,
},
});

const responseText = await createResponse.text();
expect(createResponse.ok(), `Failed to create session: ${responseText}`).toBeTruthy();

const createData = JSON.parse(responseText) as { session_id: string };
sessionId = createData.session_id;
expect(sessionId).toBeTruthy();

// ── 2. Navigate to monitor and open the session ─────────────────────

await page.goto('/monitor');
await ensureLoggedIn(page);
if (!page.url().includes('/monitor')) {
await page.goto('/monitor');
}
await expect(page.getByTestId('monitor-view')).toBeVisible({
timeout: 15_000,
});

await expect(page.getByTestId('sessions-list')).toBeVisible({
timeout: 10_000,
});
const sessionItem = page.getByTestId('session-item').filter({ hasText: sessionName }).first();
await expect(sessionItem).toBeVisible({ timeout: 10_000 });
await sessionItem.click();

// ── 3. Wait for compositor node LIVE ─────────────────────────────────

const compositorNode = page.locator('.react-flow__node').filter({ hasText: 'Compositor' });
await expect(compositorNode).toBeVisible({ timeout: 15_000 });
await expect(compositorNode.getByText('LIVE')).toBeVisible({
timeout: 10_000,
});

// ── 4. Select Input 1 and locate opacity slider ─────────────────────

const inputLayer1 = compositorNode.getByText('Input 1', { exact: true }).first();
await expect(inputLayer1).toBeVisible({ timeout: 5_000 });
await inputLayer1.click();

const opacitySection = compositorNode
.locator('div')
.filter({ hasText: /^Opacity/ })
.filter({ has: page.getByRole('slider') })
.first();
await expect(opacitySection).toBeVisible({ timeout: 5_000 });

// ── 5. Drag opacity slider to change value ──────────────────────────

const thumb = opacitySection.getByRole('slider');
await thumb.waitFor({ state: 'visible', timeout: 5_000 });
const box = await thumb.boundingBox();
expect(box, 'Opacity slider thumb must have a bounding box').toBeTruthy();

// Drag the slider significantly to the left to reduce opacity.
const startX = box!.x + box!.width / 2;
const startY = box!.y + box!.height / 2;
await page.mouse.move(startX, startY);
await page.mouse.down();
// Move in steps to simulate a realistic drag.
for (let i = 1; i <= 10; i++) {
await page.mouse.move(startX - i * 5, startY);
}
await page.mouse.up();

// Wait for debounced WS message to reach the server.
await page.waitForTimeout(1_000);

// ── 6. Verify server-side pipeline state reflects the change ────────

const pipelineResponse = await apiContext.get(`/api/v1/sessions/${sessionId}/pipeline`);
expect(pipelineResponse.ok(), 'Pipeline API should return OK').toBeTruthy();

const pipeline = (await pipelineResponse.json()) as {
nodes: Record<string, { params?: Record<string, unknown> }>;
};

const compositorParams = pipeline.nodes['compositor']?.params;
expect(compositorParams, 'Compositor node should have params in pipeline state').toBeTruthy();

// The layers object should exist and in_1's opacity should have
// changed from the initial value of 0.9 (per the fixture YAML).
const layers = compositorParams!['layers'] as Record<string, { opacity?: number }> | undefined;
expect(layers, 'Compositor params should contain layers').toBeTruthy();
expect(layers!['in_1'], 'Layer in_1 should exist in compositor params').toBeTruthy();

const newOpacity = layers!['in_1']!.opacity;
expect(newOpacity, 'in_1 opacity should be defined').toBeDefined();
expect(
newOpacity,
`in_1 opacity should have changed from initial 0.9 (got ${newOpacity}). ` +
'If still 0.9, the UI param change did not reach the server — ' +
'likely UpdateParams deserialization is failing (e.g. _rev/_sender not stripped).'
).not.toBeCloseTo(0.9, 1);
Comment on lines +282 to +287
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 E2E test relies on fixed initial opacity value from YAML fixture

The new E2E test at e2e/tests/compositor-video-output.spec.ts:284-287 asserts not.toBeCloseTo(0.9, 1) to verify the opacity changed from its initial value. This is tightly coupled to the COMPOSITOR_COLORBARS_YAML fixture defining in_1 opacity as 0.9. If the fixture changes, the test would need updating. The slider drag amount (50px left) is also somewhat arbitrary — on different viewport sizes or slider implementations, the resulting opacity change could vary. The test may be flaky if the drag doesn't produce a large enough change. Not a bug, but worth noting for maintenance.

Staging: Open in Devin

Was this helpful? React with 👍 or 👎 to provide feedback.

Debug

Playground

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. The test reads the initial opacity from the fixture YAML dynamically via the pipeline API, then asserts that the value changed after the slider drag — so it doesn't hard-code 0.9 as the only valid initial value. However, the not.toBeCloseTo(0.9, 1) assertion does encode the fixture's current value. If the fixture changes, the assertion would need updating.

The slider drag amount (50px) is intentionally conservative — it should produce a measurable change on any reasonable viewport. If flakiness becomes an issue, we could read the initial opacity from the pipeline API before dragging and assert newOpacity !== initialOpacity instead.


await apiContext.dispose();

// ── 7. Console error check ──────────────────────────────────────────

const unexpected = collector.getUnexpected(MOQ_BENIGN_PATTERNS);
if (unexpected.length > 0) {
console.warn('Unexpected console errors (non-fatal):', unexpected);
}
});

// ── Cleanup ─────────────────────────────────────────────────────────────

test.afterEach(async ({ baseURL }) => {
Expand Down
Loading