diff --git a/pool-apps/pool/src/lib/channel_manager/mod.rs b/pool-apps/pool/src/lib/channel_manager/mod.rs index cc5fcc875..cc452f23e 100644 --- a/pool-apps/pool/src/lib/channel_manager/mod.rs +++ b/pool-apps/pool/src/lib/channel_manager/mod.rs @@ -45,8 +45,7 @@ use jd_server_sv2::job_declarator::JobDeclarator; use crate::{ config::PoolConfig, downstream::Downstream, - error::{self, PoolError, PoolErrorKind, PoolResult}, - status::{handle_error, Status, StatusSender}, + error::{self, Action, PoolError, PoolErrorKind, PoolResult}, utils::DownstreamMessage, }; @@ -234,7 +233,6 @@ impl ChannelManager { listening_address: SocketAddr, task_manager: Arc, cancellation_token: CancellationToken, - status_sender: Sender, channel_manager_sender: Sender<(DownstreamId, Mining<'static>, Option>)>, ) -> PoolResult<(), error::ChannelManager> { // todo: let start_downstream_server accept Arc, instead of clone. @@ -286,11 +284,11 @@ impl ChannelManager { let this = Arc::clone(&this); let cancellation_token_inner = cancellation_token_clone.clone(); - let status_sender_inner = status_sender.clone(); let channel_manager_sender_inner = channel_manager_sender.clone(); let task_manager_inner = task_manager_clone.clone(); task_manager_clone.spawn(async move { + let cancellation_token_clone = cancellation_token_inner.clone(); let noise_stream = tokio::select! { result = accept_noise_connection(stream, authority_public_key, authority_secret_key, cert_validity_sec) => { match result { @@ -317,8 +315,7 @@ impl ChannelManager { Some(group_channel) => group_channel, None => { error!("Failed to bootstrap group channel - disconnecting downstream {downstream_id}"); - let error = PoolError::::shutdown(PoolErrorKind::CouldNotInitiateSystem); - handle_error(&StatusSender::ChannelManager(status_sender_inner), error).await; + cancellation_token_clone.cancel(); return; } }; @@ -347,8 +344,8 @@ impl ChannelManager { downstream .start( cancellation_token_inner, - status_sender_inner, task_manager_inner, + move |downstream_id| this.remove_downstream(downstream_id) ) .await; }); @@ -374,12 +371,9 @@ impl ChannelManager { pub async fn start( self, cancellation_token: CancellationToken, - status_sender: Sender, task_manager: Arc, coinbase_outputs: Vec, ) -> PoolResult<(), error::ChannelManager> { - let status_sender = StatusSender::ChannelManager(status_sender); - self.coinbase_output_constraints(coinbase_outputs).await?; task_manager.spawn(async move { @@ -400,16 +394,34 @@ impl ChannelManager { res = cm_template.handle_template_provider_message() => { if let Err(e) = res { error!(error = ?e, "Error handling Template Receiver message"); - if handle_error(&status_sender, e).await { - break; + match e.action { + Action::Shutdown => { + cancellation_token.cancel(); + break; + } + Action::Disconnect(downstream_id) => { + cm_downstreams.remove_downstream(downstream_id); + } + Action::Log => { + warn!("Log-only error from channel manager: {:?}", e.kind); + } } } } res = cm_downstreams.handle_downstream_mining_message() => { if let Err(e) = res { error!(error = ?e, "Error handling Downstreams message"); - if handle_error(&status_sender, e).await { - break; + match e.action { + Action::Shutdown => { + cancellation_token.cancel(); + break; + } + Action::Disconnect(downstream_id) => { + cm_downstreams.remove_downstream(downstream_id); + } + Action::Log => { + warn!("Log-only error from channel manager: {:?}", e.kind); + } } } } @@ -424,11 +436,7 @@ impl ChannelManager { // Given a `downstream_id`, this method: // 1. Removes the corresponding Downstream from the `downstream` map. // 2. Removes the channels of the corresponding Downstream from `vardiff` map. - #[allow(clippy::result_large_err)] - pub fn remove_downstream( - &self, - downstream_id: DownstreamId, - ) -> PoolResult<(), error::ChannelManager> { + pub fn remove_downstream(&self, downstream_id: DownstreamId) { self.channel_manager_data.super_safe_lock(|cm_data| { cm_data.downstream.remove(&downstream_id); cm_data @@ -438,7 +446,6 @@ impl ChannelManager { self.channel_manager_channel .downstream_sender .super_safe_lock(|map| map.remove(&downstream_id)); - Ok(()) } // Handles messages received from the TP subsystem. diff --git a/pool-apps/pool/src/lib/downstream/mod.rs b/pool-apps/pool/src/lib/downstream/mod.rs index ec828f2ec..639ffca83 100644 --- a/pool-apps/pool/src/lib/downstream/mod.rs +++ b/pool-apps/pool/src/lib/downstream/mod.rs @@ -32,9 +32,8 @@ use stratum_apps::{ use tracing::{debug, error, warn}; use crate::{ - error::{self, PoolError, PoolErrorKind, PoolResult}, + error::{self, Action, PoolError, PoolErrorKind, PoolResult}, io_task::spawn_io_tasks, - status::{handle_error, Status, StatusSender}, utils::PayoutMode, }; @@ -76,10 +75,6 @@ pub struct DownstreamChannel { channel_manager_receiver: Receiver<(Mining<'static>, Option>)>, downstream_sender: Sender, downstream_receiver: Receiver, - /// Per-connection cancellation token (child of the global token). - /// Cancelled when this downstream's message loop exits, causing - /// the associated I/O tasks to shut down. - connection_token: CancellationToken, } /// Represents a downstream client connected to this node. @@ -94,6 +89,10 @@ pub struct Downstream { pub supported_extensions: Vec, /// Extensions that the pool requires pub required_extensions: Vec, + /// Per-connection cancellation token (child of the global token). + /// Cancelled when this downstream's message loop exits, causing + /// the associated I/O tasks to shut down. + downstream_connection_token: CancellationToken, } #[cfg_attr(not(test), hotpath::measure_all)] @@ -118,14 +117,14 @@ impl Downstream { // Create a per-connection child token so we can cancel this // connection's I/O tasks independently of the global shutdown. - let connection_token = cancellation_token.child_token(); + let downstream_connection_token = cancellation_token.child_token(); spawn_io_tasks( task_manager, noise_stream_reader, noise_stream_writer, outbound_rx, inbound_tx, - connection_token.clone(), + downstream_connection_token.clone(), ); let downstream_channel = DownstreamChannel { @@ -133,7 +132,6 @@ impl Downstream { channel_manager_sender, downstream_sender: outbound_tx, downstream_receiver: inbound_rx, - connection_token, }; let downstream_data = Arc::new(Mutex::new(DownstreamData { @@ -153,6 +151,7 @@ impl Downstream { requires_custom_work: Arc::new(AtomicBool::new(false)), supported_extensions, required_extensions, + downstream_connection_token, } } @@ -165,14 +164,9 @@ impl Downstream { pub async fn start( mut self, cancellation_token: CancellationToken, - status_sender: Sender, task_manager: Arc, + remove_downstream: impl FnOnce(DownstreamId) + Send + 'static, ) { - let status_sender = StatusSender::Downstream { - downstream_id: self.downstream_id, - tx: status_sender, - }; - // Setup initial connection if let Err(e) = self.setup_connection_with_downstream().await { error!(?e, "Failed to set up downstream connection"); @@ -181,7 +175,21 @@ impl Downstream { // before we break the TCP connection tokio::time::sleep(std::time::Duration::from_secs(1)).await; - handle_error(&status_sender, e).await; + match e.action { + Action::Shutdown => { + cancellation_token.cancel(); + } + Action::Disconnect(_) => { + self.downstream_connection_token.cancel(); + } + Action::Log => { + warn!( + "Log-only error from downstream {}: {:?}", + self.downstream_id, e.kind + ); + } + } + remove_downstream(self.downstream_id); return; } @@ -198,23 +206,43 @@ impl Downstream { res = self_clone_1.handle_downstream_message() => { if let Err(e) = res { error!(?e, "Error handling downstream message for {downstream_id}"); - if handle_error(&status_sender, e).await { - break; + match e.action { + Action::Shutdown => { + cancellation_token.cancel(); + break; + } + Action::Log => { + warn!("Log-only error from downstream {downstream_id}: {:?}", e.kind); + } + Action::Disconnect(_) => { + self_clone_1.downstream_connection_token.cancel(); + break; + } } } } res = self_clone_2.handle_channel_manager_message() => { if let Err(e) = res { error!(?e, "Error handling channel manager message for {downstream_id}"); - if handle_error(&status_sender, e).await { - break; + match e.action { + Action::Shutdown => { + cancellation_token.cancel(); + break; + } + Action::Log => { + warn!("Log-only error from downstream {downstream_id}: {:?}", e.kind); + } + Action::Disconnect(_) => { + self_clone_1.downstream_connection_token.cancel(); + break; + } } } } } } - - self.downstream_channel.connection_token.cancel(); + self.downstream_connection_token.cancel(); + remove_downstream(self.downstream_id); warn!("Downstream: unified message loop exited."); }); } diff --git a/pool-apps/pool/src/lib/mod.rs b/pool-apps/pool/src/lib/mod.rs index a0937e0e6..acd7ff9d0 100644 --- a/pool-apps/pool/src/lib/mod.rs +++ b/pool-apps/pool/src/lib/mod.rs @@ -25,7 +25,6 @@ use crate::{ channel_manager::ChannelManager, config::PoolConfig, error::PoolErrorKind, - status::State, template_receiver::{ bitcoin_core::{connect_to_bitcoin_core, BitcoinCoreSv2TDPConfig}, sv2_tp::Sv2Tp, @@ -39,7 +38,6 @@ pub mod error; mod io_task; #[cfg(feature = "monitoring")] mod monitoring; -pub mod status; pub mod template_receiver; pub mod utils; @@ -75,8 +73,6 @@ impl PoolSv2 { let task_manager = Arc::new(TaskManager::new()); - let (status_sender, status_receiver) = unbounded(); - let (downstream_to_channel_manager_sender, downstream_to_channel_manager_receiver) = unbounded(); @@ -194,7 +190,6 @@ impl PoolSv2 { } let channel_manager_clone = channel_manager.clone(); - let channel_manager_for_cleanup = channel_manager.clone(); let mut bitcoin_core_sv2_join_handle: Option> = None; match self.config.template_provider_type().clone() { @@ -213,12 +208,7 @@ impl PoolSv2 { .await?; sv2_tp - .start( - address, - cancellation_token.clone(), - status_sender.clone(), - task_manager.clone(), - ) + .start(address, cancellation_token.clone(), task_manager.clone()) .await?; info!("Sv2 Template Provider setup done"); @@ -258,7 +248,6 @@ impl PoolSv2 { bitcoin_core_config, cancellation_token.clone(), task_manager.clone(), - status_sender.clone(), ) .await, ); @@ -268,7 +257,6 @@ impl PoolSv2 { channel_manager .start( cancellation_token.clone(), - status_sender.clone(), task_manager.clone(), coinbase_outputs, ) @@ -282,48 +270,18 @@ impl PoolSv2 { *self.config.listen_address(), task_manager.clone(), cancellation_token.clone(), - status_sender, downstream_to_channel_manager_sender, ) .await?; info!("Spawning status listener task..."); - loop { - tokio::select! { - _ = tokio::signal::ctrl_c() => { - info!("Ctrl+C received — initiating graceful shutdown..."); - cancellation_token.cancel(); - break; - } - _ = cancellation_token.cancelled() => { - break; - } - message = status_receiver.recv() => { - if let Ok(status) = message { - match status.state { - State::DownstreamShutdown{downstream_id,..} => { - warn!("Downstream {downstream_id:?} disconnected — cleaning up channel manager."); - // Remove downstream from channel manager to prevent memory leak - if let Err(e) = channel_manager_for_cleanup.remove_downstream(downstream_id) { - error!("Failed to remove downstream {downstream_id:?}: {e:?}"); - cancellation_token.cancel(); - break; - } - } - State::TemplateReceiverShutdown(_) => { - warn!("Template Receiver shutdown requested — initiating full shutdown."); - cancellation_token.cancel(); - break; - } - State::ChannelManagerShutdown(_) => { - warn!("Channel Manager shutdown requested — initiating full shutdown."); - cancellation_token.cancel(); - break; - } - } - } - } + + tokio::select! { + _ = tokio::signal::ctrl_c() => { + info!("Ctrl+C received — initiating graceful shutdown..."); + cancellation_token.cancel(); } + _ = cancellation_token.cancelled() => {} } if let Some(ref jd) = job_declarator_for_shutdown { diff --git a/pool-apps/pool/src/lib/status.rs b/pool-apps/pool/src/lib/status.rs deleted file mode 100644 index b2fffd17f..000000000 --- a/pool-apps/pool/src/lib/status.rs +++ /dev/null @@ -1,153 +0,0 @@ -//! Status reporting and error propagation Utility. -//! -//! This module provides mechanisms for communicating shutdown events and -//! component state changes across the system. Each component (downstream, -//! upstream, job declarator, template receiver, channel manager) can send -//! and receive status updates via typed channels. Errors are automatically -//! converted into shutdown signals, allowing coordinated teardown of tasks. - -use stratum_apps::utils::types::DownstreamId; -use tracing::{debug, warn}; - -use crate::error::{Action, PoolError, PoolErrorKind}; - -/// Sender type for propagating status updates from different system components. -#[derive(Debug, Clone)] -pub enum StatusSender { - /// Status updates from a specific downstream connection. - Downstream { - downstream_id: DownstreamId, - tx: async_channel::Sender, - }, - /// Status updates from the template receiver. - TemplateReceiver(async_channel::Sender), - /// Status updates from the channel manager. - ChannelManager(async_channel::Sender), -} - -/// High-level identifier of a component type that can send status updates. -#[derive(Debug, PartialEq, Eq)] -pub enum StatusType { - /// A downstream connection identified by its ID. - Downstream(DownstreamId), - /// The template receiver component. - TemplateReceiver, - /// The channel manager component. - ChannelManager, -} - -impl From<&StatusSender> for StatusType { - fn from(value: &StatusSender) -> Self { - match value { - StatusSender::ChannelManager(_) => StatusType::ChannelManager, - StatusSender::Downstream { - downstream_id, - tx: _, - } => StatusType::Downstream(*downstream_id), - StatusSender::TemplateReceiver(_) => StatusType::TemplateReceiver, - } - } -} - -#[cfg_attr(not(test), hotpath::measure_all)] -impl StatusSender { - /// Sends a status update for the associated component. - pub async fn send(&self, status: Status) -> Result<(), async_channel::SendError> { - match self { - Self::Downstream { downstream_id, tx } => { - debug!( - "Sending status from Downstream [{}]: {:?}", - downstream_id, status.state - ); - tx.send(status).await - } - Self::TemplateReceiver(tx) => { - debug!("Sending status from TemplateReceiver: {:?}", status.state); - tx.send(status).await - } - Self::ChannelManager(tx) => { - debug!("Sending status from ChannelManager: {:?}", status.state); - tx.send(status).await - } - } - } -} - -/// Represents the state of a component, typically triggered by an error or shutdown event. -#[derive(Debug)] -pub enum State { - /// A downstream connection has shut down with a reason. - DownstreamShutdown { - downstream_id: DownstreamId, - reason: PoolErrorKind, - }, - /// Template receiver has shut down with a reason. - TemplateReceiverShutdown(PoolErrorKind), - /// Channel manager has shut down with a reason. - ChannelManagerShutdown(PoolErrorKind), -} - -/// Wrapper around a component’s state, sent as status updates across the system. -#[derive(Debug)] -pub struct Status { - /// The current state being reported. - pub state: State, -} - -#[cfg_attr(not(test), hotpath::measure)] -async fn send_status(sender: &StatusSender, error: PoolError) -> bool { - use Action::*; - - match error.action { - Log => { - warn!("Log-only error from {:?}: {:?}", sender, error.kind); - false - } - - Disconnect(downstream_id) => { - let state = State::DownstreamShutdown { - downstream_id, - reason: error.kind, - }; - - if let Err(e) = sender.send(Status { state }).await { - tracing::error!( - "Failed to send downstream shutdown status from {:?}: {:?}", - sender, - e - ); - std::process::abort(); - } - matches!(sender, StatusSender::Downstream { .. }) - } - Shutdown => { - let state = match sender { - StatusSender::ChannelManager(_) => { - warn!( - "Channel Manager shutdown requested due to error: {:?}", - error.kind - ); - State::ChannelManagerShutdown(error.kind) - } - StatusSender::TemplateReceiver(_) => { - warn!( - "Template Receiver shutdown requested due to error: {:?}", - error.kind - ); - State::TemplateReceiverShutdown(error.kind) - } - _ => State::ChannelManagerShutdown(error.kind), - }; - - if let Err(e) = sender.send(Status { state }).await { - tracing::error!("Failed to send shutdown status from {:?}: {:?}", sender, e); - std::process::abort(); - } - true - } - } -} - -pub async fn handle_error(sender: &StatusSender, e: PoolError) -> bool { - send_status(sender, e).await -} diff --git a/pool-apps/pool/src/lib/template_receiver/bitcoin_core.rs b/pool-apps/pool/src/lib/template_receiver/bitcoin_core.rs index 4ec0ddc83..f4b7136dd 100644 --- a/pool-apps/pool/src/lib/template_receiver/bitcoin_core.rs +++ b/pool-apps/pool/src/lib/template_receiver/bitcoin_core.rs @@ -1,7 +1,3 @@ -use crate::{ - error::PoolErrorKind, - status::{State, Status}, -}; use async_channel::{Receiver, Sender}; use bitcoin_core_sv2::template_distribution_protocol::{BitcoinCoreSv2TDP, CancellationToken}; use std::{path::PathBuf, sync::Arc, thread::JoinHandle}; @@ -22,9 +18,9 @@ pub async fn connect_to_bitcoin_core( bitcoin_core_config: BitcoinCoreSv2TDPConfig, cancellation_token: CancellationToken, task_manager: Arc, - status_sender: Sender, ) -> JoinHandle<()> { let bitcoin_core_canc_token = bitcoin_core_config.cancellation_token.clone(); + let cancellation_token_clone = cancellation_token.clone(); // spawn a task to handle shutdown signals and cancellation token activations task_manager.spawn(async move { @@ -47,13 +43,7 @@ pub async fn connect_to_bitcoin_core( Ok(rt) => rt, Err(e) => { tracing::error!("Failed to create Tokio runtime: {:?}", e); - - // we can't use handle_error here because we're not in a async context yet - let _ = status_sender.send_blocking(Status { - state: State::TemplateReceiverShutdown( - PoolErrorKind::FailedToCreateBitcoinCoreTokioRuntime, - ), - }); + cancellation_token_clone.cancel(); return; } }; diff --git a/pool-apps/pool/src/lib/template_receiver/sv2_tp/mod.rs b/pool-apps/pool/src/lib/template_receiver/sv2_tp/mod.rs index a3e287c6c..d2b07747a 100644 --- a/pool-apps/pool/src/lib/template_receiver/sv2_tp/mod.rs +++ b/pool-apps/pool/src/lib/template_receiver/sv2_tp/mod.rs @@ -20,9 +20,8 @@ use tokio::net::TcpStream; use tracing::{debug, error, info, warn}; use crate::{ - error::{self, PoolError, PoolErrorKind, PoolResult}, + error::{self, Action, PoolError, PoolErrorKind, PoolResult}, io_task::spawn_io_tasks, - status::{handle_error, Status, StatusSender}, utils::get_setup_connection_message_tp, }; @@ -144,11 +143,8 @@ impl Sv2Tp { mut self, socket_address: String, cancellation_token: CancellationToken, - status_sender: Sender, task_manager: Arc, ) -> PoolResult<(), error::TemplateProvider> { - let status_sender = StatusSender::TemplateReceiver(status_sender); - info!("Initialized state for starting template receiver"); self.setup_connection(socket_address).await?; @@ -165,16 +161,30 @@ impl Sv2Tp { res = self_clone_1.handle_template_provider_message() => { if let Err(e) = res { error!("TemplateReceiver template provider handler failed: {e:?}"); - if handle_error(&status_sender, e).await { - break; + match e.action { + Action::Shutdown => { + cancellation_token.cancel(); + break; + } + Action::Log => { + warn!("Log-only error from Template Provider: {:?}", e.kind); + } + _ => {} } } } res = self_clone_2.handle_channel_manager_message() => { if let Err(e) = res { error!("TemplateReceiver channel manager handler failed: {e:?}"); - if handle_error(&status_sender, e).await { - break; + match e.action { + Action::Shutdown => { + cancellation_token.cancel(); + break; + } + Action::Log => { + warn!("Log-only error from Template Provider: {:?}", e.kind); + } + _ => {} } } },