From 3e729f0d1e685cfce5edf7c87cc866fa3cbaab09 Mon Sep 17 00:00:00 2001 From: Michael Wilson Date: Fri, 6 Feb 2026 15:14:51 -0500 Subject: [PATCH] Robustness in output stream. The audio callbacks should now be a lot more robust, and the output stream will be rebuilt on failure. A bug was discovered and fixed where the source IDs generated for both songs and samples could collide, which could result in a sample trigger with the same source ID as a song causing both the sample and the song to stop. --- src/audio.rs | 11 +- src/audio/cpal.rs | 238 +++++++++++++++++++++++++++--------------- src/audio/mixer.rs | 101 +++++++++++------- src/samples/engine.rs | 7 +- 4 files changed, 227 insertions(+), 130 deletions(-) diff --git a/src/audio.rs b/src/audio.rs index 635eac9..874a966 100644 --- a/src/audio.rs +++ b/src/audio.rs @@ -1,4 +1,3 @@ -use std::any::Any; // Copyright (C) 2026 Michael Wilson // // This program is free software: you can redistribute it and/or modify it under @@ -12,6 +11,8 @@ use std::any::Any; // You should have received a copy of the GNU General Public License along with // this program. If not, see . // +use std::any::Any; +use std::sync::atomic::{AtomicU64, Ordering}; use std::{error::Error, fmt, sync::Arc, time::Duration}; use crate::config; @@ -29,6 +30,14 @@ pub mod sample_source; // Re-export the format types for backward compatibility pub use format::{SampleFormat, TargetFormat}; +/// Global source ID counter shared by song playback and sample triggers so IDs are unique. +static SOURCE_ID_COUNTER: AtomicU64 = AtomicU64::new(1); + +/// Returns the next unique source ID for the mixer. Used by both song play_from and sample engine. +pub(crate) fn next_source_id() -> u64 { + SOURCE_ID_COUNTER.fetch_add(1, Ordering::Relaxed) +} + /// Type alias for the channel sender used to add sources to the mixer. pub type SourceSender = crossbeam_channel::Sender; diff --git a/src/audio/cpal.rs b/src/audio/cpal.rs index fafa01e..3413510 100644 --- a/src/audio/cpal.rs +++ b/src/audio/cpal.rs @@ -16,8 +16,8 @@ use std::{ error::Error, fmt, sync::{ - atomic::{AtomicBool, AtomicU64, Ordering}, - Arc, + atomic::{AtomicBool, Ordering}, + Arc, Condvar, Mutex, }, thread, time::Duration, @@ -35,9 +35,6 @@ use crate::{ }; use std::sync::Barrier; -/// Global atomic counter for generating unique source IDs -static SOURCE_ID_COUNTER: AtomicU64 = AtomicU64::new(1); - /// A small wrapper around a cpal::Device. Used for storing some extra /// data that makes multitrack playing more convenient. pub struct Device { @@ -102,16 +99,19 @@ fn create_direct_f32_callback( } } -/// Direct mixer callback for integer output - no intermediate ring buffer +/// Direct mixer callback for integer output - no intermediate ring buffer. +/// `max_samples` should be the stream period size in samples (e.g. buffer_size * num_channels) +/// so the temp buffer is pre-allocated and never resized in the callback. fn create_direct_int_callback + std::fmt::Debug>( mixer: AudioMixer, source_rx: crossbeam_channel::Receiver, num_channels: u16, + max_samples: usize, ) -> impl FnMut(&mut [T], &cpal::OutputCallbackInfo) + Send + 'static where f32: cpal::FromSample, { - let mut temp_buffer: Vec = Vec::new(); + let mut temp_buffer = vec![0.0f32; max_samples]; move |data: &mut [T], _: &cpal::OutputCallbackInfo| { // Process any pending new sources (non-blocking) @@ -119,14 +119,12 @@ where mixer.add_source(new_source); } - // Ensure temp buffer is large enough + // Pre-allocated for typical period size; resize only if backend gives a larger buffer (rare) if temp_buffer.len() < data.len() { temp_buffer.resize(data.len(), 0.0); } - - // Mix into temp buffer (cleanup happens inline) - let num_frames = data.len() / num_channels as usize; let temp_slice = &mut temp_buffer[..data.len()]; + let num_frames = data.len() / num_channels as usize; mixer.process_into_output(temp_slice, num_frames); // Convert to output format @@ -191,6 +189,7 @@ impl OutputManager { /// Starts the output thread that creates and manages the CPAL stream. /// Uses direct callback mode - no intermediate ring buffer for lowest latency. + /// On ALSA/backend errors (e.g. POLLERR), the stream is recreated automatically. fn start_output_thread( &mut self, device: cpal::Device, @@ -202,13 +201,16 @@ impl OutputManager { let num_channels = mixer.num_channels(); let sample_rate = mixer.sample_rate(); - // Use a barrier to ensure the stream is created before we return + // Notify the output thread when the CPAL error callback runs (e.g. ALSA POLLERR). + // The output thread blocks on the condvar and recreates the stream on notification. + let stream_error_notify = Arc::new((Mutex::new(false), Condvar::new())); + + // Use a barrier to ensure the first stream is created before we return let barrier = Arc::new(Barrier::new(2)); let barrier_clone = barrier.clone(); - // Start the output thread - create the stream inside the thread + // Start the output thread - create the stream inside the thread, recreate on error let output_thread = thread::spawn(move || { - // Create the CPAL stream configuration let buffer_size = match output_buffer_size { Some(size) => cpal::BufferSize::Fixed(size), None => cpal::BufferSize::Default, @@ -218,87 +220,144 @@ impl OutputManager { sample_rate, buffer_size, }; + let max_samples = output_buffer_size + .map(|f| f as usize * num_channels as usize) + .unwrap_or(4096 * num_channels as usize); - // Create the output stream with direct mixer callback (no ring buffer) - let stream_result = if target_format.sample_format == crate::audio::SampleFormat::Float - { - let mut callback = - create_direct_f32_callback(mixer.clone(), source_rx.clone(), num_channels); - device.build_output_stream( - &config, - move |data: &mut [f32], info: &cpal::OutputCallbackInfo| { - callback(data, info); - }, - |err| error!("CPAL output stream error: {}", err), - None, - ) - } else { - // For integer formats, we need to convert from f32 to the target integer type - match target_format.bits_per_sample { - 16 => { - let mut callback = create_direct_int_callback::( - mixer.clone(), - source_rx.clone(), - num_channels, - ); - device.build_output_stream( - &config, - move |data: &mut [i16], info: &cpal::OutputCallbackInfo| { - callback(data, info); - }, - |err| error!("CPAL output stream error: {}", err), - None, - ) + let mut first_run = true; + + loop { + let notify = stream_error_notify.clone(); + let on_error = move |err: cpal::StreamError| { + error!( + "CPAL output stream error: {} (will attempt to recover)", + err + ); + let (mutex, condvar) = &*notify; + let mut guard = mutex.lock().unwrap(); + *guard = true; + condvar.notify_one(); + }; + + // Create the output stream with direct mixer callback (no ring buffer) + let stream_result = if target_format.sample_format + == crate::audio::SampleFormat::Float + { + let mut callback = + create_direct_f32_callback(mixer.clone(), source_rx.clone(), num_channels); + device.build_output_stream( + &config, + move |data: &mut [f32], info: &cpal::OutputCallbackInfo| { + callback(data, info); + }, + on_error, + None, + ) + } else { + match target_format.bits_per_sample { + 16 => { + let mut callback = create_direct_int_callback::( + mixer.clone(), + source_rx.clone(), + num_channels, + max_samples, + ); + let on_err = stream_error_notify.clone(); + device.build_output_stream( + &config, + move |data: &mut [i16], info: &cpal::OutputCallbackInfo| { + callback(data, info); + }, + move |err: cpal::StreamError| { + error!( + "CPAL output stream error: {} (will attempt to recover)", + err + ); + let (mutex, condvar) = &*on_err; + let mut guard = mutex.lock().unwrap(); + *guard = true; + condvar.notify_one(); + }, + None, + ) + } + 32 => { + let mut callback = create_direct_int_callback::( + mixer.clone(), + source_rx.clone(), + num_channels, + max_samples, + ); + let on_err = stream_error_notify.clone(); + device.build_output_stream( + &config, + move |data: &mut [i32], info: &cpal::OutputCallbackInfo| { + callback(data, info); + }, + move |err: cpal::StreamError| { + error!( + "CPAL output stream error: {} (will attempt to recover)", + err + ); + let (mutex, condvar) = &*on_err; + let mut guard = mutex.lock().unwrap(); + *guard = true; + condvar.notify_one(); + }, + None, + ) + } + _ => { + error!("Unsupported bit depth for integer format"); + if first_run { + barrier_clone.wait(); + } + return; + } } - 32 => { - let mut callback = create_direct_int_callback::( - mixer.clone(), - source_rx.clone(), - num_channels, - ); - device.build_output_stream( - &config, - move |data: &mut [i32], info: &cpal::OutputCallbackInfo| { - callback(data, info); - }, - |err| error!("CPAL output stream error: {}", err), - None, - ) + }; + + match stream_result { + Ok(stream) => { + if let Err(e) = stream.play() { + error!("Failed to start CPAL stream: {}", e); + if first_run { + barrier_clone.wait(); + } + return; + } + if first_run { + info!("CPAL output stream started successfully (direct callback mode)"); + barrier_clone.wait(); + first_run = false; + } else { + info!("CPAL output stream recovered after backend error"); + } + + // Keep the stream alive; block until the error callback notifies us + let (mutex, condvar) = &*stream_error_notify; + let mut guard = mutex.lock().unwrap(); + while !*guard { + guard = condvar.wait(guard).unwrap(); + } + *guard = false; + drop(guard); + + // Drop the stream so we can create a new one + drop(stream); } - _ => { - error!("Unsupported bit depth for integer format"); - barrier_clone.wait(); + Err(e) => { + error!("Failed to create CPAL stream: {}", e); + if first_run { + barrier_clone.wait(); + } return; } } - }; - - // Start the stream - match stream_result { - Ok(stream) => { - if let Err(e) = stream.play() { - error!("Failed to start CPAL stream: {}", e); - barrier_clone.wait(); - return; - } - info!("CPAL output stream started successfully (direct callback mode)"); - - // Signal that stream is ready - barrier_clone.wait(); - - // Keep the stream alive by waiting - loop { - thread::sleep(Duration::from_millis(100)); - } - } - Err(e) => { - error!("Failed to create CPAL stream: {}", e); - barrier_clone.wait(); - } } }); - // Wait for stream to be created + // Wait for first stream to be created barrier.wait(); self.output_thread = Some(output_thread); @@ -486,7 +545,7 @@ impl AudioDevice for Device { let mut source_finish_flags = Vec::new(); for source in channel_mapped_sources.into_iter() { - let current_source_id = SOURCE_ID_COUNTER.fetch_add(1, Ordering::Relaxed); + let current_source_id = crate::audio::next_source_id(); let source_channel_count = source.source_channel_count(); let is_finished = Arc::new(AtomicBool::new(false)); source_finish_flags.push(is_finished.clone()); @@ -513,6 +572,7 @@ impl AudioDevice for Device { // This is completely lock-free - just checks atomic flags let finished_monitor = finished.clone(); let cancel_handle_for_notify = cancel_handle.clone(); + let num_song_sources = source_finish_flags.len(); thread::spawn(move || { loop { // Check if all sources have finished (lock-free) @@ -521,6 +581,10 @@ impl AudioDevice for Device { .all(|flag| flag.load(Ordering::Relaxed)); if all_finished { + tracing::debug!( + num_song_sources, + "play_from: all song sources finished, notifying" + ); finished_monitor.store(true, Ordering::Relaxed); cancel_handle_for_notify.notify(); break; diff --git a/src/audio/mixer.rs b/src/audio/mixer.rs index 904e1a7..c7152dd 100644 --- a/src/audio/mixer.rs +++ b/src/audio/mixer.rs @@ -13,6 +13,7 @@ // // Core audio mixing logic that can be used by both CPAL and test implementations use crate::audio::sample_source::ChannelMappedSampleSource; +use std::cell::RefCell; use std::collections::{HashMap, HashSet}; #[cfg(test)] use std::sync::atomic::AtomicUsize; @@ -20,6 +21,12 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, Mutex, RwLock}; #[cfg(test)] use std::time::Instant; +use tracing::debug; + +// Thread-local scratch for process_into_output so the audio callback never allocates. +thread_local! { + static SOURCE_FRAME_SCRATCH: RefCell> = RefCell::new(vec![0.0; 64]); +} /// Core audio mixing logic that's independent of any audio backend #[derive(Clone)] @@ -286,8 +293,6 @@ impl AudioMixer { }; let mut finished_source_ids = HashSet::new(); - // Reusable scratch buffer for source frames (max 64 channels should cover most cases) - let mut source_frame_buffer = vec![0.0f32; 64]; // Process each active source across all frames for active_source_arc in sources_to_process { @@ -296,6 +301,15 @@ impl AudioMixer { if active_source.is_finished.load(Ordering::Relaxed) || active_source.cancel_handle.is_cancelled() { + debug!( + source_id = active_source.id, + reason = if active_source.is_finished.load(Ordering::Relaxed) { + "already_finished" + } else { + "cancel_handle_cancelled" + }, + "mixer: source marked finished (skip)" + ); finished_source_ids.insert(active_source.id); continue; } @@ -304,7 +318,12 @@ impl AudioMixer { if let Some(ref cancel_at) = active_source.cancel_at_sample { let cancel_sample = cancel_at.load(Ordering::Relaxed); if cancel_sample > 0 && current_sample >= cancel_sample { - // Scheduled cancellation time reached + debug!( + source_id = active_source.id, + cancel_sample, + current_sample, + "mixer: source marked finished (cancel_at_sample reached)" + ); active_source.is_finished.store(true, Ordering::Relaxed); finished_source_ids.insert(active_source.id); continue; @@ -344,46 +363,49 @@ impl AudioMixer { }; let source_channel_count = active_source.cached_source_channel_count as usize; - // Resize buffer if needed (should be rare) - if source_frame_buffer.len() < source_channel_count { - source_frame_buffer.resize(source_channel_count, 0.0); - } - - for frame_index in start_frame..end_frame { - match active_source - .source - .next_frame(&mut source_frame_buffer[..source_channel_count]) - { - Ok(Some(_count)) => { - // Mix using precomputed mappings - for (source_channel, &sample) in source_frame_buffer[..source_channel_count] - .iter() - .enumerate() - { - if let Some(output_channels) = - active_source.channel_mappings.get(source_channel) - { - let base = frame_index * channels; - for &output_index in output_channels { - if output_index < channels { - output[base + output_index] += sample; + SOURCE_FRAME_SCRATCH.with(|cell| { + let mut buf = cell.borrow_mut(); + if buf.len() < source_channel_count { + buf.resize(source_channel_count, 0.0); + } + let sbuf = &mut buf[..source_channel_count]; + for frame_index in start_frame..end_frame { + match active_source.source.next_frame(sbuf) { + Ok(Some(_count)) => { + for (source_channel, &sample) in sbuf.iter().enumerate() { + if let Some(output_channels) = + active_source.channel_mappings.get(source_channel) + { + let base = frame_index * channels; + for &output_index in output_channels { + if output_index < channels { + output[base + output_index] += sample; + } } } } } - } - Ok(None) => { - active_source.is_finished.store(true, Ordering::Relaxed); - finished_source_ids.insert(active_source.id); - break; - } - Err(_) => { - active_source.is_finished.store(true, Ordering::Relaxed); - finished_source_ids.insert(active_source.id); - break; + Ok(None) => { + debug!( + source_id = active_source.id, + "mixer: source marked finished (next_frame returned None)" + ); + active_source.is_finished.store(true, Ordering::Relaxed); + finished_source_ids.insert(active_source.id); + break; + } + Err(_) => { + debug!( + source_id = active_source.id, + "mixer: source marked finished (next_frame error)" + ); + active_source.is_finished.store(true, Ordering::Relaxed); + finished_source_ids.insert(active_source.id); + break; + } } } - } + }); } // Increment the sample counter @@ -392,6 +414,11 @@ impl AudioMixer { // Clean up finished sources inline - we're the only accessor in direct callback mode if !finished_source_ids.is_empty() { + debug!( + source_ids = ?finished_source_ids, + remaining_before = self.active_sources.read().unwrap().len(), + "mixer: removing finished sources" + ); let mut sources = self.active_sources.write().unwrap(); sources.retain(|source| { let source_guard = source.lock().unwrap(); diff --git a/src/samples/engine.rs b/src/samples/engine.rs index 499eea7..5377747 100644 --- a/src/samples/engine.rs +++ b/src/samples/engine.rs @@ -17,7 +17,6 @@ use std::collections::HashMap; use std::error::Error; use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::RwLock; use midly::live::LiveEvent; @@ -26,14 +25,12 @@ use tracing::{debug, error, info, warn}; use super::loader::{LoadedSample, SampleLoader}; use super::voice::{Voice, VoiceManager}; +use crate::audio; use crate::audio::sample_source::ChannelMappedSource; use crate::config::samples::{NoteOffBehavior, SampleDefinition, SampleTrigger, SamplesConfig}; use crate::config::ToMidiEvent; use crate::playsync::CancelHandle; -/// Global source ID counter for the mixer. -static NEXT_SOURCE_ID: AtomicU64 = AtomicU64::new(1); - /// Precomputed data for a loaded sample file, avoiding allocations during trigger. struct PrecomputedSampleData { /// The loaded sample audio data. @@ -412,7 +409,7 @@ impl SampleEngine { // Create a new source for playback let source = precomputed.loaded.create_source(volume); - let source_id = NEXT_SOURCE_ID.fetch_add(1, Ordering::SeqCst); + let source_id = audio::next_source_id(); // Use precomputed channel labels and track mappings (no allocations!) let channel_mapped = ChannelMappedSource::new(