diff --git a/integration-tests/tests/translator_integration.rs b/integration-tests/tests/translator_integration.rs index 49ef69216..20ea07096 100644 --- a/integration-tests/tests/translator_integration.rs +++ b/integration-tests/tests/translator_integration.rs @@ -1950,3 +1950,60 @@ async fn tproxy_sends_single_open_extended_mining_channel_in_aggregated_mode() { shutdown_all!(pool, tproxy); } + +// This test verifies whether we can spawn multiple tproxy in the +// same process. +// +// More info here: https://github.com/stratum-mining/sv2-apps/issues/430 +#[tokio::test] +async fn multiple_tproxy_sessions() { + start_tracing(); + let (_tp, tp_addr) = start_template_provider(None, DifficultyLevel::High); + let (pool, pool_addr, _) = start_pool(sv2_tp_config(tp_addr), vec![], vec![], false).await; + + let (pool_translator_sniffer_1, pool_translator_sniffer_addr_1) = + start_sniffer("0", pool_addr, false, vec![], None); + let (tproxy_1, _, _) = start_sv2_translator( + &[pool_translator_sniffer_addr_1], + true, + vec![], + vec![], + None, + false, + ) + .await; + + let (pool_translator_sniffer_2, pool_translator_sniffer_addr_2) = + start_sniffer("0", pool_addr, false, vec![], None); + let (tproxy_2, _, _) = start_sv2_translator( + &[pool_translator_sniffer_addr_2], + true, + vec![], + vec![], + None, + false, + ) + .await; + + pool_translator_sniffer_1 + .wait_for_message_type(MessageDirection::ToUpstream, MESSAGE_TYPE_SETUP_CONNECTION) + .await; + pool_translator_sniffer_1 + .wait_for_message_type( + MessageDirection::ToDownstream, + MESSAGE_TYPE_SETUP_CONNECTION_SUCCESS, + ) + .await; + + pool_translator_sniffer_2 + .wait_for_message_type(MessageDirection::ToUpstream, MESSAGE_TYPE_SETUP_CONNECTION) + .await; + pool_translator_sniffer_2 + .wait_for_message_type( + MessageDirection::ToDownstream, + MESSAGE_TYPE_SETUP_CONNECTION_SUCCESS, + ) + .await; + + shutdown_all!(pool, tproxy_1, tproxy_2); +} diff --git a/miner-apps/jd-client/src/lib/channel_manager/downstream_message_handler.rs b/miner-apps/jd-client/src/lib/channel_manager/downstream_message_handler.rs index fea08a143..93fb153bb 100644 --- a/miner-apps/jd-client/src/lib/channel_manager/downstream_message_handler.rs +++ b/miner-apps/jd-client/src/lib/channel_manager/downstream_message_handler.rs @@ -34,7 +34,6 @@ use crate::{ ChannelManager, ChannelManagerChannel, SharesOrderedByDiff, FULL_EXTRANONCE_SIZE, }, error::{self, JDCError, JDCErrorKind}, - jd_mode::{get_jd_mode, JdMode}, utils::{add_share_to_cache, create_close_channel_msg}, }; @@ -121,14 +120,12 @@ impl RouteMessageTo<'_> { } } RouteMessageTo::Upstream(message) => { - if get_jd_mode() != JdMode::SoloMining { - let message_static = message.into_static(); - let sv2_frame: Sv2Frame = AnyMessage::Mining(message_static).try_into()?; - channel_manager_channel - .upstream_sender - .send(sv2_frame) - .await?; - } + let message_static = message.into_static(); + let sv2_frame: Sv2Frame = AnyMessage::Mining(message_static).try_into()?; + channel_manager_channel + .upstream_sender + .send(sv2_frame) + .await?; } RouteMessageTo::JobDeclarator(message) => { channel_manager_channel diff --git a/miner-apps/jd-client/src/lib/channel_manager/mod.rs b/miner-apps/jd-client/src/lib/channel_manager/mod.rs index 169f5026a..0dda8f926 100644 --- a/miner-apps/jd-client/src/lib/channel_manager/mod.rs +++ b/miner-apps/jd-client/src/lib/channel_manager/mod.rs @@ -63,6 +63,7 @@ use crate::{ config::JobDeclaratorClientConfig, downstream::Downstream, error::{self, JDCError, JDCErrorKind, JDCResult}, + jd_mode::JDMode, status::{handle_error, Status, StatusSender}, utils::{ AtomicUpstreamState, DownstreamChannelJobId, DownstreamMessage, PendingChannelRequest, @@ -267,6 +268,7 @@ pub struct ChannelManager { /// 3. Connected: An upstream channel is successfully established. /// 4. SoloMining: No upstream is available; the JDC operates in solo mining mode. case. pub upstream_state: AtomicUpstreamState, + pub mode: JDMode, } #[cfg_attr(not(test), hotpath::measure_all)] @@ -285,6 +287,7 @@ impl ChannelManager { coinbase_outputs: Vec, supported_extensions: Vec, required_extensions: Vec, + mode: JDMode, ) -> JDCResult { let (range_0, range_1, range_2) = { let range_1 = 0..JDC_SEARCH_SPACE_BYTES; @@ -348,6 +351,7 @@ impl ChannelManager { miner_tag_string: config.jdc_signature().to_string(), user_identity: config.user_identity().to_string(), upstream_state: AtomicUpstreamState::new(UpstreamState::SoloMining), + mode, }; Ok(channel_manager) diff --git a/miner-apps/jd-client/src/lib/channel_manager/template_message_handler.rs b/miner-apps/jd-client/src/lib/channel_manager/template_message_handler.rs index e0968b25c..a59d0f21d 100644 --- a/miner-apps/jd-client/src/lib/channel_manager/template_message_handler.rs +++ b/miner-apps/jd-client/src/lib/channel_manager/template_message_handler.rs @@ -15,7 +15,6 @@ use tracing::{error, info, warn}; use crate::{ channel_manager::{downstream_message_handler::RouteMessageTo, ChannelManager, DeclaredJob}, error::{self, JDCError, JDCErrorKind}, - jd_mode::{get_jd_mode, JdMode}, }; #[cfg_attr(not(test), hotpath::measure_all)] @@ -63,7 +62,7 @@ impl HandleTemplateDistributionMessagesFromServerAsync for ChannelManager { let mut coinbase_outputs = deserialize_outputs(coinbase_outputs) .map_err(|_| JDCError::shutdown(JDCErrorKind::ChannelManagerHasBadCoinbaseOutputs))?; - if get_jd_mode() == JdMode::FullTemplate { + if self.mode.is_full_template() { let tx_data_request = TemplateDistribution::RequestTransactionData(RequestTransactionData { template_id: msg.template_id, @@ -81,7 +80,7 @@ impl HandleTemplateDistributionMessagesFromServerAsync for ChannelManager { coinbase_outputs[0].value = Amount::from_sat(msg.coinbase_tx_value_remaining); let coinbase_only_token = if !msg.future_template - && get_jd_mode() == JdMode::CoinbaseOnly + && self.mode.is_coinbase_only() && channel_manager_data.upstream_channel.is_some() && channel_manager_data.last_new_prev_hash.is_some() && channel_manager_data.job_factory.is_some() @@ -440,7 +439,7 @@ impl HandleTemplateDistributionMessagesFromServerAsync for ChannelManager { (data.last_future_template.clone(), declare_job) }); - if get_jd_mode() == JdMode::FullTemplate { + if self.mode.is_full_template() { if let Some(Some(job)) = declare_job { let message = JobDeclaration::DeclareMiningJob(job); @@ -467,7 +466,7 @@ impl HandleTemplateDistributionMessagesFromServerAsync for ChannelManager { if let Some(ref mut upstream_channel) = channel_manager_data.upstream_channel { _ = upstream_channel.on_chain_tip_update(msg.clone().into()); - if get_jd_mode() == JdMode::CoinbaseOnly + if self.mode.is_coinbase_only() && channel_manager_data.job_factory.is_some() && future_template.is_some() { diff --git a/miner-apps/jd-client/src/lib/channel_manager/upstream_message_handler.rs b/miner-apps/jd-client/src/lib/channel_manager/upstream_message_handler.rs index b99c771f2..8a75bf897 100644 --- a/miner-apps/jd-client/src/lib/channel_manager/upstream_message_handler.rs +++ b/miner-apps/jd-client/src/lib/channel_manager/upstream_message_handler.rs @@ -23,7 +23,6 @@ use crate::{ JDC_SEARCH_SPACE_BYTES, MIN_EXTRANONCE_SIZE, STANDARD_CHANNEL_ALLOCATION_BYTES, }, error::{self, JDCError, JDCErrorKind}, - jd_mode::{get_jd_mode, JdMode}, utils::{create_close_channel_msg, validate_cached_share, UpstreamState}, }; @@ -166,7 +165,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { debug!("Applied last_new_prev_hash to new extended channel"); } - let set_custom_job = if get_jd_mode() == JdMode::CoinbaseOnly + let set_custom_job = if self.mode.is_coinbase_only() && data.job_factory.is_some() && data.last_future_template.is_some() && data.last_new_prev_hash.is_some() @@ -242,7 +241,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { }); if channel_state == UpstreamState::Connected { - if get_jd_mode() == JdMode::FullTemplate { + if self.mode.is_full_template() { if let Some(template) = template { let tx_data_request = TemplateDistribution::RequestTransactionData(RequestTransactionData { @@ -256,7 +255,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { } } - if get_jd_mode() == JdMode::CoinbaseOnly { + if self.mode.is_coinbase_only() { if let Some(custom_job) = custom_job { let set_custom_job = Mining::SetCustomMiningJob(custom_job); let sv2_frame: Sv2Frame = AnyMessage::Mining(set_custom_job) diff --git a/miner-apps/jd-client/src/lib/config.rs b/miner-apps/jd-client/src/lib/config.rs index 49ba6081a..5913afe0c 100644 --- a/miner-apps/jd-client/src/lib/config.rs +++ b/miner-apps/jd-client/src/lib/config.rs @@ -198,7 +198,7 @@ impl JobDeclaratorClientConfig { } } -#[derive(Debug, Deserialize, Clone, Default, PartialEq)] +#[derive(Debug, Deserialize, Clone, Copy, Default, PartialEq)] #[serde(rename_all = "UPPERCASE")] pub enum ConfigJDCMode { #[default] diff --git a/miner-apps/jd-client/src/lib/jd_mode.rs b/miner-apps/jd-client/src/lib/jd_mode.rs index 0533afc71..049e220b1 100644 --- a/miner-apps/jd-client/src/lib/jd_mode.rs +++ b/miner-apps/jd-client/src/lib/jd_mode.rs @@ -5,57 +5,75 @@ //! //! Modes are stored in a global [`AtomicU8`] to allow safe concurrent access //! across threads. -use std::sync::atomic::{AtomicU8, Ordering}; - -/// Operating modes for the Job Declarator. -#[repr(u8)] -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum JdMode { - /// Runs in Coinbase only mode. - CoinbaseOnly = 0, - /// Runs in Full template mode, - FullTemplate = 1, - /// Runs in solo mining mode, - SoloMining = 2, +use std::sync::{ + atomic::{AtomicU8, Ordering}, + Arc, +}; + +use crate::config::ConfigJDCMode; + +#[derive(Clone, Debug)] +pub struct JDMode { + inner: Arc, + // Currently, how JDC works is the mode in config + // only gets activated once an upstream connection + // is made. + config_mode: ConfigJDCMode, } -impl From for JdMode { - fn from(val: u8) -> Self { - match val { - 0 => JdMode::CoinbaseOnly, - 1 => JdMode::FullTemplate, - 2 => JdMode::SoloMining, - _ => JdMode::SoloMining, +impl JDMode { + pub fn new(config_mode: ConfigJDCMode) -> JDMode { + JDMode { + inner: Arc::new(AtomicU8::new(ConfigJDCMode::SoloMining as u8)), + config_mode, } } -} -impl From for JdMode { - fn from(val: u32) -> Self { - match val { - 0 => JdMode::CoinbaseOnly, - 1 => JdMode::FullTemplate, - 2 => JdMode::SoloMining, - _ => JdMode::SoloMining, + /// This activates mode based on config file once + /// upstream connection is made. + pub fn activate(&self) { + match self.config_mode { + ConfigJDCMode::CoinbaseOnly => self.set_coinbase_only(), + ConfigJDCMode::FullTemplate => self.set_full_template(), + ConfigJDCMode::SoloMining => self.set_solo_mining(), } } -} -impl From for u8 { - fn from(mode: JdMode) -> Self { - mode as u8 + pub fn set_solo_mining(&self) { + self.inner + .store(ConfigJDCMode::SoloMining as u8, Ordering::Relaxed); } -} -/// Global atomic variable storing the current JD mode. -pub static JD_MODE: AtomicU8 = AtomicU8::new(JdMode::SoloMining as u8); + fn set_full_template(&self) { + self.inner + .store(ConfigJDCMode::FullTemplate as u8, Ordering::Relaxed); + } -/// Updates the global JD mode. -pub fn set_jd_mode(mode: JdMode) { - JD_MODE.store(mode as u8, Ordering::SeqCst); -} + fn set_coinbase_only(&self) { + self.inner + .store(ConfigJDCMode::CoinbaseOnly as u8, Ordering::Relaxed); + } -/// Returns the current global JD mode. -pub fn get_jd_mode() -> JdMode { - JD_MODE.load(Ordering::SeqCst).into() + pub fn is_solo_mining(&self) -> bool { + let mode = self.inner.load(Ordering::Relaxed); + mode == ConfigJDCMode::SoloMining as u8 + } + + pub fn is_full_template(&self) -> bool { + let mode = self.inner.load(Ordering::Relaxed); + mode == ConfigJDCMode::FullTemplate as u8 + } + + pub fn is_coinbase_only(&self) -> bool { + let mode = self.inner.load(Ordering::Relaxed); + mode == ConfigJDCMode::CoinbaseOnly as u8 + } + + pub fn is_config_full_template(&self) -> bool { + self.config_mode == ConfigJDCMode::FullTemplate + } + + pub fn is_config_coinbase_only(&self) -> bool { + self.config_mode == ConfigJDCMode::CoinbaseOnly + } } diff --git a/miner-apps/jd-client/src/lib/job_declarator/message_handler.rs b/miner-apps/jd-client/src/lib/job_declarator/message_handler.rs index 57ee0ab02..a8a7948c0 100644 --- a/miner-apps/jd-client/src/lib/job_declarator/message_handler.rs +++ b/miner-apps/jd-client/src/lib/job_declarator/message_handler.rs @@ -8,9 +8,7 @@ use stratum_apps::stratum_core::{ use tracing::{info, warn}; use crate::{ - config::ConfigJDCMode, error::{self, JDCError, JDCErrorKind}, - jd_mode::{set_jd_mode, JdMode}, job_declarator::JobDeclarator, }; @@ -32,16 +30,7 @@ impl HandleCommonMessagesFromServerAsync for JobDeclarator { _tlv_fields: Option<&[Tlv]>, ) -> Result<(), Self::Error> { info!("Received: {}", msg); - // Setting up JDMode from config, upon - // successful handshake. - let jd_mode = match self.mode { - ConfigJDCMode::CoinbaseOnly => JdMode::CoinbaseOnly, - ConfigJDCMode::FullTemplate => JdMode::FullTemplate, - ConfigJDCMode::SoloMining => JdMode::SoloMining, - }; - - set_jd_mode(jd_mode); - + self.mode.activate(); Ok(()) } diff --git a/miner-apps/jd-client/src/lib/job_declarator/mod.rs b/miner-apps/jd-client/src/lib/job_declarator/mod.rs index e4443895a..b9c07100c 100644 --- a/miner-apps/jd-client/src/lib/job_declarator/mod.rs +++ b/miner-apps/jd-client/src/lib/job_declarator/mod.rs @@ -21,9 +21,9 @@ use tokio::net::TcpStream; use tracing::{debug, error, info, warn}; use crate::{ - config::ConfigJDCMode, error::{self, JDCError, JDCErrorKind, JDCResult}, io_task::spawn_io_tasks, + jd_mode::JDMode, status::{handle_error, Status, StatusSender}, utils::{get_setup_connection_message_jds, UpstreamEntry}, }; @@ -53,7 +53,7 @@ pub struct JobDeclarator { /// Socket address of the Job Declarator server. socket_address: SocketAddr, /// Config JDC mode - mode: ConfigJDCMode, + mode: JDMode, } #[cfg_attr(not(test), hotpath::measure_all)] @@ -70,7 +70,7 @@ impl JobDeclarator { channel_manager_receiver: Receiver>, cancellation_token: CancellationToken, fallback_coordinator: FallbackCoordinator, - mode: ConfigJDCMode, + mode: JDMode, task_manager: Arc, ) -> JDCResult { let addr = resolve_host(&upstream_entry.jds_host, upstream_entry.jds_port) diff --git a/miner-apps/jd-client/src/lib/mod.rs b/miner-apps/jd-client/src/lib/mod.rs index 3504478ce..42a258ee5 100644 --- a/miner-apps/jd-client/src/lib/mod.rs +++ b/miner-apps/jd-client/src/lib/mod.rs @@ -21,9 +21,9 @@ use tracing::{debug, error, info, warn}; use crate::{ channel_manager::ChannelManager, - config::{ConfigJDCMode, JobDeclaratorClientConfig}, + config::JobDeclaratorClientConfig, error::JDCErrorKind, - jd_mode::{set_jd_mode, JdMode}, + jd_mode::JDMode, job_declarator::JobDeclarator, status::{State, Status}, template_receiver::{ @@ -78,6 +78,7 @@ impl JobDeclaratorClient { let miner_coinbase_outputs = vec![self.config.get_txout()]; let mut encoded_outputs = vec![]; + let mode = JDMode::new(self.config.mode); miner_coinbase_outputs .consensus_encode(&mut encoded_outputs) @@ -116,6 +117,7 @@ impl JobDeclaratorClient { encoded_outputs.clone(), self.config.supported_extensions().to_vec(), self.config.required_extensions().to_vec(), + mode.clone(), ) .await .unwrap(); @@ -278,7 +280,7 @@ impl JobDeclaratorClient { ); } info!("Starting in solo mining mode"); - set_jd_mode(jd_mode::JdMode::SoloMining); + mode.set_solo_mining(); } else if upstream_addresses.is_empty() { error!( "No upstreams configured for {:?} mode - at least one upstream is required", @@ -297,7 +299,7 @@ impl JobDeclaratorClient { jd_to_channel_manager_sender.clone(), self.cancellation_token.clone(), fallback_coordinator.clone(), - self.config.mode.clone(), + mode.clone(), task_manager.clone(), ) .await @@ -330,7 +332,7 @@ impl JobDeclaratorClient { } Err(e) => { tracing::error!("Failed to initialize upstream: {:?}", e); - set_jd_mode(jd_mode::JdMode::SoloMining); + mode.set_solo_mining(); } }; } @@ -398,7 +400,7 @@ impl JobDeclaratorClient { debug!("Draining buffered status message: {:?}", old_status.state); } - set_jd_mode(JdMode::SoloMining); + mode.set_solo_mining(); info!("Existing Upstream or JD instance taken out. Preparing fallback."); // Create a fresh FallbackCoordinator for the reconnection attempt @@ -428,6 +430,7 @@ impl JobDeclaratorClient { encoded_outputs.clone(), self.config.supported_extensions().to_vec(), self.config.required_extensions().to_vec(), + mode.clone() ) .await .unwrap(); @@ -454,7 +457,7 @@ impl JobDeclaratorClient { jd_to_channel_manager_sender_new.clone(), self.cancellation_token.clone(), fallback_coordinator.clone(), - self.config.mode.clone(), + mode.clone(), task_manager.clone(), ) .await @@ -487,7 +490,7 @@ impl JobDeclaratorClient { Err(e) => { tracing::error!("Failed to initialize upstream: {:?}", e); channel_manager_clone.upstream_state.set(UpstreamState::SoloMining); - set_jd_mode(jd_mode::JdMode::SoloMining); + mode.set_solo_mining(); info!("Fallback to solo mining mode"); } }; @@ -619,7 +622,7 @@ impl JobDeclaratorClient { jd_to_channel_manager_sender: Sender>, cancellation_token: CancellationToken, fallback_coordinator: FallbackCoordinator, - mode: ConfigJDCMode, + mode: JDMode, task_manager: Arc, ) -> Result<(Upstream, JobDeclarator), JDCErrorKind> { const MAX_RETRIES: usize = 3; @@ -730,7 +733,7 @@ async fn try_initialize_single( channel_manager_to_jd_receiver: Receiver>, cancellation_token: CancellationToken, fallback_coordinator: FallbackCoordinator, - mode: ConfigJDCMode, + mode: JDMode, task_manager: Arc, config: &JobDeclaratorClientConfig, ) -> Result<(Upstream, JobDeclarator), JDCErrorKind> { diff --git a/miner-apps/jd-client/src/lib/utils.rs b/miner-apps/jd-client/src/lib/utils.rs index dd44340ba..f68104ef1 100644 --- a/miner-apps/jd-client/src/lib/utils.rs +++ b/miner-apps/jd-client/src/lib/utils.rs @@ -40,8 +40,8 @@ use tracing::{debug, info}; use crate::{ channel_manager::{downstream_message_handler::RouteMessageTo, ChannelManagerData}, - config::ConfigJDCMode, error::JDCErrorKind, + jd_mode::JDMode, }; pub(crate) type DownstreamMessage = (Mining<'static>, Option>); @@ -89,7 +89,7 @@ pub fn get_setup_connection_message( /// Constructs a `SetupConnection` message for the Job Declarator (JDS). pub fn get_setup_connection_message_jds( proxy_address: &SocketAddr, - mode: &ConfigJDCMode, + mode: &JDMode, ) -> SetupConnection<'static> { let endpoint_host = proxy_address .ip() @@ -114,7 +114,7 @@ pub fn get_setup_connection_message_jds( device_id, }; - if matches!(mode, ConfigJDCMode::FullTemplate) { + if mode.is_config_full_template() { setup_connection.allow_full_template_mode(); } diff --git a/miner-apps/translator/src/lib/mod.rs b/miner-apps/translator/src/lib/mod.rs index aa65bf50d..09af0e217 100644 --- a/miner-apps/translator/src/lib/mod.rs +++ b/miner-apps/translator/src/lib/mod.rs @@ -16,7 +16,7 @@ use std::{ net::SocketAddr, sync::{ atomic::{AtomicBool, Ordering}, - Arc, OnceLock, + Arc, }, time::Duration, }; @@ -38,7 +38,7 @@ use crate::{ status::{State, Status}, sv1::sv1_server::sv1_server::Sv1Server, sv2::{ChannelManager, Upstream}, - utils::UpstreamEntry, + utils::{TproxyMode, UpstreamEntry}, }; pub mod config; @@ -83,16 +83,10 @@ impl TranslatorSv2 { /// protocol translation, job management, and status reporting. pub async fn start(self) { info!("Starting Translator Proxy..."); - // only initialized once - TPROXY_MODE - .set(self.config.aggregate_channels.into()) - .expect("TPROXY_MODE initialized more than once"); - VARDIFF_ENABLED - .set(self.config.downstream_difficulty_config.enable_vardiff) - .expect("VARDIFF_ENABLED initialized more than once"); let cancellation_token = self.cancellation_token.clone(); let mut fallback_coordinator = FallbackCoordinator::new(); + let tproxy_mode = TproxyMode::from(self.config.aggregate_channels); let task_manager = Arc::new(TaskManager::new()); let (status_sender, status_receiver) = async_channel::unbounded::(); @@ -130,6 +124,7 @@ impl TranslatorSv2 { channel_manager_to_sv1_server_receiver, sv1_server_to_channel_manager_sender, self.config.clone(), + tproxy_mode, )); info!("Initializing upstream connection..."); @@ -162,6 +157,9 @@ impl TranslatorSv2 { status_sender.clone(), self.config.supported_extensions.clone(), self.config.required_extensions.clone(), + tproxy_mode, + #[cfg(feature = "monitoring")] + self.config.downstream_difficulty_config.enable_vardiff, )); info!("Launching ChannelManager tasks..."); @@ -283,6 +281,7 @@ impl TranslatorSv2 { channel_manager_to_sv1_server_receiver, sv1_server_to_channel_manager_sender, self.config.clone(), + tproxy_mode )); if let Err(e) = self.initialize_upstream( @@ -309,6 +308,9 @@ impl TranslatorSv2 { status_sender.clone(), self.config.supported_extensions.clone(), self.config.required_extensions.clone(), + tproxy_mode, + #[cfg(feature = "monitoring")] + self.config.downstream_difficulty_config.enable_vardiff )); info!("Launching ChannelManager tasks..."); @@ -548,71 +550,6 @@ async fn try_initialize_upstream( Ok(()) } -/// Defines the operational mode for Translator Proxy. -/// -/// It can operate in two different modes that affect how Sv1 -/// downstream connections are mapped to the upstream Sv2 channels. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum TproxyMode { - /// All Sv1 downstream connections share a single extended Sv2 channel. - /// This mode uses extranonce_prefix allocation to distinguish between - /// different downstream miners while presenting them as a single entity - /// to the upstream server. This is more efficient for pools with many - /// miners. - Aggregated, - /// Each Sv1 downstream connection gets its own dedicated extended Sv2 channel. - /// This mode provides complete isolation between downstream connections - /// but may be less efficient for large numbers of miners. - NonAggregated, -} - -impl From for TproxyMode { - fn from(aggregate: bool) -> Self { - if aggregate { - return TproxyMode::Aggregated; - } - - TproxyMode::NonAggregated - } -} - -static TPROXY_MODE: OnceLock = OnceLock::new(); -static VARDIFF_ENABLED: OnceLock = OnceLock::new(); - -#[cfg(not(test))] -pub fn tproxy_mode() -> TproxyMode { - *TPROXY_MODE.get().expect("TPROXY_MODE has to exist") -} - -// We don’t initialize `TPROXY_MODE` in tests, so any test that -// depends on it will panic if the mode is undefined. -// This `cfg` wrapper ensures `tproxy_mode` does not panic in -// an undefined state by providing a default value when needed. -#[cfg(test)] -pub fn tproxy_mode() -> TproxyMode { - *TPROXY_MODE.get_or_init(|| TproxyMode::Aggregated) -} - -#[inline] -pub fn is_aggregated() -> bool { - matches!(tproxy_mode(), TproxyMode::Aggregated) -} - -#[inline] -pub fn is_non_aggregated() -> bool { - matches!(tproxy_mode(), TproxyMode::NonAggregated) -} - -#[cfg(not(test))] -pub fn vardiff_enabled() -> bool { - *VARDIFF_ENABLED.get().expect("VARDIFF_ENABLED has to exist") -} - -#[cfg(test)] -pub fn vardiff_enabled() -> bool { - *VARDIFF_ENABLED.get_or_init(|| true) -} - impl Drop for TranslatorSv2 { fn drop(&mut self) { info!("TranslatorSv2 dropped"); diff --git a/miner-apps/translator/src/lib/monitoring.rs b/miner-apps/translator/src/lib/monitoring.rs index 2a4f1192a..6e094e959 100644 --- a/miner-apps/translator/src/lib/monitoring.rs +++ b/miner-apps/translator/src/lib/monitoring.rs @@ -6,18 +6,15 @@ use stratum_apps::monitoring::server::{ServerExtendedChannelInfo, ServerInfo, ServerMonitoring}; -use crate::{ - sv2::channel_manager::ChannelManager, tproxy_mode, utils::AGGREGATED_CHANNEL_ID, - vardiff_enabled, TproxyMode, -}; +use crate::{sv2::channel_manager::ChannelManager, utils::AGGREGATED_CHANNEL_ID, TproxyMode}; impl ServerMonitoring for ChannelManager { fn get_server(&self) -> ServerInfo { let mut extended_channels = Vec::new(); let standard_channels = Vec::new(); // tProxy only uses extended channels - let report_hashrate = vardiff_enabled(); + let report_hashrate = self.report_hashrate; - match tproxy_mode() { + match self.mode { TproxyMode::Aggregated => { // In Aggregated mode: one shared channel to the server // stored under AGGREGATED_CHANNEL_ID diff --git a/miner-apps/translator/src/lib/sv1/sv1_server/difficulty_manager.rs b/miner-apps/translator/src/lib/sv1/sv1_server/difficulty_manager.rs index 3b93a3fed..11971491d 100644 --- a/miner-apps/translator/src/lib/sv1/sv1_server/difficulty_manager.rs +++ b/miner-apps/translator/src/lib/sv1/sv1_server/difficulty_manager.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use crate::{is_aggregated, is_non_aggregated, sv1::sv1_server::sv1_server::PendingTargetUpdate}; +use crate::sv1::sv1_server::sv1_server::PendingTargetUpdate; use stratum_apps::{ stratum_core::{ @@ -189,7 +189,7 @@ impl Sv1Server { * new_target, * new_hashrate) */ ) { - if is_aggregated() { + if self.mode.is_aggregated() { // Aggregated mode: Send single UpdateChannel with minimum target and total hashrate of // ALL downstreams self.send_aggregated_update_channel(all_updates).await; @@ -303,7 +303,7 @@ impl Sv1Server { set_target.channel_id, new_upstream_target ); - if is_aggregated() { + if self.mode.is_aggregated() { return self .handle_aggregated_set_target(new_upstream_target, set_target.channel_id) .await; @@ -458,7 +458,7 @@ impl Sv1Server { /// (e.g., disconnect). Calculates total hashrate and minimum target among all remaining /// downstreams. pub async fn send_update_channel_on_downstream_state_change(&self) { - if is_non_aggregated() { + if self.mode.is_non_aggregated() { return; } diff --git a/miner-apps/translator/src/lib/sv1/sv1_server/downstream_message_handler.rs b/miner-apps/translator/src/lib/sv1/sv1_server/downstream_message_handler.rs index b7345ebd1..22f91ead4 100644 --- a/miner-apps/translator/src/lib/sv1/sv1_server/downstream_message_handler.rs +++ b/miner-apps/translator/src/lib/sv1/sv1_server/downstream_message_handler.rs @@ -7,7 +7,7 @@ use stratum_apps::stratum_core::sv1_api::{ use tracing::{debug, info, warn}; use crate::{ - error, is_aggregated, + error, sv1::{downstream::SubmitShareWithChannelId, sv1_server::tlv_compatible_username, Sv1Server}, utils::{validate_sv1_share, AGGREGATED_CHANNEL_ID}, }; @@ -110,7 +110,7 @@ impl IsServer<'static> for Sv1Server { return false; }; - let channel_id = if is_aggregated() { + let channel_id = if self.mode.is_aggregated() { AGGREGATED_CHANNEL_ID } else { channel_id diff --git a/miner-apps/translator/src/lib/sv1/sv1_server/sv1_server.rs b/miner-apps/translator/src/lib/sv1/sv1_server/sv1_server.rs index 7d6ed9fa1..16f41105c 100644 --- a/miner-apps/translator/src/lib/sv1/sv1_server/sv1_server.rs +++ b/miner-apps/translator/src/lib/sv1/sv1_server/sv1_server.rs @@ -1,7 +1,6 @@ use crate::{ config::TranslatorConfig, error::{self, TproxyError, TproxyErrorKind, TproxyResult}, - is_aggregated, is_non_aggregated, status::{handle_error, Status, StatusSender}, sv1::{ downstream::{downstream::Downstream, SubmitShareWithChannelId}, @@ -9,7 +8,7 @@ use crate::{ channel::Sv1ServerChannelState, is_mining_authorize, KEEPALIVE_JOB_ID_DELIMITER, }, }, - utils::AGGREGATED_CHANNEL_ID, + utils::{TproxyMode, AGGREGATED_CHANNEL_ID}, }; use async_channel::{Receiver, Sender}; use dashmap::DashMap; @@ -85,6 +84,7 @@ pub struct Sv1Server { /// Valid Sv1 jobs storage, containing only a single shared entry (AGGREGATED_CHANNEL_ID) in /// case of channels aggregation (aggregated mode) pub(crate) valid_sv1_jobs: Arc>>>, + pub(crate) mode: TproxyMode, } #[cfg_attr(not(test), hotpath::measure_all)] @@ -171,6 +171,7 @@ impl Sv1Server { channel_manager_receiver: Receiver<(Mining<'static>, Option>)>, channel_manager_sender: Sender<(Mining<'static>, Option>)>, config: TranslatorConfig, + mode: TproxyMode, ) -> Self { let shares_per_minute = config.downstream_difficulty_config.shares_per_minute; let sv1_server_channel_state = @@ -192,6 +193,7 @@ impl Sv1Server { prevhashes: Arc::new(DashMap::new()), pending_target_updates: Arc::new(Mutex::new(Vec::new())), valid_sv1_jobs: Arc::new(DashMap::new()), + mode, } } @@ -510,7 +512,7 @@ impl Sv1Server { .map_err(|_| TproxyError::shutdown(TproxyErrorKind::SV1Error))?; // Only add TLV fields with user identity in non-aggregated mode - let tlv_fields = if is_non_aggregated() { + let tlv_fields = if self.mode.is_non_aggregated() { let Some(downstream) = self .downstreams .get(&message.downstream_id) @@ -749,7 +751,7 @@ impl Sv1Server { // Update job storage based on the configured mode let notify_parsed = notify.clone(); - let job_channel_id = if is_non_aggregated() { + let job_channel_id = if self.mode.is_non_aggregated() { m.channel_id } else { AGGREGATED_CHANNEL_ID @@ -980,7 +982,7 @@ impl Sv1Server { } }; - if is_aggregated() { + if self.mode.is_aggregated() { // Aggregated mode: send set_difficulty to ALL downstreams and update hashrate return self .send_set_difficulty_to_all_downstreams(new_target, derived_hashrate) @@ -1230,7 +1232,7 @@ impl Sv1Server { keepalive_notify.time = HexU32Be(new_time); // Add the keepalive job to valid jobs so shares can be validated - let job_channel_id = if is_aggregated() { + let job_channel_id = if self.mode.is_aggregated() { Some(AGGREGATED_CHANNEL_ID) } else { channel_id @@ -1303,7 +1305,7 @@ impl Sv1Server { &self, channel_id: Option, ) -> Option> { - let channel_id = if is_aggregated() { + let channel_id = if self.mode.is_aggregated() { AGGREGATED_CHANNEL_ID } else { channel_id? @@ -1321,7 +1323,7 @@ impl Sv1Server { job_id: &str, channel_id: Option, ) -> Option> { - let channel_id = if is_aggregated() { + let channel_id = if self.mode.is_aggregated() { AGGREGATED_CHANNEL_ID } else { channel_id? @@ -1379,8 +1381,8 @@ mod tests { let (_downstream_sender, cm_receiver) = unbounded(); let config = create_test_config(); let addr = "127.0.0.1:3333".parse().unwrap(); - - Sv1Server::new(addr, cm_receiver, cm_sender, config) + let tproxy_mode = TproxyMode::from(config.aggregate_channels); + Sv1Server::new(addr, cm_receiver, cm_sender, config, tproxy_mode) } #[test] @@ -1401,8 +1403,8 @@ mod tests { let (cm_sender, _cm_receiver) = unbounded(); let (_downstream_sender, cm_receiver) = unbounded(); let addr = "127.0.0.1:3333".parse().unwrap(); - - let server = Sv1Server::new(addr, cm_receiver, cm_sender, config); + let tproxy_mode = TproxyMode::from(config.aggregate_channels); + let server = Sv1Server::new(addr, cm_receiver, cm_sender, config, tproxy_mode); assert!(server.config.downstream_difficulty_config.enable_vardiff); } @@ -1442,8 +1444,9 @@ mod tests { let (cm_sender, _cm_receiver) = unbounded(); let (_downstream_sender, cm_receiver) = unbounded(); let addr = "127.0.0.1:3333".parse().unwrap(); + let tproxy_mode = TproxyMode::from(config.aggregate_channels); - let server = Sv1Server::new(addr, cm_receiver, cm_sender, config); + let server = Sv1Server::new(addr, cm_receiver, cm_sender, config, tproxy_mode); let target: Target = hash_rate_to_target(200.0, 5.0).unwrap(); let set_target = SetTarget { @@ -1463,8 +1466,8 @@ mod tests { let (cm_sender, _cm_receiver) = unbounded(); let (_downstream_sender, cm_receiver) = unbounded(); let addr = "127.0.0.1:3333".parse().unwrap(); - - let server = Sv1Server::new(addr, cm_receiver, cm_sender, config); + let tproxy_mode = TproxyMode::from(config.aggregate_channels); + let server = Sv1Server::new(addr, cm_receiver, cm_sender, config, tproxy_mode); let target: Target = hash_rate_to_target(200.0, 5.0).unwrap(); let set_target = SetTarget { diff --git a/miner-apps/translator/src/lib/sv2/channel_manager/channel_manager.rs b/miner-apps/translator/src/lib/sv2/channel_manager/channel_manager.rs index 6d477c71e..6e04a1762 100644 --- a/miner-apps/translator/src/lib/sv2/channel_manager/channel_manager.rs +++ b/miner-apps/translator/src/lib/sv2/channel_manager/channel_manager.rs @@ -1,9 +1,9 @@ use crate::{ error::{self, TproxyError, TproxyErrorKind, TproxyResult}, - is_aggregated, status::{handle_error, Status, StatusSender}, sv2::channel_manager::channel::ChannelState, utils::{AggregatedState, AtomicAggregatedState, AGGREGATED_CHANNEL_ID}, + TproxyMode, }; use async_channel::{Receiver, Sender}; use dashmap::DashMap; @@ -92,6 +92,11 @@ pub struct ChannelManager { /// Tracks whether the single upstream channel in aggregated mode is absent, /// being established, or connected. pub aggregated_channel_state: AtomicAggregatedState, + /// Current mode Tproxy is operating in. + pub(crate) mode: TproxyMode, + /// Required to show or not show hashrate on monitoring. + #[cfg(feature = "monitoring")] + pub(crate) report_hashrate: bool, } #[cfg_attr(not(test), hotpath::measure_all)] @@ -120,6 +125,8 @@ impl ChannelManager { status_sender: Sender, supported_extensions: Vec, required_extensions: Vec, + tproxy_mode: TproxyMode, + #[cfg(feature = "monitoring")] report_hashrate: bool, ) -> Self { let channel_state = ChannelState::new( upstream_sender, @@ -140,6 +147,9 @@ impl ChannelManager { negotiated_extensions: Arc::new(Mutex::new(Vec::new())), extranonce_factories: Arc::new(DashMap::new()), aggregated_channel_state: AtomicAggregatedState::new(AggregatedState::NoChannel), + mode: tproxy_mode, + #[cfg(feature = "monitoring")] + report_hashrate, } } @@ -309,7 +319,7 @@ impl ChannelManager { let hashrate = m.nominal_hash_rate; let min_extranonce_size = m.min_extranonce_size as usize; - if is_aggregated() { + if self.mode.is_aggregated() { match self.aggregated_channel_state.get() { AggregatedState::Connected => { return self @@ -351,7 +361,7 @@ impl ChannelManager { } } // In aggregated mode, add extra bytes for translator search space allocation - let upstream_min_extranonce_size = if is_aggregated() { + let upstream_min_extranonce_size = if self.mode.is_aggregated() { min_extranonce_size + AGGREGATED_MODE_TRANSLATOR_SEARCH_SPACE_BYTES } else { min_extranonce_size @@ -364,7 +374,7 @@ impl ChannelManager { // used in the `OpenExtendedMiningChannel.Success` handler. // In aggregated mode it was already inserted in the `AggregatedState::NoChannel` // match arm above. - if !is_aggregated() { + if !self.mode.is_aggregated() { self.pending_downstream_channels.insert( open_channel_msg.request_id as DownstreamId, (user_identity, hashrate, min_extranonce_size), @@ -400,7 +410,7 @@ impl ChannelManager { ) }); if let Some((Ok(_result), _share_accounting)) = value { - if is_aggregated() + if self.mode.is_aggregated() && self.extended_channels.contains_key(&AGGREGATED_CHANNEL_ID) { let upstream_extended_channel_id = self @@ -552,7 +562,7 @@ impl ChannelManager { Mining::UpdateChannel(mut m) => { debug!("Received UpdateChannel from SV1Server: {}", m); - if is_aggregated() { + if self.mode.is_aggregated() { // Update the aggregated channel's nominal hashrate so // that monitoring reports a value consistent with the // downstream vardiff estimate. @@ -810,6 +820,9 @@ mod tests { status_sender, vec![], vec![], + TproxyMode::from(true), + #[cfg(feature = "monitoring")] + true, ) } diff --git a/miner-apps/translator/src/lib/sv2/channel_manager/mining_message_handler.rs b/miner-apps/translator/src/lib/sv2/channel_manager/mining_message_handler.rs index 1f9b72dd9..c44a46033 100644 --- a/miner-apps/translator/src/lib/sv2/channel_manager/mining_message_handler.rs +++ b/miner-apps/translator/src/lib/sv2/channel_manager/mining_message_handler.rs @@ -1,6 +1,5 @@ use crate::{ error::{self, TproxyError, TproxyErrorKind}, - is_aggregated, sv2::ChannelManager, utils::{proxy_extranonce_prefix_len, AggregatedState, AGGREGATED_CHANNEL_ID}, }; @@ -126,7 +125,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { // If we are in aggregated mode, we need to create a new extranonce prefix and // insert the extended channel into the map - if is_aggregated() { + if self.mode.is_aggregated() { // Store the upstream extended channel under AGGREGATED_CHANNEL_ID self.extended_channels .insert(AGGREGATED_CHANNEL_ID, extended_channel.clone()); @@ -274,7 +273,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { // In aggregated mode, serve any downstream requests that were buffered in // pending_channels while the upstream channel was being established (Pending state). - if is_aggregated() { + if self.mode.is_aggregated() { let pending_requests: Vec<(u32, String, Hashrate, usize)> = self .pending_downstream_channels .iter() @@ -333,7 +332,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { ) -> Result<(), Self::Error> { info!("Received: {}", m); // are we working in aggregated mode? - if is_aggregated() { + if self.mode.is_aggregated() { // even if aggregated channel_id != m.channel_id, we should trigger fallback // because why would a sane server send a CloseChannel message to a different // channel? @@ -395,7 +394,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { // In aggregated mode, the Pool responds with the upstream channel ID, but the // channel is stored under AGGREGATED_CHANNEL_ID in the DashMap. // In non-aggregated mode, m.channel_id matches the DashMap key directly. - let key = if is_aggregated() { + let key = if self.mode.is_aggregated() { AGGREGATED_CHANNEL_ID } else { m.channel_id @@ -417,7 +416,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { ) -> Result<(), Self::Error> { warn!("Received: {} ❌", m); - let key = if is_aggregated() { + let key = if self.mode.is_aggregated() { AGGREGATED_CHANNEL_ID } else { m.channel_id @@ -457,7 +456,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { let mut new_extended_mining_job_messages = Vec::new(); // are we in aggregated mode? - if is_aggregated() { + if self.mode.is_aggregated() { // Validate that the message is for the aggregated channel or its group let aggregated_channel_id = self .extended_channels @@ -599,7 +598,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { let mut set_new_prev_hash_messages = Vec::new(); let mut new_extended_mining_job_messages = Vec::new(); - if is_aggregated() { + if self.mode.is_aggregated() { // Validate that the message is for the aggregated channel or its group let aggregated_channel_id = self .extended_channels @@ -810,7 +809,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { let mut set_target_messages = Vec::new(); // are in aggregated mode? - if is_aggregated() { + if self.mode.is_aggregated() { let aggregated_channel_id = self .extended_channels .get(&AGGREGATED_CHANNEL_ID) diff --git a/miner-apps/translator/src/lib/utils.rs b/miner-apps/translator/src/lib/utils.rs index 832f791a0..398c1be40 100644 --- a/miner-apps/translator/src/lib/utils.rs +++ b/miner-apps/translator/src/lib/utils.rs @@ -189,6 +189,43 @@ pub struct UpstreamEntry { pub tried_or_flagged: bool, } +/// Defines the operational mode for Translator Proxy. +/// +/// It can operate in two different modes that affect how Sv1 +/// downstream connections are mapped to the upstream Sv2 channels. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TproxyMode { + /// All Sv1 downstream connections share a single extended Sv2 channel. + /// This mode uses extranonce_prefix allocation to distinguish between + /// different downstream miners while presenting them as a single entity + /// to the upstream server. This is more efficient for pools with many + /// miners. + Aggregated, + /// Each Sv1 downstream connection gets its own dedicated extended Sv2 channel. + /// This mode provides complete isolation between downstream connections + /// but may be less efficient for large numbers of miners. + NonAggregated, +} + +impl From for TproxyMode { + fn from(value: bool) -> Self { + if value { + return TproxyMode::Aggregated; + } + TproxyMode::NonAggregated + } +} + +impl TproxyMode { + pub(crate) fn is_aggregated(self) -> bool { + TproxyMode::Aggregated == self + } + + pub(crate) fn is_non_aggregated(self) -> bool { + TproxyMode::NonAggregated == self + } +} + #[cfg(test)] mod tests { use super::*;