diff --git a/crates/hashi/src/btc_monitor/monitor.rs b/crates/hashi/src/btc_monitor/monitor.rs
index 0966e74f8..d1df9c97d 100644
--- a/crates/hashi/src/btc_monitor/monitor.rs
+++ b/crates/hashi/src/btc_monitor/monitor.rs
@@ -1,6 +1,7 @@
 // Copyright (c) Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0
 
+use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -28,6 +29,12 @@ const KYOTO_MAX_CONSECUTIVE_FAILURES: u32 = 30;
 /// Delay before restarting Kyoto after connectivity loss.
 const KYOTO_RESTART_DELAY: Duration = Duration::from_secs(5);
 
+/// How many Bitcoin blocks a deposit observation can go without being
+/// refreshed before it's dropped from the confirmation-metrics cache. Prevents
+/// stale entries from lingering once the leader stops querying a deposit
+/// (e.g. the deposit was confirmed, expired, or leader rotated).
+const STALE_OBSERVATION_BLOCKS: u32 = 10;
+
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub enum TxStatus {
     Confirmed { confirmations: u32 },
@@ -35,6 +42,23 @@ pub enum TxStatus {
     NotFound,
 }
 
+/// Last-known Bitcoin-side status of a deposit, as observed by the monitor's
+/// existing per-deposit RPC flow. Cached per outpoint so the confirmation
+/// gauge can be rebuilt on every new Bitcoin block without issuing fresh
+/// RPCs.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum CachedDepositObservation {
+    NotFound,
+    InMempool,
+    InBlock { height: u32 },
+}
+
+#[derive(Debug, Clone, Copy)]
+struct CachedDepositEntry {
+    observation: CachedDepositObservation,
+    last_updated_tip: u32,
+}
+
 #[derive(Debug, thiserror::Error)]
 pub enum DepositConfirmError {
     #[error("UTXO {txid}:{vout} has already been spent on Bitcoin")]
@@ -63,6 +87,12 @@ pub struct Monitor {
     pending_deposits: Vec<PendingDeposit>,
     pending_deposit_workers: JoinSet<()>,
     rpc_workers: JoinSet<()>,
+    /// Per-outpoint cache of the last observation a deposit-processing worker
+    /// made about each deposit's Bitcoin-side status. Source of truth for the
+    /// `hashi_deposit_request_confirmations` gauge. Mutated only from the
+    /// monitor's event loop via `RecordDepositObservation` / `ForgetDeposit`
+    /// messages.
+    deposit_observation_cache: HashMap<bitcoin::OutPoint, CachedDepositEntry>,
 }
 
 /// Offload a blocking Bitcoin Core RPC call to the tokio blocking thread pool.
@@ -135,6 +165,7 @@ impl Monitor {
             pending_deposits: vec![],
             pending_deposit_workers: JoinSet::new(),
             rpc_workers: JoinSet::new(),
+            deposit_observation_cache: HashMap::new(),
         };
         monitor
@@ -418,9 +449,55 @@ impl Monitor {
                     result_tx,
                 ));
             }
+            MonitorMessage::RecordDepositObservation {
+                outpoint,
+                observation,
+            } => {
+                self.record_deposit_observation(outpoint, observation);
+                self.rebuild_confirmation_metrics();
+            }
+            MonitorMessage::ForgetDeposit(outpoint) => {
+                if self.deposit_observation_cache.remove(&outpoint).is_some() {
+                    self.rebuild_confirmation_metrics();
+                }
+            }
         }
     }
 
