Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,8 @@ impl PeerManager {
Protocol::BlobsByRoot => PeerAction::MidToleranceError,
Protocol::DataColumnsByRoot => PeerAction::MidToleranceError,
Protocol::DataColumnsByRange => PeerAction::MidToleranceError,
Protocol::ExecutionPayloadEnvelopesByRange => PeerAction::MidToleranceError,
Protocol::ExecutionPayloadEnvelopesByRoot => PeerAction::MidToleranceError,
Protocol::Goodbye => PeerAction::LowToleranceError,
Protocol::MetaData => PeerAction::LowToleranceError,
Protocol::Status => PeerAction::LowToleranceError,
Expand All @@ -621,6 +623,8 @@ impl PeerManager {
Protocol::BlobsByRoot => return,
Protocol::DataColumnsByRoot => return,
Protocol::DataColumnsByRange => return,
Protocol::ExecutionPayloadEnvelopesByRange => return,
Protocol::ExecutionPayloadEnvelopesByRoot => return,
Protocol::Goodbye => return,
Protocol::LightClientBootstrap => return,
Protocol::LightClientOptimisticUpdate => return,
Expand All @@ -644,6 +648,8 @@ impl PeerManager {
Protocol::BlobsByRoot => PeerAction::MidToleranceError,
Protocol::DataColumnsByRoot => PeerAction::MidToleranceError,
Protocol::DataColumnsByRange => PeerAction::MidToleranceError,
Protocol::ExecutionPayloadEnvelopesByRange => PeerAction::MidToleranceError,
Protocol::ExecutionPayloadEnvelopesByRoot => PeerAction::MidToleranceError,
Protocol::LightClientBootstrap => return,
Protocol::LightClientOptimisticUpdate => return,
Protocol::LightClientFinalityUpdate => return,
Expand Down
128 changes: 128 additions & 0 deletions src/rpc/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use types::{
},
gloas::containers::{
DataColumnSidecar as GloasDataColumnSidecar, SignedBeaconBlock as GloasSignedBeaconBlock,
SignedExecutionPayloadEnvelope,
},
nonstandard::Phase,
phase0::{containers::SignedBeaconBlock as Phase0SignedBeaconBlock, primitives::ForkDigest},
Expand Down Expand Up @@ -98,6 +99,8 @@ impl<P: Preset> SSZSnappyInboundCodec<P> {
RpcSuccessResponse::BlobsByRoot(res) => res.to_ssz()?,
RpcSuccessResponse::DataColumnsByRoot(res) => res.to_ssz()?,
RpcSuccessResponse::DataColumnsByRange(res) => res.to_ssz()?,
RpcSuccessResponse::ExecutionPayloadEnvelopesByRoot(res) => res.to_ssz()?,
RpcSuccessResponse::ExecutionPayloadEnvelopesByRange(res) => res.to_ssz()?,
RpcSuccessResponse::LightClientBootstrap(res) => res.to_ssz()?,
RpcSuccessResponse::LightClientOptimisticUpdate(res) => res.to_ssz()?,
RpcSuccessResponse::LightClientFinalityUpdate(res) => res.to_ssz()?,
Expand Down Expand Up @@ -397,6 +400,8 @@ impl<P: Preset> Encoder<RequestType<P>> for SSZSnappyOutboundCodec<P> {
RequestType::BlobsByRoot(req) => req.blob_ids.to_ssz()?,
RequestType::DataColumnsByRange(req) => req.to_ssz()?,
RequestType::DataColumnsByRoot(req) => req.data_column_ids.to_ssz()?,
RequestType::ExecutionPayloadEnvelopesByRange(req) => req.to_ssz()?,
RequestType::ExecutionPayloadEnvelopesByRoot(req) => req.block_roots.to_ssz()?,
RequestType::Ping(req) => req.to_ssz()?,
RequestType::LightClientBootstrap(req) => req.to_ssz()?,
RequestType::LightClientUpdatesByRange(req) => req.to_ssz()?,
Expand Down Expand Up @@ -512,6 +517,9 @@ fn handle_error<T>(
Ok(None)
}
}
// All snappy errors from the snap crate bubble up as `Other` kind errors
// that imply invalid response
ErrorKind::Other => Err(RPCError::InvalidData(err.to_string())),
_ => Err(RPCError::from(err)),
}
}
Expand Down Expand Up @@ -628,6 +636,19 @@ fn handle_rpc_request<P: Preset>(
)?,
},
))),
SupportedProtocol::ExecutionPayloadEnvelopesByRangeV1 => {
Ok(Some(RequestType::ExecutionPayloadEnvelopesByRange(
ExecutionPayloadEnvelopesByRangeRequest::from_ssz_default(decoded_buffer)?,
)))
}
SupportedProtocol::ExecutionPayloadEnvelopesByRootV1 => Ok(Some(
RequestType::ExecutionPayloadEnvelopesByRoot(ExecutionPayloadEnvelopesByRootRequest {
block_roots: DynamicList::from_ssz(
&(config.max_request_payloads as usize),
decoded_buffer,
)?,
}),
)),
SupportedProtocol::PingV1 => Ok(Some(RequestType::Ping(Ping {
data: u64::from_ssz_default(decoded_buffer)?,
}))),
Expand Down Expand Up @@ -799,6 +820,58 @@ fn handle_rpc_response<P: Preset>(
),
)),
},
SupportedProtocol::ExecutionPayloadEnvelopesByRangeV1 => match fork_name {
Some(Phase::Gloas) => Ok(Some(RpcSuccessResponse::ExecutionPayloadEnvelopesByRange(
Arc::new(SignedExecutionPayloadEnvelope::from_ssz_default(
decoded_buffer,
)?),
))),
Some(
Phase::Phase0
| Phase::Altair
| Phase::Bellatrix
| Phase::Capella
| Phase::Deneb
| Phase::Electra
| Phase::Fulu,
) => Err(RPCError::ErrorResponse(
RpcErrorResponse::InvalidRequest,
"Invalid fork name for execution payload envelopes by range".to_string(),
)),
None => Err(RPCError::ErrorResponse(
RpcErrorResponse::InvalidRequest,
format!(
"No context bytes provided for {:?} response",
versioned_protocol
),
)),
},
SupportedProtocol::ExecutionPayloadEnvelopesByRootV1 => match fork_name {
Some(Phase::Gloas) => Ok(Some(RpcSuccessResponse::ExecutionPayloadEnvelopesByRoot(
Arc::new(SignedExecutionPayloadEnvelope::from_ssz_default(
decoded_buffer,
)?),
))),
Some(
Phase::Phase0
| Phase::Altair
| Phase::Bellatrix
| Phase::Capella
| Phase::Deneb
| Phase::Electra
| Phase::Fulu,
) => Err(RPCError::ErrorResponse(
RpcErrorResponse::InvalidRequest,
"Invalid fork name for execution payload envelopes by root".to_string(),
)),
None => Err(RPCError::ErrorResponse(
RpcErrorResponse::InvalidRequest,
format!(
"No context bytes provided for {:?} response",
versioned_protocol
),
)),
},
SupportedProtocol::PingV1 => Ok(Some(RpcSuccessResponse::Pong(Ping {
data: u64::from_ssz_default(decoded_buffer)?,
}))),
Expand Down Expand Up @@ -1111,6 +1184,7 @@ mod tests {
types::{EnrAttestationBitfield, ForkContext},
};
use anyhow::Result;
use enum_iterator::Sequence;
use snap::write::FrameEncoder;
use ssz::{ByteList, DynamicList};
use std::io::Write;
Expand Down Expand Up @@ -1477,6 +1551,18 @@ mod tests {
RequestType::DataColumnsByRange(dcbrange) => {
assert_eq!(decoded, RequestType::DataColumnsByRange(dcbrange))
}
RequestType::ExecutionPayloadEnvelopesByRange(epbrange) => {
assert_eq!(
decoded,
RequestType::ExecutionPayloadEnvelopesByRange(epbrange)
)
}
RequestType::ExecutionPayloadEnvelopesByRoot(epbroot) => {
assert_eq!(
decoded,
RequestType::ExecutionPayloadEnvelopesByRoot(epbroot)
)
}
RequestType::Ping(ping) => {
assert_eq!(decoded, RequestType::Ping(ping))
}
Expand Down Expand Up @@ -2555,4 +2641,46 @@ mod tests {
RPCError::InvalidData(_)
));
}

