diff --git a/feeds/interfaces.go b/feeds/interfaces.go index aed354b..f65e733 100644 --- a/feeds/interfaces.go +++ b/feeds/interfaces.go @@ -2,6 +2,7 @@ package feeds import ( "math/big" + "time" "github.com/ethereum/go-ethereum/core/types" "github.com/forta-network/core-go/etherclient" @@ -10,6 +11,7 @@ import ( // LogFeed is a feed of logs type LogFeed interface { ForEachLog(handler func(blk *etherclient.Block, logEntry types.Log) error, finishBlockHandler func(blk *etherclient.Block) error) error + ForEachLogPolling(interval time.Duration, handler func(blk *etherclient.Block, lg types.Log) error, finishBlockHandler func(blk *etherclient.Block) error) error GetLogsForLastBlocks(blocksAgo int64) ([]types.Log, error) GetLogsForRange(blockStart *big.Int, blockEnd *big.Int) ([]types.Log, error) AddAddress(newAddr string) diff --git a/feeds/logs.go b/feeds/logs.go index ce498aa..174fd6d 100644 --- a/feeds/logs.go +++ b/feeds/logs.go @@ -6,6 +6,7 @@ import ( "math/big" "strings" "sync" + "time" "github.com/forta-network/core-go/etherclient" @@ -158,6 +159,109 @@ func (l *logFeed) ForEachLog(handler func(blk *etherclient.Block, logEntry types return eg.Wait() } +// ForEachLogPolling processes every block that appears on‑chain, +// polling the RPC node at the given interval. +// +// - It remembers the last processed height in-memory. +// - On every tick it asks the node for the current tip and +// loops from lastProcessed+1 … tip, invoking the handlers +// exactly once per block. +// - If l.endBlock != nil the loop stops after that height +func (l *logFeed) ForEachLogPolling( + interval time.Duration, + handler func(blk *etherclient.Block, lg types.Log) error, + finishBlockHandler func(blk *etherclient.Block) error, +) error { + // prepare topic matrix once + topics := make([][]common.Hash, len(l.topics)) + for i, set := range l.topics { + topics[i] = make([]common.Hash, len(set)) + for j, t := range set { + topics[i][j] = common.HexToHash(t) + } + } + + var cursor *big.Int + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-l.ctx.Done(): + return l.ctx.Err() + + case <-ticker.C: + // discover the latest tip + tipUint, err := l.client.BlockNumber(l.ctx) + if err != nil { + return fmt.Errorf("tip discovery failed: %w", err) + } + tip := big.NewInt(int64(tipUint)) + + // initialize cursor on first iteration + if cursor == nil { + if l.startBlock != nil { + cursor = new(big.Int).Set(l.startBlock) + } else { + cursor = new(big.Int).Set(tip) + } + } + + // if nothing new, wait for next tick + if cursor.Cmp(tip) > 0 { + continue + } + + // walk from cursor … tip + for cursor.Cmp(tip) <= 0 { + // optional stop height + if l.endBlock != nil && cursor.Cmp(l.endBlock) > 0 { + return nil + } + + // fetch block + blk, err := l.client.GetBlockByNumber(l.ctx, cursor) + if err != nil { + // node not indexed yet? retry same cursor next tick + if strings.Contains(err.Error(), "not found") { + break + } + return err + } + + // build filter query (apply offset) + from := new(big.Int).Sub(cursor, big.NewInt(int64(l.offset))) + q := ethereum.FilterQuery{ + FromBlock: from, + ToBlock: from, + Addresses: l.getAddrs(), + Topics: topics, + } + logs, err := l.client.FilterLogs(l.ctx, q) + if err != nil { + if strings.Contains(err.Error(), "not found") { + break + } + return err + } + + // deliver logs + for _, lg := range logs { + if err := handler(blk, lg); err != nil { + return err + } + } + if err := finishBlockHandler(blk); err != nil { + return err + } + + // advance cursor by 1 + cursor.Add(cursor, big.NewInt(1)) + } + } + } +} + func (l *logFeed) getAddrs() []common.Address { l.addrsMu.RLock() defer l.addrsMu.RUnlock() diff --git a/feeds/logs_test.go b/feeds/logs_test.go index c8b0b62..0ff4039 100644 --- a/feeds/logs_test.go +++ b/feeds/logs_test.go @@ -5,6 +5,7 @@ import ( "fmt" "math/big" "testing" + "time" "github.com/forta-network/core-go/etherclient" mock_etherclient "github.com/forta-network/core-go/etherclient/mocks" @@ -76,3 +77,52 @@ func TestLogFeed_ForEachLog(t *testing.T) { r.Equal(logs[idx].TxHash.Hex(), fl.TxHash.Hex()) } } + +func TestLogFeed_ForEachLogPolling(t *testing.T) { + r := require.New(t) + + ctx := context.Background() + ctrl := gomock.NewController(t) + client := mock_etherclient.NewMockEtherClient(ctrl) + addr := "0x38C1e080BeEb26eeA91932178E62987598230271" + logs := testLogs(0, 1, 2) + + client.EXPECT().BlockNumber(gomock.Any()).Return(uint64(16), nil).Times(1) + + client.EXPECT().GetBlockByNumber(gomock.Any(), big.NewInt(13)).Return(ðerclient.Block{Number: "0x0"}, nil).Times(1) + client.EXPECT().FilterLogs(gomock.Any(), gomock.Any()).Return([]types.Log{logs[0]}, nil).Times(1) + + client.EXPECT().GetBlockByNumber(gomock.Any(), big.NewInt(14)).Return(ðerclient.Block{Number: "0x0"}, nil).Times(1) + client.EXPECT().FilterLogs(gomock.Any(), gomock.Any()).Return([]types.Log{logs[1]}, nil).Times(1) + + client.EXPECT().GetBlockByNumber(gomock.Any(), big.NewInt(15)).Return(ðerclient.Block{Number: "0x0"}, nil).Times(1) + client.EXPECT().FilterLogs(gomock.Any(), gomock.Any()).Return([]types.Log{logs[2]}, nil).Times(1) + + lf, err := NewLogFeed(ctx, client, LogFeedConfig{ + Addresses: []string{addr}, + Topics: [][]string{{testEventTopic}}, + StartBlock: big.NewInt(13), + }) + r.NoError(err) + + var found []types.Log + err = lf.ForEachLogPolling( + 1*time.Millisecond, // fast tick for test + func(_ *etherclient.Block, lg types.Log) error { + found = append(found, lg) + if len(found) == 3 { + return context.Canceled // stop after all logs + } + return nil + }, + func(_ *etherclient.Block) error { return nil }, + ) + // ensure expected error is the one returned + r.ErrorIs(err, context.Canceled) + + r.Equal(len(logs), len(found), "should find all logs") + for idx, fl := range found { + r.Equal(logs[idx].TxIndex, fl.TxIndex) + r.Equal(logs[idx].TxHash.Hex(), fl.TxHash.Hex()) + } +} diff --git a/feeds/mocks/mock_feeds.go b/feeds/mocks/mock_feeds.go index 3f6f4fe..a3c49b1 100644 --- a/feeds/mocks/mock_feeds.go +++ b/feeds/mocks/mock_feeds.go @@ -7,6 +7,7 @@ package mock_feeds import ( big "math/big" reflect "reflect" + time "time" types "github.com/ethereum/go-ethereum/core/types" etherclient "github.com/forta-network/core-go/etherclient" @@ -62,6 +63,20 @@ func (mr *MockLogFeedMockRecorder) ForEachLog(handler, finishBlockHandler interf return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ForEachLog", reflect.TypeOf((*MockLogFeed)(nil).ForEachLog), handler, finishBlockHandler) } +// ForEachLogPolling mocks base method. +func (m *MockLogFeed) ForEachLogPolling(interval time.Duration, handler func(*etherclient.Block, types.Log) error, finishBlockHandler func(*etherclient.Block) error) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ForEachLogPolling", interval, handler, finishBlockHandler) + ret0, _ := ret[0].(error) + return ret0 +} + +// ForEachLogPolling indicates an expected call of ForEachLogPolling. +func (mr *MockLogFeedMockRecorder) ForEachLogPolling(interval, handler, finishBlockHandler interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ForEachLogPolling", reflect.TypeOf((*MockLogFeed)(nil).ForEachLogPolling), interval, handler, finishBlockHandler) +} + // GetLogsForLastBlocks mocks base method. func (m *MockLogFeed) GetLogsForLastBlocks(blocksAgo int64) ([]types.Log, error) { m.ctrl.T.Helper()