From c9e1f2578de226a21a699de53c8b38845f2a3e63 Mon Sep 17 00:00:00 2001 From: ali Date: Thu, 8 May 2025 11:50:14 +0000 Subject: [PATCH 1/3] Implement a polling log feed --- feeds/interfaces.go | 2 + feeds/logs.go | 103 ++++++++++++++++++++++++++++++++++++++ feeds/logs_test.go | 50 ++++++++++++++++++ feeds/mocks/mock_feeds.go | 15 ++++++ 4 files changed, 170 insertions(+) diff --git a/feeds/interfaces.go b/feeds/interfaces.go index aed354b..f65e733 100644 --- a/feeds/interfaces.go +++ b/feeds/interfaces.go @@ -2,6 +2,7 @@ package feeds import ( "math/big" + "time" "github.com/ethereum/go-ethereum/core/types" "github.com/forta-network/core-go/etherclient" @@ -10,6 +11,7 @@ import ( // LogFeed is a feed of logs type LogFeed interface { ForEachLog(handler func(blk *etherclient.Block, logEntry types.Log) error, finishBlockHandler func(blk *etherclient.Block) error) error + ForEachLogPolling(interval time.Duration, handler func(blk *etherclient.Block, lg types.Log) error, finishBlockHandler func(blk *etherclient.Block) error) error GetLogsForLastBlocks(blocksAgo int64) ([]types.Log, error) GetLogsForRange(blockStart *big.Int, blockEnd *big.Int) ([]types.Log, error) AddAddress(newAddr string) diff --git a/feeds/logs.go b/feeds/logs.go index ce498aa..8f6fd80 100644 --- a/feeds/logs.go +++ b/feeds/logs.go @@ -6,6 +6,7 @@ import ( "math/big" "strings" "sync" + "time" "github.com/forta-network/core-go/etherclient" @@ -158,6 +159,108 @@ func (l *logFeed) ForEachLog(handler func(blk *etherclient.Block, logEntry types return eg.Wait() } +// ForEachLogPolling processes every block that appears on‑chain, +// polling the RPC node at the given interval. +// +// - It remembers the last processed height in-memory. +// - On every tick it asks the node for the current tip and +// loops from lastProcessed+1 … tip, invoking the handlers +// exactly once per block. +// - If l.endBlock != nil the loop stops after that height +func (l *logFeed) ForEachLogPolling( + interval time.Duration, + handler func(blk *etherclient.Block, lg types.Log) error, + finishBlockHandler func(blk *etherclient.Block) error, +) error { + // prepare topic matrix once + topics := make([][]common.Hash, len(l.topics)) + for i, set := range l.topics { + topics[i] = make([]common.Hash, len(set)) + for j, t := range set { + topics[i][j] = common.HexToHash(t) + } + } + + // initial height = cfg.StartBlock (may be nil ➜ latest‑tip on first tick) + var lastProcessed *big.Int + if l.startBlock != nil { + lastProcessed = new(big.Int).Sub(l.startBlock, big.NewInt(1)) + } + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-l.ctx.Done(): + return l.ctx.Err() + + case <-ticker.C: + // ── discover current tip ────────────────────────────────────────── + head, err := l.client.GetBlockByNumber(l.ctx, nil) + if err != nil { + return fmt.Errorf("tip discovery failed: %w", err) + } + tip, _ := hexutil.DecodeBig(head.Number) + + // initialize lastProcessed if this is the first iteration + if lastProcessed == nil { + lastProcessed = new(big.Int).Sub(tip, big.NewInt(1)) + } + + // no new blocks? keep waiting + if tip.Cmp(lastProcessed) <= 0 { + continue + } + + // walk from lastProcessed+1 … tip + cursor := new(big.Int).Add(lastProcessed, big.NewInt(1)) + for ; cursor.Cmp(tip) <= 0; cursor.Add(cursor, big.NewInt(1)) { + // optional stop height + if l.endBlock != nil && cursor.Cmp(l.endBlock) > 0 { + return nil + } + + blk, err := l.client.GetBlockByNumber(l.ctx, cursor) + if err != nil { + // skip races where the node hasn’t fully indexed the block yet + if strings.Contains(err.Error(), "not found") { + cursor.Sub(cursor, big.NewInt(1)) // retry same height next tick + break + } + return err + } + + q := ethereum.FilterQuery{ + FromBlock: new(big.Int).Sub(cursor, big.NewInt(int64(l.offset))), + ToBlock: new(big.Int).Sub(cursor, big.NewInt(int64(l.offset))), + Addresses: l.getAddrs(), + Topics: topics, + } + logs, err := l.client.FilterLogs(l.ctx, q) + if err != nil { + if strings.Contains(err.Error(), "not found") { + cursor.Sub(cursor, big.NewInt(1)) + break + } + return err + } + + for _, lg := range logs { + if err := handler(blk, lg); err != nil { + return err + } + } + if err := finishBlockHandler(blk); err != nil { + return err + } + + lastProcessed = new(big.Int).Set(cursor) + } + } + } +} + func (l *logFeed) getAddrs() []common.Address { l.addrsMu.RLock() defer l.addrsMu.RUnlock() diff --git a/feeds/logs_test.go b/feeds/logs_test.go index c8b0b62..30dc75a 100644 --- a/feeds/logs_test.go +++ b/feeds/logs_test.go @@ -5,6 +5,7 @@ import ( "fmt" "math/big" "testing" + "time" "github.com/forta-network/core-go/etherclient" mock_etherclient "github.com/forta-network/core-go/etherclient/mocks" @@ -76,3 +77,52 @@ func TestLogFeed_ForEachLog(t *testing.T) { r.Equal(logs[idx].TxHash.Hex(), fl.TxHash.Hex()) } } + +func TestLogFeed_ForEachLogPolling(t *testing.T) { + r := require.New(t) + + ctx := context.Background() + ctrl := gomock.NewController(t) + client := mock_etherclient.NewMockEtherClient(ctrl) + addr := "0x38C1e080BeEb26eeA91932178E62987598230271" + logs := testLogs(0, 1, 2) + + client.EXPECT().GetBlockByNumber(gomock.Any(), nil).Return(ðerclient.Block{Number: "0xf"}, nil).Times(1) + + client.EXPECT().GetBlockByNumber(gomock.Any(), big.NewInt(13)).Return(ðerclient.Block{Number: "0x0"}, nil).Times(1) + client.EXPECT().FilterLogs(gomock.Any(), gomock.Any()).Return([]types.Log{logs[0]}, nil).Times(1) + + client.EXPECT().GetBlockByNumber(gomock.Any(), big.NewInt(14)).Return(ðerclient.Block{Number: "0x0"}, nil).Times(1) + client.EXPECT().FilterLogs(gomock.Any(), gomock.Any()).Return([]types.Log{logs[1]}, nil).Times(1) + + client.EXPECT().GetBlockByNumber(gomock.Any(), big.NewInt(15)).Return(ðerclient.Block{Number: "0x0"}, nil).Times(1) + client.EXPECT().FilterLogs(gomock.Any(), gomock.Any()).Return([]types.Log{logs[2]}, nil).Times(1) + + lf, err := NewLogFeed(ctx, client, LogFeedConfig{ + Addresses: []string{addr}, + Topics: [][]string{{testEventTopic}}, + StartBlock: big.NewInt(13), + }) + r.NoError(err) + + var found []types.Log + err = lf.ForEachLogPolling( + 1*time.Millisecond, // fast tick for test + func(_ *etherclient.Block, lg types.Log) error { + found = append(found, lg) + if len(found) == 3 { + return context.Canceled // stop after all logs + } + return nil + }, + func(_ *etherclient.Block) error { return nil }, + ) + // ensure expected error is the one returned + r.ErrorIs(err, context.Canceled) + + r.Equal(len(logs), len(found), "should find all logs") + for idx, fl := range found { + r.Equal(logs[idx].TxIndex, fl.TxIndex) + r.Equal(logs[idx].TxHash.Hex(), fl.TxHash.Hex()) + } +} diff --git a/feeds/mocks/mock_feeds.go b/feeds/mocks/mock_feeds.go index 3f6f4fe..a3c49b1 100644 --- a/feeds/mocks/mock_feeds.go +++ b/feeds/mocks/mock_feeds.go @@ -7,6 +7,7 @@ package mock_feeds import ( big "math/big" reflect "reflect" + time "time" types "github.com/ethereum/go-ethereum/core/types" etherclient "github.com/forta-network/core-go/etherclient" @@ -62,6 +63,20 @@ func (mr *MockLogFeedMockRecorder) ForEachLog(handler, finishBlockHandler interf return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ForEachLog", reflect.TypeOf((*MockLogFeed)(nil).ForEachLog), handler, finishBlockHandler) } +// ForEachLogPolling mocks base method. +func (m *MockLogFeed) ForEachLogPolling(interval time.Duration, handler func(*etherclient.Block, types.Log) error, finishBlockHandler func(*etherclient.Block) error) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ForEachLogPolling", interval, handler, finishBlockHandler) + ret0, _ := ret[0].(error) + return ret0 +} + +// ForEachLogPolling indicates an expected call of ForEachLogPolling. +func (mr *MockLogFeedMockRecorder) ForEachLogPolling(interval, handler, finishBlockHandler interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ForEachLogPolling", reflect.TypeOf((*MockLogFeed)(nil).ForEachLogPolling), interval, handler, finishBlockHandler) +} + // GetLogsForLastBlocks mocks base method. func (m *MockLogFeed) GetLogsForLastBlocks(blocksAgo int64) ([]types.Log, error) { m.ctrl.T.Helper() From c647ba0ce2d5fd8c736e78284a8abe38c44b1014 Mon Sep 17 00:00:00 2001 From: ali Date: Fri, 9 May 2025 07:33:44 +0000 Subject: [PATCH 2/3] use blocknumber instead of getblock --- feeds/logs.go | 4 ++-- feeds/logs_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/feeds/logs.go b/feeds/logs.go index 8f6fd80..37c6b7b 100644 --- a/feeds/logs.go +++ b/feeds/logs.go @@ -197,12 +197,12 @@ func (l *logFeed) ForEachLogPolling( case <-ticker.C: // ── discover current tip ────────────────────────────────────────── - head, err := l.client.GetBlockByNumber(l.ctx, nil) + tipInt, err := l.client.BlockNumber(l.ctx) if err != nil { return fmt.Errorf("tip discovery failed: %w", err) } - tip, _ := hexutil.DecodeBig(head.Number) + tip := big.NewInt(int64(tipInt)) // initialize lastProcessed if this is the first iteration if lastProcessed == nil { lastProcessed = new(big.Int).Sub(tip, big.NewInt(1)) diff --git a/feeds/logs_test.go b/feeds/logs_test.go index 30dc75a..0ff4039 100644 --- a/feeds/logs_test.go +++ b/feeds/logs_test.go @@ -87,7 +87,7 @@ func TestLogFeed_ForEachLogPolling(t *testing.T) { addr := "0x38C1e080BeEb26eeA91932178E62987598230271" logs := testLogs(0, 1, 2) - client.EXPECT().GetBlockByNumber(gomock.Any(), nil).Return(ðerclient.Block{Number: "0xf"}, nil).Times(1) + client.EXPECT().BlockNumber(gomock.Any()).Return(uint64(16), nil).Times(1) client.EXPECT().GetBlockByNumber(gomock.Any(), big.NewInt(13)).Return(ðerclient.Block{Number: "0x0"}, nil).Times(1) client.EXPECT().FilterLogs(gomock.Any(), gomock.Any()).Return([]types.Log{logs[0]}, nil).Times(1) From a761c9007753039cfc51abb861e78de1fbf4eae8 Mon Sep 17 00:00:00 2001 From: ali Date: Fri, 9 May 2025 07:39:41 +0000 Subject: [PATCH 3/3] simplify logic --- feeds/logs.go | 49 +++++++++++++++++++++++++------------------------ 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/feeds/logs.go b/feeds/logs.go index 37c6b7b..174fd6d 100644 --- a/feeds/logs.go +++ b/feeds/logs.go @@ -181,12 +181,7 @@ func (l *logFeed) ForEachLogPolling( } } - // initial height = cfg.StartBlock (may be nil ➜ latest‑tip on first tick) - var lastProcessed *big.Int - if l.startBlock != nil { - lastProcessed = new(big.Int).Sub(l.startBlock, big.NewInt(1)) - } - + var cursor *big.Int ticker := time.NewTicker(interval) defer ticker.Stop() @@ -196,56 +191,61 @@ func (l *logFeed) ForEachLogPolling( return l.ctx.Err() case <-ticker.C: - // ── discover current tip ────────────────────────────────────────── - tipInt, err := l.client.BlockNumber(l.ctx) + // discover the latest tip + tipUint, err := l.client.BlockNumber(l.ctx) if err != nil { return fmt.Errorf("tip discovery failed: %w", err) } - - tip := big.NewInt(int64(tipInt)) - // initialize lastProcessed if this is the first iteration - if lastProcessed == nil { - lastProcessed = new(big.Int).Sub(tip, big.NewInt(1)) + tip := big.NewInt(int64(tipUint)) + + // initialize cursor on first iteration + if cursor == nil { + if l.startBlock != nil { + cursor = new(big.Int).Set(l.startBlock) + } else { + cursor = new(big.Int).Set(tip) + } } - // no new blocks? keep waiting - if tip.Cmp(lastProcessed) <= 0 { + // if nothing new, wait for next tick + if cursor.Cmp(tip) > 0 { continue } - // walk from lastProcessed+1 … tip - cursor := new(big.Int).Add(lastProcessed, big.NewInt(1)) - for ; cursor.Cmp(tip) <= 0; cursor.Add(cursor, big.NewInt(1)) { + // walk from cursor … tip + for cursor.Cmp(tip) <= 0 { // optional stop height if l.endBlock != nil && cursor.Cmp(l.endBlock) > 0 { return nil } + // fetch block blk, err := l.client.GetBlockByNumber(l.ctx, cursor) if err != nil { - // skip races where the node hasn’t fully indexed the block yet + // node not indexed yet? retry same cursor next tick if strings.Contains(err.Error(), "not found") { - cursor.Sub(cursor, big.NewInt(1)) // retry same height next tick break } return err } + // build filter query (apply offset) + from := new(big.Int).Sub(cursor, big.NewInt(int64(l.offset))) q := ethereum.FilterQuery{ - FromBlock: new(big.Int).Sub(cursor, big.NewInt(int64(l.offset))), - ToBlock: new(big.Int).Sub(cursor, big.NewInt(int64(l.offset))), + FromBlock: from, + ToBlock: from, Addresses: l.getAddrs(), Topics: topics, } logs, err := l.client.FilterLogs(l.ctx, q) if err != nil { if strings.Contains(err.Error(), "not found") { - cursor.Sub(cursor, big.NewInt(1)) break } return err } + // deliver logs for _, lg := range logs { if err := handler(blk, lg); err != nil { return err @@ -255,7 +255,8 @@ func (l *logFeed) ForEachLogPolling( return err } - lastProcessed = new(big.Int).Set(cursor) + // advance cursor by 1 + cursor.Add(cursor, big.NewInt(1)) } } }