Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 128 additions & 66 deletions lean_client/fork_choice/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,19 @@ pub fn on_tick(store: &mut Store, time_millis: u64, has_proposal: bool) {
/// 3. A vote cannot be for a future slot.
/// 4. Checkpoint slots must match block slots.
fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<()> {
// Cannot count a vote if we haven't seen the blocks involved
ensure!(
store.blocks.contains_key(&data.source.root),
"Unknown source block: {:?}",
data.source.root
);

ensure!(
store.blocks.contains_key(&data.target.root),
"Unknown target block: {:?}",
&data.target.root
);

ensure!(
store.blocks.contains_key(&data.head.root),
"Unknown head block: {:?}",
&data.head.root
);

// Source must be older than Target.
// Topology: history is linear and monotonic — source <= target <= head (store.py:310-311).
ensure!(
data.source.slot <= data.target.slot,
"Source checkpoint slot {} must not exceed target slot {}",
data.source.slot.0,
data.target.slot.0
);
ensure!(
data.head.slot >= data.target.slot,
"Head slot {} must not be older than target slot {}",
data.head.slot.0,
data.target.slot.0
);

// Validate checkpoint slots match block slots.
// Skip the source slot-match when the root is the store's known justified or
Expand All @@ -69,6 +56,7 @@ fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<()
// slot from the downloaded state (e.g. 100994 vs anchor 101002).
let source_block = &store.blocks[&data.source.root];
let target_block = &store.blocks[&data.target.root];
let head_block = &store.blocks[&data.head.root];

let source_is_trusted_checkpoint = data.source.root == store.latest_justified.root
|| data.source.root == store.latest_finalized.root;
Expand All @@ -89,6 +77,13 @@ fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<()
target_block.slot.0
);

ensure!(
head_block.slot == data.head.slot,
"Head checkpoint slot mismatch: checkpoint {} vs block {}",
data.head.slot.0,
head_block.slot.0
);

// Validate attestation is not too far in the future
// We allow a small margin for clock disparity (1 slot), but no further.
let current_slot = store.time / INTERVALS_PER_SLOT;
Expand Down Expand Up @@ -147,6 +142,10 @@ pub fn on_gossip_attestation(
.or_default()
.push(signed_attestation);
store.pending_fetch_roots.insert(missing_root);
METRICS.get().map(|m| {
m.grandine_pending_fetch_roots
.set(store.pending_fetch_roots.len() as i64)
});
return Ok(());
}

Expand All @@ -160,51 +159,61 @@ pub fn on_gossip_attestation(
});
})?;

// Verify individual XMSS signature against the validator's public key.
// State is available: the pending-block check above confirmed target.root is in the store,
// and states are stored 1:1 with blocks in process_block_internal.
let key_state = store
.states
.get(&attestation_data.target.root)
.ok_or_else(|| anyhow!("no state for target block {}", attestation_data.target.root))?;
let data_root = attestation_data.hash_tree_root();
let sig_key = SignatureKey::new(signed_attestation.validator_id, data_root);

