Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ name: Test and Lint

on:
push:
branches: [ "main", "master" ]
branches: [ "master", "develop" ]
pull_request:
branches: [ "main", "master" ]
branches: [ "master", "develop" ]

jobs:
test:
Expand Down
64 changes: 37 additions & 27 deletions pkg/rpc/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package rpc
import (
"context"
"math/big"
"sync/atomic"
"sync"
"time"

"github.com/ethereum/go-ethereum"
Expand All @@ -23,7 +23,7 @@ type Node struct {
config NodeConfig
client EthClient // Interface for underlying ethclient

// Dynamic metrics (atomic operations)
mu sync.RWMutex
errorCount uint64 // Consecutive error count
totalErrors uint64 // Total error count
latency int64 // Average latency (ms)
Expand Down Expand Up @@ -63,20 +63,20 @@ func (n *Node) Priority() int {
// Formula: (Priority * 100) - (Latency / 10) - (ConsecutiveErrors * 500)
// Points are also deducted if the node lags too far behind the global max height.
func (n *Node) Score(globalMaxHeight uint64) int64 {
n.mu.RLock()
defer n.mu.RUnlock()

score := int64(n.config.Priority) * 100

// Latency penalty (e.g., 200ms latency = -20 points)
avgLatency := atomic.LoadInt64(&n.latency)
score -= (avgLatency / 10)
score -= (n.latency / 10)

// Error penalty (consecutive errors are critical)
errs := atomic.LoadUint64(&n.errorCount)
score -= int64(errs) * 500
score -= int64(n.errorCount) * 500

// Height lag penalty
myHeight := atomic.LoadUint64(&n.latestBlock)
if globalMaxHeight > 0 && myHeight < globalMaxHeight {
lag := globalMaxHeight - myHeight
if globalMaxHeight > 0 && n.latestBlock < globalMaxHeight {
lag := globalMaxHeight - n.latestBlock
if lag > 5 {
score -= int64(lag) * 50 // -50 points per lagged block
}
Expand All @@ -89,54 +89,64 @@ func (n *Node) Score(globalMaxHeight uint64) int64 {
func (n *Node) RecordMetric(start time.Time, err error) {
duration := time.Since(start).Milliseconds()

n.mu.Lock()
defer n.mu.Unlock()

// Simple moving average for latency
oldLatency := atomic.LoadInt64(&n.latency)
if oldLatency == 0 {
atomic.StoreInt64(&n.latency, duration)
if n.latency == 0 {
n.latency = duration
} else {
// New latency weight 20%
newLatency := (oldLatency*8 + duration*2) / 10
atomic.StoreInt64(&n.latency, newLatency)
n.latency = (n.latency*8 + duration*2) / 10
}

if err != nil {
atomic.AddUint64(&n.errorCount, 1)
atomic.AddUint64(&n.totalErrors, 1)
n.errorCount++
n.totalErrors++
} else {
// Decrease error count slowly on success to avoid "jitter"
current := atomic.LoadUint64(&n.errorCount)
if current > 0 {
atomic.StoreUint64(&n.errorCount, current-1)
if n.errorCount > 0 {
n.errorCount--
}
}
}

// UpdateHeight updates the latest block height for the node
func (n *Node) UpdateHeight(h uint64) {
current := atomic.LoadUint64(&n.latestBlock)
if h > current {
atomic.StoreUint64(&n.latestBlock, h)
n.mu.Lock()
defer n.mu.Unlock()

if h > n.latestBlock {
n.latestBlock = h
}
}

// GetErrorCount returns the current consecutive error count
func (n *Node) GetErrorCount() uint64 {
return atomic.LoadUint64(&n.errorCount)
n.mu.RLock()
defer n.mu.RUnlock()
return n.errorCount
}

// GetTotalErrors returns the total error count
func (n *Node) GetTotalErrors() uint64 {
return atomic.LoadUint64(&n.totalErrors)
n.mu.RLock()
defer n.mu.RUnlock()
return n.totalErrors
}

// GetLatency returns the average latency in ms
func (n *Node) GetLatency() int64 {
return atomic.LoadInt64(&n.latency)
n.mu.RLock()
defer n.mu.RUnlock()
return n.latency
}

// GetLatestBlock returns the latest block height observed by this node
func (n *Node) GetLatestBlock() uint64 {
return atomic.LoadUint64(&n.latestBlock)
n.mu.RLock()
defer n.mu.RUnlock()
return n.latestBlock
}

// Proxy Methods (implement Client interface)
Expand Down Expand Up @@ -188,4 +198,4 @@ func (n *Node) CodeAt(ctx context.Context, account common.Address, blockNumber *

func (n *Node) Close() {
n.client.Close()
}
}
48 changes: 26 additions & 22 deletions pkg/scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ import (
)

type Config struct {
ChainID string
ChainID string
// Startup strategy
StartBlock uint64
ForceStart bool
Rewind uint64
CursorRewind uint64 // Safety rewind from saved cursor
BatchSize uint64
Interval time.Duration
ReorgSafe uint64
UseBloom bool
StartBlock uint64
ForceStart bool
Rewind uint64
CursorRewind uint64 // Safety rewind from saved cursor

BatchSize uint64
Interval time.Duration
ReorgSafe uint64
UseBloom bool
}

type Handler func(ctx context.Context, logs []types.Log) error
Expand All @@ -43,10 +43,10 @@ func New(client rpc.Client, store storage.Persistence, cfg Config, filter *Filte
cfg.Interval = 3 * time.Second
}
return &Scanner{
client: client,
store: store,
config: cfg,
filter: filter,
client: client,
store: store,
config: cfg,
filter: filter,
}
}

Expand Down Expand Up @@ -106,8 +106,12 @@ func (s *Scanner) Start(ctx context.Context) error {
err := s.scanRange(ctx, currentBlock, endBlock)
if err != nil {
log.Error("Scan range failed", "from", currentBlock, "to", endBlock, "err", err)
// Wait a bit before retrying
time.Sleep(1 * time.Second)
// Wait a bit before retrying, but respect context
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(1 * time.Second):
}
break // Break inner loop, wait for next ticker
}

Expand All @@ -117,7 +121,7 @@ func (s *Scanner) Start(ctx context.Context) error {
if err := s.store.SaveCursor(s.config.ChainID, nextStart); err != nil {
log.Error("Failed to save cursor", "err", err)
}

currentBlock = nextStart
}
}
Expand Down Expand Up @@ -163,13 +167,13 @@ func (s *Scanner) determineStartBlock(ctx context.Context) (uint64, error) {
if err != nil {
return 0, err
}

start := uint64(0)
if head > s.config.Rewind {
start = head - s.config.Rewind
}
log.Info("Start strategy: Rewind from Head", "head", head, "rewind", s.config.Rewind, "start", start)

return start, nil
}

Expand All @@ -181,7 +185,7 @@ func (s *Scanner) scanRange(ctx context.Context, from, to uint64) error {
// 3. Scan range is small (Bloom is most effective for single or few blocks)
// For simplicity, we only use Bloom when BatchSize=1 or scanning single block
// eth_getLogs is usually fast enough anyway.

shouldCheckBloom := s.config.UseBloom && !s.filter.IsHeavy() && (to == from)

if shouldCheckBloom {
Expand All @@ -199,7 +203,7 @@ func (s *Scanner) scanRange(ctx context.Context, from, to uint64) error {

// Build eth_getLogs request
query := s.filter.ToQuery(from, to)

// Set range
query.FromBlock = big.NewInt(int64(from))
query.ToBlock = big.NewInt(int64(to))
Expand All @@ -216,4 +220,4 @@ func (s *Scanner) scanRange(ctx context.Context, from, to uint64) error {
}

return nil
}
}