From 2999d19f06ceba17e068b4e51b9c53353a14817b Mon Sep 17 00:00:00 2001 From: bomanaps Date: Fri, 20 Mar 2026 19:45:15 +0100 Subject: [PATCH] Replace blocks_queue with bounded BlockCache to prevent OOM --- lean_client/fork_choice/src/block_cache.rs | 189 ++++++++++++++++++ lean_client/fork_choice/src/handlers.rs | 37 ++-- lean_client/fork_choice/src/lib.rs | 1 + lean_client/fork_choice/src/store.rs | 3 - .../tests/fork_choice_test_vectors.rs | 4 +- lean_client/src/main.rs | 45 ++--- 6 files changed, 229 insertions(+), 50 deletions(-) create mode 100644 lean_client/fork_choice/src/block_cache.rs diff --git a/lean_client/fork_choice/src/block_cache.rs b/lean_client/fork_choice/src/block_cache.rs new file mode 100644 index 0000000..9990754 --- /dev/null +++ b/lean_client/fork_choice/src/block_cache.rs @@ -0,0 +1,189 @@ +use std::collections::{HashMap, HashSet, VecDeque}; +use std::time::Instant; + +use containers::{SignedBlockWithAttestation, Slot}; +use ssz::H256; + +pub const MAX_CACHED_BLOCKS: usize = 1024; + +#[derive(Debug, Clone)] +pub struct PendingBlock { + pub block: SignedBlockWithAttestation, + pub root: H256, + pub parent_root: H256, + pub slot: Slot, + pub received_from: Option, + pub received_at: Instant, + pub backfill_depth: u32, +} + +pub struct BlockCache { + blocks: HashMap, + insertion_order: VecDeque, + by_parent: HashMap>, + orphans: HashSet, + capacity: usize, +} + +impl BlockCache { + pub fn new() -> Self { + Self::with_capacity(MAX_CACHED_BLOCKS) + } + + pub fn with_capacity(capacity: usize) -> Self { + Self { + blocks: HashMap::new(), + insertion_order: VecDeque::new(), + by_parent: HashMap::new(), + orphans: HashSet::new(), + capacity, + } + } + + pub fn len(&self) -> usize { + self.blocks.len() + } + + pub fn orphan_count(&self) -> usize { + self.orphans.len() + } + + pub fn contains(&self, root: &H256) -> bool { + self.blocks.contains_key(root) + } + + pub fn add( + &mut self, + block: SignedBlockWithAttestation, + root: H256, + parent_root: H256, + slot: Slot, + received_from: Option, + backfill_depth: u32, + ) { + if self.blocks.contains_key(&root) { + return; + } + + if self.blocks.len() >= self.capacity { + self.evict_oldest(); + } + + let pending = PendingBlock { + block, + root, + parent_root, + slot, + received_from, + received_at: Instant::now(), + backfill_depth, + }; + + self.blocks.insert(root, pending); + self.insertion_order.push_back(root); + self.by_parent + .entry(parent_root) + .or_insert_with(HashSet::new) + .insert(root); + } + + pub fn get(&self, root: &H256) -> Option<&PendingBlock> { + self.blocks.get(root) + } + + pub fn remove(&mut self, root: &H256) -> Option { + let pending = self.blocks.remove(root)?; + + self.insertion_order.retain(|r| r != root); + self.orphans.remove(root); + + if let Some(children) = self.by_parent.get_mut(&pending.parent_root) { + children.remove(root); + if children.is_empty() { + self.by_parent.remove(&pending.parent_root); + } + } + + Some(pending) + } + + pub fn get_children(&self, parent_root: &H256) -> Vec<&PendingBlock> { + let Some(child_roots) = self.by_parent.get(parent_root) else { + return Vec::new(); + }; + + let mut children: Vec<&PendingBlock> = child_roots + .iter() + .filter_map(|r| self.blocks.get(r)) + .collect(); + + children.sort_by_key(|p| p.slot.0); + children + } + + pub fn mark_orphan(&mut self, root: H256) { + if self.blocks.contains_key(&root) { + self.orphans.insert(root); + } + } + + pub fn unmark_orphan(&mut self, root: &H256) { + self.orphans.remove(root); + } + + pub fn get_orphan_parents(&self) -> Vec { + let mut seen = HashSet::new(); + for root in &self.orphans { + if let Some(pending) = self.blocks.get(root) { + if !self.blocks.contains_key(&pending.parent_root) { + seen.insert(pending.parent_root); + } + } + } + seen.into_iter().collect() + } + + pub fn get_orphan_parents_with_hints(&self) -> Vec<(H256, Option)> { + let mut results: HashMap> = HashMap::new(); + for root in &self.orphans { + if let Some(pending) = self.blocks.get(root) { + if !self.blocks.contains_key(&pending.parent_root) { + results + .entry(pending.parent_root) + .or_insert_with(|| pending.received_from.clone()); + } + } + } + results.into_iter().collect() + } + + pub fn clear(&mut self) { + self.blocks.clear(); + self.insertion_order.clear(); + self.by_parent.clear(); + self.orphans.clear(); + } + + fn evict_oldest(&mut self) { + let Some(oldest_root) = self.insertion_order.pop_front() else { + return; + }; + + self.orphans.remove(&oldest_root); + + if let Some(block) = self.blocks.remove(&oldest_root) { + if let Some(children) = self.by_parent.get_mut(&block.parent_root) { + children.remove(&oldest_root); + if children.is_empty() { + self.by_parent.remove(&block.parent_root); + } + } + } + } +} + +impl Default for BlockCache { + fn default() -> Self { + Self::new() + } +} diff --git a/lean_client/fork_choice/src/handlers.rs b/lean_client/fork_choice/src/handlers.rs index c3c72b9..25d12e9 100644 --- a/lean_client/fork_choice/src/handlers.rs +++ b/lean_client/fork_choice/src/handlers.rs @@ -7,6 +7,7 @@ use metrics::METRICS; use ssz::{H256, SszHash}; use tracing::warn; +use crate::block_cache::BlockCache; use crate::store::{ INTERVALS_PER_SLOT, MILLIS_PER_INTERVAL, STATE_PRUNE_BUFFER, Store, tick_interval, update_head, }; @@ -395,7 +396,11 @@ fn on_attestation_internal( /// 3. Processing attestations included in the block body (on-chain) /// 4. Updating the forkchoice head /// 5. Processing the proposer's attestation (as if gossiped) -pub fn on_block(store: &mut Store, signed_block: SignedBlockWithAttestation) -> Result<()> { +pub fn on_block( + store: &mut Store, + cache: &mut BlockCache, + signed_block: SignedBlockWithAttestation, +) -> Result<()> { let block_root = signed_block.message.block.hash_tree_root(); if store.blocks.contains_key(&block_root) { @@ -405,20 +410,14 @@ pub fn on_block(store: &mut Store, signed_block: SignedBlockWithAttestation) -> let parent_root = signed_block.message.block.parent_root; if !store.states.contains_key(&parent_root) && !parent_root.is_zero() { - store - .blocks_queue - .entry(parent_root) - .or_insert_with(Vec::new) - .push(signed_block); bail!( - "Err: (Fork-choice::Handlers::OnBlock) Block queued: parent {:?} not yet available (pending: {} blocks)", - &parent_root.as_bytes()[..4], - store.blocks_queue.values().map(|v| v.len()).sum::() + "Err: (Fork-choice::Handlers::OnBlock) parent state not available for {:?}", + &parent_root.as_bytes()[..4] ); } process_block_internal(store, signed_block, block_root)?; - process_pending_blocks(store, vec![block_root]); + process_pending_blocks(store, cache, vec![block_root]); Ok(()) } @@ -638,14 +637,18 @@ fn process_block_internal( Ok(()) } -fn process_pending_blocks(store: &mut Store, mut roots: Vec) { +pub fn process_pending_blocks(store: &mut Store, cache: &mut BlockCache, mut roots: Vec) { while let Some(parent_root) = roots.pop() { - if let Some(purgatory) = store.blocks_queue.remove(&parent_root) { - for block in purgatory { - let block_origins = block.message.block.hash_tree_root(); - if let Ok(()) = process_block_internal(store, block, block_origins) { - roots.push(block_origins); - } + let children: Vec<(H256, SignedBlockWithAttestation)> = cache + .get_children(&parent_root) + .into_iter() + .map(|p| (p.root, p.block.clone())) + .collect(); + + for (child_root, child_block) in children { + cache.remove(&child_root); + if process_block_internal(store, child_block, child_root).is_ok() { + roots.push(child_root); } } } diff --git a/lean_client/fork_choice/src/lib.rs b/lean_client/fork_choice/src/lib.rs index 46bab5a..3b88082 100644 --- a/lean_client/fork_choice/src/lib.rs +++ b/lean_client/fork_choice/src/lib.rs @@ -1,3 +1,4 @@ +pub mod block_cache; pub mod handlers; pub mod store; diff --git a/lean_client/fork_choice/src/store.rs b/lean_client/fork_choice/src/store.rs index 537ce2d..53b7257 100644 --- a/lean_client/fork_choice/src/store.rs +++ b/lean_client/fork_choice/src/store.rs @@ -48,8 +48,6 @@ pub struct Store { pub latest_new_attestations: HashMap, - pub blocks_queue: HashMap>, - pub gossip_signatures: HashMap, /// Devnet-3: Aggregated signature proofs from block bodies (on-chain). @@ -224,7 +222,6 @@ pub fn get_forkchoice_store( states: [(block_root, anchor_state)].into(), latest_known_attestations: HashMap::new(), latest_new_attestations: HashMap::new(), - blocks_queue: HashMap::new(), gossip_signatures: HashMap::new(), latest_known_aggregated_payloads: HashMap::new(), latest_new_aggregated_payloads: HashMap::new(), diff --git a/lean_client/fork_choice/tests/fork_choice_test_vectors.rs b/lean_client/fork_choice/tests/fork_choice_test_vectors.rs index bc96b97..310ad5a 100644 --- a/lean_client/fork_choice/tests/fork_choice_test_vectors.rs +++ b/lean_client/fork_choice/tests/fork_choice_test_vectors.rs @@ -5,6 +5,7 @@ use containers::{ State, Validator, Validators, }; use fork_choice::{ + block_cache::BlockCache, handlers::{on_block, on_tick}, store::{Store, get_forkchoice_store}, }; @@ -548,6 +549,7 @@ fn forkchoice(spec_file: &str) { }; let mut store = get_forkchoice_store(anchor_state, anchor_block, config); + let mut cache = BlockCache::new(); let mut block_labels: HashMap = HashMap::new(); for (step_idx, step) in case.steps.into_iter().enumerate() { @@ -574,7 +576,7 @@ fn forkchoice(spec_file: &str) { * 1000; on_tick(&mut store, block_time_millis, false); - on_block(&mut store, signed_block).unwrap(); + on_block(&mut store, &mut cache, signed_block).unwrap(); Ok(block_root) })); diff --git a/lean_client/src/main.rs b/lean_client/src/main.rs index e7955d2..5fe1d26 100644 --- a/lean_client/src/main.rs +++ b/lean_client/src/main.rs @@ -8,6 +8,7 @@ use containers::{ use ethereum_types::H256; use features::Feature; use fork_choice::{ + block_cache::BlockCache, handlers::{on_aggregated_attestation, on_attestation, on_block, on_tick}, store::{ INTERVALS_PER_SLOT, MILLIS_PER_INTERVAL, Store, get_forkchoice_store, @@ -777,6 +778,7 @@ async fn main() -> Result<()> { ); let mut last_logged_slot = 0u64; let mut last_status_slot: Option = None; + let mut block_cache = BlockCache::new(); let peer_count = peer_count_for_status; @@ -833,7 +835,7 @@ async fn main() -> Result<()> { if current_slot != last_logged_slot && current_slot % 10 == 0 { debug!("(Okay)Store time updated : slot {}, pending blocks: {}", current_slot, - store.read().blocks_queue.values().map(|v| v.len()).sum::() + block_cache.len() ); last_logged_slot = current_slot; } @@ -892,23 +894,23 @@ async fn main() -> Result<()> { } if !parent_exists { - { - let mut s = store.write(); - s.blocks_queue - .entry(parent_root) - .or_insert_with(Vec::new) - .push(signed_block_with_attestation.clone()); - // Add to batch queue so this root is sent together with any - // other concurrently pending roots (attestation-triggered or - // from other unknown-parent blocks) in one BlocksByRoot request. - s.pending_fetch_roots.insert(parent_root); - } + block_cache.add( + signed_block_with_attestation.clone(), + block_root, + parent_root, + block_slot, + None, + 0, + ); + block_cache.mark_orphan(block_root); + + store.write().pending_fetch_roots.insert(parent_root); warn!( child_slot = block_slot.0, child_block_root = %format!("0x{:x}", block_root), missing_parent_root = %format!("0x{:x}", parent_root), - "Block queued (proactive) - parent not found, requesting via BlocksByRoot" + "Block cached (proactive) - parent not found, requesting via BlocksByRoot" ); let missing: Vec = store.write().pending_fetch_roots.drain().collect(); @@ -922,7 +924,7 @@ async fn main() -> Result<()> { continue; } - let result = {on_block(&mut *store.write(), signed_block_with_attestation.clone())}; + let result = {on_block(&mut *store.write(), &mut block_cache, signed_block_with_attestation.clone())}; match result { Ok(()) => { info!("Block processed successfully"); @@ -947,21 +949,6 @@ async fn main() -> Result<()> { } } } - Err(e) if format!("{e:?}").starts_with("Err: (Fork-choice::Handlers::OnBlock) Block queued") => { - warn!( - child_slot = block_slot.0, - child_block_root = %format!("0x{:x}", block_root), - missing_parent_root = %format!("0x{:x}", parent_root), - "Block queued (fallback) - parent not found, requesting via BlocksByRoot" - ); - - // Add to batch queue; the drain below (pending_fetch_roots) - // will send this together with any attestation-triggered roots - // discovered during on_block processing. - if !parent_root.is_zero() { - store.write().pending_fetch_roots.insert(parent_root); - } - } Err(e) => warn!("Problem processing block: {}", e), }