diff --git a/src/peer_manager/mod.rs b/src/peer_manager/mod.rs index ff7edee..72167be 100644 --- a/src/peer_manager/mod.rs +++ b/src/peer_manager/mod.rs @@ -600,6 +600,8 @@ impl PeerManager { Protocol::BlobsByRoot => PeerAction::MidToleranceError, Protocol::DataColumnsByRoot => PeerAction::MidToleranceError, Protocol::DataColumnsByRange => PeerAction::MidToleranceError, + Protocol::ExecutionPayloadEnvelopesByRange => PeerAction::MidToleranceError, + Protocol::ExecutionPayloadEnvelopesByRoot => PeerAction::MidToleranceError, Protocol::Goodbye => PeerAction::LowToleranceError, Protocol::MetaData => PeerAction::LowToleranceError, Protocol::Status => PeerAction::LowToleranceError, @@ -621,6 +623,8 @@ impl PeerManager { Protocol::BlobsByRoot => return, Protocol::DataColumnsByRoot => return, Protocol::DataColumnsByRange => return, + Protocol::ExecutionPayloadEnvelopesByRange => return, + Protocol::ExecutionPayloadEnvelopesByRoot => return, Protocol::Goodbye => return, Protocol::LightClientBootstrap => return, Protocol::LightClientOptimisticUpdate => return, @@ -644,6 +648,8 @@ impl PeerManager { Protocol::BlobsByRoot => PeerAction::MidToleranceError, Protocol::DataColumnsByRoot => PeerAction::MidToleranceError, Protocol::DataColumnsByRange => PeerAction::MidToleranceError, + Protocol::ExecutionPayloadEnvelopesByRange => PeerAction::MidToleranceError, + Protocol::ExecutionPayloadEnvelopesByRoot => PeerAction::MidToleranceError, Protocol::LightClientBootstrap => return, Protocol::LightClientOptimisticUpdate => return, Protocol::LightClientFinalityUpdate => return, diff --git a/src/rpc/codec.rs b/src/rpc/codec.rs index 45e9b8b..533fa29 100644 --- a/src/rpc/codec.rs +++ b/src/rpc/codec.rs @@ -32,6 +32,7 @@ use types::{ }, gloas::containers::{ DataColumnSidecar as GloasDataColumnSidecar, SignedBeaconBlock as GloasSignedBeaconBlock, + SignedExecutionPayloadEnvelope, }, nonstandard::Phase, phase0::{containers::SignedBeaconBlock as Phase0SignedBeaconBlock, primitives::ForkDigest}, @@ -98,6 +99,8 @@ impl SSZSnappyInboundCodec

{ RpcSuccessResponse::BlobsByRoot(res) => res.to_ssz()?, RpcSuccessResponse::DataColumnsByRoot(res) => res.to_ssz()?, RpcSuccessResponse::DataColumnsByRange(res) => res.to_ssz()?, + RpcSuccessResponse::ExecutionPayloadEnvelopesByRoot(res) => res.to_ssz()?, + RpcSuccessResponse::ExecutionPayloadEnvelopesByRange(res) => res.to_ssz()?, RpcSuccessResponse::LightClientBootstrap(res) => res.to_ssz()?, RpcSuccessResponse::LightClientOptimisticUpdate(res) => res.to_ssz()?, RpcSuccessResponse::LightClientFinalityUpdate(res) => res.to_ssz()?, @@ -397,6 +400,8 @@ impl Encoder> for SSZSnappyOutboundCodec

{ RequestType::BlobsByRoot(req) => req.blob_ids.to_ssz()?, RequestType::DataColumnsByRange(req) => req.to_ssz()?, RequestType::DataColumnsByRoot(req) => req.data_column_ids.to_ssz()?, + RequestType::ExecutionPayloadEnvelopesByRange(req) => req.to_ssz()?, + RequestType::ExecutionPayloadEnvelopesByRoot(req) => req.block_roots.to_ssz()?, RequestType::Ping(req) => req.to_ssz()?, RequestType::LightClientBootstrap(req) => req.to_ssz()?, RequestType::LightClientUpdatesByRange(req) => req.to_ssz()?, @@ -512,6 +517,9 @@ fn handle_error( Ok(None) } } + // All snappy errors from the snap crate bubble up as `Other` kind errors + // that imply invalid response + ErrorKind::Other => Err(RPCError::InvalidData(err.to_string())), _ => Err(RPCError::from(err)), } } @@ -628,6 +636,19 @@ fn handle_rpc_request( )?, }, ))), + SupportedProtocol::ExecutionPayloadEnvelopesByRangeV1 => { + Ok(Some(RequestType::ExecutionPayloadEnvelopesByRange( + ExecutionPayloadEnvelopesByRangeRequest::from_ssz_default(decoded_buffer)?, + ))) + } + SupportedProtocol::ExecutionPayloadEnvelopesByRootV1 => Ok(Some( + RequestType::ExecutionPayloadEnvelopesByRoot(ExecutionPayloadEnvelopesByRootRequest { + block_roots: DynamicList::from_ssz( + &(config.max_request_payloads as usize), + decoded_buffer, + )?, + }), + )), SupportedProtocol::PingV1 => Ok(Some(RequestType::Ping(Ping { data: u64::from_ssz_default(decoded_buffer)?, }))), @@ -799,6 +820,58 @@ fn handle_rpc_response( ), )), }, + SupportedProtocol::ExecutionPayloadEnvelopesByRangeV1 => match fork_name { + Some(Phase::Gloas) => Ok(Some(RpcSuccessResponse::ExecutionPayloadEnvelopesByRange( + Arc::new(SignedExecutionPayloadEnvelope::from_ssz_default( + decoded_buffer, + )?), + ))), + Some( + Phase::Phase0 + | Phase::Altair + | Phase::Bellatrix + | Phase::Capella + | Phase::Deneb + | Phase::Electra + | Phase::Fulu, + ) => Err(RPCError::ErrorResponse( + RpcErrorResponse::InvalidRequest, + "Invalid fork name for execution payload envelopes by range".to_string(), + )), + None => Err(RPCError::ErrorResponse( + RpcErrorResponse::InvalidRequest, + format!( + "No context bytes provided for {:?} response", + versioned_protocol + ), + )), + }, + SupportedProtocol::ExecutionPayloadEnvelopesByRootV1 => match fork_name { + Some(Phase::Gloas) => Ok(Some(RpcSuccessResponse::ExecutionPayloadEnvelopesByRoot( + Arc::new(SignedExecutionPayloadEnvelope::from_ssz_default( + decoded_buffer, + )?), + ))), + Some( + Phase::Phase0 + | Phase::Altair + | Phase::Bellatrix + | Phase::Capella + | Phase::Deneb + | Phase::Electra + | Phase::Fulu, + ) => Err(RPCError::ErrorResponse( + RpcErrorResponse::InvalidRequest, + "Invalid fork name for execution payload envelopes by root".to_string(), + )), + None => Err(RPCError::ErrorResponse( + RpcErrorResponse::InvalidRequest, + format!( + "No context bytes provided for {:?} response", + versioned_protocol + ), + )), + }, SupportedProtocol::PingV1 => Ok(Some(RpcSuccessResponse::Pong(Ping { data: u64::from_ssz_default(decoded_buffer)?, }))), @@ -1111,6 +1184,7 @@ mod tests { types::{EnrAttestationBitfield, ForkContext}, }; use anyhow::Result; + use enum_iterator::Sequence; use snap::write::FrameEncoder; use ssz::{ByteList, DynamicList}; use std::io::Write; @@ -1477,6 +1551,18 @@ mod tests { RequestType::DataColumnsByRange(dcbrange) => { assert_eq!(decoded, RequestType::DataColumnsByRange(dcbrange)) } + RequestType::ExecutionPayloadEnvelopesByRange(epbrange) => { + assert_eq!( + decoded, + RequestType::ExecutionPayloadEnvelopesByRange(epbrange) + ) + } + RequestType::ExecutionPayloadEnvelopesByRoot(epbroot) => { + assert_eq!( + decoded, + RequestType::ExecutionPayloadEnvelopesByRoot(epbroot) + ) + } RequestType::Ping(ping) => { assert_eq!(decoded, RequestType::Ping(ping)) } @@ -2555,4 +2641,46 @@ mod tests { RPCError::InvalidData(_) )); } + + /// Test invalid snappy response. + #[test] + fn test_invalid_snappy_response() { + let config = Arc::new(Config::mainnet().rapid_upgrade()); + let fork_context = Arc::new(ForkContext::dummy::( + &config, + Phase::last().expect("there should be at least one phase"), + )); + let max_packet_size = config.max_payload_size; // 10 MiB. + + let protocol = ProtocolId::new(SupportedProtocol::BlocksByRangeV2, Encoding::SSZSnappy); + + let mut codec = SSZSnappyOutboundCodec::::new( + protocol.clone(), + max_packet_size, + fork_context.clone(), + ); + + let mut payload = BytesMut::new(); + payload.extend_from_slice(&[0u8]); + let deneb_epoch = config.deneb_fork_epoch; + payload.extend_from_slice(&fork_context.context_bytes(deneb_epoch).as_bytes()); + + // Claim the MAXIMUM allowed size (10 MiB) + let claimed_size = max_packet_size; + let mut uvi_codec: Uvi = Uvi::default(); + uvi_codec.encode(claimed_size, &mut payload).unwrap(); + payload.extend_from_slice(&[0xBB; 16]); // Junk snappy. + + let result = codec.decode(&mut payload); + + assert!(result.is_err(), "Expected decode to fail"); + + // IoError = reached snappy decode (allocation happened). + let err = result.unwrap_err(); + assert!( + matches!(err, RPCError::InvalidData(_)), + "Should return invalid data variant {}", + err + ); + } } diff --git a/src/rpc/config.rs b/src/rpc/config.rs index b0ee6fe..d96567f 100644 --- a/src/rpc/config.rs +++ b/src/rpc/config.rs @@ -93,6 +93,8 @@ pub struct RateLimiterConfig { pub(super) blobs_by_root_quota: Quota, pub(super) data_columns_by_root_quota: Quota, pub(super) data_columns_by_range_quota: Quota, + pub(super) execution_payload_envelopes_by_range_quota: Quota, + pub(super) execution_payload_envelopes_by_root_quota: Quota, pub(super) light_client_bootstrap_quota: Quota, pub(super) light_client_optimistic_update_quota: Quota, pub(super) light_client_finality_update_quota: Quota, @@ -126,6 +128,10 @@ impl RateLimiterConfig { pub const DEFAULT_LIGHT_CLIENT_OPTIMISTIC_UPDATE_QUOTA: Quota = Quota::one_every(10); pub const DEFAULT_LIGHT_CLIENT_FINALITY_UPDATE_QUOTA: Quota = Quota::one_every(10); pub const DEFAULT_LIGHT_CLIENT_UPDATES_BY_RANGE_QUOTA: Quota = Quota::one_every(10); + pub const DEFAULT_EXECUTION_PAYLOAD_ENVELOPES_BY_RANGE_QUOTA: Quota = + Quota::n_every(NonZeroU64::new(128).unwrap(), 10); + pub const DEFAULT_EXECUTION_PAYLOAD_ENVELOPES_BY_ROOT_QUOTA: Quota = + Quota::n_every(NonZeroU64::new(128).unwrap(), 10); } impl Default for RateLimiterConfig { @@ -141,6 +147,10 @@ impl Default for RateLimiterConfig { blobs_by_root_quota: Self::DEFAULT_BLOBS_BY_ROOT_QUOTA, data_columns_by_root_quota: Self::DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA, data_columns_by_range_quota: Self::DEFAULT_DATA_COLUMNS_BY_RANGE_QUOTA, + execution_payload_envelopes_by_range_quota: + Self::DEFAULT_EXECUTION_PAYLOAD_ENVELOPES_BY_RANGE_QUOTA, + execution_payload_envelopes_by_root_quota: + Self::DEFAULT_EXECUTION_PAYLOAD_ENVELOPES_BY_ROOT_QUOTA, light_client_bootstrap_quota: Self::DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA, light_client_optimistic_update_quota: Self::DEFAULT_LIGHT_CLIENT_OPTIMISTIC_UPDATE_QUOTA, @@ -179,6 +189,14 @@ impl Debug for RateLimiterConfig { "data_columns_by_root", fmt_q!(&self.data_columns_by_root_quota), ) + .field( + "execution_payload_envelopes_by_range", + fmt_q!(&self.execution_payload_envelopes_by_range_quota), + ) + .field( + "execution_payload_envelopes_by_root", + fmt_q!(&self.execution_payload_envelopes_by_root_quota), + ) .finish() } } @@ -201,6 +219,8 @@ impl FromStr for RateLimiterConfig { let mut blobs_by_root_quota = None; let mut data_columns_by_root_quota = None; let mut data_columns_by_range_quota = None; + let mut execution_payload_envelopes_by_range_quota = None; + let mut execution_payload_envelopes_by_root_quota = None; let mut light_client_bootstrap_quota = None; let mut light_client_optimistic_update_quota = None; let mut light_client_finality_update_quota = None; @@ -222,6 +242,14 @@ impl FromStr for RateLimiterConfig { Protocol::DataColumnsByRange => { data_columns_by_range_quota = data_columns_by_range_quota.or(quota) } + Protocol::ExecutionPayloadEnvelopesByRange => { + execution_payload_envelopes_by_range_quota = + execution_payload_envelopes_by_range_quota.or(quota) + } + Protocol::ExecutionPayloadEnvelopesByRoot => { + execution_payload_envelopes_by_root_quota = + execution_payload_envelopes_by_root_quota.or(quota) + } Protocol::Ping => ping_quota = ping_quota.or(quota), Protocol::MetaData => meta_data_quota = meta_data_quota.or(quota), Protocol::LightClientBootstrap => { @@ -257,6 +285,10 @@ impl FromStr for RateLimiterConfig { .unwrap_or(Self::DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA), data_columns_by_range_quota: data_columns_by_range_quota .unwrap_or(Self::DEFAULT_DATA_COLUMNS_BY_RANGE_QUOTA), + execution_payload_envelopes_by_range_quota: execution_payload_envelopes_by_range_quota + .unwrap_or(Self::DEFAULT_EXECUTION_PAYLOAD_ENVELOPES_BY_RANGE_QUOTA), + execution_payload_envelopes_by_root_quota: execution_payload_envelopes_by_root_quota + .unwrap_or(Self::DEFAULT_EXECUTION_PAYLOAD_ENVELOPES_BY_ROOT_QUOTA), light_client_bootstrap_quota: light_client_bootstrap_quota .unwrap_or(Self::DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA), light_client_optimistic_update_quota: light_client_optimistic_update_quota diff --git a/src/rpc/handler.rs b/src/rpc/handler.rs index 17c5f7f..01cd992 100644 --- a/src/rpc/handler.rs +++ b/src/rpc/handler.rs @@ -15,7 +15,8 @@ use helper_functions::misc; use libp2p::PeerId; use libp2p::swarm::handler::{ ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError, - FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol, + FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, StreamUpgradeError, + SubstreamProtocol, }; use libp2p::swarm::{ConnectionId, Stream}; use logging::exception; @@ -890,6 +891,16 @@ where ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error }) => { self.on_dial_upgrade_error(info, error) } + ConnectionEvent::ListenUpgradeError(ListenUpgradeError { + error: (proto, error), + .. + }) => { + self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound { + id: self.current_inbound_substream_id, + proto, + error, + })); + } _ => { // NOTE: ConnectionEvent is a non exhaustive enum so updates should be based on // release notes more than compiler feedback @@ -926,7 +937,7 @@ where request.count() )), })); - return self.shutdown(None); + return; } } RequestType::BlobsByRange(request) => { @@ -942,7 +953,7 @@ where max_allowed, max_requested_blobs )), })); - return self.shutdown(None); + return; } } _ => {} diff --git a/src/rpc/methods.rs b/src/rpc/methods.rs index f2f9456..4502789 100644 --- a/src/rpc/methods.rs +++ b/src/rpc/methods.rs @@ -23,6 +23,7 @@ use types::{ config::Config as ChainConfig, deneb::containers::BlobSidecar, fulu::{containers::DataColumnsByRootIdentifier, primitives::ColumnIndex}, + gloas::containers::SignedExecutionPayloadEnvelope, phase0::primitives::{Epoch, ForkDigest, H256, Slot}, preset::Preset, traits::SignedBeaconBlock as _, @@ -793,6 +794,43 @@ impl DataColumnsByRootRequest

{ } } +/// Request a number of execution payload envelopes from a peer. +#[derive(Copy, Clone, Debug, PartialEq, Eq, Ssz)] +#[ssz(derive_hash = false)] +pub struct ExecutionPayloadEnvelopesByRangeRequest { + /// The starting slot to request execution payload envelopes. + pub start_slot: Slot, + /// The number of slots from the start slot. + pub count: u64, +} + +impl ExecutionPayloadEnvelopesByRangeRequest { + pub fn new(start_slot: Slot, count: u64) -> Self { + Self { start_slot, count } + } + + pub fn max_requested(&self) -> u64 { + self.count + } +} + +#[derive(Clone, Debug, PartialEq)] +pub struct ExecutionPayloadEnvelopesByRootRequest { + /// The list of beacon block roots being requested. + pub block_roots: DynamicList, +} + +impl ExecutionPayloadEnvelopesByRootRequest { + pub fn new(config: &ChainConfig, block_roots: impl Iterator) -> Self { + let block_roots = + DynamicList::from_iter_with_maximum(block_roots, config.max_request_payloads as usize); + Self { block_roots } + } + + pub fn max_requested(&self) -> usize { + self.block_roots.len() + } +} /// Request a number of beacon data columns from a peer. #[derive(Clone, Debug, PartialEq, Ssz)] pub struct LightClientUpdatesByRangeRequest { @@ -847,6 +885,12 @@ pub enum RpcSuccessResponse { /// A response to a get DATA_COLUMN_SIDECARS_BY_RANGE request. DataColumnsByRange(Arc>), + /// A response to a get EXECUTION_PAYLOAD_ENVELOPES_BY_RANGE request. + ExecutionPayloadEnvelopesByRange(Arc>), + + /// A response to a get EXECUTION_PAYLOAD_ENVELOPES_BY_ROOT request. + ExecutionPayloadEnvelopesByRoot(Arc>), + /// A PONG response to a PING request. Pong(Ping), @@ -875,6 +919,12 @@ pub enum ResponseTermination { /// Data column sidecars by range stream termination. DataColumnsByRange, + /// Execution payload envelopes by range stream termination. + ExecutionPayloadEnvelopesByRange, + + /// Execution payload envelopes by root stream termination. + ExecutionPayloadEnvelopesByRoot, + /// Light client updates by range stream termination. LightClientUpdatesByRange, } @@ -888,6 +938,12 @@ impl ResponseTermination { ResponseTermination::BlobsByRoot => Protocol::BlobsByRoot, ResponseTermination::DataColumnsByRoot => Protocol::DataColumnsByRoot, ResponseTermination::DataColumnsByRange => Protocol::DataColumnsByRange, + ResponseTermination::ExecutionPayloadEnvelopesByRange => { + Protocol::ExecutionPayloadEnvelopesByRange + } + ResponseTermination::ExecutionPayloadEnvelopesByRoot => { + Protocol::ExecutionPayloadEnvelopesByRoot + } ResponseTermination::LightClientUpdatesByRange => Protocol::LightClientUpdatesByRange, } } @@ -983,6 +1039,12 @@ impl RpcSuccessResponse

{ RpcSuccessResponse::BlobsByRoot(_) => Protocol::BlobsByRoot, RpcSuccessResponse::DataColumnsByRoot(_) => Protocol::DataColumnsByRoot, RpcSuccessResponse::DataColumnsByRange(_) => Protocol::DataColumnsByRange, + RpcSuccessResponse::ExecutionPayloadEnvelopesByRange(_) => { + Protocol::ExecutionPayloadEnvelopesByRange + } + RpcSuccessResponse::ExecutionPayloadEnvelopesByRoot(_) => { + Protocol::ExecutionPayloadEnvelopesByRoot + } RpcSuccessResponse::Pong(_) => Protocol::Ping, RpcSuccessResponse::MetaData(_) => Protocol::MetaData, RpcSuccessResponse::LightClientBootstrap(_) => Protocol::LightClientBootstrap, @@ -1004,6 +1066,10 @@ impl RpcSuccessResponse

{ } RpcSuccessResponse::DataColumnsByRange(column) | RpcSuccessResponse::DataColumnsByRoot(column) => Some(column.slot()), + RpcSuccessResponse::ExecutionPayloadEnvelopesByRange(envelope) + | RpcSuccessResponse::ExecutionPayloadEnvelopesByRoot(envelope) => { + Some(envelope.message.slot) + } RpcSuccessResponse::LightClientBootstrap(b) => Some(b.slot()), RpcSuccessResponse::LightClientOptimisticUpdate(update) => { Some(update.signature_slot()) @@ -1072,6 +1138,14 @@ impl std::fmt::Display for RpcSuccessResponse

{ sidecar.slot() ) } + RpcSuccessResponse::ExecutionPayloadEnvelopesByRange(envelope) + | RpcSuccessResponse::ExecutionPayloadEnvelopesByRoot(envelope) => { + write!( + f, + "ExecutionPayloadEnvelope: slot: {}", + envelope.message.slot + ) + } RpcSuccessResponse::Pong(ping) => write!(f, "Pong: {}", ping.data), RpcSuccessResponse::MetaData(metadata) => { write!(f, "Metadata: {}", metadata.seq_number()) diff --git a/src/rpc/protocol.rs b/src/rpc/protocol.rs index 568c9e4..926ef96 100644 --- a/src/rpc/protocol.rs +++ b/src/rpc/protocol.rs @@ -21,7 +21,9 @@ use tokio_util::{ use typenum::Unsigned as _; use types::deneb::containers::BlobIdentifier; use types::fulu::containers::DataColumnSidecar as FuluDataColumnSidecar; -use types::gloas::containers::DataColumnSidecar as GloasDataColumnSidecar; +use types::gloas::containers::{ + DataColumnSidecar as GloasDataColumnSidecar, SignedExecutionPayloadEnvelope, +}; use types::phase0::primitives::Epoch; use types::{ altair::containers::{ @@ -74,6 +76,23 @@ pub static DATA_COLUMN_GLOAS_MAX: LazyLock = LazyLock::new(|| { .len() }); +/// Minimum SSZ size of SignedExecutionPayloadEnvelope (all variable fields at minimum). +pub static SIGNED_EXECUTION_PAYLOAD_ENVELOPE_GLOAS_MIN: LazyLock = LazyLock::new(|| { + SignedExecutionPayloadEnvelope::::default() + .to_ssz() + .expect("should serialize") + .len() +}); + +/// Maximum SSZ size of SignedExecutionPayloadEnvelope. +/// Uses .full() method which fills all variable-length fields to maximum. +pub static SIGNED_EXECUTION_PAYLOAD_ENVELOPE_GLOAS_MAX: LazyLock = LazyLock::new(|| { + SignedExecutionPayloadEnvelope::::full() + .to_ssz() + .expect("should serialize") + .len() +}); + pub const ERROR_TYPE_MIN: usize = 0; pub const ERROR_TYPE_MAX: usize = 256; @@ -168,6 +187,13 @@ fn rpc_light_client_bootstrap_limits_by_fork(current_fork: Phase) -> } } +fn rpc_execution_payload_envelope_limits() -> RpcLimits { + RpcLimits::new( + *SIGNED_EXECUTION_PAYLOAD_ENVELOPE_GLOAS_MIN, + *SIGNED_EXECUTION_PAYLOAD_ENVELOPE_GLOAS_MAX, + ) +} + /// Protocol names to be used. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumString, AsRefStr, Display)] #[strum(serialize_all = "snake_case")] @@ -194,6 +220,12 @@ pub enum Protocol { /// The `DataColumnSidecarsByRange` protocol name. #[strum(serialize = "data_column_sidecars_by_range")] DataColumnsByRange, + /// The `ExecutionPayloadEnvelopesByRange` protocol name. + #[strum(serialize = "execution_payload_envelopes_by_range")] + ExecutionPayloadEnvelopesByRange, + /// The `ExecutionPayloadEnvelopesByRoot` protocol name. + #[strum(serialize = "execution_payload_envelopes_by_root")] + ExecutionPayloadEnvelopesByRoot, /// The `Ping` protocol name. Ping, /// The `MetaData` protocol name. @@ -224,6 +256,12 @@ impl Protocol { Protocol::BlobsByRoot => Some(ResponseTermination::BlobsByRoot), Protocol::DataColumnsByRoot => Some(ResponseTermination::DataColumnsByRoot), Protocol::DataColumnsByRange => Some(ResponseTermination::DataColumnsByRange), + Protocol::ExecutionPayloadEnvelopesByRange => { + Some(ResponseTermination::ExecutionPayloadEnvelopesByRange) + } + Protocol::ExecutionPayloadEnvelopesByRoot => { + Some(ResponseTermination::ExecutionPayloadEnvelopesByRoot) + } Protocol::Ping => None, Protocol::MetaData => None, Protocol::LightClientBootstrap => None, @@ -256,6 +294,8 @@ pub enum SupportedProtocol { BlobsByRootV1, DataColumnsByRootV1, DataColumnsByRangeV1, + ExecutionPayloadEnvelopesByRangeV1, + ExecutionPayloadEnvelopesByRootV1, PingV1, MetaDataV1, MetaDataV2, @@ -280,6 +320,8 @@ impl SupportedProtocol { SupportedProtocol::BlobsByRootV1 => "1", SupportedProtocol::DataColumnsByRootV1 => "1", SupportedProtocol::DataColumnsByRangeV1 => "1", + SupportedProtocol::ExecutionPayloadEnvelopesByRangeV1 => "1", + SupportedProtocol::ExecutionPayloadEnvelopesByRootV1 => "1", SupportedProtocol::PingV1 => "1", SupportedProtocol::MetaDataV1 => "1", SupportedProtocol::MetaDataV2 => "2", @@ -304,6 +346,12 @@ impl SupportedProtocol { SupportedProtocol::BlobsByRootV1 => Protocol::BlobsByRoot, SupportedProtocol::DataColumnsByRootV1 => Protocol::DataColumnsByRoot, SupportedProtocol::DataColumnsByRangeV1 => Protocol::DataColumnsByRange, + SupportedProtocol::ExecutionPayloadEnvelopesByRangeV1 => { + Protocol::ExecutionPayloadEnvelopesByRange + } + SupportedProtocol::ExecutionPayloadEnvelopesByRootV1 => { + Protocol::ExecutionPayloadEnvelopesByRoot + } SupportedProtocol::PingV1 => Protocol::Ping, SupportedProtocol::MetaDataV1 => Protocol::MetaData, SupportedProtocol::MetaDataV2 => Protocol::MetaData, @@ -354,6 +402,18 @@ impl SupportedProtocol { ProtocolId::new(SupportedProtocol::DataColumnsByRangeV1, Encoding::SSZSnappy), ]); } + if fork_context.fork_exists(Phase::Gloas) { + supported.extend_from_slice(&[ + ProtocolId::new( + SupportedProtocol::ExecutionPayloadEnvelopesByRangeV1, + Encoding::SSZSnappy, + ), + ProtocolId::new( + SupportedProtocol::ExecutionPayloadEnvelopesByRootV1, + Encoding::SSZSnappy, + ), + ]); + } supported } } @@ -478,6 +538,14 @@ impl ProtocolId { DataColumnsByRangeRequest::

::ssz_max_len() .expect("Unable to get DataColumnsByRange ssz_max_len"), ), + Protocol::ExecutionPayloadEnvelopesByRoot => RpcLimits::new( + 0, + chain_config.max_request_payloads as usize * H256::SIZE.get(), + ), + Protocol::ExecutionPayloadEnvelopesByRange => RpcLimits::new( + ExecutionPayloadEnvelopesByRangeRequest::SIZE.get(), + ExecutionPayloadEnvelopesByRangeRequest::SIZE.get(), + ), Protocol::Ping => RpcLimits::new(Ping::SIZE.get(), Ping::SIZE.get()), Protocol::LightClientBootstrap => RpcLimits::new( LightClientBootstrapRequest::SIZE.get(), @@ -507,6 +575,8 @@ impl ProtocolId { Protocol::DataColumnsByRange => { rpc_data_column_limits::

(fork_context.current_fork_name()) } + Protocol::ExecutionPayloadEnvelopesByRange => rpc_execution_payload_envelope_limits(), + Protocol::ExecutionPayloadEnvelopesByRoot => rpc_execution_payload_envelope_limits(), Protocol::Ping => RpcLimits::new(Ping::SIZE.get(), Ping::SIZE.get()), Protocol::MetaData => RpcLimits::new(MetaDataV1::SIZE.get(), MetaDataV3::SIZE.get()), Protocol::LightClientBootstrap => { @@ -540,6 +610,8 @@ impl ProtocolId { | SupportedProtocol::BlobsByRootV1 | SupportedProtocol::DataColumnsByRootV1 | SupportedProtocol::DataColumnsByRangeV1 + | SupportedProtocol::ExecutionPayloadEnvelopesByRangeV1 + | SupportedProtocol::ExecutionPayloadEnvelopesByRootV1 | SupportedProtocol::LightClientBootstrapV1 | SupportedProtocol::LightClientOptimisticUpdateV1 | SupportedProtocol::LightClientFinalityUpdateV1 @@ -589,7 +661,7 @@ where P: Preset, { type Output = InboundOutput; - type Error = RPCError; + type Error = (Protocol, RPCError); type Future = BoxFuture<'static, Result>; fn upgrade_inbound(self, socket: TSocket, protocol: ProtocolId) -> Self::Future { @@ -632,10 +704,12 @@ where ) .await { - Err(e) => Err(RPCError::from(e)), + Err(e) => Err((versioned_protocol.protocol(), RPCError::from(e))), Ok((Some(Ok(request)), stream)) => Ok((request, stream)), - Ok((Some(Err(e)), _)) => Err(e), - Ok((None, _)) => Err(RPCError::IncompleteStream), + Ok((Some(Err(e)), _)) => Err((versioned_protocol.protocol(), e)), + Ok((None, _)) => { + Err((versioned_protocol.protocol(), RPCError::IncompleteStream)) + } } } } @@ -654,6 +728,8 @@ pub enum RequestType { BlobsByRoot(BlobsByRootRequest), DataColumnsByRoot(DataColumnsByRootRequest

), DataColumnsByRange(DataColumnsByRangeRequest

), + ExecutionPayloadEnvelopesByRange(ExecutionPayloadEnvelopesByRangeRequest), + ExecutionPayloadEnvelopesByRoot(ExecutionPayloadEnvelopesByRootRequest), LightClientBootstrap(LightClientBootstrapRequest), LightClientOptimisticUpdate, LightClientFinalityUpdate, @@ -680,6 +756,8 @@ impl RequestType

{ RequestType::BlobsByRoot(req) => req.blob_ids.len() as u64, RequestType::DataColumnsByRoot(req) => req.max_requested() as u64, RequestType::DataColumnsByRange(req) => req.max_requested(), + RequestType::ExecutionPayloadEnvelopesByRange(req) => req.max_requested(), + RequestType::ExecutionPayloadEnvelopesByRoot(req) => req.max_requested() as u64, RequestType::Ping(_) => 1, RequestType::MetaData(_) => 1, RequestType::LightClientBootstrap(_) => 1, @@ -709,6 +787,12 @@ impl RequestType

{ RequestType::BlobsByRoot(_) => SupportedProtocol::BlobsByRootV1, RequestType::DataColumnsByRoot(_) => SupportedProtocol::DataColumnsByRootV1, RequestType::DataColumnsByRange(_) => SupportedProtocol::DataColumnsByRangeV1, + RequestType::ExecutionPayloadEnvelopesByRange(_) => { + SupportedProtocol::ExecutionPayloadEnvelopesByRangeV1 + } + RequestType::ExecutionPayloadEnvelopesByRoot(_) => { + SupportedProtocol::ExecutionPayloadEnvelopesByRootV1 + } RequestType::Ping(_) => SupportedProtocol::PingV1, RequestType::MetaData(req) => match req { MetadataRequest::V1(_) => SupportedProtocol::MetaDataV1, @@ -740,6 +824,12 @@ impl RequestType

{ RequestType::BlobsByRoot(_) => ResponseTermination::BlobsByRoot, RequestType::DataColumnsByRoot(_) => ResponseTermination::DataColumnsByRoot, RequestType::DataColumnsByRange(_) => ResponseTermination::DataColumnsByRange, + RequestType::ExecutionPayloadEnvelopesByRange(_) => { + ResponseTermination::ExecutionPayloadEnvelopesByRange + } + RequestType::ExecutionPayloadEnvelopesByRoot(_) => { + ResponseTermination::ExecutionPayloadEnvelopesByRoot + } RequestType::Status(_) => unreachable!(), RequestType::Goodbye(_) => unreachable!(), RequestType::Ping(_) => unreachable!(), @@ -786,6 +876,14 @@ impl RequestType

{ SupportedProtocol::DataColumnsByRangeV1, Encoding::SSZSnappy, )], + RequestType::ExecutionPayloadEnvelopesByRange(_) => vec![ProtocolId::new( + SupportedProtocol::ExecutionPayloadEnvelopesByRangeV1, + Encoding::SSZSnappy, + )], + RequestType::ExecutionPayloadEnvelopesByRoot(_) => vec![ProtocolId::new( + SupportedProtocol::ExecutionPayloadEnvelopesByRootV1, + Encoding::SSZSnappy, + )], RequestType::Ping(_) => vec![ProtocolId::new( SupportedProtocol::PingV1, Encoding::SSZSnappy, @@ -830,6 +928,8 @@ impl RequestType

{ RequestType::LightClientOptimisticUpdate => true, RequestType::LightClientFinalityUpdate => true, RequestType::LightClientUpdatesByRange(_) => true, + RequestType::ExecutionPayloadEnvelopesByRange(_) => false, + RequestType::ExecutionPayloadEnvelopesByRoot(_) => false, } } } @@ -963,6 +1063,12 @@ impl std::fmt::Display for RequestType

{ RequestType::LightClientUpdatesByRange(_) => { write!(f, "Light client updates by range request") } + RequestType::ExecutionPayloadEnvelopesByRange(req) => { + write!(f, "Execution payload envelopes by range: {:?}", req) + } + RequestType::ExecutionPayloadEnvelopesByRoot(req) => { + write!(f, "Execution payload envelopes by root: {:?}", req) + } } } } diff --git a/src/rpc/rate_limiter.rs b/src/rpc/rate_limiter.rs index ceda039..6b0ad13 100644 --- a/src/rpc/rate_limiter.rs +++ b/src/rpc/rate_limiter.rs @@ -108,6 +108,10 @@ pub struct RPCRateLimiter { dcbroot_rl: Limiter, /// DataColumnsByRange rate limiter. dcbrange_rl: Limiter, + /// ExecutionPayloadEnvelopesByRange rate limiter. + epe_by_range_rl: Limiter, + /// ExecutionPayloadEnvelopesByRoot rate limiter. + epe_by_root_rl: Limiter, /// LightClientBootstrap rate limiter. lc_bootstrap_rl: Limiter, /// LightClientOptimisticUpdate rate limiter. @@ -151,6 +155,10 @@ pub struct RPCRateLimiterBuilder { dcbroot_quota: Option, /// Quota for the DataColumnsByRange protocol. dcbrange_quota: Option, + /// Quota for the ExecutionPayloadEnvelopesByRange protocol. + epe_by_range_quota: Option, + /// Quota for the ExecutionPayloadEnvelopesByRoot protocol. + epe_by_root_quota: Option, /// Quota for the LightClientBootstrap protocol. lcbootstrap_quota: Option, /// Quota for the LightClientOptimisticUpdate protocol. @@ -176,6 +184,8 @@ impl RPCRateLimiterBuilder { Protocol::BlobsByRoot => self.blbroot_quota = q, Protocol::DataColumnsByRoot => self.dcbroot_quota = q, Protocol::DataColumnsByRange => self.dcbrange_quota = q, + Protocol::ExecutionPayloadEnvelopesByRange => self.epe_by_range_quota = q, + Protocol::ExecutionPayloadEnvelopesByRoot => self.epe_by_root_quota = q, Protocol::LightClientBootstrap => self.lcbootstrap_quota = q, Protocol::LightClientOptimisticUpdate => self.lc_optimistic_update_quota = q, Protocol::LightClientFinalityUpdate => self.lc_finality_update_quota = q, @@ -224,6 +234,14 @@ impl RPCRateLimiterBuilder { .dcbrange_quota .ok_or("DataColumnsByRange quota not specified")?; + let epe_by_range_quota = self + .epe_by_range_quota + .ok_or("ExecutionPayloadEnvelopesByRange quota not specified")?; + + let epe_by_root_quota = self + .epe_by_root_quota + .ok_or("ExecutionPayloadEnvelopesByRoot quota not specified")?; + // create the rate limiters let ping_rl = Limiter::from_quota(ping_quota)?; let metadata_rl = Limiter::from_quota(metadata_quota)?; @@ -235,6 +253,8 @@ impl RPCRateLimiterBuilder { let blbroot_rl = Limiter::from_quota(blbroots_quota)?; let dcbroot_rl = Limiter::from_quota(dcbroot_quota)?; let dcbrange_rl = Limiter::from_quota(dcbrange_quota)?; + let epe_by_range_rl = Limiter::from_quota(epe_by_range_quota)?; + let epe_by_root_rl = Limiter::from_quota(epe_by_root_quota)?; let lc_bootstrap_rl = Limiter::from_quota(lc_bootstrap_quota)?; let lc_optimistic_update_rl = Limiter::from_quota(lc_optimistic_update_quota)?; let lc_finality_update_rl = Limiter::from_quota(lc_finality_update_quota)?; @@ -258,6 +278,8 @@ impl RPCRateLimiterBuilder { blbroot_rl, dcbroot_rl, dcbrange_rl, + epe_by_range_rl, + epe_by_root_rl, lc_bootstrap_rl, lc_optimistic_update_rl, lc_finality_update_rl, @@ -311,6 +333,8 @@ impl RPCRateLimiter { blobs_by_root_quota, data_columns_by_root_quota, data_columns_by_range_quota, + execution_payload_envelopes_by_range_quota, + execution_payload_envelopes_by_root_quota, light_client_bootstrap_quota, light_client_optimistic_update_quota, light_client_finality_update_quota, @@ -328,6 +352,14 @@ impl RPCRateLimiter { .set_quota(Protocol::BlobsByRoot, blobs_by_root_quota) .set_quota(Protocol::DataColumnsByRoot, data_columns_by_root_quota) .set_quota(Protocol::DataColumnsByRange, data_columns_by_range_quota) + .set_quota( + Protocol::ExecutionPayloadEnvelopesByRange, + execution_payload_envelopes_by_range_quota, + ) + .set_quota( + Protocol::ExecutionPayloadEnvelopesByRoot, + execution_payload_envelopes_by_root_quota, + ) .set_quota(Protocol::LightClientBootstrap, light_client_bootstrap_quota) .set_quota( Protocol::LightClientOptimisticUpdate, @@ -375,6 +407,8 @@ impl RPCRateLimiter { Protocol::BlobsByRoot => &mut self.blbroot_rl, Protocol::DataColumnsByRoot => &mut self.dcbroot_rl, Protocol::DataColumnsByRange => &mut self.dcbrange_rl, + Protocol::ExecutionPayloadEnvelopesByRange => &mut self.epe_by_range_rl, + Protocol::ExecutionPayloadEnvelopesByRoot => &mut self.epe_by_root_rl, Protocol::LightClientBootstrap => &mut self.lc_bootstrap_rl, Protocol::LightClientOptimisticUpdate => &mut self.lc_optimistic_update_rl, Protocol::LightClientFinalityUpdate => &mut self.lc_finality_update_rl, @@ -399,6 +433,8 @@ impl RPCRateLimiter { blbroot_rl, dcbroot_rl, dcbrange_rl, + epe_by_range_rl, + epe_by_root_rl, lc_bootstrap_rl, lc_optimistic_update_rl, lc_finality_update_rl, @@ -416,6 +452,8 @@ impl RPCRateLimiter { blbroot_rl.prune(time_since_start); dcbrange_rl.prune(time_since_start); dcbroot_rl.prune(time_since_start); + epe_by_range_rl.prune(time_since_start); + epe_by_root_rl.prune(time_since_start); lc_bootstrap_rl.prune(time_since_start); lc_optimistic_update_rl.prune(time_since_start); lc_finality_update_rl.prune(time_since_start); diff --git a/src/service/api_types.rs b/src/service/api_types.rs index 33adfe0..2d6462a 100644 --- a/src/service/api_types.rs +++ b/src/service/api_types.rs @@ -7,6 +7,7 @@ use types::{ LightClientOptimisticUpdate, LightClientUpdate, SignedBeaconBlock, }, deneb::containers::BlobSidecar, + gloas::containers::SignedExecutionPayloadEnvelope, preset::Preset, }; @@ -41,6 +42,10 @@ pub enum Response { BlobsByRoot(Option>>), /// A response to a get DATA_COLUMN_SIDECARS_BY_ROOT request. DataColumnsByRoot(Option>>), + // A responcse to a get EXECUTION_PAYLOAD_ENVELOPES_BY_RANGE request. + ExecutionPayloadEnvelopesByRange(Option>>), + // A response to a get EXECUTION_PAYLOAD_ENVELOPES_BY_ROOT request. + ExecutionPayloadEnvelopesByRoot(Option>>), /// A response to a LightClientUpdate request. LightClientBootstrap(Arc>), /// A response to a LightClientOptimisticUpdate request. @@ -78,6 +83,22 @@ impl std::convert::From> for RpcResponse

{ Some(d) => RpcResponse::Success(RpcSuccessResponse::DataColumnsByRange(d)), None => RpcResponse::StreamTermination(ResponseTermination::DataColumnsByRange), }, + Response::ExecutionPayloadEnvelopesByRoot(r) => match r { + Some(e) => { + RpcResponse::Success(RpcSuccessResponse::ExecutionPayloadEnvelopesByRoot(e)) + } + None => RpcResponse::StreamTermination( + ResponseTermination::ExecutionPayloadEnvelopesByRoot, + ), + }, + Response::ExecutionPayloadEnvelopesByRange(r) => match r { + Some(e) => { + RpcResponse::Success(RpcSuccessResponse::ExecutionPayloadEnvelopesByRange(e)) + } + None => RpcResponse::StreamTermination( + ResponseTermination::ExecutionPayloadEnvelopesByRange, + ), + }, Response::Status(s) => RpcResponse::Success(RpcSuccessResponse::Status(s)), Response::LightClientBootstrap(b) => { RpcResponse::Success(RpcSuccessResponse::LightClientBootstrap(b)) diff --git a/src/service/gossip_cache.rs b/src/service/gossip_cache.rs index cde1797..d85ed2d 100644 --- a/src/service/gossip_cache.rs +++ b/src/service/gossip_cache.rs @@ -46,6 +46,8 @@ pub struct GossipCache { light_client_optimistic_update: Option, /// Timeout for execution payload bids. execution_payload_bid: Option, + /// Timeout for execution payload envelopes. + execution_payload: Option, } #[derive(Default)] @@ -79,6 +81,8 @@ pub struct GossipCacheBuilder { light_client_optimistic_update: Option, /// Timeout for execution payload bids. execution_payload_bid: Option, + /// Timeout for execution payload envelopes. + execution_payload: Option, } #[allow(dead_code)] @@ -161,6 +165,12 @@ impl GossipCacheBuilder { self } + /// Timeout for execution payload messages. + pub fn execution_payload_timeout(mut self, timeout: Duration) -> Self { + self.execution_payload = Some(timeout); + self + } + pub fn build(self) -> GossipCache { let GossipCacheBuilder { default_timeout, @@ -178,6 +188,7 @@ impl GossipCacheBuilder { light_client_finality_update, light_client_optimistic_update, execution_payload_bid, + execution_payload, } = self; GossipCache { expirations: DelayQueue::default(), @@ -196,6 +207,7 @@ impl GossipCacheBuilder { light_client_finality_update: light_client_finality_update.or(default_timeout), light_client_optimistic_update: light_client_optimistic_update.or(default_timeout), execution_payload_bid: execution_payload_bid.or(default_timeout), + execution_payload: execution_payload.or(default_timeout), } } } @@ -224,6 +236,7 @@ impl GossipCache { GossipKind::LightClientFinalityUpdate => self.light_client_finality_update, GossipKind::LightClientOptimisticUpdate => self.light_client_optimistic_update, GossipKind::ExecutionPayloadBid => self.execution_payload_bid, + GossipKind::ExecutionPayload => self.execution_payload, }; let Some(expire_timeout) = expire_timeout else { return; diff --git a/src/service/mod.rs b/src/service/mod.rs index 5a0f81d..3f8a56b 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -272,6 +272,7 @@ impl Network

{ // .sync_committee_message_timeout(timeout) // Do not retry .bls_to_execution_change_timeout(half_epoch * 2) .execution_payload_bid_timeout(chain_config.slot_duration_ms) + .execution_payload_timeout(chain_config.slot_duration_ms) .build() }; @@ -1621,6 +1622,28 @@ impl Network

{ request_type, }) } + RequestType::ExecutionPayloadEnvelopesByRange(_) => { + metrics::inc_counter_vec( + &metrics::TOTAL_RPC_REQUESTS, + &["execution_payload_envelopes_by_range"], + ); + Some(NetworkEvent::RequestReceived { + peer_id, + inbound_request_id, + request_type, + }) + } + RequestType::ExecutionPayloadEnvelopesByRoot(_) => { + metrics::inc_counter_vec( + &metrics::TOTAL_RPC_REQUESTS, + &["execution_payload_envelopes_by_root"], + ); + Some(NetworkEvent::RequestReceived { + peer_id, + inbound_request_id, + request_type, + }) + } RequestType::LightClientBootstrap(_) => { metrics::inc_counter_vec( &metrics::TOTAL_RPC_REQUESTS, @@ -1706,6 +1729,18 @@ impl Network

{ RpcSuccessResponse::DataColumnsByRange(resp) => { self.build_response(id, peer_id, Response::DataColumnsByRange(Some(resp))) } + RpcSuccessResponse::ExecutionPayloadEnvelopesByRange(resp) => self + .build_response( + id, + peer_id, + Response::ExecutionPayloadEnvelopesByRange(Some(resp)), + ), + RpcSuccessResponse::ExecutionPayloadEnvelopesByRoot(resp) => self + .build_response( + id, + peer_id, + Response::ExecutionPayloadEnvelopesByRoot(Some(resp)), + ), // Should never be reached RpcSuccessResponse::LightClientBootstrap(bootstrap) => { self.build_response(id, peer_id, Response::LightClientBootstrap(bootstrap)) @@ -1735,6 +1770,12 @@ impl Network

{ ResponseTermination::BlobsByRoot => Response::BlobsByRoot(None), ResponseTermination::DataColumnsByRoot => Response::DataColumnsByRoot(None), ResponseTermination::DataColumnsByRange => Response::DataColumnsByRange(None), + ResponseTermination::ExecutionPayloadEnvelopesByRange => { + Response::ExecutionPayloadEnvelopesByRange(None) + } + ResponseTermination::ExecutionPayloadEnvelopesByRoot => { + Response::ExecutionPayloadEnvelopesByRoot(None) + } ResponseTermination::LightClientUpdatesByRange => { Response::LightClientUpdatesByRange(None) } diff --git a/src/types/pubsub.rs b/src/types/pubsub.rs index 03490a7..8105489 100644 --- a/src/types/pubsub.rs +++ b/src/types/pubsub.rs @@ -31,7 +31,7 @@ use types::{ }, gloas::containers::{ DataColumnSidecar as GloasDataColumnSidecar, SignedBeaconBlock as GloasSignedBeaconBlock, - SignedExecutionPayloadBid, + SignedExecutionPayloadBid, SignedExecutionPayloadEnvelope, }, nonstandard::Phase, phase0::{ @@ -78,6 +78,8 @@ pub enum PubsubMessage { LightClientOptimisticUpdate(Box>), /// Gossipsub message providing notification of an execution payload bid. ExecutionPayloadBid(Arc>), + /// Gossipsub message providing notification of an execution payload envelope. + ExecutionPayload(Arc>), } // Implements the `DataTransform` trait of gossipsub to employ snappy compression @@ -181,6 +183,7 @@ impl PubsubMessage

{ GossipKind::LightClientOptimisticUpdate } PubsubMessage::ExecutionPayloadBid(_) => GossipKind::ExecutionPayloadBid, + PubsubMessage::ExecutionPayload(_) => GossipKind::ExecutionPayload, } } @@ -568,6 +571,30 @@ impl PubsubMessage

{ )), } } + GossipKind::ExecutionPayload => { + match fork_context.get_fork_from_context_bytes(gossip_topic.fork_digest) { + Some(Phase::Gloas) => { + let execution_payload_envelope = Arc::new( + SignedExecutionPayloadEnvelope::from_ssz_default(data) + .map_err(|e| format!("{:?}", e))?, + ); + Ok(PubsubMessage::ExecutionPayload(execution_payload_envelope)) + } + Some( + Phase::Phase0 + | Phase::Altair + | Phase::Bellatrix + | Phase::Capella + | Phase::Deneb + | Phase::Electra + | Phase::Fulu, + ) + | None => Err(format!( + "execution_payload topic invalid for given fork digest {:?}", + gossip_topic.fork_digest + )), + } + } } } } @@ -596,6 +623,7 @@ impl PubsubMessage

{ PubsubMessage::LightClientFinalityUpdate(data) => data.to_ssz(), PubsubMessage::LightClientOptimisticUpdate(data) => data.to_ssz(), PubsubMessage::ExecutionPayloadBid(data) => data.to_ssz(), + PubsubMessage::ExecutionPayload(data) => data.to_ssz(), } } } @@ -672,6 +700,13 @@ impl std::fmt::Display for PubsubMessage

{ data.message.slot, data.message.parent_block_root, data.message.builder_index ) } + PubsubMessage::ExecutionPayload(data) => { + write!( + f, + "Execution Payload: slot: {}, beacon_block_root: {:?}", + data.message.slot, data.message.beacon_block_root + ) + } } } } diff --git a/src/types/topics.rs b/src/types/topics.rs index 14ce48f..737d5d4 100644 --- a/src/types/topics.rs +++ b/src/types/topics.rs @@ -35,6 +35,7 @@ pub const BLS_TO_EXECUTION_CHANGE_TOPIC: &str = "bls_to_execution_change"; pub const LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update"; pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update"; pub const EXECUTION_PAYLOAD_BID_TOPIC: &str = "execution_payload_bid"; +pub const EXECUTION_PAYLOAD_TOPIC: &str = "execution_payload"; #[derive(Debug)] pub struct TopicConfig { @@ -104,6 +105,7 @@ pub fn core_topics_to_subscribe( if current_phase >= Phase::Gloas { topics.push(GossipKind::ExecutionPayloadBid); + topics.push(GossipKind::ExecutionPayload); } topics @@ -131,7 +133,8 @@ pub fn is_fork_non_core_topic(topic: &GossipTopic, _phase: Phase) -> bool { | GossipKind::BlsToExecutionChange | GossipKind::LightClientFinalityUpdate | GossipKind::LightClientOptimisticUpdate - | GossipKind::ExecutionPayloadBid => false, + | GossipKind::ExecutionPayloadBid + | GossipKind::ExecutionPayload => false, } } @@ -194,6 +197,8 @@ pub enum GossipKind { LightClientOptimisticUpdate, /// Topic for publishing execution payload bids. ExecutionPayloadBid, + /// Topic for publishing execution payload envelopes. + ExecutionPayload, } impl std::fmt::Display for GossipKind { @@ -276,6 +281,7 @@ impl GossipTopic { LIGHT_CLIENT_FINALITY_UPDATE => GossipKind::LightClientFinalityUpdate, LIGHT_CLIENT_OPTIMISTIC_UPDATE => GossipKind::LightClientOptimisticUpdate, EXECUTION_PAYLOAD_BID_TOPIC => GossipKind::ExecutionPayloadBid, + EXECUTION_PAYLOAD_TOPIC => GossipKind::ExecutionPayload, topic => match subnet_topic_index(topic) { Some(kind) => kind, None => return Err(format!("Unknown topic: {}", topic)), @@ -344,6 +350,7 @@ impl std::fmt::Display for GossipTopic { GossipKind::LightClientFinalityUpdate => LIGHT_CLIENT_FINALITY_UPDATE.into(), GossipKind::LightClientOptimisticUpdate => LIGHT_CLIENT_OPTIMISTIC_UPDATE.into(), GossipKind::ExecutionPayloadBid => EXECUTION_PAYLOAD_BID_TOPIC.into(), + GossipKind::ExecutionPayload => EXECUTION_PAYLOAD_TOPIC.into(), }; write!( f, @@ -416,6 +423,8 @@ mod tests { VoluntaryExit, ProposerSlashing, AttesterSlashing, + ExecutionPayloadBid, + ExecutionPayload, ] .iter() { @@ -512,6 +521,8 @@ mod tests { assert_eq!("voluntary_exit", VoluntaryExit.as_ref()); assert_eq!("proposer_slashing", ProposerSlashing.as_ref()); assert_eq!("attester_slashing", AttesterSlashing.as_ref()); + assert_eq!("execution_payload_bid", ExecutionPayloadBid.as_ref()); + assert_eq!("execution_payload", ExecutionPayload.as_ref()); } fn get_chain_config() -> ChainConfig { @@ -522,6 +533,7 @@ mod tests { config.deneb_fork_epoch = 4; config.electra_fork_epoch = 5; config.fulu_fork_epoch = 6; + config.gloas_fork_epoch = 7; config } @@ -593,6 +605,8 @@ mod tests { GossipKind::LightClientFinalityUpdate, GossipKind::LightClientOptimisticUpdate, GossipKind::BlsToExecutionChange, + GossipKind::ExecutionPayloadBid, + GossipKind::ExecutionPayload, ]; for subnet in s { expected_topics.push(GossipKind::DataColumnSidecar(subnet)); diff --git a/tests/rpc_tests.rs b/tests/rpc_tests.rs index ca313c1..c35e2d7 100644 --- a/tests/rpc_tests.rs +++ b/tests/rpc_tests.rs @@ -1612,3 +1612,104 @@ async fn test_active_requests() { } } } + +// Test that when a node receives an invalid BlocksByRange request exceeding the maximum count, +// it bans the sender. +#[tokio::test] +async fn test_request_too_large_blocks_by_range() { + let config = Arc::new(Config::mainnet().rapid_upgrade()); + + test_request_too_large( + AppRequestId::Application(1), + RequestType::BlocksByRange(OldBlocksByRangeRequest::new( + 0, + config.max_request_blocks(Phase::Phase0) + 1, // exceeds the max request defined in the spec. + 1, + )), + ) + .await; +} + +// Test that when a node receives an invalid BlobsByRange request exceeding the maximum count, +// it bans the sender. +#[tokio::test] +async fn test_request_too_large_blobs_by_range() { + let config = Arc::new(Config::mainnet().rapid_upgrade()); + + let max_request_blobs_count = + config.max_request_blob_sidecars(Phase::Phase0) / config.max_blobs_per_block(0); + test_request_too_large( + AppRequestId::Application(1), + RequestType::BlobsByRange(BlobsByRangeRequest { + start_slot: 0, + count: max_request_blobs_count + 1, // exceeds the max request defined in the spec. + }), + ) + .await; +} + +async fn test_request_too_large(app_request_id: AppRequestId, request: RequestType) { + // set up the logging. The level and enabled logging or not + let log_level = "debug"; + let enable_logging = false; + build_tracing_subscriber(log_level, enable_logging); + let config = Arc::new(Config::mainnet().rapid_upgrade()); + + // get sender/receiver + let (mut sender, mut receiver) = + common::build_node_pair::(&config, Phase::Phase0, Protocol::Tcp, false, None) + .await; + + // Build the sender future + let sender_future = async { + loop { + match sender.next_event().await { + NetworkEvent::PeerConnectedOutgoing(peer_id) => { + debug!(?request, %peer_id, "Sending RPC request"); + sender + .send_request(peer_id, app_request_id, request.clone()) + .unwrap(); + } + NetworkEvent::ResponseReceived { + app_request_id, + response, + .. + } => { + debug!(?app_request_id, ?response, "Received response"); + } + NetworkEvent::RPCFailed { error, .. } => { + // This variant should be unreachable, as the receiver doesn't respond with an error when a request exceeds the limit. + debug!(?error, "RPC failed"); + unreachable!(); + } + NetworkEvent::PeerDisconnected(peer_id) => { + // The receiver should disconnect as a result of the invalid request. + debug!(%peer_id, "Peer disconnected"); + // End the test. + return; + } + _ => {} + } + } + } + .instrument(info_span!("Sender")); + + // Build the receiver future + let receiver_future = async { + loop { + if let NetworkEvent::RequestReceived { .. } = receiver.next_event().await { + // This event should be unreachable, as the handler drops the invalid request. + unreachable!(); + } + } + } + .instrument(info_span!("Receiver")); + + tokio::select! { + _ = sender_future => {} + _ = receiver_future => {} + _ = sleep(Duration::from_secs(30)) => { + panic!("Future timed out"); + } + } +}