ensure!(
validator_id < key_state.validators.len_u64(),
"validator {} out of range (max {})",
validator_id,
key_state.validators.len_u64()
);
// Skip expensive XMSS verification for already-known signatures.
// Duplicate attestations arrive when the IDontWant buffer fills under CPU load,
// causing peers to rebroadcast. Each verify costs ~100ms; the early exit breaks
// the saturation loop without dropping vote data.
if !store.gossip_signatures.contains_key(&sig_key) {
// Verify individual XMSS signature against the validator's public key.
// State is available: the pending-block check above confirmed target.root is in
// the store, and states are stored 1:1 with blocks in process_block_internal.
let key_state = store
.states
.get(&attestation_data.target.root)
.ok_or_else(|| anyhow!("no state for target block {}", attestation_data.target.root))?;

let pubkey = key_state
.validators
.get(validator_id)
.map(|v| v.pubkey.clone())
.map_err(|e| anyhow!("{e}"))?;
ensure!(
validator_id < key_state.validators.len_u64(),
"validator {} out of range (max {})",
validator_id,
key_state.validators.len_u64()
);

let data_root = attestation_data.hash_tree_root();
let pubkey = key_state
.validators
.get(validator_id)
.map(|v| v.pubkey.clone())
.map_err(|e| anyhow!("{e}"))?;

signed_attestation
.signature
.verify(&pubkey, attestation_data.slot.0 as u32, data_root)
.context("individual attestation signature verification failed")?;
signed_attestation
.signature
.verify(&pubkey, attestation_data.slot.0 as u32, data_root)
.context("individual attestation signature verification failed")?;

// Store verified signature for later lookup during block building
let sig_key = SignatureKey::new(signed_attestation.validator_id, data_root);
store
.gossip_signatures
.insert(sig_key, signed_attestation.signature);
store
.gossip_signatures
.insert(sig_key, signed_attestation.signature);

// Update gossip signatures gauge
METRICS.get().map(|metrics| {
metrics
.lean_gossip_signatures
.set(store.gossip_signatures.len() as i64);
});
// Update gossip signatures gauge
METRICS.get().map(|metrics| {
metrics
.lean_gossip_signatures
.set(store.gossip_signatures.len() as i64);
});
} else {
METRICS.get().map(|m| m.grandine_xmss_verify_skipped_total.inc());
}

// Store attestation data indexed by hash for aggregation lookup
store
.attestation_data_by_root
.insert(data_root, attestation_data.clone());
METRICS.get().map(|m| {
m.grandine_attestation_data_by_root
.set(store.attestation_data_by_root.len() as i64)
});

