Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions mempool/internal/heightsync/heightsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"time"

"github.com/ethereum/go-ethereum/metrics"

"cosmossdk.io/log/v2"
)

var (
Expand All @@ -26,6 +28,10 @@ var (
// hsDuration is the total time callers of Get spend from invocation to
// return.
hsDuration = metrics.NewRegisteredTimer("height_sync/duration", nil)

// hsHeights it the total number of heights progressed through on the
// height sync
hsHeights = metrics.NewRegisteredMeter("height_sync/heights", nil)
)

// HeightSync synchronizes access to a per-height tx store for mempool
Expand All @@ -50,7 +56,7 @@ type HeightSync[Store any] struct {
currentHeight *big.Int

// reset is a function that returns a new store, called at every new height
reset func() *Store
reset func(logger log.Logger) *Store

// store is the current Store that operations are happening on via Do and will
// be returned via Get until a new height is started via StartNewHeight
Expand All @@ -66,16 +72,19 @@ type HeightSync[Store any] struct {
// mu protects all of the above fields; it does not protect internal
// fields of the Store itself
mu sync.RWMutex

logger log.Logger
}

// New creates a new HeightSync starting at the given height.
func New[Store any](startHeight *big.Int, reset func() *Store) *HeightSync[Store] {
func New[Store any](startHeight *big.Int, reset func(logger log.Logger) *Store, logger log.Logger) *HeightSync[Store] {
hs := &HeightSync[Store]{
currentHeight: startHeight,
reset: reset,
store: reset(),
store: reset(logger),
heightChanged: make(chan struct{}),
done: make(chan struct{}),
logger: logger,
}
// initial height is considered immediately done
hs.EndCurrentHeight()
Expand All @@ -96,7 +105,7 @@ func (hs *HeightSync[Store]) StartNewHeight(height *big.Int) {

// create new Store for this height
hs.currentHeight = new(big.Int).Set(height)
hs.store = hs.reset()
hs.store = hs.reset(hs.logger)

// close old channel and create new one to wake up consumers
oldChan := hs.heightChanged
Expand All @@ -117,6 +126,7 @@ func (hs *HeightSync[Store]) EndCurrentHeight() {
if hs.isHeightEnded() {
panic(fmt.Errorf("height %s already ended", hs.currentHeight.String()))
}
hsHeights.Mark(1)
close(hs.done)
}

Expand Down
20 changes: 11 additions & 9 deletions mempool/internal/heightsync/heightsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/stretchr/testify/require"

"github.com/cosmos/evm/mempool/internal/heightsync"

"cosmossdk.io/log/v2"
)

// testStore is a simple store for testing the generic height-sync behavior.
Expand All @@ -19,7 +21,7 @@ type testStore struct {
mu sync.Mutex
}

func newTestValue() *testStore {
func newTestValue(_ log.Logger) *testStore {
return &testStore{}
}

Expand All @@ -36,7 +38,7 @@ func (s *testStore) get() []string {
}

func TestBasicGetAfterCompletion(t *testing.T) {
hv := heightsync.New(big.NewInt(1), newTestValue)
hv := heightsync.New(big.NewInt(1), newTestValue, log.NewNopLogger())

hv.StartNewHeight(big.NewInt(1))
hv.Do(func(s *testStore) {
Expand All @@ -58,7 +60,7 @@ func TestBasicGetAfterCompletion(t *testing.T) {
}

func TestGetTimeoutBeforeHeight(t *testing.T) {
hv := heightsync.New(big.NewInt(1), newTestValue)
hv := heightsync.New(big.NewInt(1), newTestValue, log.NewNopLogger())

// request height 3 but don't advance to it
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
Expand All @@ -70,7 +72,7 @@ func TestGetTimeoutBeforeHeight(t *testing.T) {

func TestGetPartialResults(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
hv := heightsync.New(big.NewInt(1), newTestValue)
hv := heightsync.New(big.NewInt(1), newTestValue, log.NewNopLogger())

// start new height but don't call EndCurrentHeight
hv.StartNewHeight(big.NewInt(1))
Expand All @@ -90,7 +92,7 @@ func TestGetPartialResults(t *testing.T) {

func TestGetBehindByOneHeight(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
hv := heightsync.New(big.NewInt(1), newTestValue)
hv := heightsync.New(big.NewInt(1), newTestValue, log.NewNopLogger())

hv.StartNewHeight(big.NewInt(1))
hv.Do(func(s *testStore) { s.add("height1") })
Expand Down Expand Up @@ -120,7 +122,7 @@ func TestGetBehindByOneHeight(t *testing.T) {

func TestGetBehindByTwoHeights(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
hv := heightsync.New(big.NewInt(1), newTestValue)
hv := heightsync.New(big.NewInt(1), newTestValue, log.NewNopLogger())

hv.StartNewHeight(big.NewInt(1))
hv.Do(func(s *testStore) { s.add("height1") })
Expand Down Expand Up @@ -155,7 +157,7 @@ func TestGetBehindByTwoHeights(t *testing.T) {
}

func TestPanicOnOldHeight(t *testing.T) {
hv := heightsync.New(big.NewInt(1), newTestValue)
hv := heightsync.New(big.NewInt(1), newTestValue, log.NewNopLogger())

hv.StartNewHeight(big.NewInt(1))
hv.EndCurrentHeight()
Expand All @@ -170,7 +172,7 @@ func TestPanicOnOldHeight(t *testing.T) {
}

func TestStartNewHeightResetsValue(t *testing.T) {
hv := heightsync.New(big.NewInt(1), newTestValue)
hv := heightsync.New(big.NewInt(1), newTestValue, log.NewNopLogger())

hv.StartNewHeight(big.NewInt(1))
hv.Do(func(s *testStore) { s.add("old") })
Expand All @@ -189,7 +191,7 @@ func TestStartNewHeightResetsValue(t *testing.T) {
}

func TestConcurrentDo(t *testing.T) {
hv := heightsync.New(big.NewInt(1), newTestValue)
hv := heightsync.New(big.NewInt(1), newTestValue, log.NewNopLogger())

hv.StartNewHeight(big.NewInt(1))

Expand Down
3 changes: 2 additions & 1 deletion mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func NewExperimentalEVMMempool(
}
legacyPool := legacypool.New(
legacyConfig,
logger,
blockchain,
legacypool.WithRecheck(evmRechecker),
)
Expand Down Expand Up @@ -213,7 +214,7 @@ func NewExperimentalEVMMempool(
cosmosPool,
tracker.NewHandle(-1),
cosmosRechecker,
heightsync.New(blockchain.CurrentBlock().Number, NewCosmosTxStore),
heightsync.New(blockchain.CurrentBlock().Number, NewCosmosTxStore, logger.With("pool", "recheckpool")),
blockchain,
)

Expand Down
2 changes: 1 addition & 1 deletion mempool/recheck_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,7 @@ func noopAnteHandler(ctx sdk.Context, _ sdk.Tx, _ bool) (sdk.Context, error) {

// newTestRecheckedTxs creates a HeightSync[CosmosTxStore] for testing, starting at height 0.
func newTestRecheckedTxs() *heightsync.HeightSync[mempool.CosmosTxStore] {
return heightsync.New(big.NewInt(0), mempool.NewCosmosTxStore)
return heightsync.New(big.NewInt(0), mempool.NewCosmosTxStore, log.NewNopLogger())
}

// collectIteratorTxs drains an sdkmempool.Iterator into a slice.
Expand Down
4 changes: 3 additions & 1 deletion mempool/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package mempool
import (
"sync"

"cosmossdk.io/log/v2"

sdk "github.com/cosmos/cosmos-sdk/types"
sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool"
)
Expand All @@ -19,7 +21,7 @@ type CosmosTxStore struct {
}

// NewCosmosTxStore creates a new CosmosTxStore.
func NewCosmosTxStore() *CosmosTxStore {
func NewCosmosTxStore(_ log.Logger) *CosmosTxStore {
return &CosmosTxStore{
index: make(map[sdk.Tx]int),
}
Expand Down
12 changes: 7 additions & 5 deletions mempool/tx_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

"github.com/cosmos/gogoproto/proto"

"cosmossdk.io/log/v2"

sdk "github.com/cosmos/cosmos-sdk/types"
)

Expand All @@ -26,7 +28,7 @@ func newMockTx(id int) sdk.Tx {
}

func TestCosmosTxStoreAddAndGet(t *testing.T) {
store := NewCosmosTxStore()
store := NewCosmosTxStore(log.NewNopLogger())

tx1 := newMockTx(1)
tx2 := newMockTx(2)
Expand All @@ -41,7 +43,7 @@ func TestCosmosTxStoreAddAndGet(t *testing.T) {
}

func TestCosmosTxStoreDedup(t *testing.T) {
store := NewCosmosTxStore()
store := NewCosmosTxStore(log.NewNopLogger())

tx := newMockTx(1)

Expand All @@ -53,7 +55,7 @@ func TestCosmosTxStoreDedup(t *testing.T) {
}

func TestCosmosTxStoreIterator(t *testing.T) {
store := NewCosmosTxStore()
store := NewCosmosTxStore(log.NewNopLogger())

tx1 := newMockTx(1)
tx2 := newMockTx(2)
Expand All @@ -74,12 +76,12 @@ func TestCosmosTxStoreIterator(t *testing.T) {
}

func TestCosmosTxStoreIteratorEmpty(t *testing.T) {
store := NewCosmosTxStore()
store := NewCosmosTxStore(log.NewNopLogger())
require.Nil(t, store.Iterator())
}

func TestCosmosTxStoreIteratorSnapshotIsolation(t *testing.T) {
store := NewCosmosTxStore()
store := NewCosmosTxStore(log.NewNopLogger())

tx1 := newMockTx(1)
tx2 := newMockTx(2)
Expand Down
5 changes: 3 additions & 2 deletions mempool/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"sync/atomic"
"time"

cosmoslog "cosmossdk.io/log/v2"
"github.com/cosmos/evm/mempool/internal/heightsync"
"github.com/cosmos/evm/mempool/reserver"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -383,7 +384,7 @@ func WithRecheck(rechecker Rechecker) Option {

// New creates a new transaction pool to gather, sort and filter inbound
// transactions from the network.
func New(config Config, chain BlockChain, opts ...Option) *LegacyPool {
func New(config Config, logger cosmoslog.Logger, chain BlockChain, opts ...Option) *LegacyPool {
// Sanitize the input to ensure no vulnerable gas prices are set
config = (&config).sanitize()

Expand All @@ -398,7 +399,7 @@ func New(config Config, chain BlockChain, opts ...Option) *LegacyPool {
beats: make(map[common.Address]time.Time),
all: newLookup(),
rechecker: newNopRechecker(),
validPendingTxs: heightsync.New(chain.CurrentBlock().Number, NewTxStore),
validPendingTxs: heightsync.New(chain.CurrentBlock().Number, NewTxStore, logger.With("pool", "legacypool")),
reqResetCh: make(chan *txpoolResetRequest),
reqPromoteCh: make(chan *accountSet),
reqCancelResetCh: make(chan struct{}),
Expand Down
9 changes: 5 additions & 4 deletions mempool/txpool/legacypool/legacypool2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"math/big"
"testing"

"cosmossdk.io/log/v2"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/tracing"
Expand Down Expand Up @@ -86,7 +87,7 @@ func TestTransactionFutureAttack(t *testing.T) {
config := testTxPoolConfig
config.GlobalQueue = 100
config.GlobalSlots = 100
pool := New(config, blockchain)
pool := New(config, log.NewNopLogger(), blockchain)
pool.Init(config.PriceLimit, blockchain.CurrentBlock(), newReserver())
defer pool.Close()

Expand Down Expand Up @@ -121,7 +122,7 @@ func TestTransactionFuture1559(t *testing.T) {
// Create the pool to test the pricing enforcement with
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, blockchain)
pool := New(testTxPoolConfig, log.NewNopLogger(), blockchain)
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver())
defer pool.Close()

Expand Down Expand Up @@ -154,7 +155,7 @@ func TestTransactionZAttack(t *testing.T) {
// Create the pool to test the pricing enforcement with
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
blockchain := newTestBlockChain(eip1559Config, 1000000, statedb, new(event.Feed))
pool := New(testTxPoolConfig, blockchain)
pool := New(testTxPoolConfig, log.NewNopLogger(), blockchain)
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver())
defer pool.Close()
// Create a number of test accounts, fund them and make transactions
Expand Down Expand Up @@ -227,7 +228,7 @@ func BenchmarkFutureAttack(b *testing.B) {
config := testTxPoolConfig
config.GlobalQueue = 100
config.GlobalSlots = 100
pool := New(config, blockchain)
pool := New(config, log.NewNopLogger(), blockchain)
pool.Init(testTxPoolConfig.PriceLimit, blockchain.CurrentBlock(), newReserver())
defer pool.Close()
fillPool(b, pool)
Expand Down
Loading
Loading