Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions feeds/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package feeds

import (
"math/big"
"time"

"github.com/ethereum/go-ethereum/core/types"
"github.com/forta-network/core-go/etherclient"
Expand All @@ -10,6 +11,7 @@ import (
// LogFeed is a feed of logs
type LogFeed interface {
ForEachLog(handler func(blk *etherclient.Block, logEntry types.Log) error, finishBlockHandler func(blk *etherclient.Block) error) error
ForEachLogPolling(interval time.Duration, handler func(blk *etherclient.Block, lg types.Log) error, finishBlockHandler func(blk *etherclient.Block) error) error
GetLogsForLastBlocks(blocksAgo int64) ([]types.Log, error)
GetLogsForRange(blockStart *big.Int, blockEnd *big.Int) ([]types.Log, error)
AddAddress(newAddr string)
Expand Down
104 changes: 104 additions & 0 deletions feeds/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math/big"
"strings"
"sync"
"time"

"github.com/forta-network/core-go/etherclient"

Expand Down Expand Up @@ -158,6 +159,109 @@ func (l *logFeed) ForEachLog(handler func(blk *etherclient.Block, logEntry types
return eg.Wait()
}

// ForEachLogPolling processes every block that appears on‑chain,
// polling the RPC node at the given interval.
//
// - It remembers the last processed height in-memory.
// - On every tick it asks the node for the current tip and
// loops from lastProcessed+1 … tip, invoking the handlers
// exactly once per block.
// - If l.endBlock != nil the loop stops after that height
func (l *logFeed) ForEachLogPolling(
interval time.Duration,
handler func(blk *etherclient.Block, lg types.Log) error,
finishBlockHandler func(blk *etherclient.Block) error,
) error {
// prepare topic matrix once
topics := make([][]common.Hash, len(l.topics))
for i, set := range l.topics {
topics[i] = make([]common.Hash, len(set))
for j, t := range set {
topics[i][j] = common.HexToHash(t)
}
}

var cursor *big.Int
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-l.ctx.Done():
return l.ctx.Err()

case <-ticker.C:
// discover the latest tip
tipUint, err := l.client.BlockNumber(l.ctx)
if err != nil {
return fmt.Errorf("tip discovery failed: %w", err)
}
tip := big.NewInt(int64(tipUint))

// initialize cursor on first iteration
if cursor == nil {
if l.startBlock != nil {
cursor = new(big.Int).Set(l.startBlock)
} else {
cursor = new(big.Int).Set(tip)
}
}

// if nothing new, wait for next tick
if cursor.Cmp(tip) > 0 {
continue
}

// walk from cursor … tip
for cursor.Cmp(tip) <= 0 {
// optional stop height
if l.endBlock != nil && cursor.Cmp(l.endBlock) > 0 {
return nil
}

// fetch block
blk, err := l.client.GetBlockByNumber(l.ctx, cursor)
if err != nil {
// node not indexed yet? retry same cursor next tick
if strings.Contains(err.Error(), "not found") {
break
}
return err
}

// build filter query (apply offset)
from := new(big.Int).Sub(cursor, big.NewInt(int64(l.offset)))
q := ethereum.FilterQuery{
FromBlock: from,
ToBlock: from,
Addresses: l.getAddrs(),
Topics: topics,
}
logs, err := l.client.FilterLogs(l.ctx, q)
if err != nil {
if strings.Contains(err.Error(), "not found") {
break
}
return err
}

// deliver logs
for _, lg := range logs {
if err := handler(blk, lg); err != nil {
return err
}
}
if err := finishBlockHandler(blk); err != nil {
return err
}

// advance cursor by 1
cursor.Add(cursor, big.NewInt(1))
}
}
}
}

func (l *logFeed) getAddrs() []common.Address {
l.addrsMu.RLock()
defer l.addrsMu.RUnlock()
Expand Down
50 changes: 50 additions & 0 deletions feeds/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"math/big"
"testing"
"time"

"github.com/forta-network/core-go/etherclient"
mock_etherclient "github.com/forta-network/core-go/etherclient/mocks"
Expand Down Expand Up @@ -76,3 +77,52 @@ func TestLogFeed_ForEachLog(t *testing.T) {
r.Equal(logs[idx].TxHash.Hex(), fl.TxHash.Hex())
}
}

func TestLogFeed_ForEachLogPolling(t *testing.T) {
r := require.New(t)

ctx := context.Background()
ctrl := gomock.NewController(t)
client := mock_etherclient.NewMockEtherClient(ctrl)
addr := "0x38C1e080BeEb26eeA91932178E62987598230271"
logs := testLogs(0, 1, 2)

client.EXPECT().BlockNumber(gomock.Any()).Return(uint64(16), nil).Times(1)

client.EXPECT().GetBlockByNumber(gomock.Any(), big.NewInt(13)).Return(&etherclient.Block{Number: "0x0"}, nil).Times(1)
client.EXPECT().FilterLogs(gomock.Any(), gomock.Any()).Return([]types.Log{logs[0]}, nil).Times(1)

client.EXPECT().GetBlockByNumber(gomock.Any(), big.NewInt(14)).Return(&etherclient.Block{Number: "0x0"}, nil).Times(1)
client.EXPECT().FilterLogs(gomock.Any(), gomock.Any()).Return([]types.Log{logs[1]}, nil).Times(1)

client.EXPECT().GetBlockByNumber(gomock.Any(), big.NewInt(15)).Return(&etherclient.Block{Number: "0x0"}, nil).Times(1)
client.EXPECT().FilterLogs(gomock.Any(), gomock.Any()).Return([]types.Log{logs[2]}, nil).Times(1)

lf, err := NewLogFeed(ctx, client, LogFeedConfig{
Addresses: []string{addr},
Topics: [][]string{{testEventTopic}},
StartBlock: big.NewInt(13),
})
r.NoError(err)

var found []types.Log
err = lf.ForEachLogPolling(
1*time.Millisecond, // fast tick for test
func(_ *etherclient.Block, lg types.Log) error {
found = append(found, lg)
if len(found) == 3 {
return context.Canceled // stop after all logs
}
return nil
},
func(_ *etherclient.Block) error { return nil },
)
// ensure expected error is the one returned
r.ErrorIs(err, context.Canceled)

r.Equal(len(logs), len(found), "should find all logs")
for idx, fl := range found {
r.Equal(logs[idx].TxIndex, fl.TxIndex)
r.Equal(logs[idx].TxHash.Hex(), fl.TxHash.Hex())
}
}
15 changes: 15 additions & 0 deletions feeds/mocks/mock_feeds.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading