From 4727032f98925825b4963b873849219048a53973 Mon Sep 17 00:00:00 2001 From: Artiom Tretjakovas Date: Mon, 9 Mar 2026 13:14:44 +0200 Subject: [PATCH] Implement lean metric fixes * Renamed `lean_pq_signature_*` metrics to `lean_pq_sig_*`; * added `lean_current_slot` metric. --- lean_client/containers/src/block.rs | 2 +- lean_client/fork_choice/src/store.rs | 14 +++++---- .../tests/fork_choice_test_vectors.rs | 5 ++-- .../fork_choice/tests/unit_tests/time.rs | 15 +++++++--- lean_client/http_api/src/routing.rs | 4 +-- lean_client/http_api/src/server.rs | 4 +-- lean_client/metrics/src/metrics.rs | 27 ++++++++--------- lean_client/metrics/src/server.rs | 29 +++++++++++++++--- .../networking/src/gossipsub/message.rs | 5 +++- lean_client/networking/src/network/service.rs | 16 ++++++++-- lean_client/src/main.rs | 20 ++++++++++--- lean_client/validator/src/lib.rs | 30 ++++++++++++------- 12 files changed, 116 insertions(+), 55 deletions(-) diff --git a/lean_client/containers/src/block.rs b/lean_client/containers/src/block.rs index 2bad3a8..7c8ff85 100644 --- a/lean_client/containers/src/block.rs +++ b/lean_client/containers/src/block.rs @@ -202,7 +202,7 @@ impl SignedBlockWithAttestation { let _timer = METRICS.get().map(|metrics| { metrics - .lean_pq_signature_attestation_verification_time_seconds + .lean_pq_sig_attestation_verification_time_seconds .start_timer() }); diff --git a/lean_client/fork_choice/src/store.rs b/lean_client/fork_choice/src/store.rs index 98f7bde..a059988 100644 --- a/lean_client/fork_choice/src/store.rs +++ b/lean_client/fork_choice/src/store.rs @@ -341,8 +341,10 @@ pub fn update_safe_target(store: &mut Store) { } // Extract per-validator attestations from merged payloads - let attestations = - extract_attestations_from_aggregated_payloads(&all_payloads, &store.attestation_data_by_root); + let attestations = extract_attestations_from_aggregated_payloads( + &all_payloads, + &store.attestation_data_by_root, + ); // Run LMD-GHOST with 2/3 threshold to find safe target let new_safe_target = get_fork_choice_head(store, root, &attestations, min_score); @@ -380,10 +382,10 @@ pub fn tick_interval(store: &mut Store, has_proposal: bool) { match curr_interval { 0 if has_proposal => accept_new_attestations(store), // Interval 0: Block proposal - 1 => {} // Interval 1: Attestation phase - 2 => {} // Interval 2: Aggregation phase (handled in main.rs) - 3 => update_safe_target(store), // Interval 3: Safe target update - 4 => accept_new_attestations(store), // Interval 4: Accept attestations + 1 => {} // Interval 1: Attestation phase + 2 => {} // Interval 2: Aggregation phase (handled in main.rs) + 3 => update_safe_target(store), // Interval 3: Safe target update + 4 => accept_new_attestations(store), // Interval 4: Accept attestations _ => {} } } diff --git a/lean_client/fork_choice/tests/fork_choice_test_vectors.rs b/lean_client/fork_choice/tests/fork_choice_test_vectors.rs index f977b85..bc96b97 100644 --- a/lean_client/fork_choice/tests/fork_choice_test_vectors.rs +++ b/lean_client/fork_choice/tests/fork_choice_test_vectors.rs @@ -569,8 +569,9 @@ fn forkchoice(spec_file: &str) { // Advance time to the block's slot to ensure attestations are processable // SECONDS_PER_SLOT is 4. Convert to milliseconds for devnet-3 - let block_time_millis = - (store.config.genesis_time + (signed_block.message.block.slot.0 * 4)) * 1000; + let block_time_millis = (store.config.genesis_time + + (signed_block.message.block.slot.0 * 4)) + * 1000; on_tick(&mut store, block_time_millis, false); on_block(&mut store, signed_block).unwrap(); diff --git a/lean_client/fork_choice/tests/unit_tests/time.rs b/lean_client/fork_choice/tests/unit_tests/time.rs index 05fe4f6..b398a7c 100644 --- a/lean_client/fork_choice/tests/unit_tests/time.rs +++ b/lean_client/fork_choice/tests/unit_tests/time.rs @@ -1,6 +1,8 @@ use super::common::create_test_store; use fork_choice::handlers::on_tick; -use fork_choice::store::{INTERVALS_PER_SLOT, MILLIS_PER_INTERVAL, SECONDS_PER_SLOT, tick_interval}; +use fork_choice::store::{ + INTERVALS_PER_SLOT, MILLIS_PER_INTERVAL, SECONDS_PER_SLOT, tick_interval, +}; #[test] fn test_on_tick_basic() { @@ -32,7 +34,8 @@ fn test_on_tick_already_current() { let initial_time = store.time; // on_tick now expects milliseconds (devnet-3) // Convert initial_time (in intervals) to seconds, then to milliseconds - let current_target_millis = (store.config.genesis_time * 1000) + (initial_time * MILLIS_PER_INTERVAL); + let current_target_millis = + (store.config.genesis_time * 1000) + (initial_time * MILLIS_PER_INTERVAL); // Try to advance to current time on_tick(&mut store, current_target_millis, true); @@ -47,7 +50,8 @@ fn test_on_tick_small_increment() { let initial_time = store.time; // on_tick now expects milliseconds (devnet-3) // Advance by just 1 second (1000ms) - let target_time_millis = (store.config.genesis_time * 1000) + (initial_time * MILLIS_PER_INTERVAL) + 1000; + let target_time_millis = + (store.config.genesis_time * 1000) + (initial_time * MILLIS_PER_INTERVAL) + 1000; on_tick(&mut store, target_time_millis, false); @@ -177,7 +181,10 @@ fn test_millis_per_interval() { assert_eq!(MILLIS_PER_INTERVAL, 800); // Verify no integer truncation: 4 * 1000 = 5 * 800 - assert_eq!(SECONDS_PER_SLOT * 1000, MILLIS_PER_INTERVAL * INTERVALS_PER_SLOT); + assert_eq!( + SECONDS_PER_SLOT * 1000, + MILLIS_PER_INTERVAL * INTERVALS_PER_SLOT + ); } #[test] diff --git a/lean_client/http_api/src/routing.rs b/lean_client/http_api/src/routing.rs index cdc7e22..3af33b9 100644 --- a/lean_client/http_api/src/routing.rs +++ b/lean_client/http_api/src/routing.rs @@ -3,11 +3,11 @@ use metrics::metrics_module; use crate::config::HttpServerConfig; -pub fn normal_routes(config: &HttpServerConfig) -> Router { +pub fn normal_routes(config: &HttpServerConfig, genesis_time: u64) -> Router { let mut router = Router::new(); if config.metrics_enabled() { - router = router.merge(metrics_module(config.metrics.clone())); + router = router.merge(metrics_module(config.metrics.clone(), genesis_time)); } router diff --git a/lean_client/http_api/src/server.rs b/lean_client/http_api/src/server.rs index 09142d6..9eccdb7 100644 --- a/lean_client/http_api/src/server.rs +++ b/lean_client/http_api/src/server.rs @@ -6,8 +6,8 @@ use tracing::info; use crate::{config::HttpServerConfig, routing::normal_routes}; -pub async fn run_server(config: HttpServerConfig) -> Result<()> { - let router = normal_routes(&config); +pub async fn run_server(config: HttpServerConfig, genesis_time: u64) -> Result<()> { + let router = normal_routes(&config, genesis_time); let listener = config .listener() diff --git a/lean_client/metrics/src/metrics.rs b/lean_client/metrics/src/metrics.rs index f0dee42..ea0f75f 100644 --- a/lean_client/metrics/src/metrics.rs +++ b/lean_client/metrics/src/metrics.rs @@ -18,10 +18,10 @@ pub struct Metrics { // PQ Signature metrics /// Time taken to sign an attestation - pub lean_pq_signature_attestation_signing_time_seconds: Histogram, + pub lean_pq_sig_attestation_signing_time_seconds: Histogram, /// Time taken to verify an attestation signature - pub lean_pq_signature_attestation_verification_time_seconds: Histogram, + pub lean_pq_sig_attestation_verification_time_seconds: Histogram, /// Total number of aggregated signatures pub lean_pq_sig_aggregated_signatures_total: IntCounter, @@ -46,7 +46,7 @@ pub struct Metrics { pub lean_head_slot: IntGauge, /// Current slot of the lean chain - lean_current_slot: IntGauge, + pub lean_current_slot: IntGauge, /// Safe target slot pub lean_safe_target_slot: IntGauge, @@ -125,16 +125,14 @@ impl Metrics { )?, // PQ Signature metrics - lean_pq_signature_attestation_signing_time_seconds: Histogram::with_opts( - histogram_opts!( - "lean_pq_signature_attestation_signing_time_seconds", - "Time taken to sign an attestation", - vec![0.005, 0.01, 0.025, 0.05, 0.1, 1.0], - ), - )?, - lean_pq_signature_attestation_verification_time_seconds: Histogram::with_opts( + lean_pq_sig_attestation_signing_time_seconds: Histogram::with_opts(histogram_opts!( + "lean_pq_sig_attestation_signing_time_seconds", + "Time taken to sign an attestation", + vec![0.005, 0.01, 0.025, 0.05, 0.1, 1.0], + ))?, + lean_pq_sig_attestation_verification_time_seconds: Histogram::with_opts( histogram_opts!( - "lean_pq_signature_attestation_verification_time_seconds", + "lean_pq_sig_attestation_verification_time_seconds", "Time taken to verify an attestation signature", vec![0.005, 0.01, 0.025, 0.05, 0.1, 1.0], ), @@ -296,11 +294,10 @@ impl Metrics { default_registry.register(Box::new(self.lean_node_info.clone()))?; default_registry.register(Box::new(self.lean_node_start_time_seconds.clone()))?; default_registry.register(Box::new( - self.lean_pq_signature_attestation_signing_time_seconds - .clone(), + self.lean_pq_sig_attestation_signing_time_seconds.clone(), ))?; default_registry.register(Box::new( - self.lean_pq_signature_attestation_verification_time_seconds + self.lean_pq_sig_attestation_verification_time_seconds .clone(), ))?; default_registry.register(Box::new( diff --git a/lean_client/metrics/src/server.rs b/lean_client/metrics/src/server.rs index 9ee30e4..bb384ef 100644 --- a/lean_client/metrics/src/server.rs +++ b/lean_client/metrics/src/server.rs @@ -1,8 +1,12 @@ -use std::{error::Error as StdError, time::Duration}; +use std::{ + error::Error as StdError, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; use anyhow::{Error as AnyhowError, Result}; use axum::{ Router, + extract::State, http::StatusCode, response::{IntoResponse, Response}, routing::get, @@ -13,6 +17,8 @@ use prometheus::TextEncoder; use thiserror::Error; use tower_http::cors::AllowOrigin; +use crate::METRICS; + #[derive(Clone, Debug, Args)] pub struct MetricsServerConfig { #[arg(long = "metrics-timeout", default_value_t = Self::default().timeout, requires = "metrics_enabled")] @@ -63,8 +69,10 @@ impl ApiError for Error { } } -pub fn metrics_module(config: MetricsServerConfig) -> Router { - let router = Router::new().route("/metrics", get(get_metrics)); +pub fn metrics_module(config: MetricsServerConfig, genesis_time: u64) -> Router { + let router = Router::new() + .route("/metrics", get(get_metrics)) + .with_state(genesis_time); let router = http_api_utils::extend_router_with_middleware::( router, @@ -77,9 +85,22 @@ pub fn metrics_module(config: MetricsServerConfig) -> Router { } /// `GET /metrics` -async fn get_metrics() -> Result { +async fn get_metrics(State(genesis_time): State) -> Result { let mut buffer = String::new(); + METRICS.get().map(|metrics| { + let time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + + let since_genesis = time.saturating_sub(genesis_time * 1000); + // TODO: 4000 should be replaced with constant MILLIS_PER_SLOT + let slot = since_genesis / 4000; + + metrics.lean_current_slot.set(slot as i64); + }); + TextEncoder::new() .encode_utf8(prometheus::gather().as_slice(), &mut buffer) .map_err(AnyhowError::new)?; diff --git a/lean_client/networking/src/gossipsub/message.rs b/lean_client/networking/src/gossipsub/message.rs index ab9df7d..3f98e46 100644 --- a/lean_client/networking/src/gossipsub/message.rs +++ b/lean_client/networking/src/gossipsub/message.rs @@ -8,7 +8,10 @@ use ssz::SszReadDefault as _; pub enum GossipsubMessage { Block(SignedBlockWithAttestation), /// Attestation from a specific subnet (devnet-3) - AttestationSubnet { subnet_id: u64, attestation: SignedAttestation }, + AttestationSubnet { + subnet_id: u64, + attestation: SignedAttestation, + }, /// Aggregated attestation (devnet-3) Aggregation(SignedAggregatedAttestation), } diff --git a/lean_client/networking/src/network/service.rs b/lean_client/networking/src/network/service.rs index 4784ea6..8b01bb3 100644 --- a/lean_client/networking/src/network/service.rs +++ b/lean_client/networking/src/network/service.rs @@ -496,7 +496,10 @@ where ); } } - Ok(GossipsubMessage::AttestationSubnet { subnet_id, attestation }) => { + Ok(GossipsubMessage::AttestationSubnet { + subnet_id, + attestation, + }) => { info!( validator = %attestation.validator_id, slot = %attestation.message.slot.0, @@ -514,7 +517,9 @@ where }) .await { - warn!("failed to send subnet attestation for slot {slot} to chain: {err:?}"); + warn!( + "failed to send subnet attestation for slot {slot} to chain: {err:?}" + ); } } Ok(GossipsubMessage::Aggregation(signed_aggregated_attestation)) => { @@ -751,7 +756,12 @@ where // Devnet-3: Publish to subnet-specific topic only let topic_kind = GossipsubKind::AttestationSubnet(subnet_id); if let Err(err) = self.publish_to_topic(topic_kind, bytes) { - warn!(slot = slot, subnet_id = subnet_id, ?err, "Publish attestation to subnet failed"); + warn!( + slot = slot, + subnet_id = subnet_id, + ?err, + "Publish attestation to subnet failed" + ); } else { info!( slot = slot, diff --git a/lean_client/src/main.rs b/lean_client/src/main.rs index cb9a241..24a73a5 100644 --- a/lean_client/src/main.rs +++ b/lean_client/src/main.rs @@ -295,7 +295,11 @@ async fn main() -> Result<()> { "Failed to load XMSS keys: {}, falling back to zero signatures", e ); - Some(ValidatorService::new_with_aggregator(config, num_validators, args.is_aggregator)) + Some(ValidatorService::new_with_aggregator( + config, + num_validators, + args.is_aggregator, + )) } } } else { @@ -303,7 +307,11 @@ async fn main() -> Result<()> { "Hash-sig key directory not found: {:?}, using zero signatures", keys_path ); - Some(ValidatorService::new_with_aggregator(config, num_validators, args.is_aggregator)) + Some(ValidatorService::new_with_aggregator( + config, + num_validators, + args.is_aggregator, + )) } } else { info!( @@ -312,7 +320,11 @@ async fn main() -> Result<()> { aggregator = args.is_aggregator, "Validator mode enabled (no --hash-sig-key-dir specified - using zero signatures)" ); - Some(ValidatorService::new_with_aggregator(config, num_validators, args.is_aggregator)) + Some(ValidatorService::new_with_aggregator( + config, + num_validators, + args.is_aggregator, + )) } } Err(e) => { @@ -395,7 +407,7 @@ async fn main() -> Result<()> { task::spawn(async move { if args.http_config.metrics_enabled() { - if let Err(err) = http_api::run_server(args.http_config).await { + if let Err(err) = http_api::run_server(args.http_config, genesis_time).await { error!("HTTP Server failed with error: {err:?}"); } } diff --git a/lean_client/validator/src/lib.rs b/lean_client/validator/src/lib.rs index a67b745..c5022e3 100644 --- a/lean_client/validator/src/lib.rs +++ b/lean_client/validator/src/lib.rs @@ -4,14 +4,13 @@ use std::path::Path; use anyhow::{Context, Result, anyhow, bail}; use containers::{ - AggregatedSignatureProof, AggregationBits, - Attestation, AttestationData, AttestationSignatures, Block, BlockSignatures, - BlockWithAttestation, Checkpoint, SignedAggregatedAttestation, SignedAttestation, - SignedBlockWithAttestation, Slot, SignatureKey, + AggregatedSignatureProof, AggregationBits, Attestation, AttestationData, AttestationSignatures, + Block, BlockSignatures, BlockWithAttestation, Checkpoint, SignatureKey, + SignedAggregatedAttestation, SignedAttestation, SignedBlockWithAttestation, Slot, }; use fork_choice::store::{Store, produce_block_with_signatures}; -use ssz::H256; use metrics::{METRICS, stop_and_discard, stop_and_record}; +use ssz::H256; use ssz::SszHash; use tracing::{info, warn}; use try_from_iterator::TryFromIterator as _; @@ -66,7 +65,11 @@ impl ValidatorService { Self::new_with_aggregator(config, num_validators, false) } - pub fn new_with_aggregator(config: ValidatorConfig, num_validators: u64, is_aggregator: bool) -> Self { + pub fn new_with_aggregator( + config: ValidatorConfig, + num_validators: u64, + is_aggregator: bool, + ) -> Self { info!( node_id = %config.node_id, indices = ?config.validator_indices, @@ -156,7 +159,11 @@ impl ValidatorService { /// Perform aggregation duty if this node is an aggregator (devnet-3) /// Collects signatures from gossip_signatures and creates aggregated attestations /// Returns None if not an aggregator or no signatures to aggregate - pub fn maybe_aggregate(&self, store: &Store, slot: Slot) -> Option> { + pub fn maybe_aggregate( + &self, + store: &Store, + slot: Slot, + ) -> Option> { if !self.is_aggregator_for_slot(slot) { return None; } @@ -186,7 +193,8 @@ impl ValidatorService { // Look up attestation data by its hash (data_root) // This ensures we get the exact attestation that was signed, // matching ream's attestation_data_by_root_provider approach - let Some(attestation_data) = store.attestation_data_by_root.get(&data_root).cloned() else { + let Some(attestation_data) = store.attestation_data_by_root.get(&data_root).cloned() + else { warn!( data_root = %format!("0x{:x}", data_root), "Could not find attestation data for aggregation group" @@ -307,7 +315,7 @@ impl ValidatorService { let proposer_signature = { let sign_timer = METRICS.get().map(|metrics| { metrics - .lean_pq_signature_attestation_signing_time_seconds + .lean_pq_sig_attestation_signing_time_seconds .start_timer() }); @@ -377,7 +385,7 @@ impl ValidatorService { let signature = if let Some(ref key_manager) = self.key_manager { let _timer = METRICS.get().map(|metrics| { metrics - .lean_pq_signature_attestation_signing_time_seconds + .lean_pq_sig_attestation_signing_time_seconds .start_timer() }); @@ -387,7 +395,7 @@ impl ValidatorService { let _timer = METRICS.get().map(|metrics| { metrics - .lean_pq_signature_attestation_signing_time_seconds + .lean_pq_sig_attestation_signing_time_seconds .start_timer() }); match key_manager.sign(idx, epoch, message) {