Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions integration-tests/tests/translator_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1950,3 +1950,60 @@ async fn tproxy_sends_single_open_extended_mining_channel_in_aggregated_mode() {

shutdown_all!(pool, tproxy);
}

// This test verifies whether we can spawn multiple tproxy in the
// same process.
//
// More info here: https://github.com/stratum-mining/sv2-apps/issues/430
#[tokio::test]
async fn multiple_tproxy_sessions() {
start_tracing();
let (_tp, tp_addr) = start_template_provider(None, DifficultyLevel::High);
let (pool, pool_addr, _) = start_pool(sv2_tp_config(tp_addr), vec![], vec![], false).await;

let (pool_translator_sniffer_1, pool_translator_sniffer_addr_1) =
start_sniffer("0", pool_addr, false, vec![], None);
let (tproxy_1, _, _) = start_sv2_translator(
&[pool_translator_sniffer_addr_1],
true,
vec![],
vec![],
None,
false,
)
.await;

let (pool_translator_sniffer_2, pool_translator_sniffer_addr_2) =
start_sniffer("0", pool_addr, false, vec![], None);
let (tproxy_2, _, _) = start_sv2_translator(
&[pool_translator_sniffer_addr_2],
true,
vec![],
vec![],
None,
false,
)
.await;

pool_translator_sniffer_1
.wait_for_message_type(MessageDirection::ToUpstream, MESSAGE_TYPE_SETUP_CONNECTION)
.await;
pool_translator_sniffer_1
.wait_for_message_type(
MessageDirection::ToDownstream,
MESSAGE_TYPE_SETUP_CONNECTION_SUCCESS,
)
.await;

pool_translator_sniffer_2
.wait_for_message_type(MessageDirection::ToUpstream, MESSAGE_TYPE_SETUP_CONNECTION)
.await;
pool_translator_sniffer_2
.wait_for_message_type(
MessageDirection::ToDownstream,
MESSAGE_TYPE_SETUP_CONNECTION_SUCCESS,
)
.await;

shutdown_all!(pool, tproxy_1, tproxy_2);
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use crate::{
ChannelManager, ChannelManagerChannel, SharesOrderedByDiff, FULL_EXTRANONCE_SIZE,
},
error::{self, JDCError, JDCErrorKind},
jd_mode::{get_jd_mode, JdMode},
utils::{add_share_to_cache, create_close_channel_msg},
};

