Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions lean_client/containers/src/slot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl Slot {
// Rule 2: Slots at perfect square distances are justifiable.
// Examples: delta = 1, 4, 9, 16, 25, 36, 49, 64, ...
// Check: integer square root squared equals delta
let sqrt = (delta as f64).sqrt() as u64;
let sqrt = delta.isqrt();
if sqrt * sqrt == delta {
return true;
}
Expand All @@ -66,10 +66,11 @@ impl Slot {
// Mathematical insight: For pronic delta = n(n+1), we have:
// 4*delta + 1 = 4n(n+1) + 1 = (2n+1)^2
// Check: 4*delta+1 is an odd perfect square
let test = 4 * delta + 1;
let test_sqrt = (test as f64).sqrt() as u64;
if test_sqrt * test_sqrt == test && test_sqrt % 2 == 1 {
return true;
if let Some(test) = delta.checked_mul(4).and_then(|v| v.checked_add(1)) {
let test_sqrt = test.isqrt();
if test_sqrt * test_sqrt == test && test_sqrt % 2 == 1 {
return true;
}
}

false
Expand Down
85 changes: 81 additions & 4 deletions lean_client/fork_choice/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,18 @@ fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<()
Ok(())
}

/// Returns the first block root (source, target, or head) referenced by `attestation_data`
/// that is not yet present in the store, or `None` if all are known.
fn find_unknown_attestation_block(store: &Store, attestation_data: &AttestationData) -> Option<H256> {
[
attestation_data.source.root,
attestation_data.target.root,
attestation_data.head.root,
]
.into_iter()
.find(|root| !store.blocks.contains_key(root))
}

/// Process a signed attestation received via gossip network
///
/// 1. Validates the attestation data
Expand All @@ -112,6 +124,18 @@ pub fn on_gossip_attestation(
let validator_id = signed_attestation.validator_id;
let attestation_data = signed_attestation.message.clone();

// Queue attestation if any referenced block is not yet in the store.
// When the missing block arrives, pending attestations are retried.
if let Some(missing_root) = find_unknown_attestation_block(store, &attestation_data) {
store
.pending_attestations
.entry(missing_root)
.or_default()
.push(signed_attestation);
store.pending_fetch_roots.insert(missing_root);
return Ok(());
}

// Validate the attestation data first
validate_attestation_data(store, &attestation_data).inspect_err(|_| {
METRICS.get().map(|metrics| {
Expand Down Expand Up @@ -184,6 +208,19 @@ pub fn on_attestation(
let validator_id = signed_attestation.validator_id;
let attestation_data = signed_attestation.message.clone();

// Queue gossip attestations if any referenced block is not yet in the store.
if !is_from_block {
if let Some(missing_root) = find_unknown_attestation_block(store, &attestation_data) {
store
.pending_attestations
.entry(missing_root)
.or_default()
.push(signed_attestation);
store.pending_fetch_roots.insert(missing_root);
return Ok(());
}
}

// Validate attestation data
validate_attestation_data(store, &attestation_data).inspect_err(|_| {
METRICS.get().map(|metrics| {
Expand All @@ -206,6 +243,11 @@ pub fn on_attestation(
store
.gossip_signatures
.insert(sig_key, signed_attestation.signature);
METRICS.get().map(|metrics| {
metrics
.lean_gossip_signatures
.set(store.gossip_signatures.len() as i64)
});
}

on_attestation_internal(store, validator_id, attestation_data, is_from_block)
Expand Down Expand Up @@ -241,9 +283,6 @@ pub fn on_attestation(
/// For production, signature verification should be added:
/// 1. Verify the `AggregatedSignatureProof` against the aggregation bits
/// 2. Consider async/batched verification for throughput
/// 3. Cache verification results to avoid re-verifying the same aggregations
///
/// See Ream's approach: deferred verification in gossip path with later validation
#[inline]
pub fn on_aggregated_attestation(
store: &mut Store,
Expand All @@ -253,6 +292,17 @@ pub fn on_aggregated_attestation(
let attestation_data = signed_aggregated_attestation.data.clone();
let proof = signed_aggregated_attestation.proof.clone();

// Queue if any referenced block is not yet in the store.
if let Some(missing_root) = find_unknown_attestation_block(store, &attestation_data) {
store
.pending_aggregated_attestations
.entry(missing_root)
.or_default()
.push(signed_aggregated_attestation);
store.pending_fetch_roots.insert(missing_root);
return Ok(());
}

// Validate attestation data (slot bounds, target validity, etc.)
// TODO(production): Add signature verification here or in caller
validate_attestation_data(store, &attestation_data)?;
Expand Down Expand Up @@ -344,7 +394,6 @@ pub fn on_block(store: &mut Store, signed_block: SignedBlockWithAttestation) ->
let block_root = signed_block.message.block.hash_tree_root();

if store.blocks.contains_key(&block_root) {
// stop_and_discard(timer);
return Ok(());
}

Expand Down Expand Up @@ -417,6 +466,29 @@ fn process_block_internal(
// Also store signed block for serving BlocksByRoot requests (checkpoint sync backfill)
store.signed_blocks.insert(block_root, signed_block.clone());

// Retry attestations that arrived before this block was known.
// Drain the queue for this root and re-process each attestation.
// Attestations that still reference other unknown blocks are re-queued automatically.
let pending = store
.pending_attestations
.remove(&block_root)
.unwrap_or_default();
for signed_att in pending {
if let Err(err) = on_attestation(store, signed_att, false) {
warn!(%err, "Pending attestation retry failed after block arrival");
}
}

let pending_agg = store
.pending_aggregated_attestations
.remove(&block_root)
.unwrap_or_default();
for signed_agg in pending_agg {
if let Err(err) = on_aggregated_attestation(store, signed_agg) {
warn!(%err, "Pending aggregated attestation retry failed after block arrival");
}
}

let justified_updated = new_state.latest_justified.slot > store.latest_justified.slot;
let finalized_updated = new_state.latest_finalized.slot > store.latest_finalized.slot;

Expand Down Expand Up @@ -535,6 +607,11 @@ fn process_block_internal(
store
.gossip_signatures
.insert(proposer_sig_key, signed_block.signature.proposer_signature);
METRICS.get().map(|metrics| {
metrics
.lean_gossip_signatures
.set(store.gossip_signatures.len() as i64)
});
store
.attestation_data_by_root
.insert(proposer_data_root, proposer_attestation.data.clone());
Expand Down
65 changes: 36 additions & 29 deletions lean_client/fork_choice/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};

use anyhow::{Result, anyhow, ensure};
use containers::{
AggregatedSignatureProof, Attestation, AttestationData, Block, BlockHeader, Checkpoint, Config,
SignatureKey, SignedBlockWithAttestation, Slot, State,
SignatureKey, SignedAggregatedAttestation, SignedAttestation, SignedBlockWithAttestation, Slot,
State,
};
use metrics::{METRICS, set_gauge_u64};
use ssz::{H256, SszHash};
Expand Down Expand Up @@ -62,6 +63,18 @@ pub struct Store {
/// Signed blocks indexed by block root.
/// Used to serve BlocksByRoot requests to peers for checkpoint sync backfill.
pub signed_blocks: HashMap<H256, SignedBlockWithAttestation>,

/// Gossip attestations waiting for referenced blocks to arrive.
/// Keyed by the missing block root. Drained when that block is processed.
pub pending_attestations: HashMap<H256, Vec<SignedAttestation>>,

/// Aggregated attestations waiting for referenced blocks to arrive.
/// Keyed by the missing block root. Drained when that block is processed.
pub pending_aggregated_attestations: HashMap<H256, Vec<SignedAggregatedAttestation>>,

/// Block roots that were referenced by attestations but not found in the store.
/// Drained by the caller (main.rs) to trigger blocks-by-root RPC fetches.
pub pending_fetch_roots: HashSet<H256>,
}

const JUSTIFICATION_LOOKBACK_SLOTS: u64 = 3;
Expand Down Expand Up @@ -165,26 +178,25 @@ pub fn get_forkchoice_store(
block_header.hash_tree_root()
};

// Per checkpoint sync: always use anchor block's root for checkpoints.
// Per checkpoint sync: always use anchor block's root and slot for checkpoints.
// The original checkpoint roots point to blocks that don't exist in our store.
// We only have the anchor block, so use its root. Keep the slot from state
// to preserve justification/finalization progress information.
// We only have the anchor block, so both root and slot must refer to it.
//
// Using the state's justified.slot with the anchor root creates an inconsistency:
// validate_attestation_data requires store.blocks[source.root].slot == source.slot,
// which fails when the chain has progressed beyond the last justified block
// (e.g., state downloaded at slot 2291, last justified at slot 2285).
//
// The first real justification event from on_block will replace these values
// with the correct ones, so the anchor slot is only used for the initial period.
let latest_justified = Checkpoint {
root: block_root,
slot: if anchor_state.latest_justified.root.is_zero() {
block_slot
} else {
anchor_state.latest_justified.slot
},
slot: block_slot,
};

let latest_finalized = Checkpoint {
root: block_root,
slot: if anchor_state.latest_finalized.root.is_zero() {
block_slot
} else {
anchor_state.latest_finalized.slot
},
slot: block_slot,
};

// Store the original anchor_state - do NOT modify it
Expand All @@ -207,6 +219,9 @@ pub fn get_forkchoice_store(
latest_new_aggregated_payloads: HashMap::new(),
attestation_data_by_root: HashMap::new(),
signed_blocks: [(block_root, anchor_block)].into(),
pending_attestations: HashMap::new(),
pending_aggregated_attestations: HashMap::new(),
pending_fetch_roots: HashSet::new(),
}
}

Expand Down Expand Up @@ -499,7 +514,10 @@ pub fn get_proposal_head(store: &mut Store, slot: Slot) -> H256 {
/// 1. **Get Proposal Head**: Retrieve current chain head as parent
/// 2. **Collect Attestations**: Convert known attestations to plain attestations
/// 3. **Build Block**: Use State.build_block with signature caches
/// 4. **Store Block**: Insert block and post-state into Store
///
/// The block and state are NOT inserted here. The caller signs the block and sends
/// it back via `ChainMessage::ProcessBlock`, which runs the full `on_block` path:
/// state transition, `update_head`, checkpoint updates, and proposer attestation.
///
/// # Arguments
/// * `store` - Mutable reference to the fork choice store
Expand All @@ -513,15 +531,13 @@ pub fn produce_block_with_signatures(
slot: Slot,
validator_index: u64,
) -> Result<(H256, Block, Vec<AggregatedSignatureProof>)> {
// Get parent block head
let head_root = get_proposal_head(store, slot);
let head_state = store
.states
.get(&head_root)
.ok_or_else(|| anyhow!("Head state not found"))?
.clone();

// Validate proposer authorization for this slot
let num_validators = head_state.validators.len_u64();
let expected_proposer = slot.0 % num_validators;
ensure!(
Expand All @@ -532,8 +548,6 @@ pub fn produce_block_with_signatures(
expected_proposer
);

// Convert AttestationData to Attestation objects for build_block
// Per devnet-2, store now holds AttestationData directly
let available_attestations: Vec<Attestation> = store
.latest_known_attestations
.iter()
Expand All @@ -543,28 +557,21 @@ pub fn produce_block_with_signatures(
})
.collect();

// Get known block roots for attestation validation
let known_block_roots: std::collections::HashSet<H256> = store.blocks.keys().copied().collect();

// Build block with fixed-point attestation collection and signature aggregation
let (final_block, final_post_state, _aggregated_attestations, signatures) = head_state
let (final_block, _final_post_state, _aggregated_attestations, signatures) = head_state
.build_block(
slot,
validator_index,
head_root,
None, // initial_attestations - start with empty, let fixed-point collect
None,
Some(available_attestations),
Some(&known_block_roots),
Some(&store.gossip_signatures),
Some(&store.latest_known_aggregated_payloads),
)?;

// Compute block root using the header hash (canonical block root)
let block_root = final_block.hash_tree_root();

// Store block and state (per devnet-2, we store the plain Block)
store.blocks.insert(block_root, final_block.clone());
store.states.insert(block_root, final_post_state);

Ok((block_root, final_block, signatures))
}
Loading
Loading