From 673c25575cd820011c20e7fd444ad0b33f061eb6 Mon Sep 17 00:00:00 2001 From: 84hero <84hero@users.noreply.github.com> Date: Fri, 19 Dec 2025 09:08:54 +0800 Subject: [PATCH 1/2] chore: formatting and documentation improvements to raise goreportcard score --- cmd/example/main.go | 28 ++++++++++++++-------------- examples/basic/main.go | 28 ++++++++++++++-------------- examples/custom-chain/main.go | 14 +++++++------- examples/custom-sink/main.go | 2 +- examples/multi-sink/main.go | 14 +++++++------- internal/webhook/client.go | 9 +++++---- internal/webhook/client_test.go | 4 ++-- pkg/chain/presets.go | 20 ++++++++++---------- pkg/config/config.go | 22 +++++++++++++--------- pkg/config/config_test.go | 6 +++--- pkg/decoder/decoder.go | 6 +++--- pkg/decoder/decoder_test.go | 6 +++--- pkg/rpc/client.go | 22 +++++++++++++--------- pkg/rpc/interface.go | 14 +++++++------- pkg/rpc/node.go | 11 ++++++++--- pkg/scanner/filter.go | 2 +- pkg/scanner/filter_test.go | 4 ++-- pkg/scanner/scanner.go | 4 ++++ pkg/scanner/scanner_test.go | 16 ++++++++-------- pkg/sink/sink.go | 30 +++++++++++++++++++++++++----- pkg/storage/memory.go | 6 +++++- pkg/storage/postgres.go | 7 +++++-- pkg/storage/redis.go | 10 +++++++--- 23 files changed, 167 insertions(+), 118 deletions(-) diff --git a/cmd/example/main.go b/cmd/example/main.go index 516b264..831c11d 100644 --- a/cmd/example/main.go +++ b/cmd/example/main.go @@ -72,20 +72,20 @@ func main() { // 4. Build filter usdtAddress := common.HexToAddress("0xdAC17F958D2ee523a2206206994597C13D831ec7") transferTopic := crypto.Keccak256Hash([]byte("Transfer(address,address,uint256)")) - + filter := scanner.NewFilter(). AddContract(usdtAddress). SetTopic(0, transferTopic) // 5. Configure Storage [Feature 2: Multiple Storage Engine Support] var store storage.Persistence - + // Storage Prefix (Namespace): Prioritize storage_prefix from config, otherwise use Project name storePrefix := cfg.Scanner.StoragePrefix if storePrefix == "" { storePrefix = cfg.Project + "_" } - + if dbURL := os.Getenv("PG_URL"); dbURL != "" { // PostgreSQL pgStore, err := storage.NewPostgresStore(dbURL, storePrefix) @@ -113,15 +113,15 @@ func main() { // 6. Initialize Scanner scanCfg := scanner.Config{ - ChainID: cfg.Scanner.ChainID, - StartBlock: cfg.Scanner.StartBlock, - ForceStart: cfg.Scanner.ForceStart, - Rewind: cfg.Scanner.Rewind, - CursorRewind: cfg.Scanner.CursorRewind, - BatchSize: cfg.Scanner.BatchSize, - Interval: cfg.Scanner.Interval, - ReorgSafe: cfg.Scanner.Confirmations, // Using merged preset values - UseBloom: cfg.Scanner.UseBloom, + ChainID: cfg.Scanner.ChainID, + StartBlock: cfg.Scanner.StartBlock, + ForceStart: cfg.Scanner.ForceStart, + Rewind: cfg.Scanner.Rewind, + CursorRewind: cfg.Scanner.CursorRewind, + BatchSize: cfg.Scanner.BatchSize, + Interval: cfg.Scanner.Interval, + ReorgSafe: cfg.Scanner.Confirmations, // Using merged preset values + UseBloom: cfg.Scanner.UseBloom, } s := scanner.New(client, store, scanCfg, filter) @@ -137,7 +137,7 @@ func main() { } // Print human-readable data - fmt.Printf(" [Event] %s | Block: %d | From: %v | To: %v | Value: %v\n", + fmt.Printf(" [Event] %s | Block: %d | From: %v | To: %v | Value: %v\n", decoded.Name, l.BlockNumber, decoded.Inputs["from"], @@ -162,4 +162,4 @@ func main() { log.Info("Shutting down...") cancel() time.Sleep(1 * time.Second) -} \ No newline at end of file +} diff --git a/examples/basic/main.go b/examples/basic/main.go index 516b264..831c11d 100644 --- a/examples/basic/main.go +++ b/examples/basic/main.go @@ -72,20 +72,20 @@ func main() { // 4. Build filter usdtAddress := common.HexToAddress("0xdAC17F958D2ee523a2206206994597C13D831ec7") transferTopic := crypto.Keccak256Hash([]byte("Transfer(address,address,uint256)")) - + filter := scanner.NewFilter(). AddContract(usdtAddress). SetTopic(0, transferTopic) // 5. Configure Storage [Feature 2: Multiple Storage Engine Support] var store storage.Persistence - + // Storage Prefix (Namespace): Prioritize storage_prefix from config, otherwise use Project name storePrefix := cfg.Scanner.StoragePrefix if storePrefix == "" { storePrefix = cfg.Project + "_" } - + if dbURL := os.Getenv("PG_URL"); dbURL != "" { // PostgreSQL pgStore, err := storage.NewPostgresStore(dbURL, storePrefix) @@ -113,15 +113,15 @@ func main() { // 6. Initialize Scanner scanCfg := scanner.Config{ - ChainID: cfg.Scanner.ChainID, - StartBlock: cfg.Scanner.StartBlock, - ForceStart: cfg.Scanner.ForceStart, - Rewind: cfg.Scanner.Rewind, - CursorRewind: cfg.Scanner.CursorRewind, - BatchSize: cfg.Scanner.BatchSize, - Interval: cfg.Scanner.Interval, - ReorgSafe: cfg.Scanner.Confirmations, // Using merged preset values - UseBloom: cfg.Scanner.UseBloom, + ChainID: cfg.Scanner.ChainID, + StartBlock: cfg.Scanner.StartBlock, + ForceStart: cfg.Scanner.ForceStart, + Rewind: cfg.Scanner.Rewind, + CursorRewind: cfg.Scanner.CursorRewind, + BatchSize: cfg.Scanner.BatchSize, + Interval: cfg.Scanner.Interval, + ReorgSafe: cfg.Scanner.Confirmations, // Using merged preset values + UseBloom: cfg.Scanner.UseBloom, } s := scanner.New(client, store, scanCfg, filter) @@ -137,7 +137,7 @@ func main() { } // Print human-readable data - fmt.Printf(" [Event] %s | Block: %d | From: %v | To: %v | Value: %v\n", + fmt.Printf(" [Event] %s | Block: %d | From: %v | To: %v | Value: %v\n", decoded.Name, l.BlockNumber, decoded.Inputs["from"], @@ -162,4 +162,4 @@ func main() { log.Info("Shutting down...") cancel() time.Sleep(1 * time.Second) -} \ No newline at end of file +} diff --git a/examples/custom-chain/main.go b/examples/custom-chain/main.go index b9bada3..50d363f 100644 --- a/examples/custom-chain/main.go +++ b/examples/custom-chain/main.go @@ -15,7 +15,7 @@ import ( func main() { // [Scenario] We are launching a new AppChain or L2 called "HeroChain" // HeroChain has a fast 1-second block time and needs 50 confirmations for safety. - + // 1. Register the New Chain Preset chain.Register("herochain", chain.Preset{ ChainID: "888", @@ -35,11 +35,11 @@ func main() { preset, _ := chain.Get("herochain") config := scanner.Config{ - ChainID: "herochain", - BatchSize: preset.BatchSize, // Use value from preset - ReorgSafe: preset.ReorgSafe, // Use value from preset - Interval: preset.BlockTime, // Sync interval matches block time - UseBloom: true, // Enable bloom filter for performance + ChainID: "herochain", + BatchSize: preset.BatchSize, // Use value from preset + ReorgSafe: preset.ReorgSafe, // Use value from preset + Interval: preset.BlockTime, // Sync interval matches block time + UseBloom: true, // Enable bloom filter for performance } filter := scanner.NewFilter() // Scan all logs for demonstration @@ -52,6 +52,6 @@ func main() { }) fmt.Printf("Scanner configured for %s (Safety Window: %d blocks)\n", config.ChainID, config.ReorgSafe) - + // s.Start(ctx) // Execution omitted for demo purposes } diff --git a/examples/custom-sink/main.go b/examples/custom-sink/main.go index 389fd6f..02079b9 100644 --- a/examples/custom-sink/main.go +++ b/examples/custom-sink/main.go @@ -54,7 +54,7 @@ func main() { for i, l := range logs { decoded[i] = sink.DecodedLog{Log: l} } - + // Use our custom sink return mySlackSink.Send(ctx, decoded) }) diff --git a/examples/multi-sink/main.go b/examples/multi-sink/main.go index 486ab04..0427208 100644 --- a/examples/multi-sink/main.go +++ b/examples/multi-sink/main.go @@ -54,10 +54,10 @@ func main() { // 4. Setup Multiple Sinks (Pipeline) var outputs []sink.Output - + // Console Sink outputs = append(outputs, sink.NewConsoleOutput()) - + // File Sink if fo, err := sink.NewFileOutput("events.jsonl"); err == nil { outputs = append(outputs, fo) @@ -66,11 +66,11 @@ func main() { // 5. Initialize Scanner scanCfg := scanner.Config{ - ChainID: "ethereum", - Rewind: 10, // Start from 10 blocks ago - Interval: 5 * time.Second, - ReorgSafe: 2, - BatchSize: 10, + ChainID: "ethereum", + Rewind: 10, // Start from 10 blocks ago + Interval: 5 * time.Second, + ReorgSafe: 2, + BatchSize: 10, } s := scanner.New(client, store, scanCfg, filter) diff --git a/internal/webhook/client.go b/internal/webhook/client.go index 8d6b092..e4b1a7b 100644 --- a/internal/webhook/client.go +++ b/internal/webhook/client.go @@ -14,6 +14,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) +// Config holds configuration for the Webhook client. type Config struct { URL string `mapstructure:"url"` Secret string `mapstructure:"secret"` @@ -50,7 +51,7 @@ func NewClient(cfg Config) *Client { } } -// Payload defines the data structure sent via webhook +// Payload defines the data structure sent via webhook to consumers. type Payload struct { Timestamp int64 `json:"timestamp"` Logs []types.Log `json:"logs"` @@ -92,7 +93,7 @@ func (c *Client) Send(ctx context.Context, logs []types.Log) error { return ctx.Err() case <-timer.C: } - + // Exponential backoff backoff *= 2 if backoff > c.cfg.MaxBackoff { @@ -104,7 +105,7 @@ func (c *Client) Send(ctx context.Context, logs []types.Log) error { if err == nil { return nil // Success } - + lastErr = err // For 4xx client errors (e.g., 400 Bad Request), retries usually don't help. // Simplified logic: retry for network errors and 5xx. @@ -143,4 +144,4 @@ func (c *Client) attemptSend(ctx context.Context, body []byte) error { } return nil -} \ No newline at end of file +} diff --git a/internal/webhook/client_test.go b/internal/webhook/client_test.go index 08f5a94..0c24c67 100644 --- a/internal/webhook/client_test.go +++ b/internal/webhook/client_test.go @@ -19,7 +19,7 @@ import ( func TestWebhookSend(t *testing.T) { secret := "my-secret" - + // 1. Create Mock server ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Validate Headers @@ -76,7 +76,7 @@ func TestWebhook_Retry(t *testing.T) { InitialBackoff: 1 * time.Millisecond, MaxBackoff: 5 * time.Millisecond, }) - + logs := []types.Log{{Index: 1}} err := client.Send(context.Background(), logs) assert.NoError(t, err) diff --git a/pkg/chain/presets.go b/pkg/chain/presets.go index fef8d60..b5a6075 100644 --- a/pkg/chain/presets.go +++ b/pkg/chain/presets.go @@ -7,11 +7,11 @@ import ( // Preset defines the default behavior parameters for a chain type Preset struct { - ChainID string - BlockTime time.Duration // Average block time (affects polling interval) - ReorgSafe uint64 // Recommended safety confirmations - BatchSize uint64 // Recommended scan batch size - Endpoint string // (Optional) Default public RPC + ChainID string + BlockTime time.Duration // Average block time (affects polling interval) + ReorgSafe uint64 // Recommended safety confirmations + BatchSize uint64 // Recommended scan batch size + Endpoint string // (Optional) Default public RPC } var ( @@ -19,14 +19,14 @@ var ( mu sync.RWMutex ) -// Register registers a new chain preset. Users can call this in init() to add custom/private chains. +// Register adds a new chain preset to the global registry. func Register(name string, p Preset) { mu.Lock() defer mu.Unlock() registry[name] = p } -// Get retrieves a preset configuration +// Get retrieves a preset configuration from the registry by its name. func Get(name string) (Preset, bool) { mu.RLock() defer mu.RUnlock() @@ -42,18 +42,18 @@ func init() { ReorgSafe: 12, BatchSize: 100, }) - + Register("bsc-mainnet", Preset{ ChainID: "56", BlockTime: 3 * time.Second, ReorgSafe: 15, // BSC reorgs are relatively frequent BatchSize: 200, }) - + Register("polygon-mainnet", Preset{ ChainID: "137", BlockTime: 2 * time.Second, ReorgSafe: 32, // Polygon recommends deeper confirmations BatchSize: 200, }) -} \ No newline at end of file +} diff --git a/pkg/config/config.go b/pkg/config/config.go index 60b241a..eb75530 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -8,38 +8,42 @@ import ( "github.com/spf13/viper" ) +// Config represents the global configuration for the scanner application. type Config struct { - Project string `mapstructure:"project"` - Log LogConfig `mapstructure:"log"` - Scanner ScannerConfig `mapstructure:"scanner"` + Project string `mapstructure:"project"` + Log LogConfig `mapstructure:"log"` + Scanner ScannerConfig `mapstructure:"scanner"` RPC []rpc.NodeConfig `mapstructure:"rpc_nodes"` } +// LogConfig holds configuration for application logging. type LogConfig struct { Level string `mapstructure:"level"` // debug, info, warn, error Format string `mapstructure:"format"` // text, json } +// ScannerConfig holds specific settings for the EVM scanning process. type ScannerConfig struct { ChainID string `mapstructure:"chain_id"` BatchSize uint64 `mapstructure:"batch_size"` Interval time.Duration `mapstructure:"interval"` - + // Confirmations (ReorgSafeDepth): Protection at the scanning endpoint Confirmations uint64 `mapstructure:"confirmations"` // Startup strategy - StartBlock uint64 `mapstructure:"start_block"` // If > 0 and ForceStart=true, forces start from here - ForceStart bool `mapstructure:"force_start"` // Whether to force override persistence records - Rewind uint64 `mapstructure:"start_rewind"` // If no saved cursor, start from Latest - Rewind + StartBlock uint64 `mapstructure:"start_block"` // If > 0 and ForceStart=true, forces start from here + ForceStart bool `mapstructure:"force_start"` // Whether to force override persistence records + Rewind uint64 `mapstructure:"start_rewind"` // If no saved cursor, start from Latest - Rewind CursorRewind uint64 `mapstructure:"cursor_rewind"` // If saved cursor exists, start from Cursor - CursorRewind (safety buffer) UseBloom bool `mapstructure:"use_bloom"` - + // StoragePrefix: Prefix for storage layer (e.g., PG table prefix or Redis Key prefix) StoragePrefix string `mapstructure:"storage_prefix"` } +// Load reads and parses configuration from a YAML file and environment variables. func Load(path string) (*Config, error) { v := viper.New() v.SetConfigFile(path) @@ -65,4 +69,4 @@ func Load(path string) (*Config, error) { } return &cfg, nil -} \ No newline at end of file +} diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 346ae21..10cf9c7 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -44,7 +44,7 @@ rpc_nodes: assert.NoError(t, err) tmpFile2.Close() defer os.Remove(tmpFile2.Name()) - + _, err = Load(tmpFile2.Name()) assert.Error(t, err) } @@ -65,7 +65,7 @@ scanner: cfg, err := Load(tmpFile.Name()) assert.NoError(t, err) - + // Verify default values (BatchSize=100, Interval=3s) assert.Equal(t, uint64(100), cfg.Scanner.BatchSize) assert.Equal(t, 3*time.Second, cfg.Scanner.Interval) @@ -100,4 +100,4 @@ scanner: // Verify environment variable overrides assert.Equal(t, "env-project", cfg.Project) assert.Equal(t, uint64(999), cfg.Scanner.BatchSize) -} \ No newline at end of file +} diff --git a/pkg/decoder/decoder.go b/pkg/decoder/decoder.go index 1861aa0..ebfd4df 100644 --- a/pkg/decoder/decoder.go +++ b/pkg/decoder/decoder.go @@ -8,7 +8,7 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) -// ABIWrapper wraps the decoding logic +// ABIWrapper wraps the decoding logic using go-ethereum's ABI parser. type ABIWrapper struct { parsedABI abi.ABI } @@ -22,7 +22,7 @@ func NewFromJSON(jsonStr string) (*ABIWrapper, error) { return &ABIWrapper{parsedABI: parsed}, nil } -// DecodedLog contains parsed human-readable data +// DecodedLog contains parsed human-readable data from a transaction log. type DecodedLog struct { Name string // Event name (e.g., Transfer) Inputs map[string]interface{} // Parameter key-value pairs (e.g., from: 0x..., value: 100) @@ -72,4 +72,4 @@ func (w *ABIWrapper) Decode(log types.Log) (*DecodedLog, error) { } return result, nil -} \ No newline at end of file +} diff --git a/pkg/decoder/decoder_test.go b/pkg/decoder/decoder_test.go index 7d2d0e0..cbb85fb 100644 --- a/pkg/decoder/decoder_test.go +++ b/pkg/decoder/decoder_test.go @@ -15,7 +15,7 @@ import ( func TestDecode(t *testing.T) { // 1. Define standard ERC20 ABI const abiJSON = `[{"anonymous":false,"inputs":[{"indexed":true,"name":"from","type":"address"},{"indexed":true,"name":"to","type":"address"},{"indexed":false,"name":"value","type":"uint256"}],"name":"Transfer","type":"event"}]` - + // 2. Initialize Decoder d, err := NewFromJSON(abiJSON) assert.NoError(t, err) @@ -29,7 +29,7 @@ func TestDecode(t *testing.T) { // 4. Construct Log // Indexed parameters go into Topics (Note: Address needs to be padded to 32 bytes) // Non-indexed parameters (value) need ABI encoding into Data - + parsedABI, _ := abi.JSON(strings.NewReader(abiJSON)) // Pack only packs non-indexed arguments // Transfer(from, to, value) -> only value is non-indexed in inputs @@ -49,7 +49,7 @@ func TestDecode(t *testing.T) { decoded, err := d.Decode(log) assert.NoError(t, err) assert.Equal(t, "Transfer", decoded.Name) - + // Verify fields assert.Equal(t, sender, decoded.Inputs["from"]) assert.Equal(t, receiver, decoded.Inputs["to"]) diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go index 163c81c..280e683 100644 --- a/pkg/rpc/client.go +++ b/pkg/rpc/client.go @@ -16,9 +16,8 @@ import ( "golang.org/x/time/rate" ) -var ( - ErrNoAvailableNodes = errors.New("no available rpc nodes") -) +// ErrNoAvailableNodes is returned when no RPC nodes are currently healthy or reachable. +var ErrNoAvailableNodes = errors.New("no available rpc nodes") // MultiClient manages multiple RPC nodes, providing load balancing and failover type MultiClient struct { @@ -115,7 +114,7 @@ func (mc *MultiClient) pickBestNode() *Node { defer mc.mu.RUnlock() globalH := atomic.LoadUint64(&mc.globalHeight) - + // Create a copy of candidates for sorting to avoid lock contention candidates := make([]*Node, len(mc.nodes)) copy(candidates, mc.nodes) @@ -169,13 +168,13 @@ func (mc *MultiClient) execute(ctx context.Context, op func(*Node) error) error if err == nil { return nil } - + lastErr = err // If context is canceled, don't retry if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return err } - + // If failed, the node score will automatically decrease via RecordMetric // pickBestNode might select a different node in next attempt } @@ -183,8 +182,7 @@ func (mc *MultiClient) execute(ctx context.Context, op func(*Node) error) error return lastErr } -// Implementation of Client interface - +// ChainID retrieves the chain ID from the best available node func (mc *MultiClient) ChainID(ctx context.Context) (*big.Int, error) { var res *big.Int err := mc.execute(ctx, func(n *Node) error { @@ -195,6 +193,7 @@ func (mc *MultiClient) ChainID(ctx context.Context) (*big.Int, error) { return res, err } +// BlockNumber retrieves the latest block height across all nodes (cached if possible) func (mc *MultiClient) BlockNumber(ctx context.Context) (uint64, error) { // Prefer cached global highest height h := atomic.LoadUint64(&mc.globalHeight) @@ -211,6 +210,7 @@ func (mc *MultiClient) BlockNumber(ctx context.Context) (uint64, error) { return res, err } +// HeaderByNumber retrieves a block header from the best available node func (mc *MultiClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { var res *types.Header err := mc.execute(ctx, func(n *Node) error { @@ -221,6 +221,7 @@ func (mc *MultiClient) HeaderByNumber(ctx context.Context, number *big.Int) (*ty return res, err } +// BlockByNumber retrieves a full block from the best available node func (mc *MultiClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { var res *types.Block err := mc.execute(ctx, func(n *Node) error { @@ -231,6 +232,7 @@ func (mc *MultiClient) BlockByNumber(ctx context.Context, number *big.Int) (*typ return res, err } +// FilterLogs retrieves logs from the best available node based on the query func (mc *MultiClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) { var res []types.Log err := mc.execute(ctx, func(n *Node) error { @@ -241,6 +243,7 @@ func (mc *MultiClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ( return res, err } +// CodeAt retrieves the contract code at a given address from the best available node func (mc *MultiClient) CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error) { var res []byte err := mc.execute(ctx, func(n *Node) error { @@ -251,8 +254,9 @@ func (mc *MultiClient) CodeAt(ctx context.Context, account common.Address, block return res, err } +// Close closes all underlying RPC connections func (mc *MultiClient) Close() { for _, n := range mc.nodes { n.Close() } -} \ No newline at end of file +} diff --git a/pkg/rpc/interface.go b/pkg/rpc/interface.go index a5205b3..dbf9415 100644 --- a/pkg/rpc/interface.go +++ b/pkg/rpc/interface.go @@ -25,22 +25,22 @@ type EthClient interface { type Client interface { // ChainID retrieves the chain ID ChainID(ctx context.Context) (*big.Int, error) - + // BlockNumber retrieves the latest block height BlockNumber(ctx context.Context) (uint64, error) - + // HeaderByNumber retrieves a block header (used for fast Bloom Filter checks) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) - + // BlockByNumber retrieves a full block (used for native transfer scanning) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) - + // FilterLogs retrieves logs (used for ERC20 scanning) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) - + // CodeAt checks contract code (used for safety validation) CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error) - + // Close closes the connection Close() -} \ No newline at end of file +} diff --git a/pkg/rpc/node.go b/pkg/rpc/node.go index 7baf566..569032b 100644 --- a/pkg/rpc/node.go +++ b/pkg/rpc/node.go @@ -18,7 +18,7 @@ type NodeConfig struct { Priority int // Initial weight (1-100), higher is more preferred } -// Node wraps the underlying ethclient and provides health monitoring +// Node wraps the underlying ethclient and provides health monitoring and metric tracking. type Node struct { config NodeConfig client EthClient // Interface for underlying ethclient @@ -149,8 +149,7 @@ func (n *Node) GetLatestBlock() uint64 { return n.latestBlock } -// Proxy Methods (implement Client interface) - +// BlockNumber retrieves the latest block height from the node func (n *Node) BlockNumber(ctx context.Context) (uint64, error) { start := time.Now() h, err := n.client.BlockNumber(ctx) @@ -161,6 +160,7 @@ func (n *Node) BlockNumber(ctx context.Context) (uint64, error) { return h, err } +// ChainID retrieves the chain ID from the node func (n *Node) ChainID(ctx context.Context) (*big.Int, error) { start := time.Now() id, err := n.client.ChainID(ctx) @@ -168,6 +168,7 @@ func (n *Node) ChainID(ctx context.Context) (*big.Int, error) { return id, err } +// HeaderByNumber retrieves a block header from the node func (n *Node) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { start := time.Now() h, err := n.client.HeaderByNumber(ctx, number) @@ -175,6 +176,7 @@ func (n *Node) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Head return h, err } +// BlockByNumber retrieves a full block from the node func (n *Node) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { start := time.Now() b, err := n.client.BlockByNumber(ctx, number) @@ -182,6 +184,7 @@ func (n *Node) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block return b, err } +// FilterLogs retrieves logs from the node based on the query func (n *Node) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) { start := time.Now() logs, err := n.client.FilterLogs(ctx, q) @@ -189,6 +192,7 @@ func (n *Node) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types. return logs, err } +// CodeAt retrieves the contract code at a given address func (n *Node) CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error) { start := time.Now() code, err := n.client.CodeAt(ctx, account, blockNumber) @@ -196,6 +200,7 @@ func (n *Node) CodeAt(ctx context.Context, account common.Address, blockNumber * return code, err } +// Close closes the underlying RPC connection func (n *Node) Close() { n.client.Close() } diff --git a/pkg/scanner/filter.go b/pkg/scanner/filter.go index 5675c2a..c4dd603 100644 --- a/pkg/scanner/filter.go +++ b/pkg/scanner/filter.go @@ -114,4 +114,4 @@ func (f *Filter) MatchesBloom(bloom types.Bloom) bool { } return true -} \ No newline at end of file +} diff --git a/pkg/scanner/filter_test.go b/pkg/scanner/filter_test.go index 6fbc2d8..252486b 100644 --- a/pkg/scanner/filter_test.go +++ b/pkg/scanner/filter_test.go @@ -10,7 +10,7 @@ import ( func TestFilter_Builder(t *testing.T) { f := NewFilter() - + addr1 := common.HexToAddress("0x1111") addr2 := common.HexToAddress("0x2222") @@ -57,7 +57,7 @@ func TestFilter_MatchesBloom(t *testing.T) { // 1. Setup Bloom that contains addr1 and topic1 addr1 := common.HexToAddress("0x1111111111111111111111111111111111111111") topic1 := common.HexToHash("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") - + bloom := types.Bloom{} bloom.Add(addr1.Bytes()) bloom.Add(topic1.Bytes()) diff --git a/pkg/scanner/scanner.go b/pkg/scanner/scanner.go index a317d84..d9ff1e8 100644 --- a/pkg/scanner/scanner.go +++ b/pkg/scanner/scanner.go @@ -11,6 +11,7 @@ import ( "github.com/ethereum/go-ethereum/log" ) +// Config holds configuration parameters for the Scanner. type Config struct { ChainID string // Startup strategy @@ -25,8 +26,10 @@ type Config struct { UseBloom bool } +// Handler is a callback function type for processing scanned logs. type Handler func(ctx context.Context, logs []types.Log) error +// Scanner orchestrates the scanning of blocks and processing of logs. type Scanner struct { client rpc.Client store storage.Persistence @@ -35,6 +38,7 @@ type Scanner struct { handler Handler } +// New creates and initializes a new Scanner instance. func New(client rpc.Client, store storage.Persistence, cfg Config, filter *Filter) *Scanner { if cfg.BatchSize == 0 { cfg.BatchSize = 100 diff --git a/pkg/scanner/scanner_test.go b/pkg/scanner/scanner_test.go index a3c7d21..3722966 100644 --- a/pkg/scanner/scanner_test.go +++ b/pkg/scanner/scanner_test.go @@ -89,7 +89,7 @@ func (m *MockRPC) Close() { func TestDetermineStartBlock(t *testing.T) { store := new(MockStore) client := new(MockRPC) - + // Case 1: Force Start s := New(client, store, Config{ForceStart: true, StartBlock: 100}, nil) start, err := s.DetermineStartBlockForTest(context.Background()) @@ -116,7 +116,7 @@ func TestDetermineStartBlock_Rewind(t *testing.T) { // Case 4: No cursor, Rewind from Head // Config: Rewind = 100 s := New(client, store, Config{ChainID: "eth", Rewind: 100}, nil) - + // Mock: LoadCursor -> 0 (Not found) store.On("LoadCursor", "eth").Return(uint64(0), nil).Once() // Mock: BlockNumber -> 1000 @@ -134,8 +134,8 @@ func TestScanner_Start_Errors(t *testing.T) { client := new(MockRPC) s := New(client, store, Config{ - ChainID: "eth", - Interval: 1 * time.Millisecond, + ChainID: "eth", + Interval: 1 * time.Millisecond, ReorgSafe: 0, BatchSize: 1, }, NewFilter()) @@ -220,7 +220,7 @@ func TestScanRange_Hit(t *testing.T) { filter := NewFilter() // Empty filter matches everything s := New(client, store, Config{BatchSize: 10}, filter) - + // Mock FilterLogs return logs := []types.Log{{BlockNumber: 100}} client.On("FilterLogs", mock.Anything, mock.MatchedBy(func(q ethereum.FilterQuery) bool { @@ -253,7 +253,7 @@ func TestScanner_Start(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) store := new(MockStore) client := new(MockRPC) - + // Mock process: // 1. determineStartBlock -> Start from 100 store.On("LoadCursor", "eth").Return(uint64(100), nil) @@ -269,8 +269,8 @@ func TestScanner_Start(t *testing.T) { store.On("SaveCursor", "eth", mock.Anything).Return(nil) s := New(client, store, Config{ - ChainID: "eth", - Interval: 10 * time.Millisecond, + ChainID: "eth", + Interval: 10 * time.Millisecond, ReorgSafe: 3, BatchSize: 10, }, NewFilter()) diff --git a/pkg/sink/sink.go b/pkg/sink/sink.go index 86d6d58..167f40c 100644 --- a/pkg/sink/sink.go +++ b/pkg/sink/sink.go @@ -19,7 +19,7 @@ import ( "github.com/redis/go-redis/v9" ) -// DecodedLog wraps raw log and its decoded result +// DecodedLog wraps raw log and its decoded result for structured output. type DecodedLog struct { Log types.Log `json:"log"` DecodedData *decoder.DecodedLog `json:"decoded,omitempty"` @@ -35,6 +35,7 @@ type Output interface { // --- 1. Webhook Output --- +// WebhookOutput implements the Output interface for sending events to a web service. type WebhookOutput struct { client *webhook.Client async bool @@ -44,6 +45,7 @@ type WebhookOutput struct { closedMu sync.Mutex } +// WebhookConfig holds the configuration for WebhookOutput. type WebhookConfig struct { URL string Secret string @@ -55,6 +57,7 @@ type WebhookConfig struct { Workers int } +// NewWebhookOutput initializes a new Webhook output sink. func NewWebhookOutput(url, secret string, maxAttempts int, initialBackoff, maxBackoff string, async bool, bufferSize, workers int) *WebhookOutput { // Duration conversion logic is handled at the application layer. // We receive basic types or a config here. @@ -136,12 +139,14 @@ func (w *WebhookOutput) Close() error { // --- 2. File Output --- +// FileOutput implements the Output interface for writing events to a file. type FileOutput struct { path string mu sync.Mutex file *os.File } +// NewFileOutput initializes a new file-based output sink. func NewFileOutput(path string) (*FileOutput, error) { f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) if err != nil { @@ -173,6 +178,7 @@ func (f *FileOutput) Close() error { // --- 3. Console Output --- +// ConsoleOutput implements the Output interface for printing events to stdout. type ConsoleOutput struct { mu sync.Mutex } @@ -199,11 +205,13 @@ func (c *ConsoleOutput) Close() error { return nil } // --- 4. PostgreSQL Output --- +// PostgresOutput implements the Output interface for saving events to PostgreSQL. type PostgresOutput struct { db *sql.DB table string } +// NewPostgresOutput initializes a new PostgreSQL output sink. func NewPostgresOutput(url, table string) (*PostgresOutput, error) { if match, _ := regexp.MatchString("^[a-zA-Z0-9_]+$", table); !match { return nil, fmt.Errorf("invalid table name: %s", table) @@ -268,12 +276,14 @@ func (p *PostgresOutput) Close() error { return p.db.Close() } // --- 5. Redis Output --- +// RedisOutput implements the Output interface for sending events to Redis. type RedisOutput struct { client *redis.Client key string mode string } +// NewRedisOutput initializes a new Redis output sink. func NewRedisOutput(addr, password string, db int, key, mode string) (*RedisOutput, error) { rdb := redis.NewClient(&redis.Options{Addr: addr, Password: password, DB: db}) if err := rdb.Ping(context.Background()).Err(); err != nil { @@ -302,11 +312,13 @@ func (r *RedisOutput) Close() error { return r.client.Close() } // --- 6. Kafka Output --- +// KafkaOutput implements the Output interface for sending events to Kafka. type KafkaOutput struct { producer sarama.SyncProducer topic string } +// NewKafkaOutput initializes a new Kafka output sink. func NewKafkaOutput(brokers []string, topic, user, password string) (*KafkaOutput, error) { config := sarama.NewConfig() config.Producer.Return.Successes = true @@ -341,6 +353,7 @@ func (k *KafkaOutput) Close() error { return k.producer.Close() } // --- 7. RabbitMQ Output --- +// RabbitMQOutput implements the Output interface for sending events to RabbitMQ. type RabbitMQOutput struct { conn *amqp.Connection ch *amqp.Channel @@ -348,6 +361,7 @@ type RabbitMQOutput struct { routingKey string } +// NewRabbitMQOutput initializes a new RabbitMQ output sink. func NewRabbitMQOutput(url, exchange, routingKey, queueName string, durable bool) (*RabbitMQOutput, error) { conn, err := amqp.Dial(url) if err != nil { @@ -361,16 +375,22 @@ func NewRabbitMQOutput(url, exchange, routingKey, queueName string, durable bool if exchange != "" { err = ch.ExchangeDeclare(exchange, "topic", durable, false, false, false, nil) if err != nil { - ch.Close(); conn.Close(); return nil, err + ch.Close() + conn.Close() + return nil, err } } if queueName != "" { q, err := ch.QueueDeclare(queueName, durable, false, false, false, nil) if err != nil { - ch.Close(); conn.Close(); return nil, err + ch.Close() + conn.Close() + return nil, err } if err := ch.QueueBind(q.Name, routingKey, exchange, false, nil); err != nil { - ch.Close(); conn.Close(); return nil, err + ch.Close() + conn.Close() + return nil, err } } return &RabbitMQOutput{conn: conn, ch: ch, exchange: exchange, routingKey: routingKey}, nil @@ -396,4 +416,4 @@ func (r *RabbitMQOutput) Send(ctx context.Context, logs []DecodedLog) error { func (r *RabbitMQOutput) Close() error { r.ch.Close() return r.conn.Close() -} \ No newline at end of file +} diff --git a/pkg/storage/memory.go b/pkg/storage/memory.go index 09d4d3f..a8d9db1 100644 --- a/pkg/storage/memory.go +++ b/pkg/storage/memory.go @@ -24,6 +24,7 @@ type MemoryStore struct { mu sync.RWMutex } +// NewMemoryStore initializes a new in-memory storage. func NewMemoryStore(prefix string) *MemoryStore { return &MemoryStore{ data: make(map[string]uint64), @@ -31,12 +32,14 @@ func NewMemoryStore(prefix string) *MemoryStore { } } +// LoadCursor retrieves the last scanned block height from memory. func (m *MemoryStore) LoadCursor(key string) (uint64, error) { m.mu.RLock() defer m.mu.RUnlock() return m.data[m.prefix+key], nil } +// SaveCursor updates the last scanned block height in memory. func (m *MemoryStore) SaveCursor(key string, height uint64) error { m.mu.Lock() defer m.mu.Unlock() @@ -44,6 +47,7 @@ func (m *MemoryStore) SaveCursor(key string, height uint64) error { return nil } +// Close implements the Persistence interface. func (m *MemoryStore) Close() error { return nil -} \ No newline at end of file +} diff --git a/pkg/storage/postgres.go b/pkg/storage/postgres.go index a203485..0871155 100644 --- a/pkg/storage/postgres.go +++ b/pkg/storage/postgres.go @@ -35,7 +35,7 @@ func NewPostgresStore(connStr string, tablePrefix string) (*PostgresStore, error db: db, tableName: tableName, } - + if err := store.initTable(); err != nil { return nil, err } @@ -56,6 +56,7 @@ func (p *PostgresStore) initTable() error { return err } +// LoadCursor retrieves the last scanned block height for a given task key func (p *PostgresStore) LoadCursor(key string) (uint64, error) { var height uint64 query := fmt.Sprintf("SELECT block_height FROM %s WHERE task_key = $1", p.tableName) @@ -69,6 +70,7 @@ func (p *PostgresStore) LoadCursor(key string) (uint64, error) { return height, nil } +// SaveCursor updates or inserts the last scanned block height for a given task key func (p *PostgresStore) SaveCursor(key string, height uint64) error { // Upsert using Postgres ON CONFLICT syntax query := fmt.Sprintf(` @@ -81,6 +83,7 @@ func (p *PostgresStore) SaveCursor(key string, height uint64) error { return err } +// Close closes the database connection func (p *PostgresStore) Close() error { return p.db.Close() -} \ No newline at end of file +} diff --git a/pkg/storage/redis.go b/pkg/storage/redis.go index 088d0d8..6da2bc5 100644 --- a/pkg/storage/redis.go +++ b/pkg/storage/redis.go @@ -7,6 +7,7 @@ import ( "github.com/redis/go-redis/v9" ) +// RedisStore implements the Persistence interface using Redis as a backend. type RedisStore struct { client *redis.Client prefix string @@ -39,12 +40,13 @@ func NewRedisStore(addr, password string, db int, prefix string) (*RedisStore, e }, nil } +// LoadCursor retrieves the last scanned block height from Redis func (r *RedisStore) LoadCursor(key string) (uint64, error) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() fullKey := r.prefix + key - + val, err := r.client.Get(ctx, fullKey).Uint64() if err == redis.Nil { return 0, nil @@ -55,16 +57,18 @@ func (r *RedisStore) LoadCursor(key string) (uint64, error) { return val, nil } +// SaveCursor updates the last scanned block height in Redis func (r *RedisStore) SaveCursor(key string, height uint64) error { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() fullKey := r.prefix + key - + // Set value with no expiration (0) return r.client.Set(ctx, fullKey, height, 0).Err() } +// Close closes the Redis client connection func (r *RedisStore) Close() error { return r.client.Close() -} \ No newline at end of file +} From 9134aa36d3b8686090ae4f866ddbb9f595972346 Mon Sep 17 00:00:00 2001 From: 84hero <84hero@users.noreply.github.com> Date: Fri, 19 Dec 2025 09:12:38 +0800 Subject: [PATCH 2/2] chore: update README badges with a centered professional layout --- README.md | 21 +++++++++++++++++---- README_CN.md | 21 +++++++++++++++++---- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 6fd6059..304e42d 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,22 @@ # EVM Scanner 🚀 -[](https://goreportcard.com/report/github.com/84hero/evm-scanner) -[](https://github.com/84hero/evm-scanner/actions) -[](https://godoc.org/github.com/84hero/evm-scanner) -[](https://opensource.org/licenses/MIT) +
**[English](README.md)** | **[ç®€ä½“ä¸æ–‡](README_CN.md)** diff --git a/README_CN.md b/README_CN.md index ff1d6f7..b761819 100644 --- a/README_CN.md +++ b/README_CN.md @@ -1,9 +1,22 @@ # EVM Scanner 🚀 -[](https://goreportcard.com/report/github.com/84hero/evm-scanner) -[](https://github.com/84hero/evm-scanner/actions) -[](https://godoc.org/github.com/84hero/evm-scanner) -[](https://opensource.org/licenses/MIT) + **[English](README.md)** | **[ç®€ä½“ä¸æ–‡](README_CN.md)**