Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 21 additions & 38 deletions integration-tests/lib/prometheus_metrics_assertions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,22 @@ pub(crate) fn parse_metric_value(metrics_text: &str, metric_name: &str) -> Optio
if line.starts_with('#') {
continue;
}
// For labeled metrics, match the prefix up to the closing brace
if let Some(rest) = line.strip_prefix(metric_name) {
// The value follows a space after the metric name (or after the closing brace)
let value_str = rest.trim();
// If there are labels and the name didn't include them, skip
if value_str.starts_with('{') {
let rest = rest.trim();
if rest.is_empty() {
continue;
}
return value_str.parse::<f64>().ok();
// Bare metric (no labels): value follows directly after the name
if rest.starts_with(|c: char| c.is_ascii_digit() || c == '-') {
return rest.parse::<f64>().ok();
}
// Labeled metric: skip past the closing brace to get the value
if rest.starts_with('{') {
if let Some(brace_end) = rest.find('}') {
let value_str = rest[brace_end + 1..].trim();
return value_str.parse::<f64>().ok();
}
}
}
}
None
Expand Down Expand Up @@ -135,15 +142,11 @@ pub fn assert_metric_present(metrics_text: &str, metric_name: &str) {
);
}

/// Poll the `/metrics` endpoint until any line matching `metric_name` (with any labels) has a
/// value >= `min`, then return the full metrics text. Panics if the condition is not met within
/// `timeout`.
/// Poll `/metrics` until `metric_name` is present with a value >= `min`, or panic after
/// `timeout`. Polls every 100ms to react quickly while tolerating cache refresh jitter.
///
/// Use this instead of a fixed `sleep` for `GaugeVec` metrics (per-channel shares, blocks found)
/// that only appear in Prometheus output after the monitoring snapshot cache has refreshed with
/// observed label combinations. The handler calls `.reset()` on every `/metrics` request before
/// repopulating from the cached snapshot, so a label combination is only present when the
/// snapshot contains a non-default value for it.
/// Returns the full metrics text from the successful scrape so callers can make additional
/// assertions without a second fetch.
pub async fn poll_until_metric_gte(
monitoring_addr: SocketAddr,
metric_name: &str,
Expand All @@ -153,38 +156,18 @@ pub async fn poll_until_metric_gte(
let deadline = tokio::time::Instant::now() + timeout;
loop {
let metrics = fetch_metrics(monitoring_addr).await;
let satisfied = metrics.lines().any(|line| {
if line.starts_with('#') {
return false;
}
if let Some(rest) = line.strip_prefix(metric_name) {
// Match bare name followed by space, or labeled name followed by '{'
let value_str = if rest.starts_with(' ') {
rest.trim()
} else if rest.starts_with('{') {
// Skip past the closing brace to get the value
rest.find('}')
.and_then(|i| rest.get(i + 1..))
.map(|s| s.trim())
.unwrap_or("")
} else {
return false;
};
value_str.parse::<f64>().map(|v| v >= min).unwrap_or(false)
} else {
false
if let Some(v) = parse_metric_value(&metrics, metric_name) {
if v >= min {
return metrics;
}
});
if satisfied {
return metrics;
}
if tokio::time::Instant::now() >= deadline {
panic!(
"Metric '{}' never reached >= {} within {:?}. Last /metrics response:\n{}",
metric_name, min, timeout, metrics
);
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}

Expand Down
27 changes: 14 additions & 13 deletions integration-tests/tests/monitoring_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ use integration_tests_sv2::{
};
use stratum_apps::stratum_core::mining_sv2::*;

/// Timeout for polling metric assertions. Generous enough to absorb slow CI
/// runners and the monitoring snapshot cache refresh delay before a metric
/// becomes visible on `/metrics`.
const METRIC_POLL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);

// ---------------------------------------------------------------------------
// 1. Pool + SV2 Mining Device (standard channel) Pool role exposes: client metrics (connections,
// channels, shares, hashrate) Pool has NO upstream, so server metrics should be absent.
Expand Down Expand Up @@ -41,12 +44,12 @@ async fn pool_monitoring_with_sv2_mining_device() {
// Health API
assert_api_health(pool_mon).await;

// Poll until per-channel share metric is populated in the monitoring cache
// Poll until the monitoring cache has refreshed with the new share data
let pool_metrics = poll_until_metric_gte(
pool_mon,
"sv2_client_shares_accepted_total",
1.0,
std::time::Duration::from_secs(10),
METRIC_POLL_TIMEOUT,
)
.await;
assert_uptime(&pool_metrics);
Expand Down Expand Up @@ -88,11 +91,13 @@ async fn pool_and_tproxy_monitoring_with_sv1_miner() {
// -- Pool metrics --
let pool_mon = pool_monitoring.expect("pool monitoring should be enabled");
assert_api_health(pool_mon).await;

// Poll until the monitoring cache has refreshed with the new share data
let pool_metrics = poll_until_metric_gte(
pool_mon,
"sv2_client_shares_accepted_total",
1.0,
std::time::Duration::from_secs(10),
METRIC_POLL_TIMEOUT,
)
.await;
assert_uptime(&pool_metrics);
Expand All @@ -103,13 +108,7 @@ async fn pool_and_tproxy_monitoring_with_sv1_miner() {
// -- tProxy metrics --
let tproxy_mon = tproxy_monitoring.expect("tproxy monitoring should be enabled");
assert_api_health(tproxy_mon).await;
let tproxy_metrics = poll_until_metric_gte(
tproxy_mon,
"sv2_server_shares_accepted_total",
1.0,
std::time::Duration::from_secs(10),
)
.await;
let tproxy_metrics = fetch_metrics(tproxy_mon).await;
assert_uptime(&tproxy_metrics);
// tProxy has 1 upstream extended channel
assert_metric_eq(
Expand Down Expand Up @@ -171,11 +170,13 @@ async fn jd_aggregated_topology_monitoring() {
// -- Pool metrics: sees 1 SV2 client (JDC), shares accepted --
let pool_mon = pool_monitoring.expect("pool monitoring should be enabled");
assert_api_health(pool_mon).await;

// Poll until the monitoring cache has refreshed with the new share data
let pool_metrics = poll_until_metric_gte(
pool_mon,
"sv2_client_shares_accepted_total",
1.0,
std::time::Duration::from_secs(10),
METRIC_POLL_TIMEOUT,
)
.await;
assert_uptime(&pool_metrics);
Expand Down Expand Up @@ -231,13 +232,13 @@ async fn block_found_detected_in_pool_metrics() {
.wait_for_message_type(MessageDirection::ToUpstream, MESSAGE_TYPE_SUBMIT_SOLUTION)
.await;

// Poll until block found metric appears in monitoring cache
// Poll until the monitoring cache has refreshed with the block found data
let pool_mon = pool_monitoring.expect("pool monitoring should be enabled");
let pool_metrics = poll_until_metric_gte(
pool_mon,
"sv2_client_blocks_found_total",
1.0,
std::time::Duration::from_secs(10),
METRIC_POLL_TIMEOUT,
)
.await;
assert_uptime(&pool_metrics);
Expand Down
137 changes: 85 additions & 52 deletions miner-apps/translator/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::{
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc, OnceLock,
Arc,
},
time::Duration,
};
Expand Down Expand Up @@ -83,13 +83,9 @@ impl TranslatorSv2 {
/// protocol translation, job management, and status reporting.
pub async fn start(self) {
info!("Starting Translator Proxy...");
// only initialized once
TPROXY_MODE
.set(self.config.aggregate_channels.into())
.expect("TPROXY_MODE initialized more than once");
VARDIFF_ENABLED
.set(self.config.downstream_difficulty_config.enable_vardiff)
.expect("VARDIFF_ENABLED initialized more than once");

let tproxy_mode: TproxyMode = self.config.aggregate_channels.into();
let report_hashrate = self.config.downstream_difficulty_config.enable_vardiff;

let cancellation_token = self.cancellation_token.clone();
let mut fallback_coordinator = FallbackCoordinator::new();
Expand Down Expand Up @@ -130,6 +126,7 @@ impl TranslatorSv2 {
channel_manager_to_sv1_server_receiver,
sv1_server_to_channel_manager_sender,
self.config.clone(),
tproxy_mode,
));

info!("Initializing upstream connection...");
Expand Down Expand Up @@ -162,6 +159,8 @@ impl TranslatorSv2 {
status_sender.clone(),
self.config.supported_extensions.clone(),
self.config.required_extensions.clone(),
tproxy_mode,
report_hashrate,
));

info!("Launching ChannelManager tasks...");
Expand Down Expand Up @@ -278,11 +277,24 @@ impl TranslatorSv2 {
let (sv1_server_to_channel_manager_sender, sv1_server_to_channel_manager_receiver) =
unbounded();

channel_manager = Arc::new(ChannelManager::new(
channel_manager_to_upstream_sender,
upstream_to_channel_manager_receiver,
channel_manager_to_sv1_server_sender,
sv1_server_to_channel_manager_receiver,
status_sender.clone(),
self.config.supported_extensions.clone(),
self.config.required_extensions.clone(),
tproxy_mode,
report_hashrate,
));

sv1_server = Arc::new(Sv1Server::new(
downstream_addr,
channel_manager_to_sv1_server_receiver,
sv1_server_to_channel_manager_sender,
self.config.clone(),
tproxy_mode,
));

if let Err(e) = self.initialize_upstream(
Expand All @@ -301,16 +313,6 @@ impl TranslatorSv2 {
break;
}

channel_manager = Arc::new(ChannelManager::new(
channel_manager_to_upstream_sender,
upstream_to_channel_manager_receiver,
channel_manager_to_sv1_server_sender,
sv1_server_to_channel_manager_receiver,
status_sender.clone(),
self.config.supported_extensions.clone(),
self.config.required_extensions.clone(),
));

info!("Launching ChannelManager tasks...");
ChannelManager::run_channel_manager_tasks(
channel_manager.clone(),
Expand Down Expand Up @@ -576,41 +578,14 @@ impl From<bool> for TproxyMode {
}
}

static TPROXY_MODE: OnceLock<TproxyMode> = OnceLock::new();
static VARDIFF_ENABLED: OnceLock<bool> = OnceLock::new();

#[cfg(not(test))]
pub fn tproxy_mode() -> TproxyMode {
*TPROXY_MODE.get().expect("TPROXY_MODE has to exist")
}

// We don’t initialize `TPROXY_MODE` in tests, so any test that
// depends on it will panic if the mode is undefined.
// This `cfg` wrapper ensures `tproxy_mode` does not panic in
// an undefined state by providing a default value when needed.
#[cfg(test)]
pub fn tproxy_mode() -> TproxyMode {
*TPROXY_MODE.get_or_init(|| TproxyMode::Aggregated)
}

#[inline]
pub fn is_aggregated() -> bool {
matches!(tproxy_mode(), TproxyMode::Aggregated)
}

#[inline]
pub fn is_non_aggregated() -> bool {
matches!(tproxy_mode(), TproxyMode::NonAggregated)
}

#[cfg(not(test))]
pub fn vardiff_enabled() -> bool {
*VARDIFF_ENABLED.get().expect("VARDIFF_ENABLED has to exist")
}
impl TproxyMode {
pub fn is_aggregated(&self) -> bool {
matches!(self, TproxyMode::Aggregated)
}

#[cfg(test)]
pub fn vardiff_enabled() -> bool {
*VARDIFF_ENABLED.get_or_init(|| true)
pub fn is_non_aggregated(&self) -> bool {
matches!(self, TproxyMode::NonAggregated)
}
}

impl Drop for TranslatorSv2 {
Expand All @@ -619,3 +594,61 @@ impl Drop for TranslatorSv2 {
self.cancellation_token.cancel();
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use config::{DownstreamDifficultyConfig, Upstream};
    use std::str::FromStr;
    use stratum_apps::key_utils::Secp256k1PublicKey;

    /// Builds a minimal translator configuration for constructor-level tests.
    /// Only `aggregate_channels` varies between calls; everything else is a
    /// fixed local-network placeholder.
    fn build_config(aggregate_channels: bool) -> TranslatorConfig {
        let authority_key =
            Secp256k1PublicKey::from_str("9bDuixKmZqAJnrmP746n8zU1wyAQRrus7th9dxnkPg6RzQvCnan")
                .unwrap();
        let upstreams = vec![Upstream::new("127.0.0.1".to_string(), 4444, authority_key)];
        // (min hashrate, shares/min target, vardiff enabled, update interval secs)
        let difficulty = DownstreamDifficultyConfig::new(100.0, 5.0, true, 60);

        TranslatorConfig::new(
            upstreams,
            "0.0.0.0".to_string(), // downstream_address
            3333,                  // downstream_port
            difficulty,            // downstream_difficulty_config
            2,                     // max_supported_version
            1,                     // min_supported_version
            4,                     // downstream_extranonce2_size
            "test_user".to_string(),
            aggregate_channels, // the parameter under test
            vec![],             // supported_extensions
            vec![],             // required_extensions
            None,               // monitoring_address
            None,               // monitoring_cache_refresh_secs
        )
    }

    /// Regression test (issue #430): constructing several `TranslatorSv2`
    /// instances in the same process must not panic. Before the refactor,
    /// mode/vardiff state lived in process-wide `OnceLock`s, so the second
    /// initialization panicked. Alternating modes exercises both values.
    ///
    /// Note: this only proves construction is panic-free; per-instance mode
    /// isolation would require running `start()`, which needs a live upstream.
    #[test]
    fn test_multiple_translator_instances_sequential() {
        for aggregate in [true, false, true] {
            let translator = TranslatorSv2::new(build_config(aggregate));
            assert!(translator.is_alive.load(Ordering::Relaxed));
        }
    }
}
9 changes: 3 additions & 6 deletions miner-apps/translator/src/lib/monitoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,15 @@

use stratum_apps::monitoring::server::{ServerExtendedChannelInfo, ServerInfo, ServerMonitoring};

use crate::{
sv2::channel_manager::ChannelManager, tproxy_mode, utils::AGGREGATED_CHANNEL_ID,
vardiff_enabled, TproxyMode,
};
use crate::{sv2::channel_manager::ChannelManager, utils::AGGREGATED_CHANNEL_ID, TproxyMode};

impl ServerMonitoring for ChannelManager {
fn get_server(&self) -> ServerInfo {
let mut extended_channels = Vec::new();
let standard_channels = Vec::new(); // tProxy only uses extended channels
let report_hashrate = vardiff_enabled();
let report_hashrate = self.report_hashrate;

match tproxy_mode() {
match self.tproxy_mode {
TproxyMode::Aggregated => {
// In Aggregated mode: one shared channel to the server
// stored under AGGREGATED_CHANNEL_ID
Expand Down
Loading
Loading