Skip to content

Commit aa742af

Browse files
authored
Reduce allocs in oneshot hotpaths (#20)
* perf: reduce webm buffer allocs
* perf(core,engine): reduce FramePool alloc churn under load
1 parent 9dd389a commit aa742af

File tree

5 files changed

+101
-48
lines changed

5 files changed

+101
-48
lines changed

crates/core/src/frame_pool.rs

Lines changed: 25 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -111,22 +111,31 @@ impl<T> FramePool<T> {
111111
}
112112

113113
impl<T: Clone + Default> FramePool<T> {
114-
pub fn preallocated(bucket_sizes: &[usize], buffers_per_bucket: usize) -> Self {
115-
let pool = Self::with_buckets(bucket_sizes.to_vec(), buffers_per_bucket);
114+
pub fn preallocated_with_max(
115+
bucket_sizes: &[usize],
116+
preallocate_per_bucket: usize,
117+
max_per_bucket: usize,
118+
) -> Self {
119+
let pool = Self::with_buckets(bucket_sizes.to_vec(), max_per_bucket);
116120
let Ok(mut guard) = pool.inner.lock() else {
117121
return pool;
118122
};
119123

124+
let preallocate_per_bucket = preallocate_per_bucket.min(max_per_bucket);
120125
for idx in 0..guard.bucket_sizes.len() {
121126
let bucket_size = guard.bucket_sizes[idx];
122-
for _ in 0..buffers_per_bucket {
127+
for _ in 0..preallocate_per_bucket {
123128
guard.buckets[idx].push(vec![T::default(); bucket_size]);
124129
}
125130
}
126131
drop(guard);
127132
pool
128133
}
129134

135+
pub fn preallocated(bucket_sizes: &[usize], buffers_per_bucket: usize) -> Self {
136+
Self::preallocated_with_max(bucket_sizes, buffers_per_bucket, buffers_per_bucket)
137+
}
138+
130139
/// Get pooled storage for at least `min_len` elements.
131140
///
132141
/// If `min_len` doesn't fit in any bucket, returns a non-pooled buffer of exact size.
@@ -232,12 +241,13 @@ impl<T: Clone + Default> Clone for PooledFrameData<T> {
232241
if let Ok(mut guard) = inner.lock() {
233242
if let Some(bucket_idx) = guard.bucket_index_for_min_len(self.len) {
234243
let bucket_size = guard.bucket_sizes[bucket_idx];
235-
let mut data = guard
236-
.buckets
237-
.get_mut(bucket_idx)
238-
.and_then(std::vec::Vec::pop)
239-
.unwrap_or_else(|| vec![T::default(); bucket_size]);
240-
guard.hits += 1;
244+
let data = guard.buckets.get_mut(bucket_idx).and_then(std::vec::Vec::pop);
245+
if data.is_some() {
246+
guard.hits += 1;
247+
} else {
248+
guard.misses += 1;
249+
}
250+
let mut data = data.unwrap_or_else(|| vec![T::default(); bucket_size]);
241251

242252
data[..self.len].clone_from_slice(self.as_slice());
243253
return Self::from_pool(data, self.len, pool.clone(), bucket_idx);
@@ -294,10 +304,15 @@ pub type PooledSamples = PooledFrameData<f32>;
294304

295305
pub const DEFAULT_AUDIO_BUCKET_SIZES: &[usize] = &[960, 1920, 3840, 7680];
296306
pub const DEFAULT_AUDIO_BUFFERS_PER_BUCKET: usize = 32;
307+
pub const DEFAULT_AUDIO_MAX_BUFFERS_PER_BUCKET: usize = 256;
297308

298309
impl FramePool<f32> {
299310
pub fn audio_default() -> Self {
300-
Self::preallocated(DEFAULT_AUDIO_BUCKET_SIZES, DEFAULT_AUDIO_BUFFERS_PER_BUCKET)
311+
Self::preallocated_with_max(
312+
DEFAULT_AUDIO_BUCKET_SIZES,
313+
DEFAULT_AUDIO_BUFFERS_PER_BUCKET,
314+
DEFAULT_AUDIO_MAX_BUFFERS_PER_BUCKET,
315+
)
301316
}
302317
}
303318

crates/engine/src/lib.rs

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -52,6 +52,7 @@ use dynamic_actor::DynamicEngine;
5252
/// It can be used to run stateless pipelines or to start the long-running dynamic actor.
5353
pub struct Engine {
5454
pub registry: Arc<RwLock<NodeRegistry>>,
55+
pub audio_pool: Arc<streamkit_core::AudioFramePool>,
5556
}
5657
impl Default for Engine {
5758
fn default() -> Self {
@@ -152,7 +153,10 @@ impl Engine {
152153
Self::load_plugins(&mut registry, plugin_dir);
153154
}
154155

155-
Self { registry: Arc::new(RwLock::new(registry)) }
156+
Self {
157+
registry: Arc::new(RwLock::new(registry)),
158+
audio_pool: Arc::new(streamkit_core::AudioFramePool::audio_default()),
159+
}
156160
}
157161

158162
#[cfg(feature = "plugins")]
@@ -233,7 +237,7 @@ impl Engine {
233237
node_pin_metadata: HashMap::new(),
234238
batch_size: config.packet_batch_size,
235239
session_id: config.session_id,
236-
audio_pool: Arc::new(streamkit_core::FramePool::<f32>::audio_default()),
240+
audio_pool: self.audio_pool.clone(),
237241
node_input_capacity,
238242
pin_distributor_capacity,
239243
node_states: HashMap::new(),

crates/engine/src/oneshot.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -274,8 +274,8 @@ impl Engine {
274274
let node_kinds: HashMap<String, String> =
275275
definition.nodes.iter().map(|(name, def)| (name.clone(), def.kind.clone())).collect();
276276

277-
// Per-pipeline audio buffer pool for hot paths (e.g., Opus decode).
278-
let audio_pool = std::sync::Arc::new(streamkit_core::FramePool::<f32>::audio_default());
277+
// Shared audio buffer pool for hot paths (e.g., Opus decode).
278+
let audio_pool = self.audio_pool.clone();
279279

280280
// Oneshot pipelines don't track state, so pass None for state_tx
281281
let live_nodes = graph_builder::wire_and_spawn_graph(

crates/engine/src/tests/dynamic_initialize.rs

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -64,7 +64,10 @@ async fn test_dynamic_engine_calls_initialize() {
6464
false,
6565
);
6666

67-
let engine = Engine { registry: Arc::new(std::sync::RwLock::new(registry)) };
67+
let engine = Engine {
68+
registry: Arc::new(std::sync::RwLock::new(registry)),
69+
audio_pool: Arc::new(streamkit_core::AudioFramePool::audio_default()),
70+
};
6871
let handle = engine.start_dynamic_actor(DynamicEngineConfig::default());
6972

7073
if let Err(e) = handle

crates/nodes/src/containers/webm.rs

Lines changed: 64 additions & 33 deletions
Original file line number | Diff line number | Diff line change
@@ -60,10 +60,14 @@ fn opus_head_codec_private(sample_rate: u32, channels: u32) -> Result<[u8; 19],
6060
/// A shared, thread-safe buffer that wraps a Cursor for WebM writing.
6161
/// This allows us to stream out data as it's written while still supporting Seek.
6262
///
63-
/// Uses a sliding window approach to prevent unbounded memory growth:
64-
/// - Keeps a configurable window of recent data for WebM library seeks
65-
/// - Discards old data beyond the window that's been sent
66-
/// - Tracks a base_offset for proper position calculations after discarding
63+
/// Supports two buffering modes:
64+
///
65+
/// - **Streaming (non-seek)**: Bytes are drained on every `take_data()` call.
66+
/// This mode is intended for `Writer::new_non_seek` and avoids copying.
67+
/// - **Seek window**: Keeps a configurable window of recent data for WebM library seeks
68+
/// and trims old data that has already been sent.
69+
///
70+
/// The node selects the appropriate mode based on `WebMStreamingMode`.
6771
#[derive(Clone)]
6872
struct SharedPacketBuffer {
6973
cursor: Arc<Mutex<Cursor<Vec<u8>>>>,
@@ -84,9 +88,14 @@ impl SharedPacketBuffer {
8488
}
8589
}
8690

87-
fn new() -> Self {
88-
// Default 1MB window (enough for ~6 seconds of 128kbps audio)
89-
Self::new_with_window(1024 * 1024)
91+
/// Create a non-seek streaming buffer.
92+
///
93+
/// This is designed for `Writer::new_non_seek` in live streaming mode. Since the writer
94+
/// does not seek/backpatch, we can drain bytes out by moving the underlying `Vec<u8>`
95+
/// (no copy) and reset the cursor to keep memory bounded.
96+
fn new_streaming() -> Self {
97+
// window_size=0 is treated as "drain everything on take_data"
98+
Self::new_with_window(0)
9099
}
91100

92101
/// Takes any new data written since the last call, and trims old data beyond the window.
@@ -108,30 +117,52 @@ impl SharedPacketBuffer {
108117
let base = *base_offset_guard;
109118

110119
let result = if current_len > last_sent {
111-
// Copy only the new data since last send
112-
let new_data = Bytes::copy_from_slice(&vec[last_sent..current_len]);
113-
*last_sent_guard = current_len;
114-
115-
// Trim old data if buffer exceeds window size
116-
if current_len > self.window_size {
117-
let trim_amount = current_len - self.window_size;
118-
// Keep the last window_size bytes
119-
let remaining = vec.split_off(trim_amount);
120-
*vec = remaining;
121-
// Update base offset to reflect discarded data
122-
*base_offset_guard = base + trim_amount;
123-
// Adjust last_sent and cursor position
124-
*last_sent_guard = self.window_size;
125-
buffer_guard.set_position(self.window_size as u64);
126-
127-
tracing::debug!(
128-
"Trimmed {} bytes from WebM buffer, new base_offset: {}",
129-
trim_amount,
130-
*base_offset_guard
131-
);
132-
}
120+
if self.window_size == 0 {
121+
// Streaming mode (non-seek): drain everything written so far without copying.
122+
//
123+
// This avoids two major sources of allocation churn in DHAT profiles:
124+
// - copying out incremental slices on every flush
125+
// - repeatedly trimming a sliding window with `split_off` (copies the window)
126+
let data_vec = std::mem::take(vec);
127+
// Advance base_offset so Seek::Start can clamp consistently if it ever happens.
128+
*base_offset_guard = base + current_len;
129+
*last_sent_guard = 0;
130+
buffer_guard.set_position(0);
131+
Some(Bytes::from(data_vec))
132+
} else if self.window_size == usize::MAX && last_sent == 0 {
133+
// File mode: nothing has been sent yet, so move the entire buffer out.
134+
// The segment is finalized before this is called, so no more writes/seeks occur.
135+
let data_vec = std::mem::take(vec);
136+
*base_offset_guard = base + current_len;
137+
*last_sent_guard = 0;
138+
buffer_guard.set_position(0);
139+
Some(Bytes::from(data_vec))
140+
} else {
141+
// Seek-window mode: copy incremental bytes while retaining a backwards-seek window.
142+
let new_data = Bytes::copy_from_slice(&vec[last_sent..current_len]);
143+
*last_sent_guard = current_len;
144+
145+
// Trim old data if buffer exceeds window size.
146+
if current_len > self.window_size {
147+
let trim_amount = current_len - self.window_size;
148+
// Keep the last window_size bytes.
149+
let remaining = vec.split_off(trim_amount);
150+
*vec = remaining;
151+
// Update base offset to reflect discarded data.
152+
*base_offset_guard = base + trim_amount;
153+
// Adjust last_sent and cursor position.
154+
*last_sent_guard = self.window_size;
155+
buffer_guard.set_position(self.window_size as u64);
156+
157+
tracing::debug!(
158+
"Trimmed {} bytes from WebM buffer, new base_offset: {}",
159+
trim_amount,
160+
*base_offset_guard
161+
);
162+
}
133163

134-
Some(new_data)
164+
Some(new_data)
165+
}
135166
} else {
136167
None
137168
};
@@ -286,11 +317,11 @@ impl ProcessorNode for WebMMuxerNode {
286317
// Stats tracking
287318
let mut stats_tracker = NodeStatsTracker::new(node_name.clone(), context.stats_tx.clone());
288319

289-
// In Live mode we only need a small sliding window to support any internal backtracking
290-
// while continuously streaming bytes out; in File mode we must keep the whole buffer
320+
// In Live mode we use a non-seek writer, so we can drain bytes out without keeping
321+
// any history (zero-copy streaming). In File mode we must keep the whole buffer
291322
// because we only emit bytes once the segment is finalized.
292323
let shared_buffer = match self.config.streaming_mode {
293-
WebMStreamingMode::Live => SharedPacketBuffer::new(),
324+
WebMStreamingMode::Live => SharedPacketBuffer::new_streaming(),
294325
WebMStreamingMode::File => SharedPacketBuffer::new_with_window(usize::MAX),
295326
};
296327

0 commit comments

Comments
 (0)