2 changes: 1 addition & 1 deletion runner/clients/reth/client.go
@@ -94,7 +94,7 @@ func (r *RethClient) Run(ctx context.Context, cfg *types.RuntimeConfig) error {
args = append(args, "--engine.state-provider-metrics")
args = append(args, "--disable-discovery")
args = append(args, "--port", fmt.Sprintf("%d", r.p2pPort))
args = append(args, "-vvv")
args = append(args, "-vv")

// increase mempool size
args = append(args, "--txpool.pending-max-count", "100000000")
15 changes: 13 additions & 2 deletions runner/network/consensus/validator_consensus.go
@@ -64,23 +64,34 @@ func (f *SyncingConsensusClient) propose(ctx context.Context, payload *engine.Ex
}

// Start starts the fake consensus client.
func (f *SyncingConsensusClient) Start(ctx context.Context, payloads []engine.ExecutableData, metricsCollector metrics.Collector, firstTestBlock uint64) error {
func (f *SyncingConsensusClient) Start(ctx context.Context, payloads []engine.ExecutableData, metricsCollector metrics.Collector, firstTestBlock uint64, startedBlockSignal chan uint64) error {
f.log.Info("Starting sync benchmark", "num_payloads", len(payloads))
m := metrics.NewBlockMetrics()

for i := 0; i < len(payloads); i++ {
m.SetBlockNumber(uint64(max(0, int(payloads[i].Number)-int(firstTestBlock))))
m.SetBlockNumber(uint64(max(0, int(payloads[i].Number)-int(firstTestBlock)+1)))
f.log.Info("Proposing payload", "payload_index", i)

startTime := time.Now()
err := f.propose(ctx, &payloads[i], m)
if err != nil {
return err
}

select {
case startedBlockSignal <- payloads[i].Number + 1:
default:
}

time.Sleep(time.Until(startTime.Add(f.options.BlockTime)))

Review comment (Collaborator, Author): previously, the validator benchmark wasn't sleeping here (this also caused an issue with Reth processing spikes).

if payloads[i].Number >= firstTestBlock {
err = metricsCollector.Collect(ctx, m)
if err != nil {
f.log.Error("Failed to collect metrics", "error", err)
}
}

}
return nil
}
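
The select with a default branch above makes the send on startedBlockSignal non-blocking, so the proposal loop never stalls if nothing is consuming the signal (for example, when flashblock replay is not running). A minimal, self-contained sketch of that pattern, using hypothetical names (notify, signal) that are not part of the PR:

package main

import "fmt"

// notify performs a non-blocking send: the block number is delivered only if a
// receiver is ready right now; otherwise it is dropped and the caller moves on.
func notify(signal chan uint64, blockNumber uint64) {
	select {
	case signal <- blockNumber:
		fmt.Println("signalled block", blockNumber)
	default:
		// no listener ready; skip the signal rather than block the proposer
	}
}

func main() {
	signal := make(chan uint64) // unbuffered, like startedBlockSignal
	notify(signal, 42)          // no receiver is waiting, so the send is skipped
}
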
105 changes: 32 additions & 73 deletions runner/network/flashblocks/replay_server.go
@@ -17,7 +17,7 @@ import (
type ReplayServer struct {
log log.Logger
port uint64
flashblocks []types.FlashblocksPayloadV1
flashblocks map[uint64][]types.FlashblocksPayloadV1
blockTime time.Duration

server *http.Server
@@ -30,7 +30,7 @@ type ReplayServer struct {
stopOnce sync.Once
}

func NewReplayServer(log log.Logger, port uint64, flashblocks []types.FlashblocksPayloadV1, blockTime time.Duration) *ReplayServer {
func NewReplayServer(log log.Logger, port uint64, flashblocks map[uint64][]types.FlashblocksPayloadV1, blockTime time.Duration) *ReplayServer {
return &ReplayServer{
log: log,
port: port,
@@ -117,100 +117,53 @@ func (s *ReplayServer) removeConnection(conn *websocket.Conn) {
}

// ReplayFlashblocks replays flashblocks to connected clients at evenly spaced intervals.
func (s *ReplayServer) ReplayFlashblocks(ctx context.Context) error {
func (s *ReplayServer) ReplayFlashblock(ctx context.Context, blockNumber uint64) error {
if len(s.flashblocks) == 0 {
s.log.Info("No flashblocks to replay")
return nil
}

blockGroups := s.groupFlashblocksByBlock()
flashblocks, ok := s.flashblocks[blockNumber]
if !ok {
s.log.Info("No flashblocks to replay for block", "block_number", blockNumber)
return nil
}

s.log.Info("Starting flashblock replay",
"total_flashblocks", len(s.flashblocks),
"num_blocks", len(blockGroups),
"flashblocks", len(flashblocks),
)

numIntervals := 1
if len(flashblocks) > 1 {
numIntervals = len(flashblocks)
}

interval := s.blockTime / time.Duration(numIntervals)

s.log.Debug("Replaying flashblocks for block",
"block_number", blockNumber,
"num_flashblocks", len(flashblocks),
"interval", interval,
)

for blockNum, flashblocks := range blockGroups {
for i, flashblock := range flashblocks {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

if len(flashblocks) == 0 {
continue
}

interval := s.blockTime / time.Duration(len(flashblocks)+1)

s.log.Debug("Replaying flashblocks for block",
"block_number", blockNum,
"num_flashblocks", len(flashblocks),
"interval", interval,
)

for i, flashblock := range flashblocks {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

time.Sleep(interval)

if err := s.broadcastFlashblock(flashblock); err != nil {
s.log.Warn("Error broadcasting flashblock", "err", err, "index", i)
}
if err := s.broadcastFlashblock(flashblock); err != nil {
s.log.Warn("Error broadcasting flashblock", "err", err, "index", i)
}

remainingTime := s.blockTime - interval*time.Duration(len(flashblocks))
if remainingTime > 0 {
time.Sleep(remainingTime)
}
time.Sleep(interval)
}

s.log.Info("Flashblock replay complete")
return nil
}
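
To make the pacing above concrete: the replay spreads one block's flashblocks evenly across the block time, broadcasting each one and then sleeping for blockTime divided by the number of flashblocks. A small standalone sketch of the same arithmetic, assuming a 2s block time (replayInterval is a hypothetical helper, not part of the PR):

package main

import (
	"fmt"
	"time"
)

// replayInterval mirrors the spacing computed in ReplayFlashblock: divide the
// block time by the number of flashblocks, with a floor of one interval so a
// single flashblock still occupies a full block time.
func replayInterval(blockTime time.Duration, numFlashblocks int) time.Duration {
	numIntervals := 1
	if numFlashblocks > 1 {
		numIntervals = numFlashblocks
	}
	return blockTime / time.Duration(numIntervals)
}

func main() {
	fmt.Println(replayInterval(2*time.Second, 4)) // 500ms between broadcasts
	fmt.Println(replayInterval(2*time.Second, 1)) // 2s
}
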

// groupFlashblocksByBlock groups flashblocks by block number, sorted by index.
func (s *ReplayServer) groupFlashblocksByBlock() map[uint64][]types.FlashblocksPayloadV1 {
groups := make(map[uint64][]types.FlashblocksPayloadV1)

// Build PayloadID -> blockNum mapping from flashblocks with Base
payloadIDToBlockNum := make(map[types.PayloadID]uint64)
for _, fb := range s.flashblocks {
if fb.Base != nil {
payloadIDToBlockNum[fb.PayloadID] = uint64(fb.Base.BlockNumber)
}
}

for _, fb := range s.flashblocks {
var blockNum uint64
if fb.Base != nil {
blockNum = uint64(fb.Base.BlockNumber)
} else if bn, ok := payloadIDToBlockNum[fb.PayloadID]; ok {
blockNum = bn
}
groups[blockNum] = append(groups[blockNum], fb)
}

for blockNum := range groups {
sortByIndex(groups[blockNum])
}

return groups
}

func sortByIndex(flashblocks []types.FlashblocksPayloadV1) {
for i := 1; i < len(flashblocks); i++ {
j := i
for j > 0 && flashblocks[j-1].Index > flashblocks[j].Index {
flashblocks[j-1], flashblocks[j] = flashblocks[j], flashblocks[j-1]
j--
}
}
}

func (s *ReplayServer) broadcastFlashblock(flashblock types.FlashblocksPayloadV1) error {
data, err := json.Marshal(flashblock)
@@ -232,9 +185,15 @@ func (s *ReplayServer) broadcastFlashblock(flashblock types.FlashblocksPayloadV1
}
}

blockNumber := 0
if flashblock.Base != nil {
blockNumber = int(flashblock.Base.BlockNumber)
}

s.log.Debug("Broadcasted flashblock",
"payload_id", fmt.Sprintf("%x", flashblock.PayloadID),
"index", flashblock.Index,
"block_number", blockNumber,
"num_clients", len(connections),
)

4 changes: 2 additions & 2 deletions runner/network/network_benchmark.go
@@ -183,10 +183,10 @@ func (nb *NetworkBenchmark) benchmarkValidator(ctx context.Context, payloadResul

nb.log.Info("Validator header", "number", validatorHeader.Number.Uint64(), "lastSetupBlock", lastSetupBlock)

if validatorHeader.Number.Cmp(big.NewInt(int64(lastSetupBlock))) < 0 {
if validatorHeader.Number.Cmp(big.NewInt(int64(lastSetupBlock)-1)) < 0 {
nb.log.Info("Validator is behind first test block, catching up", "validator_block", validatorHeader.Number.Uint64(), "last_setup_block", lastSetupBlock)
// fetch all blocks the validator node is missing
for i := validatorHeader.Number.Uint64() + 1; i <= lastSetupBlock; i++ {
for i := validatorHeader.Number.Uint64() + 1; i < lastSetupBlock; i++ {
block, err := sequencerClient.Client().BlockByNumber(ctx, big.NewInt(int64(i)))
if err != nil {
sequencerClient.Stop()
48 changes: 33 additions & 15 deletions runner/network/sequencer_benchmark.go
@@ -154,7 +154,7 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics.
flashblocksClient := sequencerClient.FlashblocksClient()
if flashblocksClient != nil {
nb.log.Info("Starting flashblocks collection")
flashblockCollector = newFlashblockCollector()
flashblockCollector = newFlashblockCollector(nb.log)
flashblocksClient.AddListener(flashblockCollector)

if err := flashblocksClient.Start(benchmarkCtx); err != nil {
@@ -204,23 +204,26 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics.

go func() {
consensusClient := consensus.NewSequencerConsensusClient(nb.log, sequencerClient.Client(), sequencerClient.AuthClient(), mempool, consensus.ConsensusClientOptions{
BlockTime: params.BlockTime,
GasLimit: params.GasLimit,
GasLimitSetup: 1e9, // 1G gas
BlockTime: params.BlockTime,
GasLimit: params.GasLimit,
GasLimitSetup: 1e9, // 1G gas
ParallelTxBatches: nb.config.Config.ParallelTxBatches(),
}, headBlockHash, headBlockNumber, l1Chain, nb.config.BatcherAddr())

payloads := make([]engine.ExecutableData, 0)

var lastSetupPayload *engine.ExecutableData
setupLoop:
for {
_blockMetrics := metrics.NewBlockMetrics()
_, err := consensusClient.Propose(benchmarkCtx, _blockMetrics, true)
setupPayload, err := consensusClient.Propose(benchmarkCtx, _blockMetrics, true)
if err != nil {
errChan <- err
return
}

lastSetupPayload = setupPayload

select {
case <-setupComplete:
break setupLoop
@@ -231,6 +234,8 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics.

}

payloads = append(payloads, *lastSetupPayload)

blockMetrics := metrics.NewBlockMetrics()

// run for a few blocks
@@ -276,7 +281,7 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics.
return nil, 0, err
case payloads := <-payloadResult:
// Collect flashblocks if available
var flashblocks []types.FlashblocksPayloadV1
var flashblocks map[uint64][]types.FlashblocksPayloadV1
if flashblockCollector != nil {
flashblocks = flashblockCollector.GetFlashblocks()
nb.log.Info("Collected flashblocks", "count", len(flashblocks))
@@ -286,37 +291,50 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics.
ExecutablePayloads: payloads,
Flashblocks: flashblocks,
}
return result, payloads[0].Number - 1, nil
return result, payloads[0].Number, nil
}
}

// flashblockCollector implements FlashblockListener to collect flashblocks.
type flashblockCollector struct {
flashblocks []types.FlashblocksPayloadV1
mu sync.Mutex
log log.Logger
flashblocks map[uint64][]types.FlashblocksPayloadV1
currentBaseBlock *uint64
mu sync.Mutex
}

// newFlashblockCollector creates a new flashblock collector.
func newFlashblockCollector() *flashblockCollector {
func newFlashblockCollector(log log.Logger) *flashblockCollector {
return &flashblockCollector{
flashblocks: make([]types.FlashblocksPayloadV1, 0),
flashblocks: make(map[uint64][]types.FlashblocksPayloadV1),
log: log,
}
}

// OnFlashblock implements FlashblockListener.
func (c *flashblockCollector) OnFlashblock(flashblock types.FlashblocksPayloadV1) {
c.mu.Lock()
defer c.mu.Unlock()
c.flashblocks = append(c.flashblocks, flashblock)
if flashblock.Base != nil {
baseBlock := uint64(flashblock.Base.BlockNumber)
c.currentBaseBlock = &baseBlock
} else if c.currentBaseBlock == nil {
c.log.Warn("received flashblock without base block number")
return
}
c.log.Info("Collected flashblock", "block_number", *c.currentBaseBlock, "index", flashblock.Index, "tx_count", len(flashblock.Diff.Transactions))
c.flashblocks[*c.currentBaseBlock] = append(c.flashblocks[*c.currentBaseBlock], flashblock)
}

// GetFlashblocks returns all collected flashblocks.
func (c *flashblockCollector) GetFlashblocks() []types.FlashblocksPayloadV1 {
func (c *flashblockCollector) GetFlashblocks() map[uint64][]types.FlashblocksPayloadV1 {
c.mu.Lock()
defer c.mu.Unlock()

// Return a copy to avoid race conditions
result := make([]types.FlashblocksPayloadV1, len(c.flashblocks))
copy(result, c.flashblocks)
result := make(map[uint64][]types.FlashblocksPayloadV1)
for blockNumber, flashblocks := range c.flashblocks {
result[blockNumber] = append([]types.FlashblocksPayloadV1{}, flashblocks...)
}
return result
}
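
The collector above keys flashblocks by block number: a flashblock that carries a Base sets the current block, and subsequent flashblocks without a Base are filed under that same block. A rough, self-contained sketch of that grouping rule, using simplified stand-in types (Flashblock and group are illustrative, not the real clientTypes):

package main

import "fmt"

// Flashblock is a simplified stand-in for types.FlashblocksPayloadV1, keeping
// only what the grouping rule needs.
type Flashblock struct {
	BaseBlock *uint64 // set only when the flashblock carries a Base
	Index     uint64
}

// group mirrors OnFlashblock: track the latest base block number seen and file
// every flashblock under it; flashblocks seen before any Base are dropped.
func group(flashblocks []Flashblock) map[uint64][]Flashblock {
	out := make(map[uint64][]Flashblock)
	var current *uint64
	for _, fb := range flashblocks {
		if fb.BaseBlock != nil {
			current = fb.BaseBlock
		} else if current == nil {
			continue // matches the collector's warn-and-return path
		}
		out[*current] = append(out[*current], fb)
	}
	return out
}

func main() {
	base := uint64(100)
	fbs := []Flashblock{{BaseBlock: &base, Index: 0}, {Index: 1}, {Index: 2}}
	fmt.Println(len(group(fbs)[100])) // 3: all flashblocks of the block share one key
}
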
2 changes: 1 addition & 1 deletion runner/network/types/payload_result.go
@@ -12,7 +12,7 @@ type PayloadResult struct {
ExecutablePayloads []engine.ExecutableData

// Flashblocks are the flashblock payloads collected during the benchmark (if available)
Flashblocks []clientTypes.FlashblocksPayloadV1
Flashblocks map[uint64][]clientTypes.FlashblocksPayloadV1
}

// HasFlashblocks returns true if flashblock payloads were collected.
18 changes: 14 additions & 4 deletions runner/network/validator_benchmark.go
@@ -67,6 +67,8 @@ func (vb *validatorBenchmark) Run(ctx context.Context, payloads []engine.Executa
headBlockHash := headBlockHeader.Hash()
headBlockNumber := headBlockHeader.Number.Uint64()

startedBlockSignal := make(chan uint64)

// If flashblock server is available and client supports flashblocks, wait for connection
// and start replaying flashblocks in the background
if vb.flashblockServer != nil && vb.validatorClient.SupportsFlashblocks() {
@@ -80,9 +82,17 @@ func (vb *validatorBenchmark) Run(ctx context.Context, payloads []engine.Executa

// Start replaying flashblocks in a goroutine
go func() {
if err := vb.flashblockServer.ReplayFlashblocks(ctx); err != nil {
if !errors.Is(err, context.Canceled) {
vb.log.Warn("Error replaying flashblocks", "err", err)
for {
select {
case <-ctx.Done():
return
case blockNumber := <-startedBlockSignal:
vb.log.Info("Replaying flashblocks for block", "block_number", blockNumber)
if err := vb.flashblockServer.ReplayFlashblock(ctx, blockNumber); err != nil {
if !errors.Is(err, context.Canceled) {
vb.log.Warn("Error replaying flashblocks", "err", err)
}
}
}
}
}()
@@ -93,7 +103,7 @@ func (vb *validatorBenchmark) Run(ctx context.Context, payloads []engine.Executa
BlockTime: vb.config.Params.BlockTime,
}, headBlockHash, headBlockNumber)

err = consensusClient.Start(ctx, payloads, metricsCollector, lastSetupBlock)
err = consensusClient.Start(ctx, payloads, metricsCollector, lastSetupBlock + 1, startedBlockSignal)
if err != nil {
if errors.Is(err, context.Canceled) {
return err