Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ var (
utils.GpoMaxGasPriceFlag,
utils.EWASMInterpreterFlag,
utils.EVMInterpreterFlag,
utils.ParallelTxFlag,
utils.ParallelTxNumFlag,
utils.MinerNotifyFullFlag,
configFileFlag,
utils.CatalystFlag,
Expand Down
29 changes: 29 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"math/big"
"os"
"path/filepath"
"runtime"
godebug "runtime/debug"
"strconv"
"strings"
Expand Down Expand Up @@ -802,6 +803,14 @@ var (
Usage: "External EVM configuration (default = built-in interpreter)",
Value: "",
}
ParallelTxFlag = cli.BoolFlag{
Name: "parallel",
Usage: "Enable the experimental parallel transaction execution mode, only valid in full sync mode (default = false)",
}
ParallelTxNumFlag = cli.IntFlag{
Name: "parallel.num",
Usage: "Number of slot for transaction execution, only valid in parallel mode (runtime calculated, no fixed default value)",
}

// Init network
InitNetworkSize = cli.IntFlag{
Expand Down Expand Up @@ -1642,6 +1651,26 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
if ctx.GlobalIsSet(RangeLimitFlag.Name) {
cfg.RangeLimit = ctx.GlobalBool(RangeLimitFlag.Name)
}
if ctx.GlobalIsSet(ParallelTxFlag.Name) {
cfg.ParallelTxMode = ctx.GlobalBool(ParallelTxFlag.Name)
// The best prallel num will be tuned later, we do a simple parallel num set here
numCpu := runtime.NumCPU()
var parallelNum int
if ctx.GlobalIsSet(ParallelTxNumFlag.Name) {
// first of all, we use "--parallel.num", but "--parallel.num 0" is not allowed
parallelNum = ctx.GlobalInt(ParallelTxNumFlag.Name)
if parallelNum < 1 {
parallelNum = 1
}
} else if numCpu == 1 {
parallelNum = 1 // single CPU core
} else if numCpu < 10 {
parallelNum = numCpu - 1
} else {
parallelNum = 8 // we found concurrency 8 is slightly better than 15
}
cfg.ParallelTxNum = parallelNum
}
// Read the value from the flag no matter if it's set or not.
cfg.Preimages = ctx.GlobalBool(CachePreimagesFlag.Name)
if cfg.NoPruning && !cfg.Preimages {
Expand Down
39 changes: 28 additions & 11 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,13 @@ type BlockChain struct {
running int32 // 0 if chain is running, 1 when stopped
procInterrupt int32 // interrupt signaler for block processing

engine consensus.Engine
prefetcher Prefetcher
validator Validator // Block and state validator interface
processor Processor // Block transaction processor interface
vmConfig vm.Config
pipeCommit bool
engine consensus.Engine
prefetcher Prefetcher
validator Validator // Block and state validator interface
processor Processor // Block transaction processor interface
vmConfig vm.Config
pipeCommit bool
parallelExecution bool

shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
Expand Down Expand Up @@ -2116,11 +2117,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
statedb.StartPrefetcher("chain")
var followupInterrupt uint32
// For diff sync, it may fallback to full sync, so we still do prefetch
if len(block.Transactions()) >= prefetchTxNumber {
throwaway := statedb.Copy()
go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) {
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)
}(time.Now(), block, throwaway, &followupInterrupt)
// parallel mode has a pipeline, similar to this prefetch, to save CPU we disable this prefetch for parallel
if !bc.parallelExecution {
if len(block.Transactions()) >= prefetchTxNumber {
throwaway := statedb.Copy()
go func(start time.Time, followup *types.Block, throwaway *state.StateDB, interrupt *uint32) {
bc.prefetcher.Prefetch(followup, throwaway, bc.vmConfig, &followupInterrupt)
}(time.Now(), block, throwaway, &followupInterrupt)
}
}
//Process block using the parent state as reference point
substart := time.Now()
Expand Down Expand Up @@ -3105,3 +3109,16 @@ func EnablePersistDiff(limit uint64) BlockChainOption {
return chain
}
}

func EnableParallelProcessor(parallelNum int) BlockChainOption {
return func(chain *BlockChain) *BlockChain {
if chain.snaps == nil {
// disable parallel processor if snapshot is not enabled to avoid concurrent issue for SecureTrie
log.Info("parallel processor is not enabled since snapshot is not enabled")
return chain
}
chain.parallelExecution = true
chain.processor = NewParallelStateProcessor(chain.Config(), chain, chain.engine, parallelNum)
return chain
}
}
2 changes: 1 addition & 1 deletion core/state/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (s *StateDB) DumpToCollector(c DumpCollector, excludeCode, excludeStorage,
account.SecureKey = it.Key
}
addr := common.BytesToAddress(addrBytes)
obj := newObject(s, addr, data)
obj := newObject(s, s.isParallel, addr, data)
if !excludeCode {
account.Code = common.Bytes2Hex(obj.Code(s.db))
}
Expand Down
82 changes: 82 additions & 0 deletions core/state/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package state

import (
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)

// StateDBer is copied from vm/interface.go
// It is used by StateObject & Journal right now, to abstract StateDB & ParallelStateDB
type StateDBer interface {
getBaseStateDB() *StateDB
getStateObject(common.Address) *StateObject // only accessible for journal
storeStateObj(common.Address, *StateObject) // only accessible for journal

CreateAccount(common.Address)

SubBalance(common.Address, *big.Int)
AddBalance(common.Address, *big.Int)
GetBalance(common.Address) *big.Int

GetNonce(common.Address) uint64
SetNonce(common.Address, uint64)

GetCodeHash(common.Address) common.Hash
GetCode(common.Address) []byte
SetCode(common.Address, []byte)
GetCodeSize(common.Address) int

AddRefund(uint64)
SubRefund(uint64)
GetRefund() uint64

GetCommittedState(common.Address, common.Hash) common.Hash
GetState(common.Address, common.Hash) common.Hash
SetState(common.Address, common.Hash, common.Hash)

Suicide(common.Address) bool
HasSuicided(common.Address) bool

// Exist reports whether the given account exists in state.
// Notably this should also return true for suicided accounts.
Exist(common.Address) bool
// Empty returns whether the given account is empty. Empty
// is defined according to EIP161 (balance = nonce = code = 0).
Empty(common.Address) bool

PrepareAccessList(sender common.Address, dest *common.Address, precompiles []common.Address, txAccesses types.AccessList)
AddressInAccessList(addr common.Address) bool
SlotInAccessList(addr common.Address, slot common.Hash) (addressOk bool, slotOk bool)
// AddAddressToAccessList adds the given address to the access list. This operation is safe to perform
// even if the feature/fork is not active yet
AddAddressToAccessList(addr common.Address)
// AddSlotToAccessList adds the given (address,slot) to the access list. This operation is safe to perform
// even if the feature/fork is not active yet
AddSlotToAccessList(addr common.Address, slot common.Hash)

RevertToSnapshot(int)
Snapshot() int

AddLog(*types.Log)
AddPreimage(common.Hash, []byte)

ForEachStorage(common.Address, func(common.Hash, common.Hash) bool) error
}
71 changes: 48 additions & 23 deletions core/state/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
// reverted on demand.
type journalEntry interface {
// revert undoes the changes introduced by this journal entry.
revert(*StateDB)
revert(StateDBer)

// dirtied returns the Ethereum address modified by this journal entry.
dirtied() *common.Address
Expand Down Expand Up @@ -58,10 +58,10 @@ func (j *journal) append(entry journalEntry) {

// revert undoes a batch of journalled modifications along with any reverted
// dirty handling too.
func (j *journal) revert(statedb *StateDB, snapshot int) {
func (j *journal) revert(dber StateDBer, snapshot int) {
for i := len(j.entries) - 1; i >= snapshot; i-- {
// Undo the changes made by the operation
j.entries[i].revert(statedb)
j.entries[i].revert(dber)

// Drop any dirty tracking induced by the change
if addr := j.entries[i].dirtied(); addr != nil {
Expand Down Expand Up @@ -141,28 +141,47 @@ type (
}
)

func (ch createObjectChange) revert(s *StateDB) {
delete(s.stateObjects, *ch.account)
func (ch createObjectChange) revert(dber StateDBer) {
s := dber.getBaseStateDB()
if s.parallel.isSlotDB {
delete(s.parallel.dirtiedStateObjectsInSlot, *ch.account)
delete(s.parallel.addrStateChangesInSlot, *ch.account)
delete(s.parallel.nonceChangesInSlot, *ch.account)
delete(s.parallel.balanceChangesInSlot, *ch.account)
delete(s.parallel.codeChangesInSlot, *ch.account)
delete(s.parallel.kvChangesInSlot, *ch.account)
} else {
s.deleteStateObj(*ch.account)
}
delete(s.stateObjectsDirty, *ch.account)
}

func (ch createObjectChange) dirtied() *common.Address {
return ch.account
}

func (ch resetObjectChange) revert(s *StateDB) {
s.SetStateObject(ch.prev)
func (ch resetObjectChange) revert(dber StateDBer) {
s := dber.getBaseStateDB()
if s.parallel.isSlotDB {
// ch.prev must be from dirtiedStateObjectsInSlot, put it back
s.parallel.dirtiedStateObjectsInSlot[ch.prev.address] = ch.prev
} else {
// ch.prev was got from main DB, put it back to main DB.
s.storeStateObj(ch.prev.address, ch.prev)
}
if !ch.prevdestruct && s.snap != nil {
s.snapParallelLock.Lock()
delete(s.snapDestructs, ch.prev.address)
s.snapParallelLock.Unlock()
}
}

func (ch resetObjectChange) dirtied() *common.Address {
return nil
}

func (ch suicideChange) revert(s *StateDB) {
obj := s.getStateObject(*ch.account)
func (ch suicideChange) revert(dber StateDBer) {
obj := dber.getStateObject(*ch.account)
if obj != nil {
obj.suicided = ch.prev
obj.setBalance(ch.prevbalance)
Expand All @@ -175,54 +194,57 @@ func (ch suicideChange) dirtied() *common.Address {

var ripemd = common.HexToAddress("0000000000000000000000000000000000000003")

func (ch touchChange) revert(s *StateDB) {
func (ch touchChange) revert(dber StateDBer) {
}

func (ch touchChange) dirtied() *common.Address {
return ch.account
}

func (ch balanceChange) revert(s *StateDB) {
s.getStateObject(*ch.account).setBalance(ch.prev)
func (ch balanceChange) revert(dber StateDBer) {
dber.getStateObject(*ch.account).setBalance(ch.prev)
}

func (ch balanceChange) dirtied() *common.Address {
return ch.account
}

func (ch nonceChange) revert(s *StateDB) {
s.getStateObject(*ch.account).setNonce(ch.prev)
func (ch nonceChange) revert(dber StateDBer) {
dber.getStateObject(*ch.account).setNonce(ch.prev)
}

func (ch nonceChange) dirtied() *common.Address {
return ch.account
}

func (ch codeChange) revert(s *StateDB) {
s.getStateObject(*ch.account).setCode(common.BytesToHash(ch.prevhash), ch.prevcode)
func (ch codeChange) revert(dber StateDBer) {
dber.getStateObject(*ch.account).setCode(common.BytesToHash(ch.prevhash), ch.prevcode)
}

func (ch codeChange) dirtied() *common.Address {
return ch.account
}

func (ch storageChange) revert(s *StateDB) {
s.getStateObject(*ch.account).setState(ch.key, ch.prevalue)
func (ch storageChange) revert(dber StateDBer) {
dber.getStateObject(*ch.account).setState(ch.key, ch.prevalue)
}

func (ch storageChange) dirtied() *common.Address {
return ch.account
}

func (ch refundChange) revert(s *StateDB) {
func (ch refundChange) revert(dber StateDBer) {
s := dber.getBaseStateDB()
s.refund = ch.prev
}

func (ch refundChange) dirtied() *common.Address {
return nil
}

func (ch addLogChange) revert(s *StateDB) {
func (ch addLogChange) revert(dber StateDBer) {
s := dber.getBaseStateDB()

logs := s.logs[ch.txhash]
if len(logs) == 1 {
delete(s.logs, ch.txhash)
Expand All @@ -236,15 +258,17 @@ func (ch addLogChange) dirtied() *common.Address {
return nil
}

func (ch addPreimageChange) revert(s *StateDB) {
func (ch addPreimageChange) revert(dber StateDBer) {
s := dber.getBaseStateDB()
delete(s.preimages, ch.hash)
}

func (ch addPreimageChange) dirtied() *common.Address {
return nil
}

func (ch accessListAddAccountChange) revert(s *StateDB) {
func (ch accessListAddAccountChange) revert(dber StateDBer) {
s := dber.getBaseStateDB()
/*
One important invariant here, is that whenever a (addr, slot) is added, if the
addr is not already present, the add causes two journal entries:
Expand All @@ -263,7 +287,8 @@ func (ch accessListAddAccountChange) dirtied() *common.Address {
return nil
}

func (ch accessListAddSlotChange) revert(s *StateDB) {
func (ch accessListAddSlotChange) revert(dber StateDBer) {
s := dber.getBaseStateDB()
if s.accessList != nil {
s.accessList.DeleteSlot(*ch.address, *ch.slot)
}
Expand Down
Loading