Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 13 additions & 34 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,15 @@ package node
import (
"bytes"
"context"
"crypto/ecdsa"
"encoding/hex"
"errors"
"fmt"
"net"
"net/http"
"os"
"strings"
"time"

"github.com/tendermint/tendermint/upgrade"

ethcrypto "github.com/morph-l2/go-ethereum/crypto"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/cors"
Expand Down Expand Up @@ -128,6 +123,8 @@ func DefaultNewNode(config *cfg.Config, logger log.Logger) (*Node, error) {
DefaultDBProvider,
DefaultMetricsProvider(config.Instrumentation),
logger,
nil,
nil,
)
}

Expand Down Expand Up @@ -253,7 +250,6 @@ type Node struct {
// Sequencer mode (after upgrade)
stateV2 *sequencer.StateV2
blockBroadcastReactor *sequencer.BlockBroadcastReactor
sequencerPrivKey *ecdsa.PrivateKey // ECDSA key for signing blocks in sequencer mode
}

func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) {
Expand Down Expand Up @@ -508,17 +504,19 @@ func createConsensusReactor(
// These components are created but not started - they will be started when switching to sequencer mode.
func createSequencerComponents(
l2Node l2node.L2Node,
sequencerPrivKey *ecdsa.PrivateKey,
pool *bc.BlockPool,
waitSync bool,
logger log.Logger,
verifier sequencer.SequencerVerifier,
signer sequencer.Signer,
) (*sequencer.StateV2, *sequencer.BlockBroadcastReactor, error) {
// Create StateV2
stateV2, err := sequencer.NewStateV2(
l2Node,
sequencerPrivKey,
sequencer.DefaultBlockInterval,
logger,
verifier,
signer,
)
if err != nil {
return nil, nil, fmt.Errorf("failed to create StateV2: %w", err)
Expand All @@ -530,6 +528,7 @@ func createSequencerComponents(
stateV2,
waitSync,
logger,
verifier,
)
broadcastReactor.SetLogger(logger.With("module", "sequencer"))

Expand Down Expand Up @@ -784,6 +783,8 @@ func NewNode(
dbProvider DBProvider,
metricsProvider MetricsProvider,
logger log.Logger,
sequencerVerifier sequencer.SequencerVerifier,
sequencerSigner sequencer.Signer,
options ...Option,
) (
*Node, error,
Expand Down Expand Up @@ -1007,17 +1008,15 @@ func NewNode(
if bcR, ok := bcReactor.(*bc.Reactor); ok {
l2NodeRef := bcR.L2Node()

// TODO: just for Phase1, will update in future
if err := node.SetSequencerPrivKey(); err != nil {
return nil, err
}
// Create sequencer components
if node.stateV2, node.blockBroadcastReactor, err = createSequencerComponents(
l2NodeRef,
node.sequencerPrivKey,
bcR.Pool(),
blockSync || stateSync,
logger); err != nil {
logger,
sequencerVerifier,
sequencerSigner,
); err != nil {
return nil, err
}

Expand Down Expand Up @@ -1597,26 +1596,6 @@ func splitAndTrimEmpty(s, sep, cutset string) []string {
// Sequencer Mode Methods
// ============================================================================

// TODO: optimize SetSequencerPrivKey in the future
// SetSequencerPrivKey sets the ECDSA private key for signing blocks in sequencer mode.
func (n *Node) SetSequencerPrivKey() error {
// Load sequencer private key from environment variable
if seqKeyHex := os.Getenv("SEQUENCER_PRIVATE_KEY"); seqKeyHex != "" {
seqKeyHex = strings.TrimPrefix(seqKeyHex, "0x")
keyBytes, err := hex.DecodeString(seqKeyHex)
if err != nil {
return fmt.Errorf("failed to decode SEQUENCER_PRIVATE_KEY: %w", err)
}
n.sequencerPrivKey, err = ethcrypto.ToECDSA(keyBytes)
if err != nil {
return fmt.Errorf("failed to parse SEQUENCER_PRIVATE_KEY: %w", err)
}
n.Logger.Info("Loaded sequencer private key",
"address", ethcrypto.PubkeyToAddress(n.sequencerPrivKey.PublicKey).Hex())
}
return nil
}

// startSequencerMode starts the sequencer mode components with logging.
// This is an internal method used by upgrade callbacks and startup logic.
func (n *Node) startSequencerMode() {
Expand Down
93 changes: 56 additions & 37 deletions sequencer/broadcast_reactor.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package sequencer

import (
"context"
"fmt"
"math/big"
"math/rand"
"reflect"
"sync"
"time"

"github.com/tendermint/tendermint/upgrade"

"github.com/cosmos/gogoproto/proto"
"github.com/morph-l2/go-ethereum/common"
"github.com/morph-l2/go-ethereum/crypto"
Expand All @@ -26,11 +25,12 @@ const (
SequencerSyncChannel = byte(0x51) // For block sync requests (no signature verification)

// TODO: make these parameters configurable
smallGapThreshold = 5 // Gap for direct block request
smallGapThreshold = 20 // Gap for direct block request
recentBlocksCapacity = 1000 // Recent applied blocks cache
seenBlocksCapacity = 2000 // Seen blocks for dedup
peerSentCapacity = 500 // Per-peer sent tracking
applyInterval = 500 * time.Millisecond
applyInterval = 10 * time.Second
syncInterval = 10 * time.Second
)

// BlockPool interface (avoids import cycle)
Expand Down Expand Up @@ -58,10 +58,18 @@ type BlockBroadcastReactor struct {
applyMtx sync.Mutex // Protects applyBlock to ensure sequential block application
sequencerStarted bool // True when sequencer mode is actually running (not just registered)
logger log.Logger

verifier SequencerVerifier
}

// NewBlockBroadcastReactor creates a new reactor.
func NewBlockBroadcastReactor(pool BlockPool, stateV2 *StateV2, waitSync bool, logger log.Logger) *BlockBroadcastReactor {
func NewBlockBroadcastReactor(
pool BlockPool,
stateV2 *StateV2,
waitSync bool,
logger log.Logger,
verifier SequencerVerifier,
) *BlockBroadcastReactor {
r := &BlockBroadcastReactor{
pool: pool,
stateV2: stateV2,
Expand All @@ -71,6 +79,7 @@ func NewBlockBroadcastReactor(pool BlockPool, stateV2 *StateV2, waitSync bool, l
seenBlocks: NewHashSet(seenBlocksCapacity),
peerSent: NewPeerHashSet(peerSentCapacity),
logger: logger.With("module", "broadcastReactor"),
verifier: verifier,
}
r.BaseReactor = *p2p.NewBaseReactor("BlockBroadcast", r)
return r
Expand Down Expand Up @@ -104,7 +113,7 @@ func (r *BlockBroadcastReactor) StartSequencerRoutines() error {
}
}

if upgrade.IsSequencer(r.stateV2.seqAddr) {
if r.stateV2.IsSequencerMode() {
go r.broadcastRoutine()
} else {
go r.applyRoutine()
Expand All @@ -118,12 +127,6 @@ func (r *BlockBroadcastReactor) OnStop() {
r.logger.Info("Stopping BlockBroadcastReactor")
}

// func (r *BlockBroadcastReactor) SwitchToSequencer() error {
// r.logger.Info("Sync mode switching to sequencer mode")
// r.waitSync = false
// return r.StartSequencerRoutines()
// }

func (r *BlockBroadcastReactor) GetChannels() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{
{ID: BlockBroadcastChannel, Priority: 6, SendQueueCapacity: 1000, RecvBufferCapacity: 50 * 4096},
Expand Down Expand Up @@ -220,15 +223,18 @@ func (r *BlockBroadcastReactor) broadcastRoutine() {
// applyRoutine: periodically try to apply blocks from unlink cache
func (r *BlockBroadcastReactor) applyRoutine() {
r.logger.Info("Starting block apply routine")
ticker := time.NewTicker(applyInterval)
defer ticker.Stop()
tickerApply := time.NewTicker(applyInterval)
tickerSync := time.NewTicker(syncInterval)
defer tickerSync.Stop()
defer tickerApply.Stop()

for {
select {
case <-r.Quit():
return
case <-ticker.C:
case <-tickerApply.C:
r.tryApplyFromCache()
case <-tickerSync.C:
r.checkSyncGap()
}
}
Expand All @@ -242,9 +248,10 @@ func (r *BlockBroadcastReactor) applyRoutine() {
// verifySig: true for broadcast channel, false for sync channel
func (r *BlockBroadcastReactor) onBlockV2(block *BlockV2, src p2p.Peer, verifySig bool) {
r.logger.Debug("onBlockV2", "number", block.Number, "hash", block.Hash.Hex(), "verifySig", verifySig)
// Dedup: skip if already seen
if r.markSeen(block.Hash) {
r.logger.Debug("onBlockV2 dedup", "number", block.Number, "hash", block.Hash.Hex(), "verifySig", verifySig)
// Dedup: skip if already seen, only for broadcast channel.
// Sync channel should not check dedup.
if r.markSeen(block.Hash) && verifySig {
r.logger.Debug("onBlockV2 broadcast dedup", "number", block.Number, "hash", block.Hash.Hex(), "verifySig", verifySig)
return
}

Expand All @@ -256,18 +263,22 @@ func (r *BlockBroadcastReactor) onBlockV2(block *BlockV2, src p2p.Peer, verifySi
// Try apply if it's the next block (height + parent match)
if r.isNextBlock(block) {
if err := r.applyBlock(block, verifySig); err != nil {
r.logger.Error("Apply failed, caching", "number", block.Number, "err", err)
r.pendingCache.Add(block, uint64(localHeight))
r.logger.Error("Apply failed", "number", block.Number, "hash", block.Hash.Hex(), "err", err)
if verifySig {
r.logger.Debug("Apply failed, caching block", "number", block.Number, "hash", block.Hash.Hex())
r.pendingCache.Add(block, uint64(localHeight))
}
return
}
} else {
// Gossip the latest block to other peers
if verifySig {
r.gossipBlock(block, src.ID())
}
} else if verifySig {
// Cache all other blocks (future or past for potential reorg)
r.logger.Debug("future block, caching", "number", block.Number, "hash", block.Hash.Hex())
r.pendingCache.Add(block, uint64(localHeight))
}

// Gossip the latest block to other peers
if verifySig {
r.gossipBlock(block, src.ID())
}
}

// tryApplyFromCache: apply blocks from unlink cache (called by applyRoutine)
Expand All @@ -290,7 +301,7 @@ func (r *BlockBroadcastReactor) tryApplyFromCache() {
break
}
r.logger.Debug("Trying to apply from cache", "number", block.Number, "hash", block.Hash.Hex())
if err := r.applyBlock(block, false); err != nil { // no signature verification
if err := r.applyBlock(block, true); err != nil { // should signature verification
r.logger.Error("Apply from cache failed", "number", block.Number, "err", err)
break
}
Expand All @@ -310,16 +321,12 @@ func (r *BlockBroadcastReactor) checkSyncGap() {
maxPeerHeight := r.pool.MaxPeerHeight()
gap := maxPeerHeight - localHeight
r.logger.Debug("Checking sync goroutines", "gap", gap, "localHeight", localHeight, "maxPeerHeight", maxPeerHeight)
if gap <= 0 {
if gap <= smallGapThreshold {
return
}

// Request missing blocks (limited to smallGapThreshold per cycle to avoid spam)
end := localHeight + int64(smallGapThreshold)
if end > maxPeerHeight {
end = maxPeerHeight
}
r.requestMissingBlocks(localHeight+1, end)
r.requestMissingBlocks(localHeight+1, maxPeerHeight)
}

// requestMissingBlocks requests blocks in range [start, end] from peers
Expand Down Expand Up @@ -421,12 +428,24 @@ func (r *BlockBroadcastReactor) verifySignature(block *BlockV2) bool {
return false
}
recoveredAddr := crypto.PubkeyToAddress(*pubKey)
expectedAddr := upgrade.SequencerAddress
if !upgrade.IsSequencer(recoveredAddr) {
r.logger.Error("Signature verification failed: address mismatch",

if r.verifier == nil {
r.logger.Error("Sequencer verifier not set", "block", block.Number)
return false
}

isSeq, err := r.verifier.IsSequencer(context.Background(), recoveredAddr)
if err != nil {
r.logger.Error("Signature verification failed: verifier error",
"block", block.Number,
"recovered", recoveredAddr.Hex(),
"expected", expectedAddr.Hex())
"err", err)
return false
}
if !isSeq {
r.logger.Error("Signature verification failed: not a valid sequencer",
"block", block.Number,
"recovered", recoveredAddr.Hex())
return false
}
return true
Expand Down
22 changes: 22 additions & 0 deletions sequencer/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package sequencer

import (
"context"

"github.com/morph-l2/go-ethereum/common"
)

// SequencerVerifier verifies if an address is the current L1 sequencer
type SequencerVerifier interface {
IsSequencer(ctx context.Context, addr common.Address) (bool, error)
}

// Signer interface for sequencer block signing
type Signer interface {
// Sign signs the data with the sequencer's private key
Sign(data []byte) ([]byte, error)
// Address returns the sequencer's address
Address() common.Address
// IsActiveSequencer checks if this signer is the current L1 sequencer
IsActiveSequencer(ctx context.Context) (bool, error)
}
Loading
Loading