Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 20 additions & 5 deletions block_producer/src/block_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use itertools::{Either, Itertools as _};
use keymanager::ProposerConfigs;
use logging::{error_with_peers, info_with_peers, warn_with_peers};
use operation_pools::{
AttestationAggPool, BlsToExecutionChangePool, PoolAdditionOutcome, PoolRejectionReason,
SyncCommitteeAggPool,
AttestationAggPool, BlsToExecutionChangePool, PayloadAttestationAggPool, PoolAdditionOutcome,
PoolRejectionReason, SyncCommitteeAggPool,
};
use prometheus_metrics::Metrics;
use pubkey_cache::PubkeyCache;
Expand Down Expand Up @@ -74,7 +74,7 @@ use types::{
fulu::containers::{BeaconBlock as FuluBeaconBlock, BeaconBlockBody as FuluBeaconBlockBody},
gloas::containers::{
BeaconBlock as GloasBeaconBlock, BeaconBlockBody as GloasBeaconBlockBody,
SignedExecutionPayloadBid,
PayloadAttestation, SignedExecutionPayloadBid,
},
nonstandard::{BlockRewards, Phase, WithBlobsAndMev},
phase0::{
Expand Down Expand Up @@ -125,6 +125,7 @@ impl<P: Preset, W: Wait> BlockProducer<P, W> {
attestation_agg_pool: Arc<AttestationAggPool<P, W>>,
bls_to_execution_change_pool: Arc<BlsToExecutionChangePool>,
sync_committee_agg_pool: Arc<SyncCommitteeAggPool<P, W>>,
payload_attestation_agg_pool: Arc<PayloadAttestationAggPool<P, W>>,
metrics: Option<Arc<Metrics>>,
options: Option<Options>,
) -> Self {
Expand All @@ -143,6 +144,7 @@ impl<P: Preset, W: Wait> BlockProducer<P, W> {
attestation_agg_pool,
bls_to_execution_change_pool,
sync_committee_agg_pool,
payload_attestation_agg_pool,
prepared_proposers: Mutex::new(HashMap::new()),
proposer_slashings: Mutex::new(vec![]),
attester_slashings: Mutex::new(vec![]),
Expand Down Expand Up @@ -617,6 +619,7 @@ struct ProducerContext<P: Preset, W: Wait> {
attestation_agg_pool: Arc<AttestationAggPool<P, W>>,
bls_to_execution_change_pool: Arc<BlsToExecutionChangePool>,
sync_committee_agg_pool: Arc<SyncCommitteeAggPool<P, W>>,
payload_attestation_agg_pool: Arc<PayloadAttestationAggPool<P, W>>,
prepared_proposers: Mutex<HashMap<ValidatorIndex, ExecutionAddress>>,
proposer_slashings: Mutex<Vec<ProposerSlashing>>,
attester_slashings: Mutex<Vec<AttesterSlashing<P>>>,
Expand Down Expand Up @@ -982,8 +985,7 @@ impl<P: Preset, W: Wait> BlockBuildContext<P, W> {
},
})),
Phase::Gloas => {
// TODO(gloas): prepare_payload_attestations
let payload_attestations = ContiguousList::default();
let payload_attestations = self.prepare_payload_attestations().await?;

BeaconBlock::from(Hc::new(GloasBeaconBlock {
slot,
Expand Down Expand Up @@ -1395,6 +1397,19 @@ impl<P: Preset, W: Wait> BlockBuildContext<P, W> {
.await
}

// Constructed `payload_attestations` in current block is to aggregate all payload attestations for the previous block payload.
// See <https://github.com/ethereum/consensus-specs/blob/v1.7.0-alpha.1/specs/gloas/validator.md#constructing-payload_attestations>
async fn prepare_payload_attestations(
&self,
) -> Result<ContiguousList<PayloadAttestation<P>, P::MaxPayloadAttestation>> {
let message_slot = misc::previous_slot(self.beacon_state.slot());

self.producer_context
.payload_attestation_agg_pool
.aggregate_payload_attestations(message_slot)
.await
}

async fn prepare_voluntary_exits(
&self,
) -> ContiguousList<SignedVoluntaryExit, P::MaxVoluntaryExits> {
Expand Down
9 changes: 9 additions & 0 deletions clock/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,15 @@ impl Tick {
)
}

#[must_use]
pub fn is_end_of_slot<P: Preset>(self, config: &Config) -> bool {
if config.phase_at_slot::<P>(self.slot) >= Phase::Gloas {
matches!(self.kind, TickKind::PayloadAttest)
} else {
matches!(self.kind, TickKind::Aggregate)
}
}

#[must_use]
pub const fn is_end_of_interval(self) -> bool {
matches!(
Expand Down
2 changes: 1 addition & 1 deletion eth2_libp2p
54 changes: 51 additions & 3 deletions fork_choice_control/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use execution_engine::{ExecutionEngine, PayloadStatusV1};
use fork_choice_store::{
AggregateAndProofOrigin, AttestationItem, AttestationOrigin, AttesterSlashingOrigin,
BlobSidecarOrigin, BlockOrigin, DataColumnSidecarOrigin, ExecutionPayloadBidOrigin,
StateCacheProcessor, Store, StoreConfig,
PayloadAttestationItem, PayloadAttestationOrigin, StateCacheProcessor, Store, StoreConfig,
};
use futures::channel::{mpsc::Sender as MultiSender, oneshot::Sender as OneshotSender};
use genesis::AnchorCheckpointProvider;
Expand All @@ -43,7 +43,7 @@ use types::{
config::Config as ChainConfig,
deneb::containers::BlobSidecar,
fulu::{containers::DataColumnIdentifier, primitives::ColumnIndex},
gloas::containers::SignedExecutionPayloadBid,
gloas::containers::{PayloadAttestationMessage, SignedExecutionPayloadBid},
nonstandard::ValidationOutcome,
phase0::{
containers::Checkpoint,
Expand All @@ -70,7 +70,7 @@ use crate::{
tasks::{
AggregateAndProofTask, AttestationTask, AttesterSlashingTask, BlobSidecarTask, BlockTask,
BlockVerifyForGossipTask, DataColumnSidecarTask, ExecutionPayloadBidTask,
StateAtSlotCacheFlushTask,
PayloadAttestationBatchTask, PayloadAttestationTask, StateAtSlotCacheFlushTask,
},
thread_pool::{Spawn, ThreadPool},
unbounded_sink::UnboundedSink,
Expand Down Expand Up @@ -647,6 +647,54 @@ where
.await
}

pub fn on_payload_attestation(
&self,
wait_group: W,
payload_attestation: PayloadAttestationItem<P>,
) {
self.spawn(PayloadAttestationTask {
store_snapshot: self.owned_store_snapshot(),
mutator_tx: self.owned_mutator_tx(),
wait_group,
payload_attestation,
metrics: self.metrics.clone(),
})
}

pub fn on_gossip_payload_attestation(
&self,
payload_attestation: Arc<PayloadAttestationMessage>,
gossip_id: GossipId,
) {
self.spawn(PayloadAttestationTask {
store_snapshot: self.owned_store_snapshot(),
mutator_tx: self.owned_mutator_tx(),
wait_group: self.owned_wait_group(),
payload_attestation: PayloadAttestationItem::unverified(
Arc::new(payload_attestation.into()),
PayloadAttestationOrigin::Gossip(gossip_id),
),
metrics: self.metrics.clone(),
})
}

pub fn on_api_payload_attestation_batch(
&self,
payload_attestations: Vec<PayloadAttestationItem<P>>,
) {
if payload_attestations.is_empty() {
return;
}

self.spawn(PayloadAttestationBatchTask {
store_snapshot: self.owned_store_snapshot(),
mutator_tx: self.owned_mutator_tx(),
wait_group: self.owned_wait_group(),
payload_attestations,
metrics: self.metrics.clone(),
})
}

pub fn on_reconstruction(
&self,
wait_group: W,
Expand Down
53 changes: 52 additions & 1 deletion fork_choice_control/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use types::{
primitives::{BlobIndex, KzgCommitment, VersionedHash},
},
fulu::primitives::ColumnIndex,
gloas::containers::SignedExecutionPayloadBid,
gloas::containers::{PayloadAttestationMessage, SignedExecutionPayloadBid},
nonstandard::Phase,
phase0::{
containers::{Checkpoint, ProposerSlashing, SignedVoluntaryExit},
Expand Down Expand Up @@ -54,6 +54,7 @@ pub enum Topic {
ExecutionPayloadBid,
FinalizedCheckpoint,
Head,
PayloadAttestation,
PayloadAttributes,
ProposerSlashing,
VoluntaryExit,
Expand All @@ -73,6 +74,7 @@ pub enum Event<P: Preset> {
ExecutionPayloadBid(Arc<SignedExecutionPayloadBid<P>>),
FinalizedCheckpoint(FinalizedCheckpointEvent),
Head(HeadEvent),
PayloadAttestation(PayloadAttestationEvent),
PayloadAttributes(PayloadAttributesEvent),
ProposerSlashing(Box<ProposerSlashing>),
VoluntaryExit(Box<SignedVoluntaryExit>),
Expand All @@ -94,6 +96,7 @@ impl<P: Preset> Event<P> {
Self::ExecutionPayloadBid(_) => Topic::ExecutionPayloadBid,
Self::FinalizedCheckpoint(_) => Topic::FinalizedCheckpoint,
Self::Head(_) => Topic::Head,
Self::PayloadAttestation(_) => Topic::PayloadAttestation,
Self::PayloadAttributes(_) => Topic::PayloadAttributes,
Self::ProposerSlashing(_) => Topic::ProposerSlashing,
Self::VoluntaryExit(_) => Topic::VoluntaryExit,
Expand All @@ -116,6 +119,7 @@ pub struct EventChannels<P: Preset> {
pub execution_payload_bids: Sender<Event<P>>,
pub finalized_checkpoints: Sender<Event<P>>,
pub heads: Sender<Event<P>>,
pub payload_attestations: Sender<Event<P>>,
pub payload_attributes: Sender<Event<P>>,
pub proposer_slashings: Sender<Event<P>>,
pub voluntary_exits: Sender<Event<P>>,
Expand Down Expand Up @@ -145,6 +149,7 @@ impl<P: Preset> EventChannels<P> {
execution_payload_bids: broadcast::channel(max_events).0,
finalized_checkpoints: broadcast::channel(max_events).0,
heads: broadcast::channel(max_events).0,
payload_attestations: broadcast::channel(max_events).0,
payload_attributes: broadcast::channel(max_events).0,
proposer_slashings: broadcast::channel(max_events).0,
voluntary_exits: broadcast::channel(max_events).0,
Expand All @@ -167,6 +172,7 @@ impl<P: Preset> EventChannels<P> {
Topic::ExecutionPayloadBid => &self.execution_payload_bids,
Topic::FinalizedCheckpoint => &self.finalized_checkpoints,
Topic::Head => &self.heads,
Topic::PayloadAttestation => &self.payload_attestations,
Topic::PayloadAttributes => &self.payload_attributes,
Topic::ProposerSlashing => &self.proposer_slashings,
Topic::VoluntaryExit => &self.voluntary_exits,
Expand Down Expand Up @@ -301,6 +307,17 @@ impl<P: Preset> EventChannels<P> {
}
}

pub fn send_payload_attestation_event(
&self,
phase: Phase,
payload_attestation: &Arc<PayloadAttestationMessage>,
) {
if let Err(error) = self.send_payload_attestation_event_internal(phase, payload_attestation)
{
warn_with_peers!("unable to send payload attestation event: {error}");
}
}

#[expect(clippy::too_many_arguments)]
pub fn send_payload_attributes_event(
&self,
Expand Down Expand Up @@ -520,6 +537,21 @@ impl<P: Preset> EventChannels<P> {
Ok(())
}

fn send_payload_attestation_event_internal(
&self,
phase: Phase,
payload_attestation: &Arc<PayloadAttestationMessage>,
) -> Result<()> {
if self.payload_attestations.receiver_count() > 0 {
let payload_attestation_event =
PayloadAttestationEvent::new(phase, payload_attestation);
let event = Event::PayloadAttestation(payload_attestation_event);
self.payload_attestations.send(event)?;
}

Ok(())
}

#[expect(clippy::too_many_arguments)]
fn send_payload_attributes_event_internal(
&self,
Expand Down Expand Up @@ -742,6 +774,25 @@ impl HeadEvent {
}
}

#[derive(Clone, Copy, Debug, Serialize)]
pub struct PayloadAttestationEvent {
pub version: Phase,
pub data: PayloadAttestationMessage,
}

impl PayloadAttestationEvent {
fn new(phase: Phase, payload_attestation: &Arc<PayloadAttestationMessage>) -> Self {
Self {
version: phase,
data: PayloadAttestationMessage {
validator_index: payload_attestation.validator_index,
data: payload_attestation.data,
signature: payload_attestation.signature,
},
}
}
}

#[derive(Clone, Debug, Serialize)]
pub struct PayloadAttributesEvent {
pub version: Phase,
Expand Down
2 changes: 1 addition & 1 deletion fork_choice_control/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub use crate::{
},
misc::{
MutatorRejectionReason, SidecarsPendingReconstruction, VerifyAggregateAndProofResult,
VerifyAttestationResult,
VerifyAttestationResult, VerifyPayloadAttestationResult,
},
queries::{BlockWithRoot, ForkChoiceContext, ForkTip, Snapshot},
specialized::{AdHocBenchController, BenchController},
Expand Down
16 changes: 15 additions & 1 deletion fork_choice_control/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use types::{
},
deneb::containers::{BlobIdentifier, BlobSidecar},
fulu::{containers::DataColumnIdentifier, primitives::ColumnIndex},
gloas::containers::PayloadAttestationMessage,
phase0::{
containers::Checkpoint,
primitives::{ExecutionBlockHash, H256, Slot, ValidatorIndex},
Expand All @@ -33,7 +34,7 @@ use types::{
use crate::{
misc::{
MutatorRejectionReason, ProcessingTimings, VerifyAggregateAndProofResult,
VerifyAttestationResult,
VerifyAttestationResult, VerifyPayloadAttestationResult,
},
unbounded_sink::UnboundedSink,
};
Expand Down Expand Up @@ -111,6 +112,10 @@ pub enum MutatorMessage<P: Preset, W> {
results:
Vec<Result<AttestationAction<P, GossipId>, AttestationValidationError<P, GossipId>>>,
},
BlockPayloadAttestations {
wait_group: W,
results: Vec<VerifyPayloadAttestationResult<P>>,
},
AttesterSlashing {
wait_group: W,
result: Result<Vec<ValidatorIndex>>,
Expand Down Expand Up @@ -153,6 +158,14 @@ pub enum MutatorMessage<P: Preset, W> {
result: Result<ExecutionPayloadBidAction<P>>,
origin: ExecutionPayloadBidOrigin,
},
PayloadAttestation {
wait_group: W,
result: VerifyPayloadAttestationResult<P>,
},
PayloadAttestationBatch {
wait_group: W,
results: Vec<VerifyPayloadAttestationResult<P>>,
},
PreprocessedBeaconState {
state: Arc<BeaconState<P>>,
},
Expand Down Expand Up @@ -256,6 +269,7 @@ pub enum ValidatorMessage<P: Preset, W> {
Tick(W, Tick),
Head(W, ChainLink<P>),
ValidAttestation(W, Arc<Attestation<P>>),
ValidPayloadAttestation(W, Arc<PayloadAttestationMessage>),
PrepareExecutionPayload(Slot, ExecutionBlockHash, ExecutionBlockHash),
Stop,
}
Expand Down
Loading
Loading