Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lean_client/containers/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ impl State {
continue;
}

if !target.slot.is_justifiable_after(self.latest_finalized.slot) {
if !target.slot.is_justifiable_after(finalized_slot) {
info!("skipping attestation, target slot is not yet justifiable");
continue;
}
Expand Down Expand Up @@ -538,7 +538,7 @@ impl State {
justifications.remove(&target.root);

if !(source.slot.0 + 1..target.slot.0)
.any(|slot| Slot(slot).is_justifiable_after(self.latest_finalized.slot))
.any(|slot| Slot(slot).is_justifiable_after(finalized_slot))
{
info!("finalizing {source:?}");
let old_finalized_slot = finalized_slot;
Expand Down
14 changes: 10 additions & 4 deletions lean_client/fork_choice/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use metrics::METRICS;
use ssz::{H256, SszHash};
use tracing::warn;

use crate::store::{INTERVALS_PER_SLOT, MILLIS_PER_INTERVAL, Store, tick_interval, update_head};
use crate::store::{
INTERVALS_PER_SLOT, MILLIS_PER_INTERVAL, STATE_PRUNE_BUFFER, Store, tick_interval, update_head,
};

#[inline]
pub fn on_tick(store: &mut Store, time_millis: u64, has_proposal: bool) {
Expand Down Expand Up @@ -463,11 +465,8 @@ fn process_block_internal(
"Block processed - new state info"
);

// Store block and state, store the plain Block (not SignedBlockWithAttestation)
store.blocks.insert(block_root, block.clone());
store.states.insert(block_root, new_state.clone());
// Also store signed block for serving BlocksByRoot requests (checkpoint sync backfill)
store.signed_blocks.insert(block_root, signed_block.clone());

// Retry attestations that arrived before this block was known.
// Drain the queue for this root and re-process each attestation.
Expand Down Expand Up @@ -525,6 +524,13 @@ fn process_block_internal(
};
metrics.lean_latest_finalized_slot.set(slot);
});

let keep_from = store
.latest_finalized
.slot
.0
.saturating_sub(STATE_PRUNE_BUFFER);
store.states.retain(|_, state| state.slot.0 >= keep_from);
}

if !justified_updated && !finalized_updated {
Expand Down
16 changes: 9 additions & 7 deletions lean_client/fork_choice/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,10 @@ pub struct Store {
pub latest_new_aggregated_payloads: HashMap<SignatureKey, Vec<AggregatedSignatureProof>>,

/// Attestation data indexed by hash (data_root).
/// Used to look up the exact attestation data that was signed,
/// matching ream's attestation_data_by_root_provider design.
/// Used to look up the exact attestation data that was signed when
/// processing aggregated payloads for safe target computation.
pub attestation_data_by_root: HashMap<H256, AttestationData>,

/// Signed blocks indexed by block root.
/// Used to serve BlocksByRoot requests to peers for checkpoint sync backfill.
pub signed_blocks: HashMap<H256, SignedBlockWithAttestation>,

/// Gossip attestations waiting for referenced blocks to arrive.
/// Keyed by the missing block root. Drained when that block is processed.
pub pending_attestations: HashMap<H256, Vec<SignedAttestation>>,
Expand All @@ -86,6 +82,13 @@ pub struct Store {

const JUSTIFICATION_LOOKBACK_SLOTS: u64 = 3;

/// Number of slots before the finalized slot for which states are retained.
/// States older than (finalized_slot - STATE_PRUNE_BUFFER) are pruned after
/// each finalization advance. The buffer covers late-arriving blocks and rapid
/// finalization jumps without risk of evicting a parent state still needed
/// for an in-flight state transition. The subtraction saturates at zero, so
/// no state is pruned until finalization passes slot STATE_PRUNE_BUFFER.
pub const STATE_PRUNE_BUFFER: u64 = 128;

impl Store {
pub fn produce_attestation_data(&self, slot: Slot) -> Result<AttestationData> {
let head_checkpoint = Checkpoint {
Expand Down Expand Up @@ -226,7 +229,6 @@ pub fn get_forkchoice_store(
latest_known_aggregated_payloads: HashMap::new(),
latest_new_aggregated_payloads: HashMap::new(),
attestation_data_by_root: HashMap::new(),
signed_blocks: [(block_root, anchor_block)].into(),
pending_attestations: HashMap::new(),
pending_aggregated_attestations: HashMap::new(),
pending_fetch_roots: HashSet::new(),
Expand Down
57 changes: 53 additions & 4 deletions lean_client/networking/src/network/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,24 @@ use crate::{
network::behaviour::{LeanNetworkBehaviour, LeanNetworkBehaviourEvent},
req_resp::{self, LeanRequest, ReqRespMessage},
types::{
ChainMessage, ChainMessageSink, ConnectionState, OutboundP2pRequest, P2pRequestSource,
SignedBlockProvider, StatusProvider,
ChainMessage, ChainMessageSink, ConnectionState, MAX_BLOCK_CACHE_SIZE,
OutboundP2pRequest, P2pRequestSource, SignedBlockProvider, StatusProvider,
},
};

/// Number of times a failed BlocksByRoot request is retried before it is dropped.
const MAX_BLOCKS_BY_ROOT_RETRIES: u8 = 3;
/// Upper bound on backfill depth for recursive parent fetches.
/// NOTE(review): exact enforcement site not visible here — confirm against
/// the request-issuing path.
const MAX_BLOCK_FETCH_DEPTH: u32 = 65536;
/// Maximum roots per BlocksByRoot request, aligned with leanSpec BackfillSync.
const MAX_BLOCKS_PER_REQUEST: usize = 10;
/// Stalled request timeout. If a peer accepts the stream but never sends a response,
/// the request is cancelled and retried with a different peer after this duration.
const BLOCKS_BY_ROOT_REQUEST_TIMEOUT: Duration = Duration::from_secs(8);

/// Bookkeeping for one in-flight BlocksByRoot request to a peer.
struct PendingBlocksRequest {
    /// Block roots requested from the peer.
    roots: Vec<H256>,
    /// Retry attempts made so far for these roots.
    retries: u8,
    /// Backfill chain depth this request belongs to — presumably bounded by
    /// MAX_BLOCK_FETCH_DEPTH; confirm at the call sites.
    depth: u32,
    /// When the request was issued. Used by the timeout sweep to detect
    /// requests whose peer accepted the stream but never responded.
    created_at: tokio::time::Instant,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -322,6 +326,11 @@ where
let mut sync_interval = interval(Duration::from_secs(30));
sync_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

// Sweep for stalled BlocksByRoot requests. Fires at the same cadence as the timeout
// so stale entries are caught within one extra period at most.
let mut timeout_interval = interval(BLOCKS_BY_ROOT_REQUEST_TIMEOUT);
timeout_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

loop {
select! {
_ = reconnect_interval.tick() => {
Expand All @@ -330,6 +339,9 @@ where
_ = sync_interval.tick() => {
self.send_status_to_all_connected_peers();
}
_ = timeout_interval.tick() => {
self.sweep_timed_out_requests();
}
_ = discovery_interval.tick() => {
// Trigger active peer discovery
if let Some(ref discovery) = self.discovery {
Expand Down Expand Up @@ -759,6 +771,18 @@ where
let root = block.message.block.hash_tree_root();
provider.insert(root, block.clone());
}
// Hard cap: evict lowest-slot blocks if still over limit.
if provider.len() > MAX_BLOCK_CACHE_SIZE {
let to_remove = provider.len() - MAX_BLOCK_CACHE_SIZE;
let mut slots: Vec<(H256, u64)> = provider
.iter()
.map(|(root, b)| (*root, b.message.block.slot.0))
.collect();
slots.sort_by_key(|(_, slot)| *slot);
for (root, _) in slots.into_iter().take(to_remove) {
provider.remove(&root);
}
}
}

// Step 2: Collect unique parent roots that are not yet received.
Expand Down Expand Up @@ -959,6 +983,30 @@ where
}
}

fn sweep_timed_out_requests(&mut self) {
let timed_out: Vec<OutboundRequestId> = self
.pending_blocks_by_root
.iter()
.filter(|(_, req)| req.created_at.elapsed() > BLOCKS_BY_ROOT_REQUEST_TIMEOUT)
.map(|(id, _)| *id)
.collect();

for request_id in timed_out {
if let Some(req) = self.pending_blocks_by_root.remove(&request_id) {
warn!(
num_roots = req.roots.len(),
depth = req.depth,
"BlocksByRoot request timed out, retrying with different peer"
);
for root in &req.roots {
self.in_flight_roots.remove(root);
}
// Pass a non-existent peer so all connected peers are eligible for retry.
self.retry_blocks_by_root_request(PeerId::random(), req);
}
}
}

fn handle_identify_event(&mut self, event: identify::Event) -> Option<NetworkEvent> {
match event {
identify::Event::Received {
Expand Down Expand Up @@ -1010,9 +1058,9 @@ where
let current_state = self.peer_table.lock().get(&peer_id).cloned();
if !matches!(
current_state,
Some(ConnectionState::Disconnected | ConnectionState::Connecting) | None
Some(ConnectionState::Disconnected) | None
) {
trace!(?peer_id, "Already connected");
trace!(?peer_id, "Already connected or connecting");
continue;
}

Expand Down Expand Up @@ -1311,6 +1359,7 @@ where
roots,
retries,
depth,
created_at: tokio::time::Instant::now(),
},
);
}
Expand Down
4 changes: 4 additions & 0 deletions lean_client/networking/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ use tracing::warn;

use crate::serde_utils::quoted_u64;

/// Hard cap on the number of blocks held in the signed block provider.
/// Prevents unbounded memory growth during backfill; when the cap is exceeded,
/// the network service evicts the lowest-slot blocks first.
pub const MAX_BLOCK_CACHE_SIZE: usize = 1024;

/// Shared block provider for serving BlocksByRoot requests.
/// Allows NetworkService to look up signed blocks for checkpoint sync backfill.
pub type SignedBlockProvider = Arc<RwLock<HashMap<H256, SignedBlockWithAttestation>>>;
Expand Down
Loading
Loading