+    fn record_deposit_observation(
+        &mut self,
+        outpoint: bitcoin::OutPoint,
+        observation: CachedDepositObservation,
+    ) {
+        let last_updated_tip = self.tip.as_ref().map(|t| t.height).unwrap_or(0);
+        self.deposit_observation_cache.insert(
+            outpoint,
+            CachedDepositEntry {
+                observation,
+                last_updated_tip,
+            },
+        );
+    }
+
+    /// Drop cache entries that haven't been refreshed in the last
+    /// `STALE_OBSERVATION_BLOCKS` Bitcoin blocks, then write the current
+    /// bucket counts to `deposit_request_confirmations`. Runs on every new
+    /// Bitcoin block (via `process_pending_deposits`) and after every
+    /// observation-cache mutation.
+    ///
+    /// Bucket mapping: `InBlock { height }` is mapped to the `"N"` bucket via
+    /// `(tip.height + 1) - height`, clamped to `6_plus`, so a deposit moves
+    /// through buckets naturally as new blocks arrive without needing fresh
+    /// messages from workers.
+ fn rebuild_confirmation_metrics(&mut self) { + let tip_height = self.tip.as_ref().map(|t| t.height).unwrap_or(0); + rebuild_confirmation_metrics_inner( + &mut self.deposit_observation_cache, + tip_height, + &self.metrics, + ); + } + fn confirm_deposit(&mut self, pending_deposit: PendingDeposit) { debug!( "Processing deposit confirmation for {}", @@ -567,6 +644,11 @@ impl Monitor { } fn process_pending_deposits(&mut self) { + // Always refresh the confirmation gauge when the tip advances, even + // if there are no pending deposits to spawn — this GCs stale cache + // entries and lets the dashboard zero out when the queue empties. + self.rebuild_confirmation_metrics(); + let Some(tip) = &self.tip else { // Can't confirm deposits if we don't yet know the tip of the chain. return; @@ -606,7 +688,11 @@ impl Monitor { } pending_deposit.checked_at_height = tip.height; + // Clone client_tx so the worker can still emit observation messages + // after ownership moves into the guard. + let observation_tx = client_tx.clone(); let mut pending_deposit = PendingDepositGuard::new(pending_deposit, client_tx); + let outpoint = pending_deposit.outpoint; // Look up block from the txid. 
let block_info = match pending_deposit.block_info { @@ -623,6 +709,19 @@ impl Monitor { .await { Ok(tx_info) => tx_info, + Err(corepc_client::client_sync::Error::JsonRpc( + jsonrpc::error::Error::Rpc(ref e), + )) if e.code == -5 => { + // RPC error -5: "No such mempool or blockchain transaction" + debug!("Transaction {txid} not found in mempool or blockchain"); + send_observation( + &observation_tx, + outpoint, + CachedDepositObservation::NotFound, + ) + .await; + return; + } Err(e) => { error!("Failed to look up txid {txid}: {e}"); return; @@ -643,6 +742,12 @@ impl Monitor { "Transaction {} is not yet included in a block", pending_deposit.outpoint.txid ); + send_observation( + &observation_tx, + outpoint, + CachedDepositObservation::InMempool, + ) + .await; return; }; // Verify the block hash is in kyoto's independently-validated @@ -671,6 +776,19 @@ impl Monitor { } }; + // Refresh the observation cache for this outpoint. Runs on both the + // fresh-RPC path and the rare cached-`block_info` path (re-check + // within the leader's 5 s window), so the gauge bucket and staleness + // counter stay in sync every time this worker touches the deposit. + send_observation( + &observation_tx, + outpoint, + CachedDepositObservation::InBlock { + height: block_info.height, + }, + ) + .await; + // Check if the deposit has enough confirmations yet. 
             let confirmations = (tip.height + 1).saturating_sub(block_info.height);
             if confirmations < confirmation_threshold {
@@ -717,6 +835,7 @@ impl Monitor {
             let txout = match transaction.tx_out(pending_deposit.outpoint.vout.try_into().unwrap()) {
                 Ok(txout) => txout.clone(),
                 Err(e) => {
+                    send_forget(&observation_tx, outpoint).await;
                     let pending_deposit = pending_deposit.take();
                     let _ = pending_deposit
                         .result_tx
@@ -747,6 +866,7 @@ impl Monitor {
                 "Deposit {}:{} confirmed with {confirmations}/{confirmation_threshold} confirmations",
                 pending_deposit.outpoint.txid, pending_deposit.outpoint.vout,
             );
+            send_forget(&observation_tx, outpoint).await;
             let pending_deposit = pending_deposit.take();
             let _ = pending_deposit.result_tx.send(Ok(txout));
         }
@@ -757,6 +877,7 @@ impl Monitor {
             );
             let txid = pending_deposit.outpoint.txid;
             let vout = pending_deposit.outpoint.vout;
+            send_forget(&observation_tx, outpoint).await;
             let pending_deposit = pending_deposit.take();
             let _ = pending_deposit
                 .result_tx
@@ -769,6 +890,74 @@ impl Monitor {
     }
 }
 
+/// GC stale entries from the cache, then write bucket counts into the
+/// `deposit_request_confirmations` gauge. Pure over its inputs so it can be
+/// exercised directly from tests without constructing a full `Monitor`.
+fn rebuild_confirmation_metrics_inner(
+    cache: &mut HashMap<bitcoin::OutPoint, CachedDepositEntry>,
+    tip_height: u32,
+    metrics: &Metrics,
+) {
+    let min_fresh = tip_height.saturating_sub(STALE_OBSERVATION_BLOCKS);
+    cache.retain(|_, entry| entry.last_updated_tip >= min_fresh);
+
+    let mut counts = [0i64; crate::metrics::CONFIRMATION_STATUS_LABELS.len()];
+    for entry in cache.values() {
+        let idx = match entry.observation {
+            CachedDepositObservation::NotFound => 0,
+            CachedDepositObservation::InMempool => 1,
+            CachedDepositObservation::InBlock { height } => {
+                let confirmations = (tip_height + 1).saturating_sub(height);
+                // Bucket 2 == "0 confirmations", through bucket 8 == "6_plus".
+                (2 + confirmations.min(6) as usize).min(8)
+            }
+        };
+        counts[idx] += 1;
+    }
+
+    for (i, label) in crate::metrics::CONFIRMATION_STATUS_LABELS
+        .iter()
+        .enumerate()
+    {
+        metrics
+            .deposit_request_confirmations
+            .with_label_values(&[label])
+            .set(counts[i]);
+    }
+}
+
+/// Best-effort send of a `RecordDepositObservation` message to the monitor's
+/// own event loop. Silently drops if the channel is closed (shutdown).
+async fn send_observation(
+    client_tx: &tokio::sync::mpsc::Sender<MonitorMessage>,
+    outpoint: bitcoin::OutPoint,
+    observation: CachedDepositObservation,
+) {
+    if let Err(e) = client_tx
+        .send(MonitorMessage::RecordDepositObservation {
+            outpoint,
+            observation,
+        })
+        .await
+    {
+        debug!("Failed to record deposit observation (monitor shutting down?): {e}");
+    }
+}
+
+/// Best-effort send of a `ForgetDeposit` message to the monitor's own event
+/// loop. Silently drops if the channel is closed (shutdown).
+async fn send_forget(
+    client_tx: &tokio::sync::mpsc::Sender<MonitorMessage>,
+    outpoint: bitcoin::OutPoint,
+) {
+    if let Err(e) = client_tx
+        .send(MonitorMessage::ForgetDeposit(outpoint))
+        .await
+    {
+        debug!("Failed to forget deposit observation (monitor shutting down?): {e}");
+    }
+}
+
 #[derive(Debug)]
 struct PendingDeposit {
     outpoint: bitcoin::OutPoint,
@@ -904,4 +1093,243 @@ enum MonitorMessage {
 
     // Query the status of a transaction (confirmed, in mempool, or not found).
     GetTransactionStatus(bitcoin::Txid, oneshot::Sender>),
+
+    // Record an observation that a pending-deposit worker just made about a
+    // deposit's Bitcoin-side status. Updates `deposit_observation_cache` so
+    // that the next `rebuild_confirmation_metrics` can see the fresh entry.
+    RecordDepositObservation {
+        outpoint: bitcoin::OutPoint,
+        observation: CachedDepositObservation,
+    },
+
+    // Drop a deposit from `deposit_observation_cache` — typically because the
+    // deposit was successfully confirmed or permanently rejected, so it
+    // should leave the gauge immediately rather than wait for GC.
+    ForgetDeposit(bitcoin::OutPoint),
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use bitcoin::hashes::Hash;
+
+    fn make_outpoint(seed: u8) -> bitcoin::OutPoint {
+        let mut bytes = [0u8; 32];
+        bytes[0] = seed;
+        bitcoin::OutPoint {
+            txid: bitcoin::Txid::from_byte_array(bytes),
+            vout: 0,
+        }
+    }
+
+    fn fresh_metrics() -> Metrics {
+        Metrics::new(&prometheus::Registry::new())
+    }
+
+    fn bucket(metrics: &Metrics, label: &str) -> i64 {
+        metrics
+            .deposit_request_confirmations
+            .with_label_values(&[label])
+            .get()
+    }
+
+    #[test]
+    fn empty_cache_zeros_all_labels() {
+        let metrics = fresh_metrics();
+        let mut cache: HashMap<bitcoin::OutPoint, CachedDepositEntry> = HashMap::new();
+
+        rebuild_confirmation_metrics_inner(&mut cache, 100, &metrics);
+
+        for label in crate::metrics::CONFIRMATION_STATUS_LABELS {
+            assert_eq!(bucket(&metrics, label), 0, "label {label} should be zero");
+        }
+    }
+
+    #[test]
+    fn in_block_maps_to_confirmation_count_bucket() {
+        let metrics = fresh_metrics();
+        let mut cache = HashMap::new();
+        // Tip 100, deposit in block at height 98:
+        // confirmations = (100 + 1) - 98 = 3, so the entry lands in bucket "3".
+ cache.insert( + make_outpoint(1), + CachedDepositEntry { + observation: CachedDepositObservation::InBlock { height: 98 }, + last_updated_tip: 100, + }, + ); + + rebuild_confirmation_metrics_inner(&mut cache, 100, &metrics); + + assert_eq!(bucket(&metrics, "3"), 1); + assert_eq!(bucket(&metrics, "2"), 0); + assert_eq!(bucket(&metrics, "4"), 0); + } + + #[test] + fn tip_advance_shifts_bucket_without_fresh_messages() { + let metrics = fresh_metrics(); + let mut cache = HashMap::new(); + cache.insert( + make_outpoint(1), + CachedDepositEntry { + observation: CachedDepositObservation::InBlock { height: 100 }, + last_updated_tip: 100, + }, + ); + + // Tip 100: confirmations = (100+1) - 100 = 1 → bucket "1" + rebuild_confirmation_metrics_inner(&mut cache, 100, &metrics); + assert_eq!(bucket(&metrics, "1"), 1); + assert_eq!(bucket(&metrics, "2"), 0); + + // Tip 101: confirmations = (101+1) - 100 = 2 → bucket "2" + rebuild_confirmation_metrics_inner(&mut cache, 101, &metrics); + assert_eq!(bucket(&metrics, "1"), 0); + assert_eq!(bucket(&metrics, "2"), 1); + } + + #[test] + fn confirmations_at_or_above_six_land_in_6_plus() { + let metrics = fresh_metrics(); + let mut cache = HashMap::new(); + // Deposit at height 10, tip at 100 → 91 confirmations → 6_plus. + cache.insert( + make_outpoint(1), + CachedDepositEntry { + observation: CachedDepositObservation::InBlock { height: 10 }, + last_updated_tip: 100, + }, + ); + // Deposit at height 94, tip at 100 → 7 confirmations → 6_plus. + cache.insert( + make_outpoint(2), + CachedDepositEntry { + observation: CachedDepositObservation::InBlock { height: 94 }, + last_updated_tip: 100, + }, + ); + // Deposit at height 95, tip at 100 → 6 confirmations → 6_plus. + cache.insert( + make_outpoint(3), + CachedDepositEntry { + observation: CachedDepositObservation::InBlock { height: 95 }, + last_updated_tip: 100, + }, + ); + // Deposit at height 96, tip at 100 → 5 confirmations → "5". 
+ cache.insert( + make_outpoint(4), + CachedDepositEntry { + observation: CachedDepositObservation::InBlock { height: 96 }, + last_updated_tip: 100, + }, + ); + + rebuild_confirmation_metrics_inner(&mut cache, 100, &metrics); + + assert_eq!(bucket(&metrics, "6_plus"), 3); + assert_eq!(bucket(&metrics, "5"), 1); + } + + #[test] + fn not_found_and_mempool_populate_their_labels() { + let metrics = fresh_metrics(); + let mut cache = HashMap::new(); + cache.insert( + make_outpoint(1), + CachedDepositEntry { + observation: CachedDepositObservation::NotFound, + last_updated_tip: 100, + }, + ); + cache.insert( + make_outpoint(2), + CachedDepositEntry { + observation: CachedDepositObservation::InMempool, + last_updated_tip: 100, + }, + ); + cache.insert( + make_outpoint(3), + CachedDepositEntry { + observation: CachedDepositObservation::InMempool, + last_updated_tip: 100, + }, + ); + + rebuild_confirmation_metrics_inner(&mut cache, 100, &metrics); + + assert_eq!(bucket(&metrics, "not_found"), 1); + assert_eq!(bucket(&metrics, "mempool"), 2); + } + + #[test] + fn stale_entries_are_gced() { + let metrics = fresh_metrics(); + let mut cache = HashMap::new(); + // Fresh entry (updated at tip-1). + cache.insert( + make_outpoint(1), + CachedDepositEntry { + observation: CachedDepositObservation::InBlock { height: 90 }, + last_updated_tip: 99, + }, + ); + // Stale entry (updated 20 blocks ago, threshold is 10). + cache.insert( + make_outpoint(2), + CachedDepositEntry { + observation: CachedDepositObservation::InBlock { height: 70 }, + last_updated_tip: 80, + }, + ); + + rebuild_confirmation_metrics_inner(&mut cache, 100, &metrics); + + assert_eq!(cache.len(), 1, "stale entry should be GC'd"); + assert!(cache.contains_key(&make_outpoint(1))); + // Fresh entry: tip 100 - block 90 + 1 = 11 confirmations → 6_plus. 
+ assert_eq!(bucket(&metrics, "6_plus"), 1); + } + + #[test] + fn entry_on_gc_boundary_is_kept() { + let metrics = fresh_metrics(); + let mut cache = HashMap::new(); + // Entry exactly at the staleness boundary: tip - STALE_OBSERVATION_BLOCKS. + cache.insert( + make_outpoint(1), + CachedDepositEntry { + observation: CachedDepositObservation::InMempool, + last_updated_tip: 100 - STALE_OBSERVATION_BLOCKS, + }, + ); + + rebuild_confirmation_metrics_inner(&mut cache, 100, &metrics); + + assert_eq!(cache.len(), 1); + assert_eq!(bucket(&metrics, "mempool"), 1); + } + + #[test] + fn tip_zero_keeps_all_entries() { + // Bootstrap case: tip is not yet known (represented as 0). We should + // not GC just because tip is 0, otherwise every fresh observation + // would immediately be dropped before the first block arrives. + let metrics = fresh_metrics(); + let mut cache = HashMap::new(); + cache.insert( + make_outpoint(1), + CachedDepositEntry { + observation: CachedDepositObservation::InMempool, + last_updated_tip: 0, + }, + ); + + rebuild_confirmation_metrics_inner(&mut cache, 0, &metrics); + + assert_eq!(cache.len(), 1); + assert_eq!(bucket(&metrics, "mempool"), 1); + } } diff --git a/crates/hashi/src/metrics.rs b/crates/hashi/src/metrics.rs index 9f884fd0d..3b55f9bc2 100644 --- a/crates/hashi/src/metrics.rs +++ b/crates/hashi/src/metrics.rs @@ -50,6 +50,7 @@ pub struct Metrics { reconfig_in_progress: IntGauge, paused: IntGauge, deposit_queue_size: IntGauge, + pub deposit_request_confirmations: IntGaugeVec, withdrawal_queue_size: IntGaugeVec, withdrawal_queue_value: IntGaugeVec, utxo_pool_size: IntGaugeVec, @@ -81,6 +82,20 @@ const LATENCY_SEC_BUCKETS: &[f64] = &[ const MPC_SIGN_DURATION_BUCKETS: &[f64] = &[0.1, 0.25, 0.5, 1., 1.5, 2., 2.5, 3., 4., 5., 7.5, 10.]; +/// Bucket labels for `hashi_deposit_request_confirmations`. Exported so callers +/// that rebuild the gauge can enumerate every label. 
+pub const CONFIRMATION_STATUS_LABELS: &[&str] = &[ + "not_found", + "mempool", + "0", + "1", + "2", + "3", + "4", + "5", + "6_plus", +]; + impl Metrics { pub fn new_default() -> Self { Self::new(prometheus::default_registry()) @@ -216,6 +231,14 @@ impl Metrics { registry, ) .unwrap(), + deposit_request_confirmations: register_int_gauge_vec_with_registry!( + "hashi_deposit_request_confirmations", + "Pending deposit requests bucketed by their Bitcoin-side transaction status. \ + The `status` label is one of: not_found, mempool, 0, 1, 2, 3, 4, 5, 6_plus.", + &["status"], + registry, + ) + .unwrap(), withdrawal_queue_size: register_int_gauge_vec_with_registry!( "hashi_withdrawal_queue_size", "number of withdrawal requests by status",