diff --git a/lean_client/containers/src/state.rs b/lean_client/containers/src/state.rs index 34d825c..963949b 100644 --- a/lean_client/containers/src/state.rs +++ b/lean_client/containers/src/state.rs @@ -342,11 +342,6 @@ impl State { let state = self.process_block_header(block)?; - ensure!( - !AggregatedAttestation::has_duplicate_data(&block.body.attestations), - "block contains duplicate attestation data" - ); - state.process_attestations(&block.body.attestations) } diff --git a/lean_client/fork_choice/src/handlers.rs b/lean_client/fork_choice/src/handlers.rs index 25d12e9..eb7d824 100644 --- a/lean_client/fork_choice/src/handlers.rs +++ b/lean_client/fork_choice/src/handlers.rs @@ -1,4 +1,4 @@ -use anyhow::{Result, anyhow, bail, ensure}; +use anyhow::{Context, Result, anyhow, bail, ensure}; use containers::{ AttestationData, SignatureKey, SignedAggregatedAttestation, SignedAttestation, SignedBlockWithAttestation, @@ -62,17 +62,25 @@ fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<() data.target.slot.0 ); - // Validate checkpoint slots match block slots - // Per devnet-2, store.blocks now contains Block (not SignedBlockWithAttestation) + // Validate checkpoint slots match block slots. + // Skip the source slot-match when the root is the store's known justified or + // finalized checkpoint: after checkpoint sync the anchor block sits at + // anchor_slot in store.blocks but the checkpoint carries the actual historical + // slot from the downloaded state (e.g. 100994 vs anchor 101002). 
let source_block = &store.blocks[&data.source.root]; let target_block = &store.blocks[&data.target.root]; - ensure!( - source_block.slot == data.source.slot, - "Source checkpoint slot mismatch: checkpoint {} vs block {}", - data.source.slot.0, - source_block.slot.0 - ); + let source_is_trusted_checkpoint = data.source.root == store.latest_justified.root + || data.source.root == store.latest_finalized.root; + + if !source_is_trusted_checkpoint { + ensure!( + source_block.slot == data.source.slot, + "Source checkpoint slot mismatch: checkpoint {} vs block {}", + data.source.slot.0, + source_block.slot.0 + ); + } ensure!( target_block.slot == data.target.slot, @@ -152,8 +160,35 @@ pub fn on_gossip_attestation( }); })?; - // Store signature for later lookup during block building + // Verify individual XMSS signature against the validator's public key. + // State is available: the pending-block check above confirmed target.root is in the store, + // and states are stored 1:1 with blocks in process_block_internal. + let key_state = store + .states + .get(&attestation_data.target.root) + .ok_or_else(|| anyhow!("no state for target block {}", attestation_data.target.root))?; + + ensure!( + validator_id < key_state.validators.len_u64(), + "validator {} out of range (max {})", + validator_id, + key_state.validators.len_u64() + ); + + let pubkey = key_state + .validators + .get(validator_id) + .map(|v| v.pubkey.clone()) + .map_err(|e| anyhow!("{e}"))?; + let data_root = attestation_data.hash_tree_root(); + + signed_attestation + .signature + .verify(&pubkey, attestation_data.slot.0 as u32, data_root) + .context("individual attestation signature verification failed")?; + + // Store verified signature for later lookup during block building let sig_key = SignatureKey::new(signed_attestation.validator_id, data_root); store .gossip_signatures @@ -281,14 +316,7 @@ pub fn on_attestation( /// `latest_new_aggregated_payloads`. 
At interval 3, these are merged with /// `latest_known_aggregated_payloads` (from blocks) to compute safe target. /// -/// # Signature Verification Strategy (TODO for production) -/// -/// Currently, this function validates attestation data but does NOT verify the -/// aggregated XMSS signature. This is intentional for devnet-3 performance testing. -/// -/// For production, signature verification should be added: -/// 1. Verify the `AggregatedSignatureProof` against the aggregation bits -/// 2. Consider async/batched verification for throughput +/// Verifies the aggregated XMSS proof against participant public keys before storing. #[inline] pub fn on_aggregated_attestation( store: &mut Store, @@ -310,7 +338,6 @@ pub fn on_aggregated_attestation( } // Validate attestation data (slot bounds, target validity, etc.) - // TODO(production): Add signature verification here or in caller validate_attestation_data(store, &attestation_data)?; // Store attestation data indexed by hash for later extraction @@ -319,7 +346,38 @@ pub fn on_aggregated_attestation( .attestation_data_by_root .insert(data_root, attestation_data.clone()); - // Per leanSpec: Store the proof in latest_new_aggregated_payloads + // Verify aggregated XMSS proof against participant public keys. + // State is available: the pending-block check above confirmed target.root is in the store, + // and states are stored 1:1 with blocks in process_block_internal. + let key_state = store + .states + .get(&attestation_data.target.root) + .ok_or_else(|| anyhow!("no state for target block {}", attestation_data.target.root))?; + + // Guard before calling to_validator_indices() which panics on an empty bitfield. 
+ ensure!( + proof.participants.0.iter().any(|b| *b), + "aggregated attestation has empty participants bitfield" + ); + + let validator_ids = proof.participants.to_validator_indices(); + + let public_keys = validator_ids + .iter() + .map(|&id| { + key_state + .validators + .get(id) + .map(|v| v.pubkey.clone()) + .map_err(Into::into) + }) + .collect::>>()?; + + proof + .verify(public_keys, data_root, attestation_data.slot.0 as u32) + .context("aggregated attestation proof verification failed")?; + + // Per leanSpec: Store the verified proof in latest_new_aggregated_payloads // Each participating validator gets an entry via their SignatureKey for (bit_idx, bit) in proof.participants.0.iter().enumerate() { if *bit { @@ -452,7 +510,12 @@ fn process_block_internal( "Processing block - parent state info" ); - // Execute state transition to get post-state + // Verify block signatures against parent state before executing the state transition. + // If any signature is invalid the error propagates and the block is rejected; + // it never enters store.blocks or store.states. + signed_block.verify_signatures(state.clone())?; + + // Execute state transition to get post-state (signatures verified above) let new_state = state.state_transition(signed_block.clone(), true)?; // Debug: Log new state checkpoints after transition @@ -516,6 +579,7 @@ fn process_block_internal( "Store finalized checkpoint updated!" 
); store.latest_finalized = new_state.latest_finalized.clone(); + store.finalized_ever_updated = true; METRICS.get().map(|metrics| { let Some(slot) = new_state.latest_finalized.slot.0.try_into().ok() else { warn!("unable to set latest_finalized slot in metrics"); @@ -530,6 +594,7 @@ fn process_block_internal( .0 .saturating_sub(STATE_PRUNE_BUFFER); store.states.retain(|_, state| state.slot.0 >= keep_from); + store.blocks.retain(|_, block| block.slot.0 >= keep_from); } if !justified_updated && !finalized_updated { @@ -583,9 +648,9 @@ fn process_block_internal( .set(store.latest_known_aggregated_payloads.len() as i64); }); - // Process each aggregated attestation's validators for fork choice - // Signature verification is done in verify_signatures() before on_block() - // Per Devnet-2, we process attestation data directly (not SignedAttestation) + // Process each aggregated attestation's validators for fork choice. + // Signatures have already been verified above via verify_signatures(). + // Per Devnet-2, we process attestation data directly (not SignedAttestation). for aggregated_attestation in aggregated_attestations.into_iter() { let validator_ids: Vec = aggregated_attestation .aggregation_bits diff --git a/lean_client/fork_choice/src/lib.rs b/lean_client/fork_choice/src/lib.rs index 3b88082..9bb2e0b 100644 --- a/lean_client/fork_choice/src/lib.rs +++ b/lean_client/fork_choice/src/lib.rs @@ -1,6 +1,7 @@ pub mod block_cache; pub mod handlers; pub mod store; +pub mod sync_state; // dirty hack to avoid issues compiling grandine dependencies. 
by default, bls // crate has no features enabled, and thus compilation fails (as exactly one diff --git a/lean_client/fork_choice/src/store.rs b/lean_client/fork_choice/src/store.rs index 53b7257..d649e7d 100644 --- a/lean_client/fork_choice/src/store.rs +++ b/lean_client/fork_choice/src/store.rs @@ -2,9 +2,9 @@ use std::collections::{HashMap, HashSet}; use anyhow::{Result, anyhow, ensure}; use containers::{ - AggregatedSignatureProof, Attestation, AttestationData, Block, BlockHeader, Checkpoint, Config, - SignatureKey, SignedAggregatedAttestation, SignedAttestation, SignedBlockWithAttestation, Slot, - State, + AggregatedSignatureProof, Attestation, AttestationData, Block, BlockHeader, + Checkpoint, Config, SignatureKey, SignedAggregatedAttestation, SignedAttestation, + SignedBlockWithAttestation, Slot, State, }; use metrics::{METRICS, set_gauge_u64}; use ssz::{H256, SszHash}; @@ -34,12 +34,22 @@ pub struct Store { pub latest_finalized: Checkpoint, /// Set to `true` the first time `on_block` drives a justified checkpoint - /// update beyond the initial anchor value. Validator duties (attestation, - /// block proposal) must not run while this is `false` — the store's - /// `latest_justified` is still the placeholder anchor checkpoint and using - /// it as an attestation source would produce wrong source checkpoints. + /// update beyond the initial checkpoint-sync value. Validator duties must + /// not run while this is `false` — the node has not yet observed real + /// justification progress and its attestations would reference a stale source. pub justified_ever_updated: bool, + /// Set to `true` the first time `on_block` drives a finalized checkpoint + /// update beyond the initial anchor value. + /// + /// The `/states/finalized` endpoint must return 503 while this is `false`. + /// A checkpoint-synced node that has not yet seen real finalization holds + /// the anchor block (head slot, not finalized slot) as `latest_finalized`. 
+ /// Serving that state poisons downstream checkpoint syncs: the receiving + /// node anchors at the head slot, which exceeds the network's justified + /// slot, causing the justified-ever-updated gate to never fire. + pub finalized_ever_updated: bool, + pub blocks: HashMap, pub states: HashMap, @@ -170,9 +180,11 @@ pub fn get_forkchoice_store( let block_slot = block.slot; // Compute block root differently for genesis vs checkpoint sync: - // - Genesis (slot 0): Use block.hash_tree_root() directly - // - Checkpoint sync (slot > 0): Use BlockHeader from state.latest_block_header - // because we have the correct body_root there but may have synthetic empty body in Block + // - Genesis (slot 0): Use block.hash_tree_root() directly — block and state are consistent. + // - Checkpoint sync (slot > 0): Reconstruct BlockHeader from state.latest_block_header, + // using anchor_state.hash_tree_root() as state_root. This guarantees the root stored + // as the key in store.blocks / store.states is the canonical one committed to by the + // downloaded state, independent of what the real block's state_root field contains. let block_root = if block_slot.0 == 0 { block.hash_tree_root() } else { @@ -186,25 +198,18 @@ pub fn get_forkchoice_store( block_header.hash_tree_root() }; - // Per checkpoint sync: always use anchor block's root and slot for checkpoints. - // The original checkpoint roots point to blocks that don't exist in our store. - // We only have the anchor block, so both root and slot must refer to it. - // - // Using the state's justified.slot with the anchor root creates an inconsistency: - // validate_attestation_data requires store.blocks[source.root].slot == source.slot, - // which fails when the chain has progressed beyond the last justified block - // (e.g., state downloaded at slot 2291, last justified at slot 2285). 
- // - // The first real justification event from on_block will replace these values - // with the correct ones, so the anchor slot is only used for the initial period. + // Per leanSpec: substitute anchor_root for the checkpoint roots (the + // historical justified/finalized blocks are not in our store), but keep + // the actual slots from the downloaded state. validate_attestation_data + // skips the block-slot match when the source root is a known checkpoint. let latest_justified = Checkpoint { root: block_root, - slot: block_slot, + slot: anchor_state.latest_justified.slot, }; let latest_finalized = Checkpoint { root: block_root, - slot: block_slot, + slot: anchor_state.latest_finalized.slot, }; // Store the original anchor_state - do NOT modify it @@ -218,8 +223,17 @@ pub fn get_forkchoice_store( latest_justified, latest_finalized, justified_ever_updated: false, - blocks: [(block_root, block)].into(), - states: [(block_root, anchor_state)].into(), + finalized_ever_updated: false, + blocks: { + let mut m = HashMap::new(); + m.insert(block_root, block); + m + }, + states: { + let mut m = HashMap::new(); + m.insert(block_root, anchor_state); + m + }, latest_known_attestations: HashMap::new(), latest_new_attestations: HashMap::new(), gossip_signatures: HashMap::new(), diff --git a/lean_client/fork_choice/src/sync_state.rs b/lean_client/fork_choice/src/sync_state.rs new file mode 100644 index 0000000..bdd54ad --- /dev/null +++ b/lean_client/fork_choice/src/sync_state.rs @@ -0,0 +1,25 @@ +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum SyncState { + #[default] + Idle, + Syncing, + Synced, +} + +impl SyncState { + pub fn accepts_gossip(self) -> bool { + matches!(self, SyncState::Syncing | SyncState::Synced) + } + + pub fn is_idle(self) -> bool { + self == SyncState::Idle + } + + pub fn is_syncing(self) -> bool { + self == SyncState::Syncing + } + + pub fn is_synced(self) -> bool { + self == SyncState::Synced + } +} diff --git 
a/lean_client/http_api/src/handlers.rs b/lean_client/http_api/src/handlers.rs index 138c9d3..b55ac4c 100644 --- a/lean_client/http_api/src/handlers.rs +++ b/lean_client/http_api/src/handlers.rs @@ -23,6 +23,10 @@ pub async fn health() -> impl IntoResponse { pub async fn states_finalized(State(store): State) -> Result { let store = store.read(); + if !store.finalized_ever_updated { + return Err(StatusCode::SERVICE_UNAVAILABLE); + } + let finalized_root = store.latest_finalized.root; let state = store diff --git a/lean_client/networking/src/network/service.rs b/lean_client/networking/src/network/service.rs index b123046..866c9d1 100644 --- a/lean_client/networking/src/network/service.rs +++ b/lean_client/networking/src/network/service.rs @@ -30,6 +30,7 @@ use rand::seq::IndexedRandom; use serde::{Deserialize, Serialize}; use ssz::{H256, SszHash, SszWrite as _}; use tokio::select; +use tokio::sync::Notify; use tokio::time::{Duration, MissedTickBehavior, interval}; use tracing::{debug, info, trace, warn}; @@ -43,13 +44,13 @@ use crate::{ req_resp::{self, LeanRequest, ReqRespMessage}, types::{ ChainMessage, ChainMessageSink, ConnectionState, MAX_BLOCK_CACHE_SIZE, - OutboundP2pRequest, P2pRequestSource, SignedBlockProvider, StatusProvider, + NetworkFinalizedSlot, OutboundP2pRequest, P2pRequestSource, SignedBlockProvider, + StatusProvider, }, }; const MAX_BLOCKS_BY_ROOT_RETRIES: u8 = 3; const MAX_BLOCK_FETCH_DEPTH: u32 = 65536; -/// Maximum roots per BlocksByRoot request, aligned with leanSpec BackfillSync. const MAX_BLOCKS_PER_REQUEST: usize = 10; /// Stalled request timeout. If a peer accepts the stream but never sends a response, /// the request is cancelled and retried with a different peer after this duration. 
@@ -195,6 +196,9 @@ where pending_block_depths: HashMap, /// Roots currently in-flight to deduplicate network-layer pipelining vs chain-side requests in_flight_roots: HashSet, + network_finalized_slot: NetworkFinalizedSlot, + peer_finalized_slots: HashMap, + status_notify: Arc, } impl NetworkService @@ -208,6 +212,8 @@ where chain_message_sink: S, signed_block_provider: SignedBlockProvider, status_provider: StatusProvider, + network_finalized_slot: NetworkFinalizedSlot, + status_notify: Arc, ) -> Result { Self::new_with_peer_count( network_config, @@ -216,6 +222,8 @@ where Arc::new(AtomicU64::new(0)), signed_block_provider, status_provider, + network_finalized_slot, + status_notify, ) .await } @@ -227,6 +235,8 @@ where peer_count: Arc, signed_block_provider: SignedBlockProvider, status_provider: StatusProvider, + network_finalized_slot: NetworkFinalizedSlot, + status_notify: Arc, ) -> Result { let local_key = Keypair::generate_secp256k1(); Self::new_with_keypair( @@ -237,6 +247,8 @@ where local_key, signed_block_provider, status_provider, + network_finalized_slot, + status_notify, ) .await } @@ -249,6 +261,8 @@ where local_key: Keypair, signed_block_provider: SignedBlockProvider, status_provider: StatusProvider, + network_finalized_slot: NetworkFinalizedSlot, + status_notify: Arc, ) -> Result { let behaviour = Self::build_behaviour(&local_key, &network_config)?; @@ -304,6 +318,9 @@ where pending_blocks_by_root: HashMap::new(), pending_block_depths: HashMap::new(), in_flight_roots: HashSet::new(), + network_finalized_slot, + peer_finalized_slots: HashMap::new(), + status_notify, }; service.listen(&multiaddr)?; @@ -453,6 +470,9 @@ where .count() as u64; self.peer_count.store(connected, Ordering::Relaxed); + self.peer_finalized_slots.remove(&peer_id); + self.recompute_network_finalized_slot(); + info!(peer = %peer_id, ?cause, "Disconnected from peer (total: {})", connected); METRICS.get().map(|metrics| { @@ -1093,6 +1113,29 @@ where } } + fn 
recompute_network_finalized_slot(&mut self) { + let mut counts: HashMap = HashMap::new(); + for &slot in self.peer_finalized_slots.values() { + *counts.entry(slot).or_insert(0) += 1; + } + let mode = if counts.is_empty() { + None + } else { + let max_count = *counts.values().max().unwrap(); + counts + .iter() + .filter(|(_, c)| **c == max_count) + .map(|(s, _)| *s) + .min() + }; + let mut slot_guard = self.network_finalized_slot.lock(); + if *slot_guard != mode { + *slot_guard = mode; + drop(slot_guard); + self.status_notify.notify_one(); + } + } + fn maybe_trigger_backfill( &mut self, peer: PeerId, @@ -1102,6 +1145,9 @@ where our_finalized_slot: u64, our_head_slot: u64, ) { + self.peer_finalized_slots.insert(peer, peer_finalized_slot); + self.recompute_network_finalized_slot(); + if (peer_finalized_slot > our_finalized_slot || peer_head_slot > our_head_slot) && !peer_head_root.is_zero() { diff --git a/lean_client/networking/src/types.rs b/lean_client/networking/src/types.rs index 919b435..6969b41 100644 --- a/lean_client/networking/src/types.rs +++ b/lean_client/networking/src/types.rs @@ -1,4 +1,8 @@ -use std::{collections::HashMap, fmt::Display, sync::Arc}; +use std::{ + collections::HashMap, + fmt::Display, + sync::Arc, +}; use anyhow::{Result, anyhow}; use async_trait::async_trait; @@ -7,7 +11,7 @@ use containers::{ SignedAttestation, SignedBlockWithAttestation, Slot, Status, }; use metrics::METRICS; -use parking_lot::RwLock; +use parking_lot::{Mutex, RwLock}; use serde::{Deserialize, Serialize}; use ssz::H256; use tokio::sync::{mpsc, oneshot}; @@ -27,6 +31,8 @@ pub type SignedBlockProvider = Arc>; +pub type NetworkFinalizedSlot = Arc>>; + /// 1-byte domain for gossip message-id isolation of valid snappy messages. /// Per leanSpec, prepended to the message hash when decompression succeeds. 
pub const MESSAGE_DOMAIN_VALID_SNAPPY: &[u8; 1] = &[0x01]; diff --git a/lean_client/src/main.rs b/lean_client/src/main.rs index 5fe1d26..92a1df2 100644 --- a/lean_client/src/main.rs +++ b/lean_client/src/main.rs @@ -14,6 +14,7 @@ use fork_choice::{ INTERVALS_PER_SLOT, MILLIS_PER_INTERVAL, Store, get_forkchoice_store, produce_block_with_signatures, }, + sync_state::SyncState, }; use http_api::HttpServerConfig; use libp2p_identity::Keypair; @@ -22,10 +23,10 @@ use networking::gossipsub::config::GossipsubConfig; use networking::gossipsub::topic::{compute_subnet_id, get_subscription_topics}; use networking::network::{NetworkService, NetworkServiceConfig}; use networking::types::{ - ChainMessage, MAX_BLOCK_CACHE_SIZE, OutboundP2pRequest, SignedBlockProvider, StatusProvider, - ValidatorChainMessage, + ChainMessage, MAX_BLOCK_CACHE_SIZE, NetworkFinalizedSlot, OutboundP2pRequest, + SignedBlockProvider, StatusProvider, ValidatorChainMessage, }; -use parking_lot::RwLock; +use parking_lot::{Mutex, RwLock}; use ssz::{PersistentList, SszHash, SszReadDefault as _}; use std::collections::HashMap; use std::sync::Arc; @@ -33,9 +34,9 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{SystemTime, UNIX_EPOCH}; use std::{io::IsTerminal, net::IpAddr}; use tokio::{ - sync::{mpsc, oneshot}, + sync::{Notify, mpsc, oneshot}, task, - time::{Duration, Instant, interval_at}, + time::{Duration, Instant, interval, interval_at}, }; use tracing::level_filters::LevelFilter; use tracing::{debug, error, info, warn}; @@ -257,6 +258,67 @@ fn print_chain_status(store: &Store, connected_peers: u64) { println!("+===============================================================+\n"); } +fn check_sync_trigger(state: &mut SyncState, head_slot: u64, network_finalized: Option) { + if state.is_syncing() { + return; + } + let Some(nf) = network_finalized else { + return; + }; + if nf > head_slot || state.is_idle() { + let prev = *state; + *state = SyncState::Syncing; + info!(head_slot, 
network_finalized = nf, prev = ?prev, "Sync state: → SYNCING"); + } +} + +fn check_sync_complete( + state: &mut SyncState, + head_slot: u64, + orphan_count: usize, + network_finalized: Option, +) { + if !state.is_syncing() { + return; + } + if orphan_count > 0 { + return; + } + let Some(nf) = network_finalized else { + return; + }; + if head_slot >= nf { + *state = SyncState::Synced; + info!( + head_slot, + network_finalized = nf, + "Sync state: SYNCING → SYNCED" + ); + } +} + +fn check_sync_idle(state: &mut SyncState) { + if state.is_idle() { + return; + } + let prev = *state; + *state = SyncState::Idle; + info!(prev = ?prev, "Sync state: → IDLE (no peers)"); +} + +fn evaluate_sync_state( + state: &mut SyncState, + peers: u64, + head_slot: u64, + network_finalized: Option, +) { + if peers == 0 { + check_sync_idle(state); + } else { + check_sync_trigger(state, head_slot, network_finalized); + } +} + #[derive(Parser, Debug)] struct Args { #[arg(short, long, default_value = "127.0.0.1")] @@ -443,21 +505,24 @@ async fn main() -> Result<()> { let config = Config { genesis_time }; - let (anchor_state, anchor_block) = if let Some(ref url) = args.checkpoint_sync_url { - info!("Checkpoint sync enabled, downloading from: {}", url); + // ── Anchor state: download checkpoint or use genesis ───────────────────────────────────── + // For checkpoint sync: state is downloaded now; the anchor block is fetched from the + // network after the network service starts. `checkpoint_block_root` holds the expected + // block root that MUST arrive before the store is initialised. + // For genesis: state and block are both ready immediately; no network wait is needed. 
+ let anchor_state: State; + let checkpoint_block_root: Option; + let anchor_block_root: H256; + if let Some(ref url) = args.checkpoint_sync_url { + info!("Checkpoint sync enabled, downloading from: {}", url); match download_checkpoint_state(url).await { Ok(checkpoint_state) => { if let Err(e) = verify_checkpoint_state(&checkpoint_state, &genesis_state) { error!("Checkpoint verification failed: {}. Refusing to start.", e); return Err(e); } - - // Compute state root for the checkpoint state let checkpoint_state_root = checkpoint_state.hash_tree_root(); - - // Reconstruct block header from state's latest_block_header with correct state_root - // The state's latest_block_header already contains the correct body_root from the original block let checkpoint_block_header = BlockHeader { slot: checkpoint_state.latest_block_header.slot, proposer_index: checkpoint_state.latest_block_header.proposer_index, @@ -465,56 +530,18 @@ async fn main() -> Result<()> { state_root: checkpoint_state_root, body_root: checkpoint_state.latest_block_header.body_root, }; - - // Compute block root from the BlockHeader (NOT from a synthetic Block with empty body) - let checkpoint_block_root = checkpoint_block_header.hash_tree_root(); - - // Create a Block structure for the SignedBlockWithAttestation - // Note: body is synthetic but block_root is computed correctly from header above - let checkpoint_block = Block { - slot: checkpoint_block_header.slot, - proposer_index: checkpoint_block_header.proposer_index, - parent_root: checkpoint_block_header.parent_root, - state_root: checkpoint_state_root, - body: BlockBody { - attestations: Default::default(), - }, - }; - - let checkpoint_proposer_attestation = Attestation { - validator_id: checkpoint_state.latest_block_header.proposer_index, - data: AttestationData { - slot: checkpoint_state.slot, - head: Checkpoint { - root: checkpoint_block_root, - slot: checkpoint_state.slot, - }, - target: checkpoint_state.latest_finalized.clone(), - source: 
checkpoint_state.latest_justified.clone(), - }, - }; - - let checkpoint_signed_block = SignedBlockWithAttestation { - message: BlockWithAttestation { - block: checkpoint_block, - proposer_attestation: checkpoint_proposer_attestation, - }, - signature: BlockSignatures { - attestation_signatures: PersistentList::default(), - proposer_signature: Signature::default(), - }, - }; - + let root = checkpoint_block_header.hash_tree_root(); info!( slot = checkpoint_state.slot.0, finalized = checkpoint_state.latest_finalized.slot.0, justified = checkpoint_state.latest_justified.slot.0, - block_root = %format!("0x{:x}", checkpoint_block_root), + block_root = %format!("0x{:x}", root), state_root = %format!("0x{:x}", checkpoint_state_root), - "Checkpoint sync successful" + "Checkpoint state downloaded and verified — will fetch anchor block from network" ); - - (checkpoint_state, checkpoint_signed_block) + anchor_state = checkpoint_state; + checkpoint_block_root = Some(root); + anchor_block_root = root; } Err(e) => { return Err(e.context( @@ -524,27 +551,18 @@ async fn main() -> Result<()> { } } } else { - (genesis_state.clone(), genesis_signed_block) - }; - - // Clone anchor block for seeding the shared block provider later - let anchor_block_for_provider = anchor_block.clone(); - // Compute block root from BlockHeader (NOT from Block with potentially empty body) - // Must match the computation in get_forkchoice_store - let anchor_block_header = BlockHeader { - slot: anchor_state.latest_block_header.slot, - proposer_index: anchor_state.latest_block_header.proposer_index, - parent_root: anchor_state.latest_block_header.parent_root, - state_root: anchor_state.hash_tree_root(), - body_root: anchor_state.latest_block_header.body_root, - }; - let anchor_block_root = anchor_block_header.hash_tree_root(); - - let store = Arc::new(RwLock::new(get_forkchoice_store( - anchor_state.clone(), - anchor_block, - config, - ))); + anchor_state = genesis_state.clone(); + checkpoint_block_root = 
None; + // Genesis path: reconstruct root from state's latest block header. + anchor_block_root = BlockHeader { + slot: anchor_state.latest_block_header.slot, + proposer_index: anchor_state.latest_block_header.proposer_index, + parent_root: anchor_state.latest_block_header.parent_root, + state_root: anchor_state.hash_tree_root(), + body_root: anchor_state.latest_block_header.body_root, + } + .hash_tree_root(); + } let num_validators = anchor_state.validators.len_u64(); info!(num_validators = num_validators, "Anchor state loaded"); @@ -669,27 +687,29 @@ async fn main() -> Result<()> { let peer_count = Arc::new(AtomicU64::new(0)); let peer_count_for_status = peer_count.clone(); - // Create shared block provider for BlocksByRoot requests (checkpoint sync backfill) - // Seed with anchor block so we can serve it to peers doing checkpoint sync - let mut initial_blocks = HashMap::new(); - initial_blocks.insert(anchor_block_root, anchor_block_for_provider.clone()); - - let signed_block_provider: SignedBlockProvider = Arc::new(RwLock::new(initial_blocks)); + // Create shared block provider for BlocksByRoot requests. + // Start empty: the anchor block is inserted after it is received from the network + // (checkpoint path) or after get_forkchoice_store returns (genesis path). + let signed_block_provider: SignedBlockProvider = Arc::new(RwLock::new(HashMap::new())); let signed_block_provider_for_network = signed_block_provider.clone(); - let initial_status = { - let s = store.read(); - Status::new( - s.latest_finalized.clone(), - Checkpoint { - root: s.head, - slot: s.blocks.get(&s.head).map(|b| b.slot).unwrap_or(Slot(0)), - }, - ) - }; + // Build initial status from anchor state so peers know our finalized checkpoint and head + // before the store is initialised. Updated once the store is ready. 
+ let initial_status = Status::new( + anchor_state.latest_finalized.clone(), + Checkpoint { + root: anchor_block_root, + slot: anchor_state.latest_block_header.slot, + }, + ); let status_provider: StatusProvider = Arc::new(RwLock::new(initial_status)); let status_provider_for_network = status_provider.clone(); + let network_finalized_slot: NetworkFinalizedSlot = Arc::new(Mutex::new(None)); + let network_finalized_slot_for_network = network_finalized_slot.clone(); + + let status_notify = Arc::new(Notify::new()); + // LOAD NODE KEY let mut network_service = if let Some(key_path) = &args.node_key { match load_node_key(key_path) { @@ -704,6 +724,8 @@ async fn main() -> Result<()> { keypair, signed_block_provider_for_network, status_provider_for_network, + network_finalized_slot_for_network, + status_notify.clone(), ) .await .expect("Failed to create network service with custom key") @@ -717,6 +739,8 @@ async fn main() -> Result<()> { peer_count, signed_block_provider_for_network, status_provider_for_network, + network_finalized_slot_for_network, + status_notify.clone(), ) .await .expect("Failed to create network service") @@ -730,6 +754,8 @@ async fn main() -> Result<()> { peer_count, signed_block_provider_for_network, status_provider_for_network, + network_finalized_slot_for_network, + status_notify.clone(), ) .await .expect("Failed to create network service") @@ -741,6 +767,135 @@ async fn main() -> Result<()> { } }); + // ── Anchor block: fetch from network (checkpoint) or use genesis block directly ────────── + // Genesis path : genesis_signed_block is ready immediately; no network wait needed. + // Checkpoint path: send periodic BlocksByRoot(checkpoint_block_root) requests until a peer + // delivers the block. Blocks that arrive with a non-matching root are discarded here; + // they will be re-requested by the normal backfill mechanism once the chain task starts. + // Abort with a clear error if no valid block arrives within the timeout. 
+ const ANCHOR_BLOCK_TIMEOUT_SECS: u64 = 300; + + let anchor_block: SignedBlockWithAttestation = if let Some(expected_root) = + checkpoint_block_root + { + info!( + block_root = %format!("0x{:x}", expected_root), + timeout_secs = ANCHOR_BLOCK_TIMEOUT_SECS, + "Waiting for anchor block from network" + ); + + let mut retry_interval = interval(Duration::from_secs(5)); + + // Wrap the loop in tokio::time::timeout so the deadline fires unconditionally at + // T+ANCHOR_BLOCK_TIMEOUT_SECS, regardless of how many non-anchor messages arrive. + // A deadline arm inside a biased select would be starved when peers continuously + // deliver other blocks and the channel is never empty. + let timeout_result = tokio::time::timeout( + Duration::from_secs(ANCHOR_BLOCK_TIMEOUT_SECS), + async { + loop { + tokio::select! { + msg = chain_message_receiver.recv() => { + let Some(msg) = msg else { + return Err(anyhow::anyhow!( + "Chain message channel closed during anchor block wait" + )); + }; + if let ChainMessage::ProcessBlock { signed_block_with_attestation, .. } = msg { + let root = signed_block_with_attestation.message.block.hash_tree_root(); + if root == expected_root { + // Root match guarantees slot, proposer_index, parent_root, + // state_root, and body contents (via body_root). + // proposer_signature is NOT covered by the hash — verify it + // explicitly so a peer serving a validly-hashed but unsigned + // block cannot become our anchor. 
+ match signed_block_with_attestation + .verify_signatures(anchor_state.clone()) + { + Ok(()) => { + info!( + slot = signed_block_with_attestation.message.block.slot.0, + block_root = %format!("0x{:x}", root), + "Anchor block received and verified — initialising fork-choice store" + ); + return Ok(signed_block_with_attestation); + } + Err(e) => { + warn!( + slot = signed_block_with_attestation.message.block.slot.0, + block_root = %format!("0x{:x}", root), + error = %e, + "Anchor block signature verification failed — \ + discarding, waiting for valid block from another peer" + ); + // Keep waiting; the retry interval will re-request from peers. + } + } + } else { + debug!( + slot = signed_block_with_attestation.message.block.slot.0, + root = %format!("0x{:x}", root), + "Waiting for anchor block — discarding non-anchor block" + ); + } + } + // Attestations and aggregations before the store is ready: discard silently. + } + _ = retry_interval.tick() => { + let _ = outbound_p2p_sender.send( + OutboundP2pRequest::RequestBlocksByRoot(vec![expected_root]) + ); + } + } + } + }, + ) + .await; + + match timeout_result { + Err(_elapsed) => { + return Err(anyhow::anyhow!( + "Anchor block 0x{:x} not received within {} seconds. \ + The checkpoint source may be on a minority fork. \ + Verify the checkpoint URL and retry.", + expected_root, + ANCHOR_BLOCK_TIMEOUT_SECS, + )); + } + Ok(Err(e)) => return Err(e), + Ok(Ok(block)) => block, + } + } else { + // Genesis path: block was prepared at startup, no network wait needed. + genesis_signed_block + }; + + // ── Initialise fork-choice store with the real anchor block ────────────────────────────── + let anchor_block_for_provider = anchor_block.clone(); + + let store = Arc::new(RwLock::new(get_forkchoice_store( + anchor_state.clone(), + anchor_block, + config, + ))); + + // Seed the block provider so we can serve the anchor block to peers via BlocksByRoot. 
+ {
+ let mut provider = signed_block_provider.write();
+ provider.insert(anchor_block_root, anchor_block_for_provider);
+ }
+
+ // Sync status_provider to the now-initialised store (ensures head root/slot are accurate).
+ {
+ let s = store.read();
+ let mut status = status_provider.write();
+ status.finalized = s.latest_finalized.clone();
+ status.head = Checkpoint {
+ root: s.head,
+ slot: s.blocks.get(&s.head).map(|b| b.slot).unwrap_or(Slot(0)),
+ };
+ }
+
 let chain_outbound_sender = outbound_p2p_sender.clone();
 let http_store = store.clone();
@@ -779,6 +934,11 @@ async fn main() -> Result<()> {
 let mut last_logged_slot = 0u64;
 let mut last_status_slot: Option<u64> = None;
 let mut block_cache = BlockCache::new();
+ let mut sync_state = if vs_for_chain.is_some() {
+ SyncState::Syncing
+ } else {
+ SyncState::Idle
+ };
 let peer_count = peer_count_for_status;
@@ -803,6 +963,10 @@ async fn main() -> Result<()> {
 let peers = peer_count.load(Ordering::Relaxed);
 print_chain_status(&*store.read(), peers);
 last_status_slot = Some(current_slot);
+
+ let head_slot = { let s = store.read(); s.blocks.get(&s.head).map(|b| b.slot.0).unwrap_or(0) };
+ let nf = *network_finalized_slot.lock();
+ evaluate_sync_state(&mut sync_state, peers, head_slot, nf);
 }
 match current_interval {
@@ -840,14 +1004,29 @@ async fn main() -> Result<()> {
 last_logged_slot = current_slot;
 }
 }
+ _ = status_notify.notified() => {
+ let peers = peer_count.load(Ordering::Relaxed);
+ let head_slot = { let s = store.read(); s.blocks.get(&s.head).map(|b| b.slot.0).unwrap_or(0) };
+ let nf = *network_finalized_slot.lock();
+ evaluate_sync_state(&mut sync_state, peers, head_slot, nf);
+ }
 message = chain_message_receiver.recv() => {
 let Some(message) = message else { break };
 match message {
 ChainMessage::ProcessBlock {
 signed_block_with_attestation,
+ is_trusted,
 should_gossip,
- ..
} => { + if should_gossip && !is_trusted && !sync_state.accepts_gossip() { + debug!( + state = ?sync_state, + slot = signed_block_with_attestation.message.block.slot.0, + "Dropping gossip block: sync state does not accept gossip" + ); + continue; + } + let block_slot = signed_block_with_attestation.message.block.slot; let proposer = signed_block_with_attestation.message.block.proposer_index; let block_root = signed_block_with_attestation.message.block.hash_tree_root(); @@ -921,6 +1100,12 @@ async fn main() -> Result<()> { warn!("Failed to request missing parent block: {}", req_err); } } + + let head_slot = { let s = store.read(); s.blocks.get(&s.head).map(|b| b.slot.0).unwrap_or(0) }; + let nf = *network_finalized_slot.lock(); + check_sync_trigger(&mut sync_state, head_slot, nf); + check_sync_complete(&mut sync_state, head_slot, block_cache.orphan_count(), nf); + continue; } @@ -948,6 +1133,10 @@ async fn main() -> Result<()> { info!(slot = block_slot.0, "Broadcasted block"); } } + + let head_slot = { let s = store.read(); s.blocks.get(&s.head).map(|b| b.slot.0).unwrap_or(0) }; + let nf = *network_finalized_slot.lock(); + check_sync_complete(&mut sync_state, head_slot, block_cache.orphan_count(), nf); } Err(e) => warn!("Problem processing block: {}", e), } @@ -964,9 +1153,18 @@ async fn main() -> Result<()> { } ChainMessage::ProcessAttestation { signed_attestation, + is_trusted, should_gossip, - .. } => { + if should_gossip && !is_trusted && !sync_state.accepts_gossip() { + debug!( + state = ?sync_state, + slot = signed_attestation.message.slot.0, + "Dropping gossip attestation: sync state does not accept gossip" + ); + continue; + } + let att_slot = signed_attestation.message.slot.0; let source_slot = signed_attestation.message.source.slot.0; let target_slot = signed_attestation.message.target.slot.0; @@ -1007,9 +1205,18 @@ async fn main() -> Result<()> { } ChainMessage::ProcessAggregation { signed_aggregated_attestation, + is_trusted, should_gossip, - .. 
} => { + if should_gossip && !is_trusted && !sync_state.accepts_gossip() { + debug!( + state = ?sync_state, + slot = signed_aggregated_attestation.data.slot.0, + "Dropping gossip aggregation: sync state does not accept gossip" + ); + continue; + } + let agg_slot = signed_aggregated_attestation.data.slot.0; let validator_count = signed_aggregated_attestation .proof