From 39238ab48af3a9f6491c4594867d8e9cc5220098 Mon Sep 17 00:00:00 2001 From: bomanaps Date: Fri, 27 Mar 2026 12:19:37 +0100 Subject: [PATCH 1/2] Fix five OOM root causes and add detection metrics --- lean_client/fork_choice/src/handlers.rs | 194 ++++++++++++------ lean_client/fork_choice/src/store.rs | 19 +- lean_client/metrics/src/metrics.rs | 62 ++++++ lean_client/networking/src/network/service.rs | 6 +- lean_client/src/main.rs | 11 +- 5 files changed, 221 insertions(+), 71 deletions(-) diff --git a/lean_client/fork_choice/src/handlers.rs b/lean_client/fork_choice/src/handlers.rs index eb7d824..857fc06 100644 --- a/lean_client/fork_choice/src/handlers.rs +++ b/lean_client/fork_choice/src/handlers.rs @@ -35,32 +35,19 @@ pub fn on_tick(store: &mut Store, time_millis: u64, has_proposal: bool) { /// 3. A vote cannot be for a future slot. /// 4. Checkpoint slots must match block slots. fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<()> { - // Cannot count a vote if we haven't seen the blocks involved - ensure!( - store.blocks.contains_key(&data.source.root), - "Unknown source block: {:?}", - data.source.root - ); - - ensure!( - store.blocks.contains_key(&data.target.root), - "Unknown target block: {:?}", - &data.target.root - ); - - ensure!( - store.blocks.contains_key(&data.head.root), - "Unknown head block: {:?}", - &data.head.root - ); - - // Source must be older than Target. + // Topology: history is linear and monotonic — source <= target <= head (store.py:310-311). ensure!( data.source.slot <= data.target.slot, "Source checkpoint slot {} must not exceed target slot {}", data.source.slot.0, data.target.slot.0 ); + ensure!( + data.head.slot >= data.target.slot, + "Head slot {} must not be older than target slot {}", + data.head.slot.0, + data.target.slot.0 + ); // Validate checkpoint slots match block slots. // Skip the source slot-match when the root is the store's known justified or @@ -69,6 +56,7 @@ fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<() // slot from the downloaded state (e.g. 100994 vs anchor 101002). let source_block = &store.blocks[&data.source.root]; let target_block = &store.blocks[&data.target.root]; + let head_block = &store.blocks[&data.head.root]; let source_is_trusted_checkpoint = data.source.root == store.latest_justified.root || data.source.root == store.latest_finalized.root; @@ -89,6 +77,13 @@ fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<() target_block.slot.0 ); + ensure!( + head_block.slot == data.head.slot, + "Head checkpoint slot mismatch: checkpoint {} vs block {}", + data.head.slot.0, + head_block.slot.0 + ); + // Validate attestation is not too far in the future // We allow a small margin for clock disparity (1 slot), but no further. let current_slot = store.time / INTERVALS_PER_SLOT; @@ -147,6 +142,10 @@ pub fn on_gossip_attestation( .or_default() .push(signed_attestation); store.pending_fetch_roots.insert(missing_root); + METRICS.get().map(|m| { + m.lean_pending_fetch_roots + .set(store.pending_fetch_roots.len() as i64) + }); return Ok(()); } @@ -160,51 +159,61 @@ pub fn on_gossip_attestation( }); })?; - // Verify individual XMSS signature against the validator's public key. - // State is available: the pending-block check above confirmed target.root is in the store, - // and states are stored 1:1 with blocks in process_block_internal. - let key_state = store - .states - .get(&attestation_data.target.root) - .ok_or_else(|| anyhow!("no state for target block {}", attestation_data.target.root))?; + let data_root = attestation_data.hash_tree_root(); + let sig_key = SignatureKey::new(signed_attestation.validator_id, data_root); - ensure!( - validator_id < key_state.validators.len_u64(), - "validator {} out of range (max {})", - validator_id, - key_state.validators.len_u64() - ); + // Skip expensive XMSS verification for already-known signatures. + // Duplicate attestations arrive when the IDontWant buffer fills under CPU load, + // causing peers to rebroadcast. Each verify costs ~100ms; the early exit breaks + // the saturation loop without dropping vote data. + if !store.gossip_signatures.contains_key(&sig_key) { + // Verify individual XMSS signature against the validator's public key. + // State is available: the pending-block check above confirmed target.root is in + // the store, and states are stored 1:1 with blocks in process_block_internal. + let key_state = store + .states + .get(&attestation_data.target.root) + .ok_or_else(|| anyhow!("no state for target block {}", attestation_data.target.root))?; - let pubkey = key_state - .validators - .get(validator_id) - .map(|v| v.pubkey.clone()) - .map_err(|e| anyhow!("{e}"))?; + ensure!( + validator_id < key_state.validators.len_u64(), + "validator {} out of range (max {})", + validator_id, + key_state.validators.len_u64() + ); - let data_root = attestation_data.hash_tree_root(); + let pubkey = key_state + .validators + .get(validator_id) + .map(|v| v.pubkey.clone()) + .map_err(|e| anyhow!("{e}"))?; - signed_attestation - .signature - .verify(&pubkey, attestation_data.slot.0 as u32, data_root) - .context("individual attestation signature verification failed")?; + signed_attestation + .signature + .verify(&pubkey, attestation_data.slot.0 as u32, data_root) + .context("individual attestation signature verification failed")?; - // Store verified signature for later lookup during block building - let sig_key = SignatureKey::new(signed_attestation.validator_id, data_root); - store - .gossip_signatures - .insert(sig_key, signed_attestation.signature); + store + .gossip_signatures + .insert(sig_key, signed_attestation.signature); - // Update gossip signatures gauge - METRICS.get().map(|metrics| { - metrics - .lean_gossip_signatures - .set(store.gossip_signatures.len() as i64); - }); + // Update gossip signatures gauge + METRICS.get().map(|metrics| { + metrics + .lean_gossip_signatures + .set(store.gossip_signatures.len() as i64); + }); + } else { + METRICS.get().map(|m| m.lean_xmss_verify_skipped_total.inc()); + } - // Store attestation data indexed by hash for aggregation lookup store .attestation_data_by_root .insert(data_root, attestation_data.clone()); + METRICS.get().map(|m| { + m.lean_attestation_data_by_root + .set(store.attestation_data_by_root.len() as i64) + }); // Process the attestation data (not from block) on_attestation_internal(store, validator_id, attestation_data, false) @@ -249,17 +258,24 @@ pub fn on_attestation( let validator_id = signed_attestation.validator_id; let attestation_data = signed_attestation.message.clone(); - // Queue gossip attestations if any referenced block is not yet in the store. - if !is_from_block { - if let Some(missing_root) = find_unknown_attestation_block(store, &attestation_data) { - store - .pending_attestations - .entry(missing_root) - .or_default() - .push(signed_attestation); - store.pending_fetch_roots.insert(missing_root); - return Ok(()); + // All three referenced roots (source, target, head) must be in the store. + // For gossip: queue as pending if a block is missing, to be retried on arrival. + // For block-body: a missing root means the block itself is invalid — reject it. + if let Some(missing_root) = find_unknown_attestation_block(store, &attestation_data) { + if is_from_block { + bail!("block-body attestation references unknown block root {missing_root}"); } + store + .pending_attestations + .entry(missing_root) + .or_default() + .push(signed_attestation); + store.pending_fetch_roots.insert(missing_root); + METRICS.get().map(|m| { + m.lean_pending_fetch_roots + .set(store.pending_fetch_roots.len() as i64) + }); + return Ok(()); } // Validate attestation data @@ -334,17 +350,24 @@ pub fn on_aggregated_attestation( .or_default() .push(signed_aggregated_attestation); store.pending_fetch_roots.insert(missing_root); + METRICS.get().map(|m| { + m.lean_pending_fetch_roots + .set(store.pending_fetch_roots.len() as i64) + }); return Ok(()); } // Validate attestation data (slot bounds, target validity, etc.) validate_attestation_data(store, &attestation_data)?; - // Store attestation data indexed by hash for later extraction let data_root = attestation_data.hash_tree_root(); store .attestation_data_by_root .insert(data_root, attestation_data.clone()); + METRICS.get().map(|m| { + m.lean_attestation_data_by_root + .set(store.attestation_data_by_root.len() as i64) + }); // Verify aggregated XMSS proof against participant public keys. // State is available: the pending-block check above confirmed target.root is in the store, @@ -444,6 +467,12 @@ fn on_attestation_internal( .insert(validator_id, attestation_data); } } + METRICS.get().map(|m| { + m.lean_fork_choice_known_attestations + .set(store.latest_known_attestations.len() as i64); + m.lean_fork_choice_new_attestations + .set(store.latest_new_attestations.len() as i64); + }); Ok(()) } @@ -595,6 +624,31 @@ fn process_block_internal( .saturating_sub(STATE_PRUNE_BUFFER); store.states.retain(|_, state| state.slot.0 >= keep_from); store.blocks.retain(|_, block| block.slot.0 >= keep_from); + + // Prune stale attestation data — mirrors leanSpec store.prune_stale_attestation_data() + // (store.py:230-278), called at store.py:565-566 whenever finalization advances. + // Criterion: target.slot <= finalized_slot → stale, no longer affects fork choice. + // attestation_data_by_root is the secondary index used for target.slot lookup and + // must be pruned last so the three retain calls above can still resolve target.slot. + let finalized_slot = store.latest_finalized.slot.0; + let adr = &store.attestation_data_by_root; + store.gossip_signatures.retain(|key, _| { + adr.get(&key.data_root) + .map_or(true, |data| data.target.slot.0 > finalized_slot) + }); + store.latest_known_aggregated_payloads.retain(|key, _| { + adr.get(&key.data_root) + .map_or(true, |data| data.target.slot.0 > finalized_slot) + }); + store.latest_new_aggregated_payloads.retain(|key, _| { + adr.get(&key.data_root) + .map_or(true, |data| data.target.slot.0 > finalized_slot) + }); + store.attestation_data_by_root.retain(|_, data| data.target.slot.0 > finalized_slot); + METRICS.get().map(|m| { + m.lean_attestation_data_by_root + .set(store.attestation_data_by_root.len() as i64) + }); } if !justified_updated && !finalized_updated { @@ -647,6 +701,10 @@ fn process_block_internal( .lean_latest_known_aggregated_payloads .set(store.latest_known_aggregated_payloads.len() as i64); }); + METRICS.get().map(|m| { + m.lean_attestation_data_by_root + .set(store.attestation_data_by_root.len() as i64) + }); // Process each aggregated attestation's validators for fork choice. // Signatures have already been verified above via verify_signatures(). @@ -689,6 +747,10 @@ fn process_block_internal( store .attestation_data_by_root .insert(proposer_data_root, proposer_attestation.data.clone()); + METRICS.get().map(|m| { + m.lean_attestation_data_by_root + .set(store.attestation_data_by_root.len() as i64) + }); // Process proposer attestation as if received via gossip (is_from_block=false) // This ensures it goes to "new" attestations and doesn't immediately affect fork choice diff --git a/lean_client/fork_choice/src/store.rs b/lean_client/fork_choice/src/store.rs index d649e7d..ffff73c 100644 --- a/lean_client/fork_choice/src/store.rs +++ b/lean_client/fork_choice/src/store.rs @@ -7,6 +7,7 @@ use containers::{ SignedBlockWithAttestation, Slot, State, }; use metrics::{METRICS, set_gauge_u64}; +use tracing::warn; use ssz::{H256, SszHash}; use xmss::Signature; @@ -261,7 +262,17 @@ pub fn get_fork_choice_head( .expect("Error: Empty block."); } let mut vote_weights: HashMap = HashMap::new(); - let root_slot = store.blocks[&root].slot; + let root_slot = match store.blocks.get(&root) { + Some(block) => block.slot, + None => { + warn!( + %root, + justified_slot = store.latest_justified.slot.0, + "justified root not in store blocks, returning justified root as head" + ); + return root; + } + }; // stage 1: accumulate weights by walking up from each attestation's head for attestation_data in latest_attestations.values() { @@ -498,6 +509,12 @@ pub fn accept_new_attestations(store: &mut Store) { .latest_known_attestations .extend(store.latest_new_attestations.drain()); update_head(store); + METRICS.get().map(|m| { + m.lean_fork_choice_known_attestations + .set(store.latest_known_attestations.len() as i64); + m.lean_fork_choice_new_attestations + .set(store.latest_new_attestations.len() as i64); + }); } pub fn tick_interval(store: &mut Store, has_proposal: bool) { diff --git a/lean_client/metrics/src/metrics.rs b/lean_client/metrics/src/metrics.rs index 36be6ce..9a94154 100644 --- a/lean_client/metrics/src/metrics.rs +++ b/lean_client/metrics/src/metrics.rs @@ -140,6 +140,29 @@ pub struct Metrics { /// Number of attestation committees (ATTESTATION_COMMITTEE_COUNT) pub lean_attestation_committee_count: IntGauge, + + // OOM Detection Metrics + + /// Number of entries in the attestation_data_by_root secondary index + pub lean_attestation_data_by_root: IntGauge, + + /// Number of block roots queued for BlocksByRoot fetch (missing-block backlog) + pub lean_pending_fetch_roots: IntGauge, + + /// Number of orphan blocks held in the backfill BlockCache (hard-capped at 1024) + pub lean_block_cache_size: IntGauge, + + /// Gap between the network's current slot and the node's head slot (backfill depth) + pub lean_slots_behind: IntGauge, + + /// Validators with finalized (known) attestations currently in the fork-choice store + pub lean_fork_choice_known_attestations: IntGauge, + + /// Validators with pending (new/gossip) attestations in the fork-choice store + pub lean_fork_choice_new_attestations: IntGauge, + + /// XMSS verifications skipped because the signature was already cached + pub lean_xmss_verify_skipped_total: IntCounter, } impl Metrics { @@ -361,6 +384,36 @@ impl Metrics { "lean_attestation_committee_count", "Number of attestation committees (ATTESTATION_COMMITTEE_COUNT)", )?, + + // OOM Detection Metrics + lean_attestation_data_by_root: IntGauge::new( + "lean_attestation_data_by_root", + "Number of entries in attestation_data_by_root secondary index", + )?, + lean_pending_fetch_roots: IntGauge::new( + "lean_pending_fetch_roots", + "Block roots queued for BlocksByRoot fetch (missing-block backlog)", + )?, + lean_block_cache_size: IntGauge::new( + "lean_block_cache_size", + "Orphan blocks in backfill BlockCache (hard cap 1024)", + )?, + lean_slots_behind: IntGauge::new( + "lean_slots_behind", + "Current slot minus head slot — backfill depth and primary OOM risk indicator", + )?, + lean_fork_choice_known_attestations: IntGauge::new( + "lean_fork_choice_known_attestations", + "Validators with known attestations in the fork-choice store", + )?, + lean_fork_choice_new_attestations: IntGauge::new( + "lean_fork_choice_new_attestations", + "Validators with new gossip attestations in the fork-choice store", + )?, + lean_xmss_verify_skipped_total: IntCounter::new( + "lean_xmss_verify_skipped_total", + "XMSS verifications skipped (signature already cached) — root cause 4 indicator", + )?, }) } @@ -462,6 +515,15 @@ impl Metrics { default_registry.register(Box::new(self.lean_attestation_committee_subnet.clone()))?; default_registry.register(Box::new(self.lean_attestation_committee_count.clone()))?; + // OOM Detection Metrics + default_registry.register(Box::new(self.lean_attestation_data_by_root.clone()))?; + default_registry.register(Box::new(self.lean_pending_fetch_roots.clone()))?; + default_registry.register(Box::new(self.lean_block_cache_size.clone()))?; + default_registry.register(Box::new(self.lean_slots_behind.clone()))?; + default_registry.register(Box::new(self.lean_fork_choice_known_attestations.clone()))?; + default_registry.register(Box::new(self.lean_fork_choice_new_attestations.clone()))?; + default_registry.register(Box::new(self.lean_xmss_verify_skipped_total.clone()))?; + Ok(()) } diff --git a/lean_client/networking/src/network/service.rs b/lean_client/networking/src/network/service.rs index 866c9d1..544c2cd 100644 --- a/lean_client/networking/src/network/service.rs +++ b/lean_client/networking/src/network/service.rs @@ -267,8 +267,8 @@ where let behaviour = Self::build_behaviour(&local_key, &network_config)?; let config = Config::with_tokio_executor() - .with_notify_handler_buffer_size(NonZeroUsize::new(7).unwrap()) - .with_per_connection_event_buffer_size(4) + .with_notify_handler_buffer_size(NonZeroUsize::new(256).unwrap()) + .with_per_connection_event_buffer_size(64) .with_dial_concurrency_factor(NonZeroU8::new(1).unwrap()); let multiaddr = Self::multiaddr(&network_config)?; @@ -623,7 +623,7 @@ where .send(ChainMessage::ProcessAggregation { signed_aggregated_attestation, is_trusted: false, - should_gossip: true, + should_gossip: false, }) .await { diff --git a/lean_client/src/main.rs b/lean_client/src/main.rs index 92a1df2..b395ad6 100644 --- a/lean_client/src/main.rs +++ b/lean_client/src/main.rs @@ -231,6 +231,8 @@ fn print_chain_status(store: &Store, connected_peers: u64) { let timely = behind == 0; + METRICS.get().map(|m| m.lean_slots_behind.set(behind as i64)); + println!("\n+===============================================================+"); println!( " CHAIN STATUS: Current Slot: {} | Head Slot: {} | Behind: {}", @@ -1082,6 +1084,7 @@ async fn main() -> Result<()> { 0, ); block_cache.mark_orphan(block_root); + METRICS.get().map(|m| m.lean_block_cache_size.set(block_cache.len() as i64)); store.write().pending_fetch_roots.insert(parent_root); @@ -1093,6 +1096,7 @@ async fn main() -> Result<()> { ); let missing: Vec = store.write().pending_fetch_roots.drain().collect(); + METRICS.get().map(|m| m.lean_pending_fetch_roots.set(0)); if !missing.is_empty() { if let Err(req_err) = outbound_p2p_sender.send( OutboundP2pRequest::RequestBlocksByRoot(missing) @@ -1141,8 +1145,11 @@ async fn main() -> Result<()> { Err(e) => warn!("Problem processing block: {}", e), } + METRICS.get().map(|m| m.lean_block_cache_size.set(block_cache.len() as i64)); + // Drain block roots queued by retried attestations inside on_block. let missing: Vec = store.write().pending_fetch_roots.drain().collect(); + METRICS.get().map(|m| m.lean_pending_fetch_roots.set(0)); if !missing.is_empty() { if let Err(e) = outbound_p2p_sender.send( OutboundP2pRequest::RequestBlocksByRoot(missing) @@ -1195,6 +1202,7 @@ async fn main() -> Result<()> { } let missing: Vec = store.write().pending_fetch_roots.drain().collect(); + METRICS.get().map(|m| m.lean_pending_fetch_roots.set(0)); if !missing.is_empty() { if let Err(e) = outbound_p2p_sender.send( OutboundP2pRequest::RequestBlocksByRoot(missing) @@ -1208,7 +1216,7 @@ async fn main() -> Result<()> { is_trusted, should_gossip, } => { - if should_gossip && !is_trusted && !sync_state.accepts_gossip() { + if !is_trusted && !sync_state.accepts_gossip() { debug!( state = ?sync_state, slot = signed_aggregated_attestation.data.slot.0, @@ -1244,6 +1252,7 @@ async fn main() -> Result<()> { } let missing: Vec = store.write().pending_fetch_roots.drain().collect(); + METRICS.get().map(|m| m.lean_pending_fetch_roots.set(0)); if !missing.is_empty() { if let Err(e) = outbound_p2p_sender.send( OutboundP2pRequest::RequestBlocksByRoot(missing) From 59615d00e21c98cab589cb5089ce767d7efafd5b Mon Sep 17 00:00:00 2001 From: bomanaps Date: Mon, 30 Mar 2026 15:01:14 +0100 Subject: [PATCH 2/2] address review comments --- lean_client/fork_choice/src/handlers.rs | 22 ++++---- lean_client/fork_choice/src/store.rs | 4 +- lean_client/metrics/src/metrics.rs | 56 +++++++++---------- lean_client/networking/src/network/service.rs | 4 +- lean_client/src/main.rs | 16 +++--- 5 files changed, 52 insertions(+), 50 deletions(-) diff --git a/lean_client/fork_choice/src/handlers.rs b/lean_client/fork_choice/src/handlers.rs index 857fc06..3bfcfb8 100644 --- a/lean_client/fork_choice/src/handlers.rs +++ b/lean_client/fork_choice/src/handlers.rs @@ -143,7 +143,7 @@ pub fn on_gossip_attestation( .push(signed_attestation); store.pending_fetch_roots.insert(missing_root); METRICS.get().map(|m| { - m.lean_pending_fetch_roots + m.grandine_pending_fetch_roots .set(store.pending_fetch_roots.len() as i64) }); return Ok(()); @@ -204,14 +204,14 @@ pub fn on_gossip_attestation( .set(store.gossip_signatures.len() as i64); }); } else { - METRICS.get().map(|m| m.lean_xmss_verify_skipped_total.inc()); + METRICS.get().map(|m| m.grandine_xmss_verify_skipped_total.inc()); } store .attestation_data_by_root .insert(data_root, attestation_data.clone()); METRICS.get().map(|m| { - m.lean_attestation_data_by_root + m.grandine_attestation_data_by_root .set(store.attestation_data_by_root.len() as i64) }); @@ -272,7 +272,7 @@ pub fn on_attestation( .push(signed_attestation); store.pending_fetch_roots.insert(missing_root); METRICS.get().map(|m| { - m.lean_pending_fetch_roots + m.grandine_pending_fetch_roots .set(store.pending_fetch_roots.len() as i64) }); return Ok(()); @@ -351,7 +351,7 @@ pub fn on_aggregated_attestation( .push(signed_aggregated_attestation); store.pending_fetch_roots.insert(missing_root); METRICS.get().map(|m| { - m.lean_pending_fetch_roots + m.grandine_pending_fetch_roots .set(store.pending_fetch_roots.len() as i64) }); return Ok(()); @@ -365,7 +365,7 @@ pub fn on_aggregated_attestation( .attestation_data_by_root .insert(data_root, attestation_data.clone()); METRICS.get().map(|m| { - m.lean_attestation_data_by_root + m.grandine_attestation_data_by_root .set(store.attestation_data_by_root.len() as i64) }); @@ -468,9 +468,9 @@ fn on_attestation_internal( } } METRICS.get().map(|m| { - m.lean_fork_choice_known_attestations + m.grandine_fork_choice_known_attestations .set(store.latest_known_attestations.len() as i64); - m.lean_fork_choice_new_attestations + m.grandine_fork_choice_new_attestations .set(store.latest_new_attestations.len() as i64); }); Ok(()) @@ -646,7 +646,7 @@ fn process_block_internal( }); store.attestation_data_by_root.retain(|_, data| data.target.slot.0 > finalized_slot); METRICS.get().map(|m| { - m.lean_attestation_data_by_root + m.grandine_attestation_data_by_root .set(store.attestation_data_by_root.len() as i64) }); } @@ -702,7 +702,7 @@ fn process_block_internal( .set(store.latest_known_aggregated_payloads.len() as i64); }); METRICS.get().map(|m| { - m.lean_attestation_data_by_root + m.grandine_attestation_data_by_root .set(store.attestation_data_by_root.len() as i64) }); @@ -748,7 +748,7 @@ fn process_block_internal( .attestation_data_by_root .insert(proposer_data_root, proposer_attestation.data.clone()); METRICS.get().map(|m| { - m.lean_attestation_data_by_root + m.grandine_attestation_data_by_root .set(store.attestation_data_by_root.len() as i64) }); diff --git a/lean_client/fork_choice/src/store.rs b/lean_client/fork_choice/src/store.rs index ffff73c..c8444df 100644 --- a/lean_client/fork_choice/src/store.rs +++ b/lean_client/fork_choice/src/store.rs @@ -510,9 +510,9 @@ pub fn accept_new_attestations(store: &mut Store) { .extend(store.latest_new_attestations.drain()); update_head(store); METRICS.get().map(|m| { - m.lean_fork_choice_known_attestations + m.grandine_fork_choice_known_attestations .set(store.latest_known_attestations.len() as i64); - m.lean_fork_choice_new_attestations + m.grandine_fork_choice_new_attestations .set(store.latest_new_attestations.len() as i64); }); } diff --git a/lean_client/metrics/src/metrics.rs b/lean_client/metrics/src/metrics.rs index 9a94154..5940d8d 100644 --- a/lean_client/metrics/src/metrics.rs +++ b/lean_client/metrics/src/metrics.rs @@ -144,25 +144,25 @@ pub struct Metrics { // OOM Detection Metrics /// Number of entries in the attestation_data_by_root secondary index - pub lean_attestation_data_by_root: IntGauge, + pub grandine_attestation_data_by_root: IntGauge, /// Number of block roots queued for BlocksByRoot fetch (missing-block backlog) - pub lean_pending_fetch_roots: IntGauge, + pub grandine_pending_fetch_roots: IntGauge, /// Number of orphan blocks held in the backfill BlockCache (hard-capped at 1024) - pub lean_block_cache_size: IntGauge, + pub grandine_block_cache_size: IntGauge, /// Gap between the network's current slot and the node's head slot (backfill depth) - pub lean_slots_behind: IntGauge, + pub grandine_slots_behind: IntGauge, /// Validators with finalized (known) attestations currently in the fork-choice store - pub lean_fork_choice_known_attestations: IntGauge, + pub grandine_fork_choice_known_attestations: IntGauge, /// Validators with pending (new/gossip) attestations in the fork-choice store - pub lean_fork_choice_new_attestations: IntGauge, + pub grandine_fork_choice_new_attestations: IntGauge, /// XMSS verifications skipped because the signature was already cached - pub lean_xmss_verify_skipped_total: IntCounter, + pub grandine_xmss_verify_skipped_total: IntCounter, } impl Metrics { @@ -386,32 +386,32 @@ impl Metrics { )?, // OOM Detection Metrics - lean_attestation_data_by_root: IntGauge::new( - "lean_attestation_data_by_root", + grandine_attestation_data_by_root: IntGauge::new( + "grandine_attestation_data_by_root", "Number of entries in attestation_data_by_root secondary index", )?, - lean_pending_fetch_roots: IntGauge::new( - "lean_pending_fetch_roots", + grandine_pending_fetch_roots: IntGauge::new( + "grandine_pending_fetch_roots", "Block roots queued for BlocksByRoot fetch (missing-block backlog)", )?, - lean_block_cache_size: IntGauge::new( - "lean_block_cache_size", + grandine_block_cache_size: IntGauge::new( + "grandine_block_cache_size", "Orphan blocks in backfill BlockCache (hard cap 1024)", )?, - lean_slots_behind: IntGauge::new( - "lean_slots_behind", + grandine_slots_behind: IntGauge::new( + "grandine_slots_behind", "Current slot minus head slot — backfill depth and primary OOM risk indicator", )?, - lean_fork_choice_known_attestations: IntGauge::new( - "lean_fork_choice_known_attestations", + grandine_fork_choice_known_attestations: IntGauge::new( + "grandine_fork_choice_known_attestations", "Validators with known attestations in the fork-choice store", )?, - lean_fork_choice_new_attestations: IntGauge::new( - "lean_fork_choice_new_attestations", + grandine_fork_choice_new_attestations: IntGauge::new( + "grandine_fork_choice_new_attestations", "Validators with new gossip attestations in the fork-choice store", )?, - lean_xmss_verify_skipped_total: IntCounter::new( - "lean_xmss_verify_skipped_total", + grandine_xmss_verify_skipped_total: IntCounter::new( + "grandine_xmss_verify_skipped_total", "XMSS verifications skipped (signature already cached) — root cause 4 indicator", )?, }) @@ -516,13 +516,13 @@ impl Metrics { default_registry.register(Box::new(self.lean_attestation_committee_count.clone()))?; // OOM Detection Metrics - default_registry.register(Box::new(self.lean_attestation_data_by_root.clone()))?; - default_registry.register(Box::new(self.lean_pending_fetch_roots.clone()))?; - default_registry.register(Box::new(self.lean_block_cache_size.clone()))?; - default_registry.register(Box::new(self.lean_slots_behind.clone()))?; - default_registry.register(Box::new(self.lean_fork_choice_known_attestations.clone()))?; - default_registry.register(Box::new(self.lean_fork_choice_new_attestations.clone()))?; - default_registry.register(Box::new(self.lean_xmss_verify_skipped_total.clone()))?; + default_registry.register(Box::new(self.grandine_attestation_data_by_root.clone()))?; + default_registry.register(Box::new(self.grandine_pending_fetch_roots.clone()))?; + default_registry.register(Box::new(self.grandine_block_cache_size.clone()))?; + default_registry.register(Box::new(self.grandine_slots_behind.clone()))?; + default_registry.register(Box::new(self.grandine_fork_choice_known_attestations.clone()))?; + default_registry.register(Box::new(self.grandine_fork_choice_new_attestations.clone()))?; + default_registry.register(Box::new(self.grandine_xmss_verify_skipped_total.clone()))?; Ok(()) } diff --git a/lean_client/networking/src/network/service.rs b/lean_client/networking/src/network/service.rs index 544c2cd..54719ca 100644 --- a/lean_client/networking/src/network/service.rs +++ b/lean_client/networking/src/network/service.rs @@ -267,8 +267,8 @@ where let behaviour = Self::build_behaviour(&local_key, &network_config)?; let config = Config::with_tokio_executor() - .with_notify_handler_buffer_size(NonZeroUsize::new(256).unwrap()) - .with_per_connection_event_buffer_size(64) + .with_notify_handler_buffer_size(NonZeroUsize::new(7).unwrap()) + .with_per_connection_event_buffer_size(4) .with_dial_concurrency_factor(NonZeroU8::new(1).unwrap()); let multiaddr = Self::multiaddr(&network_config)?; diff --git a/lean_client/src/main.rs b/lean_client/src/main.rs index b395ad6..fc6ae90 100644 --- a/lean_client/src/main.rs +++ b/lean_client/src/main.rs @@ -231,7 +231,9 @@ fn print_chain_status(store: &Store, connected_peers: u64) { let timely = behind == 0; - METRICS.get().map(|m| m.lean_slots_behind.set(behind as i64)); + METRICS + .get() + .map(|m| m.grandine_slots_behind.set(behind as i64)); println!("\n+===============================================================+"); println!( @@ -1084,7 +1086,7 @@ async fn main() -> Result<()> { 0, ); block_cache.mark_orphan(block_root); - METRICS.get().map(|m| m.lean_block_cache_size.set(block_cache.len() as i64)); + METRICS.get().map(|m| m.grandine_block_cache_size.set(block_cache.len() as i64)); store.write().pending_fetch_roots.insert(parent_root); @@ -1096,7 +1098,7 @@ async fn main() -> Result<()> { ); let missing: Vec = store.write().pending_fetch_roots.drain().collect(); - METRICS.get().map(|m| m.lean_pending_fetch_roots.set(0)); + METRICS.get().map(|m| m.grandine_pending_fetch_roots.set(0)); if !missing.is_empty() { if let Err(req_err) = outbound_p2p_sender.send( OutboundP2pRequest::RequestBlocksByRoot(missing) @@ -1145,11 +1147,11 @@ async fn main() -> Result<()> { Err(e) => warn!("Problem processing block: {}", e), } - METRICS.get().map(|m| m.lean_block_cache_size.set(block_cache.len() as i64)); + METRICS.get().map(|m| m.grandine_block_cache_size.set(block_cache.len() as i64)); // Drain block roots queued by retried attestations inside on_block. let missing: Vec = store.write().pending_fetch_roots.drain().collect(); - METRICS.get().map(|m| m.lean_pending_fetch_roots.set(0)); + METRICS.get().map(|m| m.grandine_pending_fetch_roots.set(0)); if !missing.is_empty() { if let Err(e) = outbound_p2p_sender.send( OutboundP2pRequest::RequestBlocksByRoot(missing) @@ -1202,7 +1204,7 @@ async fn main() -> Result<()> { } let missing: Vec = store.write().pending_fetch_roots.drain().collect(); - METRICS.get().map(|m| m.lean_pending_fetch_roots.set(0)); + METRICS.get().map(|m| m.grandine_pending_fetch_roots.set(0)); if !missing.is_empty() { if let Err(e) = outbound_p2p_sender.send( OutboundP2pRequest::RequestBlocksByRoot(missing) @@ -1252,7 +1254,7 @@ async fn main() -> Result<()> { } let missing: Vec = store.write().pending_fetch_roots.drain().collect(); - METRICS.get().map(|m| m.lean_pending_fetch_roots.set(0)); + METRICS.get().map(|m| m.grandine_pending_fetch_roots.set(0)); if !missing.is_empty() { if let Err(e) = outbound_p2p_sender.send( OutboundP2pRequest::RequestBlocksByRoot(missing)