// Process the attestation data (not from block)
on_attestation_internal(store, validator_id, attestation_data, false)
Expand Down Expand Up @@ -249,17 +258,24 @@ pub fn on_attestation(
let validator_id = signed_attestation.validator_id;
let attestation_data = signed_attestation.message.clone();

// Queue gossip attestations if any referenced block is not yet in the store.
if !is_from_block {
if let Some(missing_root) = find_unknown_attestation_block(store, &attestation_data) {
store
.pending_attestations
.entry(missing_root)
.or_default()
.push(signed_attestation);
store.pending_fetch_roots.insert(missing_root);
return Ok(());
// All three referenced roots (source, target, head) must be in the store.
// For gossip: queue as pending if a block is missing, to be retried on arrival.
// For block-body: a missing root means the block itself is invalid — reject it.
if let Some(missing_root) = find_unknown_attestation_block(store, &attestation_data) {
if is_from_block {
bail!("block-body attestation references unknown block root {missing_root}");
}
store
.pending_attestations
.entry(missing_root)
.or_default()
.push(signed_attestation);
store.pending_fetch_roots.insert(missing_root);
METRICS.get().map(|m| {
m.grandine_pending_fetch_roots
.set(store.pending_fetch_roots.len() as i64)
});
return Ok(());
}

// Validate attestation data
Expand Down Expand Up @@ -334,17 +350,24 @@ pub fn on_aggregated_attestation(
.or_default()
.push(signed_aggregated_attestation);
store.pending_fetch_roots.insert(missing_root);
METRICS.get().map(|m| {
m.grandine_pending_fetch_roots
.set(store.pending_fetch_roots.len() as i64)
});
return Ok(());
}

// Validate attestation data (slot bounds, target validity, etc.)
validate_attestation_data(store, &attestation_data)?;

// Store attestation data indexed by hash for later extraction
let data_root = attestation_data.hash_tree_root();
store
.attestation_data_by_root
.insert(data_root, attestation_data.clone());
METRICS.get().map(|m| {
m.grandine_attestation_data_by_root
.set(store.attestation_data_by_root.len() as i64)
});

// Verify aggregated XMSS proof against participant public keys.
// State is available: the pending-block check above confirmed target.root is in the store,
Expand Down Expand Up @@ -444,6 +467,12 @@ fn on_attestation_internal(
.insert(validator_id, attestation_data);
}
}
METRICS.get().map(|m| {
m.grandine_fork_choice_known_attestations
.set(store.latest_known_attestations.len() as i64);
m.grandine_fork_choice_new_attestations
.set(store.latest_new_attestations.len() as i64);
});
Ok(())
}

Expand Down Expand Up @@ -595,6 +624,31 @@ fn process_block_internal(
.saturating_sub(STATE_PRUNE_BUFFER);
store.states.retain(|_, state| state.slot.0 >= keep_from);
store.blocks.retain(|_, block| block.slot.0 >= keep_from);

// Prune stale attestation data — mirrors leanSpec store.prune_stale_attestation_data()
// (store.py:230-278), called at store.py:565-566 whenever finalization advances.
// Criterion: target.slot <= finalized_slot → stale, no longer affects fork choice.
// attestation_data_by_root is the secondary index used for target.slot lookup and
// must be pruned last so the three retain calls above can still resolve target.slot.
let finalized_slot = store.latest_finalized.slot.0;
let adr = &store.attestation_data_by_root;
store.gossip_signatures.retain(|key, _| {
adr.get(&key.data_root)
.map_or(true, |data| data.target.slot.0 > finalized_slot)
});
store.latest_known_aggregated_payloads.retain(|key, _| {
adr.get(&key.data_root)
.map_or(true, |data| data.target.slot.0 > finalized_slot)
});
store.latest_new_aggregated_payloads.retain(|key, _| {
adr.get(&key.data_root)
.map_or(true, |data| data.target.slot.0 > finalized_slot)
});
store.attestation_data_by_root.retain(|_, data| data.target.slot.0 > finalized_slot);
METRICS.get().map(|m| {
m.grandine_attestation_data_by_root
.set(store.attestation_data_by_root.len() as i64)
});
}

if !justified_updated && !finalized_updated {
Expand Down Expand Up @@ -647,6 +701,10 @@ fn process_block_internal(
.lean_latest_known_aggregated_payloads
.set(store.latest_known_aggregated_payloads.len() as i64);
});
METRICS.get().map(|m| {
m.grandine_attestation_data_by_root
.set(store.attestation_data_by_root.len() as i64)
});

// Process each aggregated attestation's validators for fork choice.
// Signatures have already been verified above via verify_signatures().
Expand Down Expand Up @@ -689,6 +747,10 @@ fn process_block_internal(
store
.attestation_data_by_root
.insert(proposer_data_root, proposer_attestation.data.clone());
METRICS.get().map(|m| {
m.grandine_attestation_data_by_root
.set(store.attestation_data_by_root.len() as i64)
});

// Process proposer attestation as if received via gossip (is_from_block=false)
// This ensures it goes to "new" attestations and doesn't immediately affect fork choice
Expand Down
19 changes: 18 additions & 1 deletion lean_client/fork_choice/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use containers::{
SignedBlockWithAttestation, Slot, State,
};
use metrics::{METRICS, set_gauge_u64};
use tracing::warn;
use ssz::{H256, SszHash};
use xmss::Signature;

Expand Down Expand Up @@ -261,7 +262,17 @@ pub fn get_fork_choice_head(
.expect("Error: Empty block.");
}
let mut vote_weights: HashMap<H256, usize> = HashMap::new();
let root_slot = store.blocks[&root].slot;
let root_slot = match store.blocks.get(&root) {
Some(block) => block.slot,
None => {
warn!(
%root,
justified_slot = store.latest_justified.slot.0,
"justified root not in store blocks, returning justified root as head"
);
return root;
}
};

// stage 1: accumulate weights by walking up from each attestation's head
for attestation_data in latest_attestations.values() {
Expand Down Expand Up @@ -498,6 +509,12 @@ pub fn accept_new_attestations(store: &mut Store) {
.latest_known_attestations
.extend(store.latest_new_attestations.drain());
update_head(store);
METRICS.get().map(|m| {
m.grandine_fork_choice_known_attestations
.set(store.latest_known_attestations.len() as i64);
m.grandine_fork_choice_new_attestations
.set(store.latest_new_attestations.len() as i64);
});
}

pub fn tick_interval(store: &mut Store, has_proposal: bool) {
Expand Down
Loading
Loading