diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 69ea8952f..e115d53ea 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -119,6 +119,8 @@ jobs: with: version: v1.59 args: --timeout 15m --verbose ${{ env.GO_LINT_MODULES }} + env: + GOWORK: off - name: Run Build run: echo ${GO_MODULES} | tr ' ' '\n' | xargs -I {} sh -c 'go build -C {} -v ./...' diff --git a/indexer/README.md b/indexer/README.md deleted file mode 100644 index 34b40e3fd..000000000 --- a/indexer/README.md +++ /dev/null @@ -1 +0,0 @@ -# indexer diff --git a/indexer/chain.go b/indexer/chain.go new file mode 100644 index 000000000..821d5b49b --- /dev/null +++ b/indexer/chain.go @@ -0,0 +1,119 @@ +package main + +import ( + "io" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/lmittmann/w3" + "github.com/lmittmann/w3/module/eth" + "github.com/lmittmann/w3/w3types" +) + +// Chain defines an interface for interacting with a blockchain. +type Chain interface { + // BlockNumber returns the current block number. + BlockNumber() (*big.Int, error) + + // Blocks returns [start, end] blocks. + Blocks(start, end *big.Int) ([]*types.Block, error) + + // TxReceipts returns the receipts for the given transaction hashes if they exist. + TxReceipts(txHashes []common.Hash) (map[string]*types.Receipt, error) +} + +var ( + _ Chain = (*EthereumChain)(nil) + _ io.Closer = (*EthereumChain)(nil) +) + +// EthereumChain implements the Chain interface and +// represents a connection to the Ethereum blockchain. +type EthereumChain struct { + client *w3.Client +} + +// NewEthereumChain creates a new EthereumChain instance connected to the specified URL. +func NewEthereumChain(url string) (*EthereumChain, error) { + client, err := w3.Dial(url) + if err != nil { + return nil, err + } + return &EthereumChain{client: client}, nil +} + +// BlockNumber implements the Chain.BlockNumber method. +func (e *EthereumChain) BlockNumber() (*big.Int, error) { + var number big.Int + if err := e.client.Call(eth.BlockNumber().Returns(&number)); err != nil { + return nil, err + } + return &number, nil +} + +// Blocks implements the Chain.Blocks method. +func (e *EthereumChain) Blocks(start, end *big.Int) ([]*types.Block, error) { + var ( + window = new(big.Int).Sub(end, start) + length = new(big.Int).Abs(window).Uint64() + 1 + numbers = make([]*big.Int, length) + ) + + switch window.Sign() { + case -1: + for i := 0; i < len(numbers); i++ { + numbers[i] = new(big.Int).Sub(end, new(big.Int).SetUint64(uint64(i))) + } + case 1: + for i := 0; i < len(numbers); i++ { + numbers[i] = new(big.Int).Add(start, new(big.Int).SetUint64(uint64(i))) + } + default: + return nil, nil + } + + var ( + batch = make([]w3types.RPCCaller, len(numbers)) + blocks = make([]*types.Block, len(numbers)) + ) + + for i := range numbers { + block := new(types.Block) + batch[i] = eth.BlockByNumber(numbers[i]).Returns(block) + blocks[i] = block + } + + if err := e.client.Call(batch...); err != nil { + return nil, err + } + return blocks, nil +} + +// TxReceipts implements the Chain.TxReceipts method. 
+func (e *EthereumChain) TxReceipts(txHashes []common.Hash) (map[string]*types.Receipt, error) { + var ( + batch = make([]w3types.RPCCaller, len(txHashes)) + txReceipts = make([]types.Receipt, len(txHashes)) + ) + + for i, txHash := range txHashes { + batch[i] = eth.TxReceipt(txHash).Returns(&txReceipts[i]) + } + + if err := e.client.Call(batch...); err != nil { + return map[string]*types.Receipt{}, nil + } + + res := make(map[string]*types.Receipt, len(txReceipts)) + for _, txReceipt := range txReceipts { + res[txReceipt.TxHash.Hex()] = &txReceipt + } + return res, nil +} + +// Close closes the connection to the Ethereum blockchain. +// It implements the io.Closer interface. +func (e *EthereumChain) Close() error { + return e.client.Close() +} diff --git a/indexer/cmd/indexer.go b/indexer/cmd/indexer.go deleted file mode 100644 index cf78d9cb5..000000000 --- a/indexer/cmd/indexer.go +++ /dev/null @@ -1,387 +0,0 @@ -package main - -import ( - "context" - "encoding/hex" - "fmt" - "log/slog" - "math/big" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/primev/mev-commit/indexer/pkg/ethclient" - "github.com/primev/mev-commit/indexer/pkg/store" -) - -const TimeLayOut = "2006-01-02T15:04:05.000Z" - -type Config struct { - EthClient ethclient.EthereumClient - Storage store.Storage - IndexInterval time.Duration - AccountAddresses []string - MinBlocksToFetchAccountAddresses uint - TimeoutToFetchAccountAddresses time.Duration -} - -type BlockchainIndexer struct { - ethClient ethclient.EthereumClient - storage store.Storage - forwardBlockChan chan *types.Block - backwardBlockChan chan *types.Block - txChan chan *types.Transaction - indexInterval time.Duration - lastForwardIndexedBlock *big.Int - lastBackwardIndexedBlock *big.Int - logger *slog.Logger - accountAddresses []string - blockCounter uint - minBlocksToFetchAccountAddresses uint - timeoutToFetchAccountAddresses time.Duration -} - -func NewBlockchainIndexer(config Config) *BlockchainIndexer { - return &BlockchainIndexer{ - ethClient: config.EthClient, - storage: config.Storage, - forwardBlockChan: make(chan *types.Block, 100), - backwardBlockChan: make(chan *types.Block, 100), - txChan: make(chan *types.Transaction, 100), - indexInterval: config.IndexInterval, - logger: slog.Default(), - accountAddresses: config.AccountAddresses, - blockCounter: 0, - minBlocksToFetchAccountAddresses: config.MinBlocksToFetchAccountAddresses, - timeoutToFetchAccountAddresses: config.TimeoutToFetchAccountAddresses, - } -} - -func (bi *BlockchainIndexer) Start(ctx context.Context) error { - if err := bi.storage.CreateIndices(ctx); err != nil { - return fmt.Errorf("failed to create indices: %w", err) - } - - latestBlockNumber, err := bi.ethClient.BlockNumber(ctx) - bi.logger.Info("latest block number", "block number", latestBlockNumber) - if err != nil { - return fmt.Errorf("failed to get latest block number: %w", err) - } - - if err = bi.initializeForwardIndex(ctx, latestBlockNumber.Uint64()); err != nil { - return err - } - - if err = bi.initializeBackwardIndex(ctx, latestBlockNumber.Uint64()); err != nil { - return err - } - - go bi.fetchForwardBlocks(ctx) - go bi.processForwardBlocks(ctx) - go bi.fetchBackwardBlocks(ctx) - go bi.processBackwardBlocks(ctx) - go bi.IndexAccountBalances(ctx) - - <-ctx.Done() - return ctx.Err() -} - -func (bi *BlockchainIndexer) initializeForwardIndex(ctx context.Context, latestBlockNumber uint64) error { - lastForwardIndexedBlock, err := 
bi.storage.GetLastIndexedBlock(ctx, "forward") - if err != nil { - return fmt.Errorf("failed to get last forward indexed block: %w", err) - } - - bi.logger.Info("last indexed block", "blockNumber", lastForwardIndexedBlock, "direction", "forward") - - if lastForwardIndexedBlock == nil || lastForwardIndexedBlock.Sign() == 0 { - bi.lastForwardIndexedBlock = new(big.Int).SetUint64(latestBlockNumber - 1) - } else { - bi.lastForwardIndexedBlock = lastForwardIndexedBlock - } - - return nil -} - -func (bi *BlockchainIndexer) initializeBackwardIndex(ctx context.Context, latestBlockNumber uint64) error { - lastBackwardIndexedBlock, err := bi.storage.GetLastIndexedBlock(ctx, "backward") - if err != nil { - return fmt.Errorf("failed to get last backward indexed block: %w", err) - } - - bi.logger.Info("last indexed block", "blockNumber", lastBackwardIndexedBlock, "direction", "backward") - - if lastBackwardIndexedBlock == nil || lastBackwardIndexedBlock.Sign() == 0 { - bi.lastBackwardIndexedBlock = new(big.Int).SetUint64(latestBlockNumber) - } else { - bi.lastBackwardIndexedBlock = lastBackwardIndexedBlock - } - - return nil -} - -func (bi *BlockchainIndexer) fetchForwardBlocks(ctx context.Context) { - ticker := time.NewTicker(bi.indexInterval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - latestBlockNumber, err := bi.ethClient.BlockNumber(ctx) - if err != nil { - bi.logger.Error("failed to get latest block number", "error", err) - continue - } - - for blockNum := new(big.Int).Add(bi.lastForwardIndexedBlock, big.NewInt(1)); blockNum.Cmp(latestBlockNumber) <= 0; blockNum.Add(blockNum, big.NewInt(5)) { - endBlockNum := new(big.Int).Add(blockNum, big.NewInt(4)) - if endBlockNum.Cmp(latestBlockNumber) > 0 { - endBlockNum.Set(latestBlockNumber) - } - - var blockNums []*big.Int - for bn := new(big.Int).Set(blockNum); bn.Cmp(endBlockNum) <= 0; bn.Add(bn, big.NewInt(1)) { - blockNums = append(blockNums, new(big.Int).Set(bn)) - } - - blocks, err := bi.fetchBlocks(ctx, blockNums) - if err != nil { - bi.logger.Error("failed to fetch blocks", "start", blockNum, "end", endBlockNum, "error", err) - continue - } - - for _, block := range blocks { - bi.forwardBlockChan <- block - bi.lastForwardIndexedBlock.Set(block.Number()) - bi.blockCounter++ - } - } - } - } -} - -func (bi *BlockchainIndexer) fetchBackwardBlocks(ctx context.Context) { - ticker := time.NewTicker(bi.indexInterval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - if bi.lastBackwardIndexedBlock.Sign() <= 0 { - return - } - zeroBigNum := big.NewInt(0) - blockNum := new(big.Int).Sub(bi.lastBackwardIndexedBlock, big.NewInt(1)) - - for i := 0; blockNum.Cmp(zeroBigNum) >= 0; i++ { - endBlockNum := new(big.Int).Sub(blockNum, big.NewInt(4)) - if endBlockNum.Cmp(zeroBigNum) < 0 { - endBlockNum.Set(zeroBigNum) - } - - var blockNums []*big.Int - for bn := new(big.Int).Set(blockNum); bn.Cmp(endBlockNum) >= 0; bn.Sub(bn, big.NewInt(1)) { - blockNums = append(blockNums, new(big.Int).Set(bn)) - } - - blocks, err := bi.fetchBlocks(ctx, blockNums) - if err != nil { - bi.logger.Error("failed to fetch blocks", "start", blockNum, "end", endBlockNum, "error", err) - break - } - - for _, block := range blocks { - bi.backwardBlockChan <- block - bi.lastBackwardIndexedBlock.Set(block.Number()) - if block.Number().Cmp(zeroBigNum) == 0 { - bi.logger.Info("done fetching backward blocks...") - return - } - } - blockNum.Sub(endBlockNum, big.NewInt(1)) - } - } - } -} - -func (bi 
*BlockchainIndexer) processForwardBlocks(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case block := <-bi.forwardBlockChan: - if err := bi.indexBlock(ctx, block); err != nil { - bi.logger.Error("failed to index block", "error", err) - } - if err := bi.indexTransactions(ctx, block); err != nil { - bi.logger.Error("failed to index transactions", "error", err) - } - } - } -} - -func (bi *BlockchainIndexer) processBackwardBlocks(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case block := <-bi.backwardBlockChan: - if err := bi.indexBlock(ctx, block); err != nil { - bi.logger.Error("failed to index block", "error", err) - } - if err := bi.indexTransactions(ctx, block); err != nil { - bi.logger.Error("failed to index transactions", "error", err) - } - if block.Number().Cmp(big.NewInt(0)) == 0 { - bi.logger.Info("done processing backward blocks...") - return - } - } - } -} - -func (bi *BlockchainIndexer) IndexAccountBalances(ctx context.Context) { - timer := time.NewTimer(bi.timeoutToFetchAccountAddresses) - defer timer.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-timer.C: - if err := bi.indexBalances(ctx, 0); err != nil { - return - } - bi.blockCounter = 0 - timer.Reset(bi.timeoutToFetchAccountAddresses) - default: - if bi.blockCounter >= bi.minBlocksToFetchAccountAddresses { - if err := bi.indexBalances(ctx, bi.lastForwardIndexedBlock.Uint64()); err != nil { - return - } - bi.blockCounter = 0 - timer.Reset(bi.timeoutToFetchAccountAddresses) - } - } - } -} - -func (bi *BlockchainIndexer) indexBalances(ctx context.Context, blockNumber uint64) error { - addresses, err := bi.storage.GetAddresses(ctx) - if err != nil { - return err - } - - addresses = append(addresses, bi.accountAddresses...) 
- - addrs := make([]common.Address, len(addresses)) - for i, address := range addresses { - addrs[i] = common.HexToAddress(address) - } - - accBalances, err := bi.ethClient.AccountBalances(ctx, addrs, blockNumber) - if err != nil { - return err - } - - return bi.storage.IndexAccountBalances(ctx, accBalances) -} - -func (bi *BlockchainIndexer) indexBlock(ctx context.Context, block *types.Block) error { - timestamp := time.UnixMilli(int64(block.Time())).UTC().Format(TimeLayOut) - indexBlock := &store.IndexBlock{ - Number: block.NumberU64(), - Hash: block.Hash().Hex(), - ParentHash: block.ParentHash().Hex(), - Root: block.Root().Hex(), - Nonce: block.Nonce(), - Timestamp: timestamp, - Transactions: len(block.Transactions()), - BaseFee: block.BaseFee().Uint64(), - GasLimit: block.GasLimit(), - GasUsed: block.GasUsed(), - Difficulty: block.Difficulty().Uint64(), - ExtraData: hex.EncodeToString(block.Extra()), - } - - return bi.storage.IndexBlock(ctx, indexBlock) -} - -func (bi *BlockchainIndexer) indexTransactions(ctx context.Context, block *types.Block) error { - var transactions []*store.IndexTransaction - var txHashes []string - - for _, tx := range block.Transactions() { - from, err := types.Sender(types.NewCancunSigner(tx.ChainId()), tx) - if err != nil { - return fmt.Errorf("failed to derive sender: %w", err) - } - - v, r, s := tx.RawSignatureValues() - timestamp := tx.Time().UTC().Format(TimeLayOut) - transaction := &store.IndexTransaction{ - Hash: tx.Hash().Hex(), - From: from.Hex(), - Gas: tx.Gas(), - Nonce: tx.Nonce(), - BlockHash: block.Hash().Hex(), - BlockNumber: block.NumberU64(), - ChainId: tx.ChainId().String(), - V: v.String(), - R: r.String(), - S: s.String(), - Input: hex.EncodeToString(tx.Data()), - Timestamp: timestamp, - } - - if tx.To() != nil { - transaction.To = tx.To().Hex() - } - if tx.GasPrice() != nil { - transaction.GasPrice = tx.GasPrice().Uint64() - } - if tx.GasTipCap() != nil { - transaction.GasTipCap = tx.GasTipCap().Uint64() - } - if tx.GasFeeCap() != nil { - transaction.GasFeeCap = tx.GasFeeCap().Uint64() - } - if tx.Value() != nil { - transaction.Value = tx.Value().String() - } - - transactions = append(transactions, transaction) - txHashes = append(txHashes, tx.Hash().Hex()) - } - - receipts, err := bi.fetchReceipts(ctx, txHashes) - if err != nil { - return fmt.Errorf("failed to fetch transaction receipts: %w", err) - } - - for _, tx := range transactions { - if receipt, ok := receipts[tx.Hash]; ok { - tx.Status = receipt.Status - tx.GasUsed = receipt.GasUsed - tx.CumulativeGasUsed = receipt.CumulativeGasUsed - tx.ContractAddress = receipt.ContractAddress.Hex() - tx.TransactionIndex = receipt.TransactionIndex - tx.ReceiptBlockHash = receipt.BlockHash.Hex() - tx.ReceiptBlockNumber = receipt.BlockNumber.Uint64() - } - } - - return bi.storage.IndexTransactions(ctx, transactions) -} - -func (bi *BlockchainIndexer) fetchReceipts(ctx context.Context, txHashes []string) (map[string]*types.Receipt, error) { - return bi.ethClient.TxReceipts(ctx, txHashes) -} - -func (bi *BlockchainIndexer) fetchBlocks(ctx context.Context, blockNums []*big.Int) ([]*types.Block, error) { - return bi.ethClient.GetBlocks(ctx, blockNums) -} diff --git a/indexer/cmd/main.go b/indexer/cmd/main.go deleted file mode 100644 index 832dcad0c..000000000 --- a/indexer/cmd/main.go +++ /dev/null @@ -1,192 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log/slog" - "os" - "os/signal" - "strings" - "syscall" - "time" - - "github.com/primev/mev-commit/indexer/pkg/ethclient" - 
"github.com/primev/mev-commit/indexer/pkg/logutil" - "github.com/primev/mev-commit/indexer/pkg/store" - "github.com/urfave/cli/v2" -) - -var parsedAddresses []string - -func main() { - app := &cli.App{ - Name: "blockchain-indexer", - Usage: "Index blockchain data into Elasticsearch", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: "ethereum-endpoint", - EnvVars: []string{"INDEXER_ETHEREUM_ENDPOINT"}, - Value: "http://localhost:8545", - Usage: "Ethereum node endpoint", - }, - &cli.StringFlag{ - Name: "elasticsearch-endpoint", - EnvVars: []string{"INDEXER_ELASTICSEARCH_ENDPOINT"}, - Value: "http://127.0.0.1:9200", - Usage: "Elasticsearch endpoint", - }, - &cli.StringFlag{ - Name: "es-username", - EnvVars: []string{"INDEXER_ES_USERNAME"}, - Value: "", - Usage: "Elasticsearch username", - }, - &cli.StringFlag{ - Name: "es-password", - EnvVars: []string{"INDEXER_ES_PASSWORD"}, - Value: "", - Usage: "Elasticsearch password", - }, - &cli.DurationFlag{ - Name: "index-interval", - EnvVars: []string{"INDEXER_INDEX_INTERVAL"}, - Value: 15 * time.Second, - Usage: "Interval between indexing operations", - }, - &cli.StringFlag{ - Name: "log-level", - EnvVars: []string{"INDEXER_LOG_LEVEL"}, - Value: "info", - Usage: "Log level (debug, info, warn, error)", - }, - &cli.StringFlag{ - Name: "log-fmt", - Usage: "log format to use, options are 'text' or 'json'", - EnvVars: []string{"INDEXER_LOG_FMT"}, - Value: "text", - Action: func(ctx *cli.Context, v string) error { - if v != "text" && v != "json" { - return fmt.Errorf("invalid log format: %s. Must be 'text' or 'json'", v) - } - return nil - }, - }, - &cli.StringFlag{ - Name: "log-tags", - Usage: "log tags is a comma-separated list of pairs that will be inserted into each log line", - EnvVars: []string{"INDEXER_LOG_TAGS"}, - Action: func(ctx *cli.Context, s string) error { - for i, p := range strings.Split(s, ",") { - if len(strings.Split(p, ":")) != 2 { - return fmt.Errorf("invalid log-tags at index %d, expecting ", i) - } - } - return nil - }, - }, - &cli.StringFlag{ - Name: "account-addresses", - EnvVars: []string{"INDEXER_ACCOUNT_ADDRESSES"}, - Value: "0xfA0B0f5d298d28EFE4d35641724141ef19C05684", - Usage: "comma-separated account addresses", - Action: func(c *cli.Context, value string) error { - parsedAddresses = parseAddresses(value) - return nil - }, - }, - &cli.UintFlag{ - Name: "min-blocks-to-fetch-account-addrs", - EnvVars: []string{"INDEXER_MIN_BLOCK_TO_FETCH_ACCOUNT_ADDRS"}, - Value: 10, - Usage: "minimum number of blocks needed to pass prior to fetching account addresses", - }, - &cli.DurationFlag{ - Name: "timeout-to-fetch-account-addrs", - EnvVars: []string{"INDEXER_TIMEOUT_TO_FETCH_ACCOUNT_ADDRS"}, - Value: 5 * time.Second, - Usage: "timeout in seconds to fetch account addresses", - }, - }, - Action: run, - } - - if err := app.Run(os.Args); err != nil { - fmt.Fprintf(os.Stderr, "Error: %v\n", err) - os.Exit(1) - } -} - -func parseAddresses(input string) []string { - addresses := strings.Split(input, ",") - for i, addr := range addresses { - addresses[i] = strings.TrimSpace(addr) - } - return addresses -} - -func parseLogTags(tagString string) map[string]string { - tags := make(map[string]string) - for _, p := range strings.Split(tagString, ",") { - parts := strings.Split(p, ":") - if len(parts) == 2 { - tags[parts[0]] = parts[1] - } - } - return tags -} - -func run(cliCtx *cli.Context) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - ethClient, err := 
ethclient.NewW3EthereumClient(cliCtx.String("ethereum-endpoint")) - if err != nil { - slog.Error("failed to create Ethereum client", "error", err) - return err - } - - esClient, err := store.NewESClient(cliCtx.String("elasticsearch-endpoint"), cliCtx.String("es-username"), cliCtx.String("es-password")) - if err != nil { - slog.Error("failed to create Elasticsearch client", "error", err) - return err - } - defer func() { - if err := esClient.Close(ctx); err != nil { - slog.Error("Failed to close Elasticsearch client", "error", err) - } - }() - - config := Config{ - EthClient: ethClient, - Storage: esClient, - IndexInterval: cliCtx.Duration("index-interval"), - AccountAddresses: parsedAddresses, - MinBlocksToFetchAccountAddresses: cliCtx.Uint("min-blocks-to-fetch-account-addrs"), - TimeoutToFetchAccountAddresses: cliCtx.Duration("timeout-to-fetch-account-addrs"), - } - blockchainIndexer := NewBlockchainIndexer(config) - - logTags := parseLogTags(cliCtx.String("log-tags")) - // Set log level - err = logutil.SetLogLevel(cliCtx.String("log-level"), cliCtx.String("log-fmt"), logTags) - if err != nil { - return err - } - - if err = blockchainIndexer.Start(ctx); err != nil { - slog.Error("failed to start blockchain indexer", "error", err) - return err - } - - // Set up graceful shutdown - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, syscall.SIGTERM) - - // Wait for interrupt signal - <-c - slog.Info("shutting down gracefully...") - cancel() - // Wait for some time to allow ongoing operations to complete - time.Sleep(5 * time.Second) - return nil -} diff --git a/indexer/find_missing_block_num.py b/indexer/find_missing_block_num.py deleted file mode 100644 index 61d879b08..000000000 --- a/indexer/find_missing_block_num.py +++ /dev/null @@ -1,49 +0,0 @@ -from elasticsearch import Elasticsearch, helpers - -# Initialize Elasticsearch client with authentication -es = Elasticsearch( - ["http://localhost:9200"], # Replace with your Elasticsearch host if different - basic_auth=("elastic", "mev-commit") -) - -# Function to get all numbers using scroll API -def get_all_numbers(): - numbers = [] - scroll_size = 10000 - - # Initial search request - response = es.search( - index="blocks", - body={ - "size": scroll_size, - "_source": ["number"], - "sort": [{"number": "asc"}] - }, - scroll='2m' - ) - - # Get the scroll ID - scroll_id = response['_scroll_id'] - - # Get the first batch of numbers - numbers.extend([hit['_source']['number'] for hit in response['hits']['hits']]) - - # Continue scrolling until no more hits - while len(response['hits']['hits']): - response = es.scroll(scroll_id=scroll_id, scroll='2m') - numbers.extend([hit['_source']['number'] for hit in response['hits']['hits']]) - - return numbers - -# Get all numbers -all_numbers = get_all_numbers() - -# Find missing numbers -missing_numbers = [] -for i in range(len(all_numbers) - 1): - current_number = all_numbers[i] - next_number = all_numbers[i + 1] - if next_number != current_number + 1: - missing_numbers.extend(range(current_number + 1, next_number)) - -print("Missing numbers:", missing_numbers) diff --git a/indexer/go.mod b/indexer/go.mod index 130fe2632..52bf8ad51 100644 --- a/indexer/go.mod +++ b/indexer/go.mod @@ -5,9 +5,9 @@ go 1.22 require ( github.com/elastic/go-elasticsearch/v8 v8.14.0 github.com/ethereum/go-ethereum v1.14.6 - github.com/lmittmann/tint v1.0.5 github.com/lmittmann/w3 v0.16.7 github.com/urfave/cli/v2 v2.27.1 + golang.org/x/sync v0.7.0 ) require ( @@ -48,7 +48,6 @@ require ( go.opentelemetry.io/otel/trace 
v1.24.0 // indirect golang.org/x/crypto v0.23.0 // indirect golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect - golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.20.0 // indirect golang.org/x/time v0.5.0 // indirect google.golang.org/protobuf v1.34.1 // indirect diff --git a/indexer/go.sum b/indexer/go.sum index 91c3dde9f..1372e18f6 100644 --- a/indexer/go.sum +++ b/indexer/go.sum @@ -91,8 +91,6 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= -github.com/lmittmann/tint v1.0.5 h1:NQclAutOfYsqs2F1Lenue6OoWCajs5wJcP3DfWVpePw= -github.com/lmittmann/tint v1.0.5/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE= github.com/lmittmann/w3 v0.16.7 h1:1xlsXQ5xTbW1Rfa7ClH+KUTfscmuVgl0bzNkFfzSoz8= github.com/lmittmann/w3 v0.16.7/go.mod h1:30EWzDfQAvqdSdTDEtNvOV4Ad6qpEX0WP2fcLKzQE5I= github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU= diff --git a/indexer/indexer.go b/indexer/indexer.go new file mode 100644 index 000000000..9dbe255f5 --- /dev/null +++ b/indexer/indexer.go @@ -0,0 +1,177 @@ +package main + +import ( + "context" + "encoding/hex" + "fmt" + "log/slog" + "math/big" + "time" + + "github.com/ethereum/go-ethereum/core/types" +) + +// Config holds indexer configuration. +type Config struct { + logger *slog.Logger + chain Chain + store Store +} + +// ForwardIndexer is an indexer that indexes from the +// last indexed block and retrieves the blocks going forward. +type ForwardIndexer struct { + *Config + + lastIndexedBlock *big.Int +} + +// Run starts the indexer process. +func (fi *ForwardIndexer) Run(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return nil + default: + blocks, err := fi.fetchBlocks(ctx) + if err != nil { + fi.logger.Error("fetch blocks", "error", err) + // TODO: try again after some backoff time before continue... + continue + } + err = fi.storeBlocks(ctx, blocks) + if err != nil { + fi.logger.Error("store blocks", "error", err) + } + } + } +} + +func (fi *ForwardIndexer) fetchBlocks(ctx context.Context) ([]*types.Block, error) { + const window = 5 + + var ( + res []*types.Block + ) + + last := fi.lastIndexedBlock + curr, err := fi.chain.BlockNumber() + if err != nil { + return nil, fmt.Errorf("block number: %w", err) + } + + var ( + start = new(big.Int).Add(last, big.NewInt(1)) + end = new(big.Int).Add(start, big.NewInt(window)) + ) + for start.Cmp(curr) <= 0 { + if end.Cmp(curr) > 0 { + end.Set(curr) + } + + blocks, err := fi.chain.Blocks(start, end) + if err != nil { + fi.logger.Error("fetch blocks", "start", start, "end", end, "error", err) + continue + } + res = append(res, blocks...) 
+ + start = new(big.Int).Add(end, big.NewInt(1)) + end = new(big.Int).Add(start, big.NewInt(window)) + } + + return res, nil +} + +func (fi *ForwardIndexer) storeBlocks(ctx context.Context, blocks []*types.Block) error { + for _, block := range blocks { + txs, txh, err := parseBlockTransactions(block) + if err != nil { + return fmt.Errorf("parse block #%v transactions: %w", block.Number(), err) + } + + rcs, err := fi.chain.TxReceipts(txh) + if err != nil { + return fmt.Errorf("fetch transaction receipts for block #%v: %w", block.Number(), err) + } + + for _, tx := range txs { + if rc, ok := rcs[tx.Hash]; ok { + tx.Status = rc.Status + tx.GasUsed = rc.GasUsed + tx.CumulativeGasUsed = rc.CumulativeGasUsed + tx.ContractAddress = rc.ContractAddress.Hex() + tx.TransactionIndex = rc.TransactionIndex + tx.ReceiptBlockHash = rc.BlockHash.Hex() + tx.ReceiptBlockNumber = rc.BlockNumber.Uint64() + } + } + + bi := &BlockItem{ + Number: block.NumberU64(), + Hash: block.Hash().Hex(), + ParentHash: block.ParentHash().Hex(), + Root: block.Root().Hex(), + Nonce: block.Nonce(), + Timestamp: time.UnixMilli(int64(block.Time())).UTC().Format(timeMilliZ), + Transactions: len(block.Transactions()), + BaseFee: block.BaseFee().Uint64(), + GasLimit: block.GasLimit(), + GasUsed: block.GasUsed(), + Difficulty: block.Difficulty().Uint64(), + ExtraData: hex.EncodeToString(block.Extra()), + } + + if err := fi.store.IndexBlock(ctx, bi); err != nil { + return fmt.Errorf("index block #%v: %w", block.Number(), err) + } + + if err := fi.store.IndexTransactions(ctx, txs); err != nil { + return fmt.Errorf("index transactions for block #%v: %w", block.Number(), err) + } + + fi.lastIndexedBlock = block.Number() + } + + return nil +} + +// BackwardIndexer is an indexer that indexes from the +// last indexed block and retrieves the blocks going backward. +type BackwardIndexer struct { + *Config + + lastIndexedBlock *big.Int +} + +// Run starts the indexer process. +func (bi *BackwardIndexer) Run(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return nil + default: + // TODO: implement... + } + } +} + +// BalanceIndexer is an indexer that indexes the +// balance of all accounts in the blockchain. +type BalanceIndexer struct { + *Config + + lastIndexedBlock *big.Int +} + +// Run starts the indexer process. +func (bi *BalanceIndexer) Run(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return nil + default: + // TODO: implement... 
+ } + } +} diff --git a/indexer/main.go b/indexer/main.go new file mode 100644 index 000000000..ee24cbf81 --- /dev/null +++ b/indexer/main.go @@ -0,0 +1,230 @@ +package main + +import ( + "context" + "errors" + "fmt" + "io" + "log/slog" + "math/big" + "net/url" + "os" + "os/signal" + "strings" + "syscall" + + "github.com/urfave/cli/v2" + "golang.org/x/sync/errgroup" +) + +var ( + ethereumURLFlag = &cli.StringFlag{ + Name: "ethereum-url", + EnvVars: []string{"INDEXER_ETHEREUM_URL"}, + Value: "http://localhost:8545", + Usage: "Ethereum node URL", + Action: func(_ *cli.Context, s string) error { + _, err := url.Parse(s) + return err + }, + } + + elasticsearchURLFlag = &cli.StringFlag{ + Name: "elasticsearch-url", + EnvVars: []string{"INDEXER_ELASTICSEARCH_URL"}, + Value: "http://127.0.0.1:9200", + Usage: "Elasticsearch URL", + Action: func(_ *cli.Context, s string) error { + _, err := url.Parse(s) + return err + }, + } + + elasticsearchUsernameFlag = &cli.StringFlag{ + Name: "elasticsearch-username", + EnvVars: []string{"INDEXER_ELASTICSEARCH_USERNAME"}, + Value: "", + Usage: "Elasticsearch username", + } + + elasticsearchPasswordFlag = &cli.StringFlag{ + Name: "elasticsearch-password", + EnvVars: []string{"INDEXER_ELASTICSEARCH_PASSWORD"}, + Value: "", + Usage: "Elasticsearch password", + } + + logFormatFlag = &cli.StringFlag{ + Name: "log-fmt", + Usage: "log format to use, options are 'text' or 'json'", + EnvVars: []string{"INDEXER_LOG_FMT"}, + Value: "text", + Action: func(_ *cli.Context, v string) error { + if v != "text" && v != "json" { + return fmt.Errorf("invalid log format: %s", v) + } + return nil + }, + } + + logLevelFlag = &cli.StringFlag{ + Name: "log-level", + EnvVars: []string{"INDEXER_LOG_LEVEL"}, + Value: "info", + Usage: "Log level (debug, info, warn, error)", + Action: func(_ *cli.Context, v string) error { + if err := new(slog.LevelVar).UnmarshalText([]byte(v)); err != nil { + return fmt.Errorf("invalid log level: %w", err) + } + return nil + }, + } + + logTagsFlag = &cli.StringFlag{ + Name: "log-tags", + Usage: "log tags is a comma-separated list of pairs that will be inserted into each log line", + EnvVars: []string{"INDEXER_LOG_TAGS"}, + Action: func(_ *cli.Context, s string) error { + for i, p := range strings.Split(s, ",") { + if len(strings.Split(p, ":")) != 2 { + return fmt.Errorf("invalid tag %q at index %d", p, i) + } + } + return nil + }, + } +) + +func main() { + app := &cli.App{ + Name: "indexer", + Usage: "Index Ethereum blockchain data into Elasticsearch", + Flags: []cli.Flag{ + ethereumURLFlag, + elasticsearchURLFlag, + elasticsearchUsernameFlag, + elasticsearchPasswordFlag, + logFormatFlag, + logLevelFlag, + logTagsFlag, + }, + Action: run, + } + + if err := app.Run(os.Args); err != nil { + _, _ = fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) + + } +} + +func run(cctx *cli.Context) error { + var ( + closers []io.Closer + ) + + ctx, stop := signal.NotifyContext( + cctx.Context, + syscall.SIGINT, + syscall.SIGTERM, + ) + defer stop() + + logger, err := newLogger( + os.Stdout, + cctx.String(logLevelFlag.Name), + cctx.String(logFormatFlag.Name), + cctx.String(logTagsFlag.Name), + ) + if err != nil { + return fmt.Errorf("create logger: %w", err) + } + + eth, err := NewEthereumChain( + cctx.String(ethereumURLFlag.Name), + ) + if err != nil { + return fmt.Errorf("create Ethereum chain: %w", err) + } + closers = append(closers, eth) + + esc, err := NewElasticsearchStore( + cctx.String(elasticsearchURLFlag.Name), + 
cctx.String(elasticsearchUsernameFlag.Name), + cctx.String(elasticsearchPasswordFlag.Name), + ) + if err != nil { + return fmt.Errorf("create Elasticsearch store: %w", err) + } + + if err := esc.CreateIndexes(ctx); err != nil { + return fmt.Errorf("create indexes: %w", err) + } + + last, err := esc.LastIndexedBlock(ctx, SortOrderAsc) + if err != nil { + return fmt.Errorf("retrive last indexed block: %w", err) + } + logger.Info("last indexed block", "number", last) + + curr, err := eth.BlockNumber() + if err != nil { + return fmt.Errorf("retrive block number: %w", err) + } + + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + n := new(big.Int).Set(last) + if n.Sign() == 0 { + n.Add(curr, big.NewInt(-1)) + } + return (&ForwardIndexer{ + Config: &Config{ + logger: logger.With("indexer", "forward"), + chain: eth, + store: esc, + }, + lastIndexedBlock: n, + }).Run(ctx) + }) + g.Go(func() error { + n := new(big.Int).Set(last) + if n.Sign() == 0 { + n = new(big.Int).Set(curr) + } + return (&BackwardIndexer{ + Config: &Config{ + logger: logger.With("indexer", "backward"), + chain: eth, + store: esc, + }, + lastIndexedBlock: n, + }).Run(ctx) + }) + g.Go(func() error { + n := new(big.Int).Set(last) + // TODO: finish implementation... + return (&BalanceIndexer{ + Config: &Config{ + logger: logger.With("indexer", "balance"), + chain: eth, + store: esc, + }, + lastIndexedBlock: n, + }).Run(ctx) + }) + + select { + case <-ctx.Done(): + logger.Info("shutting down...") + var errs error + if err := g.Wait(); !errors.Is(err, context.Canceled) { + errs = errors.Join(errs, err) + } + for _, c := range closers { + errs = errors.Join(errs, c.Close()) + } + logger.Info("shutdown complete") + return errs + } +} diff --git a/indexer/pkg/ethclient/client.go b/indexer/pkg/ethclient/client.go deleted file mode 100644 index 50971c1bc..000000000 --- a/indexer/pkg/ethclient/client.go +++ /dev/null @@ -1,18 +0,0 @@ -package ethclient - -import ( - "context" - "math/big" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/primev/mev-commit/indexer/pkg/store" -) - -type EthereumClient interface { - GetBlocks(ctx context.Context, blockNums []*big.Int) ([]*types.Block, error) - BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) - BlockNumber(ctx context.Context) (*big.Int, error) - TxReceipts(ctx context.Context, txHashes []string) (map[string]*types.Receipt, error) - AccountBalances(ctx context.Context, addresses []common.Address, blockNumber uint64) ([]store.AccountBalance, error) -} diff --git a/indexer/pkg/ethclient/w3client.go b/indexer/pkg/ethclient/w3client.go deleted file mode 100644 index 294018454..000000000 --- a/indexer/pkg/ethclient/w3client.go +++ /dev/null @@ -1,99 +0,0 @@ -package ethclient - -import ( - "context" - "fmt" - "math/big" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/lmittmann/w3" - "github.com/lmittmann/w3/module/eth" - "github.com/lmittmann/w3/w3types" - "github.com/primev/mev-commit/indexer/pkg/store" -) - -type W3EvmClient struct { - client *w3.Client -} - -func NewW3EthereumClient(endpoint string) (*W3EvmClient, error) { - client, err := w3.Dial(endpoint) - if err != nil { - return nil, fmt.Errorf("failed to connect to Ethereum node: %w", err) - } - return &W3EvmClient{client: client}, nil -} - -func (c *W3EvmClient) GetBlocks(ctx context.Context, blockNums []*big.Int) ([]*types.Block, error) { - batchBlocksCaller := 
make([]w3types.RPCCaller, len(blockNums)) - blocks := make([]*types.Block, len(blockNums)) - - for i, blockNum := range blockNums { - block := new(types.Block) - batchBlocksCaller[i] = eth.BlockByNumber(blockNum).Returns(block) - blocks[i] = block - } - err := c.client.Call(batchBlocksCaller...) - if err != nil { - return nil, err - } - return blocks, nil -} - -func (c *W3EvmClient) TxReceipts(ctx context.Context, txHashes []string) (map[string]*types.Receipt, error) { - batchTxReceiptCaller := make([]w3types.RPCCaller, len(txHashes)) - txReceipts := make([]types.Receipt, len(txHashes)) - for i, txHash := range txHashes { - batchTxReceiptCaller[i] = eth.TxReceipt(w3.H(txHash)).Returns(&txReceipts[i]) - } - err := c.client.Call(batchTxReceiptCaller...) - if err != nil { - return map[string]*types.Receipt{}, nil - } - txHashToReceipt := make(map[string]*types.Receipt) - for _, txReceipt := range txReceipts { - txHashToReceipt[txReceipt.TxHash.Hex()] = &txReceipt - } - return txHashToReceipt, nil -} - -func (c *W3EvmClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { - var block types.Block - if err := c.client.Call(eth.BlockByNumber(number).Returns(&block)); err != nil { - return nil, err - } - return &block, nil -} - -func (c *W3EvmClient) BlockNumber(ctx context.Context) (*big.Int, error) { - var blockNumber big.Int - if err := c.client.Call(eth.BlockNumber().Returns(&blockNumber)); err != nil { - return nil, err - } - return &blockNumber, nil -} - -func (c *W3EvmClient) AccountBalances(ctx context.Context, addresses []common.Address, blockNumber uint64) ([]store.AccountBalance, error) { - batchAccountBalanceCaller := make([]w3types.RPCCaller, len(addresses)) - balances := make([]big.Int, len(addresses)) - for i, address := range addresses { - batchAccountBalanceCaller[i] = eth.Balance(address, new(big.Int).SetUint64(blockNumber)).Returns(&balances[i]) - } - err := c.client.Call(batchAccountBalanceCaller...) - if err != nil { - return nil, err - } - var accountBalances []store.AccountBalance - for i, address := range addresses { - accBalance := store.AccountBalance{ - Address: address.Hex(), - Balance: balances[i].String(), - Timestamp: time.Now().UTC().Format("2006-01-02T15:04:05.000Z"), - BlockNumber: blockNumber, - } - accountBalances = append(accountBalances, accBalance) - } - return accountBalances, nil -} diff --git a/indexer/pkg/logutil/logutil.go b/indexer/pkg/logutil/logutil.go deleted file mode 100644 index 54af9f437..000000000 --- a/indexer/pkg/logutil/logutil.go +++ /dev/null @@ -1,43 +0,0 @@ -package logutil - -import ( - "fmt" - "log/slog" - "os" - "time" - - "github.com/lmittmann/tint" -) - -func SetLogLevel(lvl string, format string, tags map[string]string) error { - level := new(slog.LevelVar) - if err := level.UnmarshalText([]byte(lvl)); err != nil { - return fmt.Errorf("invalid log level: %w", err) - } - - var handler slog.Handler - - switch format { - case "json": - // JSON format - handler = slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ - Level: level, - AddSource: true, - }) - case "text": - // Text format (default) - opts := &tint.Options{ - Level: level, - TimeFormat: time.Kitchen, // Optional: Customize the time format - AddSource: true, - } - handler = tint.NewHandler(os.Stdout, opts) - } - var logTags []any - for k, v := range tags { - logTags = append(logTags, k, v) - } - logger := slog.New(handler).With(logTags...) 
- slog.SetDefault(logger) - return nil -} diff --git a/indexer/pkg/store/elasticsearch.go b/indexer/pkg/store/elasticsearch.go deleted file mode 100644 index 87f9c3ef3..000000000 --- a/indexer/pkg/store/elasticsearch.go +++ /dev/null @@ -1,291 +0,0 @@ -package store - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "log/slog" - "math/big" - "strings" - "sync/atomic" - - "github.com/elastic/go-elasticsearch/v8" - "github.com/elastic/go-elasticsearch/v8/esapi" - "github.com/elastic/go-elasticsearch/v8/esutil" - "github.com/elastic/go-elasticsearch/v8/typedapi/core/search" - estypes "github.com/elastic/go-elasticsearch/v8/typedapi/types" -) - -type ESClient struct { - client *elasticsearch.TypedClient - bulkIndexer esutil.BulkIndexer -} - -func NewESClient(endpoint string, user, pass string) (*ESClient, error) { - config := elasticsearch.Config{ - Addresses: []string{endpoint}, - Username: user, - Password: pass, - } - client, err := elasticsearch.NewClient(config) - if err != nil { - return nil, fmt.Errorf("failed to create new elasticsearch client: %w", err) - } - bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ - Client: client, - NumWorkers: 4, - FlushBytes: 5e+6, // 5MB - }) - if err != nil { - return nil, fmt.Errorf("failed to create bulk indexer: %w", err) - } - typedClient, err := elasticsearch.NewTypedClient(config) - if err != nil { - return nil, fmt.Errorf("failed to create new typed elasticsearch client: %w", err) - - } - return &ESClient{client: typedClient, bulkIndexer: bi}, nil -} - -func (c *ESClient) Index(ctx context.Context, index string, document interface{}) error { - _, err := c.client.Index(index).Document(document).Do(ctx) - return err -} - -func (c *ESClient) Search(ctx context.Context, index string, query *estypes.Query) (*search.Response, error) { - return c.client.Search().Index(index).Query(query).Do(ctx) -} - -func (c *ESClient) GetLastIndexedBlock(ctx context.Context, direction string) (*big.Int, error) { - // Check if the index exists - exists, err := c.client.Indices.Exists("blocks").Do(ctx) - if err != nil { - return nil, fmt.Errorf("failed to check if index exists: %w", err) - } - if !exists { - return big.NewInt(0), nil - } - - // Check if the index contains any documents - countRes, err := c.client.Count().Index("blocks").Do(ctx) - if err != nil { - return nil, fmt.Errorf("failed to count documents in index: %w", err) - } - if countRes.Count == 0 { - return big.NewInt(0), nil - } - - var sortOrder string - if direction == "forward" { - sortOrder = "desc" - } else if direction == "backward" { - sortOrder = "asc" - } else { - return nil, fmt.Errorf("invalid direction: %s", direction) - } - - // Perform the search query - res, err := c.client.Search(). - Index("blocks"). - Sort(map[string]interface{}{ - "number": map[string]interface{}{ - "order": sortOrder, - }, - }). - Size(1). 
- Do(ctx) - if err != nil { - return nil, fmt.Errorf("failed to execute search query: %w", err) - } - - // Check if there are no hits (index exists but no documents) - if res.Hits.Total.Value == 0 { - return big.NewInt(0), nil - } - - var block struct { - Number uint64 `json:"number"` - } - if err := json.Unmarshal(res.Hits.Hits[0].Source_, &block); err != nil { - return nil, fmt.Errorf("failed to unmarshal search result: %w", err) - } - blockNumber := new(big.Int).SetUint64(block.Number) - return blockNumber, nil -} - -func (c *ESClient) IndexBlock(ctx context.Context, block *IndexBlock) error { - return c.Bulk(ctx, "blocks", []interface{}{block}) -} - -func (c *ESClient) IndexTransactions(ctx context.Context, transactions []*IndexTransaction) error { - docs := make([]interface{}, len(transactions)) - for i, tx := range transactions { - docs[i] = tx - } - return c.Bulk(ctx, "transactions", docs) -} - -func pointerInt(v int) *int { - return &v -} - -func pointerString(v string) *string { - return &v -} - -func (c *ESClient) GetAddresses(ctx context.Context) ([]string, error) { - query := &search.Request{ - Size: pointerInt(0), // We don't need the actual documents, just the aggregations - Aggregations: map[string]estypes.Aggregations{ - "unique_from_addresses": { - Terms: &estypes.TermsAggregation{ - Field: pointerString("from.keyword"), - Size: pointerInt(10000), - }, - }, - "unique_to_addresses": { - Terms: &estypes.TermsAggregation{ - Field: pointerString("to.keyword"), - Size: pointerInt(10000), - }, - }, - }, - } - - // Execute the search request - res, err := c.client.Search(). - Index("transactions"). - Request(query). - Size(1000). - Do(ctx) - if err != nil { - return nil, fmt.Errorf("failed to execute search query: %w", err) - } - - // Extract unique addresses from the aggregations - addressSet := make(map[string]struct{}) - - // Process the "from" addresses aggregation - fromAgg, ok := res.Aggregations["unique_from_addresses"] - if ok { - buckets := fromAgg.(*estypes.StringTermsAggregate).Buckets - switch b := buckets.(type) { - case []estypes.StringTermsBucket: - for _, bucket := range b { - address := bucket.Key.(string) - addressSet[address] = struct{}{} - } - } - } - - // Process the "to" addresses aggregation - toAgg, ok := res.Aggregations["unique_to_addresses"] - if ok { - buckets := toAgg.(*estypes.StringTermsAggregate).Buckets - switch b := buckets.(type) { - case []estypes.StringTermsBucket: - for _, bucket := range b { - address := bucket.Key.(string) - addressSet[address] = struct{}{} - } - } - } - - // Combine the unique addresses into a slice - addresses := make([]string, 0, len(addressSet)) - for address := range addressSet { - if address != "" && address != "0x" { - addresses = append(addresses, address) - } - } - - return addresses, nil - -} - -func (c *ESClient) IndexAccountBalances(ctx context.Context, accountBalances []AccountBalance) error { - docs := make([]interface{}, len(accountBalances)) - for i, accBal := range accountBalances { - docs[i] = accBal - } - return c.Bulk(ctx, "accounts", docs) -} - -func (c *ESClient) CreateIndices(ctx context.Context) error { - indices := []string{"blocks", "transactions", "accounts"} - for _, index := range indices { - res, err := c.client.Indices.Exists(index).Do(ctx) - if err != nil { - return fmt.Errorf("failed to check if index %s exists: %w", index, err) - } - if !res { - indexSettings := esapi.IndicesCreateRequest{ - Index: index, - Body: strings.NewReader(`{ - "settings": { - "number_of_shards": 1, - 
"number_of_replicas": 0 - }, - "mappings": { - "properties": { - "timestamp": { - "type": "date", - "format": "strict_date_optional_time||epoch_millis" - } - } - } - }`), - } - createRes, err := indexSettings.Do(ctx, c.client) - if err != nil { - return fmt.Errorf("failed to create index %s: %w", index, err) - } - defer createRes.Body.Close() - if createRes.IsError() { - return fmt.Errorf("error creating index %s: %s", index, createRes.String()) - } - } - } - return nil -} - -func (c *ESClient) Bulk(ctx context.Context, indexName string, docs []interface{}) error { - var ( - countSuccessful uint64 - countFailed uint64 - ) - for _, doc := range docs { - data, err := json.Marshal(doc) - if err != nil { - return fmt.Errorf("failed to marshal document: %w", err) - } - err = c.bulkIndexer.Add( - ctx, - esutil.BulkIndexerItem{ - Action: "index", - Index: indexName, - Body: bytes.NewReader(data), - OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem) { - atomic.AddUint64(&countSuccessful, 1) - }, - OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) { - atomic.AddUint64(&countFailed, 1) - if err != nil { - slog.Error("bulk indexing error", "error", err) - } else { - slog.Error("bulk indexing error", "type", res.Error.Type, "reason", res.Error.Reason) - } - }, - }, - ) - if err != nil { - return fmt.Errorf("failed to add document to bulk indexer: %w", err) - } - } - return nil -} - -func (c *ESClient) Close(ctx context.Context) error { - return c.bulkIndexer.Close(ctx) -} diff --git a/indexer/pkg/store/store.go b/indexer/pkg/store/store.go deleted file mode 100644 index 9d582ab19..000000000 --- a/indexer/pkg/store/store.go +++ /dev/null @@ -1,16 +0,0 @@ -package store - -import ( - "context" - "math/big" -) - -type Storage interface { - IndexBlock(ctx context.Context, block *IndexBlock) error - IndexTransactions(ctx context.Context, transactions []*IndexTransaction) error - GetLastIndexedBlock(ctx context.Context, direction string) (*big.Int, error) - GetAddresses(ctx context.Context) ([]string, error) - IndexAccountBalances(ctx context.Context, accountBalances []AccountBalance) error - CreateIndices(ctx context.Context) error - Close(ctx context.Context) error -} diff --git a/indexer/pkg/store/types.go b/indexer/pkg/store/types.go deleted file mode 100644 index dc8af4155..000000000 --- a/indexer/pkg/store/types.go +++ /dev/null @@ -1,50 +0,0 @@ -package store - -type IndexBlock struct { - Number uint64 `json:"number"` - Hash string `json:"hash"` - ParentHash string `json:"parentHash"` - Root string `json:"root"` - Nonce uint64 `json:"nonce"` - Timestamp string `json:"timestamp"` - Transactions int `json:"transactions"` - BaseFee uint64 `json:"baseFee"` - GasLimit uint64 `json:"gasLimit"` - GasUsed uint64 `json:"gasUsed"` - Difficulty uint64 `json:"difficulty"` - ExtraData string `json:"extraData"` -} - -type IndexTransaction struct { - Hash string `json:"hash"` - From string `json:"from"` - To string `json:"to"` - Gas uint64 `json:"gas"` - GasPrice uint64 `json:"gasPrice"` - GasTipCap uint64 `json:"gasTipCap"` - GasFeeCap uint64 `json:"gasFeeCap"` - Value string `json:"value"` - Nonce uint64 `json:"nonce"` - BlockHash string `json:"blockHash"` - BlockNumber uint64 `json:"blockNumber"` - ChainId string `json:"chainId"` - V string `json:"v"` - R string `json:"r"` - S string `json:"s"` - Input string `json:"input"` - Timestamp string `json:"timestamp"` - Status uint64 `json:"status"` - 
GasUsed uint64 `json:"gasUsed"` - CumulativeGasUsed uint64 `json:"cumulativeGasUsed"` - ContractAddress string `json:"contractAddress"` - TransactionIndex uint `json:"transactionIndex"` - ReceiptBlockHash string `json:"receiptBlockHash"` - ReceiptBlockNumber uint64 `json:"receiptBlockNumber"` -} - -type AccountBalance struct { - Address string `json:"address"` - Balance string `json:"balance"` - Timestamp string `json:"timestamp"` - BlockNumber uint64 `json:"blockNumber"` -} diff --git a/indexer/store.go b/indexer/store.go new file mode 100644 index 000000000..69f7ca581 --- /dev/null +++ b/indexer/store.go @@ -0,0 +1,285 @@ +package main + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "math/big" + "runtime" + + "github.com/elastic/go-elasticsearch/v8" + "github.com/elastic/go-elasticsearch/v8/esutil" + "github.com/elastic/go-elasticsearch/v8/typedapi/types" +) + +const ( + BlockIndexName = "blocks" + AccountIndexName = "accounts" + TransactionIndexName = "transactions" +) + +type ( + BlockItem struct { + Number uint64 `json:"number"` + Hash string `json:"hash"` + ParentHash string `json:"parentHash"` + Root string `json:"root"` + Nonce uint64 `json:"nonce"` + Timestamp string `json:"timestamp"` + Transactions int `json:"transactions"` + BaseFee uint64 `json:"baseFee"` + GasLimit uint64 `json:"gasLimit"` + GasUsed uint64 `json:"gasUsed"` + Difficulty uint64 `json:"difficulty"` + ExtraData string `json:"extraData"` + } + + TransactionItem struct { + Hash string `json:"hash"` + From string `json:"from"` + To string `json:"to"` + Gas uint64 `json:"gas"` + GasPrice uint64 `json:"gasPrice"` + GasTipCap uint64 `json:"gasTipCap"` + GasFeeCap uint64 `json:"gasFeeCap"` + Value string `json:"value"` + Nonce uint64 `json:"nonce"` + BlockHash string `json:"blockHash"` + BlockNumber uint64 `json:"blockNumber"` + ChainId string `json:"chainId"` + V string `json:"v"` + R string `json:"r"` + S string `json:"s"` + Input string `json:"input"` + Timestamp string `json:"timestamp"` + Status uint64 `json:"status"` + GasUsed uint64 `json:"gasUsed"` + CumulativeGasUsed uint64 `json:"cumulativeGasUsed"` + ContractAddress string `json:"contractAddress"` + TransactionIndex uint `json:"transactionIndex"` + ReceiptBlockHash string `json:"receiptBlockHash"` + ReceiptBlockNumber uint64 `json:"receiptBlockNumber"` + } + + AccountBalanceItem struct { + Address string `json:"address"` + Balance string `json:"balance"` + Timestamp string `json:"timestamp"` + BlockNumber uint64 `json:"blockNumber"` + } +) + +// SortOrder represents the sort order for search results. +type SortOrder int + +// String returns the string representation of the sort order. +// It implements the fmt.Stringer interface. +func (s SortOrder) String() string { + switch s { + case SortOrderAsc: + return "asc" + case SortOrderDesc: + return "desc" + default: + return "unknown" + } +} + +const ( + SortOrderUnknown SortOrder = iota + SortOrderAsc + SortOrderDesc +) + +// Store defines an interface for interacting with the backend storage. +type Store interface { + // CreateIndexes creates all required indexes if they do not exist. + CreateIndexes(context.Context) error + + // IndexBlock indexes a given block. + IndexBlock(context.Context, *BlockItem) error + + // IndexTransactions indexes a list of transactions. + IndexTransactions(ctx context.Context, transactions []*TransactionItem) error + + // LastIndexedBlock returns the number of the last indexed block. 
+ LastIndexedBlock(context.Context, SortOrder) (*big.Int, error) +} + +var ( + _ Store = (*ElasticsearchStore)(nil) +) + +// ElasticsearchStore implements the Store interface and +// represents a connection to an Elasticsearch cluster. +type ElasticsearchStore struct { + client *elasticsearch.Client + typedClient *elasticsearch.TypedClient +} + +// NewElasticsearchStore creates a new ElasticsearchStore instance connected to the specified URL. +func NewElasticsearchStore(url string, user, pass string) (*ElasticsearchStore, error) { + cfg := elasticsearch.Config{ + Addresses: []string{url}, + Username: user, + Password: pass, + } + + client, err := elasticsearch.NewClient(cfg) + if err != nil { + return nil, err + } + + typedClient, err := elasticsearch.NewTypedClient(cfg) + if err != nil { + return nil, err + } + + return &ElasticsearchStore{ + client: client, + typedClient: typedClient, + }, nil +} + +// CreateIndexes implements the Store interface. +func (e *ElasticsearchStore) CreateIndexes(ctx context.Context) error { + indices := []string{ + BlockIndexName, + AccountIndexName, + TransactionIndexName, + } + + for _, index := range indices { + exists, err := e.typedClient.Indices.Exists(index).Do(ctx) + if err != nil { + return fmt.Errorf("failed to check if index %q exists: %w", index, err) + } + if exists { + continue + } + + res, err := e.typedClient.Indices.Create(index). + Settings(&types.IndexSettings{ + NumberOfShards: "1", + NumberOfReplicas: "0", + }). + Mappings(&types.TypeMapping{ + Properties: map[string]types.Property{ + "timestamp": map[string]interface{}{ + "type": "date", + "format": "strict_date_optional_time||epoch_millis", + }, + }, + }). + Do(ctx) + if err != nil { + return fmt.Errorf("create index %q: %w", index, err) + } + if !res.Acknowledged { + return fmt.Errorf("index %q creation not acknowledged", index) + } + } + return nil +} + +// IndexBlock implements the Store interface. +func (e *ElasticsearchStore) IndexBlock(ctx context.Context, block *BlockItem) error { + return e.bulk(ctx, BlockIndexName, []interface{}{block}) +} + +// IndexTransactions implements the Store interface. +func (e *ElasticsearchStore) IndexTransactions(ctx context.Context, transactions []*TransactionItem) error { + docs := make([]interface{}, len(transactions)) + for i, tx := range transactions { + docs[i] = tx + } + return e.bulk(ctx, TransactionIndexName, docs) +} + +// LastIndexedBlock implements the Store interface. +func (e *ElasticsearchStore) LastIndexedBlock(ctx context.Context, order SortOrder) (*big.Int, error) { + const index = BlockIndexName + + exists, err := e.typedClient.Indices.Exists(index).Do(ctx) + if err != nil { + return nil, fmt.Errorf("check if index %q exists: %w", index, err) + } + if !exists { + return big.NewInt(0), nil + } + + cntRes, err := e.typedClient.Count().Index(index).Do(ctx) + if err != nil { + return nil, fmt.Errorf("count documents in %q index: %w", index, err) + } + if cntRes.Count == 0 { + return big.NewInt(0), nil + } + + schRes, err := e.typedClient.Search(). + Index(index). + Sort(map[string]interface{}{ + "number": map[string]interface{}{ + "order": order.String(), + }, + }). + Size(1). 
+ Do(ctx) + if err != nil { + return nil, fmt.Errorf("search for last indexed block: %w", err) + } + if schRes.Hits.Total.Value == 0 { + return big.NewInt(0), nil + } + + var block struct { + Number uint64 `json:"number"` + } + if err := json.Unmarshal(schRes.Hits.Hits[0].Source_, &block); err != nil { + return nil, fmt.Errorf("unmarshal search result: %w", err) + } + return new(big.Int).SetUint64(block.Number), nil +} + +func (e *ElasticsearchStore) bulk(ctx context.Context, index string, docs []interface{}) (err error) { + indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{ + Client: e.client, + NumWorkers: runtime.GOMAXPROCS(0), + FlushBytes: 5e+6, + }) + if err != nil { + return err + } + defer func() { + err = errors.Join(err, indexer.Close(ctx)) + }() + + var errs error + for _, doc := range docs { + data, err := json.Marshal(doc) + if err != nil { + return fmt.Errorf("marshal document: %w", err) + } + err = indexer.Add( + ctx, + esutil.BulkIndexerItem{ + Action: "index", + Index: index, + Body: bytes.NewReader(data), + OnFailure: func( + _ context.Context, + _ esutil.BulkIndexerItem, + _ esutil.BulkIndexerResponseItem, err error, + ) { + errs = errors.Join(errs, fmt.Errorf("index document %s: %w", string(data), err)) + }, + }, + ) + if err != nil { + return errors.Join(errs, fmt.Errorf("add document to bulk indexer: %w", err)) + } + } + return errs +} diff --git a/indexer/util.go b/indexer/util.go new file mode 100644 index 000000000..4a6d6a9c7 --- /dev/null +++ b/indexer/util.go @@ -0,0 +1,119 @@ +package main + +import ( + "encoding/hex" + "fmt" + "io" + "log/slog" + "strings" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +// timeMilliZ is like time.StampMilli but with time zone suffix "Z". +const timeMilliZ = "2006-01-02T15:04:05.000Z" + +// newLogger initializes a *slog.Logger with specified level, format, and sink. +// - sink: destination for log output (e.g., os.Stdout, file) +// - logLvl: string representation of slog.Level; it defaults to slog.LevelInfo if empty +// - logFmt: format of the log output: "text", "json", defaults to "json"; it defaults to "text" if empty +// - logTags: comma-separated list of pairs that will be inserted into each log line +// +// Returns a configured *slog.Logger on success or nil on failure. +func newLogger(sink io.Writer, logLvl, logFmt, logTags string) (*slog.Logger, error) { + var level slog.Leveler = slog.LevelInfo + if logLvl != "" { + l := new(slog.LevelVar) + if err := l.UnmarshalText([]byte(logLvl)); err != nil { + return nil, fmt.Errorf("invalid log level: %w", err) + } + level = l + } + + var ( + handler slog.Handler + options = &slog.HandlerOptions{ + AddSource: true, + Level: level, + } + ) + switch logFmt { + case "text": + handler = slog.NewTextHandler(sink, options) + case "json": + handler = slog.NewJSONHandler(sink, options) + default: + return nil, fmt.Errorf("invalid log format: %s", logFmt) + } + + logger := slog.New(handler) + + if logTags == "" { + return logger, nil + } + + var args []any + for i, p := range strings.Split(logTags, ",") { + kv := strings.Split(p, ":") + if len(kv) != 2 { + return nil, fmt.Errorf("invalid tag at index %d", i) + } + args = append(args, strings.ToValidUTF8(kv[0], "�"), strings.ToValidUTF8(kv[1], "�")) + } + + return logger.With(args...), nil +} + +// parseBlockTransactions extracts transaction data from a given block. +// Returns a slice of TransactionItem and a slice of transaction hashes. 
+func parseBlockTransactions(block *types.Block) ([]*TransactionItem, []common.Hash, error) { + var ( + txs = make([]*TransactionItem, len(block.Transactions())) + txh = make([]common.Hash, len(block.Transactions())) + ) + + for i, tx := range block.Transactions() { + sender, err := types.Sender(types.NewCancunSigner(tx.ChainId()), tx) + if err != nil { + return nil, nil, fmt.Errorf("failed to derive sender: %w", err) + } + + v, r, s := tx.RawSignatureValues() + transaction := &TransactionItem{ + Hash: tx.Hash().Hex(), + From: sender.Hex(), + Gas: tx.Gas(), + Nonce: tx.Nonce(), + BlockHash: block.Hash().Hex(), + BlockNumber: block.NumberU64(), + ChainId: tx.ChainId().String(), + V: v.String(), + R: r.String(), + S: s.String(), + Input: hex.EncodeToString(tx.Data()), + Timestamp: tx.Time().UTC().Format(timeMilliZ), + } + + if tx.To() != nil { + transaction.To = tx.To().Hex() + } + if tx.GasPrice() != nil { + transaction.GasPrice = tx.GasPrice().Uint64() + } + if tx.GasTipCap() != nil { + transaction.GasTipCap = tx.GasTipCap().Uint64() + } + if tx.GasFeeCap() != nil { + transaction.GasFeeCap = tx.GasFeeCap().Uint64() + } + if tx.Value() != nil { + transaction.Value = tx.Value().String() + } + + txs[i] = transaction + txh[i] = tx.Hash() + } + + return txs, txh, nil +}
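
A minimal usage sketch of the Chain abstraction added in indexer/chain.go, assuming a node is reachable at whatever URL is passed in. The helper name exampleFetchWindow is hypothetical; NewEthereumChain, BlockNumber, Blocks, TxReceipts, and parseBlockTransactions are the identifiers introduced by this diff. Blocks treats [start, end] as inclusive, returns results in descending order when start > end, and returns nil when start equals end.

package main

import (
	"fmt"
	"math/big"
)

// exampleFetchWindow is a hypothetical helper, not part of the patch above,
// that fetches the five most recent blocks and their receipts through the
// EthereumChain implementation added in indexer/chain.go.
func exampleFetchWindow(url string) error {
	chain, err := NewEthereumChain(url)
	if err != nil {
		return fmt.Errorf("dial node: %w", err)
	}
	defer func() { _ = chain.Close() }()

	head, err := chain.BlockNumber()
	if err != nil {
		return fmt.Errorf("head block number: %w", err)
	}

	// Ascending window of five blocks ending at the current head.
	start := new(big.Int).Sub(head, big.NewInt(4))
	blocks, err := chain.Blocks(start, head)
	if err != nil {
		return fmt.Errorf("fetch blocks: %w", err)
	}

	for _, b := range blocks {
		_, hashes, err := parseBlockTransactions(b)
		if err != nil {
			return fmt.Errorf("parse block #%v: %w", b.Number(), err)
		}
		receipts, err := chain.TxReceipts(hashes)
		if err != nil {
			return fmt.Errorf("fetch receipts: %w", err)
		}
		fmt.Printf("block %d: %d txs, %d receipts\n", b.NumberU64(), len(b.Transactions()), len(receipts))
	}
	return nil
}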
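
A sketch of how the forward indexing path could be exercised in isolation with in-memory fakes, since the patch itself ships no tests. The fakeChain, fakeStore, and TestForwardIndexerSketch names are hypothetical; Chain, Store, Config, ForwardIndexer, BlockItem, TransactionItem, and SortOrder are the types introduced above. The fake header sets BaseFee explicitly because storeBlocks dereferences it when building a BlockItem.

package main

import (
	"context"
	"log/slog"
	"math/big"
	"os"
	"testing"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
)

// fakeChain is a hypothetical in-memory Chain that serves empty blocks
// numbered from start up to head.
type fakeChain struct{ head *big.Int }

func (f *fakeChain) BlockNumber() (*big.Int, error) {
	return new(big.Int).Set(f.head), nil
}

func (f *fakeChain) Blocks(start, end *big.Int) ([]*types.Block, error) {
	var blocks []*types.Block
	for n := new(big.Int).Set(start); n.Cmp(end) <= 0; n.Add(n, big.NewInt(1)) {
		header := &types.Header{
			Number:  new(big.Int).Set(n),
			BaseFee: big.NewInt(0), // storeBlocks calls BaseFee().Uint64(), so it must be non-nil
		}
		blocks = append(blocks, types.NewBlockWithHeader(header))
	}
	return blocks, nil
}

func (f *fakeChain) TxReceipts([]common.Hash) (map[string]*types.Receipt, error) {
	return map[string]*types.Receipt{}, nil
}

// fakeStore records indexed blocks in memory and ignores everything else.
type fakeStore struct{ blocks []*BlockItem }

func (f *fakeStore) CreateIndexes(context.Context) error { return nil }

func (f *fakeStore) IndexBlock(_ context.Context, b *BlockItem) error {
	f.blocks = append(f.blocks, b)
	return nil
}

func (f *fakeStore) IndexTransactions(context.Context, []*TransactionItem) error { return nil }

func (f *fakeStore) LastIndexedBlock(context.Context, SortOrder) (*big.Int, error) {
	return big.NewInt(0), nil
}

func TestForwardIndexerSketch(t *testing.T) {
	store := &fakeStore{}
	fi := &ForwardIndexer{
		Config: &Config{
			logger: slog.New(slog.NewTextHandler(os.Stdout, nil)),
			chain:  &fakeChain{head: big.NewInt(3)},
			store:  store,
		},
		lastIndexedBlock: big.NewInt(0),
	}

	// Drive one fetch/store cycle directly instead of Run, which loops until
	// the context is cancelled.
	blocks, err := fi.fetchBlocks(context.Background())
	if err != nil {
		t.Fatal(err)
	}
	if err := fi.storeBlocks(context.Background(), blocks); err != nil {
		t.Fatal(err)
	}
	if len(store.blocks) != 3 { // blocks 1 through 3
		t.Fatalf("expected 3 indexed blocks, got %d", len(store.blocks))
	}
	if fi.lastIndexedBlock.Cmp(big.NewInt(3)) != 0 {
		t.Fatalf("expected last indexed block 3, got %v", fi.lastIndexedBlock)
	}
}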