diff --git a/runner/clients/reth/client.go b/runner/clients/reth/client.go index a5f8a6f..b3065d1 100644 --- a/runner/clients/reth/client.go +++ b/runner/clients/reth/client.go @@ -94,7 +94,7 @@ func (r *RethClient) Run(ctx context.Context, cfg *types.RuntimeConfig) error { args = append(args, "--engine.state-provider-metrics") args = append(args, "--disable-discovery") args = append(args, "--port", fmt.Sprintf("%d", r.p2pPort)) - args = append(args, "-vvv") + args = append(args, "-vv") // increase mempool size args = append(args, "--txpool.pending-max-count", "100000000") diff --git a/runner/network/consensus/validator_consensus.go b/runner/network/consensus/validator_consensus.go index ce9298f..cec3f66 100644 --- a/runner/network/consensus/validator_consensus.go +++ b/runner/network/consensus/validator_consensus.go @@ -64,23 +64,34 @@ func (f *SyncingConsensusClient) propose(ctx context.Context, payload *engine.Ex } // Start starts the fake consensus client. -func (f *SyncingConsensusClient) Start(ctx context.Context, payloads []engine.ExecutableData, metricsCollector metrics.Collector, firstTestBlock uint64) error { +func (f *SyncingConsensusClient) Start(ctx context.Context, payloads []engine.ExecutableData, metricsCollector metrics.Collector, firstTestBlock uint64, startedBlockSignal chan uint64) error { f.log.Info("Starting sync benchmark", "num_payloads", len(payloads)) m := metrics.NewBlockMetrics() + for i := 0; i < len(payloads); i++ { - m.SetBlockNumber(uint64(max(0, int(payloads[i].Number)-int(firstTestBlock)))) + m.SetBlockNumber(uint64(max(0, int(payloads[i].Number)-int(firstTestBlock)+1))) f.log.Info("Proposing payload", "payload_index", i) + + startTime := time.Now() err := f.propose(ctx, &payloads[i], m) if err != nil { return err } + select { + case startedBlockSignal <- payloads[i].Number + 1: + default: + } + + time.Sleep(time.Until(startTime.Add(f.options.BlockTime))) + if payloads[i].Number >= firstTestBlock { err = metricsCollector.Collect(ctx, m) if err != nil { f.log.Error("Failed to collect metrics", "error", err) } } + } return nil } diff --git a/runner/network/flashblocks/replay_server.go b/runner/network/flashblocks/replay_server.go index 0feef0c..32f57e2 100644 --- a/runner/network/flashblocks/replay_server.go +++ b/runner/network/flashblocks/replay_server.go @@ -17,7 +17,7 @@ import ( type ReplayServer struct { log log.Logger port uint64 - flashblocks []types.FlashblocksPayloadV1 + flashblocks map[uint64][]types.FlashblocksPayloadV1 blockTime time.Duration server *http.Server @@ -30,7 +30,7 @@ type ReplayServer struct { stopOnce sync.Once } -func NewReplayServer(log log.Logger, port uint64, flashblocks []types.FlashblocksPayloadV1, blockTime time.Duration) *ReplayServer { +func NewReplayServer(log log.Logger, port uint64, flashblocks map[uint64][]types.FlashblocksPayloadV1, blockTime time.Duration) *ReplayServer { return &ReplayServer{ log: log, port: port, @@ -117,100 +117,53 @@ func (s *ReplayServer) removeConnection(conn *websocket.Conn) { } // ReplayFlashblocks replays flashblocks to connected clients at evenly spaced intervals. -func (s *ReplayServer) ReplayFlashblocks(ctx context.Context) error { +func (s *ReplayServer) ReplayFlashblock(ctx context.Context, blockNumber uint64) error { if len(s.flashblocks) == 0 { s.log.Info("No flashblocks to replay") return nil } - blockGroups := s.groupFlashblocksByBlock() + flashblocks, ok := s.flashblocks[blockNumber] + if !ok { + s.log.Info("No flashblocks to replay for block", "block_number", blockNumber) + return nil + } s.log.Info("Starting flashblock replay", - "total_flashblocks", len(s.flashblocks), - "num_blocks", len(blockGroups), + "flashblocks", len(flashblocks), + ) + + numIntervals := 1 + if len(flashblocks) > 1 { + numIntervals = len(flashblocks) + } + + interval := s.blockTime / time.Duration(numIntervals) + + s.log.Debug("Replaying flashblocks for block", + "block_number", blockNumber, + "num_flashblocks", len(flashblocks), + "interval", interval, ) - for blockNum, flashblocks := range blockGroups { + for i, flashblock := range flashblocks { select { case <-ctx.Done(): return ctx.Err() default: } - if len(flashblocks) == 0 { - continue - } - - interval := s.blockTime / time.Duration(len(flashblocks)+1) - - s.log.Debug("Replaying flashblocks for block", - "block_number", blockNum, - "num_flashblocks", len(flashblocks), - "interval", interval, - ) - - for i, flashblock := range flashblocks { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - time.Sleep(interval) - - if err := s.broadcastFlashblock(flashblock); err != nil { - s.log.Warn("Error broadcasting flashblock", "err", err, "index", i) - } + if err := s.broadcastFlashblock(flashblock); err != nil { + s.log.Warn("Error broadcasting flashblock", "err", err, "index", i) } - remainingTime := s.blockTime - interval*time.Duration(len(flashblocks)) - if remainingTime > 0 { - time.Sleep(remainingTime) - } + time.Sleep(interval) } s.log.Info("Flashblock replay complete") return nil } -// groupFlashblocksByBlock groups flashblocks by block number, sorted by index. -func (s *ReplayServer) groupFlashblocksByBlock() map[uint64][]types.FlashblocksPayloadV1 { - groups := make(map[uint64][]types.FlashblocksPayloadV1) - - // Build PayloadID -> blockNum mapping from flashblocks with Base - payloadIDToBlockNum := make(map[types.PayloadID]uint64) - for _, fb := range s.flashblocks { - if fb.Base != nil { - payloadIDToBlockNum[fb.PayloadID] = uint64(fb.Base.BlockNumber) - } - } - - for _, fb := range s.flashblocks { - var blockNum uint64 - if fb.Base != nil { - blockNum = uint64(fb.Base.BlockNumber) - } else if bn, ok := payloadIDToBlockNum[fb.PayloadID]; ok { - blockNum = bn - } - groups[blockNum] = append(groups[blockNum], fb) - } - - for blockNum := range groups { - sortByIndex(groups[blockNum]) - } - - return groups -} - -func sortByIndex(flashblocks []types.FlashblocksPayloadV1) { - for i := 1; i < len(flashblocks); i++ { - j := i - for j > 0 && flashblocks[j-1].Index > flashblocks[j].Index { - flashblocks[j-1], flashblocks[j] = flashblocks[j], flashblocks[j-1] - j-- - } - } -} func (s *ReplayServer) broadcastFlashblock(flashblock types.FlashblocksPayloadV1) error { data, err := json.Marshal(flashblock) @@ -232,9 +185,15 @@ func (s *ReplayServer) broadcastFlashblock(flashblock types.FlashblocksPayloadV1 } } + blockNumber := 0 + if flashblock.Base != nil { + blockNumber = int(flashblock.Base.BlockNumber) + } + s.log.Debug("Broadcasted flashblock", "payload_id", fmt.Sprintf("%x", flashblock.PayloadID), "index", flashblock.Index, + "block_number", blockNumber, "num_clients", len(connections), ) diff --git a/runner/network/network_benchmark.go b/runner/network/network_benchmark.go index 26f5cd8..5f462da 100644 --- a/runner/network/network_benchmark.go +++ b/runner/network/network_benchmark.go @@ -183,10 +183,10 @@ func (nb *NetworkBenchmark) benchmarkValidator(ctx context.Context, payloadResul nb.log.Info("Validator header", "number", validatorHeader.Number.Uint64(), "lastSetupBlock", lastSetupBlock) - if validatorHeader.Number.Cmp(big.NewInt(int64(lastSetupBlock))) < 0 { + if validatorHeader.Number.Cmp(big.NewInt(int64(lastSetupBlock)-1)) < 0 { nb.log.Info("Validator is behind first test block, catching up", "validator_block", validatorHeader.Number.Uint64(), "last_setup_block", lastSetupBlock) // fetch all blocks the validator node is missing - for i := validatorHeader.Number.Uint64() + 1; i <= lastSetupBlock; i++ { + for i := validatorHeader.Number.Uint64() + 1; i < lastSetupBlock; i++ { block, err := sequencerClient.Client().BlockByNumber(ctx, big.NewInt(int64(i))) if err != nil { sequencerClient.Stop() diff --git a/runner/network/sequencer_benchmark.go b/runner/network/sequencer_benchmark.go index a70ce3a..536ad1a 100644 --- a/runner/network/sequencer_benchmark.go +++ b/runner/network/sequencer_benchmark.go @@ -154,7 +154,7 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics. flashblocksClient := sequencerClient.FlashblocksClient() if flashblocksClient != nil { nb.log.Info("Starting flashblocks collection") - flashblockCollector = newFlashblockCollector() + flashblockCollector = newFlashblockCollector(nb.log) flashblocksClient.AddListener(flashblockCollector) if err := flashblocksClient.Start(benchmarkCtx); err != nil { @@ -204,23 +204,26 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics. go func() { consensusClient := consensus.NewSequencerConsensusClient(nb.log, sequencerClient.Client(), sequencerClient.AuthClient(), mempool, consensus.ConsensusClientOptions{ - BlockTime: params.BlockTime, - GasLimit: params.GasLimit, - GasLimitSetup: 1e9, // 1G gas + BlockTime: params.BlockTime, + GasLimit: params.GasLimit, + GasLimitSetup: 1e9, // 1G gas ParallelTxBatches: nb.config.Config.ParallelTxBatches(), }, headBlockHash, headBlockNumber, l1Chain, nb.config.BatcherAddr()) payloads := make([]engine.ExecutableData, 0) + var lastSetupPayload *engine.ExecutableData setupLoop: for { _blockMetrics := metrics.NewBlockMetrics() - _, err := consensusClient.Propose(benchmarkCtx, _blockMetrics, true) + setupPayload, err := consensusClient.Propose(benchmarkCtx, _blockMetrics, true) if err != nil { errChan <- err return } + lastSetupPayload = setupPayload + select { case <-setupComplete: break setupLoop @@ -231,6 +234,8 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics. } + payloads = append(payloads, *lastSetupPayload) + blockMetrics := metrics.NewBlockMetrics() // run for a few blocks @@ -276,7 +281,7 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics. return nil, 0, err case payloads := <-payloadResult: // Collect flashblocks if available - var flashblocks []types.FlashblocksPayloadV1 + var flashblocks map[uint64][]types.FlashblocksPayloadV1 if flashblockCollector != nil { flashblocks = flashblockCollector.GetFlashblocks() nb.log.Info("Collected flashblocks", "count", len(flashblocks)) @@ -286,20 +291,23 @@ func (nb *sequencerBenchmark) Run(ctx context.Context, metricsCollector metrics. ExecutablePayloads: payloads, Flashblocks: flashblocks, } - return result, payloads[0].Number - 1, nil + return result, payloads[0].Number, nil } } // flashblockCollector implements FlashblockListener to collect flashblocks. type flashblockCollector struct { - flashblocks []types.FlashblocksPayloadV1 - mu sync.Mutex + log log.Logger + flashblocks map[uint64][]types.FlashblocksPayloadV1 + currentBaseBlock *uint64 + mu sync.Mutex } // newFlashblockCollector creates a new flashblock collector. -func newFlashblockCollector() *flashblockCollector { +func newFlashblockCollector(log log.Logger) *flashblockCollector { return &flashblockCollector{ - flashblocks: make([]types.FlashblocksPayloadV1, 0), + flashblocks: make(map[uint64][]types.FlashblocksPayloadV1), + log: log, } } @@ -307,16 +315,26 @@ func newFlashblockCollector() *flashblockCollector { func (c *flashblockCollector) OnFlashblock(flashblock types.FlashblocksPayloadV1) { c.mu.Lock() defer c.mu.Unlock() - c.flashblocks = append(c.flashblocks, flashblock) + if flashblock.Base != nil { + baseBlock := uint64(flashblock.Base.BlockNumber) + c.currentBaseBlock = &baseBlock + } else if c.currentBaseBlock == nil { + c.log.Warn("received flashblock without base block number") + return + } + c.log.Info("Collected flashblock", "block_number", *c.currentBaseBlock, "index", flashblock.Index, "tx_count", len(flashblock.Diff.Transactions)) + c.flashblocks[*c.currentBaseBlock] = append(c.flashblocks[*c.currentBaseBlock], flashblock) } // GetFlashblocks returns all collected flashblocks. -func (c *flashblockCollector) GetFlashblocks() []types.FlashblocksPayloadV1 { +func (c *flashblockCollector) GetFlashblocks() map[uint64][]types.FlashblocksPayloadV1 { c.mu.Lock() defer c.mu.Unlock() // Return a copy to avoid race conditions - result := make([]types.FlashblocksPayloadV1, len(c.flashblocks)) - copy(result, c.flashblocks) + result := make(map[uint64][]types.FlashblocksPayloadV1) + for blockNumber, flashblocks := range c.flashblocks { + result[blockNumber] = append([]types.FlashblocksPayloadV1{}, flashblocks...) + } return result } diff --git a/runner/network/types/payload_result.go b/runner/network/types/payload_result.go index 2a6b892..f8c06fa 100644 --- a/runner/network/types/payload_result.go +++ b/runner/network/types/payload_result.go @@ -12,7 +12,7 @@ type PayloadResult struct { ExecutablePayloads []engine.ExecutableData // Flashblocks are the flashblock payloads collected during the benchmark (if available) - Flashblocks []clientTypes.FlashblocksPayloadV1 + Flashblocks map[uint64][]clientTypes.FlashblocksPayloadV1 } // HasFlashblocks returns true if flashblock payloads were collected. diff --git a/runner/network/validator_benchmark.go b/runner/network/validator_benchmark.go index 41e8609..cf5e588 100644 --- a/runner/network/validator_benchmark.go +++ b/runner/network/validator_benchmark.go @@ -67,6 +67,8 @@ func (vb *validatorBenchmark) Run(ctx context.Context, payloads []engine.Executa headBlockHash := headBlockHeader.Hash() headBlockNumber := headBlockHeader.Number.Uint64() + startedBlockSignal := make(chan uint64) + // If flashblock server is available and client supports flashblocks, wait for connection // and start replaying flashblocks in the background if vb.flashblockServer != nil && vb.validatorClient.SupportsFlashblocks() { @@ -80,9 +82,17 @@ func (vb *validatorBenchmark) Run(ctx context.Context, payloads []engine.Executa // Start replaying flashblocks in a goroutine go func() { - if err := vb.flashblockServer.ReplayFlashblocks(ctx); err != nil { - if !errors.Is(err, context.Canceled) { - vb.log.Warn("Error replaying flashblocks", "err", err) + for { + select { + case <-ctx.Done(): + return + case blockNumber := <-startedBlockSignal: + vb.log.Info("Replaying flashblocks for block", "block_number", blockNumber) + if err := vb.flashblockServer.ReplayFlashblock(ctx, blockNumber); err != nil { + if !errors.Is(err, context.Canceled) { + vb.log.Warn("Error replaying flashblocks", "err", err) + } + } } } }() @@ -93,7 +103,7 @@ func (vb *validatorBenchmark) Run(ctx context.Context, payloads []engine.Executa BlockTime: vb.config.Params.BlockTime, }, headBlockHash, headBlockNumber) - err = consensusClient.Start(ctx, payloads, metricsCollector, lastSetupBlock) + err = consensusClient.Start(ctx, payloads, metricsCollector, lastSetupBlock + 1, startedBlockSignal) if err != nil { if errors.Is(err, context.Canceled) { return err