From 8aac410a010ad1f1ecd144a675d4ca71ebe65f33 Mon Sep 17 00:00:00 2001 From: Povilas Liubauskas Date: Fri, 27 Feb 2026 13:33:08 +0200 Subject: [PATCH 1/3] Backport c61665b3a Penalize peers that send an invalid rpc request --- src/rpc/handler.rs | 17 ++++++-- src/rpc/protocol.rs | 10 +++-- tests/rpc_tests.rs | 101 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 121 insertions(+), 7 deletions(-) diff --git a/src/rpc/handler.rs b/src/rpc/handler.rs index 17c5f7f..01cd992 100644 --- a/src/rpc/handler.rs +++ b/src/rpc/handler.rs @@ -15,7 +15,8 @@ use helper_functions::misc; use libp2p::PeerId; use libp2p::swarm::handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, - FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol, + FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, StreamUpgradeError, + SubstreamProtocol, }; use libp2p::swarm::{ConnectionId, Stream}; use logging::exception; @@ -890,6 +891,16 @@ where ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error }) => { self.on_dial_upgrade_error(info, error) } + ConnectionEvent::ListenUpgradeError(ListenUpgradeError { + error: (proto, error), + .. + }) => { + self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound { + id: self.current_inbound_substream_id, + proto, + error, + })); + } _ => { // NOTE: ConnectionEvent is a non exhaustive enum so updates should be based on // release notes more than compiler feedback @@ -926,7 +937,7 @@ where request.count() )), })); - return self.shutdown(None); + return; } } RequestType::BlobsByRange(request) => { @@ -942,7 +953,7 @@ where max_allowed, max_requested_blobs )), })); - return self.shutdown(None); + return; } } _ => {} diff --git a/src/rpc/protocol.rs b/src/rpc/protocol.rs index 568c9e4..b8d97b0 100644 --- a/src/rpc/protocol.rs +++ b/src/rpc/protocol.rs @@ -589,7 +589,7 @@ where P: Preset, { type Output = InboundOutput; - type Error = RPCError; + type Error = (Protocol, RPCError); type Future = BoxFuture<'static, Result>; fn upgrade_inbound(self, socket: TSocket, protocol: ProtocolId) -> Self::Future { @@ -632,10 +632,12 @@ where ) .await { - Err(e) => Err(RPCError::from(e)), + Err(e) => Err((versioned_protocol.protocol(), RPCError::from(e))), Ok((Some(Ok(request)), stream)) => Ok((request, stream)), - Ok((Some(Err(e)), _)) => Err(e), - Ok((None, _)) => Err(RPCError::IncompleteStream), + Ok((Some(Err(e)), _)) => Err((versioned_protocol.protocol(), e)), + Ok((None, _)) => { + Err((versioned_protocol.protocol(), RPCError::IncompleteStream)) + } } } } diff --git a/tests/rpc_tests.rs b/tests/rpc_tests.rs index ca313c1..c35e2d7 100644 --- a/tests/rpc_tests.rs +++ b/tests/rpc_tests.rs @@ -1612,3 +1612,104 @@ async fn test_active_requests() { } } } + +// Test that when a node receives an invalid BlocksByRange request exceeding the maximum count, +// it bans the sender. +#[tokio::test] +async fn test_request_too_large_blocks_by_range() { + let config = Arc::new(Config::mainnet().rapid_upgrade()); + + test_request_too_large( + AppRequestId::Application(1), + RequestType::BlocksByRange(OldBlocksByRangeRequest::new( + 0, + config.max_request_blocks(Phase::Phase0) + 1, // exceeds the max request defined in the spec. + 1, + )), + ) + .await; +} + +// Test that when a node receives an invalid BlobsByRange request exceeding the maximum count, +// it bans the sender. +#[tokio::test] +async fn test_request_too_large_blobs_by_range() { + let config = Arc::new(Config::mainnet().rapid_upgrade()); + + let max_request_blobs_count = + config.max_request_blob_sidecars(Phase::Phase0) / config.max_blobs_per_block(0); + test_request_too_large( + AppRequestId::Application(1), + RequestType::BlobsByRange(BlobsByRangeRequest { + start_slot: 0, + count: max_request_blobs_count + 1, // exceeds the max request defined in the spec. + }), + ) + .await; +} + +async fn test_request_too_large(app_request_id: AppRequestId, request: RequestType) { + // set up the logging. The level and enabled logging or not + let log_level = "debug"; + let enable_logging = false; + build_tracing_subscriber(log_level, enable_logging); + let config = Arc::new(Config::mainnet().rapid_upgrade()); + + // get sender/receiver + let (mut sender, mut receiver) = + common::build_node_pair::(&config, Phase::Phase0, Protocol::Tcp, false, None) + .await; + + // Build the sender future + let sender_future = async { + loop { + match sender.next_event().await { + NetworkEvent::PeerConnectedOutgoing(peer_id) => { + debug!(?request, %peer_id, "Sending RPC request"); + sender + .send_request(peer_id, app_request_id, request.clone()) + .unwrap(); + } + NetworkEvent::ResponseReceived { + app_request_id, + response, + .. + } => { + debug!(?app_request_id, ?response, "Received response"); + } + NetworkEvent::RPCFailed { error, .. } => { + // This variant should be unreachable, as the receiver doesn't respond with an error when a request exceeds the limit. + debug!(?error, "RPC failed"); + unreachable!(); + } + NetworkEvent::PeerDisconnected(peer_id) => { + // The receiver should disconnect as a result of the invalid request. + debug!(%peer_id, "Peer disconnected"); + // End the test. + return; + } + _ => {} + } + } + } + .instrument(info_span!("Sender")); + + // Build the receiver future + let receiver_future = async { + loop { + if let NetworkEvent::RequestReceived { .. } = receiver.next_event().await { + // This event should be unreachable, as the handler drops the invalid request. + unreachable!(); + } + } + } + .instrument(info_span!("Receiver")); + + tokio::select! { + _ = sender_future => {} + _ = receiver_future => {} + _ = sleep(Duration::from_secs(30)) => { + panic!("Future timed out"); + } + } +} From 48a590d8c24ce59024f03cb659b32c81c555e306 Mon Sep 17 00:00:00 2001 From: Povilas Liubauskas Date: Fri, 27 Feb 2026 13:53:25 +0200 Subject: [PATCH 2/3] Backport c5b4580e3 Return correct variant for snappy errors --- src/rpc/codec.rs | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/src/rpc/codec.rs b/src/rpc/codec.rs index 45e9b8b..ff96050 100644 --- a/src/rpc/codec.rs +++ b/src/rpc/codec.rs @@ -512,6 +512,9 @@ fn handle_error( Ok(None) } } + // All snappy errors from the snap crate bubble up as `Other` kind errors + // that imply invalid response + ErrorKind::Other => Err(RPCError::InvalidData(err.to_string())), _ => Err(RPCError::from(err)), } } @@ -1111,6 +1114,7 @@ mod tests { types::{EnrAttestationBitfield, ForkContext}, }; use anyhow::Result; + use enum_iterator::Sequence; use snap::write::FrameEncoder; use ssz::{ByteList, DynamicList}; use std::io::Write; @@ -2555,4 +2559,46 @@ mod tests { RPCError::InvalidData(_) )); } + + /// Test invalid snappy response. + #[test] + fn test_invalid_snappy_response() { + let config = Arc::new(Config::mainnet().rapid_upgrade()); + let fork_context = Arc::new(ForkContext::dummy::( + &config, + Phase::last().expect("there should be at least one phase"), + )); + let max_packet_size = config.max_payload_size; // 10 MiB. + + let protocol = ProtocolId::new(SupportedProtocol::BlocksByRangeV2, Encoding::SSZSnappy); + + let mut codec = SSZSnappyOutboundCodec::::new( + protocol.clone(), + max_packet_size, + fork_context.clone(), + ); + + let mut payload = BytesMut::new(); + payload.extend_from_slice(&[0u8]); + let deneb_epoch = config.deneb_fork_epoch; + payload.extend_from_slice(&fork_context.context_bytes(deneb_epoch).as_bytes()); + + // Claim the MAXIMUM allowed size (10 MiB) + let claimed_size = max_packet_size; + let mut uvi_codec: Uvi = Uvi::default(); + uvi_codec.encode(claimed_size, &mut payload).unwrap(); + payload.extend_from_slice(&[0xBB; 16]); // Junk snappy. + + let result = codec.decode(&mut payload); + + assert!(result.is_err(), "Expected decode to fail"); + + // IoError = reached snappy decode (allocation happened). + let err = result.unwrap_err(); + assert!( + matches!(err, RPCError::InvalidData(_)), + "Should return invalid data variant {}", + err + ); + } } From 8643cf936b8206437955a1852f8df1c963316f77 Mon Sep 17 00:00:00 2001 From: hangleang Date: Fri, 23 Jan 2026 23:22:11 +0700 Subject: [PATCH 3/3] Add payload attestation gossip topic --- src/service/gossip_cache.rs | 13 ++++++++++++ src/service/mod.rs | 1 + src/types/pubsub.rs | 41 +++++++++++++++++++++++++++++++++++-- src/types/topics.rs | 18 +++++++++++++++- 4 files changed, 70 insertions(+), 3 deletions(-) diff --git a/src/service/gossip_cache.rs b/src/service/gossip_cache.rs index cde1797..98edb77 100644 --- a/src/service/gossip_cache.rs +++ b/src/service/gossip_cache.rs @@ -46,6 +46,8 @@ pub struct GossipCache { light_client_optimistic_update: Option, /// Timeout for execution payload bids. execution_payload_bid: Option, + /// Timeout for payload attestation messages. + payload_attestation_message: Option, } #[derive(Default)] @@ -79,6 +81,8 @@ pub struct GossipCacheBuilder { light_client_optimistic_update: Option, /// Timeout for execution payload bids. execution_payload_bid: Option, + /// Timeout for payload attestation messages. + payload_attestation_message: Option, } #[allow(dead_code)] @@ -161,6 +165,12 @@ impl GossipCacheBuilder { self } + /// Timeout for payload attestation messages. + pub fn payload_attestation_message_timeout(mut self, timeout: Duration) -> Self { + self.payload_attestation_message = Some(timeout); + self + } + pub fn build(self) -> GossipCache { let GossipCacheBuilder { default_timeout, @@ -178,6 +188,7 @@ impl GossipCacheBuilder { light_client_finality_update, light_client_optimistic_update, execution_payload_bid, + payload_attestation_message, } = self; GossipCache { expirations: DelayQueue::default(), @@ -196,6 +207,7 @@ impl GossipCacheBuilder { light_client_finality_update: light_client_finality_update.or(default_timeout), light_client_optimistic_update: light_client_optimistic_update.or(default_timeout), execution_payload_bid: execution_payload_bid.or(default_timeout), + payload_attestation_message: payload_attestation_message.or(default_timeout), } } } @@ -224,6 +236,7 @@ impl GossipCache { GossipKind::LightClientFinalityUpdate => self.light_client_finality_update, GossipKind::LightClientOptimisticUpdate => self.light_client_optimistic_update, GossipKind::ExecutionPayloadBid => self.execution_payload_bid, + GossipKind::PayloadAttestationMessage => self.payload_attestation_message, }; let Some(expire_timeout) = expire_timeout else { return; diff --git a/src/service/mod.rs b/src/service/mod.rs index 5a0f81d..945c7ec 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -272,6 +272,7 @@ impl Network

{ // .sync_committee_message_timeout(timeout) // Do not retry .bls_to_execution_change_timeout(half_epoch * 2) .execution_payload_bid_timeout(chain_config.slot_duration_ms) + .payload_attestation_message_timeout(chain_config.slot_duration_ms) .build() }; diff --git a/src/types/pubsub.rs b/src/types/pubsub.rs index 03490a7..6b6e459 100644 --- a/src/types/pubsub.rs +++ b/src/types/pubsub.rs @@ -30,8 +30,8 @@ use types::{ DataColumnSidecar as FuluDataColumnSidecar, SignedBeaconBlock as FuluSignedBeaconBlock, }, gloas::containers::{ - DataColumnSidecar as GloasDataColumnSidecar, SignedBeaconBlock as GloasSignedBeaconBlock, - SignedExecutionPayloadBid, + DataColumnSidecar as GloasDataColumnSidecar, PayloadAttestationMessage, + SignedBeaconBlock as GloasSignedBeaconBlock, SignedExecutionPayloadBid, }, nonstandard::Phase, phase0::{ @@ -78,6 +78,8 @@ pub enum PubsubMessage { LightClientOptimisticUpdate(Box>), /// Gossipsub message providing notification of an execution payload bid. ExecutionPayloadBid(Arc>), + /// Gossipsub message providing notification of a payload attestation message. + PayloadAttestationMessage(Arc), } // Implements the `DataTransform` trait of gossipsub to employ snappy compression @@ -181,6 +183,7 @@ impl PubsubMessage

{ GossipKind::LightClientOptimisticUpdate } PubsubMessage::ExecutionPayloadBid(_) => GossipKind::ExecutionPayloadBid, + PubsubMessage::PayloadAttestationMessage(_) => GossipKind::PayloadAttestationMessage, } } @@ -568,6 +571,32 @@ impl PubsubMessage

