Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 20 additions & 13 deletions alas/alas/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,35 @@ use std::io::Write;
use std::sync::Arc;
use tokio;
use tokio::signal;
use tokio::sync::{ broadcast, RwLock };
use tokio::sync::{ broadcast, Mutex, RwLock };

#[tokio::main]
async fn main() {
// TODO: this was originally designed to be Redux-like but then it turned evil. Refactor.
let state = Arc::new(RwLock::new(AlasState::new()));
let (event_bus, _) = broadcast::channel::<AlasMessage>(256);

// Conditionally initialize redundancy manager and WireGuard interface
let _redundancy_manager = if state.read().await.config.redundancy.is_some() {
let manager = redundancy::RedundancyManager::new();
if let Err(e) = manager.initialize(&state).await {
eprintln!("Failed to initialize redundancy manager: {}", e);
}
if let Err(e) = manager.start_wireguard_interface().await {
// Always initialize redundancy manager (will create default config if needed)
let redundancy_manager = redundancy::RedundancyManager::new();
if let Err(e) = redundancy_manager.initialize(&state).await {
eprintln!("Failed to initialize redundancy manager: {}", e);
}

// Only start WireGuard interface if config is complete
let config_is_complete = state.read().await.config.redundancy
.as_ref()
.map(|c| !c.is_default())
.unwrap_or(false);

if config_is_complete {
if let Err(e) = redundancy_manager.start_wireguard_interface().await {
eprintln!("Failed to start WireGuard interface: {}", e);
}
Some(manager)
} else {
println!("ℹ️ Redundancy not configured");
None
};
println!("ℹ️ Redundancy not configured, use web UI to set up");
}

let redundancy_manager = Arc::new(Mutex::new(redundancy_manager));

let lcd_thread = lcd_display::start(event_bus.clone(), &state).await;

Expand Down Expand Up @@ -61,7 +68,7 @@ async fn main() {
// Start webhook listener
start_webhook_listener(event_bus.subscribe(), state.clone()).await;

let web_server = web_server::run_rocket_server(event_bus.clone(), &state).await;
let web_server = web_server::run_rocket_server(event_bus.clone(), &state, redundancy_manager.clone()).await;

// Wait for exit here! All code below is for clean-up!

Expand Down
11 changes: 2 additions & 9 deletions alas/alas/src/web_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,12 @@ async fn go() -> &'static str {

pub async fn run_rocket_server(
bus: Sender<AlasMessage>,
alas_state: &SafeState
alas_state: &SafeState,
redundancy_manager: Arc<Mutex<RedundancyManager>>
) -> JoinHandle<Rocket<Ignite>> {
println!("Starting web server...");
let tokio_state = alas_state.clone();
tokio::spawn(async move {
// Initialize RedundancyManager
let redundancy_manager = RedundancyManager::new();
if let Err(e) = redundancy_manager.initialize(&tokio_state).await {
eprintln!("Failed to initialize redundancy manager: {}", e);
// log::error!("Failed to initialize redundancy manager: {}", e);
}
let redundancy_manager = Arc::new(Mutex::new(redundancy_manager));

let allowed_origins = AllowedOrigins::some_exact(
&["http://localhost:5173", "https://alas.krdf.org", "http://alasradio.local:8000"]
);
Expand Down
90 changes: 35 additions & 55 deletions alas/alas_lib/src/audio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,8 @@ pub async fn start(
let config_reset = Arc::new(AtomicBool::new(false));
let mut audio_last_seen = UNIX_EPOCH;

// Check which services are enabled
let enabled = alas_state.blocking_read().config.enabled_services();
let num_consumers = (enabled.recording as usize) + (enabled.streaming as usize);

let mut audio_bus = if num_consumers > 0 {
Some(Bus::<Vec<f32>>::new(2204 * 30))
} else {
println!("ℹ️ No audio consumers configured, audio processing disabled");
None
};
// Always create audio bus - threads always run but handle missing config gracefully
let mut audio_bus = Bus::<Vec<f32>>::new(2204 * 30);

// Config watch
let mut subscriber = bus.subscribe();
Expand Down Expand Up @@ -80,41 +72,23 @@ pub async fn start(
println!("✅ Exited config thread");
});

// Conditionally spawn Icecast streaming thread
let icecast = if enabled.streaming {
if let Some(ref mut audio_bus_ref) = audio_bus {
let icecast_rx = audio_bus_ref.add_rx();
Some(start_icecast_thread(
icecast_rx,
desire_to_broadcast.clone(),
alas_state.clone(),
bus.clone(),
config_reset.clone()
))
} else {
None
}
} else {
println!("ℹ️ Icecast not configured, streaming disabled");
None
};

// Conditionally spawn file saving thread
let record = if enabled.recording {
if let Some(ref mut audio_bus_ref) = audio_bus {
let file_rx = audio_bus_ref.add_rx();
Some(start_file_save_thread(
file_rx,
desire_to_broadcast.clone(),
bus.clone()
))
} else {
None
}
} else {
println!("ℹ️ Recording not configured, recording disabled");
None
};
// Always spawn Icecast streaming thread - it handles missing config gracefully
let icecast_rx = audio_bus.add_rx();
let icecast = start_icecast_thread(
icecast_rx,
desire_to_broadcast.clone(),
alas_state.clone(),
bus.clone(),
config_reset.clone()
);

// Always spawn file saving thread
let file_rx = audio_bus.add_rx();
let record = start_file_save_thread(
file_rx,
desire_to_broadcast.clone(),
bus.clone()
);

let host = cpal::default_host();

Expand Down Expand Up @@ -152,7 +126,7 @@ pub async fn start(
&alas_state,
&mut desire_to_broadcast,
&mut audio_last_seen,
audio_bus.as_mut()
&mut audio_bus
)
},
err_fn,
Expand Down Expand Up @@ -190,8 +164,8 @@ pub async fn start(

AudioThreads {
config_thread,
icecast,
recording: record,
icecast: Some(icecast),
recording: Some(record),
}
})
}
Expand Down Expand Up @@ -296,11 +270,19 @@ fn start_icecast_thread(
};

if desire_to_broadcast.load(Ordering::Relaxed) {
// Check if icecast config exists before attempting connection
let has_config = state.blocking_read().config.icecast.is_some();
if !has_config {
// No config - just drain the audio bus and continue
continue;
}

let icecast_connection = match connect_to_icecast(&state) {
Some(conn) => conn,
None => {
println!("ℹ️ Cannot connect to Icecast, config not available");
break;
println!("ℹ️ Cannot connect to Icecast, will retry");
std::thread::sleep(std::time::Duration::from_secs(1));
continue;
}
};
config_reset.store(false, Ordering::Relaxed);
Expand Down Expand Up @@ -454,7 +436,7 @@ fn handle_samples<T>(
state: &SafeState,
desire_to_broadcast: &AtomicBool,
audio_last_seen: &mut SystemTime,
sender: Option<&mut Bus<Vec<T>>>
sender: &mut Bus<Vec<T>>
)
where T: Sample
{
Expand Down Expand Up @@ -495,10 +477,8 @@ fn handle_samples<T>(
}
}

// Only broadcast to audio bus if it exists (i.e., if we have consumers)
if let Some(sender) = sender {
sender.broadcast(input.to_vec().clone());
}
// Always broadcast to audio bus - threads always run
sender.broadcast(input.to_vec().clone());
}

fn calculate_rms_levels<T>(data: &[T], channels: usize) -> (f32, f32) where T: cpal::Sample {
Expand Down
Binary file added cm5/CM5IO-backups/CM5IO-2026-01-17_212627.zip
Binary file not shown.
Binary file added cm5/CM5IO-backups/CM5IO-2026-01-17_213632.zip
Binary file not shown.
Binary file added cm5/CM5IO-backups/CM5IO-2026-01-18_185301.zip
Binary file not shown.