Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/rpc/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,9 @@ fn handle_error<T>(
Ok(None)
}
}
// All snappy errors from the snap crate bubble up as `Other` kind errors
// that imply invalid response
ErrorKind::Other => Err(RPCError::InvalidData(err.to_string())),
_ => Err(RPCError::from(err)),
}
}
Expand Down Expand Up @@ -1111,6 +1114,7 @@ mod tests {
types::{EnrAttestationBitfield, ForkContext},
};
use anyhow::Result;
use enum_iterator::Sequence;
use snap::write::FrameEncoder;
use ssz::{ByteList, DynamicList};
use std::io::Write;
Expand Down Expand Up @@ -2555,4 +2559,46 @@ mod tests {
RPCError::InvalidData(_)
));
}

/// Test invalid snappy response.
#[test]
fn test_invalid_snappy_response() {
let config = Arc::new(Config::mainnet().rapid_upgrade());
let fork_context = Arc::new(ForkContext::dummy::<Mainnet>(
&config,
Phase::last().expect("there should be at least one phase"),
));
let max_packet_size = config.max_payload_size; // 10 MiB.

let protocol = ProtocolId::new(SupportedProtocol::BlocksByRangeV2, Encoding::SSZSnappy);

let mut codec = SSZSnappyOutboundCodec::<Mainnet>::new(
protocol.clone(),
max_packet_size,
fork_context.clone(),
);

let mut payload = BytesMut::new();
payload.extend_from_slice(&[0u8]);
let deneb_epoch = config.deneb_fork_epoch;
payload.extend_from_slice(&fork_context.context_bytes(deneb_epoch).as_bytes());

// Claim the MAXIMUM allowed size (10 MiB)
let claimed_size = max_packet_size;
let mut uvi_codec: Uvi<usize> = Uvi::default();
uvi_codec.encode(claimed_size, &mut payload).unwrap();
payload.extend_from_slice(&[0xBB; 16]); // Junk snappy.

let result = codec.decode(&mut payload);

assert!(result.is_err(), "Expected decode to fail");

// IoError = reached snappy decode (allocation happened).
let err = result.unwrap_err();
assert!(
matches!(err, RPCError::InvalidData(_)),
"Should return invalid data variant {}",
err
);
}
}
17 changes: 14 additions & 3 deletions src/rpc/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ use helper_functions::misc;
use libp2p::PeerId;
use libp2p::swarm::handler::{
ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, DialUpgradeError,
FullyNegotiatedInbound, FullyNegotiatedOutbound, StreamUpgradeError, SubstreamProtocol,
FullyNegotiatedInbound, FullyNegotiatedOutbound, ListenUpgradeError, StreamUpgradeError,
SubstreamProtocol,
};
use libp2p::swarm::{ConnectionId, Stream};
use logging::exception;
Expand Down Expand Up @@ -890,6 +891,16 @@ where
ConnectionEvent::DialUpgradeError(DialUpgradeError { info, error }) => {
self.on_dial_upgrade_error(info, error)
}
ConnectionEvent::ListenUpgradeError(ListenUpgradeError {
error: (proto, error),
..
}) => {
self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
id: self.current_inbound_substream_id,
proto,
error,
}));
}
_ => {
// NOTE: ConnectionEvent is a non exhaustive enum so updates should be based on
// release notes more than compiler feedback
Expand Down Expand Up @@ -926,7 +937,7 @@ where
request.count()
)),
}));
return self.shutdown(None);
return;
}
}
RequestType::BlobsByRange(request) => {
Expand All @@ -942,7 +953,7 @@ where
max_allowed, max_requested_blobs
)),
}));
return self.shutdown(None);
return;
}
}
_ => {}
Expand Down
10 changes: 6 additions & 4 deletions src/rpc/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ where
P: Preset,
{
type Output = InboundOutput<TSocket, P>;
type Error = RPCError;
type Error = (Protocol, RPCError);
type Future = BoxFuture<'static, Result<Self::Output, Self::Error>>;

fn upgrade_inbound(self, socket: TSocket, protocol: ProtocolId) -> Self::Future {
Expand Down Expand Up @@ -632,10 +632,12 @@ where
)
.await
{
Err(e) => Err(RPCError::from(e)),
Err(e) => Err((versioned_protocol.protocol(), RPCError::from(e))),
Ok((Some(Ok(request)), stream)) => Ok((request, stream)),
Ok((Some(Err(e)), _)) => Err(e),
Ok((None, _)) => Err(RPCError::IncompleteStream),
Ok((Some(Err(e)), _)) => Err((versioned_protocol.protocol(), e)),
Ok((None, _)) => {
Err((versioned_protocol.protocol(), RPCError::IncompleteStream))
}
}
}
}
Expand Down
13 changes: 13 additions & 0 deletions src/service/gossip_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ pub struct GossipCache {
light_client_optimistic_update: Option<Duration>,
/// Timeout for execution payload bids.
execution_payload_bid: Option<Duration>,
/// Timeout for payload attestation messages.
payload_attestation_message: Option<Duration>,
}

