Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 189 additions & 0 deletions lean_client/fork_choice/src/block_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
use std::collections::{HashMap, HashSet, VecDeque};
use std::time::Instant;

use containers::{SignedBlockWithAttestation, Slot};
use ssz::H256;

pub const MAX_CACHED_BLOCKS: usize = 1024;

#[derive(Debug, Clone)]
pub struct PendingBlock {
pub block: SignedBlockWithAttestation,
pub root: H256,
pub parent_root: H256,
pub slot: Slot,
pub received_from: Option<String>,
pub received_at: Instant,
pub backfill_depth: u32,
}

pub struct BlockCache {
blocks: HashMap<H256, PendingBlock>,
insertion_order: VecDeque<H256>,
by_parent: HashMap<H256, HashSet<H256>>,
orphans: HashSet<H256>,
capacity: usize,
}

impl BlockCache {
pub fn new() -> Self {
Self::with_capacity(MAX_CACHED_BLOCKS)
}

pub fn with_capacity(capacity: usize) -> Self {
Self {
blocks: HashMap::new(),
insertion_order: VecDeque::new(),
by_parent: HashMap::new(),
orphans: HashSet::new(),
capacity,
}
}

pub fn len(&self) -> usize {
self.blocks.len()
}

pub fn orphan_count(&self) -> usize {
self.orphans.len()
}

pub fn contains(&self, root: &H256) -> bool {
self.blocks.contains_key(root)
}

pub fn add(
&mut self,
block: SignedBlockWithAttestation,
root: H256,
parent_root: H256,
slot: Slot,
received_from: Option<String>,
backfill_depth: u32,
) {
if self.blocks.contains_key(&root) {
return;
}

if self.blocks.len() >= self.capacity {
self.evict_oldest();
}

let pending = PendingBlock {
block,
root,
parent_root,
slot,
received_from,
received_at: Instant::now(),
backfill_depth,
};

self.blocks.insert(root, pending);
self.insertion_order.push_back(root);
self.by_parent
.entry(parent_root)
.or_insert_with(HashSet::new)
.insert(root);
}

pub fn get(&self, root: &H256) -> Option<&PendingBlock> {
self.blocks.get(root)
}

pub fn remove(&mut self, root: &H256) -> Option<PendingBlock> {
let pending = self.blocks.remove(root)?;

self.insertion_order.retain(|r| r != root);
self.orphans.remove(root);

if let Some(children) = self.by_parent.get_mut(&pending.parent_root) {
children.remove(root);
if children.is_empty() {
self.by_parent.remove(&pending.parent_root);
}
}

Some(pending)
}

pub fn get_children(&self, parent_root: &H256) -> Vec<&PendingBlock> {
let Some(child_roots) = self.by_parent.get(parent_root) else {
return Vec::new();
};

let mut children: Vec<&PendingBlock> = child_roots
.iter()
.filter_map(|r| self.blocks.get(r))
.collect();

children.sort_by_key(|p| p.slot.0);
children
}

pub fn mark_orphan(&mut self, root: H256) {
if self.blocks.contains_key(&root) {
self.orphans.insert(root);
}
}

pub fn unmark_orphan(&mut self, root: &H256) {
self.orphans.remove(root);
}

pub fn get_orphan_parents(&self) -> Vec<H256> {
let mut seen = HashSet::new();
for root in &self.orphans {
if let Some(pending) = self.blocks.get(root) {
if !self.blocks.contains_key(&pending.parent_root) {
seen.insert(pending.parent_root);
}
}
}
seen.into_iter().collect()
}

pub fn get_orphan_parents_with_hints(&self) -> Vec<(H256, Option<String>)> {
let mut results: HashMap<H256, Option<String>> = HashMap::new();
for root in &self.orphans {
if let Some(pending) = self.blocks.get(root) {
if !self.blocks.contains_key(&pending.parent_root) {
results
.entry(pending.parent_root)
.or_insert_with(|| pending.received_from.clone());
}
}
}
results.into_iter().collect()
}

pub fn clear(&mut self) {
self.blocks.clear();
self.insertion_order.clear();
self.by_parent.clear();
self.orphans.clear();
}

fn evict_oldest(&mut self) {
let Some(oldest_root) = self.insertion_order.pop_front() else {
return;
};

self.orphans.remove(&oldest_root);

if let Some(block) = self.blocks.remove(&oldest_root) {
if let Some(children) = self.by_parent.get_mut(&block.parent_root) {
children.remove(&oldest_root);
if children.is_empty() {
self.by_parent.remove(&block.parent_root);
}
}
}
}
}

