Skip to content

Commit c1cd72e

Browse files
authored
Merge pull request #387 from Shourya742/2026-03-28-fix-tproxy-handshake-flow
Fix translator handshake flow
2 parents 20f72a1 + daa226f commit c1cd72e

3 files changed

Lines changed: 71 additions & 57 deletions

File tree

miner-apps/translator/src/lib/sv1/downstream/downstream.rs

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,6 @@ pub struct Downstream {
4949
pub downstream_channel_state: DownstreamChannelState,
5050
// Flag to track if SV1 handshake is complete (subscribe + authorize)
5151
pub sv1_handshake_complete: Arc<AtomicBool>,
52-
// Flag to indicate we're processing queued Sv1 handshake message responses
53-
pub processing_queued_sv1_handshake_responses: Arc<AtomicBool>,
5452
}
5553

5654
#[cfg_attr(not(test), hotpath::measure_all)]
@@ -84,7 +82,6 @@ impl Downstream {
8482
downstream_data,
8583
downstream_channel_state,
8684
sv1_handshake_complete: Arc::new(AtomicBool::new(false)),
87-
processing_queued_sv1_handshake_responses: Arc::new(AtomicBool::new(false)),
8885
}
8986
}
9087

@@ -201,11 +198,6 @@ impl Downstream {
201198
return Ok(()); // Message not intended for this downstream
202199
}
203200

204-
// Check if this is a queued message response
205-
let is_queued_sv1_handshake_response = self
206-
.processing_queued_sv1_handshake_responses
207-
.load(Ordering::SeqCst);
208-
209201
// Handle messages based on message type and handshake state
210202
if let Message::Notification(notification) = &message {
211203
// For notifications (mining.set_difficulty, mining.notify), only send if
@@ -328,22 +320,8 @@ impl Downstream {
328320
}
329321
}
330322
}
331-
} else if is_queued_sv1_handshake_response {
332-
// For non-notification messages, send if processing queued handshake responses
333-
self.downstream_channel_state
334-
.downstream_sv1_sender
335-
.send(message.clone())
336-
.await
337-
.map_err(|e| {
338-
error!("Down: Failed to send queued message to downstream: {:?}", e);
339-
TproxyError::disconnect(
340-
TproxyErrorKind::ChannelErrorSender,
341-
downstream_id.unwrap_or(0),
342-
)
343-
})?;
344323
} else {
345-
// Neither handshake complete nor queued response - skip non-notification
346-
// messages
324+
// Handshake not complete - skip non-notification messages.
347325
debug!("Down: SV1 handshake not complete, skipping non-notification message");
348326
}
349327
}

miner-apps/translator/src/lib/sv1/sv1_server/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use stratum_apps::stratum_core::sv1_api::{json_rpc, Message};
2+
13
pub(super) mod channel;
24
mod difficulty_manager;
35
pub mod downstream_message_handler;
@@ -6,3 +8,12 @@ pub mod sv1_server;
68
/// Delimiter used to separate original job ID from keepalive mutation counter.
79
/// Format: `{original_job_id}#{counter}`
810
const KEEPALIVE_JOB_ID_DELIMITER: char = '#';
11+
12+
/// Check if Sv1 message is mining.authorize
13+
fn is_mining_authorize(msg: &Message) -> bool {
14+
if let json_rpc::Message::StandardRequest(r) = &msg {
15+
r.method == "mining.authorize"
16+
} else {
17+
false
18+
}
19+
}

miner-apps/translator/src/lib/sv1/sv1_server/sv1_server.rs

Lines changed: 59 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ use crate::{
55
status::{handle_error, Status, StatusSender},
66
sv1::{
77
downstream::downstream::Downstream,
8-
sv1_server::{channel::Sv1ServerChannelState, KEEPALIVE_JOB_ID_DELIMITER},
8+
sv1_server::{
9+
channel::Sv1ServerChannelState, is_mining_authorize, KEEPALIVE_JOB_ID_DELIMITER,
10+
},
911
},
1012
utils::AGGREGATED_CHANNEL_ID,
1113
};
@@ -41,7 +43,7 @@ use stratum_apps::{
4143
},
4244
sv2_to_sv1::{build_sv1_notify_from_sv2, build_sv1_set_difficulty_from_sv2_target},
4345
},
44-
sv1_api::{json_rpc, server_to_client, utils::HexU32Be, IsServer},
46+
sv1_api::{server_to_client, utils::HexU32Be, IsServer},
4547
},
4648
task_manager::TaskManager,
4749
utils::types::{ChannelId, DownstreamId, Hashrate, RequestId, SharesPerMinute},
@@ -346,9 +348,11 @@ impl Sv1Server {
346348
return Ok(());
347349
}
348350