#[derive(Default)]
Expand Down Expand Up @@ -79,6 +81,8 @@ pub struct GossipCacheBuilder {
light_client_optimistic_update: Option<Duration>,
/// Timeout for execution payload bids.
execution_payload_bid: Option<Duration>,
/// Timeout for payload attestation messages.
payload_attestation_message: Option<Duration>,
}

#[allow(dead_code)]
Expand Down Expand Up @@ -161,6 +165,12 @@ impl GossipCacheBuilder {
self
}

/// Timeout for payload attestation messages.
pub fn payload_attestation_message_timeout(mut self, timeout: Duration) -> Self {
self.payload_attestation_message = Some(timeout);
self
}

pub fn build(self) -> GossipCache {
let GossipCacheBuilder {
default_timeout,
Expand All @@ -178,6 +188,7 @@ impl GossipCacheBuilder {
light_client_finality_update,
light_client_optimistic_update,
execution_payload_bid,
payload_attestation_message,
} = self;
GossipCache {
expirations: DelayQueue::default(),
Expand All @@ -196,6 +207,7 @@ impl GossipCacheBuilder {
light_client_finality_update: light_client_finality_update.or(default_timeout),
light_client_optimistic_update: light_client_optimistic_update.or(default_timeout),
execution_payload_bid: execution_payload_bid.or(default_timeout),
payload_attestation_message: payload_attestation_message.or(default_timeout),
}
}
}
Expand Down Expand Up @@ -224,6 +236,7 @@ impl GossipCache {
GossipKind::LightClientFinalityUpdate => self.light_client_finality_update,
GossipKind::LightClientOptimisticUpdate => self.light_client_optimistic_update,
GossipKind::ExecutionPayloadBid => self.execution_payload_bid,
GossipKind::PayloadAttestationMessage => self.payload_attestation_message,
};
let Some(expire_timeout) = expire_timeout else {
return;
Expand Down
1 change: 1 addition & 0 deletions src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ impl<P: Preset> Network<P> {
// .sync_committee_message_timeout(timeout) // Do not retry
.bls_to_execution_change_timeout(half_epoch * 2)
.execution_payload_bid_timeout(chain_config.slot_duration_ms)
.payload_attestation_message_timeout(chain_config.slot_duration_ms)
.build()
};

Expand Down
41 changes: 39 additions & 2 deletions src/types/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use types::{
DataColumnSidecar as FuluDataColumnSidecar, SignedBeaconBlock as FuluSignedBeaconBlock,
},
gloas::containers::{
DataColumnSidecar as GloasDataColumnSidecar, SignedBeaconBlock as GloasSignedBeaconBlock,
SignedExecutionPayloadBid,
DataColumnSidecar as GloasDataColumnSidecar, PayloadAttestationMessage,
SignedBeaconBlock as GloasSignedBeaconBlock, SignedExecutionPayloadBid,
},
nonstandard::Phase,
phase0::{
Expand Down Expand Up @@ -78,6 +78,8 @@ pub enum PubsubMessage<P: Preset> {
LightClientOptimisticUpdate(Box<LightClientOptimisticUpdate<P>>),
/// Gossipsub message providing notification of an execution payload bid.
ExecutionPayloadBid(Arc<SignedExecutionPayloadBid<P>>),
/// Gossipsub message providing notification of a payload attestation message.
PayloadAttestationMessage(Arc<PayloadAttestationMessage>),
}

// Implements the `DataTransform` trait of gossipsub to employ snappy compression
Expand Down Expand Up @@ -181,6 +183,7 @@ impl<P: Preset> PubsubMessage<P> {
GossipKind::LightClientOptimisticUpdate
}
PubsubMessage::ExecutionPayloadBid(_) => GossipKind::ExecutionPayloadBid,
PubsubMessage::PayloadAttestationMessage(_) => GossipKind::PayloadAttestationMessage,
}
}

Expand Down Expand Up @@ -568,6 +571,32 @@ impl<P: Preset> PubsubMessage<P> {
)),
}
}
GossipKind::PayloadAttestationMessage => {
match fork_context.get_fork_from_context_bytes(gossip_topic.fork_digest) {
Some(Phase::Gloas) => {
let payload_attestation = Arc::new(
PayloadAttestationMessage::from_ssz_default(data)
.map_err(|e| format!("{:?}", e))?,
);
Ok(PubsubMessage::PayloadAttestationMessage(
payload_attestation,
))
}
Some(
Phase::Phase0
| Phase::Altair
| Phase::Bellatrix
| Phase::Capella
| Phase::Deneb
| Phase::Electra
| Phase::Fulu,
)
| None => Err(format!(
"payload_attestation_message topic invalid for given fork digest {:?}",
gossip_topic.fork_digest
)),
}
}
}
}
}
Expand Down Expand Up @@ -596,6 +625,7 @@ impl<P: Preset> PubsubMessage<P> {
PubsubMessage::LightClientFinalityUpdate(data) => data.to_ssz(),
PubsubMessage::LightClientOptimisticUpdate(data) => data.to_ssz(),
PubsubMessage::ExecutionPayloadBid(data) => data.to_ssz(),
PubsubMessage::PayloadAttestationMessage(data) => data.to_ssz(),
}
}
}
Expand Down Expand Up @@ -672,6 +702,13 @@ impl<P: Preset> std::fmt::Display for PubsubMessage<P> {
data.message.slot, data.message.parent_block_root, data.message.builder_index
)
}
PubsubMessage::PayloadAttestationMessage(data) => {
write!(
f,
"Payload Attestation: slot: {}, beacon_block_root: {:?}, validator_index: {}",
data.data.slot, data.data.beacon_block_root, data.validator_index
)
}
}
}
}
18 changes: 17 additions & 1 deletion src/types/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub const BLS_TO_EXECUTION_CHANGE_TOPIC: &str = "bls_to_execution_change";
pub const LIGHT_CLIENT_FINALITY_UPDATE: &str = "light_client_finality_update";
pub const LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic_update";
pub const EXECUTION_PAYLOAD_BID_TOPIC: &str = "execution_payload_bid";
pub const PAYLOAD_ATTESTATION_MESSAGE_TOPIC: &str = "payload_attestation_message";

