@@ -12,12 +12,15 @@ use stratum_apps::{
1212 custom_mutex:: Mutex ,
1313 fallback_coordinator:: FallbackCoordinator ,
1414 stratum_core:: {
15- channels_sv2:: client:: { extended:: ExtendedChannel , group:: GroupChannel } ,
15+ channels_sv2:: {
16+ client:: { extended:: ExtendedChannel , group:: GroupChannel } ,
17+ extranonce_manager:: ExtranonceAllocator ,
18+ } ,
1619 codec_sv2:: StandardSv2Frame ,
1720 extensions_sv2:: { EXTENSION_TYPE_WORKER_HASHRATE_TRACKING , TLV_FIELD_TYPE_USER_IDENTITY } ,
1821 framing_sv2,
1922 handlers_sv2:: { HandleExtensionsFromServerAsync , HandleMiningMessagesFromServerAsync } ,
20- mining_sv2:: { ExtendedExtranonce , OpenExtendedMiningChannelSuccess } ,
23+ mining_sv2:: OpenExtendedMiningChannelSuccess ,
2124 parsers_sv2:: { AnyMessage , Mining , Tlv , TlvList } ,
2225 } ,
2326 task_manager:: TaskManager ,
@@ -30,10 +33,12 @@ use stratum_apps::{
3033use tokio_util:: sync:: CancellationToken ;
3134use tracing:: { debug, error, info, warn} ;
3235
33- /// Extra bytes allocated for translator search space in aggregated mode.
34- /// This allows the translator to manage multiple downstream connections
35- /// by allocating unique extranonce prefixes to each downstream.
36- const AGGREGATED_MODE_TRANSLATOR_SEARCH_SPACE_BYTES : usize = 4 ;
36+ /// Number of bytes the translator uses for its local extranonce prefix allocation
37+ /// in aggregated mode (unique prefix per downstream).
38+ pub const TPROXY_ALLOCATION_BYTES : usize = 4 ;
39+
40+ /// Maximum number of downstream channels the translator can allocate prefixes for.
41+ pub const TPROXY_MAX_CHANNELS : usize = 65_536 ;
3742
3843/// Manages SV2 channels and message routing between upstream and downstream.
3944///
@@ -73,8 +78,10 @@ pub struct ChannelManager {
7378 pub share_sequence_counters : Arc < DashMap < u32 , u32 > > ,
7479 /// Extensions that have been successfully negotiated with the upstream server
7580 pub negotiated_extensions : Arc < Mutex < Vec < u16 > > > ,
76- /// Extranonce factories containing per channel extranonces
77- pub extranonce_factories : Arc < DashMap < ChannelId , ExtendedExtranonce > > ,
81+ /// Extranonce allocators containing per channel extranonce management
82+ pub extranonce_allocators : Arc < DashMap < ChannelId , ExtranonceAllocator > > ,
83+ /// Mapping of channel_id → local_prefix_id for freeing extranonces on channel close
84+ pub channel_to_local_prefix_id : Arc < DashMap < ChannelId , usize > > ,
7885}
7986
8087#[ cfg_attr( not( test) , hotpath:: measure_all) ]
@@ -121,7 +128,8 @@ impl ChannelManager {
121128 group_channels : Arc :: new ( DashMap :: new ( ) ) ,
122129 share_sequence_counters : Arc :: new ( DashMap :: new ( ) ) ,
123130 negotiated_extensions : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
124- extranonce_factories : Arc :: new ( DashMap :: new ( ) ) ,
131+ extranonce_allocators : Arc :: new ( DashMap :: new ( ) ) ,
132+ channel_to_local_prefix_id : Arc :: new ( DashMap :: new ( ) ) ,
125133 }
126134 }
127135
@@ -172,7 +180,8 @@ impl ChannelManager {
172180 self . group_channels. clear( ) ;
173181 self . share_sequence_counters. clear( ) ;
174182 self . negotiated_extensions. super_safe_lock( |data| data. clear( ) ) ;
175- self . extranonce_factories. clear( ) ;
183+ self . extranonce_allocators. clear( ) ;
184+ self . channel_to_local_prefix_id. clear( ) ;
176185 break ;
177186 }
178187 res = self . clone( ) . handle_upstream_frame( ) => {
@@ -301,18 +310,22 @@ impl ChannelManager {
301310 . get ( & AGGREGATED_CHANNEL_ID )
302311 . map ( |ch| * ch. get_target ( ) )
303312 . unwrap ( ) ;
304- let new_extranonce_prefix = self
305- . extranonce_factories
313+ let alloc_result = self
314+ . extranonce_allocators
306315 . get_mut ( & AGGREGATED_CHANNEL_ID )
307- . unwrap ( )
308- . next_prefix_extended ( open_channel_msg. min_extranonce_size . into ( ) )
309- . ok ( ) ;
316+ . and_then ( |mut allocator| {
317+ allocator
318+ . allocate_extended ( open_channel_msg. min_extranonce_size . into ( ) )
319+ . ok ( )
320+ } ) ;
310321 let new_extranonce_size = self
311- . extranonce_factories
312- . get_mut ( & AGGREGATED_CHANNEL_ID )
313- . unwrap ( )
314- . get_range2_len ( ) ;
315- if let Some ( new_extranonce_prefix) = new_extranonce_prefix {
322+ . extranonce_allocators
323+ . get ( & AGGREGATED_CHANNEL_ID )
324+ . map ( |allocator| allocator. rollable_extranonce_size ( ) )
325+ . unwrap_or ( 0 ) ;
326+ if let Some ( prefix) = alloc_result {
327+ let new_extranonce_prefix = prefix. as_bytes ( ) . to_vec ( ) ;
328+ let local_prefix_id = prefix. local_prefix_id ( ) ;
316329 if new_extranonce_size >= open_channel_msg. min_extranonce_size as usize
317330 {
318331 // Find max channel ID, excluding AGGREGATED_CHANNEL_ID (u32::MAX)
@@ -323,14 +336,12 @@ impl ChannelManager {
323336 . filter ( |x| * x. key ( ) != AGGREGATED_CHANNEL_ID )
324337 . fold ( 0 , |acc, x| std:: cmp:: max ( acc, * x. key ( ) ) ) ;
325338 let next_channel_id = channel_id + 1 ;
339+ self . channel_to_local_prefix_id
340+ . insert ( next_channel_id, local_prefix_id) ;
326341 let new_downstream_extended_channel = ExtendedChannel :: new (
327342 next_channel_id,
328343 user_identity. clone ( ) ,
329- new_extranonce_prefix
330- . clone ( )
331- . into_b032 ( )
332- . into_static ( )
333- . to_vec ( ) ,
344+ new_extranonce_prefix. clone ( ) ,
334345 target,
335346 hashrate,
336347 true ,
@@ -344,7 +355,10 @@ impl ChannelManager {
344355 channel_id : next_channel_id,
345356 target : target. to_le_bytes ( ) . into ( ) ,
346357 extranonce_size : new_extranonce_size as u16 ,
347- extranonce_prefix : new_extranonce_prefix. clone ( ) . into ( ) ,
358+ extranonce_prefix : new_extranonce_prefix
359+ . clone ( )
360+ . try_into ( )
361+ . expect ( "valid prefix" ) ,
348362 group_channel_id : 0 , /* use a dummy value, this shouldn't
349363 * matter for the Sv1 server */
350364 } ,
@@ -445,9 +459,9 @@ impl ChannelManager {
445459 user_identity. as_bytes ( ) . to_vec ( ) . try_into ( ) . unwrap ( ) ;
446460 }
447461 }
448- // In aggregated mode, add extra bytes for translator search space allocation
462+ // In aggregated mode, add extra bytes for translator's own prefix allocation
449463 let upstream_min_extranonce_size = if is_aggregated ( ) {
450- min_extranonce_size + AGGREGATED_MODE_TRANSLATOR_SEARCH_SPACE_BYTES
464+ min_extranonce_size + TPROXY_ALLOCATION_BYTES
451465 } else {
452466 min_extranonce_size
453467 } ;
@@ -513,16 +527,17 @@ impl ChannelManager {
513527 . extended_channels
514528 . get ( & m. channel_id )
515529 . map ( |channel| channel. get_extranonce_prefix ( ) . clone ( ) ) ;
516- // Get the length of the upstream prefix (range0)
517- let range0_len = self
518- . extranonce_factories
530+ // Get the length of the upstream prefix
531+ let upstream_prefix_len = self
532+ . extranonce_allocators
519533 . get ( & AGGREGATED_CHANNEL_ID )
520- . unwrap ( )
521- . get_range0_len ( ) ;
534+ . map ( |allocator| allocator . upstream_prefix_len ( ) )
535+ . unwrap_or ( 0 ) ;
522536 if let Some ( downstream_extranonce_prefix) = downstream_extranonce_prefix {
523- // Skip the upstream prefix (range0) and take the remaining
537+ // Skip the upstream prefix and take the remaining
524538 // bytes (translator proxy prefix)
525- let translator_prefix = & downstream_extranonce_prefix[ range0_len..] ;
539+ let translator_prefix =
540+ & downstream_extranonce_prefix[ upstream_prefix_len..] ;
526541 // Create new extranonce: translator proxy prefix + miner's
527542 // extranonce
528543 let mut new_extranonce = translator_prefix. to_vec ( ) ;
@@ -540,21 +555,22 @@ impl ChannelManager {
540555 // counter
541556 m. sequence_number = self . next_share_sequence_number ( m. channel_id ) ;
542557
543- // Check if we have a per-channel factory for extranonce adjustment
544- let channel_factory = self . extranonce_factories . get ( & m. channel_id ) ;
558+ // Check if we have a per-channel allocator for extranonce adjustment
559+ let channel_allocator = self . extranonce_allocators . get ( & m. channel_id ) ;
545560
546- if let Some ( factory ) = channel_factory {
561+ if let Some ( allocator ) = channel_allocator {
547562 // We need to adjust the extranonce for this channel
548563 let downstream_extranonce_prefix = self
549564 . extended_channels
550565 . get ( & m. channel_id )
551566 . map ( |channel| channel. get_extranonce_prefix ( ) . clone ( ) ) ;
552- let range0_len = factory . get_range0_len ( ) ;
567+ let upstream_prefix_len = allocator . upstream_prefix_len ( ) ;
553568 if let Some ( downstream_extranonce_prefix) = downstream_extranonce_prefix
554569 {
555- // Skip the upstream prefix (range0) and take the remaining
570+ // Skip the upstream prefix and take the remaining
556571 // bytes (translator proxy prefix)
557- let translator_prefix = & downstream_extranonce_prefix[ range0_len..] ;
572+ let translator_prefix =
573+ & downstream_extranonce_prefix[ upstream_prefix_len..] ;
558574 // Create new extranonce: translator proxy prefix + miner's
559575 // extranonce
560576 let mut new_extranonce = translator_prefix. to_vec ( ) ;
@@ -691,20 +707,40 @@ impl ChannelManager {
691707 debug ! ( "Removed channel {} from group channel before sending CloseChannel to upstream" , m. channel_id) ;
692708 }
693709 }
710+ // Free the extranonce prefix for reuse
711+ if let Some ( ( _, prefix_id) ) = self . channel_to_local_prefix_id . remove ( & m. channel_id )
712+ {
713+ // In aggregated mode, free from AGGREGATED_CHANNEL_ID allocator
714+ if is_aggregated ( ) {
715+ if let Some ( mut allocator) =
716+ self . extranonce_allocators . get_mut ( & AGGREGATED_CHANNEL_ID )
717+ {
718+ allocator. free ( prefix_id) ;
719+ }
720+ } else if let Some ( mut allocator) =
721+ self . extranonce_allocators . get_mut ( & m. channel_id )
722+ {
723+ allocator. free ( prefix_id) ;
724+ }
725+ }
694726
695- let message = Mining :: CloseChannel ( m) ;
696- let sv2_frame: Sv2Frame = AnyMessage :: Mining ( message)
697- . try_into ( )
698- . map_err ( TproxyError :: shutdown) ?;
699-
700- self . channel_state
701- . upstream_sender
702- . send ( sv2_frame)
703- . await
704- . map_err ( |e| {
705- error ! ( "Failed to send CloseChannel message to upstream: {:?}" , e) ;
706- TproxyError :: fallback ( TproxyErrorKind :: ChannelErrorSender )
707- } ) ?;
727+ // In aggregated mode, don't forward CloseChannel to upstream since all
728+ // downstreams share the same upstream channel (AGGREGATED_CHANNEL_ID)
729+ if !is_aggregated ( ) {
730+ let message = Mining :: CloseChannel ( m) ;
731+ let sv2_frame: Sv2Frame = AnyMessage :: Mining ( message)
732+ . try_into ( )
733+ . map_err ( TproxyError :: shutdown) ?;
734+
735+ self . channel_state
736+ . upstream_sender
737+ . send ( sv2_frame)
738+ . await
739+ . map_err ( |e| {
740+ error ! ( "Failed to send CloseChannel message to upstream: {:?}" , e) ;
741+ TproxyError :: fallback ( TproxyErrorKind :: ChannelErrorSender )
742+ } ) ?;
743+ }
708744 }
709745 _ => {
710746 warn ! ( "Unhandled downstream message: {:?}" , message) ;
0 commit comments