diff --git a/crates/e2e-tests/src/lib.rs b/crates/e2e-tests/src/lib.rs index 1c4baff5f..9e91b74e4 100644 --- a/crates/e2e-tests/src/lib.rs +++ b/crates/e2e-tests/src/lib.rs @@ -655,9 +655,15 @@ mod tests { .map(|node| { let signing_manager = node.hashi().signing_manager(); let onchain_state = node.hashi().onchain_state().clone(); - let p2p_channel = hashi::mpc::rpc::RpcP2PChannel::new(onchain_state, epoch); + let p2p_channel = hashi::mpc::rpc::RpcP2PChannel::new( + onchain_state, + epoch, + hashi::metrics::MPC_LABEL_SIGNING, + node.hashi().metrics.clone(), + ); let beacon = beacon_value; let message = message.to_vec(); + let metrics = node.hashi().metrics.clone(); async move { hashi::mpc::SigningManager::sign( &signing_manager, @@ -668,6 +674,7 @@ mod tests { &beacon, None, SIGNING_TIMEOUT, + &metrics, ) .await } diff --git a/crates/hashi/src/grpc/mod.rs b/crates/hashi/src/grpc/mod.rs index e9115f7cb..dcc431969 100644 --- a/crates/hashi/src/grpc/mod.rs +++ b/crates/hashi/src/grpc/mod.rs @@ -39,6 +39,10 @@ impl HttpService { Self { inner: hashi } } + pub(crate) fn metrics(&self) -> &crate::metrics::Metrics { + &self.inner.metrics + } + pub async fn start(self) -> (std::net::SocketAddr, Service) { let router = { let max_decoding_message_size = self.inner.config.grpc_max_decoding_message_size(); diff --git a/crates/hashi/src/metrics.rs b/crates/hashi/src/metrics.rs index 9f884fd0d..29851ab18 100644 --- a/crates/hashi/src/metrics.rs +++ b/crates/hashi/src/metrics.rs @@ -73,6 +73,30 @@ pub struct Metrics { pub mpc_sign_duration_seconds: HistogramVec, pub mpc_sign_failures_total: IntCounterVec, + + // MPC profiling metrics + pub mpc_reconfig_total_duration_seconds: HistogramVec, + pub mpc_end_reconfig_duration_seconds: HistogramVec, + pub mpc_prepare_signing_duration_seconds: HistogramVec, + pub mpc_total_duration_seconds: HistogramVec, + pub mpc_dealer_crypto_duration_seconds: HistogramVec, + pub mpc_p2p_broadcast_duration_seconds: HistogramVec, + pub mpc_cert_publish_duration_seconds: HistogramVec, + pub mpc_tob_poll_duration_seconds: HistogramVec, + pub mpc_cert_verify_duration_seconds: HistogramVec, + pub mpc_message_process_duration_seconds: HistogramVec, + pub mpc_message_retrieval_duration_seconds: HistogramVec, + pub mpc_complaint_recovery_duration_seconds: HistogramVec, + pub mpc_completion_duration_seconds: HistogramVec, + pub mpc_presig_conversion_duration_seconds: HistogramVec, + pub mpc_rotation_prepare_previous_duration_seconds: HistogramVec, + pub mpc_sign_partial_gen_duration_seconds: HistogramVec, + pub mpc_sign_collection_duration_seconds: HistogramVec, + pub mpc_sign_aggregation_duration_seconds: HistogramVec, + pub mpc_rpc_handler_process_duration_seconds: HistogramVec, + pub mpc_bytes_sent_total: IntCounterVec, + pub mpc_bytes_received_total: IntCounterVec, + pub mpc_p2p_message_size_bytes: HistogramVec, } const LATENCY_SEC_BUCKETS: &[f64] = &[ @@ -81,6 +105,19 @@ const LATENCY_SEC_BUCKETS: &[f64] = &[ const MPC_SIGN_DURATION_BUCKETS: &[f64] = &[0.1, 0.25, 0.5, 1., 1.5, 2., 2.5, 3., 4., 5., 7.5, 10.]; +pub const MPC_LABEL_DKG: &str = "dkg"; +pub const MPC_LABEL_KEY_ROTATION: &str = "key_rotation"; +pub const MPC_LABEL_NONCE_GEN: &str = "nonce_gen"; +pub const MPC_LABEL_SIGNING: &str = "signing"; + +const MPC_PROTOCOL_DURATION_BUCKETS: &[f64] = &[0.1, 0.25, 0.5, 1., 2., 5., 10., 20., 30., 60.]; + +const MPC_PHASE_DURATION_BUCKETS: &[f64] = + &[0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5, 1., 2., 5., 10.]; + +const MPC_MESSAGE_SIZE_BUCKETS: &[f64] = + &[1024., 4096., 16384., 65536., 262144., 1048576., 4194304.]; + impl Metrics { pub fn new_default() -> Self { Self::new(prometheus::default_registry()) @@ -343,6 +380,190 @@ impl Metrics { registry, ) .unwrap(), + + // MPC profiling: reconfig-level + mpc_reconfig_total_duration_seconds: register_histogram_vec_with_registry!( + "hashi_mpc_reconfig_total_duration_seconds", + "Duration of full handle_reconfig", + &["protocol"], + MPC_PROTOCOL_DURATION_BUCKETS.to_vec(), + registry, + ) + .unwrap(), + mpc_end_reconfig_duration_seconds: register_histogram_vec_with_registry!( + "hashi_mpc_end_reconfig_duration_seconds", + "Duration of submit_end_reconfig", + &["protocol"], + MPC_PROTOCOL_DURATION_BUCKETS.to_vec(), + registry, + ) + .unwrap(), + mpc_prepare_signing_duration_seconds: register_histogram_vec_with_registry!( + "hashi_mpc_prepare_signing_duration_seconds", + "Duration of prepare_signing", + &["protocol"], + MPC_PROTOCOL_DURATION_BUCKETS.to_vec(), + registry, + ) + .unwrap(), + + // MPC profiling: per-phase (labeled by protocol) + mpc_total_duration_seconds: register_histogram_vec_with_registry!( + "hashi_mpc_total_duration_seconds", + "End-to-end duration of MPC protocol", + &["protocol"], + MPC_PROTOCOL_DURATION_BUCKETS.to_vec(), + registry, + ) + .unwrap(), + mpc_dealer_crypto_duration_seconds: register_histogram_vec_with_registry!( + "hashi_mpc_dealer_crypto_duration_seconds", + "Duration of dealer crypto", + &["protocol"], + MPC_PHASE_DURATION_BUCKETS.to_vec(), + registry, + ) + .unwrap(), + mpc_p2p_broadcast_duration_seconds: register_histogram_vec_with_registry!( + "hashi_mpc_p2p_broadcast_duration_seconds", + "Duration of send_to_many", + &["protocol"], + MPC_PHASE_DURATION_BUCKETS.to_vec(), + registry, + ) + .unwrap(), + mpc_cert_publish_duration_seconds: register_histogram_vec_with_registry!( + "hashi_mpc_cert_publish_duration_seconds", + "Duration of tob_channel.publish", + &["protocol"], + MPC_PHASE_DURATION_BUCKETS.to_vec(), + registry, + ) + .unwrap(), + mpc_tob_poll_duration_seconds: register_histogram_vec_with_registry!( + "hashi_mpc_tob_poll_duration_seconds", + "Duration of tob_channel.receive", + &["protocol"], + MPC_PHASE_DURATION_BUCKETS.to_vec(), + registry, + ) + .unwrap(), + mpc_cert_verify_duration_seconds: register_histogram_vec_with_registry!( + "hashi_mpc_cert_verify_duration_seconds", + "Duration of BLS certificate signature verification", + &["protocol"], + MPC_PHASE_DURATION_BUCKETS.to_vec(), + registry, + ) + .unwrap(), + mpc_message_process_duration_seconds: register_histogram_vec_with_registry!( + "hashi_mpc_message_process_duration_seconds", + "Duration of AVSS message processing", + &["protocol"], + MPC_PHASE_DURATION_BUCKETS.to_vec(), + registry, + ) + .unwrap(), + mpc_message_retrieval_duration_seconds: register_histogram_vec_with_registry!( + "hashi_mpc_message_retrieval_duration_seconds", + "Duration of retrieve_dealer_message", + &["protocol"], + MPC_PHASE_DURATION_BUCKETS.to_vec(), + registry, + ) + .unwrap(), + mpc_complaint_recovery_duration_seconds: register_histogram_vec_with_registry!( + "hashi_mpc_complaint_recovery_duration_seconds", + "Duration of complaint recovery", + &["protocol"], + MPC_PHASE_DURATION_BUCKETS.to_vec(), + registry, + ) + .unwrap(), + mpc_completion_duration_seconds: register_histogram_vec_with_registry!( + "hashi_mpc_completion_duration_seconds", + "Duration of final aggregation", + &["protocol"], + MPC_PHASE_DURATION_BUCKETS.to_vec(), + registry, + ) + .unwrap(), + mpc_presig_conversion_duration_seconds: register_histogram_vec_with_registry!( + "hashi_mpc_presig_conversion_duration_seconds", + "Duration of Presignatures::new", + &["protocol"], + MPC_PHASE_DURATION_BUCKETS.to_vec(), + registry, + ) + .unwrap(), + mpc_rotation_prepare_previous_duration_seconds: register_histogram_vec_with_registry!( + "hashi_mpc_rotation_prepare_previous_duration_seconds", + "Duration of prepare_previous_output", + &["protocol"], + MPC_PHASE_DURATION_BUCKETS.to_vec(), + registry, + ) + .unwrap(), + + // MPC profiling: signing phase breakdown + mpc_sign_partial_gen_duration_seconds: register_histogram_vec_with_registry!( + "hashi_mpc_sign_partial_gen_duration_seconds", + "Duration of generate_partial_signatures", + &["protocol"], + MPC_PHASE_DURATION_BUCKETS.to_vec(), + registry, + ) + .unwrap(), + mpc_sign_collection_duration_seconds: register_histogram_vec_with_registry!( + "hashi_mpc_sign_collection_duration_seconds", + "Duration of P2P partial signature collection from peers", + &["protocol"], + MPC_PHASE_DURATION_BUCKETS.to_vec(), + registry, + ) + .unwrap(), + mpc_sign_aggregation_duration_seconds: register_histogram_vec_with_registry!( + "hashi_mpc_sign_aggregation_duration_seconds", + "Duration of aggregate_signatures / RS recovery", + &["protocol"], + MPC_PHASE_DURATION_BUCKETS.to_vec(), + registry, + ) + .unwrap(), + + // MPC profiling: RPC handler + mpc_rpc_handler_process_duration_seconds: register_histogram_vec_with_registry!( + "hashi_mpc_rpc_handler_process_duration_seconds", + "Duration of process_message in RPC handler", + &["protocol"], + MPC_PHASE_DURATION_BUCKETS.to_vec(), + registry, + ) + .unwrap(), + + // MPC profiling: communication volume + mpc_bytes_sent_total: register_int_counter_vec_with_registry!( + "hashi_mpc_bytes_sent_total", + "Total bytes sent in MPC P2P messages", + &["protocol"], + registry, + ) + .unwrap(), + mpc_bytes_received_total: register_int_counter_vec_with_registry!( + "hashi_mpc_bytes_received_total", + "Total bytes received in MPC P2P messages", + &["protocol"], + registry, + ) + .unwrap(), + mpc_p2p_message_size_bytes: register_histogram_vec_with_registry!( + "hashi_mpc_p2p_message_size_bytes", + "Size of each MPC P2P message sent (bytes)", + &["protocol"], + MPC_MESSAGE_SIZE_BUCKETS.to_vec(), + registry, + ) + .unwrap(), } } diff --git a/crates/hashi/src/mpc/mpc_except_signing.rs b/crates/hashi/src/mpc/mpc_except_signing.rs index c8377c2e7..7b448bccb 100644 --- a/crates/hashi/src/mpc/mpc_except_signing.rs +++ b/crates/hashi/src/mpc/mpc_except_signing.rs @@ -7,6 +7,10 @@ use crate::communication::send_to_many; use crate::communication::with_timeout_and_retry; use crate::constants::SUI_MAINNET_CHAIN_ID; use crate::constants::SUI_TESTNET_CHAIN_ID; +use crate::metrics::MPC_LABEL_DKG; +use crate::metrics::MPC_LABEL_KEY_ROTATION; +use crate::metrics::MPC_LABEL_NONCE_GEN; +use crate::metrics::Metrics; use crate::mpc::types::CertificateV1; pub use crate::mpc::types::ComplainRequest; pub use crate::mpc::types::ComplaintResponses; @@ -499,6 +503,7 @@ impl MpcManager { mpc_manager: &Arc>, p2p_channel: &impl P2PChannel, tob_channel: &mut impl OrderedBroadcastChannel, + metrics: &Metrics, ) -> MpcResult { let certified = tob_channel.certified_dealers().await; let (certified_reduced_weight, threshold) = { @@ -517,11 +522,12 @@ impl MpcManager { (weight, mgr.mpc_config.threshold as u32) }; if certified_reduced_weight < threshold - && let Err(e) = Self::run_dkg_as_dealer(mpc_manager, p2p_channel, tob_channel).await + && let Err(e) = + Self::run_dkg_as_dealer(mpc_manager, p2p_channel, tob_channel, metrics).await { tracing::error!("Dealer phase failed: {}. Continuing as party only.", e); } - Self::run_dkg_as_party(mpc_manager, p2p_channel, tob_channel).await + Self::run_dkg_as_party(mpc_manager, p2p_channel, tob_channel, metrics).await } pub async fn run_key_rotation( @@ -529,10 +535,16 @@ impl MpcManager { previous_certificates: &[CertificateV1], p2p_channel: &impl P2PChannel, ordered_broadcast_channel: &mut impl OrderedBroadcastChannel, + metrics: &Metrics, ) -> MpcResult { tracing::info!("run_key_rotation: starting prepare_previous_output"); + let _timer = metrics + .mpc_rotation_prepare_previous_duration_seconds + .with_label_values(&[MPC_LABEL_KEY_ROTATION]) + .start_timer(); let (previous, is_member_of_previous_committee) = Self::prepare_previous_output(mpc_manager, previous_certificates, p2p_channel).await?; + drop(_timer); tracing::info!( "run_key_rotation: prepare_previous_output complete, \ is_member={is_member_of_previous_committee}", @@ -591,6 +603,7 @@ impl MpcManager { &previous, p2p_channel, ordered_broadcast_channel, + metrics, ) .await { @@ -605,6 +618,7 @@ impl MpcManager { &previous, p2p_channel, ordered_broadcast_channel, + metrics, ) .await } @@ -614,6 +628,7 @@ impl MpcManager { batch_index: u32, p2p_channel: &impl P2PChannel, tob_channel: &mut impl OrderedBroadcastChannel, + metrics: &Metrics, ) -> MpcResult> { // Clear stale state from previous batch. { @@ -641,8 +656,14 @@ impl MpcManager { (weight, mgr.required_nonce_weight()) }; if certified_reduced_weight < required_reduced_weight - && let Err(e) = - Self::run_as_nonce_dealer(mpc_manager, batch_index, p2p_channel, tob_channel).await + && let Err(e) = Self::run_as_nonce_dealer( + mpc_manager, + batch_index, + p2p_channel, + tob_channel, + metrics, + ) + .await { tracing::error!( "Nonce dealer phase failed: {}. Continuing as party only.", @@ -650,7 +671,8 @@ impl MpcManager { ); } let certified = - Self::run_as_nonce_party(mpc_manager, batch_index, p2p_channel, tob_channel).await?; + Self::run_as_nonce_party(mpc_manager, batch_index, p2p_channel, tob_channel, metrics) + .await?; let mut mgr = mpc_manager.write().unwrap(); // Keep only the outputs selected by the party phase. The RPC handler's // `try_sign_nonce_message` may have inserted additional outputs @@ -736,8 +758,13 @@ impl MpcManager { mpc_manager: &Arc>, p2p_channel: &impl P2PChannel, tob_channel: &mut impl OrderedBroadcastChannel, + metrics: &Metrics, ) -> MpcResult<()> { // TODO(Optimization): Skip dealer phase if certificate is already on TOB + let _timer = metrics + .mpc_dealer_crypto_duration_seconds + .with_label_values(&[MPC_LABEL_DKG]) + .start_timer(); let dealer_data = { let mgr = Arc::clone(mpc_manager); spawn_blocking(move || { @@ -747,6 +774,7 @@ impl MpcManager { }) .await? }; + drop(_timer); let mut aggregator = BlsSignatureAggregator::new_with_reduced_weights( &dealer_data.committee, dealer_data.messages_hash.clone(), @@ -755,12 +783,17 @@ impl MpcManager { aggregator .add_signature(dealer_data.my_signature) .expect("first signature should always be valid"); + let _timer = metrics + .mpc_p2p_broadcast_duration_seconds + .with_label_values(&[MPC_LABEL_DKG]) + .start_timer(); let results = send_to_many( dealer_data.recipients.iter().copied(), dealer_data.request, |addr, req| async move { p2p_channel.send_messages(&addr, &req).await }, ) .await; + drop(_timer); for (addr, result) in results { match result { Ok(response) => { @@ -776,11 +809,16 @@ impl MpcManager { .finish() .expect("signatures should always be valid"); let cert = CertificateV1::Dkg(dkg_cert); + let _timer = metrics + .mpc_cert_publish_duration_seconds + .with_label_values(&[MPC_LABEL_DKG]) + .start_timer(); with_timeout_and_retry(|| tob_channel.publish(cert.clone())) .await .map_err(|e| { MpcError::BroadcastError(format!("{}: {}", ERR_PUBLISH_CERT_FAILED, e)) })?; + drop(_timer); } Ok(()) } @@ -789,6 +827,7 @@ impl MpcManager { mpc_manager: &Arc>, p2p_channel: &impl P2PChannel, tob_channel: &mut impl OrderedBroadcastChannel, + metrics: &Metrics, ) -> MpcResult { let threshold = { let mgr = mpc_manager.read().unwrap(); @@ -800,10 +839,15 @@ impl MpcManager { if dealer_weight_sum >= threshold { break; } + let _timer = metrics + .mpc_tob_poll_duration_seconds + .with_label_values(&[MPC_LABEL_DKG]) + .start_timer(); let cert = tob_channel .receive() .await .map_err(|e| MpcError::BroadcastError(e.to_string()))?; + drop(_timer); let CertificateV1::Dkg(dkg_cert) = cert else { continue; }; @@ -813,6 +857,10 @@ impl MpcManager { continue; } { + let _timer = metrics + .mpc_cert_verify_duration_seconds + .with_label_values(&[MPC_LABEL_DKG]) + .start_timer(); let mgr = Arc::clone(mpc_manager); let cert = dkg_cert.clone(); let verified = spawn_blocking(move || { @@ -820,6 +868,7 @@ impl MpcManager { mgr.committee.verify_signature(&cert) }) .await; + drop(_timer); if let Err(e) = verified { tracing::info!("Invalid certificate signature from {:?}: {}", &dealer, e); continue; @@ -840,6 +889,10 @@ impl MpcManager { "Certificate from dealer {:?} received but message missing or hash mismatch, retrieving from signers", &dealer ); + let _timer = metrics + .mpc_message_retrieval_duration_seconds + .with_label_values(&[MPC_LABEL_DKG]) + .start_timer(); Self::retrieve_dealer_message(mpc_manager, message, &dkg_cert, p2p_channel) .await .map_err(|e| { @@ -850,6 +903,7 @@ impl MpcManager { ); e })?; + drop(_timer); // Delete stale output from the RPC handler so the party phase // reprocesses with the retrieved (certified) message. mpc_manager @@ -858,6 +912,10 @@ impl MpcManager { .dealer_outputs .remove(&DealerOutputsKey::Dkg(dealer)); } + let _timer = metrics + .mpc_message_process_duration_seconds + .with_label_values(&[MPC_LABEL_DKG]) + .start_timer(); let has_complaint = { let mgr = Arc::clone(mpc_manager); spawn_blocking(move || { @@ -878,18 +936,23 @@ impl MpcManager { }) .await? }; + drop(_timer); if has_complaint { tracing::info!( "DKG complaint detected for dealer {:?}, recovering via Complain RPC", dealer ); - let signers = { + let _timer = metrics + .mpc_complaint_recovery_duration_seconds + .with_label_values(&[MPC_LABEL_DKG]) + .start_timer(); + let (signers, epoch) = { let mgr = mpc_manager.read().unwrap(); - dkg_cert + let signers = dkg_cert .signers(&mgr.committee) - .expect("certificate verified above") + .expect("certificate verified above"); + (signers, mgr.mpc_config.epoch) }; - let epoch = mpc_manager.read().unwrap().mpc_config.epoch; let recovered = Self::recover_shares_via_complaint( mpc_manager, &dealer, @@ -905,6 +968,7 @@ impl MpcManager { mgr.complaints_to_process .remove(&ComplaintsToProcessKey::Dkg(dealer)); } + drop(_timer); } let dealer_weight = { let mgr = mpc_manager.read().unwrap(); @@ -928,6 +992,10 @@ impl MpcManager { dealer_weight_sum += dealer_weight as u32; certified_dealers.insert(dealer); } + let _timer = metrics + .mpc_completion_duration_seconds + .with_label_values(&[MPC_LABEL_DKG]) + .start_timer(); let output = { let mgr = Arc::clone(mpc_manager); spawn_blocking(move || { @@ -936,6 +1004,7 @@ impl MpcManager { }) .await? }; + drop(_timer); Ok(output) } @@ -944,8 +1013,13 @@ impl MpcManager { previous: &MpcOutput, p2p_channel: &impl P2PChannel, ordered_broadcast_channel: &mut impl OrderedBroadcastChannel, + metrics: &Metrics, ) -> MpcResult<()> { // TODO(Optimization): Skip dealer phase if certificate is already on TOB + let _timer = metrics + .mpc_dealer_crypto_duration_seconds + .with_label_values(&[MPC_LABEL_KEY_ROTATION]) + .start_timer(); let dealer_data = { let mgr = Arc::clone(mpc_manager); let previous = previous.clone(); @@ -956,6 +1030,7 @@ impl MpcManager { }) .await? }; + drop(_timer); let mut aggregator = BlsSignatureAggregator::new_with_reduced_weights( &dealer_data.committee, dealer_data.messages_hash.clone(), @@ -964,12 +1039,17 @@ impl MpcManager { aggregator .add_signature(dealer_data.my_signature) .expect("first signature should always be valid"); + let _timer = metrics + .mpc_p2p_broadcast_duration_seconds + .with_label_values(&[MPC_LABEL_KEY_ROTATION]) + .start_timer(); let results = send_to_many( dealer_data.recipients.iter().copied(), dealer_data.request, |addr, req| async move { p2p_channel.send_messages(&addr, &req).await }, ) .await; + drop(_timer); for (addr, result) in results { match result { Ok(response) => { @@ -988,11 +1068,16 @@ impl MpcManager { .finish() .expect("signatures should always be valid"); let cert = CertificateV1::Rotation(rotation_cert); + let _timer = metrics + .mpc_cert_publish_duration_seconds + .with_label_values(&[MPC_LABEL_KEY_ROTATION]) + .start_timer(); with_timeout_and_retry(|| ordered_broadcast_channel.publish(cert.clone())) .await .map_err(|e| { MpcError::BroadcastError(format!("{}: {}", ERR_PUBLISH_CERT_FAILED, e)) })?; + drop(_timer); } Ok(()) } @@ -1002,6 +1087,7 @@ impl MpcManager { previous: &MpcOutput, p2p_channel: &impl P2PChannel, ordered_broadcast_channel: &mut impl OrderedBroadcastChannel, + metrics: &Metrics, ) -> MpcResult { let mut certified_share_indices: Vec = Vec::new(); let mut certified_dealers = HashSet::new(); @@ -1013,10 +1099,15 @@ impl MpcManager { if certified_share_indices.len() >= previous.threshold as usize { break; } + let _timer = metrics + .mpc_tob_poll_duration_seconds + .with_label_values(&[MPC_LABEL_KEY_ROTATION]) + .start_timer(); let cert = ordered_broadcast_channel .receive() .await .map_err(|e| MpcError::BroadcastError(e.to_string()))?; + drop(_timer); let CertificateV1::Rotation(rotation_cert) = cert else { continue; }; @@ -1026,6 +1117,10 @@ impl MpcManager { continue; } { + let _timer = metrics + .mpc_cert_verify_duration_seconds + .with_label_values(&[MPC_LABEL_KEY_ROTATION]) + .start_timer(); let mgr = Arc::clone(mpc_manager); let cert = rotation_cert.clone(); let verified = spawn_blocking(move || { @@ -1033,6 +1128,7 @@ impl MpcManager { mgr.committee.verify_signature(&cert) }) .await; + drop(_timer); if let Err(e) = verified { tracing::info!( "Invalid rotation certificate signature from {:?}: {}", @@ -1078,6 +1174,10 @@ impl MpcManager { "Rotation messages from dealer {:?} not available or hash mismatch, retrieving from signers", dealer ); + let _timer = metrics + .mpc_message_retrieval_duration_seconds + .with_label_values(&[MPC_LABEL_KEY_ROTATION]) + .start_timer(); Self::retrieve_rotation_messages(mpc_manager, message, &rotation_cert, p2p_channel) .await .map_err(|e| { @@ -1088,6 +1188,7 @@ impl MpcManager { ); e })?; + drop(_timer); // Delete stale outputs from the RPC handler so the party phase // reprocesses with the retrieved (certified) messages. { @@ -1098,6 +1199,10 @@ impl MpcManager { } } { + let _timer = metrics + .mpc_message_process_duration_seconds + .with_label_values(&[MPC_LABEL_KEY_ROTATION]) + .start_timer(); let mgr = Arc::clone(mpc_manager); let previous = previous.clone(); let share_indices = dealer_share_indices.clone(); @@ -1115,14 +1220,19 @@ impl MpcManager { Ok::<_, MpcError>(()) }) .await?; + drop(_timer); } - let signers = { + let (signers, epoch) = { let mgr = mpc_manager.read().unwrap(); - rotation_cert + let signers = rotation_cert .signers(&mgr.committee) - .expect("certificate verified above") + .expect("certificate verified above"); + (signers, mgr.mpc_config.epoch) }; - let epoch = mpc_manager.read().unwrap().mpc_config.epoch; + let _timer = metrics + .mpc_complaint_recovery_duration_seconds + .with_label_values(&[MPC_LABEL_KEY_ROTATION]) + .start_timer(); let recovered = Self::recover_rotation_shares_via_complaints( mpc_manager, &dealer, @@ -1141,6 +1251,7 @@ impl MpcManager { .remove(&ComplaintsToProcessKey::Rotation(dealer, share_index)); } } + drop(_timer); // Only add indices that have outputs (avoids adding indices for // dealers with empty rotation messages, e.g. a node that rejoined // with no shares from the new-member fallback). @@ -1165,6 +1276,10 @@ impl MpcManager { ); } tracing::info!("run_key_rotation_as_party: threshold met, calling complete_key_rotation",); + let _timer = metrics + .mpc_completion_duration_seconds + .with_label_values(&[MPC_LABEL_KEY_ROTATION]) + .start_timer(); let output = { let mgr = Arc::clone(mpc_manager); let previous = previous.clone(); @@ -1174,6 +1289,7 @@ impl MpcManager { }) .await? }; + drop(_timer); Ok(output) } @@ -1182,7 +1298,12 @@ impl MpcManager { batch_index: u32, p2p_channel: &impl P2PChannel, tob_channel: &mut impl OrderedBroadcastChannel, + metrics: &Metrics, ) -> MpcResult<()> { + let _timer = metrics + .mpc_dealer_crypto_duration_seconds + .with_label_values(&[MPC_LABEL_NONCE_GEN]) + .start_timer(); let dealer_data = { let mgr = Arc::clone(mpc_manager); spawn_blocking(move || { @@ -1192,6 +1313,7 @@ impl MpcManager { }) .await? }; + drop(_timer); let mut aggregator = BlsSignatureAggregator::new_with_reduced_weights( &dealer_data.committee, dealer_data.messages_hash.clone(), @@ -1200,12 +1322,17 @@ impl MpcManager { aggregator .add_signature(dealer_data.my_signature) .expect("first signature should always be valid"); + let _timer = metrics + .mpc_p2p_broadcast_duration_seconds + .with_label_values(&[MPC_LABEL_NONCE_GEN]) + .start_timer(); let results = send_to_many( dealer_data.recipients.iter().copied(), dealer_data.request, |addr, req| async move { p2p_channel.send_messages(&addr, &req).await }, ) .await; + drop(_timer); for (addr, result) in results { match result { Ok(response) => { @@ -1224,11 +1351,16 @@ impl MpcManager { batch_index, cert: nonce_cert, }; + let _timer = metrics + .mpc_cert_publish_duration_seconds + .with_label_values(&[MPC_LABEL_NONCE_GEN]) + .start_timer(); with_timeout_and_retry(|| tob_channel.publish(cert.clone())) .await .map_err(|e| { MpcError::BroadcastError(format!("{}: {}", ERR_PUBLISH_CERT_FAILED, e)) })?; + drop(_timer); } Ok(()) } @@ -1238,6 +1370,7 @@ impl MpcManager { batch_index: u32, p2p_channel: &impl P2PChannel, tob_channel: &mut impl OrderedBroadcastChannel, + metrics: &Metrics, ) -> MpcResult> { let required_weight = { let mgr = mpc_manager.read().unwrap(); @@ -1249,10 +1382,15 @@ impl MpcManager { if dealer_weight_sum >= required_weight { break; } + let _timer = metrics + .mpc_tob_poll_duration_seconds + .with_label_values(&[MPC_LABEL_NONCE_GEN]) + .start_timer(); let cert = tob_channel .receive() .await .map_err(|e| MpcError::BroadcastError(e.to_string()))?; + drop(_timer); let CertificateV1::NonceGeneration { cert: nonce_cert, .. } = cert @@ -1265,6 +1403,10 @@ impl MpcManager { continue; } { + let _timer = metrics + .mpc_cert_verify_duration_seconds + .with_label_values(&[MPC_LABEL_NONCE_GEN]) + .start_timer(); let mgr = Arc::clone(mpc_manager); let cert = nonce_cert.clone(); let verified = spawn_blocking(move || { @@ -1272,6 +1414,7 @@ impl MpcManager { mgr.committee.verify_signature(&cert) }) .await; + drop(_timer); if let Err(e) = verified { tracing::info!( "Invalid nonce certificate signature from {:?}: {}", @@ -1290,6 +1433,10 @@ impl MpcManager { "Nonce message for dealer {:?} not found in memory or DB, retrieving from signers", &dealer ); + let _timer = metrics + .mpc_message_retrieval_duration_seconds + .with_label_values(&[MPC_LABEL_NONCE_GEN]) + .start_timer(); Self::retrieve_nonce_message( mpc_manager, message, @@ -1306,6 +1453,7 @@ impl MpcManager { ); e })?; + drop(_timer); // Delete stale output from the RPC handler so the party phase // reprocesses with the retrieved (certified) message. mpc_manager @@ -1314,6 +1462,10 @@ impl MpcManager { .dealer_nonce_outputs .remove(&dealer); } + let _timer = metrics + .mpc_message_process_duration_seconds + .with_label_values(&[MPC_LABEL_NONCE_GEN]) + .start_timer(); let has_complaint = { let mgr = Arc::clone(mpc_manager); spawn_blocking(move || { @@ -1332,18 +1484,23 @@ impl MpcManager { }) .await? }; + drop(_timer); if has_complaint { tracing::info!( "Nonce gen complaint detected for dealer {:?}, recovering via Complain RPC", dealer ); - let signers = { + let _timer = metrics + .mpc_complaint_recovery_duration_seconds + .with_label_values(&[MPC_LABEL_NONCE_GEN]) + .start_timer(); + let (signers, epoch) = { let mgr = mpc_manager.read().unwrap(); - nonce_cert + let signers = nonce_cert .signers(&mgr.committee) - .expect("certificate verified above") + .expect("certificate verified above"); + (signers, mgr.mpc_config.epoch) }; - let epoch = mpc_manager.read().unwrap().mpc_config.epoch; Self::recover_nonce_shares_via_complaint( mpc_manager, &dealer, @@ -1352,6 +1509,7 @@ impl MpcManager { epoch, ) .await?; + drop(_timer); } let dealer_weight = { let mgr = mpc_manager.read().unwrap(); diff --git a/crates/hashi/src/mpc/mpc_except_signing_tests.rs b/crates/hashi/src/mpc/mpc_except_signing_tests.rs index a32aaf573..639851546 100644 --- a/crates/hashi/src/mpc/mpc_except_signing_tests.rs +++ b/crates/hashi/src/mpc/mpc_except_signing_tests.rs @@ -3,6 +3,11 @@ use super::*; use crate::communication::ChannelResult; +use crate::metrics::Metrics; + +fn test_metrics() -> Metrics { + Metrics::new(&prometheus::Registry::new()) +} use crate::mpc::types::GetPartialSignaturesRequest; use crate::mpc::types::GetPartialSignaturesResponse; use crate::mpc::types::ProtocolType; @@ -1410,12 +1415,13 @@ async fn test_run_dkg() { let test_manager = Arc::new(RwLock::new(test_manager)); // Call run_as_dealer() and run_as_party() for validator 0 - MpcManager::run_dkg_as_dealer(&test_manager, &mock_p2p, &mut mock_tob) - .await - .unwrap(); - let output = MpcManager::run_dkg_as_party(&test_manager, &mock_p2p, &mut mock_tob) + MpcManager::run_dkg_as_dealer(&test_manager, &mock_p2p, &mut mock_tob, &test_metrics()) .await .unwrap(); + let output = + MpcManager::run_dkg_as_party(&test_manager, &mock_p2p, &mut mock_tob, &test_metrics()) + .await + .unwrap(); // Verify validator 0 received the correct number of key shares based on its weight assert_eq!( @@ -1535,12 +1541,13 @@ async fn test_run_dkg_with_complaint_recovery() { let test_manager = Arc::new(RwLock::new(test_manager)); - MpcManager::run_dkg_as_dealer(&test_manager, &mock_p2p, &mut mock_tob) - .await - .unwrap(); - let output = MpcManager::run_dkg_as_party(&test_manager, &mock_p2p, &mut mock_tob) + MpcManager::run_dkg_as_dealer(&test_manager, &mock_p2p, &mut mock_tob, &test_metrics()) .await .unwrap(); + let output = + MpcManager::run_dkg_as_party(&test_manager, &mock_p2p, &mut mock_tob, &test_metrics()) + .await + .unwrap(); // Verify output is valid despite cheating dealer assert_eq!( @@ -1636,9 +1643,14 @@ async fn test_run_triggers_dealer_phase() { let mut mock_tob = MockOrderedBroadcastChannel::new(setup.certificates) .with_override_certified_dealers(vec![]); - let output = MpcManager::run_dkg(&setup.test_manager, &setup.mock_p2p, &mut mock_tob) - .await - .unwrap(); + let output = MpcManager::run_dkg( + &setup.test_manager, + &setup.mock_p2p, + &mut mock_tob, + &test_metrics(), + ) + .await + .unwrap(); // Verify dealer published a certificate assert!( @@ -1658,9 +1670,14 @@ async fn test_run_skips_dealer_phase() { // With 4 certificates and threshold = 2, existing_weight = 4 >= 2, dealer skips let mut mock_tob = MockOrderedBroadcastChannel::new(setup.certificates); - let output = MpcManager::run_dkg(&setup.test_manager, &setup.mock_p2p, &mut mock_tob) - .await - .unwrap(); + let output = MpcManager::run_dkg( + &setup.test_manager, + &setup.mock_p2p, + &mut mock_tob, + &test_metrics(), + ) + .await + .unwrap(); // Verify dealer did NOT publish (skipped) assert_eq!( @@ -1684,9 +1701,14 @@ async fn test_run_dealer_failure_party_still_executes() { .with_override_certified_dealers(vec![]) .with_fail_on_publish("simulated publish failure"); - let output = MpcManager::run_dkg(&setup.test_manager, &setup.mock_p2p, &mut mock_tob) - .await - .unwrap(); + let output = MpcManager::run_dkg( + &setup.test_manager, + &setup.mock_p2p, + &mut mock_tob, + &test_metrics(), + ) + .await + .unwrap(); // Verify DKG completed successfully (party phase executed despite dealer failure) assert_eq!(output.key_shares.shares.len(), 1); @@ -1713,7 +1735,9 @@ async fn test_run_as_dealer_success() { let mut mock_tob = MockOrderedBroadcastChannel::new(Vec::new()); // Call run_as_dealer() - let result = MpcManager::run_dkg_as_dealer(&test_manager, &mock_p2p, &mut mock_tob).await; + let result = + MpcManager::run_dkg_as_dealer(&test_manager, &mock_p2p, &mut mock_tob, &test_metrics()) + .await; // Verify success assert!(result.is_ok()); @@ -1795,9 +1819,10 @@ async fn test_run_as_party_success() { .map(|(idx, mgr)| (setup.address(idx + 1), mgr)) .collect(); let mock_p2p = MockP2PChannel::new(other_managers, setup.address(0)); - let output = MpcManager::run_dkg_as_party(&test_manager, &mock_p2p, &mut mock_tob) - .await - .unwrap(); + let output = + MpcManager::run_dkg_as_party(&test_manager, &mock_p2p, &mut mock_tob, &test_metrics()) + .await + .unwrap(); // Verify output structure assert_eq!(output.key_shares.shares.len(), 1); // weight = 1 @@ -1913,9 +1938,10 @@ async fn test_run_as_party_recovers_shares_via_complaint() { let party_manager = Arc::new(RwLock::new(party_manager)); // Run as party - should recover shares via complaint - let output = MpcManager::run_dkg_as_party(&party_manager, &mock_p2p, &mut mock_tob) - .await - .unwrap(); + let output = + MpcManager::run_dkg_as_party(&party_manager, &mock_p2p, &mut mock_tob, &test_metrics()) + .await + .unwrap(); // Verify complaint was resolved // DKG: complaints keyed by dealer address @@ -2025,9 +2051,10 @@ async fn test_run_as_party_recovers_from_hash_mismatch() { .map(|(idx, mgr)| (setup.address(idx + 1), mgr)) .collect(); let mock_p2p = MockP2PChannel::new(other_managers, setup.address(0)); - let output = MpcManager::run_dkg_as_party(&test_manager, &mock_p2p, &mut mock_tob) - .await - .unwrap(); + let output = + MpcManager::run_dkg_as_party(&test_manager, &mock_p2p, &mut mock_tob, &test_metrics()) + .await + .unwrap(); // The critical assertion: test_manager must produce the same vk as other nodes. // Without the delete-on-mismatch fix, the stale output from wrong_msg_0 would @@ -2096,9 +2123,10 @@ async fn test_run_as_party_requires_different_dealers() { }) .collect(); let mock_p2p = MockP2PChannel::new(other_managers, setup.address(2)); - let output = MpcManager::run_dkg_as_party(&test_manager, &mock_p2p, &mut mock_tob) - .await - .unwrap(); + let output = + MpcManager::run_dkg_as_party(&test_manager, &mock_p2p, &mut mock_tob, &test_metrics()) + .await + .unwrap(); // Verify it correctly waited for 2 different dealers assert_eq!(output.key_shares.shares.len(), 1); // weight = 1 @@ -2119,7 +2147,9 @@ async fn test_run_as_dealer_p2p_send_error() { }; let mut mock_tob = MockOrderedBroadcastChannel::new(Vec::new()); - let result = MpcManager::run_dkg_as_dealer(&test_manager, &failing_p2p, &mut mock_tob).await; + let result = + MpcManager::run_dkg_as_dealer(&test_manager, &failing_p2p, &mut mock_tob, &test_metrics()) + .await; assert!(result.is_ok()); assert_eq!(mock_tob.published_count(), 0); @@ -2151,8 +2181,13 @@ async fn test_run_as_dealer_tob_publish_error() { fail_on_receive: false, }; - let result = - MpcManager::run_dkg_as_dealer(&test_manager, &succeeding_p2p, &mut failing_tob).await; + let result = MpcManager::run_dkg_as_dealer( + &test_manager, + &succeeding_p2p, + &mut failing_tob, + &test_metrics(), + ) + .await; assert!(result.is_err()); let err = result.unwrap_err(); @@ -2185,8 +2220,13 @@ async fn test_run_as_dealer_partial_failures_still_collects_enough() { let mut mock_tob = MockOrderedBroadcastChannel::new(Vec::new()); - let result = - MpcManager::run_dkg_as_dealer(&test_manager, &partially_failing_p2p, &mut mock_tob).await; + let result = MpcManager::run_dkg_as_dealer( + &test_manager, + &partially_failing_p2p, + &mut mock_tob, + &test_metrics(), + ) + .await; assert!(result.is_ok()); // Verify that a certificate was published @@ -2214,8 +2254,13 @@ async fn test_run_as_dealer_partial_failures_insufficient_signatures() { let mut mock_tob = MockOrderedBroadcastChannel::new(Vec::new()); - let result = - MpcManager::run_dkg_as_dealer(&test_manager, &partially_failing_p2p, &mut mock_tob).await; + let result = MpcManager::run_dkg_as_dealer( + &test_manager, + &partially_failing_p2p, + &mut mock_tob, + &test_metrics(), + ) + .await; assert!(result.is_ok()); assert_eq!(mock_tob.published_count(), 0); @@ -2243,7 +2288,9 @@ async fn test_run_as_dealer_includes_own_signature() { let mut mock_tob = MockOrderedBroadcastChannel::new(Vec::new()); // Run as dealer - let result = MpcManager::run_dkg_as_dealer(&test_manager, &mock_p2p, &mut mock_tob).await; + let result = + MpcManager::run_dkg_as_dealer(&test_manager, &mock_p2p, &mut mock_tob, &test_metrics()) + .await; assert!(result.is_ok()); @@ -2286,7 +2333,9 @@ async fn test_run_as_party_tob_receive_error() { }; let mock_p2p = MockP2PChannel::new(HashMap::new(), setup.address(0)); - let result = MpcManager::run_dkg_as_party(&test_manager, &mock_p2p, &mut failing_tob).await; + let result = + MpcManager::run_dkg_as_party(&test_manager, &mock_p2p, &mut failing_tob, &test_metrics()) + .await; assert!(result.is_err()); let err = result.unwrap_err(); @@ -2398,7 +2447,9 @@ async fn setup_party_and_run( // Run party collection let mock_p2p = MockP2PChannel::new(HashMap::new(), party_addr); let party_manager = Arc::new(RwLock::new(party_manager)); - let result = MpcManager::run_dkg_as_party(&party_manager, &mock_p2p, &mut mock_tob).await; + let result = + MpcManager::run_dkg_as_party(&party_manager, &mock_p2p, &mut mock_tob, &test_metrics()) + .await; (result, mock_tob) } @@ -2540,7 +2591,9 @@ async fn test_run_as_party_skips_duplicate_dealers() { // Run party collection let mock_p2p = MockP2PChannel::new(HashMap::new(), party_addr); let party_manager = Arc::new(RwLock::new(party_manager)); - let result = MpcManager::run_dkg_as_party(&party_manager, &mock_p2p, &mut mock_tob).await; + let result = + MpcManager::run_dkg_as_party(&party_manager, &mock_p2p, &mut mock_tob, &test_metrics()) + .await; assert!(result.is_ok()); // Verify behavior: @@ -2626,7 +2679,9 @@ async fn test_run_as_party_retrieves_missing_dealer_messages() { let party_manager = Arc::new(RwLock::new(party_manager)); // Run as party - should retrieve missing messages via P2P - let result = MpcManager::run_dkg_as_party(&party_manager, &mock_p2p, &mut mock_tob).await; + let result = + MpcManager::run_dkg_as_party(&party_manager, &mock_p2p, &mut mock_tob, &test_metrics()) + .await; assert!(result.is_ok()); let mgr = party_manager.read().unwrap(); @@ -2729,7 +2784,9 @@ async fn test_run_as_party_aborts_on_retrieval_failure() { let party_manager = Arc::new(RwLock::new(party_manager)); // Run as party - should process dealer1 successfully, then ABORT on dealer2 retrieval failure - let result = MpcManager::run_dkg_as_party(&party_manager, &mock_p2p, &mut mock_tob).await; + let result = + MpcManager::run_dkg_as_party(&party_manager, &mock_p2p, &mut mock_tob, &test_metrics()) + .await; // Should fail with PairwiseCommunicationError (could not retrieve message from any signer) assert!(result.is_err()); @@ -2824,7 +2881,9 @@ async fn test_run_as_party_aborts_on_failed_recovery() { // Run as party - should ABORT on dealer0 recovery failure // With retry logic, failed signers are skipped, so we get ProtocolFailed - let result = MpcManager::run_dkg_as_party(&party_manager, &mock_p2p, &mut mock_tob).await; + let result = + MpcManager::run_dkg_as_party(&party_manager, &mock_p2p, &mut mock_tob, &test_metrics()) + .await; // Should fail with ProtocolFailed (all signers failed, not enough responses) assert!(result.is_err(), "Expected error, got: {:?}", result); @@ -4748,7 +4807,13 @@ async fn test_restart_dealer_reuses_stored_message() { let restarted_manager = Arc::new(RwLock::new(restarted_manager)); // Run run_as_dealer - should reuse stored message - let result = MpcManager::run_dkg_as_dealer(&restarted_manager, &mock_p2p, &mut mock_tob).await; + let result = MpcManager::run_dkg_as_dealer( + &restarted_manager, + &mock_p2p, + &mut mock_tob, + &test_metrics(), + ) + .await; assert!(result.is_ok()); // Verify store_dealer_message was NOT called (message already existed) @@ -4862,7 +4927,13 @@ async fn test_restart_party_uses_stored_messages_without_retrieval() { let party_manager = Arc::new(RwLock::new(party_manager)); // Run as party - let result = MpcManager::run_dkg_as_party(&party_manager, &tracking_p2p, &mut mock_tob).await; + let result = MpcManager::run_dkg_as_party( + &party_manager, + &tracking_p2p, + &mut mock_tob, + &test_metrics(), + ) + .await; assert!(result.is_ok()); // Verify retrieve_message was NOT called (messages were already in memory) @@ -5408,6 +5479,7 @@ async fn test_run_key_rotation() { &rotation_setup.certificates(), &mock_p2p, &mut mock_tob, + &test_metrics(), ) .await .unwrap(); @@ -5546,6 +5618,7 @@ async fn test_run_key_rotation_skips_dealer_phase() { &rotation_setup.certificates(), &mock_p2p, &mut mock_tob, + &test_metrics(), ) .await .unwrap(); @@ -5691,6 +5764,7 @@ async fn test_run_key_rotation_excludes_empty_messages_from_share_count() { &rotation_setup.certificates(), &mock_p2p, &mut mock_tob, + &test_metrics(), ) .await .unwrap(); @@ -5895,6 +5969,7 @@ async fn test_run_key_rotation_recovers_from_hash_mismatch() { &rotation_setup.certificates(), &mock_p2p, &mut mock_tob, + &test_metrics(), ) .await .unwrap(); @@ -6028,6 +6103,7 @@ async fn test_run_key_rotation_with_complaint_recovery() { &rotation_setup.certificates(), &mock_p2p, &mut mock_tob, + &test_metrics(), ) .await .unwrap(); @@ -8533,12 +8609,24 @@ async fn test_run_nonce_generation() { let test_manager = Arc::new(RwLock::new(test_manager)); - MpcManager::run_as_nonce_dealer(&test_manager, batch_index, &mock_p2p, &mut mock_tob) - .await - .unwrap(); - MpcManager::run_as_nonce_party(&test_manager, batch_index, &mock_p2p, &mut mock_tob) - .await - .unwrap(); + MpcManager::run_as_nonce_dealer( + &test_manager, + batch_index, + &mock_p2p, + &mut mock_tob, + &test_metrics(), + ) + .await + .unwrap(); + MpcManager::run_as_nonce_party( + &test_manager, + batch_index, + &mock_p2p, + &mut mock_tob, + &test_metrics(), + ) + .await + .unwrap(); // Verify validator 0 has nonce outputs from enough dealers let mgr = test_manager.read().unwrap(); @@ -8677,9 +8765,15 @@ async fn test_run_as_nonce_party_recovers_from_hash_mismatch() { let mut mock_tob = MockOrderedBroadcastChannel::new(all_certs); let test_manager = Arc::new(RwLock::new(test_manager)); - MpcManager::run_as_nonce_party(&test_manager, batch_index, &mock_p2p, &mut mock_tob) - .await - .unwrap(); + MpcManager::run_as_nonce_party( + &test_manager, + batch_index, + &mock_p2p, + &mut mock_tob, + &test_metrics(), + ) + .await + .unwrap(); // Verify the nonce output for dealer 0 matches what a clean node has. // Without the delete-on-mismatch fix, the stale output from the wrong @@ -8749,10 +8843,15 @@ async fn test_run_nonce_generation_skips_dealer_phase() { let test_manager = Arc::new(RwLock::new(test_manager)); let mut mock_tob = MockOrderedBroadcastChannel::new(certificates); - let outputs = - MpcManager::run_nonce_generation(&test_manager, batch_index, &mock_p2p, &mut mock_tob) - .await - .unwrap(); + let outputs = MpcManager::run_nonce_generation( + &test_manager, + batch_index, + &mock_p2p, + &mut mock_tob, + &test_metrics(), + ) + .await + .unwrap(); // Verify dealer did NOT publish (skipped) assert_eq!( @@ -8831,10 +8930,15 @@ async fn test_run_as_nonce_party_loads_from_store_after_restart() { // run_as_nonce_party should succeed by loading messages from the store, // not from P2P (which would fail since mock_p2p has no managers). - let certified = - MpcManager::run_as_nonce_party(&test_manager, batch_index, &mock_p2p, &mut mock_tob) - .await - .unwrap(); + let certified = MpcManager::run_as_nonce_party( + &test_manager, + batch_index, + &mock_p2p, + &mut mock_tob, + &test_metrics(), + ) + .await + .unwrap(); let mgr = test_manager.read().unwrap(); assert!( @@ -9026,12 +9130,24 @@ async fn test_run_nonce_generation_with_complaint_recovery() { let test_manager = Arc::new(RwLock::new(test_manager)); - MpcManager::run_as_nonce_dealer(&test_manager, batch_index, &mock_p2p, &mut mock_tob) - .await - .unwrap(); - MpcManager::run_as_nonce_party(&test_manager, batch_index, &mock_p2p, &mut mock_tob) - .await - .unwrap(); + MpcManager::run_as_nonce_dealer( + &test_manager, + batch_index, + &mock_p2p, + &mut mock_tob, + &test_metrics(), + ) + .await + .unwrap(); + MpcManager::run_as_nonce_party( + &test_manager, + batch_index, + &mock_p2p, + &mut mock_tob, + &test_metrics(), + ) + .await + .unwrap(); // Verify enough nonce outputs collected let mgr = test_manager.read().unwrap(); diff --git a/crates/hashi/src/mpc/rpc/p2p_channel.rs b/crates/hashi/src/mpc/rpc/p2p_channel.rs index 9aadc2ed3..073ee0684 100644 --- a/crates/hashi/src/mpc/rpc/p2p_channel.rs +++ b/crates/hashi/src/mpc/rpc/p2p_channel.rs @@ -1,10 +1,13 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::sync::Arc; + use crate::communication::ChannelError; use crate::communication::ChannelResult; use crate::communication::P2PChannel; use crate::grpc::Client; +use crate::metrics::Metrics; use crate::mpc::types::ComplainRequest; use crate::mpc::types::ComplaintResponses; use crate::mpc::types::GetPartialSignaturesRequest; @@ -22,13 +25,22 @@ use sui_sdk_types::Address; pub struct RpcP2PChannel { onchain_state: OnchainState, epoch: u64, + metrics: Arc, + protocol_label: &'static str, } impl RpcP2PChannel { - pub fn new(onchain_state: OnchainState, epoch: u64) -> Self { + pub fn new( + onchain_state: OnchainState, + epoch: u64, + protocol_label: &'static str, + metrics: Arc, + ) -> Self { Self { onchain_state, epoch, + metrics, + protocol_label, } } @@ -49,6 +61,17 @@ impl P2PChannel for RpcP2PChannel { recipient: &Address, request: &SendMessagesRequest, ) -> ChannelResult { + if let Ok(bytes) = bcs::to_bytes(request) { + let size = bytes.len() as u64; + self.metrics + .mpc_p2p_message_size_bytes + .with_label_values(&[self.protocol_label]) + .observe(size as f64); + self.metrics + .mpc_bytes_sent_total + .with_label_values(&[self.protocol_label]) + .inc_by(size); + } self.get_client(recipient)? .send_messages(self.epoch, request) .await @@ -60,10 +83,18 @@ impl P2PChannel for RpcP2PChannel { party: &Address, request: &RetrieveMessagesRequest, ) -> ChannelResult { - self.get_client(party)? + let response = self + .get_client(party)? .retrieve_messages(request) .await - .map_err(|e| ChannelError::RequestFailed(e.to_string())) + .map_err(|e| ChannelError::RequestFailed(e.to_string()))?; + if let Ok(bytes) = bcs::to_bytes(&response) { + self.metrics + .mpc_bytes_received_total + .with_label_values(&[self.protocol_label]) + .inc_by(bytes.len() as u64); + } + Ok(response) } async fn complain( @@ -71,6 +102,12 @@ impl P2PChannel for RpcP2PChannel { party: &Address, request: &ComplainRequest, ) -> ChannelResult { + if let Ok(bytes) = bcs::to_bytes(request) { + self.metrics + .mpc_bytes_sent_total + .with_label_values(&[self.protocol_label]) + .inc_by(bytes.len() as u64); + } self.get_client(party)? .complain(request) .await diff --git a/crates/hashi/src/mpc/rpc/service.rs b/crates/hashi/src/mpc/rpc/service.rs index 0dadeb075..3454e1d84 100644 --- a/crates/hashi/src/mpc/rpc/service.rs +++ b/crates/hashi/src/mpc/rpc/service.rs @@ -33,7 +33,17 @@ impl MpcService for HttpService { let external_request = request.into_inner(); let internal_request = types::SendMessagesRequest::try_from(&external_request) .map_err(|e| Status::invalid_argument(e.to_string()))?; + let label = match &internal_request.messages { + types::Messages::Dkg(_) => crate::metrics::MPC_LABEL_DKG, + types::Messages::Rotation(_) => crate::metrics::MPC_LABEL_KEY_ROTATION, + types::Messages::NonceGeneration(_) => crate::metrics::MPC_LABEL_NONCE_GEN, + }; let mpc_manager = self.mpc_manager()?; + let _timer = self + .metrics() + .mpc_rpc_handler_process_duration_seconds + .with_label_values(&[label]) + .start_timer(); let response = spawn_blocking(move || -> Result<_, Status> { let mut mgr = mpc_manager.write().unwrap(); validate_epoch(mgr.mpc_config.epoch, external_request.epoch)?; @@ -48,6 +58,7 @@ impl MpcService for HttpService { }) }) .await?; + drop(_timer); Ok(tonic::Response::new(SendMessagesResponse::from(&response))) } diff --git a/crates/hashi/src/mpc/service.rs b/crates/hashi/src/mpc/service.rs index f2a9e5985..71a8694c8 100644 --- a/crates/hashi/src/mpc/service.rs +++ b/crates/hashi/src/mpc/service.rs @@ -20,6 +20,9 @@ use crate::Hashi; use crate::communication::SuiTobChannel; use crate::communication::fetch_certificates; use crate::constants::PRESIG_REFILL_DIVISOR; +use crate::metrics::MPC_LABEL_DKG; +use crate::metrics::MPC_LABEL_KEY_ROTATION; +use crate::metrics::MPC_LABEL_NONCE_GEN; use crate::mpc::MpcManager; use crate::mpc::MpcOutput; use crate::mpc::SigningManager; @@ -291,7 +294,12 @@ impl MpcService { .mpc_manager() .expect("MpcManager must be set before run_dkg"); let signer = self.inner.config.operator_private_key()?; - let p2p_channel = RpcP2PChannel::new(onchain_state.clone(), target_epoch); + let p2p_channel = RpcP2PChannel::new( + onchain_state.clone(), + target_epoch, + MPC_LABEL_DKG, + self.inner.metrics.clone(), + ); let mut tob_channel = SuiTobChannel::new( self.inner.config.hashi_ids(), onchain_state, @@ -299,9 +307,14 @@ impl MpcService { None, signer, ); - let output = MpcManager::run_dkg(&mpc_manager, &p2p_channel, &mut tob_channel) - .await - .map_err(|e| anyhow::anyhow!("DKG failed: {e}"))?; + let output = MpcManager::run_dkg( + &mpc_manager, + &p2p_channel, + &mut tob_channel, + &self.inner.metrics, + ) + .await + .map_err(|e| anyhow::anyhow!("DKG failed: {e}"))?; Ok(output) } @@ -324,7 +337,12 @@ impl MpcService { .mpc_manager() .ok_or_else(|| anyhow::anyhow!("MpcManager not initialized"))?; let signer = self.inner.config.operator_private_key()?; - let p2p_channel = RpcP2PChannel::new(onchain_state.clone(), epoch); + let p2p_channel = RpcP2PChannel::new( + onchain_state.clone(), + epoch, + MPC_LABEL_NONCE_GEN, + self.inner.metrics.clone(), + ); let mut tob_channel = SuiTobChannel::new( self.inner.config.hashi_ids(), onchain_state, @@ -332,14 +350,22 @@ impl MpcService { Some(batch_index), signer, ); - let nonce_outputs = MpcManager::run_nonce_generation( + let metrics = &self.inner.metrics; + let _timer = metrics + .mpc_total_duration_seconds + .with_label_values(&[MPC_LABEL_NONCE_GEN]) + .start_timer(); + let nonce_result = MpcManager::run_nonce_generation( &mpc_manager, batch_index, &p2p_channel, &mut tob_channel, + metrics, ) - .await - .map_err(|e| anyhow::anyhow!("Nonce generation failed: {e}"))?; + .await; + drop(_timer); + let nonce_outputs = + nonce_result.map_err(|e| anyhow::anyhow!("Nonce generation failed: {e}"))?; let (batch_size_per_weight, f) = { let mgr = mpc_manager.read().unwrap(); ( @@ -347,8 +373,13 @@ impl MpcService { mgr.mpc_config.max_faulty as usize, ) }; + let _timer = metrics + .mpc_presig_conversion_duration_seconds + .with_label_values(&[MPC_LABEL_NONCE_GEN]) + .start_timer(); let presignatures = Presignatures::new(nonce_outputs, batch_size_per_weight, f) .map_err(|e| anyhow::anyhow!("Failed to create presignatures: {e}"))?; + drop(_timer); Ok((committee, presignatures)) } @@ -471,7 +502,12 @@ impl MpcService { "No nonce gen certificates on TOB for epoch {epoch} batch {batch_index}" ) })?; - let p2p_channel = RpcP2PChannel::new(self.inner.onchain_state().clone(), epoch); + let p2p_channel = RpcP2PChannel::new( + self.inner.onchain_state().clone(), + epoch, + MPC_LABEL_NONCE_GEN, + self.inner.metrics.clone(), + ); let outputs = MpcManager::reconstruct_presignatures_with_complaint_recovery( mpc_manager, epoch, @@ -543,6 +579,16 @@ impl MpcService { .committees .mpc_public_key() .is_empty(); + let protocol_label = if run_dkg { + MPC_LABEL_DKG + } else { + MPC_LABEL_KEY_ROTATION + }; + let metrics = &self.inner.metrics; + let _reconfig_timer = metrics + .mpc_reconfig_total_duration_seconds + .with_label_values(&[protocol_label]) + .start_timer(); info!("handle_reconfig: epoch={target_epoch}, run_dkg={run_dkg}, entering retry loop",); let output = loop { if self.get_pending_epoch_change() != Some(target_epoch) { @@ -562,6 +608,10 @@ impl MpcService { self.sleep_if_still_pending(target_epoch).await; continue; } + let _timer = metrics + .mpc_total_duration_seconds + .with_label_values(&[protocol_label]) + .start_timer(); let result = if run_dkg { tokio::time::timeout(MPC_PROTOCOL_TIMEOUT, self.run_dkg(target_epoch)) .await @@ -579,6 +629,7 @@ impl MpcService { )) }) }; + drop(_timer); match result { Ok(output) => break output, Err(e) => { @@ -592,6 +643,10 @@ impl MpcService { }; let _ = self.key_ready_tx.send(Some(output.public_key)); info!("MPC key ready for epoch {target_epoch}, submitting end_reconfig"); + let _end_reconfig_timer = metrics + .mpc_end_reconfig_duration_seconds + .with_label_values(&[protocol_label]) + .start_timer(); loop { if self.get_pending_epoch_change() != Some(target_epoch) { break; @@ -607,10 +662,15 @@ impl MpcService { } } } + drop(_end_reconfig_timer); info!("end_reconfig complete for epoch {target_epoch}, running prepare_signing"); if let Err(e) = self.inner.db.prune_messages_below(target_epoch) { error!("Failed to prune old MPC messages below epoch {target_epoch}: {e}"); } + let _prepare_signing_timer = metrics + .mpc_prepare_signing_duration_seconds + .with_label_values(&[protocol_label]) + .start_timer(); for attempt in 1..=MAX_PROTOCOL_ATTEMPTS { match self.prepare_signing(target_epoch, &output).await { Ok(()) => break, @@ -630,6 +690,8 @@ impl MpcService { } } } + drop(_prepare_signing_timer); + drop(_reconfig_timer); } fn setup_initial_dkg(&self, target_epoch: u64) -> anyhow::Result<()> { @@ -672,7 +734,12 @@ impl MpcService { previous_certs.len(), ); let signer = self.inner.config.operator_private_key()?; - let p2p_channel = RpcP2PChannel::new(onchain_state.clone(), target_epoch); + let p2p_channel = RpcP2PChannel::new( + onchain_state.clone(), + target_epoch, + MPC_LABEL_KEY_ROTATION, + self.inner.metrics.clone(), + ); let mut tob_channel = SuiTobChannel::new( self.inner.config.hashi_ids(), onchain_state, @@ -685,6 +752,7 @@ impl MpcService { &previous_certs, &p2p_channel, &mut tob_channel, + &self.inner.metrics, ) .await .map_err(|e| anyhow::anyhow!("Key rotation failed: {e}"))?; diff --git a/crates/hashi/src/mpc/signing.rs b/crates/hashi/src/mpc/signing.rs index eafc2b568..9bc7bf366 100644 --- a/crates/hashi/src/mpc/signing.rs +++ b/crates/hashi/src/mpc/signing.rs @@ -26,6 +26,8 @@ use tokio::time::Instant; use crate::communication::P2PChannel; use crate::communication::send_to_many; +use crate::metrics::MPC_LABEL_SIGNING; +use crate::metrics::Metrics; use crate::mpc::types::GetPartialSignaturesRequest; use crate::mpc::types::GetPartialSignaturesResponse; use crate::mpc::types::PartialSigningOutput; @@ -198,6 +200,7 @@ impl SigningManager { beacon_value: &S, derivation_address: Option<&DerivationAddress>, timeout: Duration, + metrics: &Metrics, ) -> SigningResult { let (public_nonce, partial_sigs, threshold, address, committee, verifying_key) = { let mut mgr = signing_manager.write().unwrap(); @@ -274,6 +277,10 @@ impl SigningManager { batch_index={used_batch_index}, \ position={target_position})", ); + let _timer = metrics + .mpc_sign_partial_gen_duration_seconds + .with_label_values(&[MPC_LABEL_SIGNING]) + .start_timer(); let result = generate_partial_signatures( message, presig, @@ -283,6 +290,7 @@ impl SigningManager { derivation_address, ) .map_err(|e| SigningError::CryptoError(e.to_string()))?; + drop(_timer); // Trigger refill based on the latest batch's consumption. if let Some(latest) = mgr.batches.last() { @@ -339,6 +347,10 @@ impl SigningManager { .collect(); let request = GetPartialSignaturesRequest { sui_request_id }; let deadline = Instant::now() + timeout; + let _collection_timer = metrics + .mpc_sign_collection_duration_seconds + .with_label_values(&[MPC_LABEL_SIGNING]) + .start_timer(); loop { if all_partial_sigs.len() >= threshold as usize { break; @@ -357,6 +369,7 @@ impl SigningManager { ) .await; } + drop(_collection_timer); let params = AggregationParams { message, public_nonce: &public_nonce, @@ -365,6 +378,10 @@ impl SigningManager { verifying_key: &verifying_key, derivation_address, }; + let _agg_timer = metrics + .mpc_sign_aggregation_duration_seconds + .with_label_values(&[MPC_LABEL_SIGNING]) + .start_timer(); let result = match aggregate_signatures( params.message, params.public_nonce, @@ -393,6 +410,7 @@ impl SigningManager { } Err(e) => Err(SigningError::CryptoError(e.to_string())), }; + drop(_agg_timer); match &result { Ok(_) => {} Err(e) => { @@ -554,6 +572,10 @@ mod tests { use rand::SeedableRng; use rand::rngs::StdRng; + fn test_metrics() -> Metrics { + Metrics::new(&prometheus::Registry::new()) + } + fn mock_shares(rng: &mut impl AllowedRng, secret: S, t: u16, n: u16) -> Vec> { let p = Poly::rand_fixed_c0(t - 1, secret, rng); (1..=n) @@ -1101,6 +1123,7 @@ mod tests { &beacon, None, Duration::from_secs(30), + &test_metrics(), ) .await .unwrap(); @@ -1152,6 +1175,7 @@ mod tests { &beacon, None, Duration::from_secs(30), + &test_metrics(), ) .await .unwrap(); @@ -1180,6 +1204,7 @@ mod tests { &beacon, None, Duration::from_secs(30), + &test_metrics(), ) .await .unwrap(); @@ -1208,6 +1233,7 @@ mod tests { &beacon, None, Duration::from_secs(30), + &test_metrics(), ) .await .unwrap(); @@ -1240,6 +1266,7 @@ mod tests { &beacon, None, Duration::from_secs(30), + &test_metrics(), ) .await; @@ -1276,6 +1303,7 @@ mod tests { &beacon, None, Duration::from_millis(1), // very short timeout + &test_metrics(), ) .await; @@ -1356,6 +1384,7 @@ mod tests { &S::zero(), None, Duration::from_secs(30), + &test_metrics(), ) .await .unwrap(); @@ -1382,6 +1411,7 @@ mod tests { &S::zero(), None, Duration::from_secs(30), + &test_metrics(), ) .await; @@ -1407,6 +1437,7 @@ mod tests { &S::zero(), None, Duration::from_secs(30), + &test_metrics(), ) .await; @@ -1468,6 +1499,7 @@ mod tests { &S::zero(), None, Duration::from_secs(30), + &test_metrics(), ) .await; @@ -1497,6 +1529,7 @@ mod tests { &beacon, None, Duration::from_secs(30), + &test_metrics(), ) .await; assert!(result.is_ok(), "sign {i} should succeed"); @@ -1520,6 +1553,7 @@ mod tests { &beacon, None, Duration::from_secs(30), + &test_metrics(), ) .await; @@ -1564,6 +1598,7 @@ mod tests { &beacon, None, Duration::from_secs(30), + &test_metrics(), ) .await; @@ -1591,6 +1626,7 @@ mod tests { &beacon, None, Duration::from_secs(30), + &test_metrics(), ) .await; assert!(result1.is_ok()); @@ -1607,6 +1643,7 @@ mod tests { &beacon, None, Duration::from_secs(30), + &test_metrics(), ) .await; assert!(matches!(result2, Err(SigningError::PoolExhausted))); @@ -1628,6 +1665,7 @@ mod tests { &S::zero(), None, Duration::from_secs(30), + &test_metrics(), ) .await; @@ -1656,6 +1694,7 @@ mod tests { &beacon, None, Duration::from_secs(30), + &test_metrics(), ) .await .unwrap(); @@ -1677,6 +1716,7 @@ mod tests { &beacon, None, Duration::from_secs(30), + &test_metrics(), ) .await .unwrap(); @@ -1720,6 +1760,7 @@ mod tests { &beacon, None, Duration::from_secs(30), + &test_metrics(), ) .await .unwrap(); @@ -1735,6 +1776,7 @@ mod tests { &beacon, None, Duration::from_secs(30), + &test_metrics(), ) .await .unwrap(); diff --git a/crates/hashi/src/withdrawals.rs b/crates/hashi/src/withdrawals.rs index 3ef640a5c..799134132 100644 --- a/crates/hashi/src/withdrawals.rs +++ b/crates/hashi/src/withdrawals.rs @@ -559,7 +559,12 @@ impl Hashi { epoch, ); } - let p2p_channel = RpcP2PChannel::new(onchain_state, epoch); + let p2p_channel = RpcP2PChannel::new( + onchain_state, + epoch, + crate::metrics::MPC_LABEL_SIGNING, + self.metrics.clone(), + ); let signing_manager = self.signing_manager(); let beacon = S::from_bytes_mod_order(&txn.randomness); let signing_messages = self.withdrawal_signing_messages(unsigned_tx, &txn.inputs)?; @@ -581,6 +586,7 @@ impl Hashi { &beacon, derivation_address.as_ref(), WITHDRAWAL_SIGNING_TIMEOUT, + &self.metrics, ) .await; let sign_duration = sign_start.elapsed().as_secs_f64();