diff --git a/lean_client/containers/src/state.rs b/lean_client/containers/src/state.rs index 412b346..34d825c 100644 --- a/lean_client/containers/src/state.rs +++ b/lean_client/containers/src/state.rs @@ -502,7 +502,7 @@ impl State { continue; } - if !target.slot.is_justifiable_after(self.latest_finalized.slot) { + if !target.slot.is_justifiable_after(finalized_slot) { info!("skipping attestation, target slot is not yet justifiable"); continue; } @@ -538,7 +538,7 @@ impl State { justifications.remove(&target.root); if !(source.slot.0 + 1..target.slot.0) - .any(|slot| Slot(slot).is_justifiable_after(self.latest_finalized.slot)) + .any(|slot| Slot(slot).is_justifiable_after(finalized_slot)) { info!("finalizing {source:?}"); let old_finalized_slot = finalized_slot; diff --git a/lean_client/fork_choice/src/handlers.rs b/lean_client/fork_choice/src/handlers.rs index 43a354d..c3c72b9 100644 --- a/lean_client/fork_choice/src/handlers.rs +++ b/lean_client/fork_choice/src/handlers.rs @@ -7,7 +7,9 @@ use metrics::METRICS; use ssz::{H256, SszHash}; use tracing::warn; -use crate::store::{INTERVALS_PER_SLOT, MILLIS_PER_INTERVAL, Store, tick_interval, update_head}; +use crate::store::{ + INTERVALS_PER_SLOT, MILLIS_PER_INTERVAL, STATE_PRUNE_BUFFER, Store, tick_interval, update_head, +}; #[inline] pub fn on_tick(store: &mut Store, time_millis: u64, has_proposal: bool) { @@ -463,11 +465,8 @@ fn process_block_internal( "Block processed - new state info" ); - // Store block and state, store the plain Block (not SignedBlockWithAttestation) store.blocks.insert(block_root, block.clone()); store.states.insert(block_root, new_state.clone()); - // Also store signed block for serving BlocksByRoot requests (checkpoint sync backfill) - store.signed_blocks.insert(block_root, signed_block.clone()); // Retry attestations that arrived before this block was known. // Drain the queue for this root and re-process each attestation. @@ -525,6 +524,13 @@ fn process_block_internal( }; metrics.lean_latest_finalized_slot.set(slot); }); + + let keep_from = store + .latest_finalized + .slot + .0 + .saturating_sub(STATE_PRUNE_BUFFER); + store.states.retain(|_, state| state.slot.0 >= keep_from); } if !justified_updated && !finalized_updated { diff --git a/lean_client/fork_choice/src/store.rs b/lean_client/fork_choice/src/store.rs index cfb2d7e..537ce2d 100644 --- a/lean_client/fork_choice/src/store.rs +++ b/lean_client/fork_choice/src/store.rs @@ -63,14 +63,10 @@ pub struct Store { pub latest_new_aggregated_payloads: HashMap>, /// Attestation data indexed by hash (data_root). - /// Used to look up the exact attestation data that was signed, - /// matching ream's attestation_data_by_root_provider design. + /// Used to look up the exact attestation data that was signed when + /// processing aggregated payloads for safe target computation. pub attestation_data_by_root: HashMap, - /// Signed blocks indexed by block root. - /// Used to serve BlocksByRoot requests to peers for checkpoint sync backfill. - pub signed_blocks: HashMap, - /// Gossip attestations waiting for referenced blocks to arrive. /// Keyed by the missing block root. Drained when that block is processed. pub pending_attestations: HashMap>, @@ -86,6 +82,13 @@ pub struct Store { const JUSTIFICATION_LOOKBACK_SLOTS: u64 = 3; +/// Number of slots before the finalized slot for which states are retained. +/// States older than (finalized_slot - STATE_PRUNE_BUFFER) are pruned after +/// each finalization advance. The buffer covers late-arriving blocks and rapid +/// finalization jumps without risk of evicting a parent state still needed +/// for an in-flight state transition. +pub const STATE_PRUNE_BUFFER: u64 = 128; + impl Store { pub fn produce_attestation_data(&self, slot: Slot) -> Result { let head_checkpoint = Checkpoint { @@ -226,7 +229,6 @@ pub fn get_forkchoice_store( latest_known_aggregated_payloads: HashMap::new(), latest_new_aggregated_payloads: HashMap::new(), attestation_data_by_root: HashMap::new(), - signed_blocks: [(block_root, anchor_block)].into(), pending_attestations: HashMap::new(), pending_aggregated_attestations: HashMap::new(), pending_fetch_roots: HashSet::new(), diff --git a/lean_client/networking/src/network/service.rs b/lean_client/networking/src/network/service.rs index d684231..b123046 100644 --- a/lean_client/networking/src/network/service.rs +++ b/lean_client/networking/src/network/service.rs @@ -42,8 +42,8 @@ use crate::{ network::behaviour::{LeanNetworkBehaviour, LeanNetworkBehaviourEvent}, req_resp::{self, LeanRequest, ReqRespMessage}, types::{ - ChainMessage, ChainMessageSink, ConnectionState, OutboundP2pRequest, P2pRequestSource, - SignedBlockProvider, StatusProvider, + ChainMessage, ChainMessageSink, ConnectionState, MAX_BLOCK_CACHE_SIZE, + OutboundP2pRequest, P2pRequestSource, SignedBlockProvider, StatusProvider, }, }; @@ -51,11 +51,15 @@ const MAX_BLOCKS_BY_ROOT_RETRIES: u8 = 3; const MAX_BLOCK_FETCH_DEPTH: u32 = 65536; /// Maximum roots per BlocksByRoot request, aligned with leanSpec BackfillSync. const MAX_BLOCKS_PER_REQUEST: usize = 10; +/// Stalled request timeout. If a peer accepts the stream but never sends a response, +/// the request is cancelled and retried with a different peer after this duration. +const BLOCKS_BY_ROOT_REQUEST_TIMEOUT: Duration = Duration::from_secs(8); struct PendingBlocksRequest { roots: Vec, retries: u8, depth: u32, + created_at: tokio::time::Instant, } #[derive(Debug, Clone)] @@ -322,6 +326,11 @@ where let mut sync_interval = interval(Duration::from_secs(30)); sync_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + // Sweep for stalled BlocksByRoot requests. Fires at the same cadence as the timeout + // so stale entries are caught within one extra period at most. + let mut timeout_interval = interval(BLOCKS_BY_ROOT_REQUEST_TIMEOUT); + timeout_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + loop { select! { _ = reconnect_interval.tick() => { @@ -330,6 +339,9 @@ where _ = sync_interval.tick() => { self.send_status_to_all_connected_peers(); } + _ = timeout_interval.tick() => { + self.sweep_timed_out_requests(); + } _ = discovery_interval.tick() => { // Trigger active peer discovery if let Some(ref discovery) = self.discovery { @@ -759,6 +771,18 @@ where let root = block.message.block.hash_tree_root(); provider.insert(root, block.clone()); } + // Hard cap: evict lowest-slot blocks if still over limit. + if provider.len() > MAX_BLOCK_CACHE_SIZE { + let to_remove = provider.len() - MAX_BLOCK_CACHE_SIZE; + let mut slots: Vec<(H256, u64)> = provider + .iter() + .map(|(root, b)| (*root, b.message.block.slot.0)) + .collect(); + slots.sort_by_key(|(_, slot)| *slot); + for (root, _) in slots.into_iter().take(to_remove) { + provider.remove(&root); + } + } } // Step 2: Collect unique parent roots that are not yet received. @@ -959,6 +983,30 @@ where } } + fn sweep_timed_out_requests(&mut self) { + let timed_out: Vec = self + .pending_blocks_by_root + .iter() + .filter(|(_, req)| req.created_at.elapsed() > BLOCKS_BY_ROOT_REQUEST_TIMEOUT) + .map(|(id, _)| *id) + .collect(); + + for request_id in timed_out { + if let Some(req) = self.pending_blocks_by_root.remove(&request_id) { + warn!( + num_roots = req.roots.len(), + depth = req.depth, + "BlocksByRoot request timed out, retrying with different peer" + ); + for root in &req.roots { + self.in_flight_roots.remove(root); + } + // Pass a non-existent peer so all connected peers are eligible for retry. + self.retry_blocks_by_root_request(PeerId::random(), req); + } + } + } + fn handle_identify_event(&mut self, event: identify::Event) -> Option { match event { identify::Event::Received { @@ -1010,9 +1058,9 @@ where let current_state = self.peer_table.lock().get(&peer_id).cloned(); if !matches!( current_state, - Some(ConnectionState::Disconnected | ConnectionState::Connecting) | None + Some(ConnectionState::Disconnected) | None ) { - trace!(?peer_id, "Already connected"); + trace!(?peer_id, "Already connected or connecting"); continue; } @@ -1311,6 +1359,7 @@ where roots, retries, depth, + created_at: tokio::time::Instant::now(), }, ); } diff --git a/lean_client/networking/src/types.rs b/lean_client/networking/src/types.rs index 43db367..919b435 100644 --- a/lean_client/networking/src/types.rs +++ b/lean_client/networking/src/types.rs @@ -15,6 +15,10 @@ use tracing::warn; use crate::serde_utils::quoted_u64; +/// Hard cap on the number of blocks held in the signed block provider. +/// Prevents unbounded memory growth during backfill. +pub const MAX_BLOCK_CACHE_SIZE: usize = 1024; + /// Shared block provider for serving BlocksByRoot requests. /// Allows NetworkService to look up signed blocks for checkpoint sync backfill. pub type SignedBlockProvider = Arc>>; diff --git a/lean_client/src/main.rs b/lean_client/src/main.rs index ff72d6d..e7955d2 100644 --- a/lean_client/src/main.rs +++ b/lean_client/src/main.rs @@ -21,7 +21,8 @@ use networking::gossipsub::config::GossipsubConfig; use networking::gossipsub::topic::{compute_subnet_id, get_subscription_topics}; use networking::network::{NetworkService, NetworkServiceConfig}; use networking::types::{ - ChainMessage, OutboundP2pRequest, SignedBlockProvider, StatusProvider, ValidatorChainMessage, + ChainMessage, MAX_BLOCK_CACHE_SIZE, OutboundP2pRequest, SignedBlockProvider, StatusProvider, + ValidatorChainMessage, }; use parking_lot::RwLock; use ssz::{PersistentList, SszHash, SszReadDefault as _}; @@ -48,10 +49,24 @@ fn load_node_key(path: &str) -> Result> { Ok(Keypair::from(keypair)) } +/// Timeout for establishing the TCP/QUIC connection to the checkpoint peer. +/// Fail fast if the peer is unreachable. +const CHECKPOINT_CONNECT_TIMEOUT: Duration = Duration::from_secs(15); + +/// Inactivity timeout for reading the state body. +/// Resets on each successful read, so large states can download as long as +/// data keeps flowing, while stalled connections are detected promptly. +const CHECKPOINT_READ_TIMEOUT: Duration = Duration::from_secs(15); + async fn download_checkpoint_state(url: &str) -> Result { info!("Downloading checkpoint state from: {}", url); - let client = reqwest::Client::new(); + let client = reqwest::Client::builder() + .connect_timeout(CHECKPOINT_CONNECT_TIMEOUT) + .read_timeout(CHECKPOINT_READ_TIMEOUT) + .build() + .context("Failed to build HTTP client")?; + let response = client .get(url) .header("Accept", "application/octet-stream") @@ -85,7 +100,13 @@ async fn download_checkpoint_state(url: &str) -> Result { } fn verify_checkpoint_state(state: &State, genesis_state: &State) -> Result<()> { - // Verify genesis time matches + // Checkpoint cannot be genesis + anyhow::ensure!( + state.slot.0 > 0, + "Checkpoint state slot must be > 0 (got genesis slot)" + ); + + // Verify genesis time matches anyhow::ensure!( state.config.genesis_time == genesis_state.config.genesis_time, "Genesis time mismatch: checkpoint has {}, expected {}. Wrong network?", @@ -93,10 +114,16 @@ fn verify_checkpoint_state(state: &State, genesis_state: &State) -> Result<()> { genesis_state.config.genesis_time ); - // Verify validator count matches + // Verify state has validators let state_validator_count = state.validators.len_u64(); let expected_validator_count = genesis_state.validators.len_u64(); + anyhow::ensure!( + state_validator_count > 0, + "Invalid checkpoint state: no validators in registry" + ); + + // Verify validator count matches anyhow::ensure!( state_validator_count == expected_validator_count, "Validator count mismatch: checkpoint has {}, genesis expects {}. Wrong network?", @@ -104,13 +131,17 @@ fn verify_checkpoint_state(state: &State, genesis_state: &State) -> Result<()> { expected_validator_count ); - // Verify state has validators - anyhow::ensure!( - state_validator_count > 0, - "Invalid checkpoint state: no validators in registry" - ); + // Verify validator indices are sequential (0, 1, 2, ...) + for i in 0..state_validator_count { + let validator = state.validators.get(i).expect("validator exists"); + anyhow::ensure!( + validator.index == i, + "Non-sequential validator index at position {i}: expected {i}, got {}", + validator.index + ); + } - // Verify each validator pubkey matches genesis + // Verify each validator pubkey matches genesis for i in 0..state_validator_count { let state_pubkey = &state.validators.get(i).expect("validator exists").pubkey; let genesis_pubkey = &genesis_state @@ -121,14 +152,50 @@ fn verify_checkpoint_state(state: &State, genesis_state: &State) -> Result<()> { anyhow::ensure!( state_pubkey == genesis_pubkey, - "Validator pubkey mismatch at index {}: checkpoint has different validator set. Wrong network?", - i + "Validator pubkey mismatch at index {i}: checkpoint has different validator set. Wrong network?" ); } + // Finalized checkpoint cannot be in the future relative to the state + anyhow::ensure!( + state.latest_finalized.slot <= state.slot, + "Finalized slot {} exceeds state slot {}", + state.latest_finalized.slot.0, + state.slot.0 + ); + + // Justified must be at or after finalized + anyhow::ensure!( + state.latest_justified.slot >= state.latest_finalized.slot, + "Justified slot {} is before finalized slot {}", + state.latest_justified.slot.0, + state.latest_finalized.slot.0 + ); + + // If justified and finalized are at the same slot, their roots must agree + if state.latest_justified.slot == state.latest_finalized.slot { + anyhow::ensure!( + state.latest_justified.root == state.latest_finalized.root, + "Justified and finalized are at the same slot ({}) but have different roots", + state.latest_justified.slot.0 + ); + } + + // Block header cannot be ahead of the state + anyhow::ensure!( + state.latest_block_header.slot <= state.slot, + "Block header slot {} exceeds state slot {}", + state.latest_block_header.slot.0, + state.slot.0 + ); + info!( - "Checkpoint state verified: genesis_time={}, validators={}", - state.config.genesis_time, state_validator_count + "Checkpoint state verified: slot={}, genesis_time={}, validators={}, finalized={}, justified={}", + state.slot.0, + state.config.genesis_time, + state_validator_count, + state.latest_finalized.slot.0, + state.latest_justified.slot.0, ); Ok(()) @@ -385,7 +452,7 @@ async fn main() -> Result<()> { return Err(e); } - // Compute state root for the checkpoint state (like zeam's genStateBlockHeader) + // Compute state root for the checkpoint state let checkpoint_state_root = checkpoint_state.hash_tree_root(); // Reconstruct block header from state's latest_block_header with correct state_root @@ -449,8 +516,10 @@ async fn main() -> Result<()> { (checkpoint_state, checkpoint_signed_block) } Err(e) => { - warn!("Checkpoint sync failed: {}. Falling back to genesis.", e); - (genesis_state.clone(), genesis_signed_block) + return Err(e.context( + "Checkpoint sync failed. Fix the error and restart; \ + the node will not fall back to genesis when --checkpoint-sync-url is set.", + )); } } } else { @@ -805,7 +874,22 @@ async fn main() -> Result<()> { // BlocksByRoot even if it can't be processed yet (e.g. parent // missing). This prevents STREAM_CLOSED errors when a peer // requests a block we received but haven't incorporated yet. - signed_block_provider.write().insert(block_root, signed_block_with_attestation.clone()); + { + let mut provider = signed_block_provider.write(); + provider.insert(block_root, signed_block_with_attestation.clone()); + // Hard cap: evict lowest-slot blocks if still over limit. + if provider.len() > MAX_BLOCK_CACHE_SIZE { + let to_remove = provider.len() - MAX_BLOCK_CACHE_SIZE; + let mut slots: Vec<(H256, u64)> = provider + .iter() + .map(|(root, b)| (*root, b.message.block.slot.0)) + .collect(); + slots.sort_by_key(|(_, slot)| *slot); + for (root, _) in slots.into_iter().take(to_remove) { + provider.remove(&root); + } + } + } if !parent_exists { {