From 183d46a585b26fd33b2ed8f677bb6818ad187902 Mon Sep 17 00:00:00 2001 From: Roshan Date: Thu, 5 Feb 2026 16:42:47 +0800 Subject: [PATCH 1/3] feat(filters): add eth_subscribe("newOracleTransactions") for oracle tx streaming --- core/events.go | 10 ++ core/oracle/chainlink.go | 94 +++++++++++ core/oracle/chainlink_test.go | 149 ++++++++++++++++++ core/oracle/identifier.go | 47 ++++++ core/oracle/redstone.go | 58 +++++++ core/oracle/registry.go | 52 +++++++ core/oracle/registry_test.go | 190 +++++++++++++++++++++++ core/txpool/blobpool/blobpool.go | 6 + core/txpool/legacypool/legacypool.go | 51 ++++-- core/txpool/subpool.go | 4 + core/txpool/txpool.go | 13 ++ eth/api_backend.go | 4 + eth/backend.go | 7 + eth/filters/api.go | 42 +++++ eth/filters/filter_system.go | 56 ++++++- eth/filters/filter_system_test.go | 5 + internal/ethapi/api.go | 15 ++ internal/ethapi/api_test.go | 3 + internal/ethapi/backend.go | 1 + internal/ethapi/transaction_args_test.go | 7 +- 20 files changed, 797 insertions(+), 17 deletions(-) create mode 100644 core/oracle/chainlink.go create mode 100644 core/oracle/chainlink_test.go create mode 100644 core/oracle/identifier.go create mode 100644 core/oracle/redstone.go create mode 100644 core/oracle/registry.go create mode 100644 core/oracle/registry_test.go diff --git a/core/events.go b/core/events.go index 5b1b65750b..3ef987e645 100644 --- a/core/events.go +++ b/core/events.go @@ -17,6 +17,7 @@ package core import ( + "github.com/ethereum/go-ethereum/core/oracle" "github.com/ethereum/go-ethereum/core/types" ) @@ -52,3 +53,12 @@ type ChainHeadEvent struct { } type HighestVerifiedBlockEvent struct{ Header *types.Header } + +// OracleTxInfo pairs a transaction with its oracle identification metadata. +type OracleTxInfo struct { + Tx *types.Transaction + Info *oracle.OracleInfo +} + +// NewOracleTxsEvent is posted when oracle-related transactions enter the transaction pool. +type NewOracleTxsEvent struct{ Txs []OracleTxInfo } diff --git a/core/oracle/chainlink.go b/core/oracle/chainlink.go new file mode 100644 index 0000000000..4204c6f931 --- /dev/null +++ b/core/oracle/chainlink.go @@ -0,0 +1,94 @@ +// Copyright 2026 The bsc Authors +// This file is part of the bsc library. +// +// The bsc library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The bsc library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the bsc library. If not, see . + +package oracle + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +var ( + // Chainlink transmit method selectors. + chainlinkSelectors = [2][4]byte{ + {0x6f, 0xad, 0xcf, 0x72}, + {0xb6, 0x4f, 0xa9, 0xe6}, + } + + // Chainlink forwarder contract addresses. + chainlinkTargets = map[common.Address]struct{}{ + common.HexToAddress("0x080f02795ba9003404Aceaf102f14443647DBe08"): {}, + common.HexToAddress("0x1877d26Dabef3b5869aAA788EbFB39F1d6C477A0"): {}, + common.HexToAddress("0x1b66fBd3a0c4aa37705E7091EF40554E124A25F6"): {}, + common.HexToAddress("0x2c14565cDDC53F3a60D27E8C345809a31D934642"): {}, + common.HexToAddress("0x328e4BDcf3096C36D9CB2d989Da6995b461140ac"): {}, + common.HexToAddress("0x342682197497FC9b860e914636847F1a9AD8b037"): {}, + common.HexToAddress("0x347dF906a432D7964Ce2D854B51D9166B6A46CcC"): {}, + common.HexToAddress("0x3A642243e38beA40322aa9A7e59c26Aa27Cc6F8D"): {}, + common.HexToAddress("0x42b950F1Cf4855a54f6d0bd6981687Eb4d5d6a18"): {}, + common.HexToAddress("0x5b4813eE2D4366D73C89D7870aFa82BF2FBB3E71"): {}, + common.HexToAddress("0x5d6173E05c3Af359DBb13ee12Af726E34AAeDAD2"): {}, + common.HexToAddress("0x64F4d51eE8AE9671c5d1cE58dc15a653A71efC90"): {}, + common.HexToAddress("0x7511839dFfaf432A5DbA567bc1Aa77115EfEf882"): {}, + common.HexToAddress("0x870A3D3BBB5554f1D04967a6b76534989020B203"): {}, + common.HexToAddress("0x8bDB5E089E9285037E6874D8F3160b5548611380"): {}, + common.HexToAddress("0x9A7032C6EDCBbf164E201ca9dF5894BC35416e6A"): {}, + common.HexToAddress("0x9dFCA5435036ad0069ECa7C8e15b0c5c44548952"): {}, + common.HexToAddress("0xD979873cFAf9a655F44006Fe01B148f1ABbc47Cc"): {}, + common.HexToAddress("0xF72bb66E324B5188c6178c6401Ee982FA30bE096"): {}, + common.HexToAddress("0xF87C2E9f08c04f805d15C5E90273227639151860"): {}, + common.HexToAddress("0xa13814fE399ea85e0541B4FdD2b38efa149c8Ee5"): {}, + common.HexToAddress("0xaaC815bcd3d1eCEACfC51608c4EaF194eEfd14f1"): {}, + common.HexToAddress("0xc3bC83292fcc9c646E61f37126839449736d64FD"): {}, + common.HexToAddress("0xc7F79eA9c7FF17968eEC60b4CC9880905cFdb2f4"): {}, + common.HexToAddress("0xdB1A1C98F4A23c8C78B7B2c44Cd9678Ff286A0A1"): {}, + } +) + +// ChainlinkIdentifier identifies Chainlink oracle transactions. +type ChainlinkIdentifier struct{} + +// NewChainlinkIdentifier creates a new Chainlink oracle identifier. +func NewChainlinkIdentifier() *ChainlinkIdentifier { + return &ChainlinkIdentifier{} +} + +// Type returns the oracle type for Chainlink. +func (c *ChainlinkIdentifier) Type() OracleType { + return OracleChainlink +} + +// Identify checks whether the transaction is a Chainlink oracle transaction. +func (c *ChainlinkIdentifier) Identify(tx *types.Transaction) *OracleInfo { + // Check method selector (4-byte comparison, very cheap). + data := tx.Data() + if len(data) < 4 { + return nil + } + sel := [4]byte(data[:4]) + if sel != chainlinkSelectors[0] && sel != chainlinkSelectors[1] { + return nil + } + // Check target address against the known set. + to := tx.To() + if to == nil { + return nil + } + if _, ok := chainlinkTargets[*to]; !ok { + return nil + } + return &OracleInfo{Type: OracleChainlink} +} diff --git a/core/oracle/chainlink_test.go b/core/oracle/chainlink_test.go new file mode 100644 index 0000000000..b938b91486 --- /dev/null +++ b/core/oracle/chainlink_test.go @@ -0,0 +1,149 @@ +// Copyright 2026 The bsc Authors +// This file is part of the bsc library. +// +// The bsc library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The bsc library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the bsc library. If not, see . +package oracle + +import ( + "context" + "fmt" + "math/big" + "os" + "sort" + "strconv" + "testing" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient" +) + +const ( + chainlinkFactoryBSC = "0x297Bc37BCE59112D245cCfC22f54079466733401" +) + +// TestScanChainlinkForwarders scans AuthorizedForwarderCreated events from the +// Chainlink OperatorFactory and prints unique forwarder addresses. +// +// Env vars: +// - BSC_RPC_URL: required RPC endpoint (e.g., https://bsc-dataseed.binance.org) +// - FROM_BLOCK: optional start block (default 0) +// - TO_BLOCK: optional end block (default latest) +// - BATCH_SIZE: optional batch size (default 50000) +func TestScanChainlinkForwarders(t *testing.T) { + rpcURL := "http://127.0.0.1:8544" + + fromBlock := uint64(40299701) // deployed at block 40299701 + toBlock := uint64(44844259) // known latest tx height + batchSize := uint64(50000) + + client, err := ethclient.Dial(rpcURL) + if err != nil { + t.Fatalf("dial rpc: %v", err) + } + defer client.Close() + + if toBlock == 0 { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + latest, err := client.BlockNumber(ctx) + if err != nil { + t.Fatalf("get latest block: %v", err) + } + toBlock = latest + } + + factoryAddr := common.HexToAddress(chainlinkFactoryBSC) + eventSig := crypto.Keccak256Hash([]byte("AuthorizedForwarderCreated(address,address,address)")) + + forwarders := map[common.Address]struct{}{} + var logsCount int + + for start := fromBlock; start <= toBlock; start += batchSize { + end := start + batchSize - 1 + if end > toBlock { + end = toBlock + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + query := ethereum.FilterQuery{ + FromBlock: bigInt(start), + ToBlock: bigInt(end), + Addresses: []common.Address{factoryAddr}, + Topics: [][]common.Hash{{eventSig}}, + } + logs, err := client.FilterLogs(ctx, query) + cancel() + if err != nil { + t.Fatalf("filter logs [%d-%d]: %v", start, end, err) + } + + for _, log := range logs { + logsCount++ + fwd, owner, sender, ok := decodeForwarderEvent(log) + if !ok { + continue + } + forwarders[fwd] = struct{}{} + t.Logf("forwarder=%s owner=%s sender=%s block=%d tx=%s", + fwd.Hex(), owner.Hex(), sender.Hex(), log.BlockNumber, log.TxHash.Hex()) + } + } + + list := make([]string, 0, len(forwarders)) + for addr := range forwarders { + list = append(list, addr.Hex()) + } + sort.Strings(list) + + t.Logf("total logs=%d unique forwarders=%d", logsCount, len(list)) + for _, addr := range list { + fmt.Println(addr) + } +} + +func decodeForwarderEvent(log types.Log) (common.Address, common.Address, common.Address, bool) { + // Topics: [sig, forwarder, owner, sender] + if len(log.Topics) < 4 { + return common.Address{}, common.Address{}, common.Address{}, false + } + return topicToAddress(log.Topics[1]), + topicToAddress(log.Topics[2]), + topicToAddress(log.Topics[3]), + true +} + +func topicToAddress(topic common.Hash) common.Address { + b := topic.Bytes() + return common.BytesToAddress(b[12:]) +} + +func parseUintEnv(t *testing.T, key string, def uint64) uint64 { + v := os.Getenv(key) + if v == "" { + return def + } + n, err := strconv.ParseUint(v, 10, 64) + if err != nil { + t.Fatalf("invalid %s=%q: %v", key, v, err) + } + return n +} + +func bigInt(v uint64) *big.Int { + return new(big.Int).SetUint64(v) +} diff --git a/core/oracle/identifier.go b/core/oracle/identifier.go new file mode 100644 index 0000000000..c5784c126e --- /dev/null +++ b/core/oracle/identifier.go @@ -0,0 +1,47 @@ +// Copyright 2026 The bsc Authors +// This file is part of the bsc library. +// +// The bsc library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The bsc library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the bsc library. If not, see . + +// Package oracle provides an extensible framework for identifying oracle-related +// transactions in the transaction pool (e.g., Chainlink, Redstone price feed updates). +package oracle + +import "github.com/ethereum/go-ethereum/core/types" + +// OracleType represents the type of oracle provider. +type OracleType string + +const ( + OracleChainlink OracleType = "chainlink" + OracleRedstone OracleType = "redstone" +) + +// OracleInfo contains metadata about an identified oracle transaction. +type OracleInfo struct { + Type OracleType + // Extensible: future fields for decoded price data, feed address, etc. +} + +// Identifier defines the interface for oracle transaction identification. +// Implementations should check whether a transaction is related to a specific +// oracle provider and return metadata if it matches. +type Identifier interface { + // Type returns the oracle type this identifier handles. + Type() OracleType + + // Identify checks whether the given transaction is an oracle transaction. + // Returns non-nil OracleInfo if the transaction matches, nil otherwise. + Identify(tx *types.Transaction) *OracleInfo +} diff --git a/core/oracle/redstone.go b/core/oracle/redstone.go new file mode 100644 index 0000000000..f465127767 --- /dev/null +++ b/core/oracle/redstone.go @@ -0,0 +1,58 @@ +// Copyright 2026 The bsc Authors +// This file is part of the bsc library. +// +// The bsc library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The bsc library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the bsc library. If not, see . + +package oracle + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +var ( + // Redstone updateDataFeedsValues method selector. + redstoneSelector = [4]byte{0xb7, 0xa1, 0x62, 0x51} + + // Redstone PriceFeedAdapter contract address. + redstoneTarget = common.HexToAddress("0x97c19d3Ae8e4d74e25EF3AFf3a277fB614ed76D4") +) + +// RedstoneIdentifier identifies Redstone oracle transactions. +type RedstoneIdentifier struct{} + +// NewRedstoneIdentifier creates a new Redstone oracle identifier. +func NewRedstoneIdentifier() *RedstoneIdentifier { + return &RedstoneIdentifier{} +} + +// Type returns the oracle type for Redstone. +func (r *RedstoneIdentifier) Type() OracleType { + return OracleRedstone +} + +// Identify checks whether the transaction is a Redstone oracle transaction. +func (r *RedstoneIdentifier) Identify(tx *types.Transaction) *OracleInfo { + // Check single target address first (cheapest filter for single-target oracle). + to := tx.To() + if to == nil || *to != redstoneTarget { + return nil + } + // Check method selector. + data := tx.Data() + if len(data) < 4 || [4]byte(data[:4]) != redstoneSelector { + return nil + } + return &OracleInfo{Type: OracleRedstone} +} diff --git a/core/oracle/registry.go b/core/oracle/registry.go new file mode 100644 index 0000000000..878fdf7364 --- /dev/null +++ b/core/oracle/registry.go @@ -0,0 +1,52 @@ +// Copyright 2026 The bsc Authors +// This file is part of the bsc library. +// +// The bsc library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The bsc library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the bsc library. If not, see . + +package oracle + +import "github.com/ethereum/go-ethereum/core/types" + +// Registry manages a collection of oracle identifiers and provides +// a unified interface to classify transactions. +type Registry struct { + identifiers []Identifier +} + +// NewRegistry creates an empty oracle registry. +func NewRegistry() *Registry { + return &Registry{} +} + +// Register adds an oracle identifier to the registry. +func (r *Registry) Register(id Identifier) { + r.identifiers = append(r.identifiers, id) +} + +// Identify checks the transaction against all registered identifiers. +// Returns the first match (first-match semantics). Returns nil if no +// identifier recognizes the transaction. +func (r *Registry) Identify(tx *types.Transaction) *OracleInfo { + for _, id := range r.identifiers { + if info := id.Identify(tx); info != nil { + return info + } + } + return nil +} + +// Len returns the number of registered identifiers. +func (r *Registry) Len() int { + return len(r.identifiers) +} diff --git a/core/oracle/registry_test.go b/core/oracle/registry_test.go new file mode 100644 index 0000000000..cef30ce641 --- /dev/null +++ b/core/oracle/registry_test.go @@ -0,0 +1,190 @@ +package oracle + +import ( + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +// mockIdentifier is a test helper that matches transactions with a specific nonce. +type mockIdentifier struct { + oracleType OracleType + matchNonce uint64 +} + +func (m *mockIdentifier) Type() OracleType { return m.oracleType } + +func (m *mockIdentifier) Identify(tx *types.Transaction) *OracleInfo { + if tx.Nonce() == m.matchNonce { + return &OracleInfo{Type: m.oracleType} + } + return nil +} + +func TestRegistryEmpty(t *testing.T) { + r := NewRegistry() + tx := types.NewTx(&types.LegacyTx{Nonce: 1}) + if info := r.Identify(tx); info != nil { + t.Fatalf("empty registry should return nil, got %v", info) + } + if r.Len() != 0 { + t.Fatalf("empty registry length should be 0, got %d", r.Len()) + } +} + +func TestRegistryFirstMatch(t *testing.T) { + r := NewRegistry() + // Both identifiers match nonce 42, first one should win. + r.Register(&mockIdentifier{oracleType: "oracle_a", matchNonce: 42}) + r.Register(&mockIdentifier{oracleType: "oracle_b", matchNonce: 42}) + + tx := types.NewTx(&types.LegacyTx{Nonce: 42}) + info := r.Identify(tx) + if info == nil { + t.Fatal("expected match, got nil") + } + if info.Type != "oracle_a" { + t.Fatalf("expected first-match oracle_a, got %s", info.Type) + } +} + +func TestRegistryNoMatch(t *testing.T) { + r := NewRegistry() + r.Register(&mockIdentifier{oracleType: OracleChainlink, matchNonce: 100}) + r.Register(&mockIdentifier{oracleType: OracleRedstone, matchNonce: 200}) + + tx := types.NewTx(&types.LegacyTx{Nonce: 999}) + if info := r.Identify(tx); info != nil { + t.Fatalf("expected no match, got %v", info) + } +} + +func TestRegistrySelectiveMatch(t *testing.T) { + r := NewRegistry() + r.Register(&mockIdentifier{oracleType: OracleChainlink, matchNonce: 10}) + r.Register(&mockIdentifier{oracleType: OracleRedstone, matchNonce: 20}) + + // Only second identifier should match. + tx := types.NewTx(&types.LegacyTx{Nonce: 20}) + info := r.Identify(tx) + if info == nil { + t.Fatal("expected match, got nil") + } + if info.Type != OracleRedstone { + t.Fatalf("expected redstone, got %s", info.Type) + } +} + +func TestRegistryLen(t *testing.T) { + r := NewRegistry() + if r.Len() != 0 { + t.Fatalf("expected 0, got %d", r.Len()) + } + r.Register(NewChainlinkIdentifier()) + r.Register(NewRedstoneIdentifier()) + if r.Len() != 2 { + t.Fatalf("expected 2, got %d", r.Len()) + } +} + +func TestChainlinkIdentify(t *testing.T) { + // Inject a test target address. + addr := common.HexToAddress("0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA") + chainlinkTargets[addr] = struct{}{} + defer delete(chainlinkTargets, addr) + + cl := NewChainlinkIdentifier() + if cl.Type() != OracleChainlink { + t.Fatalf("expected %s, got %s", OracleChainlink, cl.Type()) + } + + // Match with first selector. + data := append(chainlinkSelectors[0][:], make([]byte, 32)...) + tx := types.NewTx(&types.LegacyTx{Data: data, To: &addr}) + info := cl.Identify(tx) + if info == nil || info.Type != OracleChainlink { + t.Fatalf("expected chainlink match with selector 0, got %v", info) + } + + // Match with second selector. + data = append(chainlinkSelectors[1][:], make([]byte, 32)...) + tx = types.NewTx(&types.LegacyTx{Data: data, To: &addr}) + info = cl.Identify(tx) + if info == nil || info.Type != OracleChainlink { + t.Fatalf("expected chainlink match with selector 1, got %v", info) + } + + // Wrong selector. + badData := append([]byte{0x00, 0x00, 0x00, 0x00}, make([]byte, 32)...) + tx = types.NewTx(&types.LegacyTx{Data: badData, To: &addr}) + if cl.Identify(tx) != nil { + t.Fatal("expected nil for wrong selector") + } + + // Wrong address. + unknownAddr := common.HexToAddress("0xdead") + tx = types.NewTx(&types.LegacyTx{Data: data, To: &unknownAddr}) + if cl.Identify(tx) != nil { + t.Fatal("expected nil for unknown address") + } + + // No data. + tx = types.NewTx(&types.LegacyTx{To: &addr}) + if cl.Identify(tx) != nil { + t.Fatal("expected nil for empty data") + } + + // Nil To (contract creation). + tx = types.NewTx(&types.LegacyTx{Data: data}) + if cl.Identify(tx) != nil { + t.Fatal("expected nil for nil To") + } +} + +func TestRedstoneIdentify(t *testing.T) { + // Inject a test target address. + target := common.HexToAddress("0xBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB") + origTarget := redstoneTarget + redstoneTarget = target + defer func() { redstoneTarget = origTarget }() + + rs := NewRedstoneIdentifier() + if rs.Type() != OracleRedstone { + t.Fatalf("expected %s, got %s", OracleRedstone, rs.Type()) + } + + // Matching tx. + data := append(redstoneSelector[:], make([]byte, 32)...) + tx := types.NewTx(&types.LegacyTx{Data: data, To: &target}) + info := rs.Identify(tx) + if info == nil || info.Type != OracleRedstone { + t.Fatalf("expected redstone match, got %v", info) + } + + // Wrong address. + wrongAddr := common.HexToAddress("0xdead") + tx = types.NewTx(&types.LegacyTx{Data: data, To: &wrongAddr}) + if rs.Identify(tx) != nil { + t.Fatal("expected nil for wrong address") + } + + // Wrong selector. + badData := append([]byte{0x00, 0x00, 0x00, 0x00}, make([]byte, 32)...) + tx = types.NewTx(&types.LegacyTx{Data: badData, To: &target}) + if rs.Identify(tx) != nil { + t.Fatal("expected nil for wrong selector") + } + + // No data. + tx = types.NewTx(&types.LegacyTx{To: &target}) + if rs.Identify(tx) != nil { + t.Fatal("expected nil for empty data") + } + + // Nil To. + tx = types.NewTx(&types.LegacyTx{Data: data}) + if rs.Identify(tx) != nil { + t.Fatal("expected nil for nil To") + } +} diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index 745d8a225c..f90e783111 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -1758,6 +1758,12 @@ func (p *BlobPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool } } +// SubscribeOracleTransactions is a no-op for blob pool since blob transactions +// are not oracle-related. +func (p *BlobPool) SubscribeOracleTransactions(ch chan<- core.NewOracleTxsEvent) event.Subscription { + return nil +} + // SubscribeReannoTxsEvent registers a subscription of ReannoTxsEvent and // starts sending event to the given channel. func (p *BlobPool) SubscribeReannoTxsEvent(ch chan<- core.ReannoTxsEvent) event.Subscription { diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index 882496c09f..e437591c1b 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -29,10 +29,13 @@ import ( "sync/atomic" "time" + "github.com/holiman/uint256" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/prque" "github.com/ethereum/go-ethereum/consensus/misc/eip1559" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/oracle" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" @@ -41,7 +44,6 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" - "github.com/holiman/uint256" ) const ( @@ -239,16 +241,18 @@ func (config *Config) sanitize() Config { // will reject new transactions with delegations from that account with standard in-flight // transactions. type LegacyPool struct { - config Config - chainconfig *params.ChainConfig - chain BlockChain - gasTip atomic.Pointer[uint256.Int] - txFeed event.Feed - reannoTxFeed event.Feed // Event feed for announcing transactions again - scope event.SubscriptionScope - signer types.Signer - mu sync.RWMutex - maxGas atomic.Uint64 // Currently accepted max gas, it will be modified by MinerAPI + config Config + chainconfig *params.ChainConfig + chain BlockChain + gasTip atomic.Pointer[uint256.Int] + txFeed event.Feed + oracleTxFeed event.Feed // Event feed for oracle-related transactions + oracleIdentifier *oracle.Registry // Pluggable oracle tx classifier + reannoTxFeed event.Feed // Event feed for announcing transactions again + scope event.SubscriptionScope + signer types.Signer + mu sync.RWMutex + maxGas atomic.Uint64 // Currently accepted max gas, it will be modified by MinerAPI currentHead atomic.Pointer[types.Header] // Current head of the blockchain currentState *state.StateDB // Current state in the blockchain head @@ -456,6 +460,16 @@ func (pool *LegacyPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs return pool.txFeed.Subscribe(ch) } +// SubscribeOracleTransactions registers a subscription for oracle transaction events. +func (pool *LegacyPool) SubscribeOracleTransactions(ch chan<- core.NewOracleTxsEvent) event.Subscription { + return pool.oracleTxFeed.Subscribe(ch) +} + +// SetOracleIdentifier sets the oracle identifier registry for classifying transactions. +func (pool *LegacyPool) SetOracleIdentifier(registry *oracle.Registry) { + pool.oracleIdentifier = registry +} + // SubscribeReannoTxsEvent registers a subscription of ReannoTxsEvent and // starts sending event to the given channel. func (pool *LegacyPool) SubscribeReannoTxsEvent(ch chan<- core.ReannoTxsEvent) event.Subscription { @@ -1418,6 +1432,19 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, txs = append(txs, set.Flatten()...) } pool.txFeed.Send(core.NewTxsEvent{Txs: txs}) + + // Classify and emit oracle transactions on the dedicated feed. + if pool.oracleIdentifier != nil && pool.oracleIdentifier.Len() > 0 { + var oracleTxs []core.OracleTxInfo + for _, tx := range txs { + if info := pool.oracleIdentifier.Identify(tx); info != nil { + oracleTxs = append(oracleTxs, core.OracleTxInfo{Tx: tx, Info: info}) + } + } + if len(oracleTxs) > 0 { + pool.oracleTxFeed.Send(core.NewOracleTxsEvent{Txs: oracleTxs}) + } + } } } @@ -1561,7 +1588,7 @@ func (pool *LegacyPool) promoteExecutables(accounts []common.Address) []*types.T queuedGauge.Dec(int64(len(readies))) // Drop all transactions over the allowed limit - var caps = list.Cap(int(pool.config.AccountQueue)) + caps := list.Cap(int(pool.config.AccountQueue)) for _, tx := range caps { hash := tx.Hash() pool.all.Remove(hash) diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index a24250700f..2dfd282ec5 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -155,6 +155,10 @@ type SubPool interface { // or also for reorged out ones. SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) event.Subscription + // SubscribeOracleTransactions subscribes to oracle-related transaction events. + // Returns nil if this subpool does not support oracle transaction identification. + SubscribeOracleTransactions(ch chan<- core.NewOracleTxsEvent) event.Subscription + // SubscribeReannoTxsEvent should return an event subscription of // ReannoTxsEvent and send events to the given channel. SubscribeReannoTxsEvent(chan<- core.ReannoTxsEvent) event.Subscription diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 2832f2416c..b7b78a30b5 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -389,6 +389,19 @@ func (p *TxPool) SubscribeTransactions(ch chan<- core.NewTxsEvent, reorgs bool) return p.subs.Track(event.JoinSubscriptions(subs...)) } +// SubscribeOracleTransactions registers a subscription for oracle transaction events, +// aggregating events from all subpools that support oracle identification. +func (p *TxPool) SubscribeOracleTransactions(ch chan<- core.NewOracleTxsEvent) event.Subscription { + subs := make([]event.Subscription, 0, len(p.subpools)) + for _, subpool := range p.subpools { + sub := subpool.SubscribeOracleTransactions(ch) + if sub != nil { + subs = append(subs, sub) + } + } + return p.subs.Track(event.JoinSubscriptions(subs...)) +} + // SubscribeReannoTxsEvent registers a subscription of ReannoTxsEvent and starts sending // events to the given channel. func (p *TxPool) SubscribeReannoTxsEvent(ch chan<- core.ReannoTxsEvent) event.Subscription { diff --git a/eth/api_backend.go b/eth/api_backend.go index 229b04a760..9cd40b3069 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -421,6 +421,10 @@ func (b *EthAPIBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.S return b.eth.txPool.SubscribeTransactions(ch, true) } +func (b *EthAPIBackend) SubscribeNewOracleTxsEvent(ch chan<- core.NewOracleTxsEvent) event.Subscription { + return b.eth.txPool.SubscribeOracleTransactions(ch) +} + func (b *EthAPIBackend) SubscribeNewVoteEvent(ch chan<- core.NewVoteEvent) event.Subscription { if b.eth.VotePool() == nil { return nil diff --git a/eth/backend.go b/eth/backend.go index 6b83607f81..e588f0da22 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -41,6 +41,7 @@ import ( "github.com/ethereum/go-ethereum/core/monitor" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state/pruner" + "github.com/ethereum/go-ethereum/core/oracle" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/txpool/blobpool" "github.com/ethereum/go-ethereum/core/txpool/legacypool" @@ -413,6 +414,12 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { } legacyPool := legacypool.New(config.TxPool, eth.blockchain) + // Set up oracle transaction identification + oracleRegistry := oracle.NewRegistry() + oracleRegistry.Register(oracle.NewChainlinkIdentifier()) + oracleRegistry.Register(oracle.NewRedstoneIdentifier()) + legacyPool.SetOracleIdentifier(oracleRegistry) + if config.BlobPool.Datadir != "" { config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir) } diff --git a/eth/filters/api.go b/eth/filters/api.go index 499610cd45..619d6b536f 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -29,6 +29,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/history" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/internal/ethapi" @@ -206,6 +207,47 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) return rpcSub, nil } +// NewOracleTransactions creates a subscription that is triggered each time an +// oracle-related transaction (e.g., Chainlink, Redstone price feed update) enters +// the transaction pool. Each notification includes the full transaction with +// enriched oracle metadata (oracle provider type). +func (api *FilterAPI) NewOracleTransactions(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + + gopool.Submit(func() { + oracleTxs := make(chan []core.OracleTxInfo, 64) + oracleTxSub := api.events.SubscribeOracleTxs(oracleTxs) + defer oracleTxSub.Unsubscribe() + + chainConfig := api.sys.backend.ChainConfig() + + for { + select { + case txInfos := <-oracleTxs: + latest := api.sys.backend.CurrentHeader() + for _, txInfo := range txInfos { + rpcTx := ethapi.NewRPCOraclePendingTransaction( + txInfo.Tx, + string(txInfo.Info.Type), + latest, + chainConfig, + ) + notifier.Notify(rpcSub.ID, rpcTx) + } + case <-rpcSub.Err(): + return + } + } + }) + + return rpcSub, nil +} + // NewVotesFilter creates a filter that fetches votes that entered the vote pool. // It is part of the filter package since polling goes with eth_getFilterChanges. func (api *FilterAPI) NewVotesFilter() rpc.ID { diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index 41dd1fc867..ac31025a5a 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -67,6 +67,7 @@ type Backend interface { ChainConfig() *params.ChainConfig HistoryPruningCutoff() uint64 SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription + SubscribeNewOracleTxsEvent(chan<- core.NewOracleTxsEvent) event.Subscription SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeFinalizedHeaderEvent(ch chan<- core.FinalizedHeaderEvent) event.Subscription SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription @@ -165,6 +166,8 @@ const ( FinalizedHeadersSubscription // TransactionReceiptsSubscription queries for transaction receipts when transactions are included in blocks TransactionReceiptsSubscription + // OracleTransactionsSubscription queries for oracle-related transactions entering the pending state + OracleTransactionsSubscription // LastIndexSubscription keeps track of the last index LastIndexSubscription ) @@ -184,6 +187,9 @@ const ( // voteChanSize is the size of channel listening to NewVoteEvent. // The number is referenced from the size of vote pool. voteChanSize = 256 + // oracleTxsChanSize is the size of channel listening to NewOracleTxsEvent. + // Oracle transactions are rare, so a small buffer suffices. + oracleTxsChanSize = 256 ) type subscription struct { @@ -196,9 +202,10 @@ type subscription struct { headers chan *types.Header votes chan *types.VoteEnvelope receipts chan []*ReceiptWithTx - txHashes []common.Hash // contains transaction hashes for transactionReceipts subscription filtering - installed chan struct{} // closed when the filter is installed - err chan error // closed when the filter is uninstalled + oracleTxs chan []core.OracleTxInfo // Channel for oracle transaction events + txHashes []common.Hash // contains transaction hashes for transactionReceipts subscription filtering + installed chan struct{} // closed when the filter is installed + err chan error // closed when the filter is uninstalled } // EventSystem creates subscriptions, processes events and broadcasts them to the @@ -214,6 +221,7 @@ type EventSystem struct { chainSub event.Subscription // Subscription for new chain event finalizedHeaderSub event.Subscription // Subscription for new finalized header voteSub event.Subscription // Subscription for new vote event + oracleTxsSub event.Subscription // Subscription for oracle transaction event // Channels install chan *subscription // install filter for event notification @@ -224,6 +232,7 @@ type EventSystem struct { chainCh chan core.ChainEvent // Channel to receive new chain event finalizedHeaderCh chan core.FinalizedHeaderEvent // Channel to receive new finalized header event voteCh chan core.NewVoteEvent // Channel to receive new vote event + oracleTxsCh chan core.NewOracleTxsEvent // Channel to receive oracle transactions event } // NewEventSystem creates a new manager that listens for event on the given mux, @@ -244,6 +253,7 @@ func NewEventSystem(sys *FilterSystem) *EventSystem { chainCh: make(chan core.ChainEvent, chainEvChanSize), finalizedHeaderCh: make(chan core.FinalizedHeaderEvent, finalizedHeaderEvChanSize), voteCh: make(chan core.NewVoteEvent, voteChanSize), + oracleTxsCh: make(chan core.NewOracleTxsEvent, oracleTxsChanSize), } // Subscribe events @@ -253,6 +263,7 @@ func NewEventSystem(sys *FilterSystem) *EventSystem { m.chainSub = m.backend.SubscribeChainEvent(m.chainCh) m.finalizedHeaderSub = m.backend.SubscribeFinalizedHeaderEvent(m.finalizedHeaderCh) m.voteSub = m.backend.SubscribeNewVoteEvent(m.voteCh) + m.oracleTxsSub = m.backend.SubscribeNewOracleTxsEvent(m.oracleTxsCh) // Make sure none of the subscriptions are empty if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil { @@ -261,6 +272,9 @@ func NewEventSystem(sys *FilterSystem) *EventSystem { if m.voteSub == nil || m.finalizedHeaderSub == nil { log.Warn("Subscribe for vote or finalized header event failed") } + if m.oracleTxsSub == nil { + log.Warn("Subscribe for oracle transaction event failed") + } go m.eventLoop() return m @@ -473,6 +487,25 @@ func (es *EventSystem) SubscribeTransactionReceipts(txHashes []common.Hash, rece return es.subscribe(sub) } +// SubscribeOracleTxs creates a subscription that writes oracle transaction info +// for oracle-related transactions that enter the transaction pool. +func (es *EventSystem) SubscribeOracleTxs(oracleTxs chan []core.OracleTxInfo) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: OracleTransactionsSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + txs: make(chan []*types.Transaction), + headers: make(chan *types.Header), + votes: make(chan *types.VoteEnvelope), + receipts: make(chan []*ReceiptWithTx), + oracleTxs: oracleTxs, + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} + type filterIndex map[Type]map[rpc.ID]*subscription func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) { @@ -493,6 +526,12 @@ func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) } } +func (es *EventSystem) handleOracleTxsEvent(filters filterIndex, ev core.NewOracleTxsEvent) { + for _, f := range filters[OracleTransactionsSubscription] { + f.oracleTxs <- ev.Txs + } +} + func (es *EventSystem) handleVoteEvent(filters filterIndex, ev core.NewVoteEvent) { for _, f := range filters[VotesSubscription] { f.votes <- ev.Vote @@ -531,6 +570,9 @@ func (es *EventSystem) eventLoop() { if es.voteSub != nil { es.voteSub.Unsubscribe() } + if es.oracleTxsSub != nil { + es.oracleTxsSub.Unsubscribe() + } }() index := make(filterIndex) @@ -542,6 +584,10 @@ func (es *EventSystem) eventLoop() { if es.voteSub != nil { voteSubErr = es.voteSub.Err() } + var oracleTxsSubErr <-chan error + if es.oracleTxsSub != nil { + oracleTxsSubErr = es.oracleTxsSub.Err() + } for { select { case ev := <-es.txsCh: @@ -556,6 +602,8 @@ func (es *EventSystem) eventLoop() { es.handleFinalizedHeaderEvent(index, ev) case ev := <-es.voteCh: es.handleVoteEvent(index, ev) + case ev := <-es.oracleTxsCh: + es.handleOracleTxsEvent(index, ev) case f := <-es.install: index[f.typ][f.id] = f @@ -578,6 +626,8 @@ func (es *EventSystem) eventLoop() { return case <-voteSubErr: return + case <-oracleTxsSubErr: + return } } } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 1ec4739979..c9a34bd7a5 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -48,6 +48,7 @@ type testBackend struct { chainFeed event.Feed finalizedHeaderFeed event.Feed voteFeed event.Feed + oracleTxFeed event.Feed pendingBlock *types.Block pendingReceipts types.Receipts } @@ -144,6 +145,10 @@ func (b *testBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Sub return b.txFeed.Subscribe(ch) } +func (b *testBackend) SubscribeNewOracleTxsEvent(ch chan<- core.NewOracleTxsEvent) event.Subscription { + return b.oracleTxFeed.Subscribe(ch) +} + func (b *testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { return b.rmLogsFeed.Subscribe(ch) } diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index d54f34a839..9ab9b5097e 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1440,6 +1440,21 @@ func NewRPCPendingTransaction(tx *types.Transaction, current *types.Header, conf return newRPCTransaction(tx, common.Hash{}, blockNumber, blockTime, 0, baseFee, config) } +// RPCOracleTransaction wraps RPCTransaction with oracle-specific metadata. +type RPCOracleTransaction struct { + RPCTransaction + OracleType string `json:"oracleType"` +} + +// NewRPCOraclePendingTransaction creates an enriched oracle transaction for the RPC representation. +func NewRPCOraclePendingTransaction(tx *types.Transaction, oracleType string, current *types.Header, config *params.ChainConfig) *RPCOracleTransaction { + rpcTx := NewRPCPendingTransaction(tx, current, config) + return &RPCOracleTransaction{ + RPCTransaction: *rpcTx, + OracleType: oracleType, + } +} + // newRPCTransactionsFromBlockIndex returns transactions that will serialize to the RPC representation. func newRPCTransactionsFromBlockIndex(b *types.Block, config *params.ChainConfig) []*RPCTransaction { txs := b.Transactions() diff --git a/internal/ethapi/api_test.go b/internal/ethapi/api_test.go index 74d33fc8ad..cb1eadf21b 100644 --- a/internal/ethapi/api_test.go +++ b/internal/ethapi/api_test.go @@ -640,6 +640,9 @@ func (b testBackend) TxPoolContentFrom(addr common.Address) ([]*types.Transactio func (b testBackend) SubscribeNewTxsEvent(events chan<- core.NewTxsEvent) event.Subscription { panic("implement me") } +func (b testBackend) SubscribeNewOracleTxsEvent(chan<- core.NewOracleTxsEvent) event.Subscription { + panic("implement me") +} func (b testBackend) ChainConfig() *params.ChainConfig { return b.chain.Config() } func (b testBackend) Engine() consensus.Engine { return b.chain.Engine() } func (b testBackend) CurrentValidators() ([]common.Address, error) { return []common.Address{}, nil } diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index d8b20e7183..6b3291cecf 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -88,6 +88,7 @@ type Backend interface { TxPoolContent() (map[common.Address][]*types.Transaction, map[common.Address][]*types.Transaction) TxPoolContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription + SubscribeNewOracleTxsEvent(chan<- core.NewOracleTxsEvent) event.Subscription ChainConfig() *params.ChainConfig Engine() consensus.Engine diff --git a/internal/ethapi/transaction_args_test.go b/internal/ethapi/transaction_args_test.go index 39e7d328a8..21b53798a2 100644 --- a/internal/ethapi/transaction_args_test.go +++ b/internal/ethapi/transaction_args_test.go @@ -411,8 +411,11 @@ func (b *backendMock) TxPoolContent() (map[common.Address][]*types.Transaction, func (b *backendMock) TxPoolContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) { return nil, nil } -func (b *backendMock) SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription { return nil } -func (b *backendMock) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { return nil } +func (b *backendMock) SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription { return nil } +func (b *backendMock) SubscribeNewOracleTxsEvent(chan<- core.NewOracleTxsEvent) event.Subscription { + return nil +} +func (b *backendMock) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { return nil } func (b *backendMock) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { return nil } From 0f5bc21c87e96f327b35864791ba429a840a5ce5 Mon Sep 17 00:00:00 2001 From: Roshan Date: Sun, 8 Feb 2026 19:05:34 +0800 Subject: [PATCH 2/3] feat(filters): add eth_subscribe("newHighGasTransactions") for high-gas-cost tx streaming --- core/events.go | 5 + core/txpool/blobpool/blobpool.go | 6 + core/txpool/legacypool/highgas_test.go | 191 +++++++++++++++++++++++ core/txpool/legacypool/legacypool.go | 46 ++++++ core/txpool/subpool.go | 4 + core/txpool/txpool.go | 13 ++ eth/api_backend.go | 4 + eth/filters/api.go | 36 +++++ eth/filters/filter_system.go | 51 +++++- eth/filters/filter_system_test.go | 5 + internal/ethapi/api_test.go | 3 + internal/ethapi/backend.go | 1 + internal/ethapi/transaction_args_test.go | 3 + 13 files changed, 366 insertions(+), 2 deletions(-) create mode 100644 core/txpool/legacypool/highgas_test.go diff --git a/core/events.go b/core/events.go index 3ef987e645..e5947c29bc 100644 --- a/core/events.go +++ b/core/events.go @@ -62,3 +62,8 @@ type OracleTxInfo struct { // NewOracleTxsEvent is posted when oracle-related transactions enter the transaction pool. type NewOracleTxsEvent struct{ Txs []OracleTxInfo } + +// NewHighGasTxsEvent is posted when high-gas-cost transactions enter the transaction pool. +// A transaction qualifies when gasFeeCap*gasLimit exceeds the configured threshold, +// its gasLimit is below the configured cap, and its To address is not whitelisted. +type NewHighGasTxsEvent struct{ Txs []*types.Transaction } diff --git a/core/txpool/blobpool/blobpool.go b/core/txpool/blobpool/blobpool.go index f90e783111..ad6d4dbd9c 100644 --- a/core/txpool/blobpool/blobpool.go +++ b/core/txpool/blobpool/blobpool.go @@ -1764,6 +1764,12 @@ func (p *BlobPool) SubscribeOracleTransactions(ch chan<- core.NewOracleTxsEvent) return nil } +// SubscribeHighGasTransactions is a no-op for blob pool since high-gas-cost +// filtering is handled by the legacy pool. +func (p *BlobPool) SubscribeHighGasTransactions(ch chan<- core.NewHighGasTxsEvent) event.Subscription { + return nil +} + // SubscribeReannoTxsEvent registers a subscription of ReannoTxsEvent and // starts sending event to the given channel. func (p *BlobPool) SubscribeReannoTxsEvent(ch chan<- core.ReannoTxsEvent) event.Subscription { diff --git a/core/txpool/legacypool/highgas_test.go b/core/txpool/legacypool/highgas_test.go new file mode 100644 index 0000000000..db6493c7c6 --- /dev/null +++ b/core/txpool/legacypool/highgas_test.go @@ -0,0 +1,191 @@ +package legacypool + +import ( + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +func TestFilterHighGasTxs_AboveThreshold(t *testing.T) { + addr := common.HexToAddress("0x1234") + cfg := &HighGasTxConfig{ + Threshold: big.NewInt(1000), // gasCost > 1000 + } + // gasPrice=10, gasLimit=200 → gasCost=2000 > 1000 ✓ + tx := types.NewTx(&types.LegacyTx{GasPrice: big.NewInt(10), Gas: 200, To: &addr}) + matched := filterHighGasTxs([]*types.Transaction{tx}, cfg) + if len(matched) != 1 { + t.Fatalf("expected 1 match, got %d", len(matched)) + } +} + +func TestFilterHighGasTxs_BelowThreshold(t *testing.T) { + addr := common.HexToAddress("0x1234") + cfg := &HighGasTxConfig{ + Threshold: big.NewInt(1000), + } + // gasPrice=1, gasLimit=100 → gasCost=100 <= 1000 ✗ + tx := types.NewTx(&types.LegacyTx{GasPrice: big.NewInt(1), Gas: 100, To: &addr}) + matched := filterHighGasTxs([]*types.Transaction{tx}, cfg) + if len(matched) != 0 { + t.Fatalf("expected 0 matches, got %d", len(matched)) + } +} + +func TestFilterHighGasTxs_EqualThreshold(t *testing.T) { + addr := common.HexToAddress("0x1234") + cfg := &HighGasTxConfig{ + Threshold: big.NewInt(1000), + } + // gasPrice=10, gasLimit=100 → gasCost=1000 == threshold → NOT matched (strictly greater) + tx := types.NewTx(&types.LegacyTx{GasPrice: big.NewInt(10), Gas: 100, To: &addr}) + matched := filterHighGasTxs([]*types.Transaction{tx}, cfg) + if len(matched) != 0 { + t.Fatalf("expected 0 matches for equal threshold, got %d", len(matched)) + } +} + +func TestFilterHighGasTxs_Whitelisted(t *testing.T) { + addr := common.HexToAddress("0xAAAA") + cfg := &HighGasTxConfig{ + Threshold: big.NewInt(100), + Whitelist: map[common.Address]struct{}{ + addr: {}, + }, + } + // Above threshold but To address is whitelisted → excluded + tx := types.NewTx(&types.LegacyTx{GasPrice: big.NewInt(10), Gas: 200, To: &addr}) + matched := filterHighGasTxs([]*types.Transaction{tx}, cfg) + if len(matched) != 0 { + t.Fatalf("expected 0 matches for whitelisted address, got %d", len(matched)) + } +} + +func TestFilterHighGasTxs_NotWhitelisted(t *testing.T) { + whitelisted := common.HexToAddress("0xAAAA") + target := common.HexToAddress("0xBBBB") + cfg := &HighGasTxConfig{ + Threshold: big.NewInt(100), + Whitelist: map[common.Address]struct{}{ + whitelisted: {}, + }, + } + // Above threshold and To address is NOT whitelisted → matched + tx := types.NewTx(&types.LegacyTx{GasPrice: big.NewInt(10), Gas: 200, To: &target}) + matched := filterHighGasTxs([]*types.Transaction{tx}, cfg) + if len(matched) != 1 { + t.Fatalf("expected 1 match, got %d", len(matched)) + } +} + +func TestFilterHighGasTxs_NilTo(t *testing.T) { + cfg := &HighGasTxConfig{ + Threshold: big.NewInt(100), + Whitelist: map[common.Address]struct{}{ + common.HexToAddress("0xAAAA"): {}, + }, + } + // Contract creation (nil To) above threshold → matched (whitelist doesn't apply) + tx := types.NewTx(&types.LegacyTx{GasPrice: big.NewInt(10), Gas: 200}) + matched := filterHighGasTxs([]*types.Transaction{tx}, cfg) + if len(matched) != 1 { + t.Fatalf("expected 1 match for nil To, got %d", len(matched)) + } +} + +func TestFilterHighGasTxs_GasLimitCap(t *testing.T) { + addr := common.HexToAddress("0x1234") + cfg := &HighGasTxConfig{ + Threshold: big.NewInt(100), + GasLimitCap: 500, // Only accept txs with gasLimit < 500 + } + + // gasLimit=200 < 500 and gasCost=2000 > 100 → matched + tx1 := types.NewTx(&types.LegacyTx{GasPrice: big.NewInt(10), Gas: 200, To: &addr}) + // gasLimit=500 >= 500 → excluded by GasLimitCap + tx2 := types.NewTx(&types.LegacyTx{GasPrice: big.NewInt(10), Gas: 500, To: &addr}) + // gasLimit=1000 >= 500 → excluded by GasLimitCap + tx3 := types.NewTx(&types.LegacyTx{GasPrice: big.NewInt(10), Gas: 1000, To: &addr}) + + matched := filterHighGasTxs([]*types.Transaction{tx1, tx2, tx3}, cfg) + if len(matched) != 1 { + t.Fatalf("expected 1 match with gasLimitCap, got %d", len(matched)) + } +} + +func TestFilterHighGasTxs_GasLimitCapZero(t *testing.T) { + addr := common.HexToAddress("0x1234") + cfg := &HighGasTxConfig{ + Threshold: big.NewInt(100), + GasLimitCap: 0, // 0 = no cap + } + // GasLimitCap=0 means no cap, so large gasLimit is fine + tx := types.NewTx(&types.LegacyTx{GasPrice: big.NewInt(10), Gas: 999999, To: &addr}) + matched := filterHighGasTxs([]*types.Transaction{tx}, cfg) + if len(matched) != 1 { + t.Fatalf("expected 1 match with zero gasLimitCap, got %d", len(matched)) + } +} + +func TestFilterHighGasTxs_EIP1559(t *testing.T) { + addr := common.HexToAddress("0x1234") + cfg := &HighGasTxConfig{ + Threshold: big.NewInt(1000), + } + // EIP-1559 tx: GasFeeCap=20, GasTipCap=5, Gas=100 → gasCost = 20*100 = 2000 > 1000 ✓ + tx := types.NewTx(&types.DynamicFeeTx{ + GasFeeCap: big.NewInt(20), + GasTipCap: big.NewInt(5), + Gas: 100, + To: &addr, + }) + matched := filterHighGasTxs([]*types.Transaction{tx}, cfg) + if len(matched) != 1 { + t.Fatalf("expected 1 match for EIP-1559 tx, got %d", len(matched)) + } +} + +func TestFilterHighGasTxs_MixedBatch(t *testing.T) { + addr := common.HexToAddress("0x1234") + whitelisted := common.HexToAddress("0xAAAA") + cfg := &HighGasTxConfig{ + Threshold: big.NewInt(500), + GasLimitCap: 1000, + Whitelist: map[common.Address]struct{}{ + whitelisted: {}, + }, + } + + txs := []*types.Transaction{ + // gasCost=2000 > 500, gasLimit=200 < 1000, not whitelisted → ✓ + types.NewTx(&types.LegacyTx{GasPrice: big.NewInt(10), Gas: 200, To: &addr}), + // gasCost=100 <= 500 → ✗ (below threshold) + types.NewTx(&types.LegacyTx{GasPrice: big.NewInt(1), Gas: 100, To: &addr}), + // gasCost=20000 > 500, gasLimit=2000 >= 1000 → ✗ (exceeds gasLimitCap) + types.NewTx(&types.LegacyTx{GasPrice: big.NewInt(10), Gas: 2000, To: &addr}), + // gasCost=2000 > 500, gasLimit=200 < 1000, but whitelisted → ✗ + types.NewTx(&types.LegacyTx{GasPrice: big.NewInt(10), Gas: 200, To: &whitelisted}), + // gasCost=2000 > 500, gasLimit=200 < 1000, nil To → ✓ + types.NewTx(&types.LegacyTx{GasPrice: big.NewInt(10), Gas: 200}), + } + + matched := filterHighGasTxs(txs, cfg) + if len(matched) != 2 { + t.Fatalf("expected 2 matches in mixed batch, got %d", len(matched)) + } +} + +func TestFilterHighGasTxs_EmptyWhitelist(t *testing.T) { + addr := common.HexToAddress("0x1234") + cfg := &HighGasTxConfig{ + Threshold: big.NewInt(100), + Whitelist: nil, // nil whitelist + } + tx := types.NewTx(&types.LegacyTx{GasPrice: big.NewInt(10), Gas: 200, To: &addr}) + matched := filterHighGasTxs([]*types.Transaction{tx}, cfg) + if len(matched) != 1 { + t.Fatalf("expected 1 match with nil whitelist, got %d", len(matched)) + } +} diff --git a/core/txpool/legacypool/legacypool.go b/core/txpool/legacypool/legacypool.go index e437591c1b..28bd92f4ac 100644 --- a/core/txpool/legacypool/legacypool.go +++ b/core/txpool/legacypool/legacypool.go @@ -160,6 +160,15 @@ type Config struct { Lifetime time.Duration // Maximum amount of time non-executable transaction are queued ReannounceTime time.Duration // Duration for announcing local pending transactions again + + HighGasTxFilter *HighGasTxConfig // Filtering criteria for high-gas-cost tx subscription (nil = disabled) +} + +// HighGasTxConfig defines filtering criteria for high-gas-cost transaction subscription. +type HighGasTxConfig struct { + Threshold *big.Int // Min gas cost (gasFeeCap * gasLimit) to qualify + GasLimitCap uint64 // Max gasLimit to qualify (0 = no cap) + Whitelist map[common.Address]struct{} // Addresses excluded from notification } // DefaultConfig contains the default configurations for the transaction pool. @@ -248,6 +257,7 @@ type LegacyPool struct { txFeed event.Feed oracleTxFeed event.Feed // Event feed for oracle-related transactions oracleIdentifier *oracle.Registry // Pluggable oracle tx classifier + highGasTxFeed event.Feed // Event feed for high-gas-cost transactions reannoTxFeed event.Feed // Event feed for announcing transactions again scope event.SubscriptionScope signer types.Signer @@ -470,6 +480,35 @@ func (pool *LegacyPool) SetOracleIdentifier(registry *oracle.Registry) { pool.oracleIdentifier = registry } +// filterHighGasTxs returns transactions that match the high-gas-cost criteria: +// - gasFeeCap * gasLimit > cfg.Threshold +// - gasLimit < cfg.GasLimitCap (if GasLimitCap > 0) +// - tx.To() not in cfg.Whitelist +func filterHighGasTxs(txs []*types.Transaction, cfg *HighGasTxConfig) []*types.Transaction { + var matched []*types.Transaction + for _, tx := range txs { + if cfg.GasLimitCap > 0 && tx.Gas() >= cfg.GasLimitCap { + continue + } + gasCost := new(big.Int).Mul(tx.GasFeeCap(), new(big.Int).SetUint64(tx.Gas())) + if gasCost.Cmp(cfg.Threshold) <= 0 { + continue + } + if to := tx.To(); to != nil { + if _, ok := cfg.Whitelist[*to]; ok { + continue + } + } + matched = append(matched, tx) + } + return matched +} + +// SubscribeHighGasTransactions registers a subscription for high-gas-cost transaction events. +func (pool *LegacyPool) SubscribeHighGasTransactions(ch chan<- core.NewHighGasTxsEvent) event.Subscription { + return pool.highGasTxFeed.Subscribe(ch) +} + // SubscribeReannoTxsEvent registers a subscription of ReannoTxsEvent and // starts sending event to the given channel. func (pool *LegacyPool) SubscribeReannoTxsEvent(ch chan<- core.ReannoTxsEvent) event.Subscription { @@ -1445,6 +1484,13 @@ func (pool *LegacyPool) runReorg(done chan struct{}, reset *txpoolResetRequest, pool.oracleTxFeed.Send(core.NewOracleTxsEvent{Txs: oracleTxs}) } } + + // Filter and emit high-gas-cost transactions on the dedicated feed. + if cfg := pool.config.HighGasTxFilter; cfg != nil && cfg.Threshold != nil { + if matched := filterHighGasTxs(txs, cfg); len(matched) > 0 { + pool.highGasTxFeed.Send(core.NewHighGasTxsEvent{Txs: matched}) + } + } } } diff --git a/core/txpool/subpool.go b/core/txpool/subpool.go index 2dfd282ec5..23cda79b5d 100644 --- a/core/txpool/subpool.go +++ b/core/txpool/subpool.go @@ -159,6 +159,10 @@ type SubPool interface { // Returns nil if this subpool does not support oracle transaction identification. SubscribeOracleTransactions(ch chan<- core.NewOracleTxsEvent) event.Subscription + // SubscribeHighGasTransactions subscribes to high-gas-cost transaction events. + // Returns nil if this subpool does not support high-gas-cost filtering. + SubscribeHighGasTransactions(ch chan<- core.NewHighGasTxsEvent) event.Subscription + // SubscribeReannoTxsEvent should return an event subscription of // ReannoTxsEvent and send events to the given channel. SubscribeReannoTxsEvent(chan<- core.ReannoTxsEvent) event.Subscription diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index b7b78a30b5..920c092ca7 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -402,6 +402,19 @@ func (p *TxPool) SubscribeOracleTransactions(ch chan<- core.NewOracleTxsEvent) e return p.subs.Track(event.JoinSubscriptions(subs...)) } +// SubscribeHighGasTransactions registers a subscription for high-gas-cost transaction events, +// aggregating events from all subpools that support high-gas filtering. +func (p *TxPool) SubscribeHighGasTransactions(ch chan<- core.NewHighGasTxsEvent) event.Subscription { + subs := make([]event.Subscription, 0, len(p.subpools)) + for _, subpool := range p.subpools { + sub := subpool.SubscribeHighGasTransactions(ch) + if sub != nil { + subs = append(subs, sub) + } + } + return p.subs.Track(event.JoinSubscriptions(subs...)) +} + // SubscribeReannoTxsEvent registers a subscription of ReannoTxsEvent and starts sending // events to the given channel. func (p *TxPool) SubscribeReannoTxsEvent(ch chan<- core.ReannoTxsEvent) event.Subscription { diff --git a/eth/api_backend.go b/eth/api_backend.go index 9cd40b3069..a29259c641 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -425,6 +425,10 @@ func (b *EthAPIBackend) SubscribeNewOracleTxsEvent(ch chan<- core.NewOracleTxsEv return b.eth.txPool.SubscribeOracleTransactions(ch) } +func (b *EthAPIBackend) SubscribeNewHighGasTxsEvent(ch chan<- core.NewHighGasTxsEvent) event.Subscription { + return b.eth.txPool.SubscribeHighGasTransactions(ch) +} + func (b *EthAPIBackend) SubscribeNewVoteEvent(ch chan<- core.NewVoteEvent) event.Subscription { if b.eth.VotePool() == nil { return nil diff --git a/eth/filters/api.go b/eth/filters/api.go index 619d6b536f..1e8373acf0 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -248,6 +248,42 @@ func (api *FilterAPI) NewOracleTransactions(ctx context.Context) (*rpc.Subscript return rpcSub, nil } +// NewHighGasTransactions creates a subscription that is triggered each time a +// high-gas-cost transaction enters the transaction pool. A transaction qualifies +// when its gasFeeCap*gasLimit exceeds the configured threshold, its gasLimit is +// below the configured cap, and its To address is not in the configured whitelist. +func (api *FilterAPI) NewHighGasTransactions(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + + gopool.Submit(func() { + highGasTxs := make(chan []*types.Transaction, 64) + highGasTxSub := api.events.SubscribeHighGasTxs(highGasTxs) + defer highGasTxSub.Unsubscribe() + + chainConfig := api.sys.backend.ChainConfig() + + for { + select { + case txs := <-highGasTxs: + latest := api.sys.backend.CurrentHeader() + for _, tx := range txs { + rpcTx := ethapi.NewRPCPendingTransaction(tx, latest, chainConfig) + notifier.Notify(rpcSub.ID, rpcTx) + } + case <-rpcSub.Err(): + return + } + } + }) + + return rpcSub, nil +} + // NewVotesFilter creates a filter that fetches votes that entered the vote pool. // It is part of the filter package since polling goes with eth_getFilterChanges. func (api *FilterAPI) NewVotesFilter() rpc.ID { diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index ac31025a5a..19d779d4f7 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -68,6 +68,7 @@ type Backend interface { HistoryPruningCutoff() uint64 SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription SubscribeNewOracleTxsEvent(chan<- core.NewOracleTxsEvent) event.Subscription + SubscribeNewHighGasTxsEvent(chan<- core.NewHighGasTxsEvent) event.Subscription SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription SubscribeFinalizedHeaderEvent(ch chan<- core.FinalizedHeaderEvent) event.Subscription SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription @@ -168,6 +169,8 @@ const ( TransactionReceiptsSubscription // OracleTransactionsSubscription queries for oracle-related transactions entering the pending state OracleTransactionsSubscription + // HighGasTransactionsSubscription queries for high-gas-cost transactions entering the pending state + HighGasTransactionsSubscription // LastIndexSubscription keeps track of the last index LastIndexSubscription ) @@ -190,6 +193,8 @@ const ( // oracleTxsChanSize is the size of channel listening to NewOracleTxsEvent. // Oracle transactions are rare, so a small buffer suffices. oracleTxsChanSize = 256 + // highGasTxsChanSize is the size of channel listening to NewHighGasTxsEvent. + highGasTxsChanSize = 256 ) type subscription struct { @@ -202,8 +207,9 @@ type subscription struct { headers chan *types.Header votes chan *types.VoteEnvelope receipts chan []*ReceiptWithTx - oracleTxs chan []core.OracleTxInfo // Channel for oracle transaction events - txHashes []common.Hash // contains transaction hashes for transactionReceipts subscription filtering + oracleTxs chan []core.OracleTxInfo // Channel for oracle transaction events + highGasTxs chan []*types.Transaction // Channel for high-gas-cost transaction events + txHashes []common.Hash // contains transaction hashes for transactionReceipts subscription filtering installed chan struct{} // closed when the filter is installed err chan error // closed when the filter is uninstalled } @@ -222,6 +228,7 @@ type EventSystem struct { finalizedHeaderSub event.Subscription // Subscription for new finalized header voteSub event.Subscription // Subscription for new vote event oracleTxsSub event.Subscription // Subscription for oracle transaction event + highGasTxsSub event.Subscription // Subscription for high-gas-cost transaction event // Channels install chan *subscription // install filter for event notification @@ -233,6 +240,7 @@ type EventSystem struct { finalizedHeaderCh chan core.FinalizedHeaderEvent // Channel to receive new finalized header event voteCh chan core.NewVoteEvent // Channel to receive new vote event oracleTxsCh chan core.NewOracleTxsEvent // Channel to receive oracle transactions event + highGasTxsCh chan core.NewHighGasTxsEvent // Channel to receive high-gas-cost transactions event } // NewEventSystem creates a new manager that listens for event on the given mux, @@ -254,6 +262,7 @@ func NewEventSystem(sys *FilterSystem) *EventSystem { finalizedHeaderCh: make(chan core.FinalizedHeaderEvent, finalizedHeaderEvChanSize), voteCh: make(chan core.NewVoteEvent, voteChanSize), oracleTxsCh: make(chan core.NewOracleTxsEvent, oracleTxsChanSize), + highGasTxsCh: make(chan core.NewHighGasTxsEvent, highGasTxsChanSize), } // Subscribe events @@ -264,6 +273,7 @@ func NewEventSystem(sys *FilterSystem) *EventSystem { m.finalizedHeaderSub = m.backend.SubscribeFinalizedHeaderEvent(m.finalizedHeaderCh) m.voteSub = m.backend.SubscribeNewVoteEvent(m.voteCh) m.oracleTxsSub = m.backend.SubscribeNewOracleTxsEvent(m.oracleTxsCh) + m.highGasTxsSub = m.backend.SubscribeNewHighGasTxsEvent(m.highGasTxsCh) // Make sure none of the subscriptions are empty if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil { @@ -506,6 +516,26 @@ func (es *EventSystem) SubscribeOracleTxs(oracleTxs chan []core.OracleTxInfo) *S return es.subscribe(sub) } +// SubscribeHighGasTxs creates a subscription that writes high-gas-cost +// transactions that enter the transaction pool. +func (es *EventSystem) SubscribeHighGasTxs(highGasTxs chan []*types.Transaction) *Subscription { + sub := &subscription{ + id: rpc.NewID(), + typ: HighGasTransactionsSubscription, + created: time.Now(), + logs: make(chan []*types.Log), + txs: make(chan []*types.Transaction), + headers: make(chan *types.Header), + votes: make(chan *types.VoteEnvelope), + receipts: make(chan []*ReceiptWithTx), + oracleTxs: make(chan []core.OracleTxInfo), + highGasTxs: highGasTxs, + installed: make(chan struct{}), + err: make(chan error), + } + return es.subscribe(sub) +} + type filterIndex map[Type]map[rpc.ID]*subscription func (es *EventSystem) handleLogs(filters filterIndex, ev []*types.Log) { @@ -532,6 +562,12 @@ func (es *EventSystem) handleOracleTxsEvent(filters filterIndex, ev core.NewOrac } } +func (es *EventSystem) handleHighGasTxsEvent(filters filterIndex, ev core.NewHighGasTxsEvent) { + for _, f := range filters[HighGasTransactionsSubscription] { + f.highGasTxs <- ev.Txs + } +} + func (es *EventSystem) handleVoteEvent(filters filterIndex, ev core.NewVoteEvent) { for _, f := range filters[VotesSubscription] { f.votes <- ev.Vote @@ -573,6 +609,9 @@ func (es *EventSystem) eventLoop() { if es.oracleTxsSub != nil { es.oracleTxsSub.Unsubscribe() } + if es.highGasTxsSub != nil { + es.highGasTxsSub.Unsubscribe() + } }() index := make(filterIndex) @@ -588,6 +627,10 @@ func (es *EventSystem) eventLoop() { if es.oracleTxsSub != nil { oracleTxsSubErr = es.oracleTxsSub.Err() } + var highGasTxsSubErr <-chan error + if es.highGasTxsSub != nil { + highGasTxsSubErr = es.highGasTxsSub.Err() + } for { select { case ev := <-es.txsCh: @@ -604,6 +647,8 @@ func (es *EventSystem) eventLoop() { es.handleVoteEvent(index, ev) case ev := <-es.oracleTxsCh: es.handleOracleTxsEvent(index, ev) + case ev := <-es.highGasTxsCh: + es.handleHighGasTxsEvent(index, ev) case f := <-es.install: index[f.typ][f.id] = f @@ -628,6 +673,8 @@ func (es *EventSystem) eventLoop() { return case <-oracleTxsSubErr: return + case <-highGasTxsSubErr: + return } } } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index c9a34bd7a5..ee649a33f8 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -49,6 +49,7 @@ type testBackend struct { finalizedHeaderFeed event.Feed voteFeed event.Feed oracleTxFeed event.Feed + highGasTxFeed event.Feed pendingBlock *types.Block pendingReceipts types.Receipts } @@ -149,6 +150,10 @@ func (b *testBackend) SubscribeNewOracleTxsEvent(ch chan<- core.NewOracleTxsEven return b.oracleTxFeed.Subscribe(ch) } +func (b *testBackend) SubscribeNewHighGasTxsEvent(ch chan<- core.NewHighGasTxsEvent) event.Subscription { + return b.highGasTxFeed.Subscribe(ch) +} + func (b *testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { return b.rmLogsFeed.Subscribe(ch) } diff --git a/internal/ethapi/api_test.go b/internal/ethapi/api_test.go index cb1eadf21b..b05ff3d1cb 100644 --- a/internal/ethapi/api_test.go +++ b/internal/ethapi/api_test.go @@ -643,6 +643,9 @@ func (b testBackend) SubscribeNewTxsEvent(events chan<- core.NewTxsEvent) event. func (b testBackend) SubscribeNewOracleTxsEvent(chan<- core.NewOracleTxsEvent) event.Subscription { panic("implement me") } +func (b testBackend) SubscribeNewHighGasTxsEvent(chan<- core.NewHighGasTxsEvent) event.Subscription { + panic("implement me") +} func (b testBackend) ChainConfig() *params.ChainConfig { return b.chain.Config() } func (b testBackend) Engine() consensus.Engine { return b.chain.Engine() } func (b testBackend) CurrentValidators() ([]common.Address, error) { return []common.Address{}, nil } diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 6b3291cecf..c06e16f4b0 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -89,6 +89,7 @@ type Backend interface { TxPoolContentFrom(addr common.Address) ([]*types.Transaction, []*types.Transaction) SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription SubscribeNewOracleTxsEvent(chan<- core.NewOracleTxsEvent) event.Subscription + SubscribeNewHighGasTxsEvent(chan<- core.NewHighGasTxsEvent) event.Subscription ChainConfig() *params.ChainConfig Engine() consensus.Engine diff --git a/internal/ethapi/transaction_args_test.go b/internal/ethapi/transaction_args_test.go index 21b53798a2..ca218b15ef 100644 --- a/internal/ethapi/transaction_args_test.go +++ b/internal/ethapi/transaction_args_test.go @@ -415,6 +415,9 @@ func (b *backendMock) SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscr func (b *backendMock) SubscribeNewOracleTxsEvent(chan<- core.NewOracleTxsEvent) event.Subscription { return nil } +func (b *backendMock) SubscribeNewHighGasTxsEvent(chan<- core.NewHighGasTxsEvent) event.Subscription { + return nil +} func (b *backendMock) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { return nil } func (b *backendMock) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { return nil From a15d2afc9a634f20ee2d33ddfcb3f7bb07cbd19d Mon Sep 17 00:00:00 2001 From: Roshan Date: Mon, 9 Feb 2026 15:53:48 +0800 Subject: [PATCH 3/3] add sub to ethclient --- ethclient/ethclient.go | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go index c4d8254f0b..ca37826bf6 100644 --- a/ethclient/ethclient.go +++ b/ethclient/ethclient.go @@ -476,6 +476,47 @@ func (ec *Client) SubscribeNewVotes(ctx context.Context, ch chan<- *types.VoteEn return ec.c.EthSubscribe(ctx, ch, "newVotes") } +// OracleTransaction wraps a transaction with oracle-specific metadata returned +// by the newOracleTransactions subscription. +type OracleTransaction struct { + Tx *types.Transaction + OracleType string +} + +// oracleTransactionRaw is the JSON-level representation used for unmarshaling. +type oracleTransactionRaw struct { + OracleType string `json:"oracleType"` +} + +// UnmarshalJSON decodes an oracle transaction from the RPC JSON representation. +func (otx *OracleTransaction) UnmarshalJSON(msg []byte) error { + // Decode the core transaction fields. + otx.Tx = new(types.Transaction) + if err := json.Unmarshal(msg, otx.Tx); err != nil { + return err + } + // Decode the oracle-specific metadata. + var raw oracleTransactionRaw + if err := json.Unmarshal(msg, &raw); err != nil { + return err + } + otx.OracleType = raw.OracleType + return nil +} + +// SubscribeNewOracleTransactions subscribes to notifications about oracle-related +// transactions (e.g., Chainlink, Redstone) entering the transaction pool. +func (ec *Client) SubscribeNewOracleTransactions(ctx context.Context, ch chan<- *OracleTransaction) (ethereum.Subscription, error) { + return ec.c.EthSubscribe(ctx, ch, "newOracleTransactions") +} + +// SubscribeNewHighGasTransactions subscribes to notifications about high-gas-cost +// transactions entering the transaction pool. Filtering criteria (threshold, +// gasLimit cap, whitelist) are configured on the server side. +func (ec *Client) SubscribeNewHighGasTransactions(ctx context.Context, ch chan<- *types.Transaction) (ethereum.Subscription, error) { + return ec.c.EthSubscribe(ctx, ch, "newHighGasTransactions") +} + // State Access // NetworkID returns the network ID for this client.