{ )), } } + GossipKind::PayloadAttestationMessage => { + match fork_context.get_fork_from_context_bytes(gossip_topic.fork_digest) { + Some(Phase::Gloas) => { + let payload_attestation = Arc::new( + PayloadAttestationMessage::from_ssz_default(data) + .map_err(|e| format!("{:?}", e))?, + ); + Ok(PubsubMessage::PayloadAttestationMessage( + payload_attestation, + )) + } + Some( + Phase::Phase0 + | Phase::Altair + | Phase::Bellatrix + | Phase::Capella + | Phase::Deneb + | Phase::Electra + | Phase::Fulu, + ) + | None => Err(format!( + "payload_attestation_message topic invalid for given fork digest {:?}", + gossip_topic.fork_digest + )), + } + } } } } @@ -596,6 +625,7 @@ impl PubsubMessage

{ PubsubMessage::LightClientFinalityUpdate(data) => data.to_ssz(), PubsubMessage::LightClientOptimisticUpdate(data) => data.to_ssz(), PubsubMessage::ExecutionPayloadBid(data) => data.to_ssz(), + PubsubMessage::PayloadAttestationMessage(data) => data.to_ssz(), } } } @@ -672,6 +702,13 @@ impl std::fmt::Display for PubsubMessage

{ data.message.slot, data.message.parent_block_root, data.message.builder_index ) } + PubsubMessage::PayloadAttestationMessage(data) => { + write!( + f, + "Payload Attestation: slot: {}, beacon_block_root: {:?}, validator_index: {}", + data.data.slot, data.data.beacon_block_root, data.validator_index + ) + } } } } diff --git a/src/types/topics.rs b/src/types/topics.rs index 14ce48f..db61c6a 100644 --- a/src/types/topics.rs +++ b/src/types/topics.rs @@ -35,6 +35,7 @@ pub const BLS_TO_EXECUTION_CHANGE_TOPIC: &str = "bls_to_execution_change"; pub const LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update"; pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update"; pub const EXECUTION_PAYLOAD_BID_TOPIC: &str = "execution_payload_bid"; +pub const PAYLOAD_ATTESTATION_MESSAGE_TOPIC: &str = "payload_attestation_message"; #[derive(Debug)] pub struct TopicConfig { @@ -104,6 +105,7 @@ pub fn core_topics_to_subscribe( if current_phase >= Phase::Gloas { topics.push(GossipKind::ExecutionPayloadBid); + topics.push(GossipKind::PayloadAttestationMessage); } topics @@ -131,7 +133,8 @@ pub fn is_fork_non_core_topic(topic: &GossipTopic, _phase: Phase) -> bool { | GossipKind::BlsToExecutionChange | GossipKind::LightClientFinalityUpdate | GossipKind::LightClientOptimisticUpdate - | GossipKind::ExecutionPayloadBid => false, + | GossipKind::ExecutionPayloadBid + | GossipKind::PayloadAttestationMessage => false, } } @@ -194,6 +197,8 @@ pub enum GossipKind { LightClientOptimisticUpdate, /// Topic for publishing execution payload bids. ExecutionPayloadBid, + /// Topic for publishing payload attestation messages. + PayloadAttestationMessage, } impl std::fmt::Display for GossipKind { @@ -276,6 +281,7 @@ impl GossipTopic { LIGHT_CLIENT_FINALITY_UPDATE => GossipKind::LightClientFinalityUpdate, LIGHT_CLIENT_OPTIMISTIC_UPDATE => GossipKind::LightClientOptimisticUpdate, EXECUTION_PAYLOAD_BID_TOPIC => GossipKind::ExecutionPayloadBid, + PAYLOAD_ATTESTATION_MESSAGE_TOPIC => GossipKind::PayloadAttestationMessage, topic => match subnet_topic_index(topic) { Some(kind) => kind, None => return Err(format!("Unknown topic: {}", topic)), @@ -344,6 +350,7 @@ impl std::fmt::Display for GossipTopic { GossipKind::LightClientFinalityUpdate => LIGHT_CLIENT_FINALITY_UPDATE.into(), GossipKind::LightClientOptimisticUpdate => LIGHT_CLIENT_OPTIMISTIC_UPDATE.into(), GossipKind::ExecutionPayloadBid => EXECUTION_PAYLOAD_BID_TOPIC.into(), + GossipKind::PayloadAttestationMessage => PAYLOAD_ATTESTATION_MESSAGE_TOPIC.into(), }; write!( f, @@ -416,6 +423,8 @@ mod tests { VoluntaryExit, ProposerSlashing, AttesterSlashing, + ExecutionPayloadBid, + PayloadAttestationMessage, ] .iter() { @@ -512,6 +521,11 @@ mod tests { assert_eq!("voluntary_exit", VoluntaryExit.as_ref()); assert_eq!("proposer_slashing", ProposerSlashing.as_ref()); assert_eq!("attester_slashing", AttesterSlashing.as_ref()); + assert_eq!("execution_payload_bid", ExecutionPayloadBid.as_ref()); + assert_eq!( + "payload_attestation_message", + PayloadAttestationMessage.as_ref() + ); } fn get_chain_config() -> ChainConfig { @@ -593,6 +607,8 @@ mod tests { GossipKind::LightClientFinalityUpdate, GossipKind::LightClientOptimisticUpdate, GossipKind::BlsToExecutionChange, + GossipKind::ExecutionPayloadBid, + GossipKind::PayloadAttestationMessage, ]; for subnet in s { expected_topics.push(GossipKind::DataColumnSidecar(subnet));