From 9229bc0f3ada82c1802ff47c86dc988da1976212 Mon Sep 17 00:00:00 2001 From: Eric Price Date: Tue, 17 Mar 2026 19:27:14 -0400 Subject: [PATCH 1/2] fix(monitoring): move metric updates from /metrics handler into SnapshotCache::refresh (#337) Move all Prometheus gauge updates (set + stale-label removal) out of the /metrics HTTP handler and into SnapshotCache::refresh(), which runs as a periodic background task. This eliminates the GaugeVec reset gap where label series momentarily disappeared on every scrape. Changes: - SnapshotCache now owns PrometheusMetrics and PreviousLabelSets - refresh() updates snapshot data AND Prometheus gauges atomically - /metrics handler reduced to: set uptime gauge, gather, encode - ServerState simplified (no more PreviousLabelSets or Mutex) - Tests updated to wire metrics through cache via with_metrics() - Integration tests: replace fixed-sleep assertions with poll_until_metric_gte (100ms poll, 5s deadline) for CI resilience - Clone impl preserves previous_labels for correct stale-label detection - debug-level tracing on stale label removal errors - debug_assert on with_metrics double-attachment Closes #337 --- .../lib/prometheus_metrics_assertions.rs | 59 ++-- .../tests/monitoring_integration.rs | 27 +- stratum-apps/src/monitoring/http_server.rs | 307 +++++++----------- stratum-apps/src/monitoring/snapshot_cache.rs | 245 +++++++++++++- 4 files changed, 404 insertions(+), 234 deletions(-) diff --git a/integration-tests/lib/prometheus_metrics_assertions.rs b/integration-tests/lib/prometheus_metrics_assertions.rs index bfeb5364d..0625760b5 100644 --- a/integration-tests/lib/prometheus_metrics_assertions.rs +++ b/integration-tests/lib/prometheus_metrics_assertions.rs @@ -38,15 +38,22 @@ pub(crate) fn parse_metric_value(metrics_text: &str, metric_name: &str) -> Optio if line.starts_with('#') { continue; } - // For labeled metrics, match the prefix up to the closing brace if let Some(rest) = line.strip_prefix(metric_name) { - // The value follows a space after the metric name (or after the closing brace) - let value_str = rest.trim(); - // If there are labels and the name didn't include them, skip - if value_str.starts_with('{') { + let rest = rest.trim(); + if rest.is_empty() { continue; } - return value_str.parse::().ok(); + // Bare metric (no labels): value follows directly after the name + if rest.starts_with(|c: char| c.is_ascii_digit() || c == '-') { + return rest.parse::().ok(); + } + // Labeled metric: skip past the closing brace to get the value + if rest.starts_with('{') { + if let Some(brace_end) = rest.find('}') { + let value_str = rest[brace_end + 1..].trim(); + return value_str.parse::().ok(); + } + } } } None @@ -135,15 +142,11 @@ pub fn assert_metric_present(metrics_text: &str, metric_name: &str) { ); } -/// Poll the `/metrics` endpoint until any line matching `metric_name` (with any labels) has a -/// value >= `min`, then return the full metrics text. Panics if the condition is not met within -/// `timeout`. +/// Poll `/metrics` until `metric_name` is present with a value >= `min`, or panic after +/// `timeout`. Polls every 100ms to react quickly while tolerating cache refresh jitter. /// -/// Use this instead of a fixed `sleep` for `GaugeVec` metrics (per-channel shares, blocks found) -/// that only appear in Prometheus output after the monitoring snapshot cache has refreshed with -/// observed label combinations. The handler calls `.reset()` on every `/metrics` request before -/// repopulating from the cached snapshot, so a label combination is only present when the -/// snapshot contains a non-default value for it. +/// Returns the full metrics text from the successful scrape so callers can make additional +/// assertions without a second fetch. pub async fn poll_until_metric_gte( monitoring_addr: SocketAddr, metric_name: &str, @@ -153,30 +156,10 @@ pub async fn poll_until_metric_gte( let deadline = tokio::time::Instant::now() + timeout; loop { let metrics = fetch_metrics(monitoring_addr).await; - let satisfied = metrics.lines().any(|line| { - if line.starts_with('#') { - return false; - } - if let Some(rest) = line.strip_prefix(metric_name) { - // Match bare name followed by space, or labeled name followed by '{' - let value_str = if rest.starts_with(' ') { - rest.trim() - } else if rest.starts_with('{') { - // Skip past the closing brace to get the value - rest.find('}') - .and_then(|i| rest.get(i + 1..)) - .map(|s| s.trim()) - .unwrap_or("") - } else { - return false; - }; - value_str.parse::().map(|v| v >= min).unwrap_or(false) - } else { - false + if let Some(v) = parse_metric_value(&metrics, metric_name) { + if v >= min { + return metrics; } - }); - if satisfied { - return metrics; } if tokio::time::Instant::now() >= deadline { panic!( @@ -184,7 +167,7 @@ pub async fn poll_until_metric_gte( metric_name, min, timeout, metrics ); } - tokio::time::sleep(std::time::Duration::from_millis(500)).await; + tokio::time::sleep(std::time::Duration::from_millis(100)).await; } } diff --git a/integration-tests/tests/monitoring_integration.rs b/integration-tests/tests/monitoring_integration.rs index 1aca9c4c4..3321c7a69 100644 --- a/integration-tests/tests/monitoring_integration.rs +++ b/integration-tests/tests/monitoring_integration.rs @@ -9,6 +9,9 @@ use integration_tests_sv2::{ }; use stratum_apps::stratum_core::mining_sv2::*; +/// Timeout for polling metric assertions. Generous enough for slow CI. +const METRIC_POLL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5); + // --------------------------------------------------------------------------- // 1. Pool + SV2 Mining Device (standard channel) Pool role exposes: client metrics (connections, // channels, shares, hashrate) Pool has NO upstream, so server metrics should be absent. @@ -41,12 +44,12 @@ async fn pool_monitoring_with_sv2_mining_device() { // Health API assert_api_health(pool_mon).await; - // Poll until per-channel share metric is populated in the monitoring cache + // Poll until the monitoring cache has refreshed with the new share data let pool_metrics = poll_until_metric_gte( pool_mon, "sv2_client_shares_accepted_total", 1.0, - std::time::Duration::from_secs(10), + METRIC_POLL_TIMEOUT, ) .await; assert_uptime(&pool_metrics); @@ -88,11 +91,13 @@ async fn pool_and_tproxy_monitoring_with_sv1_miner() { // -- Pool metrics -- let pool_mon = pool_monitoring.expect("pool monitoring should be enabled"); assert_api_health(pool_mon).await; + + // Poll until the monitoring cache has refreshed with the new share data let pool_metrics = poll_until_metric_gte( pool_mon, "sv2_client_shares_accepted_total", 1.0, - std::time::Duration::from_secs(10), + METRIC_POLL_TIMEOUT, ) .await; assert_uptime(&pool_metrics); @@ -103,13 +108,7 @@ async fn pool_and_tproxy_monitoring_with_sv1_miner() { // -- tProxy metrics -- let tproxy_mon = tproxy_monitoring.expect("tproxy monitoring should be enabled"); assert_api_health(tproxy_mon).await; - let tproxy_metrics = poll_until_metric_gte( - tproxy_mon, - "sv2_server_shares_accepted_total", - 1.0, - std::time::Duration::from_secs(10), - ) - .await; + let tproxy_metrics = fetch_metrics(tproxy_mon).await; assert_uptime(&tproxy_metrics); // tProxy has 1 upstream extended channel assert_metric_eq( @@ -171,11 +170,13 @@ async fn jd_aggregated_topology_monitoring() { // -- Pool metrics: sees 1 SV2 client (JDC), shares accepted -- let pool_mon = pool_monitoring.expect("pool monitoring should be enabled"); assert_api_health(pool_mon).await; + + // Poll until the monitoring cache has refreshed with the new share data let pool_metrics = poll_until_metric_gte( pool_mon, "sv2_client_shares_accepted_total", 1.0, - std::time::Duration::from_secs(10), + METRIC_POLL_TIMEOUT, ) .await; assert_uptime(&pool_metrics); @@ -231,13 +232,13 @@ async fn block_found_detected_in_pool_metrics() { .wait_for_message_type(MessageDirection::ToUpstream, MESSAGE_TYPE_SUBMIT_SOLUTION) .await; - // Poll until block found metric appears in monitoring cache + // Poll until the monitoring cache has refreshed with the block found data let pool_mon = pool_monitoring.expect("pool monitoring should be enabled"); let pool_metrics = poll_until_metric_gte( pool_mon, "sv2_client_blocks_found_total", 1.0, - std::time::Duration::from_secs(10), + METRIC_POLL_TIMEOUT, ) .await; assert_uptime(&pool_metrics); diff --git a/stratum-apps/src/monitoring/http_server.rs b/stratum-apps/src/monitoring/http_server.rs index 7daf6ad02..e33e57c44 100644 --- a/stratum-apps/src/monitoring/http_server.rs +++ b/stratum-apps/src/monitoring/http_server.rs @@ -161,17 +161,17 @@ impl MonitoringServer { let has_server = server_monitoring.is_some(); let has_sv2_clients = sv2_clients_monitoring.is_some(); - // Create the snapshot cache - let cache = Arc::new(SnapshotCache::new( - refresh_interval, - server_monitoring, - sv2_clients_monitoring, - )); + let metrics = PrometheusMetrics::new(has_server, has_sv2_clients, false)?; - // Do initial refresh - cache.refresh(); + // Create the snapshot cache with metrics attached so refresh() + // updates Prometheus gauges atomically alongside the snapshot data. + let cache = Arc::new( + SnapshotCache::new(refresh_interval, server_monitoring, sv2_clients_monitoring) + .with_metrics(metrics.clone()), + ); - let metrics = PrometheusMetrics::new(has_server, has_sv2_clients, false)?; + // Do initial refresh (populates both snapshot and Prometheus gauges) + cache.refresh(); Ok(Self { bind_address, @@ -196,18 +196,21 @@ impl MonitoringServer { let has_server = snapshot.server_info.is_some(); let has_sv2_clients = snapshot.sv2_clients_summary.is_some(); - // Add Sv1 clients source to the cache + // Re-create metrics with SV1 enabled + let metrics = PrometheusMetrics::new(has_server, has_sv2_clients, true)?; + + // Add Sv1 clients source and attach new metrics to the cache let cache = Arc::new( Arc::try_unwrap(self.state.cache) .unwrap_or_else(|arc| (*arc).clone()) - .with_sv1_clients_source(sv1_monitoring), + .with_sv1_clients_source(sv1_monitoring) + .with_metrics(metrics.clone()), ); - // Refresh cache with new SV1 data + // Refresh cache with new SV1 data (also updates Prometheus gauges) cache.refresh(); - // Re-create metrics with SV1 enabled - self.state.metrics = PrometheusMetrics::new(has_server, has_sv2_clients, true)?; + self.state.metrics = metrics; self.state.cache = cache; Ok(self) @@ -733,10 +736,18 @@ async fn handle_sv1_client_by_id( } } -/// Handler for Prometheus metrics endpoint +/// Handler for Prometheus metrics endpoint. +/// +/// All GaugeVec metric values are updated atomically by the background cache refresh +/// task in `SnapshotCache::refresh()`. This handler only needs to: +/// 1. Set the uptime gauge (requires wall-clock time at scrape time) +/// 2. Gather and encode all registered metrics +/// +/// Because metric values are always kept in sync with the snapshot data, there is +/// never a gap where label series momentarily disappear. Tests can assert on metrics +/// directly after a cache refresh without polling for transient states. async fn handle_prometheus_metrics(State(state): State) -> Response { - let snapshot = state.cache.get_snapshot(); - + // Uptime is the only metric set at scrape time (needs current wall clock) let uptime_secs = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() @@ -744,163 +755,7 @@ async fn handle_prometheus_metrics(State(state): State) -> Response - state.start_time; state.metrics.sv2_uptime_seconds.set(uptime_secs as f64); - // Reset per-channel metrics before repopulating - if let Some(ref metric) = state.metrics.sv2_client_channel_hashrate { - metric.reset(); - } - if let Some(ref metric) = state.metrics.sv2_client_shares_accepted_total { - metric.reset(); - } - if let Some(ref metric) = state.metrics.sv2_server_channel_hashrate { - metric.reset(); - } - if let Some(ref metric) = state.metrics.sv2_server_shares_accepted_total { - metric.reset(); - } - - // Collect server metrics - if let Some(ref summary) = snapshot.server_summary { - if let Some(ref metric) = state.metrics.sv2_server_channels { - metric - .with_label_values(&["extended"]) - .set(summary.extended_channels as f64); - metric - .with_label_values(&["standard"]) - .set(summary.standard_channels as f64); - } - if let Some(ref metric) = state.metrics.sv2_server_hashrate_total { - metric.set(summary.total_hashrate as f64); - } - } - - if let Some(ref server) = snapshot.server_info { - for channel in &server.extended_channels { - let channel_id = channel.channel_id.to_string(); - let user = &channel.user_identity; - - if let Some(ref metric) = state.metrics.sv2_server_shares_accepted_total { - metric - .with_label_values(&[&channel_id, user]) - .set(channel.shares_acknowledged as f64); - } - if let (Some(ref metric), Some(hashrate)) = ( - &state.metrics.sv2_server_channel_hashrate, - channel.nominal_hashrate, - ) { - metric - .with_label_values(&[&channel_id, user]) - .set(hashrate as f64); - } - } - - for channel in &server.standard_channels { - let channel_id = channel.channel_id.to_string(); - let user = &channel.user_identity; - - if let Some(ref metric) = state.metrics.sv2_server_shares_accepted_total { - metric - .with_label_values(&[&channel_id, user]) - .set(channel.shares_acknowledged as f64); - } - if let (Some(ref metric), Some(hashrate)) = ( - &state.metrics.sv2_server_channel_hashrate, - channel.nominal_hashrate, - ) { - metric - .with_label_values(&[&channel_id, user]) - .set(hashrate as f64); - } - } - - if let Some(ref metric) = state.metrics.sv2_server_blocks_found_total { - let total: u64 = server - .extended_channels - .iter() - .map(|c| c.blocks_found as u64) - .chain( - server - .standard_channels - .iter() - .map(|c| c.blocks_found as u64), - ) - .sum(); - metric.set(total as f64); - } - } - - // Collect Sv2 clients metrics - if let Some(ref summary) = snapshot.sv2_clients_summary { - if let Some(ref metric) = state.metrics.sv2_clients_total { - metric.set(summary.total_clients as f64); - } - if let Some(ref metric) = state.metrics.sv2_client_channels { - metric - .with_label_values(&["extended"]) - .set(summary.extended_channels as f64); - metric - .with_label_values(&["standard"]) - .set(summary.standard_channels as f64); - } - if let Some(ref metric) = state.metrics.sv2_client_hashrate_total { - metric.set(summary.total_hashrate as f64); - } - - let mut client_blocks_total: u64 = 0; - - for client in snapshot.sv2_clients.as_deref().unwrap_or(&[]) { - let client_id = client.client_id.to_string(); - - for channel in &client.extended_channels { - let channel_id = channel.channel_id.to_string(); - let user = &channel.user_identity; - - if let Some(ref metric) = state.metrics.sv2_client_shares_accepted_total { - metric - .with_label_values(&[&client_id, &channel_id, user]) - .set(channel.shares_accepted as f64); - } - if let Some(ref metric) = state.metrics.sv2_client_channel_hashrate { - metric - .with_label_values(&[&client_id, &channel_id, user]) - .set(channel.nominal_hashrate as f64); - } - client_blocks_total += channel.blocks_found as u64; - } - - for channel in &client.standard_channels { - let channel_id = channel.channel_id.to_string(); - let user = &channel.user_identity; - - if let Some(ref metric) = state.metrics.sv2_client_shares_accepted_total { - metric - .with_label_values(&[&client_id, &channel_id, user]) - .set(channel.shares_accepted as f64); - } - if let Some(ref metric) = state.metrics.sv2_client_channel_hashrate { - metric - .with_label_values(&[&client_id, &channel_id, user]) - .set(channel.nominal_hashrate as f64); - } - client_blocks_total += channel.blocks_found as u64; - } - } - - if let Some(ref metric) = state.metrics.sv2_client_blocks_found_total { - metric.set(client_blocks_total as f64); - } - } - - // Collect SV1 client metrics - if let Some(ref summary) = snapshot.sv1_clients_summary { - if let Some(ref metric) = state.metrics.sv1_clients_total { - metric.set(summary.total_clients as f64); - } - if let Some(ref metric) = state.metrics.sv1_hashrate_total { - metric.set(summary.total_hashrate as f64); - } - } - - // Encode and return metrics + // Gather and encode — all other metrics were set by the last cache refresh let encoder = TextEncoder::new(); let metric_families = state.metrics.registry.gather(); let mut buffer = Vec::new(); @@ -931,6 +786,7 @@ mod tests { use super::*; use axum::body::Body; use http_body_util::BodyExt; + use std::sync::Mutex; use tower::ServiceExt; // ── helpers ────────────────────────────────────────────────────── @@ -1066,7 +922,16 @@ mod tests { clients: Option>, sv1: Option>, ) -> Router { - let cache = Arc::new(SnapshotCache::new(Duration::from_secs(60), server, clients)); + let has_server = server.is_some(); + let has_clients = clients.is_some(); + let has_sv1 = sv1.is_some(); + + let metrics = PrometheusMetrics::new(has_server, has_clients, has_sv1).unwrap(); + + let cache = Arc::new( + SnapshotCache::new(Duration::from_secs(60), server, clients) + .with_metrics(metrics.clone()), + ); let cache = if let Some(sv1_source) = sv1 { Arc::new( @@ -1080,12 +945,6 @@ mod tests { cache.refresh(); - let has_server = cache.get_snapshot().server_info.is_some(); - let has_clients = cache.get_snapshot().sv2_clients_summary.is_some(); - let has_sv1 = cache.get_snapshot().sv1_clients.is_some(); - - let metrics = PrometheusMetrics::new(has_server, has_clients, has_sv1).unwrap(); - let start_time = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() @@ -1550,4 +1409,90 @@ mod tests { assert!(!body.contains("sv2_server_channels")); assert!(!body.contains("sv2_clients_total")); } + + // Mutable mock that allows changing data between requests + struct MutableMockClients(Mutex>); + impl super::super::client::Sv2ClientsMonitoring for MutableMockClients { + fn get_sv2_clients(&self) -> Vec { + self.0.lock().unwrap().clone() + } + } + + /// Verify that stale channel labels are removed without a reset gap. + /// + /// Scenario: First scrape has client with channel 1 and channel 2. + /// Second scrape: channel 2 is gone. The test verifies that: + /// - Channel 1 metrics are still present (no gap) + /// - Channel 2 metrics are removed (stale cleanup) + #[tokio::test] + async fn metrics_stale_labels_removed_without_reset_gap() { + let initial_clients = vec![Sv2ClientInfo { + client_id: 1, + extended_channels: vec![ + create_extended_channel_info(1, 100.0), + create_extended_channel_info(2, 200.0), + ], + standard_channels: vec![], + }]; + + let mock_clients = Arc::new(MutableMockClients(Mutex::new(initial_clients))); + let metrics = PrometheusMetrics::new(false, true, false).unwrap(); + let cache = Arc::new( + SnapshotCache::new( + Duration::from_secs(60), + None, + Some(mock_clients.clone() + as Arc), + ) + .with_metrics(metrics.clone()), + ); + cache.refresh(); + + let start_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + let state = ServerState { + cache: cache.clone(), + start_time, + metrics, + }; + + let app = Router::new() + .route("/metrics", get(handle_prometheus_metrics)) + .with_state(state); + + // First scrape — both channels present + let response = app.clone().oneshot(make_request("/metrics")).await.unwrap(); + let body = get_body(response).await; + // Prometheus sorts label keys alphabetically: channel_id, client_id, user_identity + assert!( + body.contains("sv2_client_shares_accepted_total{channel_id=\"1\",client_id=\"1\""), + "Channel 1 should be present on first scrape" + ); + assert!( + body.contains("sv2_client_shares_accepted_total{channel_id=\"2\",client_id=\"1\""), + "Channel 2 should be present on first scrape" + ); + + // Remove channel 2 from mock data and refresh cache + { + let mut clients = mock_clients.0.lock().unwrap(); + clients[0].extended_channels.retain(|c| c.channel_id == 1); + } + cache.refresh(); + + // Second scrape — channel 2 should be removed, channel 1 still present + let response = app.clone().oneshot(make_request("/metrics")).await.unwrap(); + let body = get_body(response).await; + assert!( + body.contains("sv2_client_shares_accepted_total{channel_id=\"1\",client_id=\"1\""), + "Channel 1 should still be present after stale removal" + ); + assert!( + !body.contains("sv2_client_shares_accepted_total{channel_id=\"2\",client_id=\"1\""), + "Channel 2 should be removed as stale" + ); + } } diff --git a/stratum-apps/src/monitoring/snapshot_cache.rs b/stratum-apps/src/monitoring/snapshot_cache.rs index 0c9b4ad69..e0ecbf808 100644 --- a/stratum-apps/src/monitoring/snapshot_cache.rs +++ b/stratum-apps/src/monitoring/snapshot_cache.rs @@ -37,16 +37,31 @@ //! ``` use std::{ - sync::{Arc, RwLock}, + collections::HashSet, + sync::{Arc, Mutex, RwLock}, time::{Duration, Instant}, }; +use tracing::debug; + use super::{ client::{Sv2ClientInfo, Sv2ClientsMonitoring, Sv2ClientsSummary}, + prometheus_metrics::PrometheusMetrics, server::{ServerInfo, ServerMonitoring, ServerSummary}, sv1::{Sv1ClientInfo, Sv1ClientsMonitoring, Sv1ClientsSummary}, }; +/// Tracks which label combinations were set on the previous refresh so we can +/// remove only stale series instead of calling `.reset()` (which would create a +/// gap where all label series momentarily disappear). +#[derive(Default)] +struct PreviousLabelSets { + /// Labels for server per-channel GaugeVecs: [channel_id, user_identity] + server_channel_labels: HashSet<[String; 2]>, + /// Labels for client per-channel GaugeVecs: [client_id, channel_id, user_identity] + client_channel_labels: HashSet<[String; 3]>, +} + /// Cached snapshot of monitoring data. /// /// This struct holds a point-in-time copy of all monitoring data, @@ -63,24 +78,42 @@ pub struct MonitoringSnapshot { } /// A cache that holds monitoring snapshots and refreshes them periodically. +/// +/// When `PrometheusMetrics` are attached, the cache also updates Prometheus +/// gauges during each refresh, keeping metric values in lockstep with the +/// snapshot data. This means the `/metrics` handler never needs to compute +/// values — it only gathers and encodes. pub struct SnapshotCache { snapshot: RwLock, refresh_interval: Duration, server_source: Option>, sv2_clients_source: Option>, sv1_clients_source: Option>, + metrics: Option, + previous_labels: Mutex, } impl Clone for SnapshotCache { fn clone(&self) -> Self { - // Clone creates a new cache with the same sources and current snapshot + // Clone creates a new cache with the same sources and current snapshot. + // previous_labels is cloned so the new cache can correctly detect stale + // label combinations on its first refresh. let current_snapshot = self.snapshot.read().unwrap().clone(); + let prev = self + .previous_labels + .lock() + .unwrap_or_else(|e| e.into_inner()); Self { snapshot: RwLock::new(current_snapshot), refresh_interval: self.refresh_interval, server_source: self.server_source.clone(), sv2_clients_source: self.sv2_clients_source.clone(), sv1_clients_source: self.sv1_clients_source.clone(), + metrics: self.metrics.clone(), + previous_labels: Mutex::new(PreviousLabelSets { + server_channel_labels: prev.server_channel_labels.clone(), + client_channel_labels: prev.client_channel_labels.clone(), + }), } } } @@ -104,6 +137,8 @@ impl SnapshotCache { server_source, sv2_clients_source, sv1_clients_source: None, + metrics: None, + previous_labels: Mutex::new(PreviousLabelSets::default()), } } @@ -116,6 +151,15 @@ impl SnapshotCache { self } + /// Attach (or replace) Prometheus metrics so they are updated during each `refresh()`. + /// + /// This is called once in `MonitoringServer::new` and may be called again in + /// `with_sv1_monitoring` which re-creates the metrics with SV1 gauges enabled. + pub fn with_metrics(mut self, metrics: PrometheusMetrics) -> Self { + self.metrics = Some(metrics); + self + } + /// Get the current snapshot. /// /// This is a fast read that does NOT acquire any business logic locks. @@ -128,6 +172,10 @@ impl SnapshotCache { /// /// This method DOES acquire the business logic locks (via the trait methods), /// but it's only called periodically by a background task, not on every request. + /// + /// When Prometheus metrics are attached, they are updated atomically alongside + /// the snapshot — eliminating any gap where metrics could be missing or stale + /// relative to the snapshot data. pub fn refresh(&self) { let mut new_snapshot = MonitoringSnapshot { timestamp: Some(Instant::now()), @@ -152,10 +200,203 @@ impl SnapshotCache { new_snapshot.sv1_clients_summary = Some(source.get_sv1_clients_summary()); } + // Update Prometheus gauges from the new snapshot data + if let Some(ref metrics) = self.metrics { + self.update_metrics(metrics, &new_snapshot); + } + // Update the cache *self.snapshot.write().unwrap() = new_snapshot; } + /// Update all Prometheus gauges from the given snapshot, then remove stale + /// label combinations that are no longer present. + fn update_metrics(&self, metrics: &PrometheusMetrics, snapshot: &MonitoringSnapshot) { + let mut current_server_labels: HashSet<[String; 2]> = HashSet::new(); + let mut current_client_labels: HashSet<[String; 3]> = HashSet::new(); + + // Server metrics + if let Some(ref summary) = snapshot.server_summary { + if let Some(ref m) = metrics.sv2_server_channels { + m.with_label_values(&["extended"]) + .set(summary.extended_channels as f64); + m.with_label_values(&["standard"]) + .set(summary.standard_channels as f64); + } + if let Some(ref m) = metrics.sv2_server_hashrate_total { + m.set(summary.total_hashrate as f64); + } + } + + if let Some(ref server) = snapshot.server_info { + for channel in &server.extended_channels { + let channel_id = channel.channel_id.to_string(); + let user = &channel.user_identity; + let labels = [channel_id.clone(), user.clone()]; + + if let Some(ref m) = metrics.sv2_server_shares_accepted_total { + m.with_label_values(&[&channel_id, user]) + .set(channel.shares_acknowledged as f64); + } + if let (Some(ref m), Some(hashrate)) = ( + &metrics.sv2_server_channel_hashrate, + channel.nominal_hashrate, + ) { + m.with_label_values(&[&channel_id, user]) + .set(hashrate as f64); + } + current_server_labels.insert(labels); + } + + for channel in &server.standard_channels { + let channel_id = channel.channel_id.to_string(); + let user = &channel.user_identity; + let labels = [channel_id.clone(), user.clone()]; + + if let Some(ref m) = metrics.sv2_server_shares_accepted_total { + m.with_label_values(&[&channel_id, user]) + .set(channel.shares_acknowledged as f64); + } + if let (Some(ref m), Some(hashrate)) = ( + &metrics.sv2_server_channel_hashrate, + channel.nominal_hashrate, + ) { + m.with_label_values(&[&channel_id, user]) + .set(hashrate as f64); + } + current_server_labels.insert(labels); + } + + if let Some(ref m) = metrics.sv2_server_blocks_found_total { + let total: u64 = server + .extended_channels + .iter() + .map(|c| c.blocks_found as u64) + .chain( + server + .standard_channels + .iter() + .map(|c| c.blocks_found as u64), + ) + .sum(); + m.set(total as f64); + } + } + + // Sv2 clients metrics + if let Some(ref summary) = snapshot.sv2_clients_summary { + if let Some(ref m) = metrics.sv2_clients_total { + m.set(summary.total_clients as f64); + } + if let Some(ref m) = metrics.sv2_client_channels { + m.with_label_values(&["extended"]) + .set(summary.extended_channels as f64); + m.with_label_values(&["standard"]) + .set(summary.standard_channels as f64); + } + if let Some(ref m) = metrics.sv2_client_hashrate_total { + m.set(summary.total_hashrate as f64); + } + + let mut client_blocks_total: u64 = 0; + + for client in snapshot.sv2_clients.as_deref().unwrap_or(&[]) { + let client_id = client.client_id.to_string(); + + for channel in &client.extended_channels { + let channel_id = channel.channel_id.to_string(); + let user = &channel.user_identity; + let labels = [client_id.clone(), channel_id.clone(), user.clone()]; + + if let Some(ref m) = metrics.sv2_client_shares_accepted_total { + m.with_label_values(&[&client_id, &channel_id, user]) + .set(channel.shares_accepted as f64); + } + if let Some(ref m) = metrics.sv2_client_channel_hashrate { + m.with_label_values(&[&client_id, &channel_id, user]) + .set(channel.nominal_hashrate as f64); + } + current_client_labels.insert(labels); + client_blocks_total += channel.blocks_found as u64; + } + + for channel in &client.standard_channels { + let channel_id = channel.channel_id.to_string(); + let user = &channel.user_identity; + let labels = [client_id.clone(), channel_id.clone(), user.clone()]; + + if let Some(ref m) = metrics.sv2_client_shares_accepted_total { + m.with_label_values(&[&client_id, &channel_id, user]) + .set(channel.shares_accepted as f64); + } + if let Some(ref m) = metrics.sv2_client_channel_hashrate { + m.with_label_values(&[&client_id, &channel_id, user]) + .set(channel.nominal_hashrate as f64); + } + current_client_labels.insert(labels); + client_blocks_total += channel.blocks_found as u64; + } + } + + if let Some(ref m) = metrics.sv2_client_blocks_found_total { + m.set(client_blocks_total as f64); + } + } + + // SV1 client metrics + if let Some(ref summary) = snapshot.sv1_clients_summary { + if let Some(ref m) = metrics.sv1_clients_total { + m.set(summary.total_clients as f64); + } + if let Some(ref m) = metrics.sv1_hashrate_total { + m.set(summary.total_hashrate as f64); + } + } + + // Remove stale label combinations that are no longer in the snapshot + let mut prev = self + .previous_labels + .lock() + .unwrap_or_else(|e| e.into_inner()); + + for stale in prev + .server_channel_labels + .difference(¤t_server_labels) + { + let label_refs: Vec<&str> = stale.iter().map(|s| s.as_str()).collect(); + if let Some(ref m) = metrics.sv2_server_shares_accepted_total { + if let Err(e) = m.remove_label_values(&label_refs) { + debug!(labels = ?label_refs, error = %e, "failed to remove stale server shares label"); + } + } + if let Some(ref m) = metrics.sv2_server_channel_hashrate { + if let Err(e) = m.remove_label_values(&label_refs) { + debug!(labels = ?label_refs, error = %e, "failed to remove stale server hashrate label"); + } + } + } + + for stale in prev + .client_channel_labels + .difference(¤t_client_labels) + { + let label_refs: Vec<&str> = stale.iter().map(|s| s.as_str()).collect(); + if let Some(ref m) = metrics.sv2_client_shares_accepted_total { + if let Err(e) = m.remove_label_values(&label_refs) { + debug!(labels = ?label_refs, error = %e, "failed to remove stale client shares label"); + } + } + if let Some(ref m) = metrics.sv2_client_channel_hashrate { + if let Err(e) = m.remove_label_values(&label_refs) { + debug!(labels = ?label_refs, error = %e, "failed to remove stale client hashrate label"); + } + } + } + + prev.server_channel_labels = current_server_labels; + prev.client_channel_labels = current_client_labels; + } + /// Get the refresh interval pub fn refresh_interval(&self) -> Duration { self.refresh_interval From 171a0138fdc410cfe22f02186a6d77f30aea5202 Mon Sep 17 00:00:00 2001 From: Eric Price Date: Tue, 14 Apr 2026 20:38:10 -0400 Subject: [PATCH 2/2] Refactor: Remove OnceLock statics from TranslatorSv2 (#430) Replace process-wide static OnceLock variables (TPROXY_MODE, VARDIFF_ENABLED) with instance fields to fix panics during sequential integration tests. Changes: - Add tproxy_mode and report_hashrate fields to ChannelManager and Sv1Server - Add is_aggregated() and is_non_aggregated() instance methods - Update all call sites to use self.is_aggregated() instead of global functions - Remove static OnceLocks and global helper functions from mod.rs - Add integration test verifying multiple TranslatorSv2 instances can run - Fix test naming mismatch in test_handle_set_target_without_vardiff_non_aggregated This enables multiple TranslatorSv2 instances to be created sequentially without panicking due to OnceLock re-initialization. --- miner-apps/translator/src/lib/mod.rs | 137 +++++++++++------- miner-apps/translator/src/lib/monitoring.rs | 9 +- .../lib/sv1/sv1_server/difficulty_manager.rs | 11 +- .../sv1_server/downstream_message_handler.rs | 4 +- .../src/lib/sv1/sv1_server/sv1_server.rs | 43 ++++-- .../sv2/channel_manager/channel_manager.rs | 35 ++++- .../channel_manager/mining_message_handler.rs | 17 +-- 7 files changed, 162 insertions(+), 94 deletions(-) diff --git a/miner-apps/translator/src/lib/mod.rs b/miner-apps/translator/src/lib/mod.rs index aa65bf50d..b3b23b8f8 100644 --- a/miner-apps/translator/src/lib/mod.rs +++ b/miner-apps/translator/src/lib/mod.rs @@ -16,7 +16,7 @@ use std::{ net::SocketAddr, sync::{ atomic::{AtomicBool, Ordering}, - Arc, OnceLock, + Arc, }, time::Duration, }; @@ -83,13 +83,9 @@ impl TranslatorSv2 { /// protocol translation, job management, and status reporting. pub async fn start(self) { info!("Starting Translator Proxy..."); - // only initialized once - TPROXY_MODE - .set(self.config.aggregate_channels.into()) - .expect("TPROXY_MODE initialized more than once"); - VARDIFF_ENABLED - .set(self.config.downstream_difficulty_config.enable_vardiff) - .expect("VARDIFF_ENABLED initialized more than once"); + + let tproxy_mode: TproxyMode = self.config.aggregate_channels.into(); + let report_hashrate = self.config.downstream_difficulty_config.enable_vardiff; let cancellation_token = self.cancellation_token.clone(); let mut fallback_coordinator = FallbackCoordinator::new(); @@ -130,6 +126,7 @@ impl TranslatorSv2 { channel_manager_to_sv1_server_receiver, sv1_server_to_channel_manager_sender, self.config.clone(), + tproxy_mode, )); info!("Initializing upstream connection..."); @@ -162,6 +159,8 @@ impl TranslatorSv2 { status_sender.clone(), self.config.supported_extensions.clone(), self.config.required_extensions.clone(), + tproxy_mode, + report_hashrate, )); info!("Launching ChannelManager tasks..."); @@ -278,11 +277,24 @@ impl TranslatorSv2 { let (sv1_server_to_channel_manager_sender, sv1_server_to_channel_manager_receiver) = unbounded(); + channel_manager = Arc::new(ChannelManager::new( + channel_manager_to_upstream_sender, + upstream_to_channel_manager_receiver, + channel_manager_to_sv1_server_sender, + sv1_server_to_channel_manager_receiver, + status_sender.clone(), + self.config.supported_extensions.clone(), + self.config.required_extensions.clone(), + tproxy_mode, + report_hashrate, + )); + sv1_server = Arc::new(Sv1Server::new( downstream_addr, channel_manager_to_sv1_server_receiver, sv1_server_to_channel_manager_sender, self.config.clone(), + tproxy_mode, )); if let Err(e) = self.initialize_upstream( @@ -301,16 +313,6 @@ impl TranslatorSv2 { break; } - channel_manager = Arc::new(ChannelManager::new( - channel_manager_to_upstream_sender, - upstream_to_channel_manager_receiver, - channel_manager_to_sv1_server_sender, - sv1_server_to_channel_manager_receiver, - status_sender.clone(), - self.config.supported_extensions.clone(), - self.config.required_extensions.clone(), - )); - info!("Launching ChannelManager tasks..."); ChannelManager::run_channel_manager_tasks( channel_manager.clone(), @@ -576,41 +578,14 @@ impl From for TproxyMode { } } -static TPROXY_MODE: OnceLock = OnceLock::new(); -static VARDIFF_ENABLED: OnceLock = OnceLock::new(); - -#[cfg(not(test))] -pub fn tproxy_mode() -> TproxyMode { - *TPROXY_MODE.get().expect("TPROXY_MODE has to exist") -} - -// We don’t initialize `TPROXY_MODE` in tests, so any test that -// depends on it will panic if the mode is undefined. -// This `cfg` wrapper ensures `tproxy_mode` does not panic in -// an undefined state by providing a default value when needed. -#[cfg(test)] -pub fn tproxy_mode() -> TproxyMode { - *TPROXY_MODE.get_or_init(|| TproxyMode::Aggregated) -} - -#[inline] -pub fn is_aggregated() -> bool { - matches!(tproxy_mode(), TproxyMode::Aggregated) -} - -#[inline] -pub fn is_non_aggregated() -> bool { - matches!(tproxy_mode(), TproxyMode::NonAggregated) -} - -#[cfg(not(test))] -pub fn vardiff_enabled() -> bool { - *VARDIFF_ENABLED.get().expect("VARDIFF_ENABLED has to exist") -} +impl TproxyMode { + pub fn is_aggregated(&self) -> bool { + matches!(self, TproxyMode::Aggregated) + } -#[cfg(test)] -pub fn vardiff_enabled() -> bool { - *VARDIFF_ENABLED.get_or_init(|| true) + pub fn is_non_aggregated(&self) -> bool { + matches!(self, TproxyMode::NonAggregated) + } } impl Drop for TranslatorSv2 { @@ -619,3 +594,61 @@ impl Drop for TranslatorSv2 { self.cancellation_token.cancel(); } } + +#[cfg(test)] +mod tests { + use super::*; + use config::{DownstreamDifficultyConfig, Upstream}; + use std::str::FromStr; + use stratum_apps::key_utils::Secp256k1PublicKey; + + /// Creates a test TranslatorSv2 configuration + fn create_test_translator_config(aggregate_channels: bool) -> TranslatorConfig { + let pubkey_str = "9bDuixKmZqAJnrmP746n8zU1wyAQRrus7th9dxnkPg6RzQvCnan"; + let pubkey = Secp256k1PublicKey::from_str(pubkey_str).unwrap(); + + let upstream = Upstream::new("127.0.0.1".to_string(), 4444, pubkey); + let difficulty_config = DownstreamDifficultyConfig::new(100.0, 5.0, true, 60); + + TranslatorConfig::new( + vec![upstream], + "0.0.0.0".to_string(), // downstream_address + 3333, // downstream_port + difficulty_config, // downstream_difficulty_config + 2, // max_supported_version + 1, // min_supported_version + 4, // downstream_extranonce2_size + "test_user".to_string(), + aggregate_channels, // aggregate_channels - key parameter being tested + vec![], // supported_extensions + vec![], // required_extensions + None, // monitoring_address + None, // monitoring_cache_refresh_secs + ) + } + + /// Verifies that multiple TranslatorSv2 instances can be created sequentially + /// without panicking due to OnceLock re-initialization (issue #430). + #[test] + fn test_multiple_translator_instances_sequential() { + // Create first instance with aggregated mode + let config1 = create_test_translator_config(true); + let translator1 = TranslatorSv2::new(config1); + assert!(translator1.is_alive.load(Ordering::Relaxed)); + + // Create second instance with non-aggregated mode + // This would have panicked before the refactor due to OnceLock re-initialization + let config2 = create_test_translator_config(false); + let translator2 = TranslatorSv2::new(config2); + assert!(translator2.is_alive.load(Ordering::Relaxed)); + + // Create third instance with aggregated mode + let config3 = create_test_translator_config(true); + let translator3 = TranslatorSv2::new(config3); + assert!(translator3.is_alive.load(Ordering::Relaxed)); + + // All instances should have independent tproxy_mode values + // (We can't easily test this without running start(), but the fact that + // we can create multiple instances without panic proves the fix works) + } +} diff --git a/miner-apps/translator/src/lib/monitoring.rs b/miner-apps/translator/src/lib/monitoring.rs index 2a4f1192a..f39ae5d52 100644 --- a/miner-apps/translator/src/lib/monitoring.rs +++ b/miner-apps/translator/src/lib/monitoring.rs @@ -6,18 +6,15 @@ use stratum_apps::monitoring::server::{ServerExtendedChannelInfo, ServerInfo, ServerMonitoring}; -use crate::{ - sv2::channel_manager::ChannelManager, tproxy_mode, utils::AGGREGATED_CHANNEL_ID, - vardiff_enabled, TproxyMode, -}; +use crate::{sv2::channel_manager::ChannelManager, utils::AGGREGATED_CHANNEL_ID, TproxyMode}; impl ServerMonitoring for ChannelManager { fn get_server(&self) -> ServerInfo { let mut extended_channels = Vec::new(); let standard_channels = Vec::new(); // tProxy only uses extended channels - let report_hashrate = vardiff_enabled(); + let report_hashrate = self.report_hashrate; - match tproxy_mode() { + match self.tproxy_mode { TproxyMode::Aggregated => { // In Aggregated mode: one shared channel to the server // stored under AGGREGATED_CHANNEL_ID diff --git a/miner-apps/translator/src/lib/sv1/sv1_server/difficulty_manager.rs b/miner-apps/translator/src/lib/sv1/sv1_server/difficulty_manager.rs index 3b93a3fed..746149625 100644 --- a/miner-apps/translator/src/lib/sv1/sv1_server/difficulty_manager.rs +++ b/miner-apps/translator/src/lib/sv1/sv1_server/difficulty_manager.rs @@ -1,7 +1,6 @@ use std::sync::Arc; -use crate::{is_aggregated, is_non_aggregated, sv1::sv1_server::sv1_server::PendingTargetUpdate}; - +use crate::{sv1::sv1_server::sv1_server::PendingTargetUpdate, sv1::Sv1Server}; use stratum_apps::{ stratum_core::{ bitcoin::Target, @@ -14,8 +13,6 @@ use stratum_apps::{ }; use tracing::{debug, error, info, trace, warn}; -use crate::sv1::Sv1Server; - enum AggregatedSnapshot { Active { total_hashrate: Hashrate, @@ -189,7 +186,7 @@ impl Sv1Server { * new_target, * new_hashrate) */ ) { - if is_aggregated() { + if self.is_aggregated() { // Aggregated mode: Send single UpdateChannel with minimum target and total hashrate of // ALL downstreams self.send_aggregated_update_channel(all_updates).await; @@ -303,7 +300,7 @@ impl Sv1Server { set_target.channel_id, new_upstream_target ); - if is_aggregated() { + if self.is_aggregated() { return self .handle_aggregated_set_target(new_upstream_target, set_target.channel_id) .await; @@ -458,7 +455,7 @@ impl Sv1Server { /// (e.g., disconnect). Calculates total hashrate and minimum target among all remaining /// downstreams. pub async fn send_update_channel_on_downstream_state_change(&self) { - if is_non_aggregated() { + if self.is_non_aggregated() { return; } diff --git a/miner-apps/translator/src/lib/sv1/sv1_server/downstream_message_handler.rs b/miner-apps/translator/src/lib/sv1/sv1_server/downstream_message_handler.rs index b7345ebd1..fd207180f 100644 --- a/miner-apps/translator/src/lib/sv1/sv1_server/downstream_message_handler.rs +++ b/miner-apps/translator/src/lib/sv1/sv1_server/downstream_message_handler.rs @@ -7,7 +7,7 @@ use stratum_apps::stratum_core::sv1_api::{ use tracing::{debug, info, warn}; use crate::{ - error, is_aggregated, + error, sv1::{downstream::SubmitShareWithChannelId, sv1_server::tlv_compatible_username, Sv1Server}, utils::{validate_sv1_share, AGGREGATED_CHANNEL_ID}, }; @@ -110,7 +110,7 @@ impl IsServer<'static> for Sv1Server { return false; }; - let channel_id = if is_aggregated() { + let channel_id = if self.is_aggregated() { AGGREGATED_CHANNEL_ID } else { channel_id diff --git a/miner-apps/translator/src/lib/sv1/sv1_server/sv1_server.rs b/miner-apps/translator/src/lib/sv1/sv1_server/sv1_server.rs index 7d6ed9fa1..c9a5f18e4 100644 --- a/miner-apps/translator/src/lib/sv1/sv1_server/sv1_server.rs +++ b/miner-apps/translator/src/lib/sv1/sv1_server/sv1_server.rs @@ -1,7 +1,6 @@ use crate::{ config::TranslatorConfig, error::{self, TproxyError, TproxyErrorKind, TproxyResult}, - is_aggregated, is_non_aggregated, status::{handle_error, Status, StatusSender}, sv1::{ downstream::{downstream::Downstream, SubmitShareWithChannelId}, @@ -10,6 +9,7 @@ use crate::{ }, }, utils::AGGREGATED_CHANNEL_ID, + TproxyMode, }; use async_channel::{Receiver, Sender}; use dashmap::DashMap; @@ -85,6 +85,8 @@ pub struct Sv1Server { /// Valid Sv1 jobs storage, containing only a single shared entry (AGGREGATED_CHANNEL_ID) in /// case of channels aggregation (aggregated mode) pub(crate) valid_sv1_jobs: Arc>>>, + /// Operating mode for the translator (Aggregated or NonAggregated) + pub(crate) tproxy_mode: TproxyMode, } #[cfg_attr(not(test), hotpath::measure_all)] @@ -163,6 +165,7 @@ impl Sv1Server { /// * `channel_manager_receiver` - Channel to receive messages from the channel manager /// * `channel_manager_sender` - Channel to send messages to the channel manager /// * `config` - Configuration settings for the translator + /// * `tproxy_mode` - Operating mode for the translator /// /// # Returns /// A new Sv1Server instance ready to accept connections @@ -171,6 +174,7 @@ impl Sv1Server { channel_manager_receiver: Receiver<(Mining<'static>, Option>)>, channel_manager_sender: Sender<(Mining<'static>, Option>)>, config: TranslatorConfig, + tproxy_mode: TproxyMode, ) -> Self { let shares_per_minute = config.downstream_difficulty_config.shares_per_minute; let sv1_server_channel_state = @@ -192,9 +196,20 @@ impl Sv1Server { prevhashes: Arc::new(DashMap::new()), pending_target_updates: Arc::new(Mutex::new(Vec::new())), valid_sv1_jobs: Arc::new(DashMap::new()), + tproxy_mode, } } + #[inline] + pub fn is_aggregated(&self) -> bool { + self.tproxy_mode.is_aggregated() + } + + #[inline] + pub fn is_non_aggregated(&self) -> bool { + self.tproxy_mode.is_non_aggregated() + } + /// Starts the SV1 server and begins accepting connections. /// /// This method: @@ -510,7 +525,7 @@ impl Sv1Server { .map_err(|_| TproxyError::shutdown(TproxyErrorKind::SV1Error))?; // Only add TLV fields with user identity in non-aggregated mode - let tlv_fields = if is_non_aggregated() { + let tlv_fields = if self.is_non_aggregated() { let Some(downstream) = self .downstreams .get(&message.downstream_id) @@ -749,7 +764,7 @@ impl Sv1Server { // Update job storage based on the configured mode let notify_parsed = notify.clone(); - let job_channel_id = if is_non_aggregated() { + let job_channel_id = if self.is_non_aggregated() { m.channel_id } else { AGGREGATED_CHANNEL_ID @@ -980,7 +995,7 @@ impl Sv1Server { } }; - if is_aggregated() { + if self.is_aggregated() { // Aggregated mode: send set_difficulty to ALL downstreams and update hashrate return self .send_set_difficulty_to_all_downstreams(new_target, derived_hashrate) @@ -1230,7 +1245,7 @@ impl Sv1Server { keepalive_notify.time = HexU32Be(new_time); // Add the keepalive job to valid jobs so shares can be validated - let job_channel_id = if is_aggregated() { + let job_channel_id = if self.is_aggregated() { Some(AGGREGATED_CHANNEL_ID) } else { channel_id @@ -1303,7 +1318,7 @@ impl Sv1Server { &self, channel_id: Option, ) -> Option> { - let channel_id = if is_aggregated() { + let channel_id = if self.is_aggregated() { AGGREGATED_CHANNEL_ID } else { channel_id? @@ -1321,7 +1336,7 @@ impl Sv1Server { job_id: &str, channel_id: Option, ) -> Option> { - let channel_id = if is_aggregated() { + let channel_id = if self.is_aggregated() { AGGREGATED_CHANNEL_ID } else { channel_id? @@ -1380,7 +1395,7 @@ mod tests { let config = create_test_config(); let addr = "127.0.0.1:3333".parse().unwrap(); - Sv1Server::new(addr, cm_receiver, cm_sender, config) + Sv1Server::new(addr, cm_receiver, cm_sender, config, TproxyMode::Aggregated) } #[test] @@ -1402,7 +1417,7 @@ mod tests { let (_downstream_sender, cm_receiver) = unbounded(); let addr = "127.0.0.1:3333".parse().unwrap(); - let server = Sv1Server::new(addr, cm_receiver, cm_sender, config); + let server = Sv1Server::new(addr, cm_receiver, cm_sender, config, TproxyMode::Aggregated); assert!(server.config.downstream_difficulty_config.enable_vardiff); } @@ -1443,7 +1458,7 @@ mod tests { let (_downstream_sender, cm_receiver) = unbounded(); let addr = "127.0.0.1:3333".parse().unwrap(); - let server = Sv1Server::new(addr, cm_receiver, cm_sender, config); + let server = Sv1Server::new(addr, cm_receiver, cm_sender, config, TproxyMode::Aggregated); let target: Target = hash_rate_to_target(200.0, 5.0).unwrap(); let set_target = SetTarget { @@ -1464,7 +1479,13 @@ mod tests { let (_downstream_sender, cm_receiver) = unbounded(); let addr = "127.0.0.1:3333".parse().unwrap(); - let server = Sv1Server::new(addr, cm_receiver, cm_sender, config); + let server = Sv1Server::new( + addr, + cm_receiver, + cm_sender, + config, + TproxyMode::NonAggregated, + ); let target: Target = hash_rate_to_target(200.0, 5.0).unwrap(); let set_target = SetTarget { diff --git a/miner-apps/translator/src/lib/sv2/channel_manager/channel_manager.rs b/miner-apps/translator/src/lib/sv2/channel_manager/channel_manager.rs index 6d477c71e..7e396e300 100644 --- a/miner-apps/translator/src/lib/sv2/channel_manager/channel_manager.rs +++ b/miner-apps/translator/src/lib/sv2/channel_manager/channel_manager.rs @@ -1,9 +1,9 @@ use crate::{ error::{self, TproxyError, TproxyErrorKind, TproxyResult}, - is_aggregated, status::{handle_error, Status, StatusSender}, sv2::channel_manager::channel::ChannelState, utils::{AggregatedState, AtomicAggregatedState, AGGREGATED_CHANNEL_ID}, + TproxyMode, }; use async_channel::{Receiver, Sender}; use dashmap::DashMap; @@ -92,6 +92,10 @@ pub struct ChannelManager { /// Tracks whether the single upstream channel in aggregated mode is absent, /// being established, or connected. pub aggregated_channel_state: AtomicAggregatedState, + /// Operating mode for the translator (Aggregated or NonAggregated) + pub tproxy_mode: TproxyMode, + /// Whether to report hashrate in monitoring (true when vardiff is enabled) + pub report_hashrate: bool, } #[cfg_attr(not(test), hotpath::measure_all)] @@ -103,11 +107,12 @@ impl ChannelManager { /// * `upstream_receiver` - Channel to receive messages from upstream /// * `sv1_server_sender` - Channel to send messages to SV1 server /// * `sv1_server_receiver` - Channel to receive messages from SV1 server - /// * `mode` - Operating mode (Aggregated or NonAggregated) /// * `supported_extensions` - Extensions that the translator supports (will request if required /// by server) /// * `required_extensions` - Extensions that the translator requires (must be supported by /// server) + /// * `tproxy_mode` - Operating mode for the translator + /// * `report_hashrate` - Whether to report hashrate in monitoring /// /// # Returns /// A new ChannelManager instance ready to handle message routing @@ -120,6 +125,8 @@ impl ChannelManager { status_sender: Sender, supported_extensions: Vec, required_extensions: Vec, + tproxy_mode: TproxyMode, + report_hashrate: bool, ) -> Self { let channel_state = ChannelState::new( upstream_sender, @@ -140,9 +147,21 @@ impl ChannelManager { negotiated_extensions: Arc::new(Mutex::new(Vec::new())), extranonce_factories: Arc::new(DashMap::new()), aggregated_channel_state: AtomicAggregatedState::new(AggregatedState::NoChannel), + tproxy_mode, + report_hashrate, } } + #[inline] + pub fn is_aggregated(&self) -> bool { + self.tproxy_mode.is_aggregated() + } + + #[inline] + pub fn is_non_aggregated(&self) -> bool { + self.tproxy_mode.is_non_aggregated() + } + /// Spawns and runs the main channel manager task loop. /// /// This method creates an async task that handles all message routing for the @@ -309,7 +328,7 @@ impl ChannelManager { let hashrate = m.nominal_hash_rate; let min_extranonce_size = m.min_extranonce_size as usize; - if is_aggregated() { + if self.is_aggregated() { match self.aggregated_channel_state.get() { AggregatedState::Connected => { return self @@ -351,7 +370,7 @@ impl ChannelManager { } } // In aggregated mode, add extra bytes for translator search space allocation - let upstream_min_extranonce_size = if is_aggregated() { + let upstream_min_extranonce_size = if self.is_aggregated() { min_extranonce_size + AGGREGATED_MODE_TRANSLATOR_SEARCH_SPACE_BYTES } else { min_extranonce_size @@ -364,7 +383,7 @@ impl ChannelManager { // used in the `OpenExtendedMiningChannel.Success` handler. // In aggregated mode it was already inserted in the `AggregatedState::NoChannel` // match arm above. - if !is_aggregated() { + if !self.is_aggregated() { self.pending_downstream_channels.insert( open_channel_msg.request_id as DownstreamId, (user_identity, hashrate, min_extranonce_size), @@ -400,7 +419,7 @@ impl ChannelManager { ) }); if let Some((Ok(_result), _share_accounting)) = value { - if is_aggregated() + if self.is_aggregated() && self.extended_channels.contains_key(&AGGREGATED_CHANNEL_ID) { let upstream_extended_channel_id = self @@ -552,7 +571,7 @@ impl ChannelManager { Mining::UpdateChannel(mut m) => { debug!("Received UpdateChannel from SV1Server: {}", m); - if is_aggregated() { + if self.is_aggregated() { // Update the aggregated channel's nominal hashrate so // that monitoring reports a value consistent with the // downstream vardiff estimate. @@ -810,6 +829,8 @@ mod tests { status_sender, vec![], vec![], + TproxyMode::Aggregated, + true, ) } diff --git a/miner-apps/translator/src/lib/sv2/channel_manager/mining_message_handler.rs b/miner-apps/translator/src/lib/sv2/channel_manager/mining_message_handler.rs index 1f9b72dd9..648770f28 100644 --- a/miner-apps/translator/src/lib/sv2/channel_manager/mining_message_handler.rs +++ b/miner-apps/translator/src/lib/sv2/channel_manager/mining_message_handler.rs @@ -1,6 +1,5 @@ use crate::{ error::{self, TproxyError, TproxyErrorKind}, - is_aggregated, sv2::ChannelManager, utils::{proxy_extranonce_prefix_len, AggregatedState, AGGREGATED_CHANNEL_ID}, }; @@ -126,7 +125,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { // If we are in aggregated mode, we need to create a new extranonce prefix and // insert the extended channel into the map - if is_aggregated() { + if self.is_aggregated() { // Store the upstream extended channel under AGGREGATED_CHANNEL_ID self.extended_channels .insert(AGGREGATED_CHANNEL_ID, extended_channel.clone()); @@ -274,7 +273,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { // In aggregated mode, serve any downstream requests that were buffered in // pending_channels while the upstream channel was being established (Pending state). - if is_aggregated() { + if self.is_aggregated() { let pending_requests: Vec<(u32, String, Hashrate, usize)> = self .pending_downstream_channels .iter() @@ -333,7 +332,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { ) -> Result<(), Self::Error> { info!("Received: {}", m); // are we working in aggregated mode? - if is_aggregated() { + if self.is_aggregated() { // even if aggregated channel_id != m.channel_id, we should trigger fallback // because why would a sane server send a CloseChannel message to a different // channel? @@ -395,7 +394,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { // In aggregated mode, the Pool responds with the upstream channel ID, but the // channel is stored under AGGREGATED_CHANNEL_ID in the DashMap. // In non-aggregated mode, m.channel_id matches the DashMap key directly. - let key = if is_aggregated() { + let key = if self.is_aggregated() { AGGREGATED_CHANNEL_ID } else { m.channel_id @@ -417,7 +416,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { ) -> Result<(), Self::Error> { warn!("Received: {} ❌", m); - let key = if is_aggregated() { + let key = if self.is_aggregated() { AGGREGATED_CHANNEL_ID } else { m.channel_id @@ -457,7 +456,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { let mut new_extended_mining_job_messages = Vec::new(); // are we in aggregated mode? - if is_aggregated() { + if self.is_aggregated() { // Validate that the message is for the aggregated channel or its group let aggregated_channel_id = self .extended_channels @@ -599,7 +598,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { let mut set_new_prev_hash_messages = Vec::new(); let mut new_extended_mining_job_messages = Vec::new(); - if is_aggregated() { + if self.is_aggregated() { // Validate that the message is for the aggregated channel or its group let aggregated_channel_id = self .extended_channels @@ -810,7 +809,7 @@ impl HandleMiningMessagesFromServerAsync for ChannelManager { let mut set_target_messages = Vec::new(); // are in aggregated mode? - if is_aggregated() { + if self.is_aggregated() { let aggregated_channel_id = self .extended_channels .get(&AGGREGATED_CHANNEL_ID)