Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions crates/nodes/src/audio/codecs/opus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,11 +512,6 @@ impl ProcessorNode for OpusEncoderNode {

// Only clone if padding is needed, otherwise use slice directly
let encode_result = if samples.len() < expected_samples {
tracing::debug!(
"Padding frame from {} to {} samples with silence",
samples.len(),
expected_samples
);
let mut padded = samples.as_ref().to_vec();
padded.resize(expected_samples, 0.0);
enc.encode_float(&padded, &mut encode_buffer)
Expand Down
45 changes: 11 additions & 34 deletions crates/nodes/src/containers/ogg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,13 @@ impl ProcessorNode for OggMuxerNode {
if let Packet::Binary { data, metadata, .. } = packet {
packet_count += 1;
stats_tracker.received();

tracing::debug!(
"OggMuxer received packet #{}, {} bytes",
packet_count,
data.len()
);
if packet_count.is_multiple_of(1000) {
tracing::debug!(
"OggMuxer processed {} packets (last packet: {} bytes)",
packet_count,
data.len()
);
}

// Force every packet to end a page for maximum streaming behavior.
// This allows chunk_size to work as expected by ensuring
Expand All @@ -229,21 +230,10 @@ impl ProcessorNode for OggMuxerNode {
if let Some(timestamp_us) = meta.timestamp_us {
// Convert timestamp from microseconds to 48kHz samples
last_granule_pos = (timestamp_us * 48000) / 1_000_000;
tracing::debug!(
"Using metadata timestamp: {}us -> granule_pos: {}",
timestamp_us,
last_granule_pos
);
} else if let Some(duration_us) = meta.duration_us {
// If we don't have timestamp but have duration, accumulate
let samples = (duration_us * 48000) / 1_000_000;
last_granule_pos += samples;
tracing::debug!(
"Using metadata duration: {}us ({} samples) -> granule_pos: {}",
duration_us,
samples,
last_granule_pos
);
} else {
// Fallback: assume 960 samples (20ms at 48kHz)
last_granule_pos = 960 * packet_count;
Expand All @@ -253,11 +243,6 @@ impl ProcessorNode for OggMuxerNode {
last_granule_pos = 960 * packet_count;
}

tracing::debug!(
"About to write packet #{} to OGG writer (granule: {})",
packet_count,
last_granule_pos
);
if let Err(e) = writer.write_packet(
data.to_vec(),
self.config.stream_serial,
Expand All @@ -270,7 +255,6 @@ impl ProcessorNode for OggMuxerNode {
state_helpers::emit_failed(&context.state_tx, &node_name, &err_msg);
return Err(StreamKitError::Runtime(err_msg));
}
tracing::debug!("Packet #{} written to OGG writer successfully", packet_count);

// Flush any bytes accumulated by the Ogg writer immediately to maximize streaming.
// This avoids buffering large chunks in memory and delivers data as soon as pages are ready.
Expand All @@ -288,7 +272,6 @@ impl ProcessorNode for OggMuxerNode {
};

if let Some(data) = data_to_send {
tracing::trace!("Flushing {} bytes to output", data.len());
if context
.output_sender
.send(
Expand Down Expand Up @@ -468,7 +451,7 @@ impl ProcessorNode for OggDemuxerNode {
});

// Process packets from the async reader
let mut packets_extracted = 0;
let mut packets_extracted = 0u64;
let mut last_granule_pos: Option<u64> = None;
let mut packets_at_granule_pos = 0u64;
let mut detected_frame_duration_us: Option<u64> = None;
Expand All @@ -494,6 +477,9 @@ impl ProcessorNode for OggDemuxerNode {
Ok(packet) => {
packets_extracted += 1;
stats_tracker.received();
if packets_extracted.is_multiple_of(1000) {
tracing::debug!("OggDemuxer extracted {} packets", packets_extracted);
}

// Extract granule position for timing metadata
let granule_pos = packet.absgp_page();
Expand Down Expand Up @@ -559,14 +545,6 @@ impl ProcessorNode for OggDemuxerNode {
None
};

tracing::debug!(
"Extracted Ogg packet {} with {} bytes (granule_pos: {}, metadata: {:?})",
packets_extracted,
packet.data.len(),
granule_pos,
metadata
);

// Send the packet data to the output with timing metadata
let output_packet = Packet::Binary {
data: Bytes::from(packet.data),
Expand Down Expand Up @@ -817,7 +795,6 @@ impl ProcessorNode for SymphoniaOggDemuxerNode {
let data_tx = data_tx;
while let Some(packet) = input_rx.recv().await {
if let Packet::Binary { data, .. } = packet {
tracing::debug!("Forwarding {} bytes to Symphonia reader", data.len());
if data_tx.send(data).await.is_err() {
break;
}
Expand Down
1 change: 1 addition & 0 deletions docs/astro.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export default defineConfig({
items: [
{ label: 'Creating Pipelines', slug: 'guides/creating-pipelines' },
{ label: 'Performance Tuning', slug: 'guides/performance' },
{ label: 'Load Testing', slug: 'guides/load-testing' },
{ label: 'Observability', slug: 'guides/observability' },
{ label: 'Script Node', slug: 'guides/script-node' },
{ label: 'Using the Web UI', slug: 'guides/web-ui' },
Expand Down
109 changes: 109 additions & 0 deletions docs/src/content/docs/guides/load-testing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
---
# SPDX-FileCopyrightText: © 2025 StreamKit Contributors
# SPDX-License-Identifier: MPL-2.0
title: Load Testing
description: Run targeted stress tests and capture profiles for StreamKit
---

StreamKit ships with a small load-test runner (`skit-cli loadtest`) plus a set of ready-made configs under `samples/loadtest/`.

Use these when you want to:

- Reproduce and profile a specific hotspot (codec, muxing, mixing, control plane, session lifecycle).
- Compare changes across runs with consistent inputs.
- Stress a single subsystem without the UI in the loop.

## Prerequisites

- Start the server: `just skit serve`
- Load tests run through the client binary: `just lt <config>` (which invokes `skit-cli loadtest` under the hood)
- Some presets require a local MoQ relay at `http://localhost:4443`

## Running Presets

All configs live in `samples/loadtest/`. You can run them via a single `just` target:

- `just lt <id>` runs `samples/loadtest/<id>.toml`
- `just lt <path>` runs an explicit TOML path

Examples:

- `just lt stress-oneshot`
- `just lt oneshot-opus-transcode-fast`
- `just lt dynamic-tune-heavy --cleanup`

### Oneshot (HTTP batch pipelines)

- `just lt stress-oneshot` — default oneshot stress preset
- `just lt oneshot-http-passthrough` — multipart upload + oneshot engine overhead (minimal node CPU)
- `just lt oneshot-graph-chain` — graph wiring + channel hops (passthrough chain)
- `just lt oneshot-opus-transcode-fast` — codec-heavy (Ogg demux + Opus decode/encode), no pacer

### Dynamic (long-lived sessions)

- `just lt stress-dynamic` — default dynamic stress preset
- `just lt dynamic-scale-audio-gain` — many sessions, sustained decode, low tune rate
- `just lt dynamic-tune-heavy` — stresses control plane param updates (frequent tuning, many `audio::gain` nodes)
- `just lt dynamic-moq-fanout` — MoQ fanout (requires relay at `http://localhost:4443`)

## Capturing CPU Profiles

The easiest workflow is:

1. Run a profiling build of the server: `just skit-profiling serve`
2. Run a load test preset in another terminal (examples above)
3. Fetch profiles:
- Top view: `just profile-top 30`
- Web UI: `just profile-web 30`

`profile-*` commands require Go (`go tool pprof`).

## What Each Preset Targets

### `oneshot-http-passthrough`

Targets request handling and oneshot overhead:

- Multipart parsing and streaming input
- Pipeline compilation/validation
- Graph wiring/spawn + channel plumbing

### `oneshot-opus-transcode-fast`

Targets codec throughput:

- `containers::ogg::demuxer` (parsing)
- `audio::opus::{decoder,encoder}`

This intentionally runs “as fast as possible” (no pacer), so it’s useful for CPU profiling and throughput regressions.

### `dynamic-tune-heavy`

Targets control-plane churn:

- Session creation churn (up to `dynamic.session_count`)
- Control WebSocket tuning rate (`dynamic.tune_interval_ms`)
- Parameter updates to many `audio::gain` nodes

### `dynamic-moq-fanout`

Targets MoQ transport + data plane in dynamic sessions:

- One broadcaster session publishes to `input`
- Many subscriber sessions subscribe/transcode/publish

## Writing Your Own Config

Configs are TOML and validated by `skit-cli` before running:

- Pick a scenario: `test.scenario = "oneshot" | "dynamic" | "mixed"`
- Point to a pipeline YAML for that scenario
- Adjust `oneshot.concurrency` or `dynamic.session_count`

See `apps/skit-cli/src/load_test/config.rs` for the full schema.

## Tips

- For profiling, keep logging quiet (e.g. `RUST_LOG=warn`) to avoid measuring log formatting instead of pipeline CPU.
- For dynamic tests, use `--cleanup` when you want sessions deleted at the end: `just lt-dynamic-cleanup`.
- Prefer small input files for high-throughput profiling (e.g. `samples/audio/system/speech_2m.opus`) and larger files for sustained steady-state load.
30 changes: 24 additions & 6 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -99,25 +99,43 @@ skit-cli *args='':
skit-lt config='loadtest.toml' *args='':
@cargo run -p streamkit-client --bin skit-cli -- loadtest {{config}} {{args}}

# Alias for skit-lt
alias lt := skit-lt
# Run a load test by preset id (maps to `samples/loadtest/<id>.toml`) or by explicit path.
#
# Examples:
#   - `just lt`                            # runs `samples/loadtest/stress-oneshot.toml` by default
#   - `just lt stress-dynamic`             # runs `samples/loadtest/stress-dynamic.toml`
#   - `just lt dynamic-tune-heavy --cleanup`
#   - `just lt samples/loadtest/ui-demo.toml`
lt preset_or_path='stress-oneshot' *args='':
    @if [ -f "{{preset_or_path}}" ]; then \
        cfg="{{preset_or_path}}"; \
    elif [ -f "samples/loadtest/{{preset_or_path}}.toml" ]; then \
        cfg="samples/loadtest/{{preset_or_path}}.toml"; \
    else \
        echo "❌ Loadtest config not found: '{{preset_or_path}}'"; \
        echo "   - If passing a preset, expected: samples/loadtest/{{preset_or_path}}.toml"; \
        echo "   - If passing a path, ensure the file exists"; \
        exit 1; \
    fi; \
    just skit-lt "$cfg" {{args}}

# --- Load test presets ---
# Run the standard oneshot stress test config
lt-oneshot *args='':
@just skit-lt samples/loadtest/stress-oneshot.toml {{args}}
@just lt stress-oneshot {{args}}

# Run the standard dynamic session stress test config
lt-dynamic *args='':
@just skit-lt samples/loadtest/stress-dynamic.toml {{args}}
@just lt stress-dynamic {{args}}

# Run the standard dynamic session stress test config with cleanup enabled
lt-dynamic-cleanup *args='':
@just skit-lt samples/loadtest/stress-dynamic.toml --cleanup {{args}}
@just lt stress-dynamic --cleanup {{args}}

# Run the long-running UI demo config
lt-ui-demo *args='':
@just skit-lt samples/loadtest/ui-demo.toml {{args}}
@just lt ui-demo {{args}}

# Run skit tests
# Note: We exclude dhat-heap since it's mutually exclusive with profiling (both define global allocators)
Expand Down
38 changes: 38 additions & 0 deletions samples/loadtest/dynamic-moq-fanout.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Load Test Configuration: Dynamic MoQ Fanout
# Requires a MoQ relay at http://localhost:4443.
# Creates a broadcaster session publishing to "input", then many subscriber sessions that transcode.

[server]
url = "http://127.0.0.1:4545"

[test]
duration_secs = 180
scenario = "dynamic"

[oneshot]
enabled = false
concurrency = 0
pipeline = ""
input_file = ""

[dynamic]
enabled = true
session_count = 100
tune_interval_ms = 1000
pipelines = [
"samples/loadtest/pipelines/moq_subscriber_transcode.yml",
]

[dynamic.broadcaster]
pipeline = "samples/loadtest/pipelines/moq_broadcaster.yml"
count = 1

[populate]
load_plugins = false
plugins_native = []
plugins_wasm = []

[output]
format = "text"
real_time_updates = true
update_interval_ms = 2000
33 changes: 33 additions & 0 deletions samples/loadtest/dynamic-scale-audio-gain.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Load Test Configuration: Dynamic Scale (Audio Gain + Sink)
# Stresses dynamic engine graph management and session lifecycle at high session counts.

[server]
url = "http://127.0.0.1:4545"

[test]
duration_secs = 180
scenario = "dynamic"

[oneshot]
enabled = false
concurrency = 0
pipeline = ""
input_file = ""

[dynamic]
enabled = true
session_count = 200
tune_interval_ms = 5000
pipelines = [
"samples/loadtest/pipelines/dynamic_audio_gain_chain.yml",
]

[populate]
load_plugins = false
plugins_native = []
plugins_wasm = []

[output]
format = "text"
real_time_updates = true
update_interval_ms = 2000
33 changes: 33 additions & 0 deletions samples/loadtest/dynamic-tune-heavy.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Load Test Configuration: Dynamic Tune Heavy (Many Gain Nodes)
# Stresses the control WebSocket + node param updates under sustained tuning.

[server]
url = "http://127.0.0.1:4545"

[test]
duration_secs = 180
scenario = "dynamic"

[oneshot]
enabled = false
concurrency = 0
pipeline = ""
input_file = ""

[dynamic]
enabled = true
session_count = 50
tune_interval_ms = 200
pipelines = [
"samples/loadtest/pipelines/dynamic_many_gains.yml",
]

[populate]
load_plugins = false
plugins_native = []
plugins_wasm = []

[output]
format = "text"
real_time_updates = true
update_interval_ms = 2000
Loading
Loading