Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions example_sentry.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ validatorBlock:
slotTimes:
- 4s

# N+1 block prefetching - triggers v3 Proposal API request for slot N+1
# when a block event for slot N is received
validatorBlockPrefetch:
enabled: false
delay: 0ms
# Skip prefetch if block event is more than this many slots behind wallclock
# Prevents spamming Proposal API when node is syncing
maxLagSlots: 32

outputs:
- name: http-sink
type: http
Expand Down
44 changes: 44 additions & 0 deletions pkg/sentry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ type Config struct {
// ValidatorBlock configuration
ValidatorBlock *ValidatorBlockConfig `yaml:"validatorBlock" default:"{'enabled': false}"`

// ValidatorBlockPrefetch configuration for n+1 block prefetching
ValidatorBlockPrefetch ValidatorBlockPrefetchConfig `yaml:"validatorBlockPrefetch"`

// Tracing configuration
Tracing observability.TracingConfig `yaml:"tracing"`
}
Expand All @@ -78,6 +81,10 @@ func (c *Config) Validate() error {
return fmt.Errorf("invalid tracing config: %w", err)
}

if err := c.ValidatorBlockPrefetch.Validate(); err != nil {
return fmt.Errorf("invalid validatorBlockPrefetch config: %w", err)
}

return nil
}

Expand Down Expand Up @@ -203,6 +210,43 @@ func (f *ValidatorBlockConfig) Validate() error {
return nil
}

// ValidatorBlockPrefetchConfig configures the n+1 block prefetching feature.
// When a block event is received for slot N, this triggers a v3 Proposal API
// request for slot N+1 after a configurable delay.
type ValidatorBlockPrefetchConfig struct {
// Enabled determines if the n+1 block prefetching feature is active
Enabled bool `yaml:"enabled" default:"false"`

// Delay is the time to wait after receiving a block event before
// requesting the n+1 slot's validator block
Delay human.Duration `yaml:"delay" default:"0ms"`

// MaxLagSlots is the maximum number of slots the block event can be behind
// the current wallclock slot before the prefetch is skipped.
// This prevents spamming the Proposal API when the node is syncing.
MaxLagSlots uint64 `yaml:"maxLagSlots" default:"32"`
}

func (f *ValidatorBlockPrefetchConfig) Validate() error {
if !f.Enabled {
return nil
}

if f.Delay.Duration < 0 {
return errors.New("delay must be non-negative")
}

if f.Delay.Duration > 12*time.Second {
return errors.New("delay must be less than 12s")
}

if f.MaxLagSlots == 0 {
return errors.New("maxLagSlots must be greater than 0")
}

return nil
}

type BeaconCommitteesConfig struct {
Enabled bool `yaml:"enabled" default:"false"`
}
Expand Down
65 changes: 65 additions & 0 deletions pkg/sentry/proposed_validator_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,68 @@ func getVersionedProposalData[T any](response *api.Response[T]) (*api.VersionedP

return data, nil
}

// handleBlockPrefetch handles the n+1 block prefetching triggered by a block event.
// When we receive a block for slot N, we request the validator block for slot N+1.
func (s *Sentry) handleBlockPrefetch(ctx context.Context, currentSlot phase0.Slot) {
nextSlot := currentSlot + 1

logCtx := s.log.
WithField("proccer", "block_prefetch").
WithField("trigger_slot", uint64(currentSlot)).
WithField("target_slot", uint64(nextSlot))

// Check if we're already past the target slot
wallclockSlot, _, err := s.beacon.Metadata().Wallclock().Now()
if err != nil {
logCtx.WithError(err).Error("Failed to get current wallclock slot")

return
}

// Check if the block event is too far behind (node might be syncing)
maxLag := s.Config.ValidatorBlockPrefetch.MaxLagSlots
if wallclockSlot.Number() > uint64(currentSlot)+maxLag {
logCtx.WithField("wallclock_slot", wallclockSlot.Number()).
WithField("max_lag_slots", maxLag).
Debug("Skipping n+1 prefetch: block event too far behind wallclock")

return
}

if wallclockSlot.Number() > uint64(nextSlot) {
logCtx.Debug("Skipping n+1 prefetch: already past target slot")

return
}

// Apply configured delay
delay := s.Config.ValidatorBlockPrefetch.Delay.Duration
if delay > 0 {
select {
case <-time.After(delay):
case <-ctx.Done():
return
}
}

// Re-check after delay
wallclockSlot, _, err = s.beacon.Metadata().Wallclock().Now()
if err != nil {
logCtx.WithError(err).Error("Failed to get wallclock after delay")

return
}

if wallclockSlot.Number() > uint64(nextSlot) {
logCtx.Debug("Skipping n+1 prefetch after delay: past target slot")

return
}

logCtx.Debug("Fetching n+1 validator block")

if err := s.fetchDecoratedValidatorBlock(ctx, nextSlot); err != nil {
logCtx.WithError(err).Error("Failed to fetch n+1 validator block")
}
}
5 changes: 5 additions & 0 deletions pkg/sentry/sentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,11 @@ func (s *Sentry) Start(ctx context.Context) error {
}
}()

// Trigger n+1 block prefetching if enabled
if s.Config.ValidatorBlockPrefetch.Enabled {
go s.handleBlockPrefetch(ctx, block.Slot)
}

return nil
})

Expand Down
Loading