diff --git a/lean_client/containers/src/slot.rs b/lean_client/containers/src/slot.rs index dbadcc9..cda0d33 100644 --- a/lean_client/containers/src/slot.rs +++ b/lean_client/containers/src/slot.rs @@ -56,7 +56,7 @@ impl Slot { // Rule 2: Slots at perfect square distances are justifiable. // Examples: delta = 1, 4, 9, 16, 25, 36, 49, 64, ... // Check: integer square root squared equals delta - let sqrt = (delta as f64).sqrt() as u64; + let sqrt = delta.isqrt(); if sqrt * sqrt == delta { return true; } @@ -66,10 +66,11 @@ impl Slot { // Mathematical insight: For pronic delta = n(n+1), we have: // 4*delta + 1 = 4n(n+1) + 1 = (2n+1)^2 // Check: 4*delta+1 is an odd perfect square - let test = 4 * delta + 1; - let test_sqrt = (test as f64).sqrt() as u64; - if test_sqrt * test_sqrt == test && test_sqrt % 2 == 1 { - return true; + if let Some(test) = delta.checked_mul(4).and_then(|v| v.checked_add(1)) { + let test_sqrt = test.isqrt(); + if test_sqrt * test_sqrt == test && test_sqrt % 2 == 1 { + return true; + } } false diff --git a/lean_client/fork_choice/src/handlers.rs b/lean_client/fork_choice/src/handlers.rs index 954d659..4f94e1e 100644 --- a/lean_client/fork_choice/src/handlers.rs +++ b/lean_client/fork_choice/src/handlers.rs @@ -92,6 +92,18 @@ fn validate_attestation_data(store: &Store, data: &AttestationData) -> Result<() Ok(()) } +/// Returns the first block root (source, target, or head) referenced by `attestation_data` +/// that is not yet present in the store, or `None` if all are known. +fn find_unknown_attestation_block(store: &Store, attestation_data: &AttestationData) -> Option { + [ + attestation_data.source.root, + attestation_data.target.root, + attestation_data.head.root, + ] + .into_iter() + .find(|root| !store.blocks.contains_key(root)) +} + /// Process a signed attestation received via gossip network /// /// 1. Validates the attestation data @@ -112,6 +124,18 @@ pub fn on_gossip_attestation( let validator_id = signed_attestation.validator_id; let attestation_data = signed_attestation.message.clone(); + // Queue attestation if any referenced block is not yet in the store. + // When the missing block arrives, pending attestations are retried. + if let Some(missing_root) = find_unknown_attestation_block(store, &attestation_data) { + store + .pending_attestations + .entry(missing_root) + .or_default() + .push(signed_attestation); + store.pending_fetch_roots.insert(missing_root); + return Ok(()); + } + // Validate the attestation data first validate_attestation_data(store, &attestation_data).inspect_err(|_| { METRICS.get().map(|metrics| { @@ -184,6 +208,19 @@ pub fn on_attestation( let validator_id = signed_attestation.validator_id; let attestation_data = signed_attestation.message.clone(); + // Queue gossip attestations if any referenced block is not yet in the store. + if !is_from_block { + if let Some(missing_root) = find_unknown_attestation_block(store, &attestation_data) { + store + .pending_attestations + .entry(missing_root) + .or_default() + .push(signed_attestation); + store.pending_fetch_roots.insert(missing_root); + return Ok(()); + } + } + // Validate attestation data validate_attestation_data(store, &attestation_data).inspect_err(|_| { METRICS.get().map(|metrics| { @@ -206,6 +243,11 @@ pub fn on_attestation( store .gossip_signatures .insert(sig_key, signed_attestation.signature); + METRICS.get().map(|metrics| { + metrics + .lean_gossip_signatures + .set(store.gossip_signatures.len() as i64) + }); } on_attestation_internal(store, validator_id, attestation_data, is_from_block) @@ -241,9 +283,6 @@ pub fn on_attestation( /// For production, signature verification should be added: /// 1. Verify the `AggregatedSignatureProof` against the aggregation bits /// 2. Consider async/batched verification for throughput -/// 3. Cache verification results to avoid re-verifying the same aggregations -/// -/// See Ream's approach: deferred verification in gossip path with later validation #[inline] pub fn on_aggregated_attestation( store: &mut Store, @@ -253,6 +292,17 @@ pub fn on_aggregated_attestation( let attestation_data = signed_aggregated_attestation.data.clone(); let proof = signed_aggregated_attestation.proof.clone(); + // Queue if any referenced block is not yet in the store. + if let Some(missing_root) = find_unknown_attestation_block(store, &attestation_data) { + store + .pending_aggregated_attestations + .entry(missing_root) + .or_default() + .push(signed_aggregated_attestation); + store.pending_fetch_roots.insert(missing_root); + return Ok(()); + } + // Validate attestation data (slot bounds, target validity, etc.) // TODO(production): Add signature verification here or in caller validate_attestation_data(store, &attestation_data)?; @@ -344,7 +394,6 @@ pub fn on_block(store: &mut Store, signed_block: SignedBlockWithAttestation) -> let block_root = signed_block.message.block.hash_tree_root(); if store.blocks.contains_key(&block_root) { - // stop_and_discard(timer); return Ok(()); } @@ -417,6 +466,29 @@ fn process_block_internal( // Also store signed block for serving BlocksByRoot requests (checkpoint sync backfill) store.signed_blocks.insert(block_root, signed_block.clone()); + // Retry attestations that arrived before this block was known. + // Drain the queue for this root and re-process each attestation. + // Attestations that still reference other unknown blocks are re-queued automatically. + let pending = store + .pending_attestations + .remove(&block_root) + .unwrap_or_default(); + for signed_att in pending { + if let Err(err) = on_attestation(store, signed_att, false) { + warn!(%err, "Pending attestation retry failed after block arrival"); + } + } + + let pending_agg = store + .pending_aggregated_attestations + .remove(&block_root) + .unwrap_or_default(); + for signed_agg in pending_agg { + if let Err(err) = on_aggregated_attestation(store, signed_agg) { + warn!(%err, "Pending aggregated attestation retry failed after block arrival"); + } + } + let justified_updated = new_state.latest_justified.slot > store.latest_justified.slot; let finalized_updated = new_state.latest_finalized.slot > store.latest_finalized.slot; @@ -535,6 +607,11 @@ fn process_block_internal( store .gossip_signatures .insert(proposer_sig_key, signed_block.signature.proposer_signature); + METRICS.get().map(|metrics| { + metrics + .lean_gossip_signatures + .set(store.gossip_signatures.len() as i64) + }); store .attestation_data_by_root .insert(proposer_data_root, proposer_attestation.data.clone()); diff --git a/lean_client/fork_choice/src/store.rs b/lean_client/fork_choice/src/store.rs index d51ad9a..c9d165d 100644 --- a/lean_client/fork_choice/src/store.rs +++ b/lean_client/fork_choice/src/store.rs @@ -1,9 +1,10 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use anyhow::{Result, anyhow, ensure}; use containers::{ AggregatedSignatureProof, Attestation, AttestationData, Block, BlockHeader, Checkpoint, Config, - SignatureKey, SignedBlockWithAttestation, Slot, State, + SignatureKey, SignedAggregatedAttestation, SignedAttestation, SignedBlockWithAttestation, Slot, + State, }; use metrics::{METRICS, set_gauge_u64}; use ssz::{H256, SszHash}; @@ -62,6 +63,18 @@ pub struct Store { /// Signed blocks indexed by block root. /// Used to serve BlocksByRoot requests to peers for checkpoint sync backfill. pub signed_blocks: HashMap, + + /// Gossip attestations waiting for referenced blocks to arrive. + /// Keyed by the missing block root. Drained when that block is processed. + pub pending_attestations: HashMap>, + + /// Aggregated attestations waiting for referenced blocks to arrive. + /// Keyed by the missing block root. Drained when that block is processed. + pub pending_aggregated_attestations: HashMap>, + + /// Block roots that were referenced by attestations but not found in the store. + /// Drained by the caller (main.rs) to trigger blocks-by-root RPC fetches. + pub pending_fetch_roots: HashSet, } const JUSTIFICATION_LOOKBACK_SLOTS: u64 = 3; @@ -165,26 +178,25 @@ pub fn get_forkchoice_store( block_header.hash_tree_root() }; - // Per checkpoint sync: always use anchor block's root for checkpoints. + // Per checkpoint sync: always use anchor block's root and slot for checkpoints. // The original checkpoint roots point to blocks that don't exist in our store. - // We only have the anchor block, so use its root. Keep the slot from state - // to preserve justification/finalization progress information. + // We only have the anchor block, so both root and slot must refer to it. + // + // Using the state's justified.slot with the anchor root creates an inconsistency: + // validate_attestation_data requires store.blocks[source.root].slot == source.slot, + // which fails when the chain has progressed beyond the last justified block + // (e.g., state downloaded at slot 2291, last justified at slot 2285). + // + // The first real justification event from on_block will replace these values + // with the correct ones, so the anchor slot is only used for the initial period. let latest_justified = Checkpoint { root: block_root, - slot: if anchor_state.latest_justified.root.is_zero() { - block_slot - } else { - anchor_state.latest_justified.slot - }, + slot: block_slot, }; let latest_finalized = Checkpoint { root: block_root, - slot: if anchor_state.latest_finalized.root.is_zero() { - block_slot - } else { - anchor_state.latest_finalized.slot - }, + slot: block_slot, }; // Store the original anchor_state - do NOT modify it @@ -207,6 +219,9 @@ pub fn get_forkchoice_store( latest_new_aggregated_payloads: HashMap::new(), attestation_data_by_root: HashMap::new(), signed_blocks: [(block_root, anchor_block)].into(), + pending_attestations: HashMap::new(), + pending_aggregated_attestations: HashMap::new(), + pending_fetch_roots: HashSet::new(), } } @@ -499,7 +514,10 @@ pub fn get_proposal_head(store: &mut Store, slot: Slot) -> H256 { /// 1. **Get Proposal Head**: Retrieve current chain head as parent /// 2. **Collect Attestations**: Convert known attestations to plain attestations /// 3. **Build Block**: Use State.build_block with signature caches -/// 4. **Store Block**: Insert block and post-state into Store +/// +/// The block and state are NOT inserted here. The caller signs the block and sends +/// it back via `ChainMessage::ProcessBlock`, which runs the full `on_block` path: +/// state transition, `update_head`, checkpoint updates, and proposer attestation. /// /// # Arguments /// * `store` - Mutable reference to the fork choice store @@ -513,7 +531,6 @@ pub fn produce_block_with_signatures( slot: Slot, validator_index: u64, ) -> Result<(H256, Block, Vec)> { - // Get parent block head let head_root = get_proposal_head(store, slot); let head_state = store .states @@ -521,7 +538,6 @@ pub fn produce_block_with_signatures( .ok_or_else(|| anyhow!("Head state not found"))? .clone(); - // Validate proposer authorization for this slot let num_validators = head_state.validators.len_u64(); let expected_proposer = slot.0 % num_validators; ensure!( @@ -532,8 +548,6 @@ pub fn produce_block_with_signatures( expected_proposer ); - // Convert AttestationData to Attestation objects for build_block - // Per devnet-2, store now holds AttestationData directly let available_attestations: Vec = store .latest_known_attestations .iter() @@ -543,28 +557,21 @@ pub fn produce_block_with_signatures( }) .collect(); - // Get known block roots for attestation validation let known_block_roots: std::collections::HashSet = store.blocks.keys().copied().collect(); - // Build block with fixed-point attestation collection and signature aggregation - let (final_block, final_post_state, _aggregated_attestations, signatures) = head_state + let (final_block, _final_post_state, _aggregated_attestations, signatures) = head_state .build_block( slot, validator_index, head_root, - None, // initial_attestations - start with empty, let fixed-point collect + None, Some(available_attestations), Some(&known_block_roots), Some(&store.gossip_signatures), Some(&store.latest_known_aggregated_payloads), )?; - // Compute block root using the header hash (canonical block root) let block_root = final_block.hash_tree_root(); - // Store block and state (per devnet-2, we store the plain Block) - store.blocks.insert(block_root, final_block.clone()); - store.states.insert(block_root, final_post_state); - Ok((block_root, final_block, signatures)) } diff --git a/lean_client/networking/src/network/service.rs b/lean_client/networking/src/network/service.rs index decd6cc..34b112b 100644 --- a/lean_client/networking/src/network/service.rs +++ b/lean_client/networking/src/network/service.rs @@ -1,5 +1,5 @@ use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, fs::File, io, net::IpAddr, @@ -48,7 +48,9 @@ use crate::{ }; const MAX_BLOCKS_BY_ROOT_RETRIES: u8 = 3; -const MAX_BLOCK_FETCH_DEPTH: u32 = 512; +const MAX_BLOCK_FETCH_DEPTH: u32 = 65536; +/// Maximum roots per BlocksByRoot request, aligned with leanSpec BackfillSync. +const MAX_BLOCKS_PER_REQUEST: usize = 10; struct PendingBlocksRequest { roots: Vec, @@ -187,6 +189,8 @@ where pending_blocks_by_root: HashMap, /// Depth tracking per block root for limiting backward chain walking pending_block_depths: HashMap, + /// Roots currently in-flight to deduplicate network-layer pipelining vs chain-side requests + in_flight_roots: HashSet, } impl NetworkService @@ -295,6 +299,7 @@ where status_provider, pending_blocks_by_root: HashMap::new(), pending_block_depths: HashMap::new(), + in_flight_roots: HashSet::new(), }; service.listen(&multiaddr)?; @@ -312,11 +317,19 @@ where let mut discovery_interval = interval(Duration::from_secs(30)); discovery_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + // Periodic sync trigger: send status to all connected peers so backfill re-fires + // whenever lean is behind, regardless of who dialed whom. + let mut sync_interval = interval(Duration::from_secs(30)); + sync_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + loop { select! { _ = reconnect_interval.tick() => { self.connect_to_peers(self.network_config.bootnodes.to_multiaddrs()).await; } + _ = sync_interval.tick() => { + self.send_status_to_all_connected_peers(); + } _ = discovery_interval.tick() => { // Trigger active peer discovery if let Some(ref discovery) = self.discovery { @@ -596,8 +609,31 @@ where match event { Event::Message { peer, message, .. } => match message { Message::Response { response, .. } => match response { - LeanResponse::Status(_) => { - info!(peer = %peer, "Received Status response"); + LeanResponse::Status(peer_status) => { + let (our_finalized_slot, our_head_slot) = { + let s = self.status_provider.read(); + (s.finalized.slot.0, s.head.slot.0) + }; + let peer_finalized_slot = peer_status.finalized.slot.0; + let peer_head_root = peer_status.head.root; + let peer_head_slot = peer_status.head.slot.0; + + info!( + peer = %peer, + our_finalized = our_finalized_slot, + peer_finalized = peer_finalized_slot, + peer_head = peer_head_slot, + "Received Status response" + ); + + self.maybe_trigger_backfill( + peer, + peer_finalized_slot, + peer_head_slot, + peer_head_root, + our_finalized_slot, + our_head_slot, + ); } _ => { warn!(peer = %peer, "Unexpected response type on Status protocol"); @@ -608,11 +644,16 @@ where } => { use crate::req_resp::{LeanRequest, LeanResponse}; - let response = match request { - LeanRequest::Status(_) => { + let (response, peer_finalized_slot, peer_head_root, peer_head_slot, our_finalized_slot, our_head_slot) = match request { + LeanRequest::Status(peer_status) => { let status = self.status_provider.read().clone(); - info!(peer = %peer, finalized_slot = status.finalized.slot.0, head_slot = status.head.slot.0, "Received Status request"); - LeanResponse::Status(status) + let our_finalized = status.finalized.slot.0; + let our_head = status.head.slot.0; + info!(peer = %peer, finalized_slot = our_finalized, head_slot = our_head, "Received Status request"); + let pf = peer_status.finalized.slot.0; + let ph = peer_status.head.root; + let phs = peer_status.head.slot.0; + (LeanResponse::Status(status), pf, ph, phs, our_finalized, our_head) } _ => { warn!(peer = %peer, "Unexpected request type on Status protocol"); @@ -628,6 +669,15 @@ where { warn!(peer = %peer, ?e, "Failed to send Status response"); } + + self.maybe_trigger_backfill( + peer, + peer_finalized_slot, + peer_head_slot, + peer_head_root, + our_finalized_slot, + our_head_slot, + ); } }, Event::OutboundFailure { peer, error, .. } => { @@ -659,6 +709,14 @@ where let pending = self.pending_blocks_by_root.remove(&request_id); let request_depth = pending.as_ref().map(|p| p.depth).unwrap_or(0); + // Release in-flight tracking so these roots can be re-requested if needed. + // Retry paths re-add them via send_blocks_by_root_request_internal. + if let Some(ref req) = pending { + for root in &req.roots { + self.in_flight_roots.remove(root); + } + } + match response { LeanResponse::BlocksByRoot(blocks) => { info!( @@ -668,17 +726,78 @@ where "Received BlocksByRoot response" ); - // Track depth for potential parent block requests - // Each block's parent will be requested at depth + 1 - for block in &blocks { - let parent_root = block.message.block.parent_root; - if !parent_root.is_zero() { - self.pending_block_depths - .insert(parent_root, request_depth + 1); + // Step 1: Insert all received blocks into signed_block_provider + // immediately — before chain processing. This mirrors leanSpec's + // BlockCache.add(): blocks are "known" as soon as they arrive. + // Siblings within the same response batch are visible to each other + // during the parent check below. + { + let mut provider = self.signed_block_provider.write(); + for block in &blocks { + let root = block.message.block.hash_tree_root(); + provider.insert(root, block.clone()); } } - // Feed received blocks back into chain processing + // Step 2: Collect unique parent roots that are not yet received. + // Fire the next BlocksByRoot request immediately from the network + // layer, overlapping the next RTT with chain processing time. + // This is the leanSpec BackfillSync recursive pattern: + // _process_received_blocks → fill_missing(new_orphan_parents) + let next_depth = request_depth + 1; + if next_depth < MAX_BLOCK_FETCH_DEPTH { + let unknown_parents: Vec = { + let provider = self.signed_block_provider.read(); + let mut seen = HashSet::new(); + blocks + .iter() + .filter_map(|block| { + let parent_root = block.message.block.parent_root; + if parent_root.is_zero() { + return None; + } + // Already received (in this batch or previously)? + if provider.contains_key(&parent_root) { + return None; + } + // Already in-flight from a prior request? + if self.in_flight_roots.contains(&parent_root) { + return None; + } + // Deduplicate within this batch + if !seen.insert(parent_root) { + return None; + } + Some(parent_root) + }) + .collect() + }; + + if !unknown_parents.is_empty() { + info!( + num_parents = unknown_parents.len(), + depth = next_depth, + "Pipelining parent fetch before chain processing" + ); + for &root in &unknown_parents { + self.pending_block_depths.insert(root, next_depth); + } + // Chunk and send; each chunk goes to a random peer. + for chunk in unknown_parents.chunks(MAX_BLOCKS_PER_REQUEST) { + if let Some(peer_id) = self.get_random_connected_peer() { + self.send_blocks_by_root_request_internal( + peer_id, + chunk.to_vec(), + 0, + next_depth, + ); + } + } + } + } + + // Step 3: Feed received blocks to chain for state processing. + // This runs in parallel with the pipelined network request above. let chain_sink = self.chain_message_sink.clone(); tokio::spawn(async move { for block in blocks { @@ -687,7 +806,7 @@ where .send(ChainMessage::ProcessBlock { signed_block_with_attestation: block, is_trusted: false, - should_gossip: false, // Don't re-gossip requested blocks + should_gossip: false, }) .await { @@ -760,6 +879,10 @@ where } => { warn!(peer = %peer, ?error, "BlocksByRoot outbound request failed"); if let Some(req) = self.pending_blocks_by_root.remove(&request_id) { + // Release in-flight tracking before retry; retry re-adds via send_internal. + for root in &req.roots { + self.in_flight_roots.remove(root); + } self.retry_blocks_by_root_request(peer, req); } } @@ -900,6 +1023,54 @@ where } } + fn maybe_trigger_backfill( + &mut self, + peer: PeerId, + peer_finalized_slot: u64, + peer_head_slot: u64, + peer_head_root: H256, + our_finalized_slot: u64, + our_head_slot: u64, + ) { + if (peer_finalized_slot > our_finalized_slot || peer_head_slot > our_head_slot) + && !peer_head_root.is_zero() + { + info!( + peer = %peer, + peer_head_slot, + our_head_slot, + peer_finalized = peer_finalized_slot, + our_finalized = our_finalized_slot, + "Peer is ahead — requesting head block to trigger backfill" + ); + self.send_blocks_by_root_request(peer, vec![peer_head_root]); + } + } + + fn send_status_to_all_connected_peers(&mut self) { + let peers: Vec = self + .peer_table + .lock() + .iter() + .filter(|(_, state)| **state == ConnectionState::Connected) + .map(|(peer_id, _)| *peer_id) + .collect(); + + if peers.is_empty() { + return; + } + + let our_finalized = self.status_provider.read().finalized.slot.0; + info!( + num_peers = peers.len(), + our_finalized, + "Periodic sync check: sending status to all connected peers" + ); + for peer_id in peers { + self.send_status_request(peer_id); + } + } + async fn dispatch_outbound_request(&mut self, request: OutboundP2pRequest) { match request { OutboundP2pRequest::GossipBlockWithAttestation(signed_block_with_attestation) => { @@ -975,6 +1146,10 @@ where max_depth = MAX_BLOCK_FETCH_DEPTH, "Skipping block request: exceeded max fetch depth" ); + } else if self.in_flight_roots.contains(&root) { + // Network-layer pipelining already sent this request; skip the + // duplicate from the chain-side pending_fetch_roots drain. + debug!(root = %root, "Skipping chain-side request: root already in-flight"); } else { roots_to_request.push((root, depth)); } @@ -984,20 +1159,33 @@ where return; } - if let Some(peer_id) = self.get_random_connected_peer() { - // Use max depth among requested roots for the batch - let depth = roots_to_request.iter().map(|(_, d)| *d).max().unwrap_or(0); - let roots: Vec = roots_to_request.into_iter().map(|(r, _)| r).collect(); - - info!( - peer = %peer_id, - num_blocks = roots.len(), - depth = depth, - "Requesting missing blocks from peer" - ); - self.send_blocks_by_root_request_with_depth(peer_id, roots, depth); - } else { - warn!("Cannot request blocks: no connected peers"); + // Split into chunks of MAX_BLOCKS_PER_REQUEST (aligned with leanSpec BackfillSync). + // Each chunk is sent to a random connected peer, spreading the load and allowing + // parallel fetches when multiple roots are needed. + let all_roots: Vec<(H256, u32)> = roots_to_request; + let chunks: Vec> = all_roots + .chunks(MAX_BLOCKS_PER_REQUEST) + .map(|c| c.to_vec()) + .collect(); + + let num_chunks = chunks.len(); + for chunk in chunks { + let depth = chunk.iter().map(|(_, d)| *d).max().unwrap_or(0); + let roots: Vec = chunk.into_iter().map(|(r, _)| r).collect(); + + if let Some(peer_id) = self.get_random_connected_peer() { + info!( + peer = %peer_id, + num_blocks = roots.len(), + total_chunks = num_chunks, + depth = depth, + "Requesting missing blocks from peer (batch)" + ); + self.send_blocks_by_root_request_with_depth(peer_id, roots, depth); + } else { + warn!("Cannot request blocks: no connected peers"); + break; + } } } } @@ -1082,8 +1270,12 @@ where return; } - // Depth is tracked in PendingBlocksRequest for retries - // No need to store in pending_block_depths here - it's set when blocks are received + // Register roots as in-flight before sending so the chain-side drain + // (pending_fetch_roots) and the network-layer pipeline both see them and skip duplicates. + for &root in &roots { + self.in_flight_roots.insert(root); + } + let request = LeanRequest::BlocksByRoot(roots.clone()); info!(peer = %peer_id, num_roots = roots.len(), retries, depth, "Sending BlocksByRoot request"); let request_id = self diff --git a/lean_client/networking/src/req_resp.rs b/lean_client/networking/src/req_resp.rs index ae30aa9..d85ec7d 100644 --- a/lean_client/networking/src/req_resp.rs +++ b/lean_client/networking/src/req_resp.rs @@ -259,9 +259,50 @@ impl LeanCodec { } } + /// Returns the byte length of one snappy framing stream starting at data[0]. + /// + /// Snappy framing format (https://github.com/google/snappy/blob/main/framing_format.txt): + /// Stream identifier chunk: [0xFF][0x06][0x00][0x00][s][N][a][P][p][Y] + /// Data chunk: [type][len_lo][len_mid][len_hi][data...] + /// + /// We scan through chunk headers to advance without decompressing. The + /// stream identifier byte 0xFF cannot legally appear as a chunk type inside + /// a stream, so hitting 0xFF after the first chunk signals the start of the + /// next framing stream and therefore the end of this one. + fn snappy_frame_size(data: &[u8]) -> io::Result { + // Stream identifier is 10 bytes: 4-byte header + 6-byte "sNaPpY" + const STREAM_ID: &[u8] = b"\xff\x06\x00\x00sNaPpY"; + if data.len() < STREAM_ID.len() || &data[..STREAM_ID.len()] != STREAM_ID { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "Missing snappy stream identifier", + )); + } + + let mut pos = STREAM_ID.len(); + while pos < data.len() { + // 0xFF marks the start of a new snappy stream — this stream ends here. + if data[pos] == 0xFF { + break; + } + if pos + 4 > data.len() { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Truncated snappy chunk header", + )); + } + // 3-byte LE length field (bytes 1..=3 of header) + let chunk_len = + u32::from_le_bytes([data[pos + 1], data[pos + 2], data[pos + 3], 0]) as usize; + pos += 4 + chunk_len; + } + + Ok(pos) + } + /// Decode a single response chunk per spec: /// [response_code: 1 byte][varint: uncompressed_length][snappy_framed_payload] - /// Returns (code, ssz_bytes, total_bytes_consumed) + /// Returns (code, ssz_bytes, total_bytes_consumed) so the caller can advance the offset. fn decode_response_chunk(data: &[u8]) -> io::Result<(u8, Vec, usize)> { if data.is_empty() { return Err(io::Error::new( @@ -273,40 +314,33 @@ impl LeanCodec { // First byte is response code let code = data[0]; - // Parse varint length starting at offset 1 + // Parse uncompressed length varint at offset 1 let (declared_len, varint_size) = Self::decode_varint(&data[1..])?; if declared_len as usize > MAX_PAYLOAD_SIZE { return Err(io::Error::new( io::ErrorKind::InvalidData, - format!( - "Declared length too large: {} > {}", - declared_len, MAX_PAYLOAD_SIZE - ), + format!("Declared length too large: {} > {}", declared_len, MAX_PAYLOAD_SIZE), )); } - // Decompress payload after code + varint let payload_start = 1 + varint_size; - let compressed = &data[payload_start..]; - let ssz_bytes = Self::decompress(compressed)?; - // Validate length matches + // Determine the byte length of this snappy framing stream so we know + // exactly where the next chunk begins (required for multi-block responses). + let frame_size = Self::snappy_frame_size(&data[payload_start..])?; + let payload_end = payload_start + frame_size; + + let ssz_bytes = Self::decompress(&data[payload_start..payload_end])?; + if ssz_bytes.len() != declared_len as usize { return Err(io::Error::new( io::ErrorKind::InvalidData, - format!( - "Length mismatch: declared {}, got {}", - declared_len, - ssz_bytes.len() - ), + format!("Length mismatch: declared {}, got {}", declared_len, ssz_bytes.len()), )); } - // Calculate total bytes consumed (approximate - we consumed all remaining data) - let total_consumed = data.len(); - - Ok((code, ssz_bytes, total_consumed)) + Ok((code, ssz_bytes, payload_end)) } /// Decode response per spec. For BlocksByRoot, handle chunked format: @@ -334,25 +368,32 @@ impl LeanCodec { })?; Ok(LeanResponse::Status(status)) } else if protocol.contains("blocks_by_root") { - let (code, ssz_bytes, _) = Self::decode_response_chunk(data)?; - - if code != RESPONSE_SUCCESS { - // Non-success codes indicate block not found or error - warn!(response_code = code, "BlocksByRoot non-success response"); - return Ok(LeanResponse::BlocksByRoot(Vec::new())); - } + // Multi-chunk response: each block is a separate chunk. + // Loop until all bytes are consumed. + let mut blocks = Vec::new(); + let mut offset = 0; + while offset < data.len() { + let (code, ssz_bytes, consumed) = Self::decode_response_chunk(&data[offset..])?; + offset += consumed; + + if code != RESPONSE_SUCCESS { + warn!(response_code = code, "BlocksByRoot non-success response chunk"); + continue; + } + if ssz_bytes.is_empty() { + continue; + } - if ssz_bytes.is_empty() { - return Ok(LeanResponse::BlocksByRoot(Vec::new())); + let block = + SignedBlockWithAttestation::from_ssz_default(&ssz_bytes).map_err(|e| { + io::Error::new( + io::ErrorKind::Other, + format!("SSZ decode Block failed: {e:?}"), + ) + })?; + blocks.push(block); } - - let block = SignedBlockWithAttestation::from_ssz_default(&ssz_bytes).map_err(|e| { - io::Error::new( - io::ErrorKind::Other, - format!("SSZ decode Block failed: {e:?}"), - ) - })?; - Ok(LeanResponse::BlocksByRoot(vec![block])) + Ok(LeanResponse::BlocksByRoot(blocks)) } else { Err(io::Error::new( io::ErrorKind::Other, diff --git a/lean_client/networking/src/types.rs b/lean_client/networking/src/types.rs index 7af3615..43db367 100644 --- a/lean_client/networking/src/types.rs +++ b/lean_client/networking/src/types.rs @@ -3,13 +3,14 @@ use std::{collections::HashMap, fmt::Display, sync::Arc}; use anyhow::{Result, anyhow}; use async_trait::async_trait; use containers::{ - SignedAggregatedAttestation, SignedAttestation, SignedBlockWithAttestation, Status, + AggregatedSignatureProof, AttestationData, Block, SignedAggregatedAttestation, + SignedAttestation, SignedBlockWithAttestation, Slot, Status, }; use metrics::METRICS; use parking_lot::RwLock; use serde::{Deserialize, Serialize}; use ssz::H256; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; use tracing::warn; use crate::serde_utils::quoted_u64; @@ -211,6 +212,30 @@ impl Display for ChainMessage { } } +/// Messages from the validator task to the chain task (request-response pattern). +/// This keeps `ChainMessage` Clone-able (for network use) while these variants +/// carry non-Clone oneshot senders. +#[derive(Debug)] +pub enum ValidatorChainMessage { + /// Request block production for the given slot. + /// Chain executes the state transition under write lock (fast, no XMSS signing) + /// and returns the raw block + aggregated signature proofs via the sender. + /// XMSS signing is performed by the validator task after receiving the response, + /// so the chain task is free to process incoming peer messages during signing. + ProduceBlock { + slot: Slot, + proposer_index: u64, + sender: oneshot::Sender)>>, + }, + /// Request attestation data for the given slot. + /// Chain reads the current head/justified/target state and returns it via sender. + /// The validator task uses this data to sign attestations without holding the store lock. + BuildAttestationData { + slot: Slot, + sender: oneshot::Sender>, + }, +} + #[derive(Debug, Clone)] pub enum OutboundP2pRequest { GossipBlockWithAttestation(SignedBlockWithAttestation), diff --git a/lean_client/src/main.rs b/lean_client/src/main.rs index 05a2956..445074b 100644 --- a/lean_client/src/main.rs +++ b/lean_client/src/main.rs @@ -9,7 +9,10 @@ use ethereum_types::H256; use features::Feature; use fork_choice::{ handlers::{on_aggregated_attestation, on_attestation, on_block, on_tick}, - store::{INTERVALS_PER_SLOT, Store, get_forkchoice_store}, + store::{ + INTERVALS_PER_SLOT, MILLIS_PER_INTERVAL, Store, get_forkchoice_store, + produce_block_with_signatures, + }, }; use http_api::HttpServerConfig; use libp2p_identity::Keypair; @@ -17,7 +20,9 @@ use metrics::{METRICS, Metrics, MetricsServerConfig}; use networking::gossipsub::config::GossipsubConfig; use networking::gossipsub::topic::{compute_subnet_id, get_subscription_topics}; use networking::network::{NetworkService, NetworkServiceConfig}; -use networking::types::{ChainMessage, OutboundP2pRequest, SignedBlockProvider, StatusProvider}; +use networking::types::{ + ChainMessage, OutboundP2pRequest, SignedBlockProvider, StatusProvider, ValidatorChainMessage, +}; use parking_lot::RwLock; use ssz::{PersistentList, SszHash, SszReadDefault as _}; use std::collections::HashMap; @@ -26,9 +31,9 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{SystemTime, UNIX_EPOCH}; use std::{io::IsTerminal, net::IpAddr}; use tokio::{ - sync::mpsc, + sync::{mpsc, oneshot}, task, - time::{Duration, interval}, + time::{Duration, Instant, interval_at}, }; use tracing::level_filters::LevelFilter; use tracing::{debug, error, info, warn}; @@ -292,6 +297,11 @@ async fn main() -> Result<()> { mpsc::unbounded_channel::(); let (chain_message_sender, mut chain_message_receiver) = mpsc::unbounded_channel::(); + // Separate channel for validator task → chain task request-response messages. + // Keeps ValidatorChainMessage (which carries oneshot senders) separate from + // ChainMessage (which is Clone and used by the network layer). + let (validator_chain_sender, mut validator_chain_receiver) = + mpsc::unbounded_channel::(); let (genesis_time, validators) = if let Some(genesis_path) = &args.genesis { let genesis_config = containers::GenesisConfig::load_from_file(genesis_path) @@ -541,6 +551,16 @@ async fn main() -> Result<()> { None }; + // Wrap in Arc so chain task (aggregation at tick 2) and validator task (proposal/attestation) + // can both access ValidatorService without cloning the XMSS keys. + let validator_service: Option> = validator_service.map(Arc::new); + // Chain task clone: used only for tick-2 aggregation (maybe_aggregate takes &self) + let vs_for_chain = validator_service.clone(); + // Validator task: takes ownership for proposal and attestation duties + let vs_for_validator = validator_service.clone(); + // Validator task needs to send ProcessBlock / ProcessAttestation back to the chain task + let chain_msg_sender_for_validator = chain_message_sender.clone(); + // Extract first validator ID for subnet subscription and metrics let first_validator_id: Option = validator_service .as_ref() @@ -556,7 +576,7 @@ async fn main() -> Result<()> { }); } - let fork = "devnet0".to_string(); + let fork = "devnet3".to_string(); // Subscribe to topics based on validator role: // - Aggregators: all attestation subnets // - Non-aggregator validators: only their own subnet @@ -668,23 +688,40 @@ async fn main() -> Result<()> { } }); + // Compute genesis-aligned tick delay once; both tasks capture genesis_millis and + // genesis_tick_delay by copy (u64 / Duration are Copy). + let genesis_millis = genesis_time * 1000; + let genesis_tick_delay = { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + let elapsed = now.saturating_sub(genesis_millis); + let next = elapsed / MILLIS_PER_INTERVAL + 1; + Duration::from_millis((genesis_millis + next * MILLIS_PER_INTERVAL).saturating_sub(now)) + }; + let chain_handle = task::spawn(async move { - let mut tick_interval = interval(Duration::from_millis(800)); + let mut tick_interval = interval_at( + Instant::now() + genesis_tick_delay, + Duration::from_millis(MILLIS_PER_INTERVAL), + ); let mut last_logged_slot = 0u64; let mut last_status_slot: Option = None; - let mut last_proposal_slot: Option = None; - let mut last_attestation_slot: Option = None; let peer_count = peer_count_for_status; loop { tokio::select! { + biased; _ = tick_interval.tick() => { let now_millis = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_millis() as u64; - on_tick(&mut *store.write(), now_millis, false); + let target_interval = now_millis.saturating_sub(genesis_millis) / MILLIS_PER_INTERVAL; + let has_proposal = target_interval % INTERVALS_PER_SLOT == 0; + on_tick(&mut *store.write(), now_millis, has_proposal); let (current_slot, current_interval) = { let s = store.read(); @@ -698,83 +735,9 @@ async fn main() -> Result<()> { } match current_interval { - 0 => { - if let Some(ref vs) = validator_service { - if last_proposal_slot != Some(current_slot) { - if let Some(proposer_idx) = vs.get_proposer_for_slot(Slot(current_slot)) { - info!( - slot = current_slot, - proposer = proposer_idx, - "Our turn to propose block!" - ); - - let result = {vs.build_block_proposal(&mut *store.write(), Slot(current_slot), proposer_idx)}; - match result { - Ok(signed_block) => { - let block_root = signed_block.message.block.hash_tree_root(); - info!( - slot = current_slot, - block_root = %format!("0x{:x}", block_root), - "Built block, processing and gossiping" - ); - - let now_millis = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_millis() as u64; - on_tick(&mut *store.write(), now_millis, false); - - match on_block(&mut *store.write(), signed_block.clone()) { - Ok(()) => { - info!("Own block processed successfully"); - // GOSSIP TO NETWORK - if let Err(e) = chain_outbound_sender.send( - OutboundP2pRequest::GossipBlockWithAttestation(signed_block) - ) { - warn!("Failed to gossip our block: {}", e); - } - } - Err(e) => warn!("Failed to process our own block: {}", e), - } - } - Err(e) => warn!("Failed to build block proposal: {}", e), - } - last_proposal_slot = Some(current_slot); - } - } - } - } - 1 => { - if let Some(ref vs) = validator_service { - if last_attestation_slot != Some(current_slot) { - let attestations = vs.create_attestations(&*store.read(), Slot(current_slot)); - for signed_att in attestations { - let validator_id = signed_att.validator_id; - let subnet_id = compute_subnet_id(validator_id); - info!( - slot = current_slot, - validator = validator_id, - subnet_id = subnet_id, - "Broadcasting attestation to subnet" - ); - - match on_attestation(&mut *store.write(), signed_att.clone(), false) { - Ok(()) => { - if let Err(e) = chain_outbound_sender.send( - OutboundP2pRequest::GossipAttestation(signed_att, subnet_id) - ) { - warn!("Failed to gossip attestation: {}", e); - } - } - Err(e) => warn!("Error processing own attestation: {}", e), - } - } - last_attestation_slot = Some(current_slot); - } - } - } + 0 | 1 => {} 2 => { - if let Some(ref vs) = validator_service { + if let Some(ref vs) = vs_for_chain { if let Some(aggregations) = vs.maybe_aggregate(&*store.read(), Slot(current_slot)) { for aggregation in aggregations { if let Err(e) = chain_outbound_sender.send( @@ -833,22 +796,28 @@ async fn main() -> Result<()> { .as_millis() as u64; on_tick(&mut *store.write(), now_millis, false); - // Proactive parent check: verify parent exists BEFORE calling on_block. - // This avoids unnecessary state transition attempts and is more efficient - // than catching the "Block queued" error after the fact. let parent_exists = { let s = store.read(); parent_root.is_zero() || s.states.contains_key(&parent_root) }; + // Store block immediately so we can serve it to peers via + // BlocksByRoot even if it can't be processed yet (e.g. parent + // missing). This prevents STREAM_CLOSED errors when a peer + // requests a block we received but haven't incorporated yet. + signed_block_provider.write().insert(block_root, signed_block_with_attestation.clone()); + if !parent_exists { - // Queue the block and request parent without calling on_block { let mut s = store.write(); s.blocks_queue .entry(parent_root) .or_insert_with(Vec::new) .push(signed_block_with_attestation.clone()); + // Add to batch queue so this root is sent together with any + // other concurrently pending roots (attestation-triggered or + // from other unknown-parent blocks) in one BlocksByRoot request. + s.pending_fetch_roots.insert(parent_root); } warn!( @@ -858,10 +827,13 @@ async fn main() -> Result<()> { "Block queued (proactive) - parent not found, requesting via BlocksByRoot" ); - if let Err(req_err) = outbound_p2p_sender.send( - OutboundP2pRequest::RequestBlocksByRoot(vec![parent_root]) - ) { - warn!("Failed to request missing parent block: {}", req_err); + let missing: Vec = store.write().pending_fetch_roots.drain().collect(); + if !missing.is_empty() { + if let Err(req_err) = outbound_p2p_sender.send( + OutboundP2pRequest::RequestBlocksByRoot(missing) + ) { + warn!("Failed to request missing parent block: {}", req_err); + } } continue; } @@ -871,8 +843,6 @@ async fn main() -> Result<()> { Ok(()) => { info!("Block processed successfully"); - signed_block_provider.write().insert(block_root, signed_block_with_attestation.clone()); - { let s = store.read(); let mut status = status_provider.write(); @@ -894,8 +864,6 @@ async fn main() -> Result<()> { } } Err(e) if format!("{e:?}").starts_with("Err: (Fork-choice::Handlers::OnBlock) Block queued") => { - // This path should be rare now due to proactive check, - // but handle it for edge cases (e.g., parent pruned between check and call) warn!( child_slot = block_slot.0, child_block_root = %format!("0x{:x}", block_root), @@ -903,16 +871,25 @@ async fn main() -> Result<()> { "Block queued (fallback) - parent not found, requesting via BlocksByRoot" ); + // Add to batch queue; the drain below (pending_fetch_roots) + // will send this together with any attestation-triggered roots + // discovered during on_block processing. if !parent_root.is_zero() { - if let Err(req_err) = outbound_p2p_sender.send( - OutboundP2pRequest::RequestBlocksByRoot(vec![parent_root]) - ) { - warn!("Failed to request missing parent block: {}", req_err); - } + store.write().pending_fetch_roots.insert(parent_root); } } Err(e) => warn!("Problem processing block: {}", e), } + + // Drain block roots queued by retried attestations inside on_block. + let missing: Vec = store.write().pending_fetch_roots.drain().collect(); + if !missing.is_empty() { + if let Err(e) = outbound_p2p_sender.send( + OutboundP2pRequest::RequestBlocksByRoot(missing) + ) { + warn!("Failed to request blocks missing from retried attestations: {}", e); + } + } } ChainMessage::ProcessAttestation { signed_attestation, @@ -947,6 +924,15 @@ async fn main() -> Result<()> { } Err(e) => warn!("Error processing attestation: {}", e), } + + let missing: Vec = store.write().pending_fetch_roots.drain().collect(); + if !missing.is_empty() { + if let Err(e) = outbound_p2p_sender.send( + OutboundP2pRequest::RequestBlocksByRoot(missing) + ) { + warn!("Failed to request blocks missing from attestation: {}", e); + } + } } ChainMessage::ProcessAggregation { signed_aggregated_attestation, @@ -979,6 +965,15 @@ async fn main() -> Result<()> { } } + let missing: Vec = store.write().pending_fetch_roots.drain().collect(); + if !missing.is_empty() { + if let Err(e) = outbound_p2p_sender.send( + OutboundP2pRequest::RequestBlocksByRoot(missing) + ) { + warn!("Failed to request blocks missing from aggregated attestation: {}", e); + } + } + // Gossip the aggregation if needed if should_gossip { if let Err(e) = outbound_p2p_sender.send( @@ -992,6 +987,221 @@ async fn main() -> Result<()> { } } } + v_message = validator_chain_receiver.recv() => { + let Some(v_message) = v_message else { break }; + match v_message { + ValidatorChainMessage::ProduceBlock { slot, proposer_index, sender } => { + let result = produce_block_with_signatures( + &mut *store.write(), slot, proposer_index + ).map(|(_, block, sigs)| (block, sigs)); + let _ = sender.send(result); + } + ValidatorChainMessage::BuildAttestationData { slot, sender } => { + let result = store.read().produce_attestation_data(slot); + let _ = sender.send(result); + } + } + } + } + } + }); + + let validator_handle = task::spawn(async move { + let Some(vs) = vs_for_validator else { + return; + }; + + let mut v_tick_interval = interval_at( + Instant::now() + genesis_tick_delay, + Duration::from_millis(MILLIS_PER_INTERVAL), + ); + let mut last_proposal_slot: Option = None; + let mut last_attestation_slot: Option = None; + + loop { + v_tick_interval.tick().await; + + let now_millis = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + let elapsed = now_millis.saturating_sub(genesis_millis); + let tick_time = elapsed / MILLIS_PER_INTERVAL; + let current_slot = tick_time / INTERVALS_PER_SLOT; + let current_interval = tick_time % INTERVALS_PER_SLOT; + + match current_interval { + 0 => { + if last_proposal_slot != Some(current_slot) { + if current_slot > 0 { + if let Some(proposer_idx) = vs.get_proposer_for_slot(Slot(current_slot)) + { + info!( + slot = current_slot, + proposer = proposer_idx, + "Validator task: proposing block" + ); + + let (tx, rx) = oneshot::channel(); + if validator_chain_sender + .send(ValidatorChainMessage::ProduceBlock { + slot: Slot(current_slot), + proposer_index: proposer_idx, + sender: tx, + }) + .is_err() + { + warn!("Validator task: chain channel closed, stopping"); + break; + } + + let (block, signatures) = match rx.await { + Ok(Ok(pair)) => pair, + Ok(Err(e)) => { + warn!(slot = current_slot, error = %e, "Validator task: chain failed to produce block"); + last_proposal_slot = Some(current_slot); + continue; + } + Err(_) => { + warn!( + slot = current_slot, + "Validator task: no response to ProduceBlock" + ); + last_proposal_slot = Some(current_slot); + continue; + } + }; + + let (atx, arx) = oneshot::channel(); + if validator_chain_sender + .send(ValidatorChainMessage::BuildAttestationData { + slot: Slot(current_slot), + sender: atx, + }) + .is_err() + { + warn!("Validator task: chain channel closed, stopping"); + break; + } + + let attestation_data = match arx.await { + Ok(Ok(data)) => data, + Ok(Err(e)) => { + warn!(slot = current_slot, error = %e, "Validator task: chain failed to build attestation data"); + last_proposal_slot = Some(current_slot); + continue; + } + Err(_) => { + warn!( + slot = current_slot, + "Validator task: no response to BuildAttestationData" + ); + last_proposal_slot = Some(current_slot); + continue; + } + }; + + match vs.sign_block_with_data( + block, + proposer_idx, + signatures, + attestation_data, + ) { + Ok(signed_block) => { + let block_root = + signed_block.message.block.hash_tree_root(); + info!( + slot = current_slot, + block_root = %format!("0x{:x}", block_root), + "Validator task: block signed, sending to chain" + ); + if chain_msg_sender_for_validator + .send(ChainMessage::ProcessBlock { + signed_block_with_attestation: signed_block, + is_trusted: true, + should_gossip: true, + }) + .is_err() + { + warn!( + "Validator task: chain message channel closed, stopping" + ); + break; + } + } + Err(e) => { + warn!(slot = current_slot, error = %e, "Validator task: failed to sign block") + } + } + } + } + last_proposal_slot = Some(current_slot); + } + } + 1 => { + if last_attestation_slot != Some(current_slot) { + let (tx, rx) = oneshot::channel(); + if validator_chain_sender + .send(ValidatorChainMessage::BuildAttestationData { + slot: Slot(current_slot), + sender: tx, + }) + .is_err() + { + warn!("Validator task: chain channel closed, stopping"); + break; + } + + match rx.await { + Ok(Ok(attestation_data)) => { + let proposer_index = if vs.num_validators > 0 { + current_slot % vs.num_validators + } else { + u64::MAX + }; + let attestations = vs.create_attestations_from_data( + Slot(current_slot), + attestation_data, + ); + for signed_att in attestations { + if signed_att.validator_id == proposer_index { + continue; + } + let validator_id = signed_att.validator_id; + let subnet_id = compute_subnet_id(validator_id); + info!( + slot = current_slot, + validator = validator_id, + subnet_id = subnet_id, + "Validator task: broadcasting attestation" + ); + if chain_msg_sender_for_validator + .send(ChainMessage::ProcessAttestation { + signed_attestation: signed_att, + is_trusted: true, + should_gossip: true, + }) + .is_err() + { + warn!( + "Validator task: chain message channel closed, stopping" + ); + return; + } + } + } + Ok(Err(e)) => { + warn!(slot = current_slot, error = %e, "Validator task: chain failed to build attestation data") + } + Err(_) => warn!( + slot = current_slot, + "Validator task: no response to BuildAttestationData" + ), + } + last_attestation_slot = Some(current_slot); + } + } + _ => {} } } }); @@ -1003,6 +1213,9 @@ async fn main() -> Result<()> { _ = chain_handle => { info!("Chain service finished."); } + _ = validator_handle => { + info!("Validator task finished."); + } } info!("Main async task exiting..."); diff --git a/lean_client/validator/src/lib.rs b/lean_client/validator/src/lib.rs index 103e196..c1cdc9f 100644 --- a/lean_client/validator/src/lib.rs +++ b/lean_client/validator/src/lib.rs @@ -5,10 +5,10 @@ use std::path::Path; use anyhow::{Context, Result, anyhow, bail}; use containers::{ AggregatedSignatureProof, AggregationBits, Attestation, AttestationData, AttestationSignatures, - Block, BlockSignatures, BlockWithAttestation, Checkpoint, SignatureKey, + Block, BlockSignatures, BlockWithAttestation, SignedAggregatedAttestation, SignedAttestation, SignedBlockWithAttestation, Slot, }; -use fork_choice::store::{Store, produce_block_with_signatures}; +use fork_choice::store::Store; use metrics::{METRICS, stop_and_discard, stop_and_record}; use ssz::H256; use ssz::SszHash; @@ -280,36 +280,19 @@ impl ValidatorService { } } - /// Build a block proposal for the given slot - pub fn build_block_proposal( + /// Sign a block given pre-fetched attestation data. + /// + /// Unlike `sign_block`, this method does not need a `&Store` reference. + /// The validator task calls `BuildAttestationData` on the chain task first, + /// receives the `AttestationData` via oneshot, then calls this method. + /// This keeps XMSS signing (~170ms) entirely off the chain task's thread. + pub fn sign_block_with_data( &self, - store: &mut Store, - slot: Slot, - proposer_index: u64, - ) -> Result { - info!( - slot = slot.0, - proposer = proposer_index, - "Building block proposal" - ); - - let (_, block, signatures) = produce_block_with_signatures(store, slot, proposer_index) - .context("failed to produce block")?; - - let signed_block = self.sign_block(store, block, proposer_index, signatures)?; - - Ok(signed_block) - } - - fn sign_block( - &self, - store: &Store, block: Block, validator_index: u64, attestation_signatures: Vec, + proposer_attestation_data: AttestationData, ) -> Result { - let proposer_attestation_data = store.produce_attestation_data(block.slot)?; - let proposer_attestation = Attestation { validator_id: validator_index, data: proposer_attestation_data, @@ -350,48 +333,32 @@ impl ValidatorService { Ok(SignedBlockWithAttestation { message, signature }) } - /// Create attestations for all our validators for the given slot - pub fn create_attestations(&self, store: &Store, slot: Slot) -> Vec { - let vote_target = store.get_attestation_target(); - - // Skip attestation creation if target slot is less than source slot - // At genesis, both target and source are slot 0, which is valid - if vote_target.slot < store.latest_justified.slot { + /// Create and sign attestations for all validators given pre-fetched attestation data. + /// + /// Unlike `create_attestations`, this method does not need a `&Store` reference. + /// The validator task calls `BuildAttestationData` on the chain task first, + /// receives the `AttestationData` via oneshot, then calls this method. + /// This keeps XMSS signing entirely off the chain task's thread. + pub fn create_attestations_from_data( + &self, + slot: Slot, + attestation_data: AttestationData, + ) -> Vec { + if attestation_data.target.slot < attestation_data.source.slot { warn!( - target_slot = vote_target.slot.0, - source_slot = store.latest_justified.slot.0, + target_slot = attestation_data.target.slot.0, + source_slot = attestation_data.source.slot.0, "Skipping attestation: target slot must be >= source slot" ); return vec![]; } - let head_block = match store.blocks.get(&store.head) { - Some(b) => b, - None => { - warn!("WARNING: Attestation skipped. (Reason: HEAD BLOCK NOT FOUND)"); - return vec![]; - } - }; - - let head_checkpoint = Checkpoint { - root: store.head, - slot: head_block.slot, - }; - self.config .validator_indices .iter() .filter_map(|&idx| { - let attestation = AttestationData { - slot, - head: head_checkpoint.clone(), - target: vote_target.clone(), - source: store.latest_justified.clone(), - }; - let signature = if let Some(ref key_manager) = self.key_manager { - // Sign with XMSS - let message = attestation.hash_tree_root(); + let message = attestation_data.hash_tree_root(); let epoch = slot.0 as u32; let _timer = METRICS.get().map(|metrics| { @@ -399,17 +366,17 @@ impl ValidatorService { .lean_pq_sig_attestation_signing_time_seconds .start_timer() }); + match key_manager.sign(idx, epoch, message) { Ok(sig) => { - // Record successful attestation signature METRICS.get().map(|metrics| { metrics.lean_pq_sig_attestation_signatures_total.inc(); }); info!( slot = slot.0, validator = idx, - target_slot = vote_target.slot.0, - source_slot = store.latest_justified.slot.0, + target_slot = attestation_data.target.slot.0, + source_slot = attestation_data.source.slot.0, "Created signed attestation" ); sig @@ -424,12 +391,9 @@ impl ValidatorService { } } } else { - // No key manager - use zero signature info!( slot = slot.0, validator = idx, - target_slot = vote_target.slot.0, - source_slot = store.latest_justified.slot.0, "Created attestation with zero signature" ); Signature::default() @@ -437,10 +401,11 @@ impl ValidatorService { Some(SignedAttestation { validator_id: idx, - message: attestation, + message: attestation_data.clone(), signature, }) }) .collect() } + }