Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/audio.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::any::Any;
// Copyright (C) 2026 Michael Wilson <mike@mdwn.dev>
//
// This program is free software: you can redistribute it and/or modify it under
Expand All @@ -12,6 +11,8 @@ use std::any::Any;
// You should have received a copy of the GNU General Public License along with
// this program. If not, see <https://www.gnu.org/licenses/>.
//
use std::any::Any;
use std::sync::atomic::{AtomicU64, Ordering};
use std::{error::Error, fmt, sync::Arc, time::Duration};

use crate::config;
Expand All @@ -29,6 +30,14 @@ pub mod sample_source;
// Re-export the format types for backward compatibility
pub use format::{SampleFormat, TargetFormat};

/// Global source ID counter shared by song playback and sample triggers so IDs are unique.
static SOURCE_ID_COUNTER: AtomicU64 = AtomicU64::new(1);

/// Returns the next unique source ID for the mixer. Used by both song play_from and sample engine.
pub(crate) fn next_source_id() -> u64 {
SOURCE_ID_COUNTER.fetch_add(1, Ordering::Relaxed)
}

/// Type alias for the channel sender used to add sources to the mixer.
pub type SourceSender = crossbeam_channel::Sender<mixer::ActiveSource>;

Expand Down
238 changes: 151 additions & 87 deletions src/audio/cpal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ use std::{
error::Error,
fmt,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc,
atomic::{AtomicBool, Ordering},
Arc, Condvar, Mutex,
},
thread,
time::Duration,
Expand All @@ -35,9 +35,6 @@ use crate::{
};
use std::sync::Barrier;

/// Global atomic counter for generating unique source IDs
static SOURCE_ID_COUNTER: AtomicU64 = AtomicU64::new(1);

/// A small wrapper around a cpal::Device. Used for storing some extra
/// data that makes multitrack playing more convenient.
pub struct Device {
Expand Down Expand Up @@ -102,31 +99,32 @@ fn create_direct_f32_callback(
}
}

