Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lean_client/containers/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ impl SignedBlockWithAttestation {

let _timer = METRICS.get().map(|metrics| {
metrics
.lean_pq_signature_attestation_verification_time_seconds
.lean_pq_sig_attestation_verification_time_seconds
.start_timer()
});

Expand Down
14 changes: 8 additions & 6 deletions lean_client/fork_choice/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,10 @@ pub fn update_safe_target(store: &mut Store) {
}

// Extract per-validator attestations from merged payloads
let attestations =
extract_attestations_from_aggregated_payloads(&all_payloads, &store.attestation_data_by_root);
let attestations = extract_attestations_from_aggregated_payloads(
&all_payloads,
&store.attestation_data_by_root,
);

// Run LMD-GHOST with 2/3 threshold to find safe target
let new_safe_target = get_fork_choice_head(store, root, &attestations, min_score);
Expand Down Expand Up @@ -380,10 +382,10 @@ pub fn tick_interval(store: &mut Store, has_proposal: bool) {

match curr_interval {
0 if has_proposal => accept_new_attestations(store), // Interval 0: Block proposal
1 => {} // Interval 1: Attestation phase
2 => {} // Interval 2: Aggregation phase (handled in main.rs)
3 => update_safe_target(store), // Interval 3: Safe target update
4 => accept_new_attestations(store), // Interval 4: Accept attestations
1 => {} // Interval 1: Attestation phase
2 => {} // Interval 2: Aggregation phase (handled in main.rs)
3 => update_safe_target(store), // Interval 3: Safe target update
4 => accept_new_attestations(store), // Interval 4: Accept attestations
_ => {}
}
}
Expand Down
5 changes: 3 additions & 2 deletions lean_client/fork_choice/tests/fork_choice_test_vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,8 +569,9 @@ fn forkchoice(spec_file: &str) {

// Advance time to the block's slot to ensure attestations are processable
// SECONDS_PER_SLOT is 4. Convert to milliseconds for devnet-3
let block_time_millis =
(store.config.genesis_time + (signed_block.message.block.slot.0 * 4)) * 1000;
let block_time_millis = (store.config.genesis_time
+ (signed_block.message.block.slot.0 * 4))
* 1000;
on_tick(&mut store, block_time_millis, false);

on_block(&mut store, signed_block).unwrap();
Expand Down
15 changes: 11 additions & 4 deletions lean_client/fork_choice/tests/unit_tests/time.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use super::common::create_test_store;
use fork_choice::handlers::on_tick;
use fork_choice::store::{INTERVALS_PER_SLOT, MILLIS_PER_INTERVAL, SECONDS_PER_SLOT, tick_interval};
use fork_choice::store::{
INTERVALS_PER_SLOT, MILLIS_PER_INTERVAL, SECONDS_PER_SLOT, tick_interval,
};

#[test]
fn test_on_tick_basic() {
Expand Down Expand Up @@ -32,7 +34,8 @@ fn test_on_tick_already_current() {
let initial_time = store.time;
// on_tick now expects milliseconds (devnet-3)
// Convert initial_time (in intervals) to seconds, then to milliseconds
let current_target_millis = (store.config.genesis_time * 1000) + (initial_time * MILLIS_PER_INTERVAL);
let current_target_millis =
(store.config.genesis_time * 1000) + (initial_time * MILLIS_PER_INTERVAL);

// Try to advance to current time
on_tick(&mut store, current_target_millis, true);
Expand All @@ -47,7 +50,8 @@ fn test_on_tick_small_increment() {
let initial_time = store.time;
// on_tick now expects milliseconds (devnet-3)
// Advance by just 1 second (1000ms)
let target_time_millis = (store.config.genesis_time * 1000) + (initial_time * MILLIS_PER_INTERVAL) + 1000;
let target_time_millis =
(store.config.genesis_time * 1000) + (initial_time * MILLIS_PER_INTERVAL) + 1000;

on_tick(&mut store, target_time_millis, false);

Expand Down Expand Up @@ -177,7 +181,10 @@ fn test_millis_per_interval() {
assert_eq!(MILLIS_PER_INTERVAL, 800);

// Verify no integer truncation: 4 * 1000 = 5 * 800
assert_eq!(SECONDS_PER_SLOT * 1000, MILLIS_PER_INTERVAL * INTERVALS_PER_SLOT);
assert_eq!(
SECONDS_PER_SLOT * 1000,
MILLIS_PER_INTERVAL * INTERVALS_PER_SLOT
);
}

#[test]
Expand Down
4 changes: 2 additions & 2 deletions lean_client/http_api/src/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use metrics::metrics_module;

use crate::config::HttpServerConfig;

pub fn normal_routes(config: &HttpServerConfig) -> Router {
pub fn normal_routes(config: &HttpServerConfig, genesis_time: u64) -> Router {
let mut router = Router::new();

if config.metrics_enabled() {
router = router.merge(metrics_module(config.metrics.clone()));
router = router.merge(metrics_module(config.metrics.clone(), genesis_time));
}

router
Expand Down
4 changes: 2 additions & 2 deletions lean_client/http_api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use tracing::info;

use crate::{config::HttpServerConfig, routing::normal_routes};

pub async fn run_server(config: HttpServerConfig) -> Result<()> {
let router = normal_routes(&config);
pub async fn run_server(config: HttpServerConfig, genesis_time: u64) -> Result<()> {
let router = normal_routes(&config, genesis_time);

let listener = config
.listener()
Expand Down
27 changes: 12 additions & 15 deletions lean_client/metrics/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ pub struct Metrics {

// PQ Signature metrics
/// Time taken to sign an attestation
pub lean_pq_signature_attestation_signing_time_seconds: Histogram,
pub lean_pq_sig_attestation_signing_time_seconds: Histogram,

/// Time taken to verify an attestation signature
pub lean_pq_signature_attestation_verification_time_seconds: Histogram,
pub lean_pq_sig_attestation_verification_time_seconds: Histogram,

/// Total number of aggregated signatures
pub lean_pq_sig_aggregated_signatures_total: IntCounter,
Expand All @@ -46,7 +46,7 @@ pub struct Metrics {
pub lean_head_slot: IntGauge,

/// Current slot of the lean chain
lean_current_slot: IntGauge,
pub lean_current_slot: IntGauge,

/// Safe target slot
pub lean_safe_target_slot: IntGauge,
Expand Down Expand Up @@ -125,16 +125,14 @@ impl Metrics {
)?,

// PQ Signature metrics
lean_pq_signature_attestation_signing_time_seconds: Histogram::with_opts(
histogram_opts!(
"lean_pq_signature_attestation_signing_time_seconds",
"Time taken to sign an attestation",
vec![0.005, 0.01, 0.025, 0.05, 0.1, 1.0],
),
)?,
lean_pq_signature_attestation_verification_time_seconds: Histogram::with_opts(
lean_pq_sig_attestation_signing_time_seconds: Histogram::with_opts(histogram_opts!(
"lean_pq_sig_attestation_signing_time_seconds",
"Time taken to sign an attestation",
vec![0.005, 0.01, 0.025, 0.05, 0.1, 1.0],
))?,
lean_pq_sig_attestation_verification_time_seconds: Histogram::with_opts(
histogram_opts!(
"lean_pq_signature_attestation_verification_time_seconds",
"lean_pq_sig_attestation_verification_time_seconds",
"Time taken to verify an attestation signature",
vec![0.005, 0.01, 0.025, 0.05, 0.1, 1.0],
),
Expand Down Expand Up @@ -296,11 +294,10 @@ impl Metrics {
default_registry.register(Box::new(self.lean_node_info.clone()))?;
default_registry.register(Box::new(self.lean_node_start_time_seconds.clone()))?;
default_registry.register(Box::new(
self.lean_pq_signature_attestation_signing_time_seconds
.clone(),
self.lean_pq_sig_attestation_signing_time_seconds.clone(),
))?;
default_registry.register(Box::new(
self.lean_pq_signature_attestation_verification_time_seconds
self.lean_pq_sig_attestation_verification_time_seconds
.clone(),
))?;
default_registry.register(Box::new(
Expand Down
29 changes: 25 additions & 4 deletions lean_client/metrics/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use std::{error::Error as StdError, time::Duration};
use std::{
error::Error as StdError,
time::{Duration, SystemTime, UNIX_EPOCH},
};

use anyhow::{Error as AnyhowError, Result};
use axum::{
Router,
extract::State,
http::StatusCode,
response::{IntoResponse, Response},
routing::get,
Expand All @@ -13,6 +17,8 @@ use prometheus::TextEncoder;
use thiserror::Error;
use tower_http::cors::AllowOrigin;

use crate::METRICS;

#[derive(Clone, Debug, Args)]
pub struct MetricsServerConfig {
#[arg(long = "metrics-timeout", default_value_t = Self::default().timeout, requires = "metrics_enabled")]
Expand Down Expand Up @@ -63,8 +69,10 @@ impl ApiError for Error {
}
}

pub fn metrics_module(config: MetricsServerConfig) -> Router {
let router = Router::new().route("/metrics", get(get_metrics));
pub fn metrics_module(config: MetricsServerConfig, genesis_time: u64) -> Router {
let router = Router::new()
.route("/metrics", get(get_metrics))
.with_state(genesis_time);

let router = http_api_utils::extend_router_with_middleware::<Error>(
router,
Expand All @@ -77,9 +85,22 @@ pub fn metrics_module(config: MetricsServerConfig) -> Router {
}

/// `GET /metrics`
async fn get_metrics() -> Result<String, Error> {
async fn get_metrics(State(genesis_time): State<u64>) -> Result<String, Error> {
let mut buffer = String::new();

METRICS.get().map(|metrics| {
let time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;

let since_genesis = time.saturating_sub(genesis_time * 1000);
// TODO: 4000 should be replaced with constant MILLIS_PER_SLOT
let slot = since_genesis / 4000;

metrics.lean_current_slot.set(slot as i64);
});

TextEncoder::new()
.encode_utf8(prometheus::gather().as_slice(), &mut buffer)
.map_err(AnyhowError::new)?;
Expand Down
5 changes: 4 additions & 1 deletion lean_client/networking/src/gossipsub/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use ssz::SszReadDefault as _;
pub enum GossipsubMessage {
Block(SignedBlockWithAttestation),
/// Attestation from a specific subnet (devnet-3)
AttestationSubnet { subnet_id: u64, attestation: SignedAttestation },
AttestationSubnet {
subnet_id: u64,
attestation: SignedAttestation,
},
/// Aggregated attestation (devnet-3)
Aggregation(SignedAggregatedAttestation),
}
Expand Down
16 changes: 13 additions & 3 deletions lean_client/networking/src/network/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,10 @@ where
);
}
}
Ok(GossipsubMessage::AttestationSubnet { subnet_id, attestation }) => {
Ok(GossipsubMessage::AttestationSubnet {
subnet_id,
attestation,
}) => {
info!(
validator = %attestation.validator_id,
slot = %attestation.message.slot.0,
Expand All @@ -514,7 +517,9 @@ where
})
.await
{
warn!("failed to send subnet attestation for slot {slot} to chain: {err:?}");
warn!(
"failed to send subnet attestation for slot {slot} to chain: {err:?}"
);
}
}
Ok(GossipsubMessage::Aggregation(signed_aggregated_attestation)) => {
Expand Down Expand Up @@ -751,7 +756,12 @@ where
// Devnet-3: Publish to subnet-specific topic only
let topic_kind = GossipsubKind::AttestationSubnet(subnet_id);
if let Err(err) = self.publish_to_topic(topic_kind, bytes) {
warn!(slot = slot, subnet_id = subnet_id, ?err, "Publish attestation to subnet failed");
warn!(
slot = slot,
subnet_id = subnet_id,
?err,
"Publish attestation to subnet failed"
);
} else {
info!(
slot = slot,
Expand Down
20 changes: 16 additions & 4 deletions lean_client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,15 +295,23 @@ async fn main() -> Result<()> {
"Failed to load XMSS keys: {}, falling back to zero signatures",
e
);
Some(ValidatorService::new_with_aggregator(config, num_validators, args.is_aggregator))
Some(ValidatorService::new_with_aggregator(
config,
num_validators,
args.is_aggregator,
))
}
}
} else {
warn!(
"Hash-sig key directory not found: {:?}, using zero signatures",
keys_path
);
Some(ValidatorService::new_with_aggregator(config, num_validators, args.is_aggregator))
Some(ValidatorService::new_with_aggregator(
config,
num_validators,
args.is_aggregator,
))
}
} else {
info!(
Expand All @@ -312,7 +320,11 @@ async fn main() -> Result<()> {
aggregator = args.is_aggregator,
"Validator mode enabled (no --hash-sig-key-dir specified - using zero signatures)"
);
Some(ValidatorService::new_with_aggregator(config, num_validators, args.is_aggregator))
Some(ValidatorService::new_with_aggregator(
config,
num_validators,
args.is_aggregator,
))
}
}
Err(e) => {
Expand Down Expand Up @@ -395,7 +407,7 @@ async fn main() -> Result<()> {

task::spawn(async move {
if args.http_config.metrics_enabled() {
if let Err(err) = http_api::run_server(args.http_config).await {
if let Err(err) = http_api::run_server(args.http_config, genesis_time).await {
error!("HTTP Server failed with error: {err:?}");
}
}
Expand Down
Loading
Loading