From ff4d0d3a65f45a0cca5ebc97bb95e1ff762e2a77 Mon Sep 17 00:00:00 2001 From: James Cuzella Date: Wed, 3 Dec 2025 14:58:45 -0700 Subject: [PATCH 1/4] cargo: Add arrayvec 0.7.1 --- Cargo.lock | 1 + Cargo.toml | 1 + 2 files changed, 2 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 9a28dee..ecc3c06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -161,6 +161,7 @@ dependencies = [ name = "cec-dpms" version = "0.1.1" dependencies = [ + "arrayvec", "cec-rs", "clap", "hostname", diff --git a/Cargo.toml b/Cargo.toml index 3edffd5..a3ce4b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,3 +20,4 @@ log = '0.4.11' signal-hook = "0.3.17" clap = { version = "3.0.13", features = ["derive"] } hostname = '^0.4' +arrayvec = '0.7.1' From 882e0d61b9e353b15d361f948bd1bf2de47cbccf Mon Sep 17 00:00:00 2001 From: James Cuzella Date: Wed, 3 Dec 2025 14:50:42 -0700 Subject: [PATCH 2/4] cargo fmt & Initial implementation of thread-local storage CecConnection handling --- src/main.rs | 234 +++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 223 insertions(+), 11 deletions(-) diff --git a/src/main.rs b/src/main.rs index 0856795..42188c4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,16 +2,18 @@ use clap::Parser; use hostname; use signal_hook::{consts::SIGINT, consts::SIGTERM, consts::SIGUSR1, consts::SIGUSR2}; use simplelog::*; +use std::cell::RefCell; use std::error::Error; use std::ffi::CString; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::{thread, time}; +use arrayvec::ArrayVec; extern crate cec_rs; use cec_rs::{ - CecCommand, CecConnectionCfgBuilder, CecDeviceType, CecDeviceTypeVec, CecLogMessage, - CecLogicalAddress, + CecCommand, CecConnection, CecConnectionCfg, CecConnectionCfgBuilder, CecDatapacket, + CecDeviceType, CecDeviceTypeVec, CecLogMessage, CecLogicalAddress, CecOpcode, }; #[derive(Parser, Debug)] @@ -54,6 +56,80 @@ fn on_command_received(command: CecCommand) { "onCommandReceived: opcode: {:?}, initiator: {:?}", command.opcode, command.initiator ); + + CONNECTION.with(|connection| { + debug!( + "onCommandReceived: opcode type: {:?}", + std::any::type_name_of_val(&command.opcode) + ); + debug!("onCommandReceived: try to borrow the connection: {:?}", std::any::type_name_of_val(&connection)); + if let Some(conn) = connection.borrow().as_ref() { + debug!( + "onCommandReceived: Connection successfully borrowed from thread-local storage: {:?}", + std::any::type_name_of_val(&conn) + ); + match command.opcode { + CecOpcode::GiveDevicePowerStatus => { + debug!( + "onCommandReceived: Got a GiveDevicePowerStatus command!!: opcode: {:?}, initiator: {:?}, destination: {:?}, ack: {:?}, eom: {:?}, parameters: {:?}, opcode_set?: {:?}, transmit_timeout: {:?}", + command.opcode, command.initiator, command.destination, command.ack, command.eom, command.parameters, command.opcode_set, command.transmit_timeout + ); + + let mut a = ArrayVec::new(); + a.push(0x00); // CEC_POWER_STATUS_ON + let packet = CecDatapacket(a); + + let _ = conn.transmit(CecCommand { + initiator: CecLogicalAddress::Playbackdevice1, + destination: command.initiator, + opcode: CecOpcode::ReportPowerStatus, + parameters: packet, + eom: true, + ack: false, + opcode_set: false, + transmit_timeout: time::Duration::from_secs(5), + }); + } + CecOpcode::ReportPowerStatus => { + debug!( + "onCommandReceived: Got a ReportPowerStatus command!!: opcode: {:?}, initiator: {:?}, destination: {:?}, ack: {:?}, eom: {:?}, parameters: {:?}, opcode_set?: {:?}, transmit_timeout: {:?}", + command.opcode, command.initiator, command.destination, command.ack, command.eom, command.parameters, command.opcode_set, command.transmit_timeout + ); + } + _ => { + debug!( + "onCommandReceived: Unknown command: opcode: {:?}, initiator: {:?}, destination: {:?}", + command.opcode, command.initiator, command.destination + ); + } + } + } + else { + debug!("Error: Could not borrow the connection: {:?}", std::any::type_name_of_val(&connection)); + + debug!("Debug: RefCell wrapper type: {}", std::any::type_name_of_val(&connection)); + // Get the contents of RefCell + let borrowed = connection.borrow(); + debug!("Debug: After borrow() is_some()??: {:#?} (type: {})", + borrowed.is_some(), + std::any::type_name_of_val(&borrowed) + ); + + // Look at the Option value inside + match *borrowed { + Some(ref cec_conn) => { + debug!("Connection exists (type: {}) with:", std::any::type_name_of_val(cec_conn)); + debug!(" - Logical addresses: {:?}", cec_conn.get_logical_addresses()); + debug!(" - Active source: {:?}", cec_conn.get_active_source()); + let foo = cec_conn.is_active_source(CecLogicalAddress::Playbackdevice1); + debug!(" - Is Playbackdevice1 active source?: {:#?}", foo); + }, + None => { + debug!("Connection is None!"); + } + } + } +}) } fn on_log_message(log_message: CecLogMessage) { @@ -117,6 +193,58 @@ fn get_osd_hostname() -> String { } } +thread_local! { + static CONNECTION: RefCell> = RefCell::new(None); + static CONNECTION_CONFIG: RefCell> = RefCell::new(None); +} + +/// Initializes a `CecConnection` from `CONNECTION_CONFIG` and stores it in +/// thread-local storage as `CONNECTION` +/// +/// This function gets the `CecConnectionCfg` from thread-local storage +/// variable: `CONNECTION_CONFIG`. +/// +/// ## Example +/// +/// ```rust +/// use std::io; +/// fn main() -> io::Result<()> { +/// match initialize_connection() { +/// Some(()) => { Ok(()) } +/// None => { Err("Could not open CEC connection") } +/// } +/// } +/// ``` +/// +/// ## Errors +/// +/// None - If an error was encountered opening the CEC connection, then `None` +/// is returned. +fn initialize_connection() -> Option<()> { + CONNECTION.with(|conn| { + // Get mutable access to the thread_local RefCell contents and set it + CONNECTION_CONFIG.with(|opt_config| { + if let Some(cfg) = opt_config.borrow_mut().take() { + match cfg.open() { + Ok(c) => { + info!("Successfully opened CEC connection"); + *conn.borrow_mut() = Some(c); + Some(()) + } + Err(e) => { + error!("Failed to initialize CEC connection: {:?}", e); + *conn.borrow_mut() = None; + None + } + } + } else { + error!("Failed to get mutable reference to thread-local CecConnectionCfg"); + None + } + }) + }) +} + fn main() -> Result<(), Box> { let args = Args::parse(); logging_init(args.debug); @@ -136,7 +264,11 @@ fn main() -> Result<(), Box> { .device_types(CecDeviceTypeVec::new(CecDeviceType::PlaybackDevice)) .build() .unwrap(); - let connection = cfg.open().unwrap(); + CONNECTION_CONFIG.with(|config| { + // store it in thread-local RefCell's value + *config.borrow_mut() = Some(cfg); + }); + // Setup signal handling flags let usr1 = Arc::new(AtomicBool::new(false)); let usr2 = Arc::new(AtomicBool::new(false)); let terminate = Arc::new(AtomicBool::new(false)); @@ -145,22 +277,100 @@ fn main() -> Result<(), Box> { signal_hook::flag::register(SIGTERM, Arc::clone(&terminate))?; signal_hook::flag::register(SIGINT, Arc::clone(&terminate))?; - info!("Active source: {:?}", connection.get_active_source()); + // Initialize CecConnection and store it in thread-local CONNECTION + initialize_connection(); + + // Sharing same CEC connection with callback function threads, so only borrow it when needed + CONNECTION.with(|conn| { + // Get mutable access to the thread_local RefCell contents and set it + // *conn.borrow_mut() = cfg.open().ok(); + // connection = cfg.open().unwrap(); + if let Some(connection) = conn.borrow().as_ref() { + info!( + "Am I active source? {:?}", + connection.is_active_source(CecLogicalAddress::Playbackdevice1) + ); + info!("Active source: {:?}", connection.get_active_source()); + Ok(()) as Result<(), Box> + } else { + let err_msg = "Failed to open CEC connection"; + error!("{}", err_msg); + Err(err_msg.to_string().into()) + // Err(Box::new(std::io::Error::new( + // std::io::ErrorKind::Other, + // err_msg, + // ))) + } + })?; + info!("Waiting for signals..."); loop { if usr1.load(Ordering::Relaxed) { info!("USR1: powering ON"); usr1.store(false, Ordering::Relaxed); - let _ = connection.set_active_source(CecDeviceType::PlaybackDevice); + // This apparently set active source to Tv?? + // let _ = connection.send_power_on_devices(CecLogicalAddress::Tv); + CONNECTION.with(|conn| { + if let Some(connection) = conn.borrow().as_ref() { + let power_on_devices_result = + connection.send_power_on_devices(CecLogicalAddress::Tv); + match power_on_devices_result { + Ok(()) => { + info!("Success! Sent power on command to Tv"); + } + Err(e) => { + error!( + "Error: Failed to send power on devices command! {:?}", + e + ); + } + } + //the following call is working the same on my samsung, idk what is more proper: + let set_active_source_result: Result<(), cec_rs::CecConnectionResultError> = + connection.set_active_source(CecDeviceType::PlaybackDevice); + match set_active_source_result { + Ok(o) => { + info!("Success! Set active source {:?}", o); + } + Err(e) => { + error!("Error: Failed to set active source {:?}!", e); + } + } + info!( + "connection.get_logical_addresses() = {:?}", + connection.get_logical_addresses() + ); + Ok(()) as Result<(), Box> + } else { + let err_msg = "Failed to open CEC connection"; + error!("{}", err_msg); + Err(err_msg.to_string().into()) + } + })?; } if usr2.load(Ordering::Relaxed) { info!("USR2: powering OFF"); usr2.store(false, Ordering::Relaxed); - if connection.get_active_source() == CecLogicalAddress::Playbackdevice1 { - let _ = connection.send_standby_devices(CecLogicalAddress::Tv); - } else { - info!("reguest ignored: we are not an active source"); - } + CONNECTION.with(|conn| { + // Get mutable access to the thread_local RefCell contents and set it + // *conn.borrow_mut() = cfg.open().ok(); + if let Some(connection) = conn.borrow().as_ref() { + info!( + "Active source: {:?}", + connection.get_active_source() + ); + if connection.get_active_source() == CecLogicalAddress::Playbackdevice1 { + let _ = connection.send_standby_devices(CecLogicalAddress::Tv); + } else { + info!("reguest ignored: we are not an active source"); + } + Ok(()) as Result<(), Box> + } else { + let err_msg = "Failed to open CEC connection"; + error!("{}", err_msg); + Err(err_msg.to_string().into()) + } + })?; } if terminate.load(Ordering::Relaxed) { info!("Terminating"); @@ -168,5 +378,7 @@ fn main() -> Result<(), Box> { } thread::sleep(time::Duration::from_secs(1)); } - Ok(()) + Ok(()) as Result<(), Box> + + // Ok(()) } From a08b94369ab31ee560532a3a8366af492eb3983c Mon Sep 17 00:00:00 2001 From: James Cuzella Date: Tue, 2 Dec 2025 23:11:26 -0700 Subject: [PATCH 3/4] debug: Add global atomic thread counter --- src/main.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/main.rs b/src/main.rs index 42188c4..fc27dbe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,6 +16,10 @@ use cec_rs::{ CecDeviceType, CecDeviceTypeVec, CecLogMessage, CecLogicalAddress, CecOpcode, }; +use std::sync::atomic::AtomicUsize; + +static GLOBAL_THREAD_COUNT: AtomicUsize = AtomicUsize::new(0); + #[derive(Parser, Debug)] #[clap(version, about, long_about = None)] struct Args { @@ -56,6 +60,12 @@ fn on_command_received(command: CecCommand) { "onCommandReceived: opcode: {:?}, initiator: {:?}", command.opcode, command.initiator ); + // Note that Relaxed ordering doesn't synchronize anything + // except the global thread counter itself. + let old_thread_count = GLOBAL_THREAD_COUNT.fetch_add(1, Ordering::Relaxed); + // Note that this number may not be true at the moment of printing + // because some other thread may have changed static value already. + debug!("live threads: {}", old_thread_count + 1); CONNECTION.with(|connection| { debug!( @@ -129,6 +139,7 @@ fn on_command_received(command: CecCommand) { } } } + GLOBAL_THREAD_COUNT.fetch_sub(1, Ordering::Relaxed); }) } @@ -246,6 +257,8 @@ fn initialize_connection() -> Option<()> { } fn main() -> Result<(), Box> { + let old_thread_count = GLOBAL_THREAD_COUNT.fetch_add(1, Ordering::Relaxed); + debug!("live threads at start of main(): {}", old_thread_count + 1); let args = Args::parse(); logging_init(args.debug); let device_path = args.input.unwrap().into_os_string().into_string().unwrap(); @@ -303,10 +316,14 @@ fn main() -> Result<(), Box> { } })?; + let last_thread_count = &GLOBAL_THREAD_COUNT.load(Ordering::Relaxed); + debug!("live threads at start of main(): {}", last_thread_count); info!("Waiting for signals..."); loop { if usr1.load(Ordering::Relaxed) { info!("USR1: powering ON"); + let last_thread_count = &GLOBAL_THREAD_COUNT.load(Ordering::Relaxed); + debug!("live threads at USR1 handler start: {}", last_thread_count); usr1.store(false, Ordering::Relaxed); // This apparently set active source to Tv?? // let _ = connection.send_power_on_devices(CecLogicalAddress::Tv); From 5742dec4eef4270f7023c95ca842c9fe1e762c7d Mon Sep 17 00:00:00 2001 From: James Cuzella Date: Tue, 2 Dec 2025 23:14:49 -0700 Subject: [PATCH 4/4] connection: Refactor conn handling w/static OnceLock>> Thread-local wasn't working to truly share the _same_ connection across threads because the mutable borrow was failing. To complicate matters, `CecConnection` and `CecConnectionCfg` don't implement Copy or Clone traits, nor are they Sync. `Option>` implements the Clone trait, so relying on this to store an Arc to be shared across threads works. Add to this using `OnceLock` for the global static variable to be initialized only once. Thus, we create a singleton pattern: the connection initialized in `main()` once becomes the single shared connection instance, for use in the `on_command_received()` handler function across libcec threads. Note: LazyLock was tried, but the initializer proc/closure was always failing to borrow and reliably access thread-local variables. Since initialization may have been called from multiple threads, any dereferencing call will block the calling thread if another initialization routine is currently running. The way that lower-level cec_rs and libcec-sys libraries implement the callback functions (e.g. `ICECCallbacks::commandReceived`) is to execute them in another thread. However, due to the way that Rust type & borrow checked code is run, we would otherwise lose access to the same `CecConnection` object across the ffi boundary: Rust main() -> libcec ffi C code -> thread(s?) -> on_command_received() Using this `OnceLock` singleton pattern with `Option>` seems to work. --- src/main.rs | 114 +++++++++++++++++----------------------------------- 1 file changed, 37 insertions(+), 77 deletions(-) diff --git a/src/main.rs b/src/main.rs index fc27dbe..fefb8a4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,8 +12,8 @@ use std::{thread, time}; use arrayvec::ArrayVec; extern crate cec_rs; use cec_rs::{ - CecCommand, CecConnection, CecConnectionCfg, CecConnectionCfgBuilder, CecDatapacket, - CecDeviceType, CecDeviceTypeVec, CecLogMessage, CecLogicalAddress, CecOpcode, + CecCommand, CecConnection, CecConnectionCfgBuilder, CecDatapacket, CecDeviceType, + CecDeviceTypeVec, CecLogMessage, CecLogicalAddress, CecOpcode, }; use std::sync::atomic::AtomicUsize; @@ -67,13 +67,14 @@ fn on_command_received(command: CecCommand) { // because some other thread may have changed static value already. debug!("live threads: {}", old_thread_count + 1); - CONNECTION.with(|connection| { + THREAD_CONNECTION.with(|connection| { debug!( "onCommandReceived: opcode type: {:?}", std::any::type_name_of_val(&command.opcode) ); debug!("onCommandReceived: try to borrow the connection: {:?}", std::any::type_name_of_val(&connection)); - if let Some(conn) = connection.borrow().as_ref() { + // Use the static CONNECTION variable instead of the thread-local one + if let Some(Some(conn)) = CONNECTION.get() { debug!( "onCommandReceived: Connection successfully borrowed from thread-local storage: {:?}", std::any::type_name_of_val(&conn) @@ -204,56 +205,9 @@ fn get_osd_hostname() -> String { } } +static CONNECTION: std::sync::OnceLock>> = std::sync::OnceLock::new(); thread_local! { - static CONNECTION: RefCell> = RefCell::new(None); - static CONNECTION_CONFIG: RefCell> = RefCell::new(None); -} - -/// Initializes a `CecConnection` from `CONNECTION_CONFIG` and stores it in -/// thread-local storage as `CONNECTION` -/// -/// This function gets the `CecConnectionCfg` from thread-local storage -/// variable: `CONNECTION_CONFIG`. -/// -/// ## Example -/// -/// ```rust -/// use std::io; -/// fn main() -> io::Result<()> { -/// match initialize_connection() { -/// Some(()) => { Ok(()) } -/// None => { Err("Could not open CEC connection") } -/// } -/// } -/// ``` -/// -/// ## Errors -/// -/// None - If an error was encountered opening the CEC connection, then `None` -/// is returned. -fn initialize_connection() -> Option<()> { - CONNECTION.with(|conn| { - // Get mutable access to the thread_local RefCell contents and set it - CONNECTION_CONFIG.with(|opt_config| { - if let Some(cfg) = opt_config.borrow_mut().take() { - match cfg.open() { - Ok(c) => { - info!("Successfully opened CEC connection"); - *conn.borrow_mut() = Some(c); - Some(()) - } - Err(e) => { - error!("Failed to initialize CEC connection: {:?}", e); - *conn.borrow_mut() = None; - None - } - } - } else { - error!("Failed to get mutable reference to thread-local CecConnectionCfg"); - None - } - }) - }) + static THREAD_CONNECTION: RefCell>> = RefCell::new(None); } fn main() -> Result<(), Box> { @@ -277,10 +231,6 @@ fn main() -> Result<(), Box> { .device_types(CecDeviceTypeVec::new(CecDeviceType::PlaybackDevice)) .build() .unwrap(); - CONNECTION_CONFIG.with(|config| { - // store it in thread-local RefCell's value - *config.borrow_mut() = Some(cfg); - }); // Setup signal handling flags let usr1 = Arc::new(AtomicBool::new(false)); let usr2 = Arc::new(AtomicBool::new(false)); @@ -290,31 +240,41 @@ fn main() -> Result<(), Box> { signal_hook::flag::register(SIGTERM, Arc::clone(&terminate))?; signal_hook::flag::register(SIGINT, Arc::clone(&terminate))?; - // Initialize CecConnection and store it in thread-local CONNECTION - initialize_connection(); + // Open CecConnection directly + let connection = match cfg.open() { + Ok(conn) => { + info!("Successfully opened CEC connection"); + Some(Arc::new(conn)) + } + Err(e) => { + error!("Failed to open CEC connection: {:?}", e); + None + } + }; + + // Store in static for access from callbacks and main thread + let _ = CONNECTION.set(connection.clone()); - // Sharing same CEC connection with callback function threads, so only borrow it when needed - CONNECTION.with(|conn| { - // Get mutable access to the thread_local RefCell contents and set it - // *conn.borrow_mut() = cfg.open().ok(); - // connection = cfg.open().unwrap(); - if let Some(connection) = conn.borrow().as_ref() { + // Also store in thread-local for main thread use + THREAD_CONNECTION.with(|tconn| { + *tconn.borrow_mut() = connection.clone(); + }); + + // Verify connection is working + let _res = connection + .map(|conn| { info!( "Am I active source? {:?}", - connection.is_active_source(CecLogicalAddress::Playbackdevice1) + conn.is_active_source(CecLogicalAddress::Playbackdevice1) ); - info!("Active source: {:?}", connection.get_active_source()); + info!("Active source: {:?}", conn.get_active_source()); Ok(()) as Result<(), Box> - } else { + }) + .unwrap_or_else(|| { let err_msg = "Failed to open CEC connection"; error!("{}", err_msg); Err(err_msg.to_string().into()) - // Err(Box::new(std::io::Error::new( - // std::io::ErrorKind::Other, - // err_msg, - // ))) - } - })?; + }); let last_thread_count = &GLOBAL_THREAD_COUNT.load(Ordering::Relaxed); debug!("live threads at start of main(): {}", last_thread_count); @@ -327,7 +287,7 @@ fn main() -> Result<(), Box> { usr1.store(false, Ordering::Relaxed); // This apparently set active source to Tv?? // let _ = connection.send_power_on_devices(CecLogicalAddress::Tv); - CONNECTION.with(|conn| { + let _res = THREAD_CONNECTION.with(|conn| { if let Some(connection) = conn.borrow().as_ref() { let power_on_devices_result = connection.send_power_on_devices(CecLogicalAddress::Tv); @@ -363,12 +323,12 @@ fn main() -> Result<(), Box> { error!("{}", err_msg); Err(err_msg.to_string().into()) } - })?; + }); } if usr2.load(Ordering::Relaxed) { info!("USR2: powering OFF"); usr2.store(false, Ordering::Relaxed); - CONNECTION.with(|conn| { + THREAD_CONNECTION.with(|conn| { // Get mutable access to the thread_local RefCell contents and set it // *conn.borrow_mut() = cfg.open().ok(); if let Some(connection) = conn.borrow().as_ref() {