Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 27 additions & 20 deletions pool-apps/pool/src/lib/channel_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ use jd_server_sv2::job_declarator::JobDeclarator;
use crate::{
config::PoolConfig,
downstream::Downstream,
error::{self, PoolError, PoolErrorKind, PoolResult},
status::{handle_error, Status, StatusSender},
error::{self, Action, PoolError, PoolErrorKind, PoolResult},
utils::DownstreamMessage,
};

Expand Down Expand Up @@ -234,7 +233,6 @@ impl ChannelManager {
listening_address: SocketAddr,
task_manager: Arc<TaskManager>,
cancellation_token: CancellationToken,
status_sender: Sender<Status>,
channel_manager_sender: Sender<(DownstreamId, Mining<'static>, Option<Vec<Tlv>>)>,
) -> PoolResult<(), error::ChannelManager> {
// todo: let start_downstream_server accept Arc, instead of clone.
Expand Down Expand Up @@ -286,11 +284,11 @@ impl ChannelManager {

let this = Arc::clone(&this);
let cancellation_token_inner = cancellation_token_clone.clone();
let status_sender_inner = status_sender.clone();
let channel_manager_sender_inner = channel_manager_sender.clone();
let task_manager_inner = task_manager_clone.clone();

task_manager_clone.spawn(async move {
let cancellation_token_clone = cancellation_token_inner.clone();
let noise_stream = tokio::select! {
result = accept_noise_connection(stream, authority_public_key, authority_secret_key, cert_validity_sec) => {
match result {
Expand All @@ -317,8 +315,7 @@ impl ChannelManager {
Some(group_channel) => group_channel,
None => {
error!("Failed to bootstrap group channel - disconnecting downstream {downstream_id}");
let error = PoolError::<error::ChannelManager>::shutdown(PoolErrorKind::CouldNotInitiateSystem);
handle_error(&StatusSender::ChannelManager(status_sender_inner), error).await;
cancellation_token_clone.cancel();
return;
}
};
Expand Down Expand Up @@ -347,8 +344,8 @@ impl ChannelManager {
downstream
.start(
cancellation_token_inner,
status_sender_inner,
task_manager_inner,
move |downstream_id| this.remove_downstream(downstream_id)
)
.await;
});
Expand All @@ -374,12 +371,9 @@ impl ChannelManager {
pub async fn start(
self,
cancellation_token: CancellationToken,
status_sender: Sender<Status>,
task_manager: Arc<TaskManager>,
coinbase_outputs: Vec<TxOut>,
) -> PoolResult<(), error::ChannelManager> {
let status_sender = StatusSender::ChannelManager(status_sender);

self.coinbase_output_constraints(coinbase_outputs).await?;

task_manager.spawn(async move {
Expand All @@ -400,16 +394,34 @@ impl ChannelManager {
res = cm_template.handle_template_provider_message() => {
if let Err(e) = res {
error!(error = ?e, "Error handling Template Receiver message");
if handle_error(&status_sender, e).await {
break;
match e.action {
Action::Shutdown => {
cancellation_token.cancel();
break;
}
Action::Disconnect(downstream_id) => {
cm_downstreams.remove_downstream(downstream_id);
}
Action::Log => {
warn!("Log-only error from channel manager: {:?}", e.kind);
}
}
}
}
res = cm_downstreams.handle_downstream_mining_message() => {
if let Err(e) = res {
error!(error = ?e, "Error handling Downstreams message");
if handle_error(&status_sender, e).await {
break;
match e.action {
Action::Shutdown => {
cancellation_token.cancel();
break;
}
Action::Disconnect(downstream_id) => {
cm_downstreams.remove_downstream(downstream_id);
}
Action::Log => {
warn!("Log-only error from channel manager: {:?}", e.kind);
}
}
}
}
Expand All @@ -424,11 +436,7 @@ impl ChannelManager {
// Given a `downstream_id`, this method:
// 1. Removes the corresponding Downstream from the `downstream` map.
// 2. Removes the channels of the corresponding Downstream from `vardiff` map.
#[allow(clippy::result_large_err)]
pub fn remove_downstream(
&self,
downstream_id: DownstreamId,
) -> PoolResult<(), error::ChannelManager> {
pub fn remove_downstream(&self, downstream_id: DownstreamId) {
self.channel_manager_data.super_safe_lock(|cm_data| {
cm_data.downstream.remove(&downstream_id);
cm_data
Expand All @@ -438,7 +446,6 @@ impl ChannelManager {
self.channel_manager_channel
.downstream_sender
.super_safe_lock(|map| map.remove(&downstream_id));
Ok(())
}

// Handles messages received from the TP subsystem.
Expand Down
72 changes: 50 additions & 22 deletions pool-apps/pool/src/lib/downstream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ use stratum_apps::{
use tracing::{debug, error, warn};

use crate::{
error::{self, PoolError, PoolErrorKind, PoolResult},
error::{self, Action, PoolError, PoolErrorKind, PoolResult},
io_task::spawn_io_tasks,
status::{handle_error, Status, StatusSender},
utils::PayoutMode,
};

Expand Down Expand Up @@ -76,10 +75,6 @@ pub struct DownstreamChannel {
channel_manager_receiver: Receiver<(Mining<'static>, Option<Vec<Tlv>>)>,
downstream_sender: Sender<Sv2Frame>,
downstream_receiver: Receiver<Sv2Frame>,
/// Per-connection cancellation token (child of the global token).
/// Cancelled when this downstream's message loop exits, causing
/// the associated I/O tasks to shut down.
connection_token: CancellationToken,
}

/// Represents a downstream client connected to this node.
Expand All @@ -94,6 +89,10 @@ pub struct Downstream {
pub supported_extensions: Vec<u16>,
/// Extensions that the pool requires
pub required_extensions: Vec<u16>,
/// Per-connection cancellation token (child of the global token).
/// Cancelled when this downstream's message loop exits, causing
/// the associated I/O tasks to shut down.
downstream_connection_token: CancellationToken,
}

#[cfg_attr(not(test), hotpath::measure_all)]
Expand All @@ -118,22 +117,21 @@ impl Downstream {

// Create a per-connection child token so we can cancel this
// connection's I/O tasks independently of the global shutdown.
let connection_token = cancellation_token.child_token();
let downstream_connection_token = cancellation_token.child_token();
spawn_io_tasks(
task_manager,
noise_stream_reader,
noise_stream_writer,
outbound_rx,
inbound_tx,
connection_token.clone(),
downstream_connection_token.clone(),
);

let downstream_channel = DownstreamChannel {
channel_manager_receiver,
channel_manager_sender,
downstream_sender: outbound_tx,
downstream_receiver: inbound_rx,
connection_token,
};

let downstream_data = Arc::new(Mutex::new(DownstreamData {
Expand All @@ -153,6 +151,7 @@ impl Downstream {
requires_custom_work: Arc::new(AtomicBool::new(false)),
supported_extensions,
required_extensions,
downstream_connection_token,
}
}

Expand All @@ -165,14 +164,9 @@ impl Downstream {
pub async fn start(
mut self,
cancellation_token: CancellationToken,
status_sender: Sender<Status>,
task_manager: Arc<TaskManager>,
remove_downstream: impl FnOnce(DownstreamId) + Send + 'static,
) {
let status_sender = StatusSender::Downstream {
downstream_id: self.downstream_id,
tx: status_sender,
};

// Setup initial connection
if let Err(e) = self.setup_connection_with_downstream().await {
error!(?e, "Failed to set up downstream connection");
Expand All @@ -181,7 +175,21 @@ impl Downstream {
// before we break the TCP connection
tokio::time::sleep(std::time::Duration::from_secs(1)).await;

handle_error(&status_sender, e).await;
match e.action {
Action::Shutdown => {
cancellation_token.cancel();
}
Action::Disconnect(_) => {
self.downstream_connection_token.cancel();
}
Action::Log => {
warn!(
"Log-only error from downstream {}: {:?}",
self.downstream_id, e.kind
);
}
}
remove_downstream(self.downstream_id);
return;
Comment thread
Shourya742 marked this conversation as resolved.
}

Expand All @@ -198,23 +206,43 @@ impl Downstream {
res = self_clone_1.handle_downstream_message() => {
if let Err(e) = res {
error!(?e, "Error handling downstream message for {downstream_id}");
if handle_error(&status_sender, e).await {
break;
match e.action {
Action::Shutdown => {
cancellation_token.cancel();
break;
}
Action::Log => {
warn!("Log-only error from downstream {downstream_id}: {:?}", e.kind);
}
Action::Disconnect(_) => {
self_clone_1.downstream_connection_token.cancel();
break;
}
}
}
}
res = self_clone_2.handle_channel_manager_message() => {
if let Err(e) = res {
error!(?e, "Error handling channel manager message for {downstream_id}");
if handle_error(&status_sender, e).await {
break;
match e.action {
Action::Shutdown => {
cancellation_token.cancel();
break;
}
Action::Log => {
warn!("Log-only error from downstream {downstream_id}: {:?}", e.kind);
}
Action::Disconnect(_) => {
self_clone_1.downstream_connection_token.cancel();
break;
}
}
}
}
}
}

self.downstream_channel.connection_token.cancel();
self.downstream_connection_token.cancel();
remove_downstream(self.downstream_id);
warn!("Downstream: unified message loop exited.");
});
}
Expand Down
56 changes: 7 additions & 49 deletions pool-apps/pool/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use crate::{
channel_manager::ChannelManager,
config::PoolConfig,
error::PoolErrorKind,
status::State,
template_receiver::{
bitcoin_core::{connect_to_bitcoin_core, BitcoinCoreSv2TDPConfig},
sv2_tp::Sv2Tp,
Expand All @@ -39,7 +38,6 @@ pub mod error;
mod io_task;
#[cfg(feature = "monitoring")]
mod monitoring;
pub mod status;
pub mod template_receiver;
pub mod utils;

Expand Down Expand Up @@ -75,8 +73,6 @@ impl PoolSv2 {

let task_manager = Arc::new(TaskManager::new());

let (status_sender, status_receiver) = unbounded();

let (downstream_to_channel_manager_sender, downstream_to_channel_manager_receiver) =
unbounded();

Expand Down Expand Up @@ -194,7 +190,6 @@ impl PoolSv2 {
}

let channel_manager_clone = channel_manager.clone();
let channel_manager_for_cleanup = channel_manager.clone();
let mut bitcoin_core_sv2_join_handle: Option<JoinHandle<()>> = None;

match self.config.template_provider_type().clone() {
Expand All @@ -213,12 +208,7 @@ impl PoolSv2 {
.await?;

sv2_tp
.start(
address,
cancellation_token.clone(),
status_sender.clone(),
task_manager.clone(),
)
.start(address, cancellation_token.clone(), task_manager.clone())
.await?;

info!("Sv2 Template Provider setup done");
Expand Down Expand Up @@ -258,7 +248,6 @@ impl PoolSv2 {
bitcoin_core_config,
cancellation_token.clone(),
task_manager.clone(),
status_sender.clone(),
)
.await,
);
Expand All @@ -268,7 +257,6 @@ impl PoolSv2 {
channel_manager
.start(
cancellation_token.clone(),
status_sender.clone(),
task_manager.clone(),
coinbase_outputs,
)
Expand All @@ -282,48 +270,18 @@ impl PoolSv2 {
*self.config.listen_address(),
task_manager.clone(),
cancellation_token.clone(),
status_sender,
downstream_to_channel_manager_sender,
)
.await?;

info!("Spawning status listener task...");
loop {
tokio::select! {
_ = tokio::signal::ctrl_c() => {
info!("Ctrl+C received — initiating graceful shutdown...");
cancellation_token.cancel();
break;
}
_ = cancellation_token.cancelled() => {
break;
}
message = status_receiver.recv() => {
if let Ok(status) = message {
match status.state {
State::DownstreamShutdown{downstream_id,..} => {
warn!("Downstream {downstream_id:?} disconnected — cleaning up channel manager.");
// Remove downstream from channel manager to prevent memory leak
if let Err(e) = channel_manager_for_cleanup.remove_downstream(downstream_id) {
error!("Failed to remove downstream {downstream_id:?}: {e:?}");
cancellation_token.cancel();
break;
}
}
State::TemplateReceiverShutdown(_) => {
warn!("Template Receiver shutdown requested — initiating full shutdown.");
cancellation_token.cancel();
break;
}
State::ChannelManagerShutdown(_) => {
warn!("Channel Manager shutdown requested — initiating full shutdown.");
cancellation_token.cancel();
break;
}
}
}
}

tokio::select! {
_ = tokio::signal::ctrl_c() => {
info!("Ctrl+C received — initiating graceful shutdown...");
cancellation_token.cancel();
}
_ = cancellation_token.cancelled() => {}
}

if let Some(ref jd) = job_declarator_for_shutdown {
Expand Down
Loading
Loading