From 12dedb3434d40dbaf75e7085ac990655cee9501b Mon Sep 17 00:00:00 2001 From: Michael Wilson Date: Fri, 6 Feb 2026 12:20:41 -0500 Subject: [PATCH] General improvements. Some general improvements have been made for maintainability, panic aversion, and error reporting. --- src/audio/sample_source/audio.rs | 140 +++++++++++++---------------- src/config.rs | 2 + src/config/error.rs | 21 +++++ src/config/player.rs | 6 +- src/config/playlist.rs | 6 +- src/config/song.rs | 2 +- src/dmx/engine.rs | 127 +++++++++++++++++++-------- src/dmx/ola_client.rs | 21 ++++- src/dmx/universe.rs | 36 +++----- src/main.rs | 52 +++++------ src/player.rs | 145 +++++++++++++++++++++++-------- src/playlist.rs | 25 +++--- src/playsync.rs | 19 ++-- src/samples/engine.rs | 25 ++++-- src/samples/loader.rs | 12 ++- src/songs.rs | 22 +++-- 16 files changed, 409 insertions(+), 252 deletions(-) create mode 100644 src/config/error.rs diff --git a/src/audio/sample_source/audio.rs b/src/audio/sample_source/audio.rs index 007fabe..4d4e352 100644 --- a/src/audio/sample_source/audio.rs +++ b/src/audio/sample_source/audio.rs @@ -103,8 +103,14 @@ impl AudioSampleSource { start_time: Option, buffer_size: usize, ) -> Result { - // Open the file - let file = File::open(&path)?; + // Open the file (include path in error so user sees which file failed) + let path_ref = path.as_ref(); + let file = File::open(path_ref).map_err(|e| { + SampleSourceError::IoError(std::io::Error::new( + e.kind(), + format!("{}: {}", path_ref.display(), e), + )) + })?; let mss = MediaSourceStream::new(Box::new(file), Default::default()); // Create a hint to help the format registry guess the format @@ -257,6 +263,45 @@ impl AudioSampleSource { } } + /// Reads and decodes the next packet for the given track. Handles ResetRequired by + /// resetting the decoder and retrying. Returns `Ok(Some((samples, channels)))` when + /// a packet was decoded, `Ok(None)` on EOF, or `Err` on other errors. + fn read_and_decode_next_packet_for_track( + format_reader: &mut dyn FormatReader, + decoder: &mut dyn symphonia::core::codecs::Decoder, + track_id: u32, + ) -> Result, usize)>, SampleSourceError> { + loop { + let packet = match Self::read_next_packet(format_reader) { + Ok(Some(packet)) => packet, + Ok(None) => return Ok(None), + Err(SampleSourceError::AudioError(SymphoniaError::ResetRequired)) => { + decoder.reset(); + continue; + } + Err(e) => return Err(e), + }; + if packet.track_id() != track_id { + continue; + } + let decoded = match decoder.decode(&packet) { + Ok(decoded) => decoded, + Err(SymphoniaError::ResetRequired) => { + decoder.reset(); + match decoder.decode(&packet) { + Ok(decoded) => decoded, + Err(e) => return Err(SampleSourceError::AudioError(e)), + } + } + Err(e) => return Err(SampleSourceError::AudioError(e)), + }; + let (samples, channels) = Self::decode_buffer_to_f32(decoded)?; + if channels > 0 && !samples.is_empty() { + return Ok(Some((samples, channels))); + } + } + } + /// Refills the sample buffer by reading a chunk from the audio file fn refill_buffer(&mut self) -> Result<(), SampleSourceError> { // Clear the buffer and reset position @@ -292,22 +337,15 @@ impl AudioSampleSource { // those as "no progress" would cause us to bail out early and never see // the real audio data. loop { - // Read the next packet - let packet = match Self::read_next_packet(self.format_reader.as_mut()) { - Ok(Some(packet)) => packet, - Ok(None) => { - // EOF reached - break; - } - Err(SampleSourceError::AudioError(SymphoniaError::ResetRequired)) => { - // The codec needs to be reset after a discontinuity (e.g., after seeking). - // Reset the decoder and continue reading the next packet. - self.decoder.reset(); - continue; - } + let (samples, _decoded_channels) = match Self::read_and_decode_next_packet_for_track( + self.format_reader.as_mut(), + self.decoder.as_mut(), + self.track_id, + ) { + Ok(Some((samples, ch))) => (samples, ch), + Ok(None) => break, Err(e) => { // For very small files, some errors might indicate EOF - // Check if we've read any samples - if not, this might be a false error if samples_read == 0 && self.sample_buffer.is_empty() { break; } @@ -315,30 +353,6 @@ impl AudioSampleSource { } }; - // Only process packets from the track we're interested in - if packet.track_id() != self.track_id { - continue; - } - - // Decode the packet - let decoded = match self.decoder.decode(&packet) { - Ok(decoded) => decoded, - Err(SymphoniaError::ResetRequired) => { - // The codec needs to be reset. Reset and retry decoding the same packet. - self.decoder.reset(); - match self.decoder.decode(&packet) { - Ok(decoded) => decoded, - Err(e) => return Err(SampleSourceError::AudioError(e)), - } - } - Err(e) => return Err(SampleSourceError::AudioError(e)), - }; - - // Convert the decoded buffer to f32 samples. Channel count is - // established during construction; here we only care about the - // sample data. - let (samples, _decoded_channels) = Self::decode_buffer_to_f32(decoded)?; - // Add samples to the buffer if !samples.is_empty() { let remaining = target_samples.saturating_sub(samples_read); @@ -396,48 +410,12 @@ impl AudioSampleSource { decoder: &mut dyn symphonia::core::codecs::Decoder, track_id: u32, ) -> Result<(u16, Vec), SampleSourceError> { - loop { - let packet = match Self::read_next_packet(format_reader) { - Ok(Some(packet)) => packet, - Ok(None) => { - // EOF reached - break; - } - Err(SampleSourceError::AudioError(SymphoniaError::ResetRequired)) => { - // The codec needs to be reset after a discontinuity. - // Reset the decoder and continue reading the next packet. - decoder.reset(); - continue; - } - Err(e) => return Err(e), - }; - - if packet.track_id() != track_id { - continue; - } - - let decoded = match decoder.decode(&packet) { - Ok(decoded) => decoded, - Err(SymphoniaError::ResetRequired) => { - // The codec needs to be reset. Reset and retry decoding the same packet. - decoder.reset(); - match decoder.decode(&packet) { - Ok(decoded) => decoded, - Err(e) => return Err(SampleSourceError::AudioError(e)), - } - } - Err(e) => return Err(SampleSourceError::AudioError(e)), - }; - - let (samples, channels) = Self::decode_buffer_to_f32(decoded)?; - if channels > 0 && !samples.is_empty() { - return Ok((channels as u16, samples)); - } + match Self::read_and_decode_next_packet_for_track(format_reader, decoder, track_id)? { + Some((samples, channels)) => Ok((channels as u16, samples)), + None => Err(SampleSourceError::SampleConversionFailed( + "Channels not specified".to_string(), + )), } - - Err(SampleSourceError::SampleConversionFailed( - "Channels not specified".to_string(), - )) } /// Converts a decoded AudioBufferRef to a Vec of interleaved samples diff --git a/src/config.rs b/src/config.rs index 10024ea..93481d4 100644 --- a/src/config.rs +++ b/src/config.rs @@ -14,6 +14,7 @@ mod audio; mod controller; mod dmx; +mod error; pub mod lighting; #[cfg(test)] pub mod midi; @@ -35,6 +36,7 @@ pub use self::controller::OscController; pub use self::controller::DEFAULT_GRPC_PORT; pub use self::dmx::Dmx; pub use self::dmx::Universe; +pub use self::error::ConfigError; pub use self::lighting::Lighting; pub use self::midi::Midi; pub use self::midi::MidiTransformer; diff --git a/src/config/error.rs b/src/config/error.rs new file mode 100644 index 0000000..15f618a --- /dev/null +++ b/src/config/error.rs @@ -0,0 +1,21 @@ +// Copyright (C) 2026 Michael Wilson +// +// This program is free software: you can redistribute it and/or modify it under +// the terms of the GNU General Public License as published by the Free Software +// Foundation, version 3. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +// FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License along with +// this program. If not, see . +// + +/// Typed error for config load/parse failures so callers can distinguish +/// e.g. file-not-found from parse errors without string matching. +#[derive(Debug, thiserror::Error)] +pub enum ConfigError { + #[error("Config load/parse error: {0}")] + Load(#[from] config::ConfigError), +} diff --git a/src/config/player.rs b/src/config/player.rs index 44979eb..b885573 100644 --- a/src/config/player.rs +++ b/src/config/player.rs @@ -21,8 +21,10 @@ use super::trackmappings::TrackMappings; use config::{Config, File}; use serde::Deserialize; use std::collections::HashMap; -use std::error::Error; use std::path::{Path, PathBuf}; + +use super::error::ConfigError; +use std::error::Error; use tracing::{error, info}; /// The configuration for the multitrack player. @@ -92,7 +94,7 @@ impl Player { } /// Deserializes a file from the path into a player configuration struct. - pub fn deserialize(path: &Path) -> Result> { + pub fn deserialize(path: &Path) -> Result { Ok(Config::builder() .add_source(File::from(path)) .build()? diff --git a/src/config/playlist.rs b/src/config/playlist.rs index 49c8b3c..71609e2 100644 --- a/src/config/playlist.rs +++ b/src/config/playlist.rs @@ -11,11 +11,13 @@ // You should have received a copy of the GNU General Public License along with // this program. If not, see . // -use std::{error::Error, path::Path}; +use std::path::Path; use config::{Config, File}; use serde::Deserialize; +use super::error::ConfigError; + /// The configuration for a playlist. #[derive(Deserialize)] pub struct Playlist { @@ -32,7 +34,7 @@ impl Playlist { } /// Parse a playlist from a YAML file. - pub fn deserialize(path: &Path) -> Result> { + pub fn deserialize(path: &Path) -> Result { Ok(Config::builder() .add_source(File::from(path)) .build()? diff --git a/src/config/song.rs b/src/config/song.rs index 50188f2..9bf349c 100644 --- a/src/config/song.rs +++ b/src/config/song.rs @@ -77,7 +77,7 @@ impl Song { } /// Deserializes a file from the path into a song configuration struct. - pub fn deserialize(path: &Path) -> Result> { + pub fn deserialize(path: &Path) -> Result { Ok(Config::builder() .add_source(File::from(path)) .build()? diff --git a/src/dmx/engine.rs b/src/dmx/engine.rs index ff5d969..324e9ef 100644 --- a/src/dmx/engine.rs +++ b/src/dmx/engine.rs @@ -195,13 +195,18 @@ impl Engine { // Check if timeline has finished (all cues processed) // and notify the waiting thread if so if !engine.timeline_finished.load(Ordering::Relaxed) { - let timeline = engine.current_song_timeline.lock().unwrap(); + let timeline = engine + .current_song_timeline + .lock() + .unwrap_or_else(|e| e.into_inner()); if let Some(ref tl) = *timeline { if tl.is_finished() { engine.timeline_finished.store(true, Ordering::Relaxed); // Notify the cancel handle so wait() returns - if let Some(ref cancel_handle) = - *engine.timeline_cancel_handle.lock().unwrap() + if let Some(ref cancel_handle) = *engine + .timeline_cancel_handle + .lock() + .unwrap_or_else(|e| e.into_inner()) { cancel_handle.notify(); } @@ -374,11 +379,17 @@ impl Engine { let timeline = LightingTimeline::new(all_shows); // Set the tempo map on the effect engine if the timeline has one if let Some(tempo_map) = timeline.tempo_map() { - let mut effect_engine = dmx_engine.effect_engine.lock().unwrap(); + let mut effect_engine = dmx_engine + .effect_engine + .lock() + .unwrap_or_else(|e| e.into_inner()); effect_engine.set_tempo_map(Some(tempo_map.clone())); } { - let mut current_timeline = dmx_engine.current_song_timeline.lock().unwrap(); + let mut current_timeline = dmx_engine + .current_song_timeline + .lock() + .unwrap_or_else(|e| e.into_inner()); *current_timeline = Some(timeline); } } @@ -510,7 +521,10 @@ impl Engine { // Store the cancel handle so the effects loop can notify when timeline finishes { - let mut handle = dmx_engine.timeline_cancel_handle.lock().unwrap(); + let mut handle = dmx_engine + .timeline_cancel_handle + .lock() + .unwrap_or_else(|e| e.into_inner()); *handle = Some(cancel_handle.clone()); } @@ -648,7 +662,7 @@ impl Engine { // Update the effects engine with a 44Hz frame time (matching Universe TARGET_HZ) let dt = Duration::from_secs_f64(1.0 / 44.0); let song_time = self.get_song_time(); - let mut effect_engine = self.effect_engine.lock().unwrap(); + let mut effect_engine = self.effect_engine.lock().unwrap_or_else(|e| e.into_inner()); let commands = effect_engine.update(dt, Some(song_time))?; // Group commands by universe @@ -679,7 +693,7 @@ impl Engine { &self, effect: crate::lighting::EffectInstance, ) -> Result<(), Box> { - let mut effect_engine = self.effect_engine.lock().unwrap(); + let mut effect_engine = self.effect_engine.lock().unwrap_or_else(|e| e.into_inner()); effect_engine.start_effect(effect)?; Ok(()) } @@ -687,9 +701,9 @@ impl Engine { /// Registers all fixtures from the current venue (thread-safe version) pub fn register_venue_fixtures_safe(&self) -> Result<(), Box> { if let Some(lighting_system) = &self.lighting_system { - let lighting_system = lighting_system.lock().unwrap(); + let lighting_system = lighting_system.lock().unwrap_or_else(|e| e.into_inner()); let fixture_infos = lighting_system.get_current_venue_fixtures()?; - let mut effect_engine = self.effect_engine.lock().unwrap(); + let mut effect_engine = self.effect_engine.lock().unwrap_or_else(|e| e.into_inner()); for fixture_info in fixture_infos { effect_engine.register_fixture(fixture_info); @@ -706,7 +720,10 @@ impl Engine { use crate::lighting::parser::LayerCommandType; let timeline_update = { - let mut current_timeline = self.current_song_timeline.lock().unwrap(); + let mut current_timeline = self + .current_song_timeline + .lock() + .unwrap_or_else(|e| e.into_inner()); if let Some(timeline) = current_timeline.as_mut() { timeline.update(song_time) } else { @@ -716,7 +733,7 @@ impl Engine { // Process layer commands first (they affect subsequent effects) if !timeline_update.layer_commands.is_empty() { - let mut effects_engine = self.effect_engine.lock().unwrap(); + let mut effects_engine = self.effect_engine.lock().unwrap_or_else(|e| e.into_inner()); for cmd in &timeline_update.layer_commands { match cmd.command_type { LayerCommandType::Clear => { @@ -761,7 +778,7 @@ impl Engine { // Process stop sequence commands if !timeline_update.stop_sequences.is_empty() { - let mut effects_engine = self.effect_engine.lock().unwrap(); + let mut effects_engine = self.effect_engine.lock().unwrap_or_else(|e| e.into_inner()); for sequence_name in &timeline_update.stop_sequences { effects_engine.stop_sequence(sequence_name); } @@ -782,7 +799,7 @@ impl Engine { for (effect, elapsed_time) in effects_sorted { // Resolve groups to fixtures if lighting system is available if let Some(lighting_system) = &self.lighting_system { - let mut lighting_system = lighting_system.lock().unwrap(); + let mut lighting_system = lighting_system.lock().unwrap_or_else(|e| e.into_inner()); let mut resolved_fixtures = Vec::new(); // Resolve each group to fixture names @@ -795,7 +812,8 @@ impl Engine { let mut resolved_effect = effect.clone(); resolved_effect.target_fixtures = resolved_fixtures; - let mut effect_engine = self.effect_engine.lock().unwrap(); + let mut effect_engine = + self.effect_engine.lock().unwrap_or_else(|e| e.into_inner()); if let Err(e) = effect_engine.start_effect_with_elapsed(resolved_effect, *elapsed_time) { @@ -803,7 +821,8 @@ impl Engine { } } else { // No lighting system, just start the effect as-is - let mut effect_engine = self.effect_engine.lock().unwrap(); + let mut effect_engine = + self.effect_engine.lock().unwrap_or_else(|e| e.into_inner()); if let Err(e) = effect_engine.start_effect_with_elapsed(effect.clone(), *elapsed_time) { @@ -817,7 +836,8 @@ impl Engine { for effect in timeline_update.effects { // Resolve groups to fixtures if lighting system is available if let Some(lighting_system) = &self.lighting_system { - let mut lighting_system = lighting_system.lock().unwrap(); + let mut lighting_system = + lighting_system.lock().unwrap_or_else(|e| e.into_inner()); let mut resolved_fixtures = Vec::new(); // Resolve each group to fixture names @@ -848,12 +868,15 @@ impl Engine { pub fn start_lighting_timeline_at(&self, start_time: Duration) { // Clear effects from previous song before starting new timeline { - let mut effect_engine = self.effect_engine.lock().unwrap(); + let mut effect_engine = self.effect_engine.lock().unwrap_or_else(|e| e.into_inner()); effect_engine.stop_all_effects(); } let timeline_update = { - let mut current_timeline = self.current_song_timeline.lock().unwrap(); + let mut current_timeline = self + .current_song_timeline + .lock() + .unwrap_or_else(|e| e.into_inner()); if let Some(timeline) = current_timeline.as_mut() { if start_time == Duration::ZERO { timeline.start(); @@ -885,7 +908,7 @@ impl Engine { // Process layer commands first (they affect subsequent effects) if !timeline_update.layer_commands.is_empty() { - let mut effects_engine = self.effect_engine.lock().unwrap(); + let mut effects_engine = self.effect_engine.lock().unwrap_or_else(|e| e.into_inner()); for cmd in &timeline_update.layer_commands { match cmd.command_type { LayerCommandType::Clear => { @@ -930,7 +953,7 @@ impl Engine { // Process stop sequence commands if !timeline_update.stop_sequences.is_empty() { - let mut effects_engine = self.effect_engine.lock().unwrap(); + let mut effects_engine = self.effect_engine.lock().unwrap_or_else(|e| e.into_inner()); for sequence_name in &timeline_update.stop_sequences { effects_engine.stop_sequence(sequence_name); } @@ -951,7 +974,7 @@ impl Engine { for (effect, elapsed_time) in effects_sorted { // Resolve groups to fixtures if lighting system is available if let Some(lighting_system) = &self.lighting_system { - let mut lighting_system = lighting_system.lock().unwrap(); + let mut lighting_system = lighting_system.lock().unwrap_or_else(|e| e.into_inner()); let mut resolved_fixtures = Vec::new(); // Resolve each group to fixture names @@ -964,7 +987,8 @@ impl Engine { let mut resolved_effect = effect.clone(); resolved_effect.target_fixtures = resolved_fixtures; - let mut effect_engine = self.effect_engine.lock().unwrap(); + let mut effect_engine = + self.effect_engine.lock().unwrap_or_else(|e| e.into_inner()); if let Err(e) = effect_engine.start_effect_with_elapsed(resolved_effect, *elapsed_time) { @@ -972,7 +996,8 @@ impl Engine { } } else { // No lighting system, just start the effect as-is - let mut effect_engine = self.effect_engine.lock().unwrap(); + let mut effect_engine = + self.effect_engine.lock().unwrap_or_else(|e| e.into_inner()); if let Err(e) = effect_engine.start_effect_with_elapsed(effect.clone(), *elapsed_time) { @@ -986,7 +1011,8 @@ impl Engine { for effect in timeline_update.effects { // Resolve groups to fixtures if lighting system is available if let Some(lighting_system) = &self.lighting_system { - let mut lighting_system = lighting_system.lock().unwrap(); + let mut lighting_system = + lighting_system.lock().unwrap_or_else(|e| e.into_inner()); let mut resolved_fixtures = Vec::new(); // Resolve each group to fixture names @@ -1015,7 +1041,10 @@ impl Engine { /// Stops the lighting timeline pub fn stop_lighting_timeline(&self) { - let mut current_timeline = self.current_song_timeline.lock().unwrap(); + let mut current_timeline = self + .current_song_timeline + .lock() + .unwrap_or_else(|e| e.into_inner()); if let Some(timeline) = current_timeline.as_mut() { timeline.stop(); } @@ -1028,25 +1057,34 @@ impl Engine { /// Updates the current song time pub fn update_song_time(&self, song_time: Duration) { - let mut current_time = self.current_song_time.lock().unwrap(); + let mut current_time = self + .current_song_time + .lock() + .unwrap_or_else(|e| e.into_inner()); *current_time = song_time; } /// Gets the current song time pub fn get_song_time(&self) -> Duration { - let current_time = self.current_song_time.lock().unwrap(); + let current_time = self + .current_song_time + .lock() + .unwrap_or_else(|e| e.into_inner()); *current_time } /// Get a formatted string listing all active effects pub fn format_active_effects(&self) -> String { - let effect_engine = self.effect_engine.lock().unwrap(); + let effect_engine = self.effect_engine.lock().unwrap_or_else(|e| e.into_inner()); effect_engine.format_active_effects() } /// Gets all cues from the current timeline with their times and indices pub fn get_timeline_cues(&self) -> Vec<(Duration, usize)> { - let timeline = self.current_song_timeline.lock().unwrap(); + let timeline = self + .current_song_timeline + .lock() + .unwrap_or_else(|e| e.into_inner()); if let Some(timeline) = timeline.as_ref() { timeline.cues() } else { @@ -1082,7 +1120,7 @@ impl Engine { loop { match receiver.recv() { Ok(message) => { - let mut client = client.lock().unwrap(); + let mut client = client.lock().unwrap_or_else(|e| e.into_inner()); if let Err(err) = client.send_dmx(message.universe, &message.buffer) { error!("error sending DMX to OLA: {}", err.to_string()) } @@ -1248,7 +1286,10 @@ mod test { }; { - let mut effect_engine = engine.effect_engine.lock().unwrap(); + let mut effect_engine = engine + .effect_engine + .lock() + .unwrap_or_else(|e| e.into_inner()); effect_engine.register_fixture(fixture_info); } @@ -1376,7 +1417,10 @@ mod test { }; { - let mut effect_engine = engine.effect_engine.lock().unwrap(); + let mut effect_engine = engine + .effect_engine + .lock() + .unwrap_or_else(|e| e.into_inner()); effect_engine.register_fixture(fixture_info); } @@ -1466,7 +1510,10 @@ mod test { // Register fixture through the effect engine { - let mut effect_engine = engine.effect_engine.lock().unwrap(); + let mut effect_engine = engine + .effect_engine + .lock() + .unwrap_or_else(|e| e.into_inner()); effect_engine.register_fixture(fixture_info); } // Drop the lock here @@ -1642,7 +1689,10 @@ mod test { )?; // Verify timeline was created (may be None if no lighting config) - let _timeline = engine.current_song_timeline.lock().unwrap(); + let _timeline = engine + .current_song_timeline + .lock() + .unwrap_or_else(|e| e.into_inner()); // Timeline may be None if no lighting configuration is provided // This is acceptable behavior for the test @@ -1687,7 +1737,7 @@ mod test { let _ = engine.update_effects(); // Verify that DMX commands were sent (if any) - let mock_client = mock_client.lock().unwrap(); + let mock_client = mock_client.lock().unwrap_or_else(|e| e.into_inner()); let _message = mock_client.get_last_message(); // DMX commands may or may not be generated depending on fixture registration @@ -1799,7 +1849,10 @@ mod test { // Register the fixture { - let mut effect_engine = engine.effect_engine.lock().unwrap(); + let mut effect_engine = engine + .effect_engine + .lock() + .unwrap_or_else(|e| e.into_inner()); effect_engine.register_fixture(fixture_info); } diff --git a/src/dmx/ola_client.rs b/src/dmx/ola_client.rs index a68bd17..c38ac75 100644 --- a/src/dmx/ola_client.rs +++ b/src/dmx/ola_client.rs @@ -66,17 +66,27 @@ impl MockOlaClient { /// Get the number of messages sent pub fn message_count(&self) -> usize { - self.sent_messages.lock().unwrap().len() + self.sent_messages + .lock() + .unwrap_or_else(|e| e.into_inner()) + .len() } /// Get the last sent message pub fn get_last_message(&self) -> Option { - self.sent_messages.lock().unwrap().last().cloned() + self.sent_messages + .lock() + .unwrap_or_else(|e| e.into_inner()) + .last() + .cloned() } /// Clear all sent messages pub fn clear_messages(&self) { - self.sent_messages.lock().unwrap().clear(); + self.sent_messages + .lock() + .unwrap_or_else(|e| e.into_inner()) + .clear(); } /// Get messages for a specific universe @@ -109,7 +119,10 @@ impl OlaClient for MockOlaClient { universe, buffer: buffer.clone(), }; - self.sent_messages.lock().unwrap().push(message); + self.sent_messages + .lock() + .unwrap_or_else(|e| e.into_inner()) + .push(message); Ok(()) } } diff --git a/src/dmx/universe.rs b/src/dmx/universe.rs index d0149a0..7182dbe 100644 --- a/src/dmx/universe.rs +++ b/src/dmx/universe.rs @@ -77,14 +77,12 @@ impl Universe { *self .global_dim_rate .read() - .expect("Unable to get global dim rate read lock") + .unwrap_or_else(|e| e.into_inner()) } #[cfg(test)] pub fn get_target_value(&self, channel_index: usize) -> f64 { - self.target - .read() - .expect("Unable to get universe target read lock")[channel_index] + self.target.read().unwrap_or_else(|e| e.into_inner())[channel_index] } /// Updates the dim speed. @@ -92,7 +90,7 @@ impl Universe { let mut global_dim_rate = self .global_dim_rate .write() - .expect("Unable to get global dim rate write lock"); + .unwrap_or_else(|e| e.into_inner()); if dim_rate.is_zero() { *global_dim_rate = 1.0 } else { @@ -108,21 +106,13 @@ impl Universe { 0 // Handle channel 0 case - map to index 0 }; let value = f64::from(value); - self.target - .write() - .expect("Unable to get universe target write lock")[channel_index] = value; - self.rates - .write() - .expect("Unable to get universe rates write lock")[channel_index] = if dim { - (value - - self - .current - .read() - .expect("unable to get universe current read lock")[channel_index]) + self.target.write().unwrap_or_else(|e| e.into_inner())[channel_index] = value; + self.rates.write().unwrap_or_else(|e| e.into_inner())[channel_index] = if dim { + (value - self.current.read().unwrap_or_else(|e| e.into_inner())[channel_index]) / *self .global_dim_rate .read() - .expect("Unable to get universe global dim rate") + .unwrap_or_else(|e| e.into_inner()) } else { 0.0 }; @@ -193,15 +183,9 @@ impl Universe { max_channels: &Arc, buffer: &mut DmxBuffer, ) -> bool { - let mut current = current - .write() - .expect("Unable to get current universe information write lock"); - let rates = rates - .read() - .expect("Unable to get rates universe information lock"); - let target = target - .read() - .expect("Unable to get target universe information lock"); + let mut current = current.write().unwrap_or_else(|e| e.into_inner()); + let rates = rates.read().unwrap_or_else(|e| e.into_inner()); + let target = target.read().unwrap_or_else(|e| e.into_inner()); let mut changed = false; for i in 0..usize::from(max_channels.load(Ordering::Relaxed)) { diff --git a/src/main.rs b/src/main.rs index 987ad18..92ba42f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -39,6 +39,7 @@ use proto::player::v1::{ use std::collections::HashSet; use std::env; use std::error::Error; +use std::fmt::Display; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::time::Duration; @@ -167,6 +168,18 @@ enum Commands { }, } +/// Prints a list of devices for the Devices and MidiDevices subcommands. +fn print_device_list(devices: Vec, empty_msg: &str) { + if devices.is_empty() { + println!("{}", empty_msg); + return; + } + println!("Devices:"); + for device in devices { + println!("- {}", device); + } +} + /// Verifies a light show file, optionally validating against a config file. fn verify_light_show(show_path: &str, config_path: Option<&str>) -> Result<(), Box> { let path = Path::new(show_path); @@ -326,30 +339,10 @@ async fn run() -> Result<(), Box> { } } Commands::Devices {} => { - let devices = audio::list_devices()?; - - if devices.is_empty() { - println!("No devices found."); - return Ok(()); - } - - println!("Devices:"); - for device in devices { - println!("- {}", device); - } + print_device_list(audio::list_devices()?, "No devices found."); } Commands::MidiDevices {} => { - let devices = midi::list_devices()?; - - if devices.is_empty() { - println!("No devices found."); - return Ok(()); - } - - println!("Devices:"); - for device in devices { - println!("- {}", device); - } + print_device_list(midi::list_devices()?, "No devices found."); } Commands::Playlist { repository_path, @@ -377,7 +370,12 @@ async fn run() -> Result<(), Box> { playlist_path = if let Some(parent) = player_path.parent() { parent.join(playlist_path) } else { - return Err("Unable to determining playlist path. Please specify the playlist path using an absolute path.")?; + return Err(format!( + "Unable to determine playlist path (config base path has no parent): {}. \ + Please specify the playlist path using an absolute path.", + playlist_path.display() + ) + .into()); }; }; let songs = songs::get_all_songs(&player_config.songs(player_path))?; @@ -533,9 +531,7 @@ fn print_song(song: Option) -> Result<(), Box> { async fn connect( host_port: Option, ) -> Result, Box> { - Ok(PlayerServiceClient::connect( - "http://".to_owned() - + &host_port.unwrap_or(format!("0.0.0.0:{}", config::DEFAULT_GRPC_PORT)), - ) - .await?) + // Use 127.0.0.1 for local client; 0.0.0.0 is a bind address, not valid for connect. + let addr = host_port.unwrap_or_else(|| format!("127.0.0.1:{}", config::DEFAULT_GRPC_PORT)); + Ok(PlayerServiceClient::connect(format!("http://{}", addr)).await?) } diff --git a/src/player.rs b/src/player.rs index fcee62a..a0bf04f 100644 --- a/src/player.rs +++ b/src/player.rs @@ -87,16 +87,16 @@ impl Player { let device = Self::wait_for_ok("audio device".to_string(), || { audio::get_device(config.audio()) - }); + })?; let dmx_engine = Self::wait_for_ok("dmx engine".to_string(), || { dmx::create_engine(config.dmx(), base_path) - }); + })?; let midi_device = Self::wait_for_ok("midi device".to_string(), || { midi::get_device(config.midi(), dmx_engine.clone()) - }); + })?; let status_events = Self::wait_for_ok("status events".to_string(), || { StatusEvents::new(config.status_events()) - }); + })?; // Initialize the sample engine if the audio device supports it let sample_engine = match (device.mixer(), device.source_sender()) { @@ -161,15 +161,36 @@ impl Player { Ok(player) } - /// Wait for constructor function to return an Ok(result) variant - fn wait_for_ok Result>(name: String, constructor: F) -> T { + /// Wait for constructor function to return an Ok(result) variant. + /// Respects MTRACK_DEVICE_RETRY_LIMIT: if set to N, tries at most N times then returns + /// the last error. If unset or 0, retries indefinitely (original behavior). + fn wait_for_ok(name: String, constructor: F) -> Result> + where + E: Display + Into>, + F: Fn() -> Result, + { + let max_attempts = std::env::var("MTRACK_DEVICE_RETRY_LIMIT") + .ok() + .and_then(|s| s.parse::().ok()) + .unwrap_or(0); + let delay_ms = 500; + let mut attempt = 0u32; + loop { match constructor() { - Ok(ok) => return ok, + Ok(ok) => return Ok(ok), Err(err) => { warn!("Could not get {name}! {err}"); + attempt += 1; + if max_attempts > 0 && attempt >= max_attempts { + error!( + attempt = attempt, + limit = max_attempts, + "Retry limit reached, giving up" + ); + return Err(err.into()); + } info!("Retrying after delay."); - let delay_ms = 500; thread::sleep(Duration::from_millis(delay_ms)); } } @@ -350,7 +371,7 @@ impl Player { let cancel_handle = CancelHandle::new(); let cancel_handle_for_cleanup = cancel_handle.clone(); - let (play_tx, play_rx) = oneshot::channel::<()>(); + let (play_tx, play_rx) = oneshot::channel::>(); let join_handle = { let song = song.clone(); @@ -388,15 +409,30 @@ impl Player { let song = song.clone(); let midi_device = self.midi_device.clone(); tokio::spawn(async move { - if let Err(e) = play_rx.await { - error!(err = e.to_string(), "Error receiving signal"); - return; - } + let playback_ok = match play_rx.await { + Ok(Ok(())) => true, + Ok(Err(e)) => { + error!( + err = %e, + song = song.name(), + "Playback failed (e.g. audio error); playlist not advanced" + ); + false + } + Err(_e) => { + error!("Error receiving playback signal (receiver dropped)"); + false + } + }; let mut join = join_mutex.lock().await; let cancelled = cancel_handle_for_cleanup.is_cancelled(); - // Only move to the next playlist entry if this wasn't cancelled. + // Only move to the next playlist entry if not cancelled. On playback failure we still + // advance so the user is not stuck, but we already logged the error above. if !cancelled { + if !playback_ok { + warn!("Advancing playlist despite playback failure so user is not stuck"); + } Player::next_and_emit(midi_device.clone(), playlist); } @@ -409,6 +445,7 @@ impl Player { info!( song = song.name(), cancelled = cancelled, + playback_ok = playback_ok, "Song finished playing." ); @@ -430,7 +467,7 @@ impl Player { dmx_engine: Option>, song: Arc, cancel_handle: CancelHandle, - play_tx: oneshot::Sender<()>, + play_tx: oneshot::Sender>, start_time: Duration, ) { let song = song.clone(); @@ -451,24 +488,29 @@ impl Player { num_barriers })); + let audio_outcome: Arc>>> = + Arc::new(std::sync::Mutex::new(None)); + let audio_join_handle = { let device = device.clone(); let song = song.clone(); let barrier = barrier.clone(); let cancel_handle = cancel_handle.clone(); + let audio_outcome = audio_outcome.clone(); thread::spawn(move || { let song_name = song.name().to_string(); - - if let Err(e) = - device.play_from(song, &mappings, cancel_handle, barrier, start_time) - { + let result = device.play_from(song, &mappings, cancel_handle, barrier, start_time); + if let Err(ref e) = result { error!( err = e.as_ref(), song = song_name, "Error while playing song" ); } + let outcome = result.map_err(|e| e.to_string()); + let mut guard = audio_outcome.lock().unwrap_or_else(|e| e.into_inner()); + *guard = Some(outcome); }) }; @@ -530,23 +572,43 @@ impl Player { } } - if play_tx.send(()).is_err() { - error!("Error while sending to finish channel.") + let outcome = audio_outcome + .lock() + .unwrap_or_else(|e| e.into_inner()) + .take() + .unwrap_or_else(|| { + warn!( + "Audio thread did not set outcome (e.g. panicked before setting); \ + treating as success so playlist is not stuck" + ); + Ok(()) + }); + if play_tx.send(outcome).is_err() { + error!("Error while sending to finish channel (receiver dropped).") } } - /// Next goes to the next entry in the playlist. - pub async fn next(&self) -> Arc { + /// If a song is currently playing, returns `Some(current_song)` so the caller can short-circuit. + /// Returns `None` if the player is idle. + async fn if_playing_then_current_song(&self) -> Option> { let join = self.join.lock().await; - let playlist = self.get_playlist(); if join.is_some() { - let current = playlist.current(); + Some(self.get_playlist().current()) + } else { + None + } + } + + /// Next goes to the next entry in the playlist. + pub async fn next(&self) -> Arc { + if let Some(current) = self.if_playing_then_current_song().await { info!( current_song = current.name(), "Can't go to next, player is active." ); return current; } + let playlist = self.get_playlist(); let song = Player::next_and_emit(self.midi_device.clone(), playlist); self.load_song_samples(&song); song @@ -554,16 +616,14 @@ impl Player { /// Prev goes to the previous entry in the playlist. pub async fn prev(&self) -> Arc { - let join = self.join.lock().await; - let playlist = self.get_playlist(); - if join.is_some() { - let current = playlist.current(); + if let Some(current) = self.if_playing_then_current_song().await { info!( current_song = current.name(), "Can't go to previous, player is active." ); return current; } + let playlist = self.get_playlist(); let song = Player::prev_and_emit(self.midi_device.clone(), playlist); self.load_song_samples(&song); song @@ -600,11 +660,22 @@ impl Player { drop(play_handles.join); drop(join); - // Reset stop_run after a brief delay to allow any remaining cleanup to complete + // stop_run is cleared by the cleanup task when it runs (after play_rx fires). As a fallback + // (e.g. if cleanup never runs because play_tx was dropped or the blocking task hangs), + // clear stop_run after a timeout so the user can call stop() again. + const STOP_RUN_FALLBACK_TIMEOUT: Duration = Duration::from_secs(30); let stop_run = self.stop_run.clone(); tokio::spawn(async move { - tokio::time::sleep(Duration::from_millis(100)).await; - stop_run.store(false, Ordering::Relaxed); + tokio::time::sleep(STOP_RUN_FALLBACK_TIMEOUT).await; + if stop_run + .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) + .is_ok() + { + warn!( + "Stop cleanup did not complete within {:?}; cleared stop_run so further stop() can be attempted", + STOP_RUN_FALLBACK_TIMEOUT + ); + } }); Some(song) @@ -612,10 +683,9 @@ impl Player { /// Switch to the all songs playlist. pub async fn switch_to_all_songs(&self) { - let join = self.join.lock().await; - if join.is_some() { + if let Some(current) = self.if_playing_then_current_song().await { info!( - current_song = self.get_playlist().current().name(), + current_song = current.name(), "Can't switch to all songs, player is active." ); return; @@ -628,10 +698,9 @@ impl Player { /// Switch to the regular playlist. pub async fn switch_to_playlist(&self) { - let join = self.join.lock().await; - if join.is_some() { + if let Some(current) = self.if_playing_then_current_song().await { info!( - current_song = self.get_playlist().current().name(), + current_song = current.name(), "Can't switch to playlist, player is active." ); return; diff --git a/src/playlist.rs b/src/playlist.rs index 37baeb4..9d9dc3d 100644 --- a/src/playlist.rs +++ b/src/playlist.rs @@ -16,9 +16,15 @@ use tracing::{info, span, Level, Span}; use crate::config; use crate::songs::{Song, Songs}; use core::fmt; -use std::error::Error; use std::sync::{Arc, RwLock}; +/// Typed error for playlist creation so callers can distinguish e.g. missing song in registry. +#[derive(Debug, thiserror::Error)] +pub enum PlaylistError { + #[error("Song not in registry: {0}")] + SongNotFound(String), +} + /// Playlist is a playlist for use by a player. pub struct Playlist { /// The name of this playlist. @@ -53,11 +59,13 @@ impl Playlist { name: &str, config: &config::Playlist, registry: Arc, - ) -> Result, Box> { + ) -> Result, PlaylistError> { // Verify that each song in the playlist exists in the registry. let song_names = config.songs(); for song_name in song_names.iter() { - registry.get(song_name)?; + registry + .get(song_name) + .map_err(|_| PlaylistError::SongNotFound(song_name.clone()))?; } Ok(Arc::new(Playlist { @@ -84,7 +92,7 @@ impl Playlist { pub fn next(&self) -> Arc { let _enter = self.span.enter(); - let mut position = self.position.write().expect("unable to get lock"); + let mut position = self.position.write().unwrap_or_else(|e| e.into_inner()); if *position < self.songs.len() - 1 { *position += 1; } @@ -106,10 +114,7 @@ impl Playlist { /// Move to the previous element of the playlist. If we're at the beginning of the playlist, the position /// will not decrement. The song at the current position will be returned. pub fn prev(&self) -> Arc { - let mut position = self - .position - .write() - .expect("unable to get write lock on position"); + let mut position = self.position.write().unwrap_or_else(|e| e.into_inner()); if *position > 0 { *position -= 1; } @@ -130,7 +135,7 @@ impl Playlist { /// Return the song at the current position of the playlist. pub fn current(&self) -> Arc { - let position = self.position.read().expect("unable to get lock"); + let position = self.position.read().unwrap_or_else(|e| e.into_inner()); Arc::clone( &self .registry @@ -141,7 +146,7 @@ impl Playlist { } /// Creates an alphabetized playlist from all available songs. -pub fn from_songs(songs: Arc) -> Result, Box> { +pub fn from_songs(songs: Arc) -> Result, PlaylistError> { // The easiest thing to do here is to gather the names of all of the songs and pass them // to new. This is a little silly, since new is just going to double check that they // all exist and then do an explicit mapping each time. However, the easiest way to diff --git a/src/playsync.rs b/src/playsync.rs index 946dc55..0508091 100644 --- a/src/playsync.rs +++ b/src/playsync.rs @@ -49,20 +49,19 @@ impl Default for CancelHandle { impl CancelHandle { /// Returns true if the device process has been cancelled. pub fn is_cancelled(&self) -> bool { - *self.cancelled.lock().expect("Error getting lock") == CancelState::Cancelled + *self.cancelled.lock().unwrap_or_else(|e| e.into_inner()) == CancelState::Cancelled } /// Waits for the cancel handle to be cancelled or for finished to be set to true. pub fn wait(&self, finished: Arc) { - let _unused = self + let guard = self.cancelled.lock().unwrap_or_else(|e| e.into_inner()); + let _guard = self .condvar - .wait_while( - self.cancelled.lock().expect("Error getting lock"), - |cancelled| { - *cancelled == CancelState::Untouched && !finished.load(Ordering::Relaxed) - }, - ) - .expect("Error getting lock"); + .wait_while(guard, |cancelled| { + *cancelled == CancelState::Untouched && !finished.load(Ordering::Relaxed) + }) + .unwrap_or_else(|e| e.into_inner()); + drop(_guard); } /// Notifies the cancel handle to see if this the song has been cancelled or if the @@ -73,7 +72,7 @@ impl CancelHandle { /// Cancel the device process. pub fn cancel(&self) { - let mut cancel_state = self.cancelled.lock().expect("Error getting lock"); + let mut cancel_state = self.cancelled.lock().unwrap_or_else(|e| e.into_inner()); if *cancel_state == CancelState::Untouched { *cancel_state = CancelState::Cancelled; self.notify(); diff --git a/src/samples/engine.rs b/src/samples/engine.rs index 459f4fd..499eea7 100644 --- a/src/samples/engine.rs +++ b/src/samples/engine.rs @@ -214,7 +214,10 @@ impl SampleEngine { // Set up per-sample voice limit if configured if let Some(max_voices) = definition.max_voices() { - let mut vm = self.voice_manager.write().unwrap(); + let mut vm = self + .voice_manager + .write() + .unwrap_or_else(|e| e.into_inner()); vm.set_sample_limit(name, max_voices); } @@ -444,7 +447,10 @@ impl SampleEngine { // Acquire voice manager lock BEFORE adding to mixer to prevent race conditions // with concurrent triggers for the same sample (important for cut/monophonic mode) - let mut vm = self.voice_manager.write().unwrap(); + let mut vm = self + .voice_manager + .write() + .unwrap_or_else(|e| e.into_inner()); let to_stop = vm.add_voice(voice, sample.definition.retrigger()); // Schedule old voices to stop at the same time the new one starts (sample-accurate cut) @@ -492,7 +498,10 @@ impl SampleEngine { continue; } - let mut vm = self.voice_manager.write().unwrap(); + let mut vm = self + .voice_manager + .write() + .unwrap_or_else(|e| e.into_inner()); let to_stop = vm.handle_note_off(note, channel, behavior); drop(vm); @@ -515,7 +524,10 @@ impl SampleEngine { /// Stops all sample playback. pub fn stop_all(&self) { - let mut vm = self.voice_manager.write().unwrap(); + let mut vm = self + .voice_manager + .write() + .unwrap_or_else(|e| e.into_inner()); let to_stop = vm.clear(); drop(vm); @@ -532,7 +544,10 @@ impl SampleEngine { /// Returns the number of active voices. pub fn active_voice_count(&self) -> usize { - self.voice_manager.read().unwrap().active_count() + self.voice_manager + .read() + .unwrap_or_else(|e| e.into_inner()) + .active_count() } /// Returns the total memory used by loaded samples. diff --git a/src/samples/loader.rs b/src/samples/loader.rs index d195db3..c8c55cb 100644 --- a/src/samples/loader.rs +++ b/src/samples/loader.rs @@ -92,8 +92,12 @@ impl SampleLoader { info!(path = ?path, "Loading sample into memory"); - // Create a sample source from the file - let mut source = create_sample_source_from_file(path, None, DEFAULT_BUFFER_SIZE)?; + // Create a sample source from the file (include path in error) + let mut source = create_sample_source_from_file(path, None, DEFAULT_BUFFER_SIZE).map_err( + |e| -> Box { + format!("Failed to load sample {}: {}", path.display(), e).into() + }, + )?; let source_sample_rate = source.sample_rate(); let channel_count = source.channel_count(); @@ -170,7 +174,9 @@ impl SampleLoader { } Err(e) => { warn!(path = ?full_path, error = ?e, "Failed to load sample"); - return Err(e); + return Err( + format!("Failed to load sample {}: {}", full_path.display(), e).into(), + ); } } } diff --git a/src/songs.rs b/src/songs.rs index 5dfe1e8..09c296a 100644 --- a/src/songs.rs +++ b/src/songs.rs @@ -570,8 +570,10 @@ impl MidiPlayback { /// Returns a MIDI sheet for the given file. fn parse_midi(midi_file: &PathBuf) -> Result> { - let buf: Vec = fs::read(midi_file)?; - let smf = Smf::parse(&buf)?; + let buf: Vec = fs::read(midi_file) + .map_err(|e| format!("Failed to read MIDI file {}: {}", midi_file.display(), e))?; + let smf = Smf::parse(&buf) + .map_err(|e| format!("Failed to parse MIDI file {}: {}", midi_file.display(), e))?; let ticker = Ticker::try_from(smf.header.timing)?; let midi_sheet = MidiSheet { @@ -665,7 +667,11 @@ impl Track { let file_channel = config.file_channel(); let name = config.name(); - let source = create_sample_source_from_file(&track_file, None, 1024)?; + let source = create_sample_source_from_file(&track_file, None, 1024).map_err( + |e| -> Box { + format!("track \"{}\" (file {}): {}", name, track_file.display(), e).into() + }, + )?; // Extract all metadata before the source might be dropped or cause issues let sample_rate = source.sample_rate(); @@ -704,7 +710,9 @@ impl Track { assert_eq!(extension, "wav", "Expected file name to end in '.wav'"); let track_name = stem.to_string(); - let source = create_sample_source_from_file(track_path, None, 1024)?; + let source = create_sample_source_from_file(track_path, None, 1024).map_err( + |e| -> Box { format!("file {}: {}", track_path.display(), e).into() }, + )?; let sample_rate = source.sample_rate(); let sample_format = source.sample_format(); let duration = source.duration().unwrap_or(Duration::ZERO); @@ -873,7 +881,11 @@ pub fn get_all_songs(path: &Path) -> Result, Box> { if let Ok(song) = config::Song::deserialize(path.as_path()) { let song = match path.parent() { Some(parent) => Song::new(&parent.canonicalize()?, &song)?, - None => return Err("unable to get parent for path".into()), + None => { + return Err( + format!("unable to get parent for path: {}", path.display()).into() + ) + } }; songs.insert(song.name().to_string(), Arc::new(song)); }