From 4c005e62de44842b3257ee1e437d6129b3c31bf6 Mon Sep 17 00:00:00 2001 From: Siddharth Sharma Date: Tue, 14 Apr 2026 16:29:58 -0700 Subject: [PATCH] [metrics] Add Bitcoin confirmation-count gauge for deposits Adds hashi_deposit_request_confirmations, an IntGaugeVec bucketed by {not_found, mempool, 0, 1, 2, 3, 4, 5, 6_plus}, so the Grafana dashboard can show the live distribution of pending deposits across confirmation levels and operators can tell "queue stuck" from "queue waiting on Bitcoin" at a glance. The gauge piggy-backs on the RPCs that btc_monitor already issues as part of its per-deposit confirmation-check flow: each pending-deposit worker sends a RecordDepositObservation / ForgetDeposit message to the monitor's own event loop after its bitcoind RPC returns, and the monitor caches the observation per outpoint. Every new Bitcoin block then rebuilds the gauge from that cache without issuing any fresh RPCs. InBlock observations store the raw block height, so a deposit moves through buckets naturally as new blocks arrive without needing fresh worker messages. Stale entries (not refreshed within STALE_OBSERVATION_BLOCKS = 10 blocks) are GC'd on every rebuild so deposits that expired or were reassigned during leader rotations leave the gauge on their own. Withdrawals are out of scope for this change. --- crates/hashi/src/btc_monitor/monitor.rs | 428 ++++++++++++++++++++++++ crates/hashi/src/metrics.rs | 23 ++ 2 files changed, 451 insertions(+) diff --git a/crates/hashi/src/btc_monitor/monitor.rs b/crates/hashi/src/btc_monitor/monitor.rs index 0966e74f8..d1df9c97d 100644 --- a/crates/hashi/src/btc_monitor/monitor.rs +++ b/crates/hashi/src/btc_monitor/monitor.rs @@ -1,6 +1,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; @@ -28,6 +29,12 @@ const KYOTO_MAX_CONSECUTIVE_FAILURES: u32 = 30; /// Delay before restarting Kyoto after connectivity loss. 
const KYOTO_RESTART_DELAY: Duration = Duration::from_secs(5); +/// How many Bitcoin blocks a deposit observation can go without being +/// refreshed before it's dropped from the confirmation-metrics cache. Prevents +/// stale entries from lingering once the leader stops querying a deposit +/// (e.g. the deposit was confirmed, expired, or leader rotated). +const STALE_OBSERVATION_BLOCKS: u32 = 10; + #[derive(Debug, Clone, PartialEq, Eq)] pub enum TxStatus { Confirmed { confirmations: u32 }, @@ -35,6 +42,23 @@ pub enum TxStatus { NotFound, } +/// Last-known Bitcoin-side status of a deposit, as observed by the monitor's +/// existing per-deposit RPC flow. Cached per outpoint so the confirmation +/// gauge can be rebuilt on every new Bitcoin block without issuing fresh +/// RPCs. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum CachedDepositObservation { + NotFound, + InMempool, + InBlock { height: u32 }, +} + +#[derive(Debug, Clone, Copy)] +struct CachedDepositEntry { + observation: CachedDepositObservation, + last_updated_tip: u32, +} + #[derive(Debug, thiserror::Error)] pub enum DepositConfirmError { #[error("UTXO {txid}:{vout} has already been spent on Bitcoin")] @@ -63,6 +87,12 @@ pub struct Monitor { pending_deposits: Vec<PendingDeposit>, pending_deposit_workers: JoinSet<()>, rpc_workers: JoinSet<()>, + /// Per-outpoint cache of the last observation a deposit-processing worker + /// made about each deposit's Bitcoin-side status. Source of truth for the + /// `hashi_deposit_request_confirmations` gauge. Mutated only from the + /// monitor's event loop via `RecordDepositObservation` / `ForgetDeposit` + /// messages. + deposit_observation_cache: HashMap<bitcoin::OutPoint, CachedDepositEntry>, } /// Offload a blocking Bitcoin Core RPC call to the tokio blocking thread pool. 
@@ -135,6 +165,7 @@ impl Monitor { pending_deposits: vec![], pending_deposit_workers: JoinSet::new(), rpc_workers: JoinSet::new(), + deposit_observation_cache: HashMap::new(), }; monitor @@ -418,9 +449,55 @@ impl Monitor { result_tx, )); } + MonitorMessage::RecordDepositObservation { + outpoint, + observation, + } => { + self.record_deposit_observation(outpoint, observation); + self.rebuild_confirmation_metrics(); + } + MonitorMessage::ForgetDeposit(outpoint) => { + if self.deposit_observation_cache.remove(&outpoint).is_some() { + self.rebuild_confirmation_metrics(); + } + } } } + fn record_deposit_observation( + &mut self, + outpoint: bitcoin::OutPoint, + observation: CachedDepositObservation, + ) { + let last_updated_tip = self.tip.as_ref().map(|t| t.height).unwrap_or(0); + self.deposit_observation_cache.insert( + outpoint, + CachedDepositEntry { + observation, + last_updated_tip, + }, + ); + } + + /// Drop cache entries that haven't been refreshed in the last + /// `STALE_OBSERVATION_BLOCKS` Bitcoin blocks, then write the current + /// bucket counts to `deposit_request_confirmations`. Runs on every new + /// Bitcoin block (via `process_pending_deposits`) and after every + /// observation-cache mutation. + /// + /// Bucket mapping: `InBlock { height }` is mapped to the `"N"` bucket via + /// `(tip.height + 1) - height`, clamped to `6_plus`, so a deposit moves + /// through buckets naturally as new blocks arrive without needing fresh + /// messages from workers. 
+ fn rebuild_confirmation_metrics(&mut self) { + let tip_height = self.tip.as_ref().map(|t| t.height).unwrap_or(0); + rebuild_confirmation_metrics_inner( + &mut self.deposit_observation_cache, + tip_height, + &self.metrics, + ); + } + fn confirm_deposit(&mut self, pending_deposit: PendingDeposit) { debug!( "Processing deposit confirmation for {}", @@ -567,6 +644,11 @@ impl Monitor { } fn process_pending_deposits(&mut self) { + // Always refresh the confirmation gauge when the tip advances, even + // if there are no pending deposits to spawn — this GCs stale cache + // entries and lets the dashboard zero out when the queue empties. + self.rebuild_confirmation_metrics(); + let Some(tip) = &self.tip else { // Can't confirm deposits if we don't yet know the tip of the chain. return; @@ -606,7 +688,11 @@ impl Monitor { } pending_deposit.checked_at_height = tip.height; + // Clone client_tx so the worker can still emit observation messages + // after ownership moves into the guard. + let observation_tx = client_tx.clone(); let mut pending_deposit = PendingDepositGuard::new(pending_deposit, client_tx); + let outpoint = pending_deposit.outpoint; // Look up block from the txid. 
let block_info = match pending_deposit.block_info { @@ -623,6 +709,19 @@ impl Monitor { .await { Ok(tx_info) => tx_info, + Err(corepc_client::client_sync::Error::JsonRpc( + jsonrpc::error::Error::Rpc(ref e), + )) if e.code == -5 => { + // RPC error -5: "No such mempool or blockchain transaction" + debug!("Transaction {txid} not found in mempool or blockchain"); + send_observation( + &observation_tx, + outpoint, + CachedDepositObservation::NotFound, + ) + .await; + return; + } Err(e) => { error!("Failed to look up txid {txid}: {e}"); return; @@ -643,6 +742,12 @@ impl Monitor { "Transaction {} is not yet included in a block", pending_deposit.outpoint.txid ); + send_observation( + &observation_tx, + outpoint, + CachedDepositObservation::InMempool, + ) + .await; return; }; // Verify the block hash is in kyoto's independently-validated @@ -671,6 +776,19 @@ impl Monitor { } }; + // Refresh the observation cache for this outpoint. Runs on both the + // fresh-RPC path and the rare cached-`block_info` path (re-check + // within the leader's 5 s window), so the gauge bucket and staleness + // counter stay in sync every time this worker touches the deposit. + send_observation( + &observation_tx, + outpoint, + CachedDepositObservation::InBlock { + height: block_info.height, + }, + ) + .await; + // Check if the deposit has enough confirmations yet. 
let confirmations = (tip.height + 1).saturating_sub(block_info.height); if confirmations < confirmation_threshold { @@ -717,6 +835,7 @@ impl Monitor { let txout = match transaction.tx_out(pending_deposit.outpoint.vout.try_into().unwrap()) { Ok(txout) => txout.clone(), Err(e) => { + send_forget(&observation_tx, outpoint).await; let pending_deposit = pending_deposit.take(); let _ = pending_deposit .result_tx @@ -747,6 +866,7 @@ impl Monitor { "Deposit {}:{} confirmed with {confirmations}/{confirmation_threshold} confirmations", pending_deposit.outpoint.txid, pending_deposit.outpoint.vout, ); + send_forget(&observation_tx, outpoint).await; let pending_deposit = pending_deposit.take(); let _ = pending_deposit.result_tx.send(Ok(txout)); } @@ -757,6 +877,7 @@ impl Monitor { ); let txid = pending_deposit.outpoint.txid; let vout = pending_deposit.outpoint.vout; + send_forget(&observation_tx, outpoint).await; let pending_deposit = pending_deposit.take(); let _ = pending_deposit .result_tx @@ -769,6 +890,74 @@ impl Monitor { } } +/// GC stale entries from the cache, then write bucket counts into the +/// `deposit_request_confirmations` gauge. Pure over its inputs so it can be +/// exercised directly from tests without constructing a full `Monitor`. +fn rebuild_confirmation_metrics_inner( + cache: &mut HashMap<bitcoin::OutPoint, CachedDepositEntry>, + tip_height: u32, + metrics: &Metrics, +) { + let min_fresh = tip_height.saturating_sub(STALE_OBSERVATION_BLOCKS); + cache.retain(|_, entry| entry.last_updated_tip >= min_fresh); + + let mut counts = [0i64; crate::metrics::CONFIRMATION_STATUS_LABELS.len()]; + for entry in cache.values() { + let idx = match entry.observation { + CachedDepositObservation::NotFound => 0, + CachedDepositObservation::InMempool => 1, + CachedDepositObservation::InBlock { height } => { + let confirmations = (tip_height + 1).saturating_sub(height); + // Bucket 2 == "0 confirmations", through bucket 8 == "6_plus". 
+ (2 + confirmations.min(6) as usize).min(8) + } + }; + counts[idx] += 1; + } + + for (i, label) in crate::metrics::CONFIRMATION_STATUS_LABELS + .iter() + .enumerate() + { + metrics + .deposit_request_confirmations + .with_label_values(&[label]) + .set(counts[i]); + } +} + +/// Best-effort send of a `RecordDepositObservation` message to the monitor's +/// own event loop. Silently drops if the channel is closed (shutdown). +async fn send_observation( + client_tx: &tokio::sync::mpsc::Sender<MonitorMessage>, + outpoint: bitcoin::OutPoint, + observation: CachedDepositObservation, +) { + if let Err(e) = client_tx + .send(MonitorMessage::RecordDepositObservation { + outpoint, + observation, + }) + .await + { + debug!("Failed to record deposit observation (monitor shutting down?): {e}"); + } +} + +/// Best-effort send of a `ForgetDeposit` message to the monitor's own event +/// loop. Silently drops if the channel is closed (shutdown). +async fn send_forget( + client_tx: &tokio::sync::mpsc::Sender<MonitorMessage>, + outpoint: bitcoin::OutPoint, +) { + if let Err(e) = client_tx + .send(MonitorMessage::ForgetDeposit(outpoint)) + .await + { + debug!("Failed to forget deposit observation (monitor shutting down?): {e}"); + } +} + #[derive(Debug)] struct PendingDeposit { outpoint: bitcoin::OutPoint, @@ -904,4 +1093,243 @@ enum MonitorMessage { // Query the status of a transaction (confirmed, in mempool, or not found). GetTransactionStatus(bitcoin::Txid, oneshot::Sender>), + + // Record an observation that a pending-deposit worker just made about a + // deposit's Bitcoin-side status. Updates `deposit_observation_cache` so + // that the next `rebuild_confirmation_metrics` can see the fresh entry. + RecordDepositObservation { + outpoint: bitcoin::OutPoint, + observation: CachedDepositObservation, + }, + + // Drop a deposit from `deposit_observation_cache` — typically because the + // deposit was successfully confirmed or permanently rejected, so it + // should leave the gauge immediately rather than wait for GC. 
+ ForgetDeposit(bitcoin::OutPoint), +} + +#[cfg(test)] +mod tests { + use super::*; + use bitcoin::hashes::Hash; + + fn make_outpoint(seed: u8) -> bitcoin::OutPoint { + let mut bytes = [0u8; 32]; + bytes[0] = seed; + bitcoin::OutPoint { + txid: bitcoin::Txid::from_byte_array(bytes), + vout: 0, + } + } + + fn fresh_metrics() -> Metrics { + Metrics::new(&prometheus::Registry::new()) + } + + fn bucket(metrics: &Metrics, label: &str) -> i64 { + metrics + .deposit_request_confirmations + .with_label_values(&[label]) + .get() + } + + #[test] + fn empty_cache_zeros_all_labels() { + let metrics = fresh_metrics(); + let mut cache: HashMap<bitcoin::OutPoint, CachedDepositEntry> = HashMap::new(); + + rebuild_confirmation_metrics_inner(&mut cache, 100, &metrics); + + for label in crate::metrics::CONFIRMATION_STATUS_LABELS { + assert_eq!(bucket(&metrics, label), 0, "label {label} should be zero"); + } + } + + #[test] + fn in_block_maps_to_confirmation_count_bucket() { + let metrics = fresh_metrics(); + let mut cache = HashMap::new(); + // Tip is 100 and the deposit's block height is 98, so + // confirmations = (100 + 1) - 98 = 3 → bucket index 2 + 3 = 5 → "3". 
+ cache.insert( + make_outpoint(1), + CachedDepositEntry { + observation: CachedDepositObservation::InBlock { height: 98 }, + last_updated_tip: 100, + }, + ); + + rebuild_confirmation_metrics_inner(&mut cache, 100, &metrics); + + assert_eq!(bucket(&metrics, "3"), 1); + assert_eq!(bucket(&metrics, "2"), 0); + assert_eq!(bucket(&metrics, "4"), 0); + } + + #[test] + fn tip_advance_shifts_bucket_without_fresh_messages() { + let metrics = fresh_metrics(); + let mut cache = HashMap::new(); + cache.insert( + make_outpoint(1), + CachedDepositEntry { + observation: CachedDepositObservation::InBlock { height: 100 }, + last_updated_tip: 100, + }, + ); + + // Tip 100: confirmations = (100+1) - 100 = 1 → bucket "1" + rebuild_confirmation_metrics_inner(&mut cache, 100, &metrics); + assert_eq!(bucket(&metrics, "1"), 1); + assert_eq!(bucket(&metrics, "2"), 0); + + // Tip 101: confirmations = (101+1) - 100 = 2 → bucket "2" + rebuild_confirmation_metrics_inner(&mut cache, 101, &metrics); + assert_eq!(bucket(&metrics, "1"), 0); + assert_eq!(bucket(&metrics, "2"), 1); + } + + #[test] + fn confirmations_at_or_above_six_land_in_6_plus() { + let metrics = fresh_metrics(); + let mut cache = HashMap::new(); + // Deposit at height 10, tip at 100 → 91 confirmations → 6_plus. + cache.insert( + make_outpoint(1), + CachedDepositEntry { + observation: CachedDepositObservation::InBlock { height: 10 }, + last_updated_tip: 100, + }, + ); + // Deposit at height 94, tip at 100 → 7 confirmations → 6_plus. + cache.insert( + make_outpoint(2), + CachedDepositEntry { + observation: CachedDepositObservation::InBlock { height: 94 }, + last_updated_tip: 100, + }, + ); + // Deposit at height 95, tip at 100 → 6 confirmations → 6_plus. + cache.insert( + make_outpoint(3), + CachedDepositEntry { + observation: CachedDepositObservation::InBlock { height: 95 }, + last_updated_tip: 100, + }, + ); + // Deposit at height 96, tip at 100 → 5 confirmations → "5". 
+ cache.insert( + make_outpoint(4), + CachedDepositEntry { + observation: CachedDepositObservation::InBlock { height: 96 }, + last_updated_tip: 100, + }, + ); + + rebuild_confirmation_metrics_inner(&mut cache, 100, &metrics); + + assert_eq!(bucket(&metrics, "6_plus"), 3); + assert_eq!(bucket(&metrics, "5"), 1); + } + + #[test] + fn not_found_and_mempool_populate_their_labels() { + let metrics = fresh_metrics(); + let mut cache = HashMap::new(); + cache.insert( + make_outpoint(1), + CachedDepositEntry { + observation: CachedDepositObservation::NotFound, + last_updated_tip: 100, + }, + ); + cache.insert( + make_outpoint(2), + CachedDepositEntry { + observation: CachedDepositObservation::InMempool, + last_updated_tip: 100, + }, + ); + cache.insert( + make_outpoint(3), + CachedDepositEntry { + observation: CachedDepositObservation::InMempool, + last_updated_tip: 100, + }, + ); + + rebuild_confirmation_metrics_inner(&mut cache, 100, &metrics); + + assert_eq!(bucket(&metrics, "not_found"), 1); + assert_eq!(bucket(&metrics, "mempool"), 2); + } + + #[test] + fn stale_entries_are_gced() { + let metrics = fresh_metrics(); + let mut cache = HashMap::new(); + // Fresh entry (updated at tip-1). + cache.insert( + make_outpoint(1), + CachedDepositEntry { + observation: CachedDepositObservation::InBlock { height: 90 }, + last_updated_tip: 99, + }, + ); + // Stale entry (updated 20 blocks ago, threshold is 10). + cache.insert( + make_outpoint(2), + CachedDepositEntry { + observation: CachedDepositObservation::InBlock { height: 70 }, + last_updated_tip: 80, + }, + ); + + rebuild_confirmation_metrics_inner(&mut cache, 100, &metrics); + + assert_eq!(cache.len(), 1, "stale entry should be GC'd"); + assert!(cache.contains_key(&make_outpoint(1))); + // Fresh entry: tip 100 - block 90 + 1 = 11 confirmations → 6_plus. 
+ assert_eq!(bucket(&metrics, "6_plus"), 1); + } + + #[test] + fn entry_on_gc_boundary_is_kept() { + let metrics = fresh_metrics(); + let mut cache = HashMap::new(); + // Entry exactly at the staleness boundary: tip - STALE_OBSERVATION_BLOCKS. + cache.insert( + make_outpoint(1), + CachedDepositEntry { + observation: CachedDepositObservation::InMempool, + last_updated_tip: 100 - STALE_OBSERVATION_BLOCKS, + }, + ); + + rebuild_confirmation_metrics_inner(&mut cache, 100, &metrics); + + assert_eq!(cache.len(), 1); + assert_eq!(bucket(&metrics, "mempool"), 1); + } + + #[test] + fn tip_zero_keeps_all_entries() { + // Bootstrap case: tip is not yet known (represented as 0). We should + // not GC just because tip is 0, otherwise every fresh observation + // would immediately be dropped before the first block arrives. + let metrics = fresh_metrics(); + let mut cache = HashMap::new(); + cache.insert( + make_outpoint(1), + CachedDepositEntry { + observation: CachedDepositObservation::InMempool, + last_updated_tip: 0, + }, + ); + + rebuild_confirmation_metrics_inner(&mut cache, 0, &metrics); + + assert_eq!(cache.len(), 1); + assert_eq!(bucket(&metrics, "mempool"), 1); + } } diff --git a/crates/hashi/src/metrics.rs b/crates/hashi/src/metrics.rs index 9f884fd0d..3b55f9bc2 100644 --- a/crates/hashi/src/metrics.rs +++ b/crates/hashi/src/metrics.rs @@ -50,6 +50,7 @@ pub struct Metrics { reconfig_in_progress: IntGauge, paused: IntGauge, deposit_queue_size: IntGauge, + pub deposit_request_confirmations: IntGaugeVec, withdrawal_queue_size: IntGaugeVec, withdrawal_queue_value: IntGaugeVec, utxo_pool_size: IntGaugeVec, @@ -81,6 +82,20 @@ const LATENCY_SEC_BUCKETS: &[f64] = &[ const MPC_SIGN_DURATION_BUCKETS: &[f64] = &[0.1, 0.25, 0.5, 1., 1.5, 2., 2.5, 3., 4., 5., 7.5, 10.]; +/// Bucket labels for `hashi_deposit_request_confirmations`. Exported so callers +/// that rebuild the gauge can enumerate every label. 
+pub const CONFIRMATION_STATUS_LABELS: &[&str] = &[ + "not_found", + "mempool", + "0", + "1", + "2", + "3", + "4", + "5", + "6_plus", +]; + impl Metrics { pub fn new_default() -> Self { Self::new(prometheus::default_registry()) @@ -216,6 +231,14 @@ impl Metrics { registry, ) .unwrap(), + deposit_request_confirmations: register_int_gauge_vec_with_registry!( + "hashi_deposit_request_confirmations", + "Pending deposit requests bucketed by their Bitcoin-side transaction status. \ + The `status` label is one of: not_found, mempool, 0, 1, 2, 3, 4, 5, 6_plus.", + &["status"], + registry, + ) + .unwrap(), withdrawal_queue_size: register_int_gauge_vec_with_registry!( "hashi_withdrawal_queue_size", "number of withdrawal requests by status",