Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ log = '0.4.11'
signal-hook = "0.3.17"
clap = { version = "3.0.13", features = ["derive"] }
hostname = '^0.4'
arrayvec = '0.7.1'
211 changes: 200 additions & 11 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,24 @@ use clap::Parser;
use hostname;
use signal_hook::{consts::SIGINT, consts::SIGTERM, consts::SIGUSR1, consts::SIGUSR2};
use simplelog::*;
use std::cell::RefCell;
use std::error::Error;
use std::ffi::CString;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{thread, time};

use arrayvec::ArrayVec;
extern crate cec_rs;
use cec_rs::{
CecCommand, CecConnectionCfgBuilder, CecDeviceType, CecDeviceTypeVec, CecLogMessage,
CecLogicalAddress,
CecCommand, CecConnection, CecConnectionCfgBuilder, CecDatapacket, CecDeviceType,
CecDeviceTypeVec, CecLogMessage, CecLogicalAddress, CecOpcode,
};

use std::sync::atomic::AtomicUsize;

static GLOBAL_THREAD_COUNT: AtomicUsize = AtomicUsize::new(0);

#[derive(Parser, Debug)]
#[clap(version, about, long_about = None)]
struct Args {
Expand Down Expand Up @@ -54,6 +60,88 @@ fn on_command_received(command: CecCommand) {
"onCommandReceived: opcode: {:?}, initiator: {:?}",
command.opcode, command.initiator
);
// Note that Relaxed ordering doesn't synchronize anything
// except the global thread counter itself.
let old_thread_count = GLOBAL_THREAD_COUNT.fetch_add(1, Ordering::Relaxed);
// Note that this number may not be true at the moment of printing
// because some other thread may have changed static value already.
debug!("live threads: {}", old_thread_count + 1);

THREAD_CONNECTION.with(|connection| {
debug!(
"onCommandReceived: opcode type: {:?}",
std::any::type_name_of_val(&command.opcode)
);
debug!("onCommandReceived: try to borrow the connection: {:?}", std::any::type_name_of_val(&connection));
// Use the static CONNECTION variable instead of the thread-local one
if let Some(Some(conn)) = CONNECTION.get() {
debug!(
"onCommandReceived: Connection successfully borrowed from thread-local storage: {:?}",
std::any::type_name_of_val(&conn)
);
match command.opcode {
CecOpcode::GiveDevicePowerStatus => {
debug!(
"onCommandReceived: Got a GiveDevicePowerStatus command!!: opcode: {:?}, initiator: {:?}, destination: {:?}, ack: {:?}, eom: {:?}, parameters: {:?}, opcode_set?: {:?}, transmit_timeout: {:?}",
command.opcode, command.initiator, command.destination, command.ack, command.eom, command.parameters, command.opcode_set, command.transmit_timeout
);

let mut a = ArrayVec::new();
a.push(0x00); // CEC_POWER_STATUS_ON
let packet = CecDatapacket(a);

let _ = conn.transmit(CecCommand {
initiator: CecLogicalAddress::Playbackdevice1,
destination: command.initiator,
opcode: CecOpcode::ReportPowerStatus,
parameters: packet,
eom: true,
ack: false,
opcode_set: false,
transmit_timeout: time::Duration::from_secs(5),
});
}
CecOpcode::ReportPowerStatus => {
debug!(
"onCommandReceived: Got a ReportPowerStatus command!!: opcode: {:?}, initiator: {:?}, destination: {:?}, ack: {:?}, eom: {:?}, parameters: {:?}, opcode_set?: {:?}, transmit_timeout: {:?}",
command.opcode, command.initiator, command.destination, command.ack, command.eom, command.parameters, command.opcode_set, command.transmit_timeout
);
}
_ => {
debug!(
"onCommandReceived: Unknown command: opcode: {:?}, initiator: {:?}, destination: {:?}",
command.opcode, command.initiator, command.destination
);
}
}
}
else {
debug!("<b><red>Error:</> Could not borrow the connection: {:?}", std::any::type_name_of_val(&connection));

debug!("<b><red>Debug:</> RefCell wrapper type: {}", std::any::type_name_of_val(&connection));
// Get the contents of RefCell
let borrowed = connection.borrow();
debug!("<b><red>Debug:</> After borrow() is_some()??: {:#?} (type: {})",
borrowed.is_some(),
std::any::type_name_of_val(&borrowed)
);

// Look at the Option value inside
match *borrowed {
Some(ref cec_conn) => {
debug!("Connection exists (type: {}) with:", std::any::type_name_of_val(cec_conn));
debug!(" - Logical addresses: {:?}", cec_conn.get_logical_addresses());
debug!(" - Active source: {:?}", cec_conn.get_active_source());
let foo = cec_conn.is_active_source(CecLogicalAddress::Playbackdevice1);
debug!(" - Is Playbackdevice1 active source?: {:#?}", foo);
},
None => {
debug!("Connection is None!");
}
}
}
GLOBAL_THREAD_COUNT.fetch_sub(1, Ordering::Relaxed);
})
}

fn on_log_message(log_message: CecLogMessage) {
Expand Down Expand Up @@ -117,7 +205,14 @@ fn get_osd_hostname() -> String {
}
}

static CONNECTION: std::sync::OnceLock<Option<Arc<CecConnection>>> = std::sync::OnceLock::new();
thread_local! {
static THREAD_CONNECTION: RefCell<Option<Arc<CecConnection>>> = RefCell::new(None);
}

