Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions miner-apps/translator/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
//! provides the `start` method as the main entry point for running the translator service.
//! It relies on several sub-modules (`config`, `downstream_sv1`, `upstream_sv2`, `proxy`, `status`,
//! etc.) for specialized functionalities.
#![allow(clippy::module_inception)]
use async_channel::{unbounded, Receiver, Sender};
use std::{
net::SocketAddr,
Expand All @@ -36,7 +35,7 @@ use config::TranslatorConfig;
use crate::{
error::TproxyErrorKind,
status::{State, Status},
sv1::sv1_server::sv1_server::Sv1Server,
sv1::sv1_server::Sv1Server,
sv2::{ChannelManager, Upstream},
utils::UpstreamEntry,
};
Expand Down Expand Up @@ -159,7 +158,6 @@ impl TranslatorSv2 {
upstream_to_channel_manager_receiver,
channel_manager_to_sv1_server_sender.clone(),
sv1_server_to_channel_manager_receiver,
status_sender.clone(),
self.config.supported_extensions.clone(),
self.config.required_extensions.clone(),
));
Expand Down Expand Up @@ -306,7 +304,6 @@ impl TranslatorSv2 {
upstream_to_channel_manager_receiver,
channel_manager_to_sv1_server_sender,
sv1_server_to_channel_manager_receiver,
status_sender.clone(),
self.config.supported_extensions.clone(),
self.config.required_extensions.clone(),
));
Expand Down
3 changes: 1 addition & 2 deletions miner-apps/translator/src/lib/monitoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
use stratum_apps::monitoring::server::{ServerExtendedChannelInfo, ServerInfo, ServerMonitoring};

use crate::{
sv2::channel_manager::ChannelManager, tproxy_mode, utils::AGGREGATED_CHANNEL_ID,
vardiff_enabled, TproxyMode,
sv2::ChannelManager, tproxy_mode, utils::AGGREGATED_CHANNEL_ID, vardiff_enabled, TproxyMode,
};

impl ServerMonitoring for ChannelManager {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
error::{self, TproxyError, TproxyErrorKind, TproxyResult},
status::{handle_error, StatusSender},
sv1::downstream::{channel::DownstreamChannelState, data::DownstreamData},
utils::SubmitShareWithChannelId,
};
use async_channel::{Receiver, Sender};
use std::{
Expand All @@ -19,14 +19,128 @@ use stratum_apps::{
sv1_api::{
json_rpc::{self, Message},
server_to_client,
utils::{Extranonce, HexU32Be},
},
},
task_manager::TaskManager,
utils::types::{DownstreamId, Hashrate},
utils::types::{ChannelId, DownstreamId, Hashrate},
};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

#[derive(Clone, Debug)]
pub(crate) struct DownstreamChannelState {
pub downstream_sv1_sender: Sender<json_rpc::Message>,
downstream_sv1_receiver: Receiver<json_rpc::Message>,
sv1_server_sender: Sender<(DownstreamId, json_rpc::Message)>,
sv1_server_receiver: Receiver<json_rpc::Message>,
/// Per-connection cancellation token (child of the global token).
/// Cancelled when this downstream's task loop exits, causing
/// the associated SV1 I/O task to shut down.
connection_token: CancellationToken,
}

#[cfg_attr(not(test), hotpath::measure_all)]
impl DownstreamChannelState {
pub fn new(
downstream_sv1_sender: Sender<json_rpc::Message>,
downstream_sv1_receiver: Receiver<json_rpc::Message>,
sv1_server_sender: Sender<(DownstreamId, json_rpc::Message)>,
sv1_server_receiver: Receiver<json_rpc::Message>,
connection_token: CancellationToken,
) -> Self {
Self {
downstream_sv1_receiver,
downstream_sv1_sender,
sv1_server_receiver,
sv1_server_sender,
connection_token,
}
}

pub fn drop(&self) {
debug!("Dropping downstream channel state");
self.connection_token.cancel();
self.downstream_sv1_receiver.close();
self.downstream_sv1_sender.close();
}
}

