diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 5cf429b..2c94ef5 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,9 +2,9 @@ name: Test and Lint on: push: - branches: [ "main", "master" ] + branches: [ "master", "develop" ] pull_request: - branches: [ "main", "master" ] + branches: [ "master", "develop" ] jobs: test: diff --git a/pkg/rpc/node.go b/pkg/rpc/node.go index 344578c..7baf566 100644 --- a/pkg/rpc/node.go +++ b/pkg/rpc/node.go @@ -3,7 +3,7 @@ package rpc import ( "context" "math/big" - "sync/atomic" + "sync" "time" "github.com/ethereum/go-ethereum" @@ -23,7 +23,7 @@ type Node struct { config NodeConfig client EthClient // Interface for underlying ethclient - // Dynamic metrics (atomic operations) + mu sync.RWMutex errorCount uint64 // Consecutive error count totalErrors uint64 // Total error count latency int64 // Average latency (ms) @@ -63,20 +63,20 @@ func (n *Node) Priority() int { // Formula: (Priority * 100) - (Latency / 10) - (ConsecutiveErrors * 500) // Points are also deducted if the node lags too far behind the global max height. func (n *Node) Score(globalMaxHeight uint64) int64 { + n.mu.RLock() + defer n.mu.RUnlock() + score := int64(n.config.Priority) * 100 // Latency penalty (e.g., 200ms latency = -20 points) - avgLatency := atomic.LoadInt64(&n.latency) - score -= (avgLatency / 10) + score -= (n.latency / 10) // Error penalty (consecutive errors are critical) - errs := atomic.LoadUint64(&n.errorCount) - score -= int64(errs) * 500 + score -= int64(n.errorCount) * 500 // Height lag penalty - myHeight := atomic.LoadUint64(&n.latestBlock) - if globalMaxHeight > 0 && myHeight < globalMaxHeight { - lag := globalMaxHeight - myHeight + if globalMaxHeight > 0 && n.latestBlock < globalMaxHeight { + lag := globalMaxHeight - n.latestBlock if lag > 5 { score -= int64(lag) * 50 // -50 points per lagged block } @@ -89,54 +89,64 @@ func (n *Node) Score(globalMaxHeight uint64) int64 { func (n *Node) RecordMetric(start time.Time, err error) { duration := time.Since(start).Milliseconds() + n.mu.Lock() + defer n.mu.Unlock() + // Simple moving average for latency - oldLatency := atomic.LoadInt64(&n.latency) - if oldLatency == 0 { - atomic.StoreInt64(&n.latency, duration) + if n.latency == 0 { + n.latency = duration } else { // New latency weight 20% - newLatency := (oldLatency*8 + duration*2) / 10 - atomic.StoreInt64(&n.latency, newLatency) + n.latency = (n.latency*8 + duration*2) / 10 } if err != nil { - atomic.AddUint64(&n.errorCount, 1) - atomic.AddUint64(&n.totalErrors, 1) + n.errorCount++ + n.totalErrors++ } else { // Decrease error count slowly on success to avoid "jitter" - current := atomic.LoadUint64(&n.errorCount) - if current > 0 { - atomic.StoreUint64(&n.errorCount, current-1) + if n.errorCount > 0 { + n.errorCount-- } } } // UpdateHeight updates the latest block height for the node func (n *Node) UpdateHeight(h uint64) { - current := atomic.LoadUint64(&n.latestBlock) - if h > current { - atomic.StoreUint64(&n.latestBlock, h) + n.mu.Lock() + defer n.mu.Unlock() + + if h > n.latestBlock { + n.latestBlock = h } } // GetErrorCount returns the current consecutive error count func (n *Node) GetErrorCount() uint64 { - return atomic.LoadUint64(&n.errorCount) + n.mu.RLock() + defer n.mu.RUnlock() + return n.errorCount } // GetTotalErrors returns the total error count func (n *Node) GetTotalErrors() uint64 { - return atomic.LoadUint64(&n.totalErrors) + n.mu.RLock() + defer n.mu.RUnlock() + return n.totalErrors } // GetLatency returns the average latency in ms func (n *Node) GetLatency() int64 { - return atomic.LoadInt64(&n.latency) + n.mu.RLock() + defer n.mu.RUnlock() + return n.latency } // GetLatestBlock returns the latest block height observed by this node func (n *Node) GetLatestBlock() uint64 { - return atomic.LoadUint64(&n.latestBlock) + n.mu.RLock() + defer n.mu.RUnlock() + return n.latestBlock } // Proxy Methods (implement Client interface) @@ -188,4 +198,4 @@ func (n *Node) CodeAt(ctx context.Context, account common.Address, blockNumber * func (n *Node) Close() { n.client.Close() -} \ No newline at end of file +} diff --git a/pkg/scanner/scanner.go b/pkg/scanner/scanner.go index 0a02801..a317d84 100644 --- a/pkg/scanner/scanner.go +++ b/pkg/scanner/scanner.go @@ -12,17 +12,17 @@ import ( ) type Config struct { - ChainID string + ChainID string // Startup strategy - StartBlock uint64 - ForceStart bool - Rewind uint64 - CursorRewind uint64 // Safety rewind from saved cursor - - BatchSize uint64 - Interval time.Duration - ReorgSafe uint64 - UseBloom bool + StartBlock uint64 + ForceStart bool + Rewind uint64 + CursorRewind uint64 // Safety rewind from saved cursor + + BatchSize uint64 + Interval time.Duration + ReorgSafe uint64 + UseBloom bool } type Handler func(ctx context.Context, logs []types.Log) error @@ -43,10 +43,10 @@ func New(client rpc.Client, store storage.Persistence, cfg Config, filter *Filte cfg.Interval = 3 * time.Second } return &Scanner{ - client: client, - store: store, - config: cfg, - filter: filter, + client: client, + store: store, + config: cfg, + filter: filter, } } @@ -106,8 +106,12 @@ func (s *Scanner) Start(ctx context.Context) error { err := s.scanRange(ctx, currentBlock, endBlock) if err != nil { log.Error("Scan range failed", "from", currentBlock, "to", endBlock, "err", err) - // Wait a bit before retrying - time.Sleep(1 * time.Second) + // Wait a bit before retrying, but respect context + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(1 * time.Second): + } break // Break inner loop, wait for next ticker } @@ -117,7 +121,7 @@ func (s *Scanner) Start(ctx context.Context) error { if err := s.store.SaveCursor(s.config.ChainID, nextStart); err != nil { log.Error("Failed to save cursor", "err", err) } - + currentBlock = nextStart } } @@ -163,13 +167,13 @@ func (s *Scanner) determineStartBlock(ctx context.Context) (uint64, error) { if err != nil { return 0, err } - + start := uint64(0) if head > s.config.Rewind { start = head - s.config.Rewind } log.Info("Start strategy: Rewind from Head", "head", head, "rewind", s.config.Rewind, "start", start) - + return start, nil } @@ -181,7 +185,7 @@ func (s *Scanner) scanRange(ctx context.Context, from, to uint64) error { // 3. Scan range is small (Bloom is most effective for single or few blocks) // For simplicity, we only use Bloom when BatchSize=1 or scanning single block // eth_getLogs is usually fast enough anyway. - + shouldCheckBloom := s.config.UseBloom && !s.filter.IsHeavy() && (to == from) if shouldCheckBloom { @@ -199,7 +203,7 @@ func (s *Scanner) scanRange(ctx context.Context, from, to uint64) error { // Build eth_getLogs request query := s.filter.ToQuery(from, to) - + // Set range query.FromBlock = big.NewInt(int64(from)) query.ToBlock = big.NewInt(int64(to)) @@ -216,4 +220,4 @@ func (s *Scanner) scanRange(ctx context.Context, from, to uint64) error { } return nil -} \ No newline at end of file +}