Skip to content

Commit 985afe4

Browse files
Eric Price authored and gimballock committed
fix(monitoring): move metric updates from /metrics handler into SnapshotCache::refresh (#337)
Move all Prometheus gauge updates (set + stale-label removal) out of the /metrics HTTP handler and into SnapshotCache::refresh(), which runs as a periodic background task. This eliminates the GaugeVec reset gap where label series momentarily disappeared on every scrape. Changes: - SnapshotCache now owns PrometheusMetrics and PreviousLabelSets - refresh() updates snapshot data AND Prometheus gauges atomically - /metrics handler reduced to: set uptime gauge, gather, encode - ServerState simplified (no more PreviousLabelSets or Mutex) - Tests updated to wire metrics through cache via with_metrics() - Integration tests: replace fixed-sleep assertions with poll_until_metric_gte (100ms poll, 5s deadline) for CI resilience - Clone impl preserves previous_labels for correct stale-label detection - debug-level tracing on stale label removal errors - debug_assert on with_metrics double-attachment Closes #337
1 parent 698e419 commit 985afe4

File tree

4 files changed

+404
-234
lines changed

4 files changed

+404
-234
lines changed

integration-tests/lib/prometheus_metrics_assertions.rs

Lines changed: 21 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,22 @@ pub(crate) fn parse_metric_value(metrics_text: &str, metric_name: &str) -> Optio
3838
if line.starts_with('#') {
3939
continue;
4040
}
41-
// For labeled metrics, match the prefix up to the closing brace
4241
if let Some(rest) = line.strip_prefix(metric_name) {
43-
// The value follows a space after the metric name (or after the closing brace)
44-
let value_str = rest.trim();
45-
// If there are labels and the name didn't include them, skip
46-
if value_str.starts_with('{') {
42+
let rest = rest.trim();
43+
if rest.is_empty() {
4744
continue;
4845
}
49-
return value_str.parse::<f64>().ok();
46+
// Bare metric (no labels): value follows directly after the name
47+
if rest.starts_with(|c: char| c.is_ascii_digit() || c == '-') {
48+
return rest.parse::<f64>().ok();
49+
}
50+
// Labeled metric: skip past the closing brace to get the value
51+
if rest.starts_with('{') {
52+
if let Some(brace_end) = rest.find('}') {
53+
let value_str = rest[brace_end + 1..].trim();
54+
return value_str.parse::<f64>().ok();
55+
}
56+
}
5057
}
5158
}
5259
None
@@ -135,15 +142,11 @@ pub fn assert_metric_present(metrics_text: &str, metric_name: &str) {
135142
);
136143
}
137144