#[derive(Debug)]
pub struct TopicConfig {
Expand Down Expand Up @@ -104,6 +105,7 @@ pub fn core_topics_to_subscribe(

if current_phase >= Phase::Gloas {
topics.push(GossipKind::ExecutionPayloadBid);
topics.push(GossipKind::PayloadAttestationMessage);
}

topics
Expand Down Expand Up @@ -131,7 +133,8 @@ pub fn is_fork_non_core_topic(topic: &GossipTopic, _phase: Phase) -> bool {
| GossipKind::BlsToExecutionChange
| GossipKind::LightClientFinalityUpdate
| GossipKind::LightClientOptimisticUpdate
| GossipKind::ExecutionPayloadBid => false,
| GossipKind::ExecutionPayloadBid
| GossipKind::PayloadAttestationMessage => false,
}
}

Expand Down Expand Up @@ -194,6 +197,8 @@ pub enum GossipKind {
LightClientOptimisticUpdate,
/// Topic for publishing execution payload bids.
ExecutionPayloadBid,
/// Topic for publishing payload attestation messages.
PayloadAttestationMessage,
}

impl std::fmt::Display for GossipKind {
Expand Down Expand Up @@ -276,6 +281,7 @@ impl GossipTopic {
LIGHT_CLIENT_FINALITY_UPDATE => GossipKind::LightClientFinalityUpdate,
LIGHT_CLIENT_OPTIMISTIC_UPDATE => GossipKind::LightClientOptimisticUpdate,
EXECUTION_PAYLOAD_BID_TOPIC => GossipKind::ExecutionPayloadBid,
PAYLOAD_ATTESTATION_MESSAGE_TOPIC => GossipKind::PayloadAttestationMessage,
topic => match subnet_topic_index(topic) {
Some(kind) => kind,
None => return Err(format!("Unknown topic: {}", topic)),
Expand Down Expand Up @@ -344,6 +350,7 @@ impl std::fmt::Display for GossipTopic {
GossipKind::LightClientFinalityUpdate => LIGHT_CLIENT_FINALITY_UPDATE.into(),
GossipKind::LightClientOptimisticUpdate => LIGHT_CLIENT_OPTIMISTIC_UPDATE.into(),
GossipKind::ExecutionPayloadBid => EXECUTION_PAYLOAD_BID_TOPIC.into(),
GossipKind::PayloadAttestationMessage => PAYLOAD_ATTESTATION_MESSAGE_TOPIC.into(),
};
write!(
f,
Expand Down Expand Up @@ -416,6 +423,8 @@ mod tests {
VoluntaryExit,
ProposerSlashing,
AttesterSlashing,
ExecutionPayloadBid,
PayloadAttestationMessage,
]
.iter()
{
Expand Down Expand Up @@ -512,6 +521,11 @@ mod tests {
assert_eq!("voluntary_exit", VoluntaryExit.as_ref());
assert_eq!("proposer_slashing", ProposerSlashing.as_ref());
assert_eq!("attester_slashing", AttesterSlashing.as_ref());
assert_eq!("execution_payload_bid", ExecutionPayloadBid.as_ref());
assert_eq!(
"payload_attestation_message",
PayloadAttestationMessage.as_ref()
);
}

fn get_chain_config() -> ChainConfig {
Expand Down Expand Up @@ -593,6 +607,8 @@ mod tests {
GossipKind::LightClientFinalityUpdate,
GossipKind::LightClientOptimisticUpdate,
GossipKind::BlsToExecutionChange,
GossipKind::ExecutionPayloadBid,
GossipKind::PayloadAttestationMessage,
];
for subnet in s {
expected_topics.push(GossipKind::DataColumnSidecar(subnet));
Expand Down
Loading