Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 80 additions & 71 deletions mempool/reap_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package mempool

import (
"fmt"
"slices"
"sync"

ethtypes "github.com/ethereum/go-ethereum/core/types"
Expand All @@ -23,15 +22,19 @@ type txWithHash struct {
gas uint64
}

const reapListCompactThreshold = 1024

type ReapList struct {
// txs is a list of transactions and their respective hashes
// NOTE: this currently has unbound size
// txs is a list of transactions and their respective hashes.
// Entries before head have already been reaped or compacted away.
// NOTE: this currently has unbound size.
txs []*txWithHash

// txIndex is a map of tx hashes to what index that tx is stored in inside
// of txs. This serves a dual purpose of allowing for fast drops from txs
// without iteration, and guarding txs from being added to the ReapList
// twice before they are explicitly dropped.
// head is the first candidate index in txs that has not yet been reaped.
head int

// txIndex maps tx hashes to their absolute index in txs.
// Reaped txs are kept with an index of -1 until they are dropped.
txIndex map[string]int

// txsLock protects txLookup and txs.
Expand All @@ -49,8 +52,9 @@ func NewReapList(txEncoder EVMCosmosTxEncoder) *ReapList {
}

// Reap returns a list of the oldest to newest transactions bytes from the reap
// list until either maxBytes or maxGas is reached for the list of transactions
// being returned. If maxBytes and maxGas are both 0 all txs will be returned.
// list until either maxBytes or maxGas is reached. Uses a head offset so only
// the reaped prefix is traversed (O(k) for k reaped txs) with amortized
// compaction of the backing slice.
func (rl *ReapList) Reap(maxBytes uint64, maxGas uint64) [][]byte {
rl.txsLock.Lock()
defer rl.txsLock.Unlock()
Expand All @@ -59,78 +63,58 @@ func (rl *ReapList) Reap(maxBytes uint64, maxGas uint64) [][]byte {
totalBytes uint64
totalGas uint64
result [][]byte
nextStart int
nextHead = rl.head
)

for idx, tx := range rl.txs {
for idx := rl.head; idx < len(rl.txs); idx++ {
tx := rl.txs[idx]
if tx == nil {
// txs may have "holes" (nil) due to txs being invalidated and
// dropped while they are waiting in the reap list
nextStart = idx + 1
// dropped while they are waiting in the reap list.
nextHead = idx + 1
continue
}

txSize := uint64(len(tx.bytes))
txGas := tx.gas

// Check if adding this tx would exceed limits
// Check if adding this tx would exceed limits.
if (maxBytes > 0 && totalBytes+txSize > maxBytes) || (maxGas > 0 && totalGas+txGas > maxGas) {
break
}

result = append(result, tx.bytes)
totalBytes += txSize
totalGas += txGas
nextStart = idx + 1
nextHead = idx + 1

// NOTE: We need to keep the txs that were just reaped in the txIndex, so
// that it can properly guard against these txs being added to the ReapList
// again. These txs are likely still in the mempool, and callers may try to
// add them to the ReapList again, which is not allowed. Removing from the
// txIndex will only be done during Drop.
// Keep reaped txs in txIndex with sentinel -1 so callers cannot
// re-add them until they are explicitly dropped.
if _, ok := rl.txIndex[tx.hash]; !ok {
panic("removed a tx that was not in the tx index, this should not happen")
}
rl.txIndex[tx.hash] = -1
}

if nextStart >= len(rl.txs) {
rl.txs = []*txWithHash{}
} else {
// In order to remove the txs that were returned from reap, we can simply
// reslice the list since all removed txs were from the start, and we saved
// where the next set of valid txs start in nextStart.
//
// Also compact away any nil values from the new slice.
rl.txs = slices.DeleteFunc(rl.txs[nextStart:], func(tx *txWithHash) bool {
return tx == nil
})
}

// rebuild the index since txs may have shifted indices
for i, tx := range rl.txs {
if _, ok := rl.txIndex[tx.hash]; !ok {
panic("tx that was not reaped is not in the tx index, this should not happen")
}
rl.txIndex[tx.hash] = i
}
rl.head = nextHead
rl.maybeCompactLocked()

return result
}

// PushEVMTx enqueues an EVM tx into the reap list.
func (rl *ReapList) PushEVMTx(tx *ethtypes.Transaction) error {
hash := tx.Hash().String()
if rl.exists(hash) {
return nil
}

txBytes, err := rl.txEncoder.EVMTx(tx)
if err != nil {
return fmt.Errorf("encoding evm tx to bytes: %w", err)
}

rl.push(hash, txBytes, tx.Gas())
rl.txsLock.Lock()
defer rl.txsLock.Unlock()

rl.pushLocked(hash, txBytes, tx.Gas())
return nil
}

Expand All @@ -141,52 +125,78 @@ func (rl *ReapList) PushCosmosTx(tx sdk.Tx) error {
return fmt.Errorf("encoding cosmos tx to bytes: %w", err)
}

hash := cosmosHash(txBytes)
if rl.exists(hash) {
return nil
}

var gas uint64
if feeTx, ok := tx.(sdk.FeeTx); ok {
gas = feeTx.GetGas()
} else {
return fmt.Errorf("error getting tx gas: cosmos tx must implement sdk.FeeTx")
}

rl.push(hash, txBytes, gas)
rl.txsLock.Lock()
defer rl.txsLock.Unlock()

rl.pushLocked(cosmosHash(txBytes), txBytes, gas)
return nil
}

// push inserts a tx to the back of the reap list as the "newest" transaction
// (last to be returned if Reap was called now). push assumes that a tx is not
// already in the ReapList, this should be checked via exists.
func (rl *ReapList) push(hash string, tx []byte, gas uint64) {
rl.txsLock.Lock()
defer rl.txsLock.Unlock()
// pushLocked inserts a tx to the back of the reap list. Deduplicates by hash
// under the already-held write lock.
func (rl *ReapList) pushLocked(hash string, tx []byte, gas uint64) {
if _, ok := rl.txIndex[hash]; ok {
return
}

rl.txs = append(rl.txs, &txWithHash{tx, hash, gas})
if rl.head >= len(rl.txs) && len(rl.txs) > 0 {
// All active entries have been consumed; reuse the backing array.
rl.txs = rl.txs[:0]
rl.head = 0
}

rl.txs = append(rl.txs, &txWithHash{bytes: tx, hash: hash, gas: gas})
rl.txIndex[hash] = len(rl.txs) - 1
}

// exists returns true if a hash is in the index, false otherwise.
func (rl *ReapList) exists(hash string) bool {
rl.txsLock.RLock()
defer rl.txsLock.RUnlock()
// maybeCompactLocked compacts the backing slice when enough of the prefix has
// been consumed, keeping Reap amortized O(k).
func (rl *ReapList) maybeCompactLocked() {
if rl.head == 0 {
return
}

if rl.head >= len(rl.txs) {
rl.txs = rl.txs[:0]
rl.head = 0
return
}

// Compact when we have consumed a meaningful prefix.
if rl.head < reapListCompactThreshold && rl.head*2 < len(rl.txs) {
return
}

compacted := make([]*txWithHash, 0, len(rl.txs)-rl.head)
for idx := rl.head; idx < len(rl.txs); idx++ {
tx := rl.txs[idx]
if tx == nil {
continue
}
if _, ok := rl.txIndex[tx.hash]; !ok {
panic("tx that was not reaped is not in the tx index, this should not happen")
}
rl.txIndex[tx.hash] = len(compacted)
compacted = append(compacted, tx)
}

_, ok := rl.txIndex[hash]
return ok
rl.txs = compacted
rl.head = 0
}

// DropEVMTx removes an EVM tx from the ReapList. This tx may or may not have
// already been reaped. This should only be called when a tx that was
// previously validated, becomes invalid.
// DropEVMTx removes an EVM tx from the ReapList.
func (rl *ReapList) DropEVMTx(tx *ethtypes.Transaction) {
rl.drop(tx.Hash().String())
}

// DropCosmosTx removes a Cosmos tx from the ReapList. This tx may or may not
// have already been reaped. This should only be called when a tx that was
// previously validated, becomes invalid.
// DropCosmosTx removes a Cosmos tx from the ReapList.
func (rl *ReapList) DropCosmosTx(tx sdk.Tx) {
txBytes, err := rl.txEncoder.CosmosTx(tx)
if err != nil {
Expand All @@ -195,8 +205,7 @@ func (rl *ReapList) DropCosmosTx(tx sdk.Tx) {
rl.drop(cosmosHash(txBytes))
}

// drop removes an individual tx from the reap list. If the tx is not in the
// list, no changes are made.
// drop removes an individual tx from the reap list.
func (rl *ReapList) drop(hash string) {
rl.txsLock.Lock()
defer rl.txsLock.Unlock()
Expand Down
Loading