Expand Down Expand Up @@ -121,14 +120,12 @@ impl RouteMessageTo<'_> {
}
}
RouteMessageTo::Upstream(message) => {
if get_jd_mode() != JdMode::SoloMining {
let message_static = message.into_static();
let sv2_frame: Sv2Frame = AnyMessage::Mining(message_static).try_into()?;
channel_manager_channel
.upstream_sender
.send(sv2_frame)
.await?;
}
let message_static = message.into_static();
let sv2_frame: Sv2Frame = AnyMessage::Mining(message_static).try_into()?;
channel_manager_channel
.upstream_sender
.send(sv2_frame)
.await?;
}
RouteMessageTo::JobDeclarator(message) => {
channel_manager_channel
Expand Down
4 changes: 4 additions & 0 deletions miner-apps/jd-client/src/lib/channel_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ use crate::{
config::JobDeclaratorClientConfig,
downstream::Downstream,
error::{self, JDCError, JDCErrorKind, JDCResult},
jd_mode::JDMode,
status::{handle_error, Status, StatusSender},
utils::{
AtomicUpstreamState, DownstreamChannelJobId, DownstreamMessage, PendingChannelRequest,
Expand Down Expand Up @@ -267,6 +268,7 @@ pub struct ChannelManager {
/// 3. Connected: An upstream channel is successfully established.
/// 4. SoloMining: No upstream is available; the JDC operates in solo mining mode. case.
pub upstream_state: AtomicUpstreamState,
pub mode: JDMode,
}

#[cfg_attr(not(test), hotpath::measure_all)]
Expand All @@ -285,6 +287,7 @@ impl ChannelManager {
coinbase_outputs: Vec<u8>,
supported_extensions: Vec<u16>,
required_extensions: Vec<u16>,
mode: JDMode,
) -> JDCResult<Self, error::ChannelManager> {
let (range_0, range_1, range_2) = {
let range_1 = 0..JDC_SEARCH_SPACE_BYTES;
Expand Down Expand Up @@ -348,6 +351,7 @@ impl ChannelManager {
miner_tag_string: config.jdc_signature().to_string(),
user_identity: config.user_identity().to_string(),
upstream_state: AtomicUpstreamState::new(UpstreamState::SoloMining),
mode,
};

Ok(channel_manager)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use tracing::{error, info, warn};
use crate::{
channel_manager::{downstream_message_handler::RouteMessageTo, ChannelManager, DeclaredJob},
error::{self, JDCError, JDCErrorKind},
jd_mode::{get_jd_mode, JdMode},
};

#[cfg_attr(not(test), hotpath::measure_all)]
Expand Down Expand Up @@ -63,7 +62,7 @@ impl HandleTemplateDistributionMessagesFromServerAsync for ChannelManager {
let mut coinbase_outputs = deserialize_outputs(coinbase_outputs)
.map_err(|_| JDCError::shutdown(JDCErrorKind::ChannelManagerHasBadCoinbaseOutputs))?;

if get_jd_mode() == JdMode::FullTemplate {
if self.mode.is_full_template() {
let tx_data_request =
TemplateDistribution::RequestTransactionData(RequestTransactionData {
template_id: msg.template_id,
Expand All @@ -81,7 +80,7 @@ impl HandleTemplateDistributionMessagesFromServerAsync for ChannelManager {
coinbase_outputs[0].value = Amount::from_sat(msg.coinbase_tx_value_remaining);

let coinbase_only_token = if !msg.future_template
&& get_jd_mode() == JdMode::CoinbaseOnly
&& self.mode.is_coinbase_only()
&& channel_manager_data.upstream_channel.is_some()
&& channel_manager_data.last_new_prev_hash.is_some()
&& channel_manager_data.job_factory.is_some()
Expand Down Expand Up @@ -440,7 +439,7 @@ impl HandleTemplateDistributionMessagesFromServerAsync for ChannelManager {
(data.last_future_template.clone(), declare_job)
});

if get_jd_mode() == JdMode::FullTemplate {
if self.mode.is_full_template() {
if let Some(Some(job)) = declare_job {
let message = JobDeclaration::DeclareMiningJob(job);

Expand All @@ -467,7 +466,7 @@ impl HandleTemplateDistributionMessagesFromServerAsync for ChannelManager {
if let Some(ref mut upstream_channel) = channel_manager_data.upstream_channel {
_ = upstream_channel.on_chain_tip_update(msg.clone().into());

if get_jd_mode() == JdMode::CoinbaseOnly
if self.mode.is_coinbase_only()
&& channel_manager_data.job_factory.is_some()
&& future_template.is_some()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use crate::{
JDC_SEARCH_SPACE_BYTES, MIN_EXTRANONCE_SIZE, STANDARD_CHANNEL_ALLOCATION_BYTES,
},
error::{self, JDCError, JDCErrorKind},
jd_mode::{get_jd_mode, JdMode},
utils::{create_close_channel_msg, validate_cached_share, UpstreamState},
};

Expand Down Expand Up @@ -166,7 +165,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager {
debug!("Applied last_new_prev_hash to new extended channel");
}

let set_custom_job = if get_jd_mode() == JdMode::CoinbaseOnly
let set_custom_job = if self.mode.is_coinbase_only()
&& data.job_factory.is_some()
&& data.last_future_template.is_some()
&& data.last_new_prev_hash.is_some()
Expand Down Expand Up @@ -242,7 +241,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager {
});

if channel_state == UpstreamState::Connected {
if get_jd_mode() == JdMode::FullTemplate {
if self.mode.is_full_template() {
if let Some(template) = template {
let tx_data_request =
TemplateDistribution::RequestTransactionData(RequestTransactionData {
Expand All @@ -256,7 +255,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager {
}
}

if get_jd_mode() == JdMode::CoinbaseOnly {
if self.mode.is_coinbase_only() {
if let Some(custom_job) = custom_job {
let set_custom_job = Mining::SetCustomMiningJob(custom_job);
let sv2_frame: Sv2Frame = AnyMessage::Mining(set_custom_job)
Expand Down
2 changes: 1 addition & 1 deletion miner-apps/jd-client/src/lib/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ impl JobDeclaratorClientConfig {
}
}

#[derive(Debug, Deserialize, Clone, Default, PartialEq)]
#[derive(Debug, Deserialize, Clone, Copy, Default, PartialEq)]
#[serde(rename_all = "UPPERCASE")]
pub enum ConfigJDCMode {
#[default]
Expand Down
100 changes: 59 additions & 41 deletions miner-apps/jd-client/src/lib/jd_mode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,57 +5,75 @@
//!
//! Modes are stored in a global [`AtomicU8`] to allow safe concurrent access
//! across threads.
use std::sync::atomic::{AtomicU8, Ordering};

/// Operating modes for the Job Declarator.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JdMode {
/// Runs in Coinbase only mode.
CoinbaseOnly = 0,
/// Runs in Full template mode,
FullTemplate = 1,
/// Runs in solo mining mode,
SoloMining = 2,
use std::sync::{
atomic::{AtomicU8, Ordering},
Arc,
};

use crate::config::ConfigJDCMode;

#[derive(Clone, Debug)]
pub struct JDMode {
inner: Arc<AtomicU8>,
// Currently, how JDC works is the mode in config
// only gets activated once an upstream connection
// is made.
config_mode: ConfigJDCMode,
}

impl From<u8> for JdMode {
fn from(val: u8) -> Self {
match val {
0 => JdMode::CoinbaseOnly,
1 => JdMode::FullTemplate,
2 => JdMode::SoloMining,
_ => JdMode::SoloMining,
impl JDMode {
pub fn new(config_mode: ConfigJDCMode) -> JDMode {
JDMode {
inner: Arc::new(AtomicU8::new(ConfigJDCMode::SoloMining as u8)),
config_mode,
}
}
}

impl From<u32> for JdMode {
fn from(val: u32) -> Self {
match val {
0 => JdMode::CoinbaseOnly,
1 => JdMode::FullTemplate,
2 => JdMode::SoloMining,
_ => JdMode::SoloMining,
/// This activates mode based on config file once
/// upstream connection is made.
pub fn activate(&self) {
match self.config_mode {
ConfigJDCMode::CoinbaseOnly => self.set_coinbase_only(),
ConfigJDCMode::FullTemplate => self.set_full_template(),
ConfigJDCMode::SoloMining => self.set_solo_mining(),
}
}
}

impl From<JdMode> for u8 {
fn from(mode: JdMode) -> Self {
mode as u8
pub fn set_solo_mining(&self) {
self.inner
.store(ConfigJDCMode::SoloMining as u8, Ordering::Relaxed);
}
}

/// Global atomic variable storing the current JD mode.
pub static JD_MODE: AtomicU8 = AtomicU8::new(JdMode::SoloMining as u8);
fn set_full_template(&self) {
self.inner
.store(ConfigJDCMode::FullTemplate as u8, Ordering::Relaxed);
}

/// Updates the global JD mode.
pub fn set_jd_mode(mode: JdMode) {
JD_MODE.store(mode as u8, Ordering::SeqCst);
}
fn set_coinbase_only(&self) {
self.inner
.store(ConfigJDCMode::CoinbaseOnly as u8, Ordering::Relaxed);
}

/// Returns the current global JD mode.
pub fn get_jd_mode() -> JdMode {
JD_MODE.load(Ordering::SeqCst).into()
pub fn is_solo_mining(&self) -> bool {
let mode = self.inner.load(Ordering::Relaxed);
mode == ConfigJDCMode::SoloMining as u8
}

pub fn is_full_template(&self) -> bool {
let mode = self.inner.load(Ordering::Relaxed);
mode == ConfigJDCMode::FullTemplate as u8
}

pub fn is_coinbase_only(&self) -> bool {
let mode = self.inner.load(Ordering::Relaxed);
mode == ConfigJDCMode::CoinbaseOnly as u8
}

pub fn is_config_full_template(&self) -> bool {
self.config_mode == ConfigJDCMode::FullTemplate
}

pub fn is_config_coinbase_only(&self) -> bool {
self.config_mode == ConfigJDCMode::CoinbaseOnly
}
}
13 changes: 1 addition & 12 deletions miner-apps/jd-client/src/lib/job_declarator/message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ use stratum_apps::stratum_core::{
use tracing::{info, warn};

use crate::{
config::ConfigJDCMode,
error::{self, JDCError, JDCErrorKind},
jd_mode::{set_jd_mode, JdMode},
job_declarator::JobDeclarator,
};

Expand All @@ -32,16 +30,7 @@ impl HandleCommonMessagesFromServerAsync for JobDeclarator {
_tlv_fields: Option<&[Tlv]>,
) -> Result<(), Self::Error> {
info!("Received: {}", msg);
// Setting up JDMode from config, upon
// successful handshake.
let jd_mode = match self.mode {
ConfigJDCMode::CoinbaseOnly => JdMode::CoinbaseOnly,
ConfigJDCMode::FullTemplate => JdMode::FullTemplate,
ConfigJDCMode::SoloMining => JdMode::SoloMining,
};

set_jd_mode(jd_mode);

self.mode.activate();
Ok(())
}

Expand Down
6 changes: 3 additions & 3 deletions miner-apps/jd-client/src/lib/job_declarator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ use tokio::net::TcpStream;
use tracing::{debug, error, info, warn};

use crate::{
config::ConfigJDCMode,
error::{self, JDCError, JDCErrorKind, JDCResult},
io_task::spawn_io_tasks,
jd_mode::JDMode,
status::{handle_error, Status, StatusSender},
utils::{get_setup_connection_message_jds, UpstreamEntry},
};
Expand Down Expand Up @@ -53,7 +53,7 @@ pub struct JobDeclarator {
/// Socket address of the Job Declarator server.
socket_address: SocketAddr,
/// Config JDC mode
mode: ConfigJDCMode,
mode: JDMode,
}

#[cfg_attr(not(test), hotpath::measure_all)]
Expand All @@ -70,7 +70,7 @@ impl JobDeclarator {
channel_manager_receiver: Receiver<JobDeclaration<'static>>,
cancellation_token: CancellationToken,
fallback_coordinator: FallbackCoordinator,
mode: ConfigJDCMode,
mode: JDMode,
task_manager: Arc<TaskManager>,
) -> JDCResult<Self, error::JobDeclarator> {
let addr = resolve_host(&upstream_entry.jds_host, upstream_entry.jds_port)
Expand Down
Loading
Loading