diff --git a/crates/nodes/src/audio/codecs/opus.rs b/crates/nodes/src/audio/codecs/opus.rs index 494e6a35..3482d37f 100644 --- a/crates/nodes/src/audio/codecs/opus.rs +++ b/crates/nodes/src/audio/codecs/opus.rs @@ -512,11 +512,6 @@ impl ProcessorNode for OpusEncoderNode { // Only clone if padding is needed, otherwise use slice directly let encode_result = if samples.len() < expected_samples { - tracing::debug!( - "Padding frame from {} to {} samples with silence", - samples.len(), - expected_samples - ); let mut padded = samples.as_ref().to_vec(); padded.resize(expected_samples, 0.0); enc.encode_float(&padded, &mut encode_buffer) diff --git a/crates/nodes/src/containers/ogg.rs b/crates/nodes/src/containers/ogg.rs index 9f34bc5c..8c3ed2d9 100644 --- a/crates/nodes/src/containers/ogg.rs +++ b/crates/nodes/src/containers/ogg.rs @@ -208,12 +208,13 @@ impl ProcessorNode for OggMuxerNode { if let Packet::Binary { data, metadata, .. } = packet { packet_count += 1; stats_tracker.received(); - - tracing::debug!( - "OggMuxer received packet #{}, {} bytes", - packet_count, - data.len() - ); + if packet_count.is_multiple_of(1000) { + tracing::debug!( + "OggMuxer processed {} packets (last packet: {} bytes)", + packet_count, + data.len() + ); + } // Force every packet to end a page for maximum streaming behavior. // This allows chunk_size to work as expected by ensuring @@ -229,21 +230,10 @@ impl ProcessorNode for OggMuxerNode { if let Some(timestamp_us) = meta.timestamp_us { // Convert timestamp from microseconds to 48kHz samples last_granule_pos = (timestamp_us * 48000) / 1_000_000; - tracing::debug!( - "Using metadata timestamp: {}us -> granule_pos: {}", - timestamp_us, - last_granule_pos - ); } else if let Some(duration_us) = meta.duration_us { // If we don't have timestamp but have duration, accumulate let samples = (duration_us * 48000) / 1_000_000; last_granule_pos += samples; - tracing::debug!( - "Using metadata duration: {}us ({} samples) -> granule_pos: {}", - duration_us, - samples, - last_granule_pos - ); } else { // Fallback: assume 960 samples (20ms at 48kHz) last_granule_pos = 960 * packet_count; @@ -253,11 +243,6 @@ impl ProcessorNode for OggMuxerNode { last_granule_pos = 960 * packet_count; } - tracing::debug!( - "About to write packet #{} to OGG writer (granule: {})", - packet_count, - last_granule_pos - ); if let Err(e) = writer.write_packet( data.to_vec(), self.config.stream_serial, @@ -270,7 +255,6 @@ impl ProcessorNode for OggMuxerNode { state_helpers::emit_failed(&context.state_tx, &node_name, &err_msg); return Err(StreamKitError::Runtime(err_msg)); } - tracing::debug!("Packet #{} written to OGG writer successfully", packet_count); // Flush any bytes accumulated by the Ogg writer immediately to maximize streaming. // This avoids buffering large chunks in memory and delivers data as soon as pages are ready. @@ -288,7 +272,6 @@ impl ProcessorNode for OggMuxerNode { }; if let Some(data) = data_to_send { - tracing::trace!("Flushing {} bytes to output", data.len()); if context .output_sender .send( @@ -468,7 +451,7 @@ impl ProcessorNode for OggDemuxerNode { }); // Process packets from the async reader - let mut packets_extracted = 0; + let mut packets_extracted = 0u64; let mut last_granule_pos: Option = None; let mut packets_at_granule_pos = 0u64; let mut detected_frame_duration_us: Option = None; @@ -494,6 +477,9 @@ impl ProcessorNode for OggDemuxerNode { Ok(packet) => { packets_extracted += 1; stats_tracker.received(); + if packets_extracted.is_multiple_of(1000) { + tracing::debug!("OggDemuxer extracted {} packets", packets_extracted); + } // Extract granule position for timing metadata let granule_pos = packet.absgp_page(); @@ -559,14 +545,6 @@ impl ProcessorNode for OggDemuxerNode { None }; - tracing::debug!( - "Extracted Ogg packet {} with {} bytes (granule_pos: {}, metadata: {:?})", - packets_extracted, - packet.data.len(), - granule_pos, - metadata - ); - // Send the packet data to the output with timing metadata let output_packet = Packet::Binary { data: Bytes::from(packet.data), @@ -817,7 +795,6 @@ impl ProcessorNode for SymphoniaOggDemuxerNode { let data_tx = data_tx; while let Some(packet) = input_rx.recv().await { if let Packet::Binary { data, .. } = packet { - tracing::debug!("Forwarding {} bytes to Symphonia reader", data.len()); if data_tx.send(data).await.is_err() { break; } diff --git a/docs/astro.config.mjs b/docs/astro.config.mjs index 2742eb5b..1436e219 100644 --- a/docs/astro.config.mjs +++ b/docs/astro.config.mjs @@ -57,6 +57,7 @@ export default defineConfig({ items: [ { label: 'Creating Pipelines', slug: 'guides/creating-pipelines' }, { label: 'Performance Tuning', slug: 'guides/performance' }, + { label: 'Load Testing', slug: 'guides/load-testing' }, { label: 'Observability', slug: 'guides/observability' }, { label: 'Script Node', slug: 'guides/script-node' }, { label: 'Using the Web UI', slug: 'guides/web-ui' }, diff --git a/docs/src/content/docs/guides/load-testing.md b/docs/src/content/docs/guides/load-testing.md new file mode 100644 index 00000000..5ddadbc8 --- /dev/null +++ b/docs/src/content/docs/guides/load-testing.md @@ -0,0 +1,109 @@ +--- +# SPDX-FileCopyrightText: © 2025 StreamKit Contributors +# SPDX-License-Identifier: MPL-2.0 +title: Load Testing +description: Run targeted stress tests and capture profiles for StreamKit +--- + +StreamKit ships with a small load-test runner (`skit-cli loadtest`) plus a set of ready-made configs under `samples/loadtest/`. + +Use these when you want to: + +- Reproduce and profile a specific hotspot (codec, muxing, mixing, control plane, session lifecycle). +- Compare changes across runs with consistent inputs. +- Stress a single subsystem without the UI in the loop. + +## Prerequisites + +- Start the server: `just skit serve` +- Load tests use the client binary: `just skit-cli -- lt ` +- Some presets require a local MoQ relay at `http://localhost:4443` + +## Running Presets + +All configs live in `samples/loadtest/`. You can run them via a single `just` target: + +- `just lt ` runs `samples/loadtest/.toml` +- `just lt ` runs an explicit TOML path + +Examples: + +- `just lt stress-oneshot` +- `just lt oneshot-opus-transcode-fast` +- `just lt dynamic-tune-heavy --cleanup` + +### Oneshot (HTTP batch pipelines) + +- `just lt stress-oneshot` — default oneshot stress preset +- `just lt oneshot-http-passthrough` — multipart upload + oneshot engine overhead (minimal node CPU) +- `just lt oneshot-graph-chain` — graph wiring + channel hops (passthrough chain) +- `just lt oneshot-opus-transcode-fast` — codec-heavy (Ogg demux + Opus decode/encode), no pacer + +### Dynamic (long-lived sessions) + +- `just lt stress-dynamic` — default dynamic stress preset +- `just lt dynamic-scale-audio-gain` — many sessions, sustained decode, low tune rate +- `just lt dynamic-tune-heavy` — stresses control plane param updates (frequent tuning, many `audio::gain` nodes) +- `just lt dynamic-moq-fanout` — MoQ fanout (requires relay at `http://localhost:4443`) + +## Capturing CPU Profiles + +The easiest workflow is: + +1. Run a profiling build of the server: `just skit-profiling serve` +2. Run a load test preset in another terminal (examples above) +3. Fetch profiles: + - Top view: `just profile-top 30` + - Web UI: `just profile-web 30` + +`profile-*` commands require Go (`go tool pprof`). + +## What Each Preset Targets + +### `lt-oneshot-http-passthrough` + +Targets request handling and oneshot overhead: + +- Multipart parsing and streaming input +- Pipeline compilation/validation +- Graph wiring/spawn + channel plumbing + +### `lt-oneshot-opus-transcode-fast` + +Targets codec throughput: + +- `containers::ogg::demuxer` (parsing) +- `audio::opus::{decoder,encoder}` + +This intentionally runs “as fast as possible” (no pacer), so it’s useful for CPU profiling and throughput regressions. + +### `lt-dynamic-tune-heavy` + +Targets control-plane churn: + +- Session creation churn (up to `dynamic.session_count`) +- Control WebSocket tuning rate (`dynamic.tune_interval_ms`) +- Parameter updates to many `audio::gain` nodes + +### `lt-dynamic-moq-fanout` + +Targets MoQ transport + data plane in dynamic sessions: + +- One broadcaster session publishes to `input` +- Many subscriber sessions subscribe/transcode/publish + +## Writing Your Own Config + +Configs are TOML and validated by `skit-cli` before running: + +- Pick a scenario: `test.scenario = "oneshot" | "dynamic" | "mixed"` +- Point to a pipeline YAML for that scenario +- Adjust `oneshot.concurrency` or `dynamic.session_count` + +See `apps/skit-cli/src/load_test/config.rs` for the full schema. + +## Tips + +- For profiling, keep logging quiet (e.g. `RUST_LOG=warn`) to avoid measuring log formatting instead of pipeline CPU. +- For dynamic tests, use `--cleanup` when you want sessions deleted at the end: `just lt-dynamic-cleanup`. +- Prefer small input files for high-throughput profiling (e.g. `samples/audio/system/speech_2m.opus`) and larger files for sustained steady-state load. diff --git a/justfile b/justfile index fb2e42ec..43b7bdbf 100644 --- a/justfile +++ b/justfile @@ -99,25 +99,43 @@ skit-cli *args='': skit-lt config='loadtest.toml' *args='': @cargo run -p streamkit-client --bin skit-cli -- loadtest {{config}} {{args}} -# Alias for skit-lt -alias lt := skit-lt +# Run a load test by preset id (maps to `samples/loadtest/.toml`) or by explicit path. +# +# Examples: +# - `just lt` # runs `samples/loadtest/stress-oneshot.toml` by default +# - `just lt stress-dynamic` # runs `samples/loadtest/stress-dynamic.toml` +# - `just lt dynamic-tune-heavy --cleanup` +# - `just lt samples/loadtest/ui-demo.toml` +lt preset_or_path='stress-oneshot' *args='': + @cfg="" + @if [ -f "{{preset_or_path}}" ]; then \ + cfg="{{preset_or_path}}"; \ + elif [ -f "samples/loadtest/{{preset_or_path}}.toml" ]; then \ + cfg="samples/loadtest/{{preset_or_path}}.toml"; \ + else \ + echo "❌ Loadtest config not found: '{{preset_or_path}}'"; \ + echo " - If passing a preset, expected: samples/loadtest/{{preset_or_path}}.toml"; \ + echo " - If passing a path, ensure the file exists"; \ + exit 1; \ + fi; \ + just skit-lt "$cfg" {{args}} # --- Load test presets --- # Run the standard oneshot stress test config lt-oneshot *args='': - @just skit-lt samples/loadtest/stress-oneshot.toml {{args}} + @just lt stress-oneshot {{args}} # Run the standard dynamic session stress test config lt-dynamic *args='': - @just skit-lt samples/loadtest/stress-dynamic.toml {{args}} + @just lt stress-dynamic {{args}} # Run the standard dynamic session stress test config with cleanup enabled lt-dynamic-cleanup *args='': - @just skit-lt samples/loadtest/stress-dynamic.toml --cleanup {{args}} + @just lt stress-dynamic --cleanup {{args}} # Run the long-running UI demo config lt-ui-demo *args='': - @just skit-lt samples/loadtest/ui-demo.toml {{args}} + @just lt ui-demo {{args}} # Run skit tests # Note: We exclude dhat-heap since it's mutually exclusive with profiling (both define global allocators) diff --git a/samples/loadtest/dynamic-moq-fanout.toml b/samples/loadtest/dynamic-moq-fanout.toml new file mode 100644 index 00000000..618abbeb --- /dev/null +++ b/samples/loadtest/dynamic-moq-fanout.toml @@ -0,0 +1,38 @@ +# Load Test Configuration: Dynamic MoQ Fanout +# Requires a MoQ relay at http://localhost:4443. +# Creates a broadcaster session publishing to "input", then many subscriber sessions that transcode. + +[server] +url = "http://127.0.0.1:4545" + +[test] +duration_secs = 180 +scenario = "dynamic" + +[oneshot] +enabled = false +concurrency = 0 +pipeline = "" +input_file = "" + +[dynamic] +enabled = true +session_count = 100 +tune_interval_ms = 1000 +pipelines = [ + "samples/loadtest/pipelines/moq_subscriber_transcode.yml", +] + +[dynamic.broadcaster] +pipeline = "samples/loadtest/pipelines/moq_broadcaster.yml" +count = 1 + +[populate] +load_plugins = false +plugins_native = [] +plugins_wasm = [] + +[output] +format = "text" +real_time_updates = true +update_interval_ms = 2000 diff --git a/samples/loadtest/dynamic-scale-audio-gain.toml b/samples/loadtest/dynamic-scale-audio-gain.toml new file mode 100644 index 00000000..20a33f57 --- /dev/null +++ b/samples/loadtest/dynamic-scale-audio-gain.toml @@ -0,0 +1,33 @@ +# Load Test Configuration: Dynamic Scale (Audio Gain + Sink) +# Stresses dynamic engine graph management and session lifecycle at high session counts. + +[server] +url = "http://127.0.0.1:4545" + +[test] +duration_secs = 180 +scenario = "dynamic" + +[oneshot] +enabled = false +concurrency = 0 +pipeline = "" +input_file = "" + +[dynamic] +enabled = true +session_count = 200 +tune_interval_ms = 5000 +pipelines = [ + "samples/loadtest/pipelines/dynamic_audio_gain_chain.yml", +] + +[populate] +load_plugins = false +plugins_native = [] +plugins_wasm = [] + +[output] +format = "text" +real_time_updates = true +update_interval_ms = 2000 diff --git a/samples/loadtest/dynamic-tune-heavy.toml b/samples/loadtest/dynamic-tune-heavy.toml new file mode 100644 index 00000000..772354ce --- /dev/null +++ b/samples/loadtest/dynamic-tune-heavy.toml @@ -0,0 +1,33 @@ +# Load Test Configuration: Dynamic Tune Heavy (Many Gain Nodes) +# Stresses the control WebSocket + node param updates under sustained tuning. + +[server] +url = "http://127.0.0.1:4545" + +[test] +duration_secs = 180 +scenario = "dynamic" + +[oneshot] +enabled = false +concurrency = 0 +pipeline = "" +input_file = "" + +[dynamic] +enabled = true +session_count = 50 +tune_interval_ms = 200 +pipelines = [ + "samples/loadtest/pipelines/dynamic_many_gains.yml", +] + +[populate] +load_plugins = false +plugins_native = [] +plugins_wasm = [] + +[output] +format = "text" +real_time_updates = true +update_interval_ms = 2000 diff --git a/samples/loadtest/oneshot-graph-chain.toml b/samples/loadtest/oneshot-graph-chain.toml new file mode 100644 index 00000000..28a6225d --- /dev/null +++ b/samples/loadtest/oneshot-graph-chain.toml @@ -0,0 +1,31 @@ +# Load Test Configuration: OneShot Passthrough Chain +# Stresses graph wiring + task/channel overhead (no heavy codecs). + +[server] +url = "http://127.0.0.1:4545" + +[test] +duration_secs = 60 +scenario = "oneshot" + +[oneshot] +enabled = true +concurrency = 30 +pipeline = "samples/loadtest/pipelines/oneshot_passthrough_chain_16.yml" +input_file = "samples/audio/system/speech_10m.opus" + +[dynamic] +enabled = false +session_count = 0 +tune_interval_ms = 1000 +pipelines = [] + +[populate] +load_plugins = false +plugins_native = [] +plugins_wasm = [] + +[output] +format = "text" +real_time_updates = true +update_interval_ms = 2000 diff --git a/samples/loadtest/oneshot-http-passthrough.toml b/samples/loadtest/oneshot-http-passthrough.toml new file mode 100644 index 00000000..8e40b5e1 --- /dev/null +++ b/samples/loadtest/oneshot-http-passthrough.toml @@ -0,0 +1,31 @@ +# Load Test Configuration: OneShot HTTP Passthrough +# Stresses multipart upload + oneshot graph plumbing with minimal processing. + +[server] +url = "http://127.0.0.1:4545" + +[test] +duration_secs = 60 +scenario = "oneshot" + +[oneshot] +enabled = true +concurrency = 50 +pipeline = "samples/loadtest/pipelines/oneshot_http_passthrough.yml" +input_file = "samples/audio/system/speech_10m.opus" + +[dynamic] +enabled = false +session_count = 0 +tune_interval_ms = 1000 +pipelines = [] + +[populate] +load_plugins = false +plugins_native = [] +plugins_wasm = [] + +[output] +format = "text" +real_time_updates = true +update_interval_ms = 2000 diff --git a/samples/loadtest/oneshot-opus-transcode-fast.toml b/samples/loadtest/oneshot-opus-transcode-fast.toml new file mode 100644 index 00000000..0dded91a --- /dev/null +++ b/samples/loadtest/oneshot-opus-transcode-fast.toml @@ -0,0 +1,31 @@ +# Load Test Configuration: OneShot Opus Transcode (No Pacer) +# Stresses Ogg demux + Opus decode/encode throughput. + +[server] +url = "http://127.0.0.1:4545" + +[test] +duration_secs = 60 +scenario = "oneshot" + +[oneshot] +enabled = true +concurrency = 20 +pipeline = "samples/loadtest/pipelines/oneshot_opus_transcode_fast.yml" +input_file = "samples/audio/system/speech_2m.opus" + +[dynamic] +enabled = false +session_count = 0 +tune_interval_ms = 1000 +pipelines = [] + +[populate] +load_plugins = false +plugins_native = [] +plugins_wasm = [] + +[output] +format = "text" +real_time_updates = true +update_interval_ms = 2000 diff --git a/samples/loadtest/pipelines/dynamic_audio_gain_chain.yml b/samples/loadtest/pipelines/dynamic_audio_gain_chain.yml new file mode 100644 index 00000000..5a851c83 --- /dev/null +++ b/samples/loadtest/pipelines/dynamic_audio_gain_chain.yml @@ -0,0 +1,38 @@ +# SPDX-FileCopyrightText: © 2025 StreamKit Contributors +# +# SPDX-License-Identifier: MPL-2.0 + +name: "Loadtest: Dynamic Audio Gain Chain" +description: "Stresses dynamic engine graph + sustained decode; tuner updates audio::gain." +mode: dynamic +nodes: + file_reader: + kind: core::file_reader + params: + path: samples/audio/system/speech_10m.opus + chunk_size: 8192 + + demux: + kind: containers::ogg::demuxer + needs: file_reader + + pacer: + kind: core::pacer + params: + speed: 1 + buffer_size: 16 + needs: demux + + decode: + kind: audio::opus::decoder + needs: pacer + + gain: + kind: audio::gain + params: + gain: 1.0 + needs: decode + + sink: + kind: core::sink + needs: gain diff --git a/samples/loadtest/pipelines/dynamic_many_gains.yml b/samples/loadtest/pipelines/dynamic_many_gains.yml new file mode 100644 index 00000000..cf04a786 --- /dev/null +++ b/samples/loadtest/pipelines/dynamic_many_gains.yml @@ -0,0 +1,41 @@ +# SPDX-FileCopyrightText: © 2025 StreamKit Contributors +# +# SPDX-License-Identifier: MPL-2.0 + +name: "Loadtest: Dynamic Many Gains" +description: "Stresses control WS + param updates (many tunable audio::gain nodes)." +mode: dynamic +nodes: + file_reader: + kind: core::file_reader + params: + path: samples/audio/system/speech_10m.opus + chunk_size: 8192 + + demux: + kind: containers::ogg::demuxer + needs: file_reader + + pacer: + kind: core::pacer + params: + speed: 1 + buffer_size: 16 + needs: demux + + decode: + kind: audio::opus::decoder + needs: pacer + + gain_0: { kind: audio::gain, params: { gain: 1.0 }, needs: decode } + gain_1: { kind: audio::gain, params: { gain: 1.0 }, needs: gain_0 } + gain_2: { kind: audio::gain, params: { gain: 1.0 }, needs: gain_1 } + gain_3: { kind: audio::gain, params: { gain: 1.0 }, needs: gain_2 } + gain_4: { kind: audio::gain, params: { gain: 1.0 }, needs: gain_3 } + gain_5: { kind: audio::gain, params: { gain: 1.0 }, needs: gain_4 } + gain_6: { kind: audio::gain, params: { gain: 1.0 }, needs: gain_5 } + gain_7: { kind: audio::gain, params: { gain: 1.0 }, needs: gain_6 } + + sink: + kind: core::sink + needs: gain_7 diff --git a/samples/loadtest/pipelines/moq_subscriber_transcode.yml b/samples/loadtest/pipelines/moq_subscriber_transcode.yml new file mode 100644 index 00000000..93666550 --- /dev/null +++ b/samples/loadtest/pipelines/moq_subscriber_transcode.yml @@ -0,0 +1,34 @@ +# SPDX-FileCopyrightText: © 2025 StreamKit Contributors +# +# SPDX-License-Identifier: MPL-2.0 + +name: "Loadtest: MoQ Subscriber Transcode" +description: "Many sessions subscribe to one broadcast; stresses MoQ + decode/encode + tuning." +mode: dynamic +nodes: + sub: + kind: transport::moq::subscriber + params: + url: http://localhost:4443 + broadcast: input + + decode: + kind: audio::opus::decoder + needs: sub + + gain: + kind: audio::gain + params: + gain: 1.0 + needs: decode + + encode: + kind: audio::opus::encoder + needs: gain + + pub: + kind: transport::moq::publisher + params: + url: http://localhost:4443 + broadcast: output + needs: encode diff --git a/samples/loadtest/pipelines/oneshot_http_passthrough.yml b/samples/loadtest/pipelines/oneshot_http_passthrough.yml new file mode 100644 index 00000000..4370a5a7 --- /dev/null +++ b/samples/loadtest/pipelines/oneshot_http_passthrough.yml @@ -0,0 +1,18 @@ +# SPDX-FileCopyrightText: © 2025 StreamKit Contributors +# +# SPDX-License-Identifier: MPL-2.0 + +name: "Loadtest: HTTP Passthrough" +description: "Stresses multipart upload + oneshot engine plumbing with minimal node CPU." +mode: oneshot +nodes: + http_input: + kind: streamkit::http_input + + passthrough: + kind: core::passthrough + needs: http_input + + http_output: + kind: streamkit::http_output + needs: passthrough diff --git a/samples/loadtest/pipelines/oneshot_mixing_no_pacer.yml b/samples/loadtest/pipelines/oneshot_mixing_no_pacer.yml new file mode 100644 index 00000000..fe44c9f9 --- /dev/null +++ b/samples/loadtest/pipelines/oneshot_mixing_no_pacer.yml @@ -0,0 +1,85 @@ +# +# Load-test pipeline: mixing without pacing +# +# Purpose: maximize throughput/CPU load by removing real-time pacer nodes. +# This intentionally processes inputs as fast as possible. +# +# skit:input_asset_tags=speech +# + +name: "Loadtest: Audio Mixing (No Pacer)" +description: "Mixes uploaded audio with a local track and returns Opus/WebM; no pacer nodes for maximum throughput." +mode: oneshot +nodes: + # ============================================================ + # INPUTS: HTTP upload + local music file + # ============================================================ + http_input: + kind: streamkit::http_input + + music_file_reader: + kind: core::file_reader + params: + chunk_size: 8192 + path: samples/audio/system/THE LADY IS A TRAMP.opus + + # ============================================================ + # UPLOAD STREAM: Demux, decode, apply gain (no pacer) + # ============================================================ + upload_ogg_demuxer: + kind: containers::ogg::demuxer + needs: http_input + + upload_opus_decoder: + kind: audio::opus::decoder + needs: upload_ogg_demuxer + + upload_gain: + kind: audio::gain + params: + gain: 1 + needs: upload_opus_decoder + + # ============================================================ + # MUSIC STREAM: Demux, decode, apply gain (no pacer) + # ============================================================ + music_ogg_demuxer: + kind: containers::ogg::demuxer + needs: music_file_reader + + music_opus_decoder: + kind: audio::opus::decoder + needs: music_ogg_demuxer + + music_gain: + kind: audio::gain + params: + gain: 0.1 + needs: music_opus_decoder + + # ============================================================ + # OUTPUT: Mix streams and encode to WebM + # ============================================================ + mixer: + kind: audio::mixer + params: + num_inputs: 2 + needs: + - upload_gain + - music_gain + + opus_encoder: + kind: audio::opus::encoder + needs: mixer + + webm_muxer: + kind: containers::webm::muxer + params: + channels: 1 + chunk_size: 65536 + sample_rate: 48000 + needs: opus_encoder + + http_output: + kind: streamkit::http_output + needs: webm_muxer diff --git a/samples/loadtest/pipelines/oneshot_opus_transcode_fast.yml b/samples/loadtest/pipelines/oneshot_opus_transcode_fast.yml new file mode 100644 index 00000000..0ceecdab --- /dev/null +++ b/samples/loadtest/pipelines/oneshot_opus_transcode_fast.yml @@ -0,0 +1,32 @@ +# SPDX-FileCopyrightText: © 2025 StreamKit Contributors +# +# SPDX-License-Identifier: MPL-2.0 + +name: "Loadtest: Opus Transcode (No Pacer)" +description: "Stresses Ogg demux + Opus decode/encode; streams raw Opus frames out." +mode: oneshot +nodes: + http_input: + kind: streamkit::http_input + + demux: + kind: containers::ogg::demuxer + needs: http_input + + decode: + kind: audio::opus::decoder + needs: demux + + gain: + kind: audio::gain + params: + gain: 1.0 + needs: decode + + encode: + kind: audio::opus::encoder + needs: gain + + http_output: + kind: streamkit::http_output + needs: encode diff --git a/samples/loadtest/pipelines/oneshot_passthrough_chain_16.yml b/samples/loadtest/pipelines/oneshot_passthrough_chain_16.yml new file mode 100644 index 00000000..20325ccf --- /dev/null +++ b/samples/loadtest/pipelines/oneshot_passthrough_chain_16.yml @@ -0,0 +1,31 @@ +# SPDX-FileCopyrightText: © 2025 StreamKit Contributors +# +# SPDX-License-Identifier: MPL-2.0 + +name: "Loadtest: Passthrough Chain x16" +description: "Stresses graph wiring + task/channel overhead (no codecs)." +mode: oneshot +nodes: + http_input: + kind: streamkit::http_input + + p00: { kind: core::passthrough, needs: http_input } + p01: { kind: core::passthrough, needs: p00 } + p02: { kind: core::passthrough, needs: p01 } + p03: { kind: core::passthrough, needs: p02 } + p04: { kind: core::passthrough, needs: p03 } + p05: { kind: core::passthrough, needs: p04 } + p06: { kind: core::passthrough, needs: p05 } + p07: { kind: core::passthrough, needs: p06 } + p08: { kind: core::passthrough, needs: p07 } + p09: { kind: core::passthrough, needs: p08 } + p10: { kind: core::passthrough, needs: p09 } + p11: { kind: core::passthrough, needs: p10 } + p12: { kind: core::passthrough, needs: p11 } + p13: { kind: core::passthrough, needs: p12 } + p14: { kind: core::passthrough, needs: p13 } + p15: { kind: core::passthrough, needs: p14 } + + http_output: + kind: streamkit::http_output + needs: p15 diff --git a/samples/loadtest/stress-dynamic.toml b/samples/loadtest/stress-dynamic.toml index b93ca6eb..ca45d615 100644 --- a/samples/loadtest/stress-dynamic.toml +++ b/samples/loadtest/stress-dynamic.toml @@ -19,8 +19,8 @@ enabled = true session_count = 50 tune_interval_ms = 1500 pipelines = [ - "samples/pipelines/dynamic/moq_selfcontained.yml", - "samples/pipelines/dynamic/moq_mixing_selfcontained.yml", + "samples/loadtest/pipelines/moq_selfcontained.yml", + "samples/loadtest/pipelines/moq_mixing_selfcontained.yml", ] [populate] diff --git a/samples/loadtest/stress-oneshot.toml b/samples/loadtest/stress-oneshot.toml index e425da8e..e9cc50ca 100644 --- a/samples/loadtest/stress-oneshot.toml +++ b/samples/loadtest/stress-oneshot.toml @@ -12,8 +12,8 @@ scenario = "oneshot" [oneshot] enabled = true concurrency = 20 -pipeline = "samples/pipelines/oneshot/double_volume.yml" -input_file = "samples/audio/system/sample.ogg" +pipeline = "samples/loadtest/pipelines/oneshot_mixing_no_pacer.yml" +input_file = "samples/audio/system/speech_2m.opus" [dynamic] enabled = false