138-
/// Poll the `/metrics` endpoint until any line matching `metric_name` (with any labels) has a
139-
/// value >= `min`, then return the full metrics text. Panics if the condition is not met within
140-
/// `timeout`.
145+
/// Poll `/metrics` until `metric_name` is present with a value >= `min`, or panic after
146+
/// `timeout`. Polls every 100ms to react quickly while tolerating cache refresh jitter.
141147
///
142-
/// Use this instead of a fixed `sleep` for `GaugeVec` metrics (per-channel shares, blocks found)
143-
/// that only appear in Prometheus output after the monitoring snapshot cache has refreshed with
144-
/// observed label combinations. The handler calls `.reset()` on every `/metrics` request before
145-
/// repopulating from the cached snapshot, so a label combination is only present when the
146-
/// snapshot contains a non-default value for it.
148+
/// Returns the full metrics text from the successful scrape so callers can make additional
149+
/// assertions without a second fetch.
147150
pub async fn poll_until_metric_gte(
148151
monitoring_addr: SocketAddr,
149152
metric_name: &str,
@@ -153,38 +156,18 @@ pub async fn poll_until_metric_gte(
153156
let deadline = tokio::time::Instant::now() + timeout;
154157
loop {
155158
let metrics = fetch_metrics(monitoring_addr).await;
156-
let satisfied = metrics.lines().any(|line| {
157-
if line.starts_with('#') {
158-
return false;
159-
}
160-
if let Some(rest) = line.strip_prefix(metric_name) {
161-
// Match bare name followed by space, or labeled name followed by '{'
162-
let value_str = if rest.starts_with(' ') {
163-
rest.trim()
164-
} else if rest.starts_with('{') {
165-
// Skip past the closing brace to get the value
166-
rest.find('}')
167-
.and_then(|i| rest.get(i + 1..))
168-
.map(|s| s.trim())
169-
.unwrap_or("")
170-
} else {
171-
return false;
172-
};
173-
value_str.parse::<f64>().map(|v| v >= min).unwrap_or(false)
174-
} else {
175-
false
159+
if let Some(v) = parse_metric_value(&metrics, metric_name) {
160+
if v >= min {
161+
return metrics;
176162
}
177-
});
178-
if satisfied {
179-
return metrics;
180163
}
181164
if tokio::time::Instant::now() >= deadline {
182165
panic!(
183166
"Metric '{}' never reached >= {} within {:?}. Last /metrics response:\n{}",
184167
metric_name, min, timeout, metrics
185168
);
186169
}
187-
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
170+
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
188171
}
189172
}
190173

integration-tests/tests/monitoring_integration.rs

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ use integration_tests_sv2::{
99
};
1010
use stratum_apps::stratum_core::mining_sv2::*;
1111

12+
/// Timeout for polling metric assertions. Generous enough for slow CI.
13+
const METRIC_POLL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5);
14+
1215
// ---------------------------------------------------------------------------
1316
// 1. Pool + SV2 Mining Device (standard channel) Pool role exposes: client metrics (connections,
1417
// channels, shares, hashrate) Pool has NO upstream, so server metrics should be absent.
@@ -41,12 +44,12 @@ async fn pool_monitoring_with_sv2_mining_device() {
4144
// Health API
4245
assert_api_health(pool_mon).await;
4346

44-
// Poll until per-channel share metric is populated in the monitoring cache
47+
// Poll until the monitoring cache has refreshed with the new share data
4548
let pool_metrics = poll_until_metric_gte(
4649
pool_mon,
4750
"sv2_client_shares_accepted_total",
4851
1.0,
49-
std::time::Duration::from_secs(10),
52+
METRIC_POLL_TIMEOUT,
5053
)
5154
.await;
5255
assert_uptime(&pool_metrics);
@@ -86,11 +89,13 @@ async fn pool_and_tproxy_monitoring_with_sv1_miner() {
8689
// -- Pool metrics --
8790
let pool_mon = pool_monitoring.expect("pool monitoring should be enabled");
8891
assert_api_health(pool_mon).await;
92+
93+
// Poll until the monitoring cache has refreshed with the new share data
8994
let pool_metrics = poll_until_metric_gte(
9095
pool_mon,
9196
"sv2_client_shares_accepted_total",
9297
1.0,
93-
std::time::Duration::from_secs(10),
98+
METRIC_POLL_TIMEOUT,
9499
)
95100
.await;
96101
assert_uptime(&pool_metrics);
@@ -101,13 +106,7 @@ async fn pool_and_tproxy_monitoring_with_sv1_miner() {
101106
// -- tProxy metrics --
102107
let tproxy_mon = tproxy_monitoring.expect("tproxy monitoring should be enabled");
103108
assert_api_health(tproxy_mon).await;
104-
let tproxy_metrics = poll_until_metric_gte(
105-
tproxy_mon,
106-
"sv2_server_shares_accepted_total",
107-
1.0,
108-
std::time::Duration::from_secs(10),
109-
)
110-
.await;
109+
let tproxy_metrics = fetch_metrics(tproxy_mon).await;
111110
assert_uptime(&tproxy_metrics);
112111
// tProxy has 1 upstream extended channel
113112
assert_metric_eq(
@@ -166,11 +165,13 @@ async fn jd_aggregated_topology_monitoring() {
166165
// -- Pool metrics: sees 1 SV2 client (JDC), shares accepted --
167166
let pool_mon = pool_monitoring.expect("pool monitoring should be enabled");
168167
assert_api_health(pool_mon).await;
168+
169+
// Poll until the monitoring cache has refreshed with the new share data
169170
let pool_metrics = poll_until_metric_gte(
170171
pool_mon,
171172
"sv2_client_shares_accepted_total",
172173
1.0,
173-
std::time::Duration::from_secs(10),
174+
METRIC_POLL_TIMEOUT,
174175
)
175176
.await;
176177
assert_uptime(&pool_metrics);
@@ -223,13 +224,13 @@ async fn block_found_detected_in_pool_metrics() {
223224
.wait_for_message_type(MessageDirection::ToUpstream, MESSAGE_TYPE_SUBMIT_SOLUTION)
224225
.await;
225226

226-
// Poll until block found metric appears in monitoring cache
227+
// Poll until the monitoring cache has refreshed with the block found data
227228
let pool_mon = pool_monitoring.expect("pool monitoring should be enabled");
228229
let pool_metrics = poll_until_metric_gte(
229230
pool_mon,
230231
"sv2_client_blocks_found_total",
231232
1.0,
232-
std::time::Duration::from_secs(10),
233+
METRIC_POLL_TIMEOUT,
233234
)
234235
.await;
235236
assert_uptime(&pool_metrics);

0 commit comments

Comments (0)