351+
let is_authorize = is_mining_authorize(&downstream_message);
352+
349353
let response = self
350354
.clone()
351-
.handle_message(Some(downstream_id), downstream_message.clone());
355+
.handle_message(Some(downstream_id), downstream_message);
352356

353357
match response {
354358
Ok(Some(response_msg)) => {
@@ -370,13 +374,11 @@ impl Sv1Server {
370374
})?;
371375

372376
// Check if this was an authorize message and handle sv1 handshake completion
373-
if let json_rpc::Message::StandardRequest(request) = &downstream_message {
374-
if request.method == "mining.authorize" {
375-
info!("Down: Handling mining.authorize after handshake completion");
376-
if let Err(e) = downstream.handle_sv1_handshake_completion().await {
377-
error!("Down: Failed to handle handshake completion: {:?}", e);
378-
return Err(TproxyError::disconnect(e, downstream_id));
379-
}
377+
if is_authorize {
378+
info!("Down: Handling mining.authorize after handshake completion");
379+
if let Err(e) = downstream.handle_sv1_handshake_completion().await {
380+
error!("Down: Failed to handle handshake completion: {:?}", e);
381+
return Err(TproxyError::disconnect(e, downstream_id));
380382
}
381383
}
382384
}
@@ -570,39 +572,62 @@ impl Sv1Server {
570572
.map_err(TproxyError::shutdown)?;
571573

572574
// Process all queued messages now that channel is established
573-
if let Ok(queued_messages) = downstream.downstream_data.safe_lock(|d| {
574-
let messages = d.queued_sv1_handshake_messages.clone();
575-
d.queued_sv1_handshake_messages.clear();
576-
messages
577-
}) {
575+
let queued_messages = downstream
576+
.downstream_data
577+
.safe_lock(|d| std::mem::take(&mut d.queued_sv1_handshake_messages))
578+
.ok();
579+
if let Some(queued_messages) = queued_messages {
578580
if !queued_messages.is_empty() {
579581
info!(
580582
"Processing {} queued Sv1 messages for downstream {}",
581583
queued_messages.len(),
582584
downstream_id
583585
);
584586

585-
// Set flag to indicate we're processing queued responses
586-
downstream
587-
.processing_queued_sv1_handshake_responses
588-
.store(true, Ordering::SeqCst);
587+
let downstream_sv1_sender = downstream
588+
.downstream_channel_state
589+
.downstream_sv1_sender
590+
.clone();
589591

590592
for message in queued_messages {
591-
if let Ok(Some(response_msg)) =
592-
self.clone().handle_message(Some(downstream_id), message)
593-
{
594-
self.sv1_server_channel_state
595-
.sv1_server_to_downstream_sender
596-
.send((
597-
m.channel_id,
598-
Some(downstream_id),
599-
response_msg.into(),
600-
))
601-
.map_err(|_| {
602-
TproxyError::shutdown(
603-
TproxyErrorKind::ChannelErrorSender,
604-
)
605-
})?;
593+
let is_authorize = is_mining_authorize(&message);
594+
let response =
595+
self.clone().handle_message(Some(downstream_id), message);
596+
match response {
597+
Ok(Some(response_msg)) => {
598+
downstream_sv1_sender.send(response_msg.into()).await
599+
.map_err(|e| {
600+
error!(
601+
"Down: Failed to send message to downstream: {e:?}"
602+
);
603+
TproxyError::disconnect(
604+
TproxyErrorKind::ChannelErrorSender, downstream_id
605+
)
606+
})?;
607+
608+
if is_authorize {
609+
info!("Down: Handling mining.authorize after upstream channel is open");
610+
if let Err(e) =
611+
downstream.handle_sv1_handshake_completion().await
612+
{
613+
error!(
614+
"Down: Failed to handle handshake completion: {:?}",
615+
e
616+
);
617+
return Err(TproxyError::disconnect(
618+
e,
619+
downstream_id,
620+
));
621+
}
622+
}
623+
}
624+
Ok(None) => {
625+
// Message was handled but no response needed
626+
}
627+
Err(e) => {
628+
error!("Down: Error handling downstream message: {:?}", e);
629+
return Err(TproxyError::disconnect(e, downstream_id));
630+
}
606631
}
607632
}
608633
}

0 commit comments

Comments
 (0)