Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/cdk-erigon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func runErigon(cliCtx *cli.Context) error {
ethCfg := node.NewEthConfigUrfave(cliCtx, nodeCfg, logger)

// Init for X Layer
initRunForXLayer(cliCtx, ethCfg)
initRunForXLayer(cliCtx, ethCfg, nodeCfg)

ethNode, err := node.New(cliCtx.Context, nodeCfg, ethCfg, logger)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions cmd/cdk-erigon/run_xlayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package main
import (
"github.com/ledgerwatch/erigon/cmd/utils"
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/node/nodecfg"
"github.com/ledgerwatch/erigon/zk/apollo"
"github.com/ledgerwatch/erigon/zk/metrics"
"github.com/ledgerwatch/log/v3"
"github.com/urfave/cli/v2"
)

func initRunForXLayer(cliCtx *cli.Context, ethCfg *ethconfig.Config) {
apolloClient := apollo.NewClient(ethCfg)
func initRunForXLayer(cliCtx *cli.Context, ethCfg *ethconfig.Config, nodeCfg *nodecfg.Config) {
apolloClient := apollo.NewClient(ethCfg, nodeCfg)
if apolloClient.LoadConfig() {
log.Info("Apollo config loaded")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/hack/hack_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func getOldAccInputHash(batchNum uint64) error {

func newEtherMan(cfg *ethconfig.Zk) *etherman.Client {
ethmanConf := etherman.Config{
URL: cfg.L1RpcUrl,
URL: cfg.GetL1RpcUrl(),
L1ChainID: cfg.L1ChainId,
L2ChainID: cfg.L2ChainId,
PoEAddr: cfg.AddressRollup,
Expand Down
2 changes: 1 addition & 1 deletion cmd/rpcdaemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func main() {
ethConfig := ethconfig.Defaults
ethConfig.L2RpcUrl = cfg.L2RpcUrl

gasTracker := jsonrpc.NewRecurringL1GasPriceTracker(ethConfig.AllowFreeTransactions, ethConfig.GasPriceFactor, ethConfig.DefaultGasPrice, ethConfig.MaxGasPrice, ethConfig.L1RpcUrl, ethConfig.GasPriceCheckFrequency, ethConfig.GasPriceHistoryCount)
gasTracker := jsonrpc.NewRecurringL1GasPriceTracker(ethConfig.AllowFreeTransactions, ethConfig.GasPriceFactor, ethConfig.DefaultGasPrice, ethConfig.MaxGasPrice, ethConfig.Zk.GetL1RpcUrl(), ethConfig.GasPriceCheckFrequency, ethConfig.GasPriceHistoryCount)
gasTracker.Start()
defer gasTracker.Stop()

Expand Down
20 changes: 15 additions & 5 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,10 @@ type Ethereum struct {
smtFlushDoneCh chan struct{}

// For X Layer, apollo
seqVerSyncer *syncer.L1Syncer
l1InfoTreeSyncer *syncer.L1Syncer
l1BlockSyncer *syncer.L1Syncer
seqVerSyncer *syncer.L1Syncer
l1InfoTreeSyncer *syncer.L1Syncer
l1BlockSyncer *syncer.L1Syncer
sequencerL1Syncer *syncer.L1Syncer

// For X Layer, realtime
kafkaEnabled bool
Expand Down Expand Up @@ -1048,7 +1049,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
backend.config.GasPriceFactor,
backend.config.DefaultGasPrice,
backend.config.MaxGasPrice,
backend.config.L1RpcUrl,
backend.config.Zk.GetL1RpcUrl(),
backend.config.GasPriceCheckFrequency,
backend.config.GasPriceHistoryCount,
)
Expand Down Expand Up @@ -1091,7 +1092,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger

backend.chainConfig.AllowFreeTransactions = cfg.AllowFreeTransactions
backend.chainConfig.ZkDefaultGasPrice = cfg.DefaultGasPrice
l1Urls := strings.Split(cfg.L1RpcUrl, ",")
l1Urls := strings.Split(cfg.Zk.GetL1RpcUrl(), ",")

if cfg.Zk.L1CacheEnabled {
l1Cache, err := l1_cache.NewL1Cache(ctx, path.Join(stack.DataDir(), "l1cache"), cfg.Zk.L1CachePort)
Expand Down Expand Up @@ -1357,6 +1358,10 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
cfg.Zk.XLayer.GetLogsRetries,
)

// Store the syncers in backend for dynamic updates
backend.l1BlockSyncer = l1BlockSyncer
backend.sequencerL1Syncer = sequencerL1Syncer

log.Info("RPC node: Created dedicated Sequencer L1 syncer for event pre-synchronization")
}

Expand All @@ -1382,6 +1387,11 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
)

backend.syncUnwindOrder = zkStages.ZkUnwindOrder

// For Xlayer
if cfg.Zk.XLayer.Apollo.Enable {
go backend.listenApolloForRPC(ctx, cfg)
}
}
// TODO: SEQ: prune order

Expand Down
94 changes: 94 additions & 0 deletions eth/backend_xlayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package eth
import (
"context"
"fmt"
"net/url"
"slices"
"time"

Expand All @@ -11,6 +12,9 @@ import (
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/smt/pkg/blockinfo"
"github.com/ledgerwatch/erigon/zk/apollo"
"github.com/ledgerwatch/erigon/zk/sequencer"
"github.com/ledgerwatch/erigon/zk/syncer"
"github.com/ledgerwatch/erigon/zkevm/etherman"
"github.com/ledgerwatch/erigon/zkevm/log"
)

Expand Down Expand Up @@ -68,6 +72,35 @@ func (s *Ethereum) listenApollo(ctx context.Context, cfg *ethconfig.Config) {
if l1SyncerConfigChanged {
s.updateAllL1Syncer(cfg.Zk.XLayer.GetLogsTimeout, cfg.Zk.XLayer.GetLogsRetries)
}
if slices.Contains(ethCfg.XLayer.ApolloChanged, utils.L1RpcUrlFlag.Name) {
newUrl := ethCfg.Zk.GetL1RpcUrl()

// Apply URL change immediately if sync is not running
s.applyL1UrlChange(cfg, newUrl)
}
case <-ctx.Done():
return
}
}
}

func (s *Ethereum) listenApolloForRPC(ctx context.Context, cfg *ethconfig.Config) {
stream := apollo.GetEthConfigStream()
ch, remove := stream.Sub()
defer remove()

for {
select {
case ethCfg := <-ch:
if ethCfg == nil {
continue
}
if !sequencer.IsSequencer() && slices.Contains(ethCfg.XLayer.ApolloChanged, utils.L1RpcUrlFlag.Name) {
newUrl := ethCfg.Zk.GetL1RpcUrl()

// Apply URL change immediately if sync is not running
s.applyL1UrlChange(cfg, newUrl)
}
case <-ctx.Done():
return
}
Expand All @@ -88,3 +121,64 @@ func (s *Ethereum) updateAllL1Syncer(getLogsTimeout time.Duration, getLogsRetrie
s.l1BlockSyncer.UpdateConfig(getLogsTimeout, getLogsRetries)
}
}

// recreateEtherMansFromURLs recreates etherman clients from new URLs
func (s *Ethereum) recreateEtherMansFromURLs(cfg *ethconfig.Config, urls []string) {
// Handle L1 cache if enabled
l1Urls := urls
if cfg.Zk.L1CacheEnabled && s.l1Cache != nil {
var cacheL1Urls []string
for _, l1Url := range urls {
encoded := url.QueryEscape(l1Url)
cacheL1Url := fmt.Sprintf("http://localhost:%d?endpoint=%s&chainid=%d", cfg.Zk.L1CachePort, encoded, cfg.L2ChainId)
cacheL1Urls = append(cacheL1Urls, cacheL1Url)
}
l1Urls = cacheL1Urls
}

// Create new etherman clients
newEtherManClients := make([]*etherman.Client, len(l1Urls))
for i, url := range l1Urls {
newEtherManClients[i] = newEtherMan(cfg, s.chainConfig.ChainName, url)
}

// Update the backend's etherman clients
s.etherManClients = newEtherManClients

// Convert to IEtherman interface
ethermanClients := make([]syncer.IEtherman, len(newEtherManClients))
for i, c := range newEtherManClients {
ethermanClients[i] = c.EthClient
}

// Update all L1 syncers with new etherman clients
if s.l1Syncer != nil {
s.l1Syncer.UpdateEtherMans(ethermanClients)
}
if s.seqVerSyncer != nil {
s.seqVerSyncer.UpdateEtherMans(ethermanClients)
}
if s.l1InfoTreeSyncer != nil {
s.l1InfoTreeSyncer.UpdateEtherMans(ethermanClients)
}
if s.l1BlockSyncer != nil {
s.l1BlockSyncer.UpdateEtherMans(ethermanClients)
}
if s.sequencerL1Syncer != nil {
s.sequencerL1Syncer.UpdateEtherMans(ethermanClients)
}

log.Info("Recreated etherman clients from new URLs", "urls", urls, "count", len(newEtherManClients))
}

// applyL1UrlChange applies the L1 URL change immediately
func (s *Ethereum) applyL1UrlChange(cfg *ethconfig.Config, newUrl string) {
// Update gas tracker URL
if s.gasTracker != nil {
s.gasTracker.SetRpcUrl(newUrl)
}

// Recreate etherman clients with the new URL
s.recreateEtherMansFromURLs(cfg, []string{newUrl})
log.Info("L1RpcUrl updated from apollo", "url", newUrl)
}
16 changes: 15 additions & 1 deletion eth/ethconfig/config_zkevm.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ethconfig

import (
"sync/atomic"
"time"

"github.com/c2h5oh/datasize"
Expand All @@ -17,7 +18,7 @@ type Zk struct {
L1SyncStartBlock uint64
L1SyncStopBatch uint64
L1ChainId uint64
L1RpcUrl string
L1RpcUrl atomic.Value
AddressSequencer common.Address
AddressAdmin common.Address
AddressRollup common.Address
Expand Down Expand Up @@ -111,6 +112,19 @@ var DefaultZkConfig = Zk{
XLayer: DefaultXLayerConfig,
}

// GetL1RpcUrl atomically gets the current L1 RPC URL
func (z *Zk) GetL1RpcUrl() string {
if url := z.L1RpcUrl.Load(); url != nil {
return url.(string)
}
return ""
}

// SetL1RpcUrl atomically sets the L1 RPC URL
func (z *Zk) SetL1RpcUrl(url string) {
z.L1RpcUrl.Store(url)
}

// ShouldImportInitialBatch returns true in case initial batch config file name is non-empty string.
func (c *Zk) ShouldImportInitialBatch() bool {
return c.InitialBatchCfgFile != ""
Expand Down
4 changes: 2 additions & 2 deletions turbo/cli/flags_zkevm.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) {
L1SyncStartBlock: ctx.Uint64(utils.L1SyncStartBlock.Name),
L1SyncStopBatch: ctx.Uint64(utils.L1SyncStopBatch.Name),
L1ChainId: ctx.Uint64(utils.L1ChainIdFlag.Name),
L1RpcUrl: ctx.String(utils.L1RpcUrlFlag.Name),
L1CacheEnabled: ctx.Bool(utils.L1CacheEnabledFlag.Name),
L1CachePort: ctx.Uint(utils.L1CachePortFlag.Name),
AddressSequencer: libcommon.HexToAddress(ctx.String(utils.AddressSequencerFlag.Name)),
Expand Down Expand Up @@ -263,6 +262,7 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) {
RejectLowGasPriceTolerance: ctx.Float64(utils.RejectLowGasPriceTolerance.Name),
PessimisticForkNumber: ctx.Uint64(utils.PessimisticForkNumber.Name),
}
cfg.Zk.SetL1RpcUrl(ctx.String(utils.L1RpcUrlFlag.Name))

// For X Layer
ApplyFlagsForEthXLayerConfig(ctx, cfg)
Expand All @@ -286,7 +286,7 @@ func ApplyFlagsForZkConfig(ctx *cli.Context, cfg *ethconfig.Config) {
checkFlag(utils.AddressZkevmFlag.Name, cfg.AddressZkevm)

checkFlag(utils.L1ChainIdFlag.Name, cfg.L1ChainId)
checkFlag(utils.L1RpcUrlFlag.Name, cfg.L1RpcUrl)
checkFlag(utils.L1RpcUrlFlag.Name, cfg.Zk.GetL1RpcUrl())
checkFlag(utils.L1MaticContractAddressFlag.Name, cfg.L1MaticContractAddress.Hex())
checkFlag(utils.L1FirstBlockFlag.Name, cfg.L1FirstBlock)
checkFlag(utils.RebuildTreeAfterFlag.Name, cfg.RebuildTreeAfter)
Expand Down
5 changes: 3 additions & 2 deletions turbo/jsonrpc/eth_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ type APIImpl struct {
AllowPreEIP155Transactions bool
AllowUnprotectedTxs bool
MaxGetProofRewindBlockCount int
L1RpcUrl string
L1RpcUrl atomic.Value
DefaultGasPrice uint64
MaxGasPrice uint64
GasPriceFactor float64
Expand Down Expand Up @@ -425,7 +425,6 @@ func NewEthAPI(base *BaseAPI, db kv.RoDB, dbsmt kv.RoDB, eth rpchelper.ApiBacken
AllowFreeTransactions: ethCfg.AllowFreeTransactions,
AllowPreEIP155Transactions: ethCfg.AllowPreEIP155Transactions,
MaxGetProofRewindBlockCount: maxGetProofRewindBlockCount,
L1RpcUrl: ethCfg.L1RpcUrl,
DefaultGasPrice: ethCfg.DefaultGasPrice,
MaxGasPrice: ethCfg.MaxGasPrice,
GasPriceFactor: ethCfg.GasPriceFactor,
Expand Down Expand Up @@ -453,6 +452,8 @@ func NewEthAPI(base *BaseAPI, db kv.RoDB, dbsmt kv.RoDB, eth rpchelper.ApiBacken
BlockGasLimit: ethCfg.XLayer.DynamicBlockGasLimit,
}

apii.L1RpcUrl.Store(ethCfg.Zk.GetL1RpcUrl())

// For X Layer
// Only Sequencer requires to calculate dynamic gas price periodically
// eth_gasPrice requests for the RPC nodes are all redirected to the Sequencer node (via zkevm.l2-sequencer-rpc-url)
Expand Down
18 changes: 18 additions & 0 deletions turbo/jsonrpc/eth_api_xlayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/ledgerwatch/erigon/rpc"
"github.com/ledgerwatch/erigon/turbo/rpchelper"
"github.com/ledgerwatch/erigon/zk/apollo"
"github.com/ledgerwatch/log/v3"
)

func (apii *APIImpl) GetGPCache() *GasPriceCache {
Expand Down Expand Up @@ -62,6 +63,9 @@ func (apii *APIImpl) listenApollo(ctx context.Context) {
if slices.Contains(ethCfg.XLayer.ApolloChanged, utils.DynamicBlockGasLimit.Name) {
apii.BlockGasLimit = ethCfg.XLayer.DynamicBlockGasLimit
}
if slices.Contains(ethCfg.XLayer.ApolloChanged, utils.L1RpcUrlFlag.Name) {
apii.SetL1RpcUrl(ethCfg.Zk.GetL1RpcUrl())
}
case <-ctx.Done():
return
}
Expand Down Expand Up @@ -106,6 +110,20 @@ func MarshalReceipt(receipt *types.Receipt, txn types.Transaction, chainConfig *
return marshalReceipt(receipt, txn, chainConfig, header, txnHash, signed)
}

// GetL1RpcUrl atomically gets the current L1 RPC URL
func (api *APIImpl) GetL1RpcUrl() string {
if url := api.L1RpcUrl.Load(); url != nil {
return url.(string)
}
return ""
}

// SetL1RpcUrl atomically sets the L1 RPC URL
func (api *APIImpl) SetL1RpcUrl(url string) {
api.L1RpcUrl.Store(url)
log.Info("API L1RpcUrl updated", "new_url", url)
}

const (
// maxCacheSize = 300sec (TTL) / 10sec (UpdatePeriod) = 30
maxCacheSize = 30
Expand Down
Loading