/// Direct mixer callback for integer output - no intermediate ring buffer
/// Direct mixer callback for integer output - no intermediate ring buffer.
/// `max_samples` should be the stream period size in samples (e.g. buffer_size * num_channels)
/// so the temp buffer is pre-allocated and never resized in the callback.
fn create_direct_int_callback<T: cpal::Sample + cpal::FromSample<f32> + std::fmt::Debug>(
mixer: AudioMixer,
source_rx: crossbeam_channel::Receiver<MixerActiveSource>,
num_channels: u16,
max_samples: usize,
) -> impl FnMut(&mut [T], &cpal::OutputCallbackInfo) + Send + 'static
where
f32: cpal::FromSample<T>,
{
let mut temp_buffer: Vec<f32> = Vec::new();
let mut temp_buffer = vec![0.0f32; max_samples];

move |data: &mut [T], _: &cpal::OutputCallbackInfo| {
// Process any pending new sources (non-blocking)
while let Ok(new_source) = source_rx.try_recv() {
mixer.add_source(new_source);
}

// Ensure temp buffer is large enough
// Pre-allocated for typical period size; resize only if backend gives a larger buffer (rare)
if temp_buffer.len() < data.len() {
temp_buffer.resize(data.len(), 0.0);
}

// Mix into temp buffer (cleanup happens inline)
let num_frames = data.len() / num_channels as usize;
let temp_slice = &mut temp_buffer[..data.len()];
let num_frames = data.len() / num_channels as usize;
mixer.process_into_output(temp_slice, num_frames);

// Convert to output format
Expand Down Expand Up @@ -191,6 +189,7 @@ impl OutputManager {

/// Starts the output thread that creates and manages the CPAL stream.
/// Uses direct callback mode - no intermediate ring buffer for lowest latency.
/// On ALSA/backend errors (e.g. POLLERR), the stream is recreated automatically.
fn start_output_thread(
&mut self,
device: cpal::Device,
Expand All @@ -202,13 +201,16 @@ impl OutputManager {
let num_channels = mixer.num_channels();
let sample_rate = mixer.sample_rate();

// Use a barrier to ensure the stream is created before we return
// Notify the output thread when the CPAL error callback runs (e.g. ALSA POLLERR).
// The output thread blocks on the condvar and recreates the stream on notification.
let stream_error_notify = Arc::new((Mutex::new(false), Condvar::new()));

// Use a barrier to ensure the first stream is created before we return
let barrier = Arc::new(Barrier::new(2));
let barrier_clone = barrier.clone();

// Start the output thread - create the stream inside the thread
// Start the output thread - create the stream inside the thread, recreate on error
let output_thread = thread::spawn(move || {
// Create the CPAL stream configuration
let buffer_size = match output_buffer_size {
Some(size) => cpal::BufferSize::Fixed(size),
None => cpal::BufferSize::Default,
Expand All @@ -218,87 +220,144 @@ impl OutputManager {
sample_rate,
buffer_size,
};
let max_samples = output_buffer_size
.map(|f| f as usize * num_channels as usize)
.unwrap_or(4096 * num_channels as usize);

// Create the output stream with direct mixer callback (no ring buffer)
let stream_result = if target_format.sample_format == crate::audio::SampleFormat::Float
{
let mut callback =
create_direct_f32_callback(mixer.clone(), source_rx.clone(), num_channels);
device.build_output_stream(
&config,
move |data: &mut [f32], info: &cpal::OutputCallbackInfo| {
callback(data, info);
},
|err| error!("CPAL output stream error: {}", err),
None,
)
} else {
// For integer formats, we need to convert from f32 to the target integer type
match target_format.bits_per_sample {
16 => {
let mut callback = create_direct_int_callback::<i16>(
mixer.clone(),
source_rx.clone(),
num_channels,
);
device.build_output_stream(
&config,
move |data: &mut [i16], info: &cpal::OutputCallbackInfo| {
callback(data, info);
},
|err| error!("CPAL output stream error: {}", err),
None,
)
let mut first_run = true;

loop {
let notify = stream_error_notify.clone();
let on_error = move |err: cpal::StreamError| {
error!(
"CPAL output stream error: {} (will attempt to recover)",
err
);
let (mutex, condvar) = &*notify;
let mut guard = mutex.lock().unwrap();
*guard = true;
condvar.notify_one();
};

// Create the output stream with direct mixer callback (no ring buffer)
let stream_result = if target_format.sample_format
== crate::audio::SampleFormat::Float
{
let mut callback =
create_direct_f32_callback(mixer.clone(), source_rx.clone(), num_channels);
device.build_output_stream(
&config,
move |data: &mut [f32], info: &cpal::OutputCallbackInfo| {
callback(data, info);
},
on_error,
None,
)
} else {
match target_format.bits_per_sample {
16 => {
let mut callback = create_direct_int_callback::<i16>(
mixer.clone(),
source_rx.clone(),
num_channels,
max_samples,
);
let on_err = stream_error_notify.clone();
device.build_output_stream(
&config,
move |data: &mut [i16], info: &cpal::OutputCallbackInfo| {
callback(data, info);
},
move |err: cpal::StreamError| {
error!(
"CPAL output stream error: {} (will attempt to recover)",
err
);
let (mutex, condvar) = &*on_err;
let mut guard = mutex.lock().unwrap();
*guard = true;
condvar.notify_one();
},
None,
)
}
32 => {
let mut callback = create_direct_int_callback::<i32>(
mixer.clone(),
source_rx.clone(),
num_channels,
max_samples,
);
let on_err = stream_error_notify.clone();
device.build_output_stream(
&config,
move |data: &mut [i32], info: &cpal::OutputCallbackInfo| {
callback(data, info);
},
move |err: cpal::StreamError| {
error!(
"CPAL output stream error: {} (will attempt to recover)",
err
);
let (mutex, condvar) = &*on_err;
let mut guard = mutex.lock().unwrap();
*guard = true;
condvar.notify_one();
},
None,
)
}
_ => {
error!("Unsupported bit depth for integer format");
if first_run {
barrier_clone.wait();
}
return;
}
}
32 => {
let mut callback = create_direct_int_callback::<i32>(
mixer.clone(),
source_rx.clone(),
num_channels,
);
device.build_output_stream(
&config,
move |data: &mut [i32], info: &cpal::OutputCallbackInfo| {
callback(data, info);
},
|err| error!("CPAL output stream error: {}", err),
None,
)
};

match stream_result {
Ok(stream) => {
if let Err(e) = stream.play() {
error!("Failed to start CPAL stream: {}", e);
if first_run {
barrier_clone.wait();
}
return;
}
if first_run {
info!("CPAL output stream started successfully (direct callback mode)");
barrier_clone.wait();
first_run = false;
} else {
info!("CPAL output stream recovered after backend error");
}

// Keep the stream alive; block until the error callback notifies us
let (mutex, condvar) = &*stream_error_notify;
let mut guard = mutex.lock().unwrap();
while !*guard {
guard = condvar.wait(guard).unwrap();
}
*guard = false;
drop(guard);

// Drop the stream so we can create a new one
drop(stream);
}
_ => {
error!("Unsupported bit depth for integer format");
barrier_clone.wait();
Err(e) => {
error!("Failed to create CPAL stream: {}", e);
if first_run {
barrier_clone.wait();
}
return;
}
}
};

// Start the stream
match stream_result {
Ok(stream) => {
if let Err(e) = stream.play() {
error!("Failed to start CPAL stream: {}", e);
barrier_clone.wait();
return;
}
info!("CPAL output stream started successfully (direct callback mode)");

// Signal that stream is ready
barrier_clone.wait();

// Keep the stream alive by waiting
loop {
thread::sleep(Duration::from_millis(100));
}
}
Err(e) => {
error!("Failed to create CPAL stream: {}", e);
barrier_clone.wait();
}
}
});

// Wait for stream to be created
// Wait for first stream to be created
barrier.wait();

self.output_thread = Some(output_thread);
Expand Down Expand Up @@ -486,7 +545,7 @@ impl AudioDevice for Device {
let mut source_finish_flags = Vec::new();

for source in channel_mapped_sources.into_iter() {
let current_source_id = SOURCE_ID_COUNTER.fetch_add(1, Ordering::Relaxed);
let current_source_id = crate::audio::next_source_id();
let source_channel_count = source.source_channel_count();
let is_finished = Arc::new(AtomicBool::new(false));
source_finish_flags.push(is_finished.clone());
Expand All @@ -513,6 +572,7 @@ impl AudioDevice for Device {
// This is completely lock-free - just checks atomic flags
let finished_monitor = finished.clone();
let cancel_handle_for_notify = cancel_handle.clone();
let num_song_sources = source_finish_flags.len();
thread::spawn(move || {
loop {
// Check if all sources have finished (lock-free)
Expand All @@ -521,6 +581,10 @@ impl AudioDevice for Device {
.all(|flag| flag.load(Ordering::Relaxed));

if all_finished {
tracing::debug!(
num_song_sources,
"play_from: all song sources finished, notifying"
);
finished_monitor.store(true, Ordering::Relaxed);
cancel_handle_for_notify.notify();
break;
Expand Down
Loading