From 664e60f3de1ff5cf5c2ea559df61f41a2978b083 Mon Sep 17 00:00:00 2001 From: Michael Wilson Date: Tue, 10 Feb 2026 14:48:38 -0500 Subject: [PATCH 1/3] Buffer songs. Songs now buffer their data using Rayon in order to make the mixing thread more performant. --- CHANGELOG.md | 1 + Cargo.lock | 132 +++++++++-- Cargo.toml | 1 + src/audio.rs | 2 + src/audio/context.rs | 52 +++++ src/audio/cpal.rs | 310 +++++++++++++++++++++++-- src/audio/mixer.rs | 5 +- src/audio/sample_source.rs | 2 + src/audio/sample_source/buffered.rs | 347 ++++++++++++++++++++++++++++ src/audio/sample_source/tests.rs | 34 ++- src/config.rs | 2 +- src/config/audio.rs | 36 +++ src/main.rs | 29 +++ src/songs.rs | 20 +- 14 files changed, 934 insertions(+), 39 deletions(-) create mode 100644 src/audio/context.rs create mode 100644 src/audio/sample_source/buffered.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 7f5b670..4617ae9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ The audio engine has been refactored for lower latency and stability: - Inline cleanup of finished sources during mixing (simpler, no separate cleanup pass) - Bounded source channel (capacity 64) to prevent unbounded memory growth - Precomputed channel mappings at sample load time (no allocations during trigger) +- Song playback is buffered to reduce buffer underrun.
### Changed diff --git a/Cargo.lock b/Cargo.lock index 022f767..f42afd3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1171,6 +1171,7 @@ dependencies = [ "symphonia", "tempfile", "thiserror 2.0.17", + "thread-priority", "tokio", "tonic", "tonic-build", @@ -1464,7 +1465,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-link", + "windows-link 0.2.1", ] [[package]] @@ -2403,6 +2404,20 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "thread-priority" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2210811179577da3d54eb69ab0b50490ee40491a25d95b8c6011ba40771cb721" +dependencies = [ + "bitflags 2.9.4", + "cfg-if", + "libc", + "log", + "rustversion", + "windows 0.61.3", +] + [[package]] name = "thread_local" version = "1.1.9" @@ -2911,16 +2926,38 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows" +version = "0.61.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9babd3a767a4c1aef6900409f85f5d53ce2544ccdfaa86dad48c91782c6d6893" +dependencies = [ + "windows-collections 0.2.0", + "windows-core 0.61.2", + "windows-future 0.2.1", + "windows-link 0.1.3", + "windows-numerics 0.2.0", +] + [[package]] name = "windows" version = "0.62.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "527fadee13e0c05939a6a05d5bd6eec6cd2e3dbd648b9f8e447c6518133d8580" dependencies = [ - "windows-collections", + "windows-collections 0.3.2", "windows-core 0.62.2", - "windows-future", - "windows-numerics", + "windows-future 0.3.2", + "windows-numerics 0.3.1", +] + +[[package]] +name = "windows-collections" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3beeceb5e5cfd9eb1d76b381630e82c4241ccd0d27f1a39ed41b2760b255c5e8" +dependencies = [ + "windows-core 0.61.2", ] [[package]] @@ -2944,6 +2981,19 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-core" +version = 
"0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" +dependencies = [ + "windows-implement 0.60.2", + "windows-interface 0.59.3", + "windows-link 0.1.3", + "windows-result 0.3.4", + "windows-strings 0.4.2", +] + [[package]] name = "windows-core" version = "0.62.2" @@ -2952,9 +3002,20 @@ checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" dependencies = [ "windows-implement 0.60.2", "windows-interface 0.59.3", - "windows-link", + "windows-link 0.2.1", "windows-result 0.4.1", - "windows-strings", + "windows-strings 0.5.1", +] + +[[package]] +name = "windows-future" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e" +dependencies = [ + "windows-core 0.61.2", + "windows-link 0.1.3", + "windows-threading 0.1.0", ] [[package]] @@ -2964,8 +3025,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1d6f90251fe18a279739e78025bd6ddc52a7e22f921070ccdc67dde84c605cb" dependencies = [ "windows-core 0.62.2", - "windows-link", - "windows-threading", + "windows-link 0.2.1", + "windows-threading 0.2.1", ] [[package]] @@ -3012,12 +3073,28 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "windows-link" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" + [[package]] name = "windows-link" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-numerics" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1" +dependencies = [ + "windows-core 0.61.2", + 
"windows-link 0.1.3", +] + [[package]] name = "windows-numerics" version = "0.3.1" @@ -3025,7 +3102,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e2e40844ac143cdb44aead537bbf727de9b044e107a0f1220392177d15b0f26" dependencies = [ "windows-core 0.62.2", - "windows-link", + "windows-link 0.2.1", ] [[package]] @@ -3037,13 +3114,31 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-result" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6" +dependencies = [ + "windows-link 0.1.3", +] + [[package]] name = "windows-result" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" dependencies = [ - "windows-link", + "windows-link 0.2.1", +] + +[[package]] +name = "windows-strings" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57" +dependencies = [ + "windows-link 0.1.3", ] [[package]] @@ -3052,7 +3147,7 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" dependencies = [ - "windows-link", + "windows-link 0.2.1", ] [[package]] @@ -3097,7 +3192,7 @@ version = "0.61.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" dependencies = [ - "windows-link", + "windows-link 0.2.1", ] [[package]] @@ -3137,7 +3232,7 @@ version = "0.53.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" dependencies = [ - "windows-link", + "windows-link 0.2.1", "windows_aarch64_gnullvm 0.53.1", "windows_aarch64_msvc 
0.53.1", "windows_i686_gnu 0.53.1", @@ -3148,13 +3243,22 @@ dependencies = [ "windows_x86_64_msvc 0.53.1", ] +[[package]] +name = "windows-threading" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b66463ad2e0ea3bbf808b7f1d371311c80e115c0b71d60efc142cafbcfb057a6" +dependencies = [ + "windows-link 0.1.3", +] + [[package]] name = "windows-threading" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3949bd5b99cafdf1c7ca86b43ca564028dfe27d66958f2470940f73d86d75b37" dependencies = [ - "windows-link", + "windows-link 0.2.1", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index f99a3c9..599fd08 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,7 @@ tonic-reflection = "0.12.3" tracing = "0.1.41" tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } crossbeam-channel = "0.5.15" +thread-priority = "3.0" pest = "2.7" pest_derive = "2.7" diff --git a/src/audio.rs b/src/audio.rs index 874a966..66bd5a5 100644 --- a/src/audio.rs +++ b/src/audio.rs @@ -21,6 +21,7 @@ use crate::songs::Song; use std::collections::HashMap; use std::sync::Barrier; +pub mod context; pub mod cpal; pub mod format; pub mod mixer; @@ -28,6 +29,7 @@ pub mod mock; pub mod sample_source; // Re-export the format types for backward compatibility +pub use context::PlaybackContext; pub use format::{SampleFormat, TargetFormat}; /// Global source ID counter shared by song playback and sample triggers so IDs are unique. diff --git a/src/audio/context.rs b/src/audio/context.rs new file mode 100644 index 0000000..1296299 --- /dev/null +++ b/src/audio/context.rs @@ -0,0 +1,52 @@ +// Copyright (C) 2026 Michael Wilson +// +// This program is free software: you can redistribute it and/or modify it under +// the terms of the GNU General Public License as published by the Free Software +// Foundation, version 3. 
+// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +// FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with +// this program. If not, see <https://www.gnu.org/licenses/>. +// +// Shared context for audio playback (song and sample sources). Carries +// format, buffer sizes, and shared pools so call sites don't thread many +// separate arguments. +// + +use std::sync::Arc; + +use crate::audio::format::TargetFormat; +use crate::audio::sample_source::BufferFillPool; + +/// Context passed into playback and source-creation paths so they can +/// obtain target format, buffer size, and shared resources (e.g. buffer +/// fill pool) without many separate parameters. +#[derive(Clone)] +pub struct PlaybackContext { + /// Target sample rate, format, and bit depth for output. + pub target_format: TargetFormat, + /// Device buffer size in frames (used for BufferedSampleSource capacity + /// and for file decode buffer size). + pub buffer_size: usize, + /// Shared pool for prefilling BufferedSampleSource. If None, sources + /// are not wrapped in BufferedSampleSource. + pub buffer_fill_pool: Option<Arc<BufferFillPool>>, +} + +impl PlaybackContext { + /// Builds a context from the given format, buffer size, and optional pool.
+ pub fn new( + target_format: TargetFormat, + buffer_size: usize, + buffer_fill_pool: Option>, + ) -> Self { + Self { + target_format, + buffer_size, + buffer_fill_pool, + } + } +} diff --git a/src/audio/cpal.rs b/src/audio/cpal.rs index 3413510..3d8d3aa 100644 --- a/src/audio/cpal.rs +++ b/src/audio/cpal.rs @@ -20,21 +20,72 @@ use std::{ Arc, Condvar, Mutex, }, thread, - time::Duration, + time::{Duration, Instant}, }; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; +use thread_priority::{set_current_thread_priority, ThreadPriority, ThreadPriorityValue}; use tracing::{error, info, span, Level}; +/// Default priority for the audio callback thread when MTRACK_THREAD_PRIORITY is unset. +const DEFAULT_CALLBACK_THREAD_PRIORITY: u8 = 70; + +/// Reads MTRACK_THREAD_PRIORITY (0-99) once; used when building the callback so we don't touch env in the hot path. +fn callback_thread_priority() -> ThreadPriorityValue { + std::env::var("MTRACK_THREAD_PRIORITY") + .ok() + .and_then(|v| { + let n = v.parse::().ok()?; + (n < 100).then(|| ThreadPriorityValue::try_from(n).ok())? + }) + .unwrap_or_else(|| ThreadPriorityValue::try_from(DEFAULT_CALLBACK_THREAD_PRIORITY).unwrap()) +} + use crate::audio::mixer::{ActiveSource as MixerActiveSource, AudioMixer}; use crate::{ audio::{Device as AudioDevice, SampleFormat, TargetFormat}, config, + config::StreamBufferSize, playsync::CancelHandle, songs::Song, }; use std::sync::Barrier; +/// Returns the minimum supported output buffer size (frames) for the device and format, if known. 
+fn min_supported_buffer_size( + device: &cpal::Device, + target_format: &TargetFormat, + channels: u16, +) -> Option { + use cpal::SupportedBufferSize; + let rate = target_format.sample_rate; + let want_cpal_format = match (target_format.sample_format, target_format.bits_per_sample) { + (SampleFormat::Float, _) => cpal::SampleFormat::F32, + (SampleFormat::Int, 16) => cpal::SampleFormat::I16, + (SampleFormat::Int, 32) => cpal::SampleFormat::I32, + _ => cpal::SampleFormat::I32, + }; + let configs = device.supported_output_configs().ok()?; + let mut best_min = None::; + for range in configs { + if range.channels() != channels { + continue; + } + if range.sample_format() != want_cpal_format { + continue; + } + let (min_r, max_r) = (range.min_sample_rate(), range.max_sample_rate()); + if rate < min_r || rate > max_r { + continue; + } + if let SupportedBufferSize::Range { min, max: _ } = range.buffer_size() { + let m = *min; + best_min = Some(best_min.map_or(m, |b| b.min(m))); + } + } + best_min +} + /// A small wrapper around a cpal::Device. Used for storing some extra /// data that makes multitrack playing more convenient. pub struct Device { @@ -87,7 +138,34 @@ fn create_direct_f32_callback( source_rx: crossbeam_channel::Receiver, num_channels: u16, ) -> impl FnMut(&mut [f32], &cpal::OutputCallbackInfo) + Send + 'static { + let callback_priority = callback_thread_priority(); + let profile_audio = std::env::var("MTRACK_PROFILE_AUDIO") + .ok() + .map(|v| { + v == "1" + || v.eq_ignore_ascii_case("true") + || v.eq_ignore_ascii_case("yes") + || v.eq_ignore_ascii_case("on") + }) + .unwrap_or(false); + + // Simple in-callback stats: average/max mix time and avg/max callback interval + // (jitter), logged about once per second. 
+ let mut prof_last_log = Instant::now(); + let mut prof_count: u64 = 0; + let mut prof_sum_mix_us: u128 = 0; + let mut prof_max_mix_us: u64 = 0; + let mut prof_last_cb: Option = None; + let mut prof_sum_gap_us: u128 = 0; + let mut prof_gap_count: u64 = 0; + let mut prof_max_gap_us: u64 = 0; + let mut priority_set = false; + move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { + if !priority_set { + let _ = set_current_thread_priority(ThreadPriority::Crossplatform(callback_priority)); + priority_set = true; + } // Process any pending new sources (non-blocking) while let Ok(new_source) = source_rx.try_recv() { mixer.add_source(new_source); @@ -95,7 +173,60 @@ fn create_direct_f32_callback( // Mix directly into the output buffer (cleanup happens inline) let num_frames = data.len() / num_channels as usize; + if profile_audio { + let now = Instant::now(); + if let Some(last) = prof_last_cb { + let gap_us = now.duration_since(last).as_micros() as u64; + prof_sum_gap_us += gap_us as u128; + prof_gap_count += 1; + if gap_us > prof_max_gap_us { + prof_max_gap_us = gap_us; + } + } + prof_last_cb = Some(now); + } + let start = if profile_audio { + Some(Instant::now()) + } else { + None + }; mixer.process_into_output(data, num_frames); + if profile_audio { + let start = start.unwrap(); + let mix_us = start.elapsed().as_micros() as u64; + prof_count += 1; + prof_sum_mix_us += mix_us as u128; + if mix_us > prof_max_mix_us { + prof_max_mix_us = mix_us; + } + if prof_last_log.elapsed().as_secs_f32() >= 1.0 { + let avg = if prof_count > 0 { + (prof_sum_mix_us / prof_count as u128) as u64 + } else { + 0 + }; + let cb_avg_gap_us = if prof_gap_count > 0 { + (prof_sum_gap_us / prof_gap_count as u128) as u64 + } else { + 0 + }; + info!( + mix_avg_us = avg, + mix_max_us = prof_max_mix_us, + cb_avg_gap_us, + cb_max_gap_us = prof_max_gap_us, + callbacks = prof_count, + "audio profile: mix (float)" + ); + prof_last_log = Instant::now(); + prof_count = 0; + prof_sum_mix_us = 0; + 
prof_max_mix_us = 0; + prof_sum_gap_us = 0; + prof_gap_count = 0; + prof_max_gap_us = 0; + } + } } } @@ -112,25 +243,134 @@ where f32: cpal::FromSample, { let mut temp_buffer = vec![0.0f32; max_samples]; + let callback_priority = callback_thread_priority(); + + let profile_audio = std::env::var("MTRACK_PROFILE_AUDIO") + .ok() + .map(|v| { + v == "1" + || v.eq_ignore_ascii_case("true") + || v.eq_ignore_ascii_case("yes") + || v.eq_ignore_ascii_case("on") + }) + .unwrap_or(false); + + // Simple in-callback stats: average/max mix/convert times and avg/max callback + // interval (jitter), logged about once per second. + let mut prof_last_log = Instant::now(); + let mut prof_count: u64 = 0; + let mut prof_sum_mix_us: u128 = 0; + let mut prof_max_mix_us: u64 = 0; + let mut prof_sum_convert_us: u128 = 0; + let mut prof_max_convert_us: u64 = 0; + let mut prof_last_cb: Option = None; + let mut prof_sum_gap_us: u128 = 0; + let mut prof_gap_count: u64 = 0; + let mut prof_max_gap_us: u64 = 0; + let mut priority_set = false; move |data: &mut [T], _: &cpal::OutputCallbackInfo| { + if !priority_set { + let _ = set_current_thread_priority(ThreadPriority::Crossplatform(callback_priority)); + priority_set = true; + } // Process any pending new sources (non-blocking) while let Ok(new_source) = source_rx.try_recv() { mixer.add_source(new_source); } - // Pre-allocated for typical period size; resize only if backend gives a larger buffer (rare) - if temp_buffer.len() < data.len() { - temp_buffer.resize(data.len(), 0.0); + // Never allocate in the callback: clamp to pre-allocated size. If the backend + // ever sends a larger buffer, we mix only the first max_samples and zero the rest. 
+ let n = std::cmp::min(data.len(), temp_buffer.len()); + let temp_slice = &mut temp_buffer[..n]; + let num_frames = n / num_channels as usize; + if profile_audio { + let now = Instant::now(); + if let Some(last) = prof_last_cb { + let gap_us = now.duration_since(last).as_micros() as u64; + prof_sum_gap_us += gap_us as u128; + prof_gap_count += 1; + if gap_us > prof_max_gap_us { + prof_max_gap_us = gap_us; + } + } + prof_last_cb = Some(now); } - let temp_slice = &mut temp_buffer[..data.len()]; - let num_frames = data.len() / num_channels as usize; + let start_mix = if profile_audio { + Some(Instant::now()) + } else { + None + }; mixer.process_into_output(temp_slice, num_frames); - - // Convert to output format - for (out, &sample) in data.iter_mut().zip(temp_slice.iter()) { + let mix_us = if profile_audio { + start_mix.unwrap().elapsed().as_micros() as u64 + } else { + 0 + }; + let start_convert = if profile_audio { + Some(Instant::now()) + } else { + None + }; + let zero = T::from_sample(0.0); + for (out, &sample) in data[..n].iter_mut().zip(temp_slice.iter()) { *out = T::from_sample(sample); } + if n < data.len() { + data[n..].fill(zero); + } + let convert_us = if profile_audio { + start_convert.unwrap().elapsed().as_micros() as u64 + } else { + 0 + }; + if profile_audio { + prof_count += 1; + prof_sum_mix_us += mix_us as u128; + prof_sum_convert_us += convert_us as u128; + if mix_us > prof_max_mix_us { + prof_max_mix_us = mix_us; + } + if convert_us > prof_max_convert_us { + prof_max_convert_us = convert_us; + } + if prof_last_log.elapsed().as_secs_f32() >= 1.0 { + let avg_mix = if prof_count > 0 { + (prof_sum_mix_us / prof_count as u128) as u64 + } else { + 0 + }; + let avg_convert = if prof_count > 0 { + (prof_sum_convert_us / prof_count as u128) as u64 + } else { + 0 + }; + let cb_avg_gap_us = if prof_gap_count > 0 { + (prof_sum_gap_us / prof_gap_count as u128) as u64 + } else { + 0 + }; + info!( + mix_avg_us = avg_mix, + mix_max_us = prof_max_mix_us, + 
convert_avg_us = avg_convert, + convert_max_us = prof_max_convert_us, + cb_avg_gap_us, + cb_max_gap_us = prof_max_gap_us, + callbacks = prof_count, + "audio profile: mix/convert (int)" + ); + prof_last_log = Instant::now(); + prof_count = 0; + prof_sum_mix_us = 0; + prof_max_mix_us = 0; + prof_sum_convert_us = 0; + prof_max_convert_us = 0; + prof_sum_gap_us = 0; + prof_gap_count = 0; + prof_max_gap_us = 0; + } + } } } @@ -459,11 +699,32 @@ impl Device { let mut output_manager = OutputManager::new(device.max_channels, device.target_format.sample_rate)?; - // Start the output thread with configured buffer size + // Resolve stream buffer size for CPAL (default / min / fixed) + let output_buffer_size = match config.stream_buffer_size() { + None => Some(config.buffer_size() as u32), + Some(StreamBufferSize::Default) => None, + Some(StreamBufferSize::Min) => { + let min_size = min_supported_buffer_size( + &device.device, + &device.target_format, + device.max_channels, + ); + if let Some(s) = min_size { + info!( + stream_buffer_size = s, + "Using minimum supported stream buffer size (low latency)" + ); + } + min_size.or_else(|| Some(config.buffer_size() as u32)) + } + Some(StreamBufferSize::Fixed(n)) => Some(n as u32), + }; + + // Start the output thread with resolved buffer size output_manager.start_output_thread( device.device.clone(), device.target_format.clone(), - Some(config.buffer_size() as u32), + output_buffer_size, )?; device.output_manager = Arc::new(output_manager); @@ -528,13 +789,30 @@ impl AudioDevice for Device { spin_sleep::sleep(self.playback_delay); - // Create channel mapped sources for each track in the song, starting from start_time - let channel_mapped_sources = song.create_channel_mapped_sources_from( - start_time, - mappings, + // Build playback context (format, buffer size, shared pool) for source creation. 
+ let buffer_threads = self.audio_config.buffer_threads(); + let buffer_fill_pool = + match crate::audio::sample_source::BufferFillPool::new(buffer_threads) { + Ok(pool) => Some(Arc::new(pool)), + Err(e) => { + error!( + error = %e, + threads = buffer_threads, + "Failed to create BufferFillPool, falling back to unbuffered song sources" + ); + None + } + }; + + let playback_context = crate::audio::PlaybackContext::new( self.target_format.clone(), self.audio_config.buffer_size(), - )?; + buffer_fill_pool, + ); + + // Create channel mapped sources for each track in the song, starting from start_time. + let channel_mapped_sources = + song.create_channel_mapped_sources_from(&playback_context, start_time, mappings)?; // Add all sources to the output manager if channel_mapped_sources.is_empty() { diff --git a/src/audio/mixer.rs b/src/audio/mixer.rs index c7152dd..89bc745 100644 --- a/src/audio/mixer.rs +++ b/src/audio/mixer.rs @@ -23,9 +23,10 @@ use std::sync::{Arc, Mutex, RwLock}; use std::time::Instant; use tracing::debug; -// Thread-local scratch for process_into_output so the audio callback never allocates. +// Thread-local scratch for process_into_output. Size 128 avoids resize in the callback +// for typical multichannel sources; >128 channels may still trigger a one-time resize. thread_local! { - static SOURCE_FRAME_SCRATCH: RefCell> = RefCell::new(vec![0.0; 64]); + static SOURCE_FRAME_SCRATCH: RefCell> = RefCell::new(vec![0.0; 128]); } /// Core audio mixing logic that's independent of any audio backend diff --git a/src/audio/sample_source.rs b/src/audio/sample_source.rs index 6cd9b79..4bdcff0 100644 --- a/src/audio/sample_source.rs +++ b/src/audio/sample_source.rs @@ -12,6 +12,7 @@ // this program. If not, see . 
// pub mod audio; +pub mod buffered; pub mod channel_mapped; pub mod error; pub mod factory; @@ -23,6 +24,7 @@ pub mod transcoder; mod tests; // Re-exports for use by other modules +pub use buffered::{BufferFillPool, BufferedSampleSource}; pub use channel_mapped::create_channel_mapped_sample_source; pub use channel_mapped::ChannelMappedSource; pub use factory::create_sample_source_from_file; diff --git a/src/audio/sample_source/buffered.rs b/src/audio/sample_source/buffered.rs new file mode 100644 index 0000000..dbc4daa --- /dev/null +++ b/src/audio/sample_source/buffered.rs @@ -0,0 +1,347 @@ +// Copyright (C) 2026 Michael Wilson +// +// This program is free software: you can redistribute it and/or modify it under +// the terms of the GNU General Public License as published by the Free Software +// Foundation, version 3. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +// FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with +// this program. If not, see <https://www.gnu.org/licenses/>. +// +// Buffered ChannelMappedSampleSource used for song playback. Prefetches audio +// on a shared Rayon thread pool into a ring buffer so the real‑time audio +// callback does no decoding/resampling work and never allocates. +// + +use std::cmp; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Condvar, Mutex, +}; + +use rayon::ThreadPoolBuilder; + +use crate::audio::sample_source::error::SampleSourceError; +use crate::audio::sample_source::traits::ChannelMappedSampleSource; + +/// Shared pool used by BufferedSampleSource instances to prefill and refill +/// their internal buffers. Backed by a dedicated Rayon thread pool. +pub struct BufferFillPool { + pool: rayon::ThreadPool, +} + +impl BufferFillPool { + /// Creates a new pool with the given number of worker threads.
+ pub fn new(num_threads: usize) -> Result { + let threads = num_threads.max(1); + let pool = ThreadPoolBuilder::new() + .num_threads(threads) + .thread_name(|i| format!("mtrack-buffer-fill-{i}")) + .build() + .map_err(|e| e.to_string())?; + Ok(Self { pool }) + } + + /// Spawns a one‑shot job on the pool. + pub fn spawn(&self, job: F) + where + F: FnOnce() + Send + 'static, + { + self.pool.spawn(job); + } +} + +struct BufferState { + /// Interleaved frames: [frame0_ch0, frame0_ch1, ..., frameN_chC]. + data: Vec, + /// Next frame index to read. + read_index: usize, + /// Next frame index to write. + write_index: usize, + /// Number of valid frames currently buffered. + len_frames: usize, + /// True when the inner source has been fully consumed (EOF or error). + finished: bool, + /// True while a refill job is running for this buffer. + refill_in_progress: bool, +} + +struct BufferInner { + state: Mutex, + condvar: Condvar, +} + +/// Buffered wrapper for any ChannelMappedSampleSource used for song playback. +/// The audio callback only reads from the ring buffer; all heavy work runs on +/// the BufferFillPool workers. +pub struct BufferedSampleSource { + inner: Arc>>, + buffer: Arc, + pool: Arc, + channels: u16, + capacity_frames: usize, + refill_threshold_frames: usize, + warmup_min_frames: usize, + channel_mappings: Vec>, + finished_flag: Arc, +} + +impl BufferedSampleSource { + /// Creates a new buffered wrapper around an existing ChannelMappedSampleSource. + /// + /// - `device_buffer_frames`: current audio device buffer size in frames. + /// - Buffer capacity is 4x `device_buffer_frames`. + /// - Warmup waits for at least `device_buffer_frames` frames before returning. 
+ pub fn new( + inner: Box, + pool: Arc, + device_buffer_frames: usize, + ) -> Self { + let channels = inner.source_channel_count() as usize; + let capacity_frames = cmp::max(device_buffer_frames * 4, device_buffer_frames.max(1)); + let warmup_min_frames = device_buffer_frames.max(1); + let refill_threshold_frames = capacity_frames / 2; + + let channel_mappings = inner.channel_mappings().clone(); + + let buffer_state = BufferState { + data: vec![0.0; capacity_frames * channels], + read_index: 0, + write_index: 0, + len_frames: 0, + finished: false, + refill_in_progress: false, + }; + + let buffer = Arc::new(BufferInner { + state: Mutex::new(buffer_state), + condvar: Condvar::new(), + }); + + let inner = Arc::new(Mutex::new(inner)); + let finished_flag = Arc::new(AtomicBool::new(false)); + + let this = Self { + inner: inner.clone(), + buffer: buffer.clone(), + pool: pool.clone(), + channels: channels as u16, + capacity_frames, + refill_threshold_frames, + warmup_min_frames, + channel_mappings, + finished_flag: finished_flag.clone(), + }; + + // Kick off initial warmup fill. + Self::spawn_fill_task( + pool, + inner, + buffer.clone(), + finished_flag, + channels, + capacity_frames, + capacity_frames, + warmup_min_frames, + ); + + // Block until at least one device buffer worth of frames is ready or the + // source finishes/errs. This runs on a non‑realtime thread (song setup). 
+ { + let mut state = buffer.state.lock().unwrap(); + while !state.finished && state.len_frames < warmup_min_frames { + state = buffer.condvar.wait(state).unwrap(); + } + } + + this + } + + fn spawn_refill_if_needed(&self) { + let mut should_spawn = false; + { + let mut state = self.buffer.state.lock().unwrap(); + if !state.finished + && !state.refill_in_progress + && state.len_frames <= self.refill_threshold_frames + { + state.refill_in_progress = true; + should_spawn = true; + } + } + + if should_spawn { + Self::spawn_fill_task( + self.pool.clone(), + self.inner.clone(), + self.buffer.clone(), + self.finished_flag.clone(), + self.channels as usize, + self.capacity_frames, + self.capacity_frames, + self.warmup_min_frames, + ); + } + } + + #[allow(clippy::too_many_arguments)] + fn spawn_fill_task( + pool: Arc, + inner: Arc>>, + buffer: Arc, + finished_flag: Arc, + channels: usize, + capacity_frames: usize, + max_batch_frames: usize, + warmup_min_frames: usize, + ) { + pool.spawn(move || { + let mut local_frame = vec![0.0f32; channels]; + + loop { + // Early exit if buffer is full or finished. + { + let state = buffer.state.lock().unwrap(); + if state.finished || state.len_frames >= capacity_frames { + break; + } + } + + // How many frames should we try to fill in this batch? + let frames_to_fill = { + let state = buffer.state.lock().unwrap(); + let available = capacity_frames.saturating_sub(state.len_frames); + if available == 0 { + 0 + } else { + cmp::min(max_batch_frames, available) + } + }; + + if frames_to_fill == 0 { + break; + } + + for _ in 0..frames_to_fill { + // Pull next frame from inner source (no locks held on buffer). + let done = { + let mut inner_guard = inner.lock().unwrap(); + match inner_guard.next_frame(&mut local_frame[..]) { + Ok(Some(_count)) => false, + Ok(None) => true, + Err(_) => true, + } + }; + + // Write frame into ring buffer. 
+ { + let mut state = buffer.state.lock().unwrap(); + + if done { + state.finished = true; + finished_flag.store(true, Ordering::Relaxed); + buffer.condvar.notify_all(); + break; + } + + if state.len_frames >= capacity_frames { + break; + } + + let base = state.write_index * channels; + state.data[base..(base + channels)] + .copy_from_slice(&local_frame[..channels]); + state.write_index = (state.write_index + 1) % capacity_frames; + state.len_frames += 1; + + if state.len_frames >= warmup_min_frames { + buffer.condvar.notify_all(); + } + } + } + } + + // Clear refill_in_progress flag and notify any waiters. + let mut state = buffer.state.lock().unwrap(); + state.refill_in_progress = false; + buffer.condvar.notify_all(); + }); + } +} + +impl ChannelMappedSampleSource for BufferedSampleSource { + fn next_sample(&mut self) -> Result, SampleSourceError> { + let channels = self.channels as usize; + let mut frame = vec![0.0f32; channels]; + match self.next_frame(&mut frame[..])? { + Some(count) if count > 0 => Ok(Some(frame[0])), + _ => Ok(None), + } + } + + fn next_frame(&mut self, output: &mut [f32]) -> Result, SampleSourceError> { + let channels = self.channels as usize; + if output.len() < channels { + return Err(SampleSourceError::SampleConversionFailed(format!( + "BufferedSampleSource: output buffer too small: need {channels} samples" + ))); + } + + let mut maybe_spawn_refill = false; + + { + let mut state = self.buffer.state.lock().unwrap(); + + if state.len_frames == 0 { + // Ring buffer underrun: defer to the underlying source rather than + // treating this as end‑of‑stream. This is rare and allowed to do + // heavier work since it only occurs when our prefetch falls behind. + let mut inner = self.inner.lock().unwrap(); + match inner.next_frame(output) { + Ok(Some(count)) => { + // We got a frame directly; do not mark finished. Let the + // background fill task catch up on its next run. 
+ return Ok(Some(count)); + } + Ok(None) => { + state.finished = true; + self.finished_flag.store(true, Ordering::Relaxed); + return Ok(None); + } + Err(e) => { + state.finished = true; + self.finished_flag.store(true, Ordering::Relaxed); + return Err(e); + } + } + } + + let base = state.read_index * channels; + output[..channels].copy_from_slice(&state.data[base..(base + channels)]); + + state.read_index = (state.read_index + 1) % self.capacity_frames; + state.len_frames -= 1; + + if !state.finished && state.len_frames <= self.refill_threshold_frames { + maybe_spawn_refill = true; + } + } + + if maybe_spawn_refill { + self.spawn_refill_if_needed(); + } + + Ok(Some(channels)) + } + + fn channel_mappings(&self) -> &Vec> { + &self.channel_mappings + } + + fn source_channel_count(&self) -> u16 { + self.channels + } +} diff --git a/src/audio/sample_source/tests.rs b/src/audio/sample_source/tests.rs index 60842be..2047aca 100644 --- a/src/audio/sample_source/tests.rs +++ b/src/audio/sample_source/tests.rs @@ -13,11 +13,16 @@ // #[cfg(test)] mod tests { + use std::sync::Arc; + use crate::audio::sample_source::audio::AudioSampleSource; use crate::audio::sample_source::create_sample_source_from_file; use crate::audio::sample_source::memory::MemorySampleSource; - use crate::audio::sample_source::traits::{SampleSource, SampleSourceTestExt}; + use crate::audio::sample_source::traits::{ + ChannelMappedSampleSource, SampleSource, SampleSourceTestExt, + }; use crate::audio::sample_source::transcoder::AudioTranscoder; + use crate::audio::sample_source::{BufferFillPool, BufferedSampleSource, ChannelMappedSource}; use crate::audio::TargetFormat; // --------------------------------------------------------------------- @@ -1895,6 +1900,33 @@ mod tests { std::env::remove_var("MTRACK_FORCE_DETECT_CHANNELS"); } + #[test] + fn test_buffered_sample_source_matches_inner_sequence() { + // Simple 1‑channel source with known samples, wrapped in ChannelMappedSource + let samples = 
vec![0.1_f32, 0.2_f32, 0.3_f32, 0.4_f32]; + let memory = MemorySampleSource::new(samples.clone(), 1, 44100); + let channel_mappings = vec![vec!["test".to_string()]]; + + let inner: Box = + Box::new(ChannelMappedSource::new( + Box::new(memory), + channel_mappings, + 1, + )); + + let pool = Arc::new(BufferFillPool::new(1).expect("failed to create BufferFillPool")); + + // Use small device buffer size so capacity is also small; this exercises refill logic. + let mut buffered = BufferedSampleSource::new(inner, pool, 2); + + let mut read = Vec::new(); + while let Some(sample) = buffered.next_sample().unwrap() { + read.push(sample); + } + + assert_eq!(read, samples); + } + #[test] fn test_wav_sample_source_4channel() { use crate::testutil::write_wav_with_bits; diff --git a/src/config.rs b/src/config.rs index 93481d4..3d60b33 100644 --- a/src/config.rs +++ b/src/config.rs @@ -28,7 +28,7 @@ mod statusevents; mod track; mod trackmappings; -pub use self::audio::Audio; +pub use self::audio::{Audio, StreamBufferSize}; pub use self::controller::Controller; pub use self::controller::GrpcController; pub use self::controller::MidiController; diff --git a/src/config/audio.rs b/src/config/audio.rs index 04e7f01..432c6a0 100644 --- a/src/config/audio.rs +++ b/src/config/audio.rs @@ -20,6 +20,21 @@ use crate::audio::SampleFormat; const DEFAULT_AUDIO_PLAYBACK_DELAY: Duration = Duration::ZERO; const DEFAULT_BUFFER_SIZE: usize = 1024; +const DEFAULT_BUFFER_THREADS: usize = 2; + +/// How to choose the CPAL stream buffer size (period size). Affects latency vs underrun tolerance. +#[derive(Deserialize, Clone, Debug)] +#[serde(untagged)] +pub enum StreamBufferSize { + /// Use the backend's default (may be high latency on some systems). + #[serde(rename = "default")] + Default, + /// Use the device's minimum supported period size (lowest latency, most jitter-sensitive). + #[serde(rename = "min")] + Min, + /// Use a fixed size in frames (same as buffer_size when not set). 
+ Fixed(usize), +} /// A YAML representation of the audio configuration. #[derive(Deserialize, Clone)] @@ -41,6 +56,14 @@ pub struct Audio { /// Buffer size for decoded audio samples (default: 1024 samples per channel) buffer_size: Option, + + /// CPAL stream buffer: "default" (backend default), "min" (lowest latency), or a number (frames). + /// When unset, uses buffer_size. Lower values = lower latency but more sensitive to callback jitter. + stream_buffer_size: Option, + + /// Number of worker threads for buffered song sources. + /// Defaults to a small fixed value; must be >= 1. + buffer_threads: Option, } impl Audio { @@ -53,6 +76,8 @@ impl Audio { sample_format: None, bits_per_sample: None, buffer_size: None, + stream_buffer_size: None, + buffer_threads: None, } } @@ -91,4 +116,15 @@ impl Audio { pub fn buffer_size(&self) -> usize { self.buffer_size.unwrap_or(DEFAULT_BUFFER_SIZE) } + + /// Returns the number of worker threads used for buffered song sources. + pub fn buffer_threads(&self) -> usize { + self.buffer_threads.unwrap_or(DEFAULT_BUFFER_THREADS).max(1) + } + + /// Returns the stream buffer size choice for CPAL (default/min/fixed). + /// When None, the stream uses buffer_size() as a fixed frame count. + pub fn stream_buffer_size(&self) -> Option { + self.stream_buffer_size.clone() + } } diff --git a/src/main.rs b/src/main.rs index 92ba42f..1e35eeb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -63,6 +63,34 @@ WantedBy=multi-user.target Alias=mtrack.service "#; +/// Default thread priority on the [0; 100) scale when MTRACK_THREAD_PRIORITY is unset. +/// On Linux (normal scheduling) this is roughly in the nice -8 to -10 range—higher than default 0 but below max. +const DEFAULT_THREAD_PRIORITY: u8 = 70; + +/// Sets the current thread's priority for better audio scheduling (cross-platform). +/// Uses the [0; 100) scale: higher value = higher priority. +/// Default is 70 when MTRACK_THREAD_PRIORITY is unset; set to 0-99 to override. 
+/// Fails silently if lacking permission. Use high values with care; they can starve other threads. +fn apply_thread_priority() { + use thread_priority::{set_current_thread_priority, ThreadPriority, ThreadPriorityValue}; + + let priority = env::var("MTRACK_THREAD_PRIORITY") + .ok() + .and_then(|v| { + let n = v.parse::().ok()?; + (n < 100).then(|| ThreadPriorityValue::try_from(n).ok())? + }) + .unwrap_or_else(|| ThreadPriorityValue::try_from(DEFAULT_THREAD_PRIORITY).unwrap()); + + match set_current_thread_priority(ThreadPriority::Crossplatform(priority)) { + Ok(()) => info!("Set thread priority for audio"), + Err(e) => tracing::warn!( + error = %e, + "Could not set thread priority (e.g. run as root or with CAP_SYS_NICE on Linux)" + ), + } +} + #[derive(Parser)] #[clap( author = "Michael Wilson", @@ -361,6 +389,7 @@ async fn run() -> Result<(), Box> { player_path, playlist_path, } => { + apply_thread_priority(); let player_path = &Path::new(&player_path); let player_config = config::Player::deserialize(player_path)?; let mut playlist_path = playlist_path.as_ref().map(PathBuf::from).or(player_config.playlist()).ok_or( diff --git a/src/songs.rs b/src/songs.rs index 09c296a..a4d9877 100644 --- a/src/songs.rs +++ b/src/songs.rs @@ -402,17 +402,17 @@ impl Song { } /// Creates ChannelMappedSampleSource instances for each track in the song, starting from a specific time. - /// This is the new, simpler architecture that replaces SongSource. + /// Uses the given playback context for target format, buffer size, and optional buffered-source pool. 
pub fn create_channel_mapped_sources_from( &self, + context: &crate::audio::PlaybackContext, start_time: Duration, track_mappings: &HashMap>, - target_format: TargetFormat, - buffer_size: usize, ) -> Result>, Box> { use crate::audio::sample_source::create_channel_mapped_sample_source; use crate::audio::sample_source::create_sample_source_from_file; + use crate::audio::sample_source::BufferedSampleSource; let mut sources = Vec::new(); @@ -441,7 +441,7 @@ impl Song { } else { Some(start_time) }, - buffer_size, + context.buffer_size, )?; // Get the channel count from the source we just created @@ -467,9 +467,19 @@ impl Song { let source = create_channel_mapped_sample_source( sample_source, - target_format.clone(), + context.target_format.clone(), channel_mappings, )?; + let source: Box = + if let Some(pool) = &context.buffer_fill_pool { + Box::new(BufferedSampleSource::new( + source, + pool.clone(), + context.buffer_size, + )) + } else { + source + }; sources.push(source); } From deaa83fdfe62b7e7560a970148b7f90f75cbba0c Mon Sep 17 00:00:00 2001 From: Michael Wilson Date: Wed, 11 Feb 2026 10:42:01 -0500 Subject: [PATCH 2/3] Let's try the SCHED_FIFO bits. 
--- src/audio/cpal.rs | 70 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 68 insertions(+), 2 deletions(-) diff --git a/src/audio/cpal.rs b/src/audio/cpal.rs index 3d8d3aa..c703229 100644 --- a/src/audio/cpal.rs +++ b/src/audio/cpal.rs @@ -139,6 +139,15 @@ fn create_direct_f32_callback( num_channels: u16, ) -> impl FnMut(&mut [f32], &cpal::OutputCallbackInfo) + Send + 'static { let callback_priority = callback_thread_priority(); + let rt_audio = std::env::var("MTRACK_RT_AUDIO") + .ok() + .map(|v| { + v == "1" + || v.eq_ignore_ascii_case("true") + || v.eq_ignore_ascii_case("yes") + || v.eq_ignore_ascii_case("on") + }) + .unwrap_or(false); let profile_audio = std::env::var("MTRACK_PROFILE_AUDIO") .ok() .map(|v| { @@ -163,7 +172,31 @@ fn create_direct_f32_callback( move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { if !priority_set { - let _ = set_current_thread_priority(ThreadPriority::Crossplatform(callback_priority)); + let tp = ThreadPriority::Crossplatform(callback_priority); + let _ = set_current_thread_priority(tp); + #[cfg(unix)] + if rt_audio { + use thread_priority::unix::{ + set_thread_priority_and_policy, thread_native_id, RealtimeThreadSchedulePolicy, + ThreadSchedulePolicy, + }; + let tid = thread_native_id(); + match set_thread_priority_and_policy( + tid, + tp, + ThreadSchedulePolicy::Realtime(RealtimeThreadSchedulePolicy::Fifo), + ) { + Ok(()) => { + info!("Enabled RT SCHED_FIFO for audio callback thread"); + } + Err(e) => { + tracing::warn!( + error = %e, + "Failed to set RT SCHED_FIFO for audio callback thread" + ); + } + } + } priority_set = true; } // Process any pending new sources (non-blocking) @@ -244,6 +277,15 @@ where { let mut temp_buffer = vec![0.0f32; max_samples]; let callback_priority = callback_thread_priority(); + let rt_audio = std::env::var("MTRACK_RT_AUDIO") + .ok() + .map(|v| { + v == "1" + || v.eq_ignore_ascii_case("true") + || v.eq_ignore_ascii_case("yes") + || v.eq_ignore_ascii_case("on") + }) + 
.unwrap_or(false); let profile_audio = std::env::var("MTRACK_PROFILE_AUDIO") .ok() @@ -271,7 +313,31 @@ where move |data: &mut [T], _: &cpal::OutputCallbackInfo| { if !priority_set { - let _ = set_current_thread_priority(ThreadPriority::Crossplatform(callback_priority)); + let tp = ThreadPriority::Crossplatform(callback_priority); + let _ = set_current_thread_priority(tp); + #[cfg(unix)] + if rt_audio { + use thread_priority::unix::{ + set_thread_priority_and_policy, thread_native_id, RealtimeThreadSchedulePolicy, + ThreadSchedulePolicy, + }; + let tid = thread_native_id(); + match set_thread_priority_and_policy( + tid, + tp, + ThreadSchedulePolicy::Realtime(RealtimeThreadSchedulePolicy::Fifo), + ) { + Ok(()) => { + info!("Enabled RT SCHED_FIFO for audio callback thread"); + } + Err(e) => { + tracing::warn!( + error = %e, + "Failed to set RT SCHED_FIFO for audio callback thread" + ); + } + } + } priority_set = true; } // Process any pending new sources (non-blocking) From 25a0dc606ef3089761c18ce7998775fd10ebb2d6 Mon Sep 17 00:00:00 2001 From: Michael Wilson Date: Wed, 11 Feb 2026 12:17:09 -0500 Subject: [PATCH 3/3] Real time bits and pieces. --- src/audio/cpal.rs | 472 +++++++++++++++++++++--------------------- src/config/midi.rs | 26 --- src/config/samples.rs | 43 ---- src/samples/voice.rs | 37 ---- 4 files changed, 233 insertions(+), 345 deletions(-) diff --git a/src/audio/cpal.rs b/src/audio/cpal.rs index c703229..aa0f569 100644 --- a/src/audio/cpal.rs +++ b/src/audio/cpal.rs @@ -41,6 +41,224 @@ fn callback_thread_priority() -> ThreadPriorityValue { .unwrap_or_else(|| ThreadPriorityValue::try_from(DEFAULT_CALLBACK_THREAD_PRIORITY).unwrap()) } +fn env_flag(name: &str) -> bool { + std::env::var(name) + .ok() + .map(|v| { + v == "1" + || v.eq_ignore_ascii_case("true") + || v.eq_ignore_ascii_case("yes") + || v.eq_ignore_ascii_case("on") + }) + .unwrap_or(false) +} + +/// Returns whether we should attempt RT (SCHED_FIFO) scheduling for the audio callback thread. 
+/// Default: enabled. Advanced users can opt out with MTRACK_DISABLE_RT_AUDIO=1. +fn rt_audio_enabled() -> bool { + if env_flag("MTRACK_DISABLE_RT_AUDIO") { + return false; + } + true +} + +fn configure_audio_thread_priority( + priority: ThreadPriorityValue, + rt_audio: bool, + priority_set: &mut bool, +) { + if *priority_set { + return; + } + let tp = ThreadPriority::Crossplatform(priority); + let _ = set_current_thread_priority(tp); + + #[cfg(unix)] + if rt_audio { + use thread_priority::unix::{ + set_thread_priority_and_policy, thread_native_id, RealtimeThreadSchedulePolicy, + ThreadSchedulePolicy, + }; + let tid = thread_native_id(); + match set_thread_priority_and_policy( + tid, + tp, + ThreadSchedulePolicy::Realtime(RealtimeThreadSchedulePolicy::Fifo), + ) { + Ok(()) => { + info!("Enabled RT SCHED_FIFO for audio callback thread"); + } + Err(e) => { + tracing::warn!( + error = %e, + "Failed to set RT SCHED_FIFO for audio callback thread" + ); + } + } + } + + *priority_set = true; +} + +struct CallbackProfiler { + enabled: bool, + last_log: Instant, + count: u64, + sum_mix_us: u128, + max_mix_us: u64, + sum_convert_us: u128, + max_convert_us: u64, + last_cb: Option, + sum_gap_us: u128, + gap_count: u64, + max_gap_us: u64, +} + +impl CallbackProfiler { + fn new(enabled: bool) -> Self { + Self { + enabled, + last_log: Instant::now(), + count: 0, + sum_mix_us: 0, + max_mix_us: 0, + sum_convert_us: 0, + max_convert_us: 0, + last_cb: None, + sum_gap_us: 0, + gap_count: 0, + max_gap_us: 0, + } + } + + fn on_cb_start(&mut self) -> Option { + if !self.enabled { + return None; + } + let now = Instant::now(); + if let Some(last) = self.last_cb { + let gap_us = now.duration_since(last).as_micros() as u64; + self.sum_gap_us += gap_us as u128; + self.gap_count += 1; + if gap_us > self.max_gap_us { + self.max_gap_us = gap_us; + } + } + self.last_cb = Some(now); + Some(now) + } + + fn on_mix_done(&mut self, start: Option) { + if !self.enabled { + return; + } + let start = 
match start { + Some(s) => s, + None => return, + }; + let mix_us = start.elapsed().as_micros() as u64; + self.count += 1; + self.sum_mix_us += mix_us as u128; + if mix_us > self.max_mix_us { + self.max_mix_us = mix_us; + } + } + + fn on_convert_done(&mut self, start: Option) { + if !self.enabled { + return; + } + let start = match start { + Some(s) => s, + None => return, + }; + let convert_us = start.elapsed().as_micros() as u64; + self.sum_convert_us += convert_us as u128; + if convert_us > self.max_convert_us { + self.max_convert_us = convert_us; + } + } + + fn maybe_log_float(&mut self) { + if !self.enabled { + return; + } + if self.last_log.elapsed().as_secs_f32() < 1.0 { + return; + } + let mix_avg_us = if self.count > 0 { + (self.sum_mix_us / self.count as u128) as u64 + } else { + 0 + }; + let cb_avg_gap_us = if self.gap_count > 0 { + (self.sum_gap_us / self.gap_count as u128) as u64 + } else { + 0 + }; + info!( + mix_avg_us, + mix_max_us = self.max_mix_us, + cb_avg_gap_us, + cb_max_gap_us = self.max_gap_us, + callbacks = self.count, + "audio profile: mix (float)" + ); + + self.last_log = Instant::now(); + self.count = 0; + self.sum_mix_us = 0; + self.max_mix_us = 0; + self.sum_gap_us = 0; + self.gap_count = 0; + self.max_gap_us = 0; + } + + fn maybe_log_int(&mut self) { + if !self.enabled { + return; + } + if self.last_log.elapsed().as_secs_f32() < 1.0 { + return; + } + let mix_avg_us = if self.count > 0 { + (self.sum_mix_us / self.count as u128) as u64 + } else { + 0 + }; + let convert_avg_us = if self.count > 0 { + (self.sum_convert_us / self.count as u128) as u64 + } else { + 0 + }; + let cb_avg_gap_us = if self.gap_count > 0 { + (self.sum_gap_us / self.gap_count as u128) as u64 + } else { + 0 + }; + info!( + mix_avg_us, + mix_max_us = self.max_mix_us, + convert_avg_us, + convert_max_us = self.max_convert_us, + cb_avg_gap_us, + cb_max_gap_us = self.max_gap_us, + callbacks = self.count, + "audio profile: mix/convert (int)" + ); + + self.last_log = 
Instant::now(); + self.count = 0; + self.sum_mix_us = 0; + self.max_mix_us = 0; + self.sum_convert_us = 0; + self.max_convert_us = 0; + self.sum_gap_us = 0; + self.gap_count = 0; + self.max_gap_us = 0; + } +} + use crate::audio::mixer::{ActiveSource as MixerActiveSource, AudioMixer}; use crate::{ audio::{Device as AudioDevice, SampleFormat, TargetFormat}, @@ -139,66 +357,13 @@ fn create_direct_f32_callback( num_channels: u16, ) -> impl FnMut(&mut [f32], &cpal::OutputCallbackInfo) + Send + 'static { let callback_priority = callback_thread_priority(); - let rt_audio = std::env::var("MTRACK_RT_AUDIO") - .ok() - .map(|v| { - v == "1" - || v.eq_ignore_ascii_case("true") - || v.eq_ignore_ascii_case("yes") - || v.eq_ignore_ascii_case("on") - }) - .unwrap_or(false); - let profile_audio = std::env::var("MTRACK_PROFILE_AUDIO") - .ok() - .map(|v| { - v == "1" - || v.eq_ignore_ascii_case("true") - || v.eq_ignore_ascii_case("yes") - || v.eq_ignore_ascii_case("on") - }) - .unwrap_or(false); - - // Simple in-callback stats: average/max mix time and avg/max callback interval - // (jitter), logged about once per second. 
- let mut prof_last_log = Instant::now(); - let mut prof_count: u64 = 0; - let mut prof_sum_mix_us: u128 = 0; - let mut prof_max_mix_us: u64 = 0; - let mut prof_last_cb: Option = None; - let mut prof_sum_gap_us: u128 = 0; - let mut prof_gap_count: u64 = 0; - let mut prof_max_gap_us: u64 = 0; + let rt_audio = rt_audio_enabled(); + let profile_audio = env_flag("MTRACK_PROFILE_AUDIO"); + let mut profiler = CallbackProfiler::new(profile_audio); let mut priority_set = false; move |data: &mut [f32], _: &cpal::OutputCallbackInfo| { - if !priority_set { - let tp = ThreadPriority::Crossplatform(callback_priority); - let _ = set_current_thread_priority(tp); - #[cfg(unix)] - if rt_audio { - use thread_priority::unix::{ - set_thread_priority_and_policy, thread_native_id, RealtimeThreadSchedulePolicy, - ThreadSchedulePolicy, - }; - let tid = thread_native_id(); - match set_thread_priority_and_policy( - tid, - tp, - ThreadSchedulePolicy::Realtime(RealtimeThreadSchedulePolicy::Fifo), - ) { - Ok(()) => { - info!("Enabled RT SCHED_FIFO for audio callback thread"); - } - Err(e) => { - tracing::warn!( - error = %e, - "Failed to set RT SCHED_FIFO for audio callback thread" - ); - } - } - } - priority_set = true; - } + configure_audio_thread_priority(callback_priority, rt_audio, &mut priority_set); // Process any pending new sources (non-blocking) while let Ok(new_source) = source_rx.try_recv() { mixer.add_source(new_source); @@ -206,60 +371,10 @@ fn create_direct_f32_callback( // Mix directly into the output buffer (cleanup happens inline) let num_frames = data.len() / num_channels as usize; - if profile_audio { - let now = Instant::now(); - if let Some(last) = prof_last_cb { - let gap_us = now.duration_since(last).as_micros() as u64; - prof_sum_gap_us += gap_us as u128; - prof_gap_count += 1; - if gap_us > prof_max_gap_us { - prof_max_gap_us = gap_us; - } - } - prof_last_cb = Some(now); - } - let start = if profile_audio { - Some(Instant::now()) - } else { - None - }; + let start = 
profiler.on_cb_start(); mixer.process_into_output(data, num_frames); - if profile_audio { - let start = start.unwrap(); - let mix_us = start.elapsed().as_micros() as u64; - prof_count += 1; - prof_sum_mix_us += mix_us as u128; - if mix_us > prof_max_mix_us { - prof_max_mix_us = mix_us; - } - if prof_last_log.elapsed().as_secs_f32() >= 1.0 { - let avg = if prof_count > 0 { - (prof_sum_mix_us / prof_count as u128) as u64 - } else { - 0 - }; - let cb_avg_gap_us = if prof_gap_count > 0 { - (prof_sum_gap_us / prof_gap_count as u128) as u64 - } else { - 0 - }; - info!( - mix_avg_us = avg, - mix_max_us = prof_max_mix_us, - cb_avg_gap_us, - cb_max_gap_us = prof_max_gap_us, - callbacks = prof_count, - "audio profile: mix (float)" - ); - prof_last_log = Instant::now(); - prof_count = 0; - prof_sum_mix_us = 0; - prof_max_mix_us = 0; - prof_sum_gap_us = 0; - prof_gap_count = 0; - prof_max_gap_us = 0; - } - } + profiler.on_mix_done(start); + profiler.maybe_log_float(); } } @@ -277,69 +392,13 @@ where { let mut temp_buffer = vec![0.0f32; max_samples]; let callback_priority = callback_thread_priority(); - let rt_audio = std::env::var("MTRACK_RT_AUDIO") - .ok() - .map(|v| { - v == "1" - || v.eq_ignore_ascii_case("true") - || v.eq_ignore_ascii_case("yes") - || v.eq_ignore_ascii_case("on") - }) - .unwrap_or(false); - - let profile_audio = std::env::var("MTRACK_PROFILE_AUDIO") - .ok() - .map(|v| { - v == "1" - || v.eq_ignore_ascii_case("true") - || v.eq_ignore_ascii_case("yes") - || v.eq_ignore_ascii_case("on") - }) - .unwrap_or(false); - - // Simple in-callback stats: average/max mix/convert times and avg/max callback - // interval (jitter), logged about once per second. 
- let mut prof_last_log = Instant::now(); - let mut prof_count: u64 = 0; - let mut prof_sum_mix_us: u128 = 0; - let mut prof_max_mix_us: u64 = 0; - let mut prof_sum_convert_us: u128 = 0; - let mut prof_max_convert_us: u64 = 0; - let mut prof_last_cb: Option = None; - let mut prof_sum_gap_us: u128 = 0; - let mut prof_gap_count: u64 = 0; - let mut prof_max_gap_us: u64 = 0; + let rt_audio = rt_audio_enabled(); + let profile_audio = env_flag("MTRACK_PROFILE_AUDIO"); + let mut profiler = CallbackProfiler::new(profile_audio); let mut priority_set = false; move |data: &mut [T], _: &cpal::OutputCallbackInfo| { - if !priority_set { - let tp = ThreadPriority::Crossplatform(callback_priority); - let _ = set_current_thread_priority(tp); - #[cfg(unix)] - if rt_audio { - use thread_priority::unix::{ - set_thread_priority_and_policy, thread_native_id, RealtimeThreadSchedulePolicy, - ThreadSchedulePolicy, - }; - let tid = thread_native_id(); - match set_thread_priority_and_policy( - tid, - tp, - ThreadSchedulePolicy::Realtime(RealtimeThreadSchedulePolicy::Fifo), - ) { - Ok(()) => { - info!("Enabled RT SCHED_FIFO for audio callback thread"); - } - Err(e) => { - tracing::warn!( - error = %e, - "Failed to set RT SCHED_FIFO for audio callback thread" - ); - } - } - } - priority_set = true; - } + configure_audio_thread_priority(callback_priority, rt_audio, &mut priority_set); // Process any pending new sources (non-blocking) while let Ok(new_source) = source_rx.try_recv() { mixer.add_source(new_source); @@ -350,29 +409,14 @@ where let n = std::cmp::min(data.len(), temp_buffer.len()); let temp_slice = &mut temp_buffer[..n]; let num_frames = n / num_channels as usize; - if profile_audio { - let now = Instant::now(); - if let Some(last) = prof_last_cb { - let gap_us = now.duration_since(last).as_micros() as u64; - prof_sum_gap_us += gap_us as u128; - prof_gap_count += 1; - if gap_us > prof_max_gap_us { - prof_max_gap_us = gap_us; - } - } - prof_last_cb = Some(now); - } + 
profiler.on_cb_start(); let start_mix = if profile_audio { Some(Instant::now()) } else { None }; mixer.process_into_output(temp_slice, num_frames); - let mix_us = if profile_audio { - start_mix.unwrap().elapsed().as_micros() as u64 - } else { - 0 - }; + profiler.on_mix_done(start_mix); let start_convert = if profile_audio { Some(Instant::now()) } else { @@ -385,58 +429,8 @@ where if n < data.len() { data[n..].fill(zero); } - let convert_us = if profile_audio { - start_convert.unwrap().elapsed().as_micros() as u64 - } else { - 0 - }; - if profile_audio { - prof_count += 1; - prof_sum_mix_us += mix_us as u128; - prof_sum_convert_us += convert_us as u128; - if mix_us > prof_max_mix_us { - prof_max_mix_us = mix_us; - } - if convert_us > prof_max_convert_us { - prof_max_convert_us = convert_us; - } - if prof_last_log.elapsed().as_secs_f32() >= 1.0 { - let avg_mix = if prof_count > 0 { - (prof_sum_mix_us / prof_count as u128) as u64 - } else { - 0 - }; - let avg_convert = if prof_count > 0 { - (prof_sum_convert_us / prof_count as u128) as u64 - } else { - 0 - }; - let cb_avg_gap_us = if prof_gap_count > 0 { - (prof_sum_gap_us / prof_gap_count as u128) as u64 - } else { - 0 - }; - info!( - mix_avg_us = avg_mix, - mix_max_us = prof_max_mix_us, - convert_avg_us = avg_convert, - convert_max_us = prof_max_convert_us, - cb_avg_gap_us, - cb_max_gap_us = prof_max_gap_us, - callbacks = prof_count, - "audio profile: mix/convert (int)" - ); - prof_last_log = Instant::now(); - prof_count = 0; - prof_sum_mix_us = 0; - prof_max_mix_us = 0; - prof_sum_convert_us = 0; - prof_max_convert_us = 0; - prof_sum_gap_us = 0; - prof_gap_count = 0; - prof_max_gap_us = 0; - } - } + profiler.on_convert_done(start_convert); + profiler.maybe_log_int(); } } diff --git a/src/config/midi.rs b/src/config/midi.rs index 005f4fb..23db7b7 100644 --- a/src/config/midi.rs +++ b/src/config/midi.rs @@ -204,19 +204,6 @@ pub struct NoteOff { velocity: u8, } -#[cfg(test)] -impl NoteOff { - /// Gets the channel 
(1-indexed). - pub fn channel(&self) -> u8 { - self.channel - } - - /// Gets the key. - pub fn key(&self) -> u8 { - self.key - } -} - impl ToMidiEvent for NoteOff { fn to_midi_event(&self) -> Result, Box> { Ok(LiveEvent::Midi { @@ -242,19 +229,6 @@ pub struct NoteOn { velocity: u8, } -#[cfg(test)] -impl NoteOn { - /// Gets the channel (1-indexed). - pub fn channel(&self) -> u8 { - self.channel - } - - /// Gets the key. - pub fn key(&self) -> u8 { - self.key - } -} - impl ToMidiEvent for NoteOn { fn to_midi_event(&self) -> Result, Box> { Ok(LiveEvent::Midi { diff --git a/src/config/samples.rs b/src/config/samples.rs index d5f5b35..3540c24 100644 --- a/src/config/samples.rs +++ b/src/config/samples.rs @@ -155,11 +155,6 @@ impl SampleDefinition { pub fn file(&self) -> Option<&str> { self.file.as_deref() } - - /// Gets the velocity configuration (test only). - pub fn velocity(&self) -> &VelocityConfig { - &self.velocity - } } /// Configuration for velocity handling. @@ -211,26 +206,6 @@ impl VelocityConfig { layers, } } - - /// Gets the velocity mode (test only). - pub fn mode(&self) -> &VelocityMode { - &self.mode - } - - /// Gets the default velocity (test only). - pub fn default(&self) -> Option { - self.default - } - - /// Gets whether to scale volume in layers mode (test only). - pub fn scale_enabled(&self) -> bool { - self.scale.unwrap_or(false) - } - - /// Gets the velocity layers (test only). - pub fn layers(&self) -> &[VelocityLayer] { - &self.layers - } } /// Velocity handling mode. @@ -262,16 +237,6 @@ impl VelocityLayer { pub fn new(range: [u8; 2], file: String) -> Self { Self { range, file } } - - /// Gets the velocity range (test only). - pub fn range(&self) -> [u8; 2] { - self.range - } - - /// Gets the file for this layer (test only). - pub fn file(&self) -> &str { - &self.file - } } /// Behavior when a Note Off event is received for a playing sample. 
@@ -320,14 +285,6 @@ impl SampleTrigger { } } -#[cfg(test)] -impl SampleTrigger { - /// Creates a new sample trigger (test only). - pub fn new(trigger: midi::Event, sample: String) -> Self { - Self { trigger, sample } - } -} - /// Global samples configuration that can be embedded in player config or loaded from a file. #[derive(Deserialize, Clone, Serialize, Debug, Default)] pub struct SamplesConfig { diff --git a/src/samples/voice.rs b/src/samples/voice.rs index 1823191..59f44da 100644 --- a/src/samples/voice.rs +++ b/src/samples/voice.rs @@ -91,29 +91,6 @@ impl Voice { } } -#[cfg(test)] -impl Voice { - /// Returns the voice ID (test only). - pub fn id(&self) -> u64 { - self.id - } - - /// Returns the sample name (test only). - pub fn sample_name(&self) -> &str { - &self.sample_name - } - - /// Returns the mixer source ID (test only). - pub fn mixer_source_id(&self) -> u64 { - self.mixer_source_id - } - - /// Returns when this voice started (test only). - pub fn start_time(&self) -> Instant { - self.start_time - } -} - /// Manages active voices for sample playback. pub struct VoiceManager { /// Active voices. @@ -250,20 +227,6 @@ impl VoiceManager { } } -#[cfg(test)] -impl VoiceManager { - /// Removes voices by their mixer source IDs (test only). - pub fn remove_by_source_ids(&mut self, source_ids: &[u64]) { - self.voices - .retain(|v| !source_ids.contains(&v.mixer_source_id)); - } - - /// Returns all active voices (test only). - pub fn voices(&self) -> &[Voice] { - &self.voices - } -} - impl std::fmt::Debug for VoiceManager { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("VoiceManager")