#[derive(Debug)]
pub struct DownstreamData {
pub channel_id: Option<ChannelId>,
pub extranonce1: Extranonce<'static>,
pub extranonce2_len: usize,
pub target: Target,
pub hashrate: Option<Hashrate>,
pub version_rolling_mask: Option<HexU32Be>,
pub version_rolling_min_bit: Option<HexU32Be>,
pub last_job_version_field: Option<u32>,
pub authorized_worker_name: String,
pub user_identity: String,
pub cached_set_difficulty: Option<json_rpc::Message>,
pub cached_notify: Option<json_rpc::Message>,
pub pending_target: Option<Target>,
pub pending_hashrate: Option<Hashrate>,
// Queue of Sv1 handshake messages received while waiting for SV2 channel to open
pub queued_sv1_handshake_messages: Vec<json_rpc::Message>,
// Stores pending shares to be sent to the sv1_server
pub pending_share: Option<SubmitShareWithChannelId>,
// Tracks the upstream target for this downstream, used for vardiff target comparison
pub upstream_target: Option<Target>,
// Timestamp of when the last job was received by this downstream, used for keepalive check
pub last_job_received_time: Option<Instant>,
}

impl DownstreamData {
pub fn new(hashrate: Option<Hashrate>, target: Target) -> Self {
DownstreamData {
channel_id: None,
extranonce1: vec![0; 8]
.try_into()
.expect("8-byte extranonce is always valid"),
extranonce2_len: 4,
target,
hashrate,
version_rolling_mask: None,
version_rolling_min_bit: None,
last_job_version_field: None,
authorized_worker_name: String::new(),
user_identity: String::new(),
cached_set_difficulty: None,
cached_notify: None,
pending_target: None,
pending_hashrate: None,
queued_sv1_handshake_messages: Vec::new(),
pending_share: None,
upstream_target: None,
last_job_received_time: None,
}
}

pub fn set_pending_target(&mut self, new_target: Target, downstream_id: DownstreamId) {
self.pending_target = Some(new_target);
debug!("Downstream {downstream_id}: Set pending target");
}

pub fn set_pending_hashrate(
&mut self,
new_hashrate: Option<Hashrate>,
downstream_id: DownstreamId,
) {
self.pending_hashrate = new_hashrate;
debug!("Downstream {downstream_id}: Set pending hashrate");
}

pub fn set_upstream_target(&mut self, upstream_target: Target, downstream_id: DownstreamId) {
self.upstream_target = Some(upstream_target);
debug!(
"Downstream {downstream_id}: Set upstream target to {}",
upstream_target
);
}
}

/// Represents a downstream SV1 miner connection.
///
/// This struct manages the state and communication for a single SV1 miner connected
Expand All @@ -44,7 +158,7 @@ use tracing::{debug, error, info, warn};
pub struct Downstream {
pub downstream_id: DownstreamId,
pub downstream_data: Arc<Mutex<DownstreamData>>,
pub downstream_channel_state: DownstreamChannelState,
pub(crate) downstream_channel_state: DownstreamChannelState,
// Flag to track if SV1 handshake is complete (subscribe + authorize)
pub sv1_handshake_complete: Arc<AtomicBool>,
}
Expand Down Expand Up @@ -116,7 +230,6 @@ impl Downstream {
info!("Downstream {downstream_id}: fallback triggered");
break;
}

// Handle downstream -> server message
res = self.handle_downstream_message() => {
if let Err(e) = res {
Expand Down Expand Up @@ -168,7 +281,7 @@ impl Downstream {
/// complete
/// - On handshake completion: sends cached messages in correct order (set_difficulty first,
/// then notify)
pub async fn handle_sv1_server_message(&self) -> TproxyResult<(), error::Downstream> {
async fn handle_sv1_server_message(&self) -> TproxyResult<(), error::Downstream> {
match self
.downstream_channel_state
.sv1_server_receiver
Expand Down Expand Up @@ -330,7 +443,7 @@ impl Downstream {
/// which implements the SV1 protocol logic and generates appropriate responses.
/// Responses are sent back to the miner, while share submissions are forwarded
/// to the SV1 server for upstream processing.
pub async fn handle_downstream_message(&self) -> TproxyResult<(), error::Downstream> {
async fn handle_downstream_message(&self) -> TproxyResult<(), error::Downstream> {
let downstream_id = self.downstream_id;
let message = match self
.downstream_channel_state
Expand Down
42 changes: 0 additions & 42 deletions miner-apps/translator/src/lib/sv1/downstream/channel.rs

This file was deleted.

89 changes: 0 additions & 89 deletions miner-apps/translator/src/lib/sv1/downstream/data.rs

This file was deleted.

45 changes: 0 additions & 45 deletions miner-apps/translator/src/lib/sv1/downstream/mod.rs

This file was deleted.

Loading
Loading