impl Default for BlockCache {
fn default() -> Self {
Self::new()
}
}
37 changes: 20 additions & 17 deletions lean_client/fork_choice/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use metrics::METRICS;
use ssz::{H256, SszHash};
use tracing::warn;

use crate::block_cache::BlockCache;
use crate::store::{
INTERVALS_PER_SLOT, MILLIS_PER_INTERVAL, STATE_PRUNE_BUFFER, Store, tick_interval, update_head,
};
Expand Down Expand Up @@ -395,7 +396,11 @@ fn on_attestation_internal(
/// 3. Processing attestations included in the block body (on-chain)
/// 4. Updating the forkchoice head
/// 5. Processing the proposer's attestation (as if gossiped)
pub fn on_block(store: &mut Store, signed_block: SignedBlockWithAttestation) -> Result<()> {
pub fn on_block(
store: &mut Store,
cache: &mut BlockCache,
signed_block: SignedBlockWithAttestation,
) -> Result<()> {
let block_root = signed_block.message.block.hash_tree_root();

if store.blocks.contains_key(&block_root) {
Expand All @@ -405,20 +410,14 @@ pub fn on_block(store: &mut Store, signed_block: SignedBlockWithAttestation) ->
let parent_root = signed_block.message.block.parent_root;

if !store.states.contains_key(&parent_root) && !parent_root.is_zero() {
store
.blocks_queue
.entry(parent_root)
.or_insert_with(Vec::new)
.push(signed_block);
bail!(
"Err: (Fork-choice::Handlers::OnBlock) Block queued: parent {:?} not yet available (pending: {} blocks)",
&parent_root.as_bytes()[..4],
store.blocks_queue.values().map(|v| v.len()).sum::<usize>()
"Err: (Fork-choice::Handlers::OnBlock) parent state not available for {:?}",
&parent_root.as_bytes()[..4]
);
}

process_block_internal(store, signed_block, block_root)?;
process_pending_blocks(store, vec![block_root]);
process_pending_blocks(store, cache, vec![block_root]);

Ok(())
}
Expand Down Expand Up @@ -638,14 +637,18 @@ fn process_block_internal(
Ok(())
}

fn process_pending_blocks(store: &mut Store, mut roots: Vec<H256>) {
pub fn process_pending_blocks(store: &mut Store, cache: &mut BlockCache, mut roots: Vec<H256>) {
while let Some(parent_root) = roots.pop() {
if let Some(purgatory) = store.blocks_queue.remove(&parent_root) {
for block in purgatory {
let block_origins = block.message.block.hash_tree_root();
if let Ok(()) = process_block_internal(store, block, block_origins) {
roots.push(block_origins);
}
let children: Vec<(H256, SignedBlockWithAttestation)> = cache
.get_children(&parent_root)
.into_iter()
.map(|p| (p.root, p.block.clone()))
.collect();

for (child_root, child_block) in children {
cache.remove(&child_root);
if process_block_internal(store, child_block, child_root).is_ok() {
roots.push(child_root);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions lean_client/fork_choice/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod block_cache;
pub mod handlers;
pub mod store;

Expand Down
3 changes: 0 additions & 3 deletions lean_client/fork_choice/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@ pub struct Store {

pub latest_new_attestations: HashMap<u64, AttestationData>,

pub blocks_queue: HashMap<H256, Vec<SignedBlockWithAttestation>>,

pub gossip_signatures: HashMap<SignatureKey, Signature>,

/// Devnet-3: Aggregated signature proofs from block bodies (on-chain).
Expand Down Expand Up @@ -224,7 +222,6 @@ pub fn get_forkchoice_store(
states: [(block_root, anchor_state)].into(),
latest_known_attestations: HashMap::new(),
latest_new_attestations: HashMap::new(),
blocks_queue: HashMap::new(),
gossip_signatures: HashMap::new(),
latest_known_aggregated_payloads: HashMap::new(),
latest_new_aggregated_payloads: HashMap::new(),
Expand Down
4 changes: 3 additions & 1 deletion lean_client/fork_choice/tests/fork_choice_test_vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use containers::{
State, Validator, Validators,
};
use fork_choice::{
block_cache::BlockCache,
handlers::{on_block, on_tick},
store::{Store, get_forkchoice_store},
};
Expand Down Expand Up @@ -548,6 +549,7 @@ fn forkchoice(spec_file: &str) {
};

let mut store = get_forkchoice_store(anchor_state, anchor_block, config);
let mut cache = BlockCache::new();
let mut block_labels: HashMap<String, H256> = HashMap::new();

for (step_idx, step) in case.steps.into_iter().enumerate() {
Expand All @@ -574,7 +576,7 @@ fn forkchoice(spec_file: &str) {
* 1000;
on_tick(&mut store, block_time_millis, false);

on_block(&mut store, signed_block).unwrap();
on_block(&mut store, &mut cache, signed_block).unwrap();
Ok(block_root)
}));

Expand Down
45 changes: 16 additions & 29 deletions lean_client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use containers::{
use ethereum_types::H256;
use features::Feature;
use fork_choice::{
block_cache::BlockCache,
handlers::{on_aggregated_attestation, on_attestation, on_block, on_tick},
store::{
INTERVALS_PER_SLOT, MILLIS_PER_INTERVAL, Store, get_forkchoice_store,
Expand Down Expand Up @@ -777,6 +778,7 @@ async fn main() -> Result<()> {
);
let mut last_logged_slot = 0u64;
let mut last_status_slot: Option<u64> = None;
let mut block_cache = BlockCache::new();

let peer_count = peer_count_for_status;

Expand Down Expand Up @@ -833,7 +835,7 @@ async fn main() -> Result<()> {
if current_slot != last_logged_slot && current_slot % 10 == 0 {
debug!("(Okay)Store time updated : slot {}, pending blocks: {}",
current_slot,
store.read().blocks_queue.values().map(|v| v.len()).sum::<usize>()
block_cache.len()
);
last_logged_slot = current_slot;
}
Expand Down Expand Up @@ -892,23 +894,23 @@ async fn main() -> Result<()> {
}

if !parent_exists {
{
let mut s = store.write();
s.blocks_queue
.entry(parent_root)
.or_insert_with(Vec::new)
.push(signed_block_with_attestation.clone());
// Add to batch queue so this root is sent together with any
// other concurrently pending roots (attestation-triggered or
// from other unknown-parent blocks) in one BlocksByRoot request.
s.pending_fetch_roots.insert(parent_root);
}
block_cache.add(
signed_block_with_attestation.clone(),
block_root,
parent_root,
block_slot,
None,
0,
);
block_cache.mark_orphan(block_root);

store.write().pending_fetch_roots.insert(parent_root);

warn!(
child_slot = block_slot.0,
child_block_root = %format!("0x{:x}", block_root),
missing_parent_root = %format!("0x{:x}", parent_root),
"Block queued (proactive) - parent not found, requesting via BlocksByRoot"
"Block cached (proactive) - parent not found, requesting via BlocksByRoot"
);

let missing: Vec<H256> = store.write().pending_fetch_roots.drain().collect();
Expand All @@ -922,7 +924,7 @@ async fn main() -> Result<()> {
continue;
}

let result = {on_block(&mut *store.write(), signed_block_with_attestation.clone())};
let result = {on_block(&mut *store.write(), &mut block_cache, signed_block_with_attestation.clone())};
match result {
Ok(()) => {
info!("Block processed successfully");
Expand All @@ -947,21 +949,6 @@ async fn main() -> Result<()> {
}
}
}
Err(e) if format!("{e:?}").starts_with("Err: (Fork-choice::Handlers::OnBlock) Block queued") => {
warn!(
child_slot = block_slot.0,
child_block_root = %format!("0x{:x}", block_root),
missing_parent_root = %format!("0x{:x}", parent_root),
"Block queued (fallback) - parent not found, requesting via BlocksByRoot"
);

// Add to batch queue; the drain below (pending_fetch_roots)
// will send this together with any attestation-triggered roots
// discovered during on_block processing.
if !parent_root.is_zero() {
store.write().pending_fetch_roots.insert(parent_root);
}
}
Err(e) => warn!("Problem processing block: {}", e),
}

Expand Down
Loading