diff --git a/example_sentry.yaml b/example_sentry.yaml index 16d64f84d..e063f1901 100644 --- a/example_sentry.yaml +++ b/example_sentry.yaml @@ -96,6 +96,15 @@ validatorBlock: slotTimes: - 4s +# N+1 block prefetching - triggers v3 Proposal API request for slot N+1 +# when a block event for slot N is received +validatorBlockPrefetch: + enabled: false + delay: 0ms + # Skip prefetch if block event is more than this many slots behind wallclock + # Prevents spamming Proposal API when node is syncing + maxLagSlots: 32 + outputs: - name: http-sink type: http diff --git a/pkg/sentry/config.go b/pkg/sentry/config.go index 06f2b17ba..074242e44 100644 --- a/pkg/sentry/config.go +++ b/pkg/sentry/config.go @@ -55,6 +55,9 @@ type Config struct { // ValidatorBlock configuration ValidatorBlock *ValidatorBlockConfig `yaml:"validatorBlock" default:"{'enabled': false}"` + // ValidatorBlockPrefetch configuration for n+1 block prefetching + ValidatorBlockPrefetch ValidatorBlockPrefetchConfig `yaml:"validatorBlockPrefetch"` + // Tracing configuration Tracing observability.TracingConfig `yaml:"tracing"` } @@ -78,6 +81,10 @@ func (c *Config) Validate() error { return fmt.Errorf("invalid tracing config: %w", err) } + if err := c.ValidatorBlockPrefetch.Validate(); err != nil { + return fmt.Errorf("invalid validatorBlockPrefetch config: %w", err) + } + return nil } @@ -203,6 +210,43 @@ func (f *ValidatorBlockConfig) Validate() error { return nil } +// ValidatorBlockPrefetchConfig configures the n+1 block prefetching feature. +// When a block event is received for slot N, this triggers a v3 Proposal API +// request for slot N+1 after a configurable delay. +type ValidatorBlockPrefetchConfig struct { + // Enabled determines if the n+1 block prefetching feature is active + Enabled bool `yaml:"enabled" default:"false"` + + // Delay is the time to wait after receiving a block event before + // requesting the n+1 slot's validator block + Delay human.Duration `yaml:"delay" default:"0ms"` + + // MaxLagSlots is the maximum number of slots the block event can be behind + // the current wallclock slot before the prefetch is skipped. + // This prevents spamming the Proposal API when the node is syncing. + MaxLagSlots uint64 `yaml:"maxLagSlots" default:"32"` +} + +func (f *ValidatorBlockPrefetchConfig) Validate() error { + if !f.Enabled { + return nil + } + + if f.Delay.Duration < 0 { + return errors.New("delay must be non-negative") + } + + if f.Delay.Duration > 12*time.Second { + return errors.New("delay must be less than 12s") + } + + if f.MaxLagSlots == 0 { + return errors.New("maxLagSlots must be greater than 0") + } + + return nil +} + type BeaconCommitteesConfig struct { Enabled bool `yaml:"enabled" default:"false"` } diff --git a/pkg/sentry/proposed_validator_block.go b/pkg/sentry/proposed_validator_block.go index 701009819..e6e082523 100644 --- a/pkg/sentry/proposed_validator_block.go +++ b/pkg/sentry/proposed_validator_block.go @@ -156,3 +156,68 @@ func getVersionedProposalData[T any](response *api.Response[T]) (*api.VersionedP return data, nil } + +// handleBlockPrefetch handles the n+1 block prefetching triggered by a block event. +// When we receive a block for slot N, we request the validator block for slot N+1. +func (s *Sentry) handleBlockPrefetch(ctx context.Context, currentSlot phase0.Slot) { + nextSlot := currentSlot + 1 + + logCtx := s.log. + WithField("proccer", "block_prefetch"). + WithField("trigger_slot", uint64(currentSlot)). + WithField("target_slot", uint64(nextSlot)) + + // Check if we're already past the target slot + wallclockSlot, _, err := s.beacon.Metadata().Wallclock().Now() + if err != nil { + logCtx.WithError(err).Error("Failed to get current wallclock slot") + + return + } + + // Check if the block event is too far behind (node might be syncing) + maxLag := s.Config.ValidatorBlockPrefetch.MaxLagSlots + if wallclockSlot.Number() > uint64(currentSlot)+maxLag { + logCtx.WithField("wallclock_slot", wallclockSlot.Number()). + WithField("max_lag_slots", maxLag). + Debug("Skipping n+1 prefetch: block event too far behind wallclock") + + return + } + + if wallclockSlot.Number() > uint64(nextSlot) { + logCtx.Debug("Skipping n+1 prefetch: already past target slot") + + return + } + + // Apply configured delay + delay := s.Config.ValidatorBlockPrefetch.Delay.Duration + if delay > 0 { + select { + case <-time.After(delay): + case <-ctx.Done(): + return + } + } + + // Re-check after delay + wallclockSlot, _, err = s.beacon.Metadata().Wallclock().Now() + if err != nil { + logCtx.WithError(err).Error("Failed to get wallclock after delay") + + return + } + + if wallclockSlot.Number() > uint64(nextSlot) { + logCtx.Debug("Skipping n+1 prefetch after delay: past target slot") + + return + } + + logCtx.Debug("Fetching n+1 validator block") + + if err := s.fetchDecoratedValidatorBlock(ctx, nextSlot); err != nil { + logCtx.WithError(err).Error("Failed to fetch n+1 validator block") + } +} diff --git a/pkg/sentry/sentry.go b/pkg/sentry/sentry.go index 51ba7b6e7..74db00a85 100644 --- a/pkg/sentry/sentry.go +++ b/pkg/sentry/sentry.go @@ -428,6 +428,11 @@ func (s *Sentry) Start(ctx context.Context) error { } }() + // Trigger n+1 block prefetching if enabled + if s.Config.ValidatorBlockPrefetch.Enabled { + go s.handleBlockPrefetch(ctx, block.Slot) + } + return nil })