From 0097fc199a930d3c5fe0c8682e977e41582375cd Mon Sep 17 00:00:00 2001 From: daps94 <35882689+daps94@users.noreply.github.com> Date: Mon, 9 Mar 2026 18:07:12 +0700 Subject: [PATCH] perf(mempool): O(k) reap with head-offset and amortized compaction Replace the full O(N) index rebuild after every Reap() with a head-offset design. Reap now advances a head pointer past consumed entries and only compacts the backing slice when a meaningful prefix has been consumed (threshold-based). Push operations deduplicate under a single write lock, eliminating the TOCTOU window between the old exists() and push() calls. Co-Authored-By: Claude Opus 4.6 --- mempool/reap_list.go | 151 +++++++++++++++++++++++-------------------- 1 file changed, 80 insertions(+), 71 deletions(-) diff --git a/mempool/reap_list.go b/mempool/reap_list.go index ce866259b..912366f5f 100644 --- a/mempool/reap_list.go +++ b/mempool/reap_list.go @@ -2,7 +2,6 @@ package mempool import ( "fmt" - "slices" "sync" ethtypes "github.com/ethereum/go-ethereum/core/types" @@ -23,15 +22,19 @@ type txWithHash struct { gas uint64 } +const reapListCompactThreshold = 1024 + type ReapList struct { - // txs is a list of transactions and their respective hashes - // NOTE: this currently has unbound size + // txs is a list of transactions and their respective hashes. + // Entries before head have already been reaped or compacted away. + // NOTE: this currently has unbound size. txs []*txWithHash - // txIndex is a map of tx hashes to what index that tx is stored in inside - // of txs. This serves a dual purpose of allowing for fast drops from txs - // without iteration, and guarding txs from being added to the ReapList - // twice before they are explicitly dropped. + // head is the first candidate index in txs that has not yet been reaped. + head int + + // txIndex maps tx hashes to their absolute index in txs. + // Reaped txs are kept with an index of -1 until they are dropped. txIndex map[string]int // txsLock protects txLookup and txs. @@ -49,8 +52,9 @@ func NewReapList(txEncoder EVMCosmosTxEncoder) *ReapList { } // Reap returns a list of the oldest to newest transactions bytes from the reap -// list until either maxBytes or maxGas is reached for the list of transactions -// being returned. If maxBytes and maxGas are both 0 all txs will be returned. +// list until either maxBytes or maxGas is reached. Uses a head offset so only +// the reaped prefix is traversed (O(k) for k reaped txs) with amortized +// compaction of the backing slice. func (rl *ReapList) Reap(maxBytes uint64, maxGas uint64) [][]byte { rl.txsLock.Lock() defer rl.txsLock.Unlock() @@ -59,21 +63,22 @@ func (rl *ReapList) Reap(maxBytes uint64, maxGas uint64) [][]byte { totalBytes uint64 totalGas uint64 result [][]byte - nextStart int + nextHead = rl.head ) - for idx, tx := range rl.txs { + for idx := rl.head; idx < len(rl.txs); idx++ { + tx := rl.txs[idx] if tx == nil { // txs may have "holes" (nil) due to txs being invalidated and - // dropped while they are waiting in the reap list - nextStart = idx + 1 + // dropped while they are waiting in the reap list. + nextHead = idx + 1 continue } txSize := uint64(len(tx.bytes)) txGas := tx.gas - // Check if adding this tx would exceed limits + // Check if adding this tx would exceed limits. if (maxBytes > 0 && totalBytes+txSize > maxBytes) || (maxGas > 0 && totalGas+txGas > maxGas) { break } @@ -81,39 +86,18 @@ func (rl *ReapList) Reap(maxBytes uint64, maxGas uint64) [][]byte { result = append(result, tx.bytes) totalBytes += txSize totalGas += txGas - nextStart = idx + 1 + nextHead = idx + 1 - // NOTE: We need to keep the txs that were just reaped in the txIndex, so - // that it can properly guard against these txs being added to the ReapList - // again. These txs are likely still in the mempool, and callers may try to - // add them to the ReapList again, which is not allowed. Removing from the - // txIndex will only be done during Drop. + // Keep reaped txs in txIndex with sentinel -1 so callers cannot + // re-add them until they are explicitly dropped. if _, ok := rl.txIndex[tx.hash]; !ok { panic("removed a tx that was not in the tx index, this should not happen") } rl.txIndex[tx.hash] = -1 } - if nextStart >= len(rl.txs) { - rl.txs = []*txWithHash{} - } else { - // In order to remove the txs that were returned from reap, we can simply - // reslice the list since all removed txs were from the start, and we saved - // where the next set of valid txs start in nextStart. - // - // Also compact away any nil values from the new slice. - rl.txs = slices.DeleteFunc(rl.txs[nextStart:], func(tx *txWithHash) bool { - return tx == nil - }) - } - - // rebuild the index since txs may have shifted indices - for i, tx := range rl.txs { - if _, ok := rl.txIndex[tx.hash]; !ok { - panic("tx that was not reaped is not in the tx index, this should not happen") - } - rl.txIndex[tx.hash] = i - } + rl.head = nextHead + rl.maybeCompactLocked() return result } @@ -121,16 +105,16 @@ func (rl *ReapList) Reap(maxBytes uint64, maxGas uint64) [][]byte { // PushEVMTx enqueues an EVM tx into the reap list. func (rl *ReapList) PushEVMTx(tx *ethtypes.Transaction) error { hash := tx.Hash().String() - if rl.exists(hash) { - return nil - } txBytes, err := rl.txEncoder.EVMTx(tx) if err != nil { return fmt.Errorf("encoding evm tx to bytes: %w", err) } - rl.push(hash, txBytes, tx.Gas()) + rl.txsLock.Lock() + defer rl.txsLock.Unlock() + + rl.pushLocked(hash, txBytes, tx.Gas()) return nil } @@ -141,11 +125,6 @@ func (rl *ReapList) PushCosmosTx(tx sdk.Tx) error { return fmt.Errorf("encoding cosmos tx to bytes: %w", err) } - hash := cosmosHash(txBytes) - if rl.exists(hash) { - return nil - } - var gas uint64 if feeTx, ok := tx.(sdk.FeeTx); ok { gas = feeTx.GetGas() @@ -153,40 +132,71 @@ func (rl *ReapList) PushCosmosTx(tx sdk.Tx) error { return fmt.Errorf("error getting tx gas: cosmos tx must implement sdk.FeeTx") } - rl.push(hash, txBytes, gas) + rl.txsLock.Lock() + defer rl.txsLock.Unlock() + + rl.pushLocked(cosmosHash(txBytes), txBytes, gas) return nil } -// push inserts a tx to the back of the reap list as the "newest" transaction -// (last to be returned if Reap was called now). push assumes that a tx is not -// already in the ReapList, this should be checked via exists. -func (rl *ReapList) push(hash string, tx []byte, gas uint64) { - rl.txsLock.Lock() - defer rl.txsLock.Unlock() +// pushLocked inserts a tx to the back of the reap list. Deduplicates by hash +// under the already-held write lock. +func (rl *ReapList) pushLocked(hash string, tx []byte, gas uint64) { + if _, ok := rl.txIndex[hash]; ok { + return + } - rl.txs = append(rl.txs, &txWithHash{tx, hash, gas}) + if rl.head >= len(rl.txs) && len(rl.txs) > 0 { + // All active entries have been consumed; reuse the backing array. + rl.txs = rl.txs[:0] + rl.head = 0 + } + + rl.txs = append(rl.txs, &txWithHash{bytes: tx, hash: hash, gas: gas}) rl.txIndex[hash] = len(rl.txs) - 1 } -// exists returns true if a hash is in the index, false otherwise. -func (rl *ReapList) exists(hash string) bool { - rl.txsLock.RLock() - defer rl.txsLock.RUnlock() +// maybeCompactLocked compacts the backing slice when enough of the prefix has +// been consumed, keeping Reap amortized O(k). +func (rl *ReapList) maybeCompactLocked() { + if rl.head == 0 { + return + } + + if rl.head >= len(rl.txs) { + rl.txs = rl.txs[:0] + rl.head = 0 + return + } + + // Compact when we have consumed a meaningful prefix. + if rl.head < reapListCompactThreshold && rl.head*2 < len(rl.txs) { + return + } + + compacted := make([]*txWithHash, 0, len(rl.txs)-rl.head) + for idx := rl.head; idx < len(rl.txs); idx++ { + tx := rl.txs[idx] + if tx == nil { + continue + } + if _, ok := rl.txIndex[tx.hash]; !ok { + panic("tx that was not reaped is not in the tx index, this should not happen") + } + rl.txIndex[tx.hash] = len(compacted) + compacted = append(compacted, tx) + } - _, ok := rl.txIndex[hash] - return ok + rl.txs = compacted + rl.head = 0 } -// DropEVMTx removes an EVM tx from the ReapList. This tx may or may not have -// already been reaped. This should only be called when a tx that was -// previously validated, becomes invalid. +// DropEVMTx removes an EVM tx from the ReapList. func (rl *ReapList) DropEVMTx(tx *ethtypes.Transaction) { rl.drop(tx.Hash().String()) } -// DropCosmosTx removes a Cosmos tx from the ReapList. This tx may or may not -// have already been reaped. This should only be called when a tx that was -// previously validated, becomes invalid. +// DropCosmosTx removes a Cosmos tx from the ReapList. func (rl *ReapList) DropCosmosTx(tx sdk.Tx) { txBytes, err := rl.txEncoder.CosmosTx(tx) if err != nil { @@ -195,8 +205,7 @@ func (rl *ReapList) DropCosmosTx(tx sdk.Tx) { rl.drop(cosmosHash(txBytes)) } -// drop removes an individual tx from the reap list. If the tx is not in the -// list, no changes are made. +// drop removes an individual tx from the reap list. func (rl *ReapList) drop(hash string) { rl.txsLock.Lock() defer rl.txsLock.Unlock()