Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion mempool/recheck_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (m *RecheckMempool) Insert(_ context.Context, tx sdk.Tx) error {
}

write()
m.markTxRechecked(tx)
m.markTxInserted(tx)
return nil
}

Expand Down Expand Up @@ -452,6 +452,18 @@ func (m *RecheckMempool) markTxRechecked(txn sdk.Tx) {
m.recheckedTxs.Do(func(store *CosmosTxStore) { store.AddTx(txn) })
}

// markTxInserted conservatively updates the current height snapshot for live inserts.
// If the inserted tx replaces an existing tx, any other txs from the same sender with
// a higher nonce is dropped and rebuilt by the next recheck.
func (m *RecheckMempool) markTxInserted(txn sdk.Tx) {
m.recheckedTxs.Do(func(store *CosmosTxStore) {
if store.InvalidateFrom(txn) > 0 {
return
}
store.AddTx(txn)
})
}

type signerSequence struct {
account string
seq uint64
Expand Down
145 changes: 145 additions & 0 deletions mempool/recheck_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cosmos/evm/mempool/reserver"

"cosmossdk.io/log/v2"
sdkmath "cosmossdk.io/math"
storetypes "cosmossdk.io/store/types"

cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
Expand Down Expand Up @@ -924,6 +925,16 @@ func newRecheckTestTxWithNonce(t *testing.T, key *ecdsa.PrivateKey, nonce uint64
return &recheckTestTx{key: key, sequence: nonce}
}

func newRecheckTestTxWithGasPrice(t *testing.T, key *ecdsa.PrivateKey, nonce uint64, gasPrice int64) sdk.Tx {
t.Helper()
return &recheckTestTx{
key: key,
sequence: nonce,
gas: 100_000,
fee: sdk.NewCoins(sdk.NewInt64Coin(recheckTestFeeDenom, gasPrice*100_000)),
}
}

// newNonceTrackingAnteHandler returns an ante handler that enforces sequential
// nonce ordering per account. Nonces are tracked in a map keyed by signer
// address — each successful call increments the expected nonce.
Expand Down Expand Up @@ -1037,6 +1048,110 @@ func TestRecheckMempool_InsertAfterRecheck(t *testing.T) {
require.Equal(t, 3, mp.CountTx())
}

func TestRecheckMempool_InsertReplacementInvalidatesRechecked(t *testing.T) {
ctx := newRecheckTestContext()
tracker := reserver.NewReservationTracker()
handle := tracker.NewHandle(1)
pool := &recheckMockPool{}
bc := newMockContextProvider(ctx)
rc := newMockRechecker(ctx, noopAnteHandler)
recheckedTxs := newTestRecheckedTxs()

mp := mempool.NewRecheckMempool(log.NewNopLogger(), pool, handle, rc, recheckedTxs, bc)
mp.Start(testHeader(0))
t.Cleanup(func() {
require.NoError(t, mp.Close())
})

key, err := crypto.GenerateKey()
require.NoError(t, err)

tx4 := newRecheckTestTxWithNonce(t, key, 4)
tx5 := newRecheckTestTxWithNonce(t, key, 5)
tx6 := newRecheckTestTxWithNonce(t, key, 6)

recheckedTxs.Do(func(store *mempool.CosmosTxStore) {
store.AddTx(tx4)
store.AddTx(tx5)
store.AddTx(tx6)
})

replacement := newRecheckTestTxWithNonce(t, key, 4)
require.NoError(t, mp.Insert(ctx, replacement))

iter := mp.RecheckedTxs(context.Background(), big.NewInt(0))
rechecked := collectIteratorTxs(iter)
require.Empty(t, rechecked)
require.Equal(t, 1, mp.CountTx())
}

func TestRecheckMempool_RecheckRebuildsSnapshotAfterReplacement(t *testing.T) {
ctx := newRecheckTestContext()
tracker := reserver.NewReservationTracker()
handle := tracker.NewHandle(1)
pool := sdkmempool.NewPriorityMempool(sdkmempool.PriorityNonceMempoolConfig[sdkmath.Int]{
TxPriority: sdkmempool.TxPriority[sdkmath.Int]{
GetTxPriority: func(goCtx context.Context, tx sdk.Tx) sdkmath.Int {
_ = sdk.UnwrapSDKContext(goCtx)
cosmosTxFee, ok := tx.(sdk.FeeTx)
if !ok {
return sdkmath.ZeroInt()
}
found, coin := cosmosTxFee.GetFee().Find(recheckTestFeeDenom)
if !found {
return sdkmath.ZeroInt()
}

gasPrice := coin.Amount.Quo(sdkmath.NewIntFromUint64(cosmosTxFee.GetGas()))
return gasPrice
},
Compare: func(a, b sdkmath.Int) int {
return a.BigInt().Cmp(b.BigInt())
},
MinValue: sdkmath.ZeroInt(),
},
TxReplacement: func(op, np sdkmath.Int, _ sdk.Tx, _ sdk.Tx) bool {
return np.GT(op)
},
})
bc := newMockContextProvider(ctx)
rc := newMockRechecker(ctx, noopAnteHandler)
recheckedTxs := newTestRecheckedTxs()

mp := mempool.NewRecheckMempool(log.NewNopLogger(), pool, handle, rc, recheckedTxs, bc)
mp.Start(testHeader(0))
t.Cleanup(func() {
require.NoError(t, mp.Close())
})

key, err := crypto.GenerateKey()
require.NoError(t, err)

tx3 := newRecheckTestTxWithGasPrice(t, key, 3, 1)
tx4 := newRecheckTestTxWithGasPrice(t, key, 4, 1)
tx5 := newRecheckTestTxWithGasPrice(t, key, 5, 1)
tx6 := newRecheckTestTxWithGasPrice(t, key, 6, 1)
replacement := newRecheckTestTxWithGasPrice(t, key, 4, 2)

for _, tx := range []sdk.Tx{tx3, tx4, tx5, tx6} {
require.NoError(t, mp.Insert(ctx, tx))
}

// insert the replacement, which should invalidate the other txs in the pool with greater nonce.
require.NoError(t, mp.Insert(ctx, replacement))

iter := mp.RecheckedTxs(context.Background(), big.NewInt(0))
rechecked := collectIteratorTxs(iter)
require.Len(t, rechecked, 1)
require.Equal(t, tx3, rechecked[0])

mp.TriggerRecheckSync(testHeader(1))

iter = mp.RecheckedTxs(context.Background(), big.NewInt(1))
rechecked = collectIteratorTxs(iter)
require.Equal(t, []sdk.Tx{tx3, replacement, tx5, tx6}, rechecked)
}

// newRecheckTestTx creates a minimal sdk.Tx for unit testing RecheckMempool.
func newRecheckTestTx(t *testing.T, key *ecdsa.PrivateKey) sdk.Tx {
t.Helper()
Expand All @@ -1047,14 +1162,44 @@ func newRecheckTestTx(t *testing.T, key *ecdsa.PrivateKey) sdk.Tx {
type recheckTestTx struct {
key *ecdsa.PrivateKey
sequence uint64
gas uint64
fee sdk.Coins
}

const recheckTestFeeDenom = "atest"

func (m *recheckTestTx) GetMsgs() []sdk.Msg { return nil }

func (m *recheckTestTx) GetMsgsV2() ([]proto.Message, error) {
return nil, nil
}

func (m *recheckTestTx) GetGas() uint64 {
if m.gas == 0 {
return 100_000
}
return m.gas
}

func (m *recheckTestTx) GetFee() sdk.Coins {
if len(m.fee) == 0 {
return sdk.NewCoins(sdk.NewInt64Coin(recheckTestFeeDenom, 100_000))
}
return m.fee
}

func (m *recheckTestTx) FeePayer() []byte {
signers, err := m.GetSigners()
if err != nil || len(signers) == 0 {
return nil
}
return signers[0]
}

func (m *recheckTestTx) FeeGranter() []byte {
return nil
}

func (m *recheckTestTx) GetSigners() ([][]byte, error) {
pubKeyBytes := crypto.CompressPubkey(&m.key.PublicKey)
pubKey := &ethsecp256k1.PubKey{Key: pubKeyBytes}
Expand Down
141 changes: 132 additions & 9 deletions mempool/tx_store.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package mempool

import (
"fmt"
"slices"
"strings"
"sync"

"cosmossdk.io/log/v2"
Expand All @@ -14,32 +17,152 @@ import (
type CosmosTxStore struct {
txs []sdk.Tx

// index maps a tx to its position in the txs slice for fast removal
index map[sdk.Tx]int
// keys is a map of <signer/nonce> -> index to txs slice.
keys map[string]int
logger log.Logger

mu sync.RWMutex
}

// NewCosmosTxStore creates a new CosmosTxStore.
func NewCosmosTxStore(_ log.Logger) *CosmosTxStore {
func NewCosmosTxStore(l log.Logger) *CosmosTxStore {
return &CosmosTxStore{
index: make(map[sdk.Tx]int),
logger: l,
keys: make(map[string]int),
}
}

// AddTx adds a single tx to the store. Duplicate txs (by pointer identity)
// are ignored.
// AddTx adds a single tx to the store while constructing a validated snapshot.
func (s *CosmosTxStore) AddTx(tx sdk.Tx) {
s.mu.Lock()
defer s.mu.Unlock()

if _, exists := s.index[tx]; exists {
return
if key, ok := cosmosTxKey(tx); ok {
if _, exists := s.keys[key]; exists {
// this should never happen. panicking for safety
s.logger.Warn("attempted to add duplicate tx to CosmosTxStore", "key", key)
return
}
s.keys[key] = len(s.txs)
}
s.index[tx] = len(s.txs)

s.txs = append(s.txs, tx)
}

// InvalidateFrom removes any stored tx that depends on the supplied tx's signer/nonces.
// It is used for live mempool replacements: once a tx at nonce N changes, any stored tx
// for the same signer(s) with nonce >= N can no longer be considered valid for proposal building.
func (s *CosmosTxStore) InvalidateFrom(tx sdk.Tx) int {
s.mu.Lock()
defer s.mu.Unlock()

// first check if this tx is already here. If it isn't; no need to do anything. It's a fresh insert.
// If it is, we need to do the work of invaliding any txs from the same sender with a higher nonce.
if txKey, ok := cosmosTxKey(tx); ok {
if _, exists := s.keys[txKey]; !exists {
return 0
}
}

// nonce thresholds for each signer.
thresholds, ok := cosmosTxNonceMap(tx)
if !ok {
return 0
}

// rebuild the txs list, skipping txs that are invalidated.
removed := 0
nextTxs := make([]sdk.Tx, 0, len(s.txs))
for _, existing := range s.txs {
if invalidatesCosmosTx(existing, thresholds) {
removed++
continue
}
nextTxs = append(nextTxs, existing)
}

if removed == 0 {
return 0
}

// TODO: this isn't really the most optimal way to do this. but maybe fine for now
s.reindex(nextTxs)
return removed
}

func cosmosTxKey(tx sdk.Tx) (string, bool) {
nonceMap, ok := cosmosTxNonceMap(tx)
if !ok {
return "", false
}

var b strings.Builder
for i, sig := range sortedSignerNonces(nonceMap) {
if i > 0 {
b.WriteByte('|')
}
fmt.Fprintf(&b, "%s/%d", sig.account, sig.seq)
}

return b.String(), true
}

// cosmosTxNonceMap extracts the signers from the transaction
// and returns a signer -> nonce map.
func cosmosTxNonceMap(tx sdk.Tx) (map[string]uint64, bool) {
signerSeqs, err := extractSignerSequences(tx)
if err != nil || len(signerSeqs) == 0 {
return nil, false
}

nonceMap := make(map[string]uint64, len(signerSeqs))
for _, sig := range signerSeqs {
nonce, err := sdkmempool.ChooseNonce(sig.seq, tx)
if err != nil {
return nil, false
}
nonceMap[sig.account] = nonce
}

return nonceMap, true
}

func sortedSignerNonces(nonceMap map[string]uint64) []signerSequence {
signerSeqs := make([]signerSequence, 0, len(nonceMap))
for account, seq := range nonceMap {
signerSeqs = append(signerSeqs, signerSequence{account: account, seq: seq})
}
slices.SortFunc(signerSeqs, func(a, b signerSequence) int {
return strings.Compare(a.account, b.account)
})
return signerSeqs
}

func invalidatesCosmosTx(tx sdk.Tx, thresholds map[string]uint64) bool {
nonceMap, ok := cosmosTxNonceMap(tx)
if !ok {
return false
}

for account, threshold := range thresholds {
nonce, exists := nonceMap[account]
if exists && nonce >= threshold {
return true
}
}
return false
}

func (s *CosmosTxStore) reindex(txs []sdk.Tx) {
s.txs = txs
s.keys = make(map[string]int, len(txs))
for i, tx := range txs {
if key, ok := cosmosTxKey(tx); ok {
s.keys[key] = i
}
}
}

// Txs returns a copy of the current set of txs in the store.
func (s *CosmosTxStore) Txs() []sdk.Tx {
s.mu.RLock()
Expand Down
Loading
Loading