fn main() -> Result<(), Box<dyn Error>> {
let old_thread_count = GLOBAL_THREAD_COUNT.fetch_add(1, Ordering::Relaxed);
debug!("live threads at start of main(): {}", old_thread_count + 1);
let args = Args::parse();
logging_init(args.debug);
let device_path = args.input.unwrap().into_os_string().into_string().unwrap();
Expand All @@ -136,7 +231,7 @@ fn main() -> Result<(), Box<dyn Error>> {
.device_types(CecDeviceTypeVec::new(CecDeviceType::PlaybackDevice))
.build()
.unwrap();
let connection = cfg.open().unwrap();
// Setup signal handling flags
let usr1 = Arc::new(AtomicBool::new(false));
let usr2 = Arc::new(AtomicBool::new(false));
let terminate = Arc::new(AtomicBool::new(false));
Expand All @@ -145,28 +240,122 @@ fn main() -> Result<(), Box<dyn Error>> {
signal_hook::flag::register(SIGTERM, Arc::clone(&terminate))?;
signal_hook::flag::register(SIGINT, Arc::clone(&terminate))?;

info!("Active source: <b>{:?}</>", connection.get_active_source());
// Open CecConnection directly
let connection = match cfg.open() {
Ok(conn) => {
info!("Successfully opened CEC connection");
Some(Arc::new(conn))
}
Err(e) => {
error!("Failed to open CEC connection: {:?}", e);
None
}
};

// Store in static for access from callbacks and main thread
let _ = CONNECTION.set(connection.clone());

// Also store in thread-local for main thread use
THREAD_CONNECTION.with(|tconn| {
*tconn.borrow_mut() = connection.clone();
});

// Verify connection is working
let _res = connection
.map(|conn| {
info!(
"Am I active source? <b>{:?}</>",
conn.is_active_source(CecLogicalAddress::Playbackdevice1)
);
info!("Active source: <b>{:?}</>", conn.get_active_source());
Ok(()) as Result<(), Box<dyn Error>>
})
.unwrap_or_else(|| {
let err_msg = "Failed to open CEC connection";
error!("{}", err_msg);
Err(err_msg.to_string().into())
});

let last_thread_count = &GLOBAL_THREAD_COUNT.load(Ordering::Relaxed);
debug!("live threads at start of main(): {}", last_thread_count);
info!("Waiting for signals...");
loop {
if usr1.load(Ordering::Relaxed) {
info!("<b><green>USR1</>: powering <b>ON</>");
let last_thread_count = &GLOBAL_THREAD_COUNT.load(Ordering::Relaxed);
debug!("live threads at USR1 handler start: {}", last_thread_count);
usr1.store(false, Ordering::Relaxed);
let _ = connection.set_active_source(CecDeviceType::PlaybackDevice);
// This apparently set active source to Tv??
// let _ = connection.send_power_on_devices(CecLogicalAddress::Tv);
let _res = THREAD_CONNECTION.with(|conn| {
if let Some(connection) = conn.borrow().as_ref() {
let power_on_devices_result =
connection.send_power_on_devices(CecLogicalAddress::Tv);
match power_on_devices_result {
Ok(()) => {
info!("<b><green>Success!</> Sent power on command to Tv");
}
Err(e) => {
error!(
"<b><red>Error:</> Failed to send power on devices command! {:?}",
e
);
}
}
//the following call is working the same on my samsung, idk what is more proper:
let set_active_source_result: Result<(), cec_rs::CecConnectionResultError> =
connection.set_active_source(CecDeviceType::PlaybackDevice);
match set_active_source_result {
Ok(o) => {
info!("<b><green>Success!</> Set active source {:?}", o);
}
Err(e) => {
error!("<b><red>Error:</> Failed to set active source {:?}!", e);
}
}
info!(
"<i>connection.get_logical_addresses()</i> = {:?}",
connection.get_logical_addresses()
);
Ok(()) as Result<(), Box<dyn Error>>
} else {
let err_msg = "Failed to open CEC connection";
error!("{}", err_msg);
Err(err_msg.to_string().into())
}
});
}
if usr2.load(Ordering::Relaxed) {
info!("<b><green>USR2</>: powering <b>OFF</>");
usr2.store(false, Ordering::Relaxed);
if connection.get_active_source() == CecLogicalAddress::Playbackdevice1 {
let _ = connection.send_standby_devices(CecLogicalAddress::Tv);
} else {
info!("<i>reguest ignored</>: we are not an active source");
}
THREAD_CONNECTION.with(|conn| {
// Get mutable access to the thread_local RefCell contents and set it
// *conn.borrow_mut() = cfg.open().ok();
if let Some(connection) = conn.borrow().as_ref() {
info!(
"<b><green>Active source:</> <b>{:?}</>",
connection.get_active_source()
);
if connection.get_active_source() == CecLogicalAddress::Playbackdevice1 {
let _ = connection.send_standby_devices(CecLogicalAddress::Tv);
} else {
info!("<i>reguest ignored</>: we are not an active source");
}
Ok(()) as Result<(), Box<dyn Error>>
} else {
let err_msg = "Failed to open CEC connection";
error!("{}", err_msg);
Err(err_msg.to_string().into())
}
})?;
}
if terminate.load(Ordering::Relaxed) {
info!("Terminating");
break;
}
thread::sleep(time::Duration::from_secs(1));
}
Ok(())
Ok(()) as Result<(), Box<dyn Error>>

// Ok(())
}