/// Test invalid snappy response.
#[test]
fn test_invalid_snappy_response() {
let config = Arc::new(Config::mainnet().rapid_upgrade());
let fork_context = Arc::new(ForkContext::dummy::<Mainnet>(
&config,
Phase::last().expect("there should be at least one phase"),
));
let max_packet_size = config.max_payload_size; // 10 MiB.

let protocol = ProtocolId::new(SupportedProtocol::BlocksByRangeV2, Encoding::SSZSnappy);

let mut codec = SSZSnappyOutboundCodec::<Mainnet>::new(
protocol.clone(),
max_packet_size,
fork_context.clone(),
);

let mut payload = BytesMut::new();
payload.extend_from_slice(&[0u8]);
let deneb_epoch = config.deneb_fork_epoch;
payload.extend_from_slice(&fork_context.context_bytes(deneb_epoch).as_bytes());

// Claim the MAXIMUM allowed size (10 MiB)
let claimed_size = max_packet_size;
let mut uvi_codec: Uvi<usize> = Uvi::default();
uvi_codec.encode(claimed_size, &mut payload).unwrap();
payload.extend_from_slice(&[0xBB; 16]); // Junk snappy.

let result = codec.decode(&mut payload);

assert!(result.is_err(), "Expected decode to fail");

// IoError = reached snappy decode (allocation happened).
let err = result.unwrap_err();
assert!(
matches!(err, RPCError::InvalidData(_)),
"Should return invalid data variant {}",
err
);
}
}
32 changes: 32 additions & 0 deletions src/rpc/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ pub struct RateLimiterConfig {
pub(super) blobs_by_root_quota: Quota,
pub(super) data_columns_by_root_quota: Quota,
pub(super) data_columns_by_range_quota: Quota,
pub(super) execution_payload_envelopes_by_range_quota: Quota,
pub(super) execution_payload_envelopes_by_root_quota: Quota,
pub(super) light_client_bootstrap_quota: Quota,
pub(super) light_client_optimistic_update_quota: Quota,
pub(super) light_client_finality_update_quota: Quota,
Expand Down Expand Up @@ -126,6 +128,10 @@ impl RateLimiterConfig {
pub const DEFAULT_LIGHT_CLIENT_OPTIMISTIC_UPDATE_QUOTA: Quota = Quota::one_every(10);
pub const DEFAULT_LIGHT_CLIENT_FINALITY_UPDATE_QUOTA: Quota = Quota::one_every(10);
pub const DEFAULT_LIGHT_CLIENT_UPDATES_BY_RANGE_QUOTA: Quota = Quota::one_every(10);
pub const DEFAULT_EXECUTION_PAYLOAD_ENVELOPES_BY_RANGE_QUOTA: Quota =
Quota::n_every(NonZeroU64::new(128).unwrap(), 10);
pub const DEFAULT_EXECUTION_PAYLOAD_ENVELOPES_BY_ROOT_QUOTA: Quota =
Quota::n_every(NonZeroU64::new(128).unwrap(), 10);
}

impl Default for RateLimiterConfig {
Expand All @@ -141,6 +147,10 @@ impl Default for RateLimiterConfig {
blobs_by_root_quota: Self::DEFAULT_BLOBS_BY_ROOT_QUOTA,
data_columns_by_root_quota: Self::DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA,
data_columns_by_range_quota: Self::DEFAULT_DATA_COLUMNS_BY_RANGE_QUOTA,
execution_payload_envelopes_by_range_quota:
Self::DEFAULT_EXECUTION_PAYLOAD_ENVELOPES_BY_RANGE_QUOTA,
execution_payload_envelopes_by_root_quota:
Self::DEFAULT_EXECUTION_PAYLOAD_ENVELOPES_BY_ROOT_QUOTA,
light_client_bootstrap_quota: Self::DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA,
light_client_optimistic_update_quota:
Self::DEFAULT_LIGHT_CLIENT_OPTIMISTIC_UPDATE_QUOTA,
Expand Down Expand Up @@ -179,6 +189,14 @@ impl Debug for RateLimiterConfig {
"data_columns_by_root",
fmt_q!(&self.data_columns_by_root_quota),
)
.field(
"execution_payload_envelopes_by_range",
fmt_q!(&self.execution_payload_envelopes_by_range_quota),
)
.field(
"execution_payload_envelopes_by_root",
fmt_q!(&self.execution_payload_envelopes_by_root_quota),
)
.finish()
}
}
Expand All @@ -201,6 +219,8 @@ impl FromStr for RateLimiterConfig {
let mut blobs_by_root_quota = None;
let mut data_columns_by_root_quota = None;
let mut data_columns_by_range_quota = None;
let mut execution_payload_envelopes_by_range_quota = None;
let mut execution_payload_envelopes_by_root_quota = None;
let mut light_client_bootstrap_quota = None;
let mut light_client_optimistic_update_quota = None;
let mut light_client_finality_update_quota = None;
Expand All @@ -222,6 +242,14 @@ impl FromStr for RateLimiterConfig {
Protocol::DataColumnsByRange => {
data_columns_by_range_quota = data_columns_by_range_quota.or(quota)
}
Protocol::ExecutionPayloadEnvelopesByRange => {
execution_payload_envelopes_by_range_quota =
execution_payload_envelopes_by_range_quota.or(quota)
}
Protocol::ExecutionPayloadEnvelopesByRoot => {
execution_payload_envelopes_by_root_quota =
execution_payload_envelopes_by_root_quota.or(quota)
}
Protocol::Ping => ping_quota = ping_quota.or(quota),
Protocol::MetaData => meta_data_quota = meta_data_quota.or(quota),
Protocol::LightClientBootstrap => {
Expand Down Expand Up @@ -257,6 +285,10 @@ impl FromStr for RateLimiterConfig {
.unwrap_or(Self::DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA),
data_columns_by_range_quota: data_columns_by_range_quota
.unwrap_or(Self::DEFAULT_DATA_COLUMNS_BY_RANGE_QUOTA),
execution_payload_envelopes_by_range_quota: execution_payload_envelopes_by_range_quota
.unwrap_or(Self::DEFAULT_EXECUTION_PAYLOAD_ENVELOPES_BY_RANGE_QUOTA),
execution_payload_envelopes_by_root_quota: execution_payload_envelopes_by_root_quota
.unwrap_or(Self::DEFAULT_EXECUTION_PAYLOAD_ENVELOPES_BY_ROOT_QUOTA),
light_client_bootstrap_quota: light_client_bootstrap_quota
.unwrap_or(Self::DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA),
light_client_optimistic_update_quota: light_client_optimistic_update_quota
Expand Down
17 changes: 14 additions & 3 deletions src/rpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ use helper_functions::misc;
use libp2p::PeerId;
use libp2p::swarm::handler::{
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol,
FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, StreamUpgradeError,
SubstreamProtocol,
};
use libp2p::swarm::{ConnectionId, Stream};
use logging::exception;
Expand Down Expand Up @@ -890,6 +891,16 @@ where
ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error }) => {
self.on_dial_upgrade_error(info, error)
}
ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
error: (proto, error),
..
}) => {
self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
id: self.current_inbound_substream_id,
proto,
error,
}));
}
_ => {
// NOTE: ConnectionEvent is a non exhaustive enum so updates should be based on
// release notes more than compiler feedback
Expand Down Expand Up @@ -926,7 +937,7 @@ where
request.count()
)),
}));
return self.shutdown(None);
return;
}
}
RequestType::BlobsByRange(request) => {
Expand All @@ -942,7 +953,7 @@ where
max_allowed, max_requested_blobs
)),
}));
return self.shutdown(None);
return;
}
}
_ => {}
Expand Down
Loading