From 62b33907f88604717fe05f71e86b8fb8a2ffcc19 Mon Sep 17 00:00:00 2001 From: kyrie-yl Date: Fri, 14 Jan 2022 11:32:19 +0800 Subject: [PATCH 1/4] fast node verification Signed-off-by: kyrie-yl --- cmd/geth/main.go | 4 +- cmd/geth/usage.go | 3 + cmd/utils/flags.go | 36 ++++++-- core/blockchain.go | 40 ++++++++- core/rawdb/accessors_chain.go | 20 +++++ core/rawdb/schema.go | 7 ++ core/types/block.go | 7 -- core/verify_manager.go | 150 ++++++++++++++++++++++++++++++++++ core/verify_modes.go | 69 ++++++++++++++++ core/verify_peers.go | 12 +++ core/verify_task.go | 125 ++++++++++++++++++++++++++++ eth/backend.go | 25 ++++-- eth/ethconfig/config.go | 6 +- eth/ethconfig/gen_config.go | 22 +++-- eth/handler_trust.go | 8 +- eth/peerset.go | 12 +-- ethclient/ethclient.go | 5 +- internal/ethapi/api.go | 2 +- 18 files changed, 515 insertions(+), 38 deletions(-) create mode 100644 core/verify_manager.go create mode 100644 core/verify_modes.go create mode 100644 core/verify_peers.go create mode 100644 core/verify_task.go diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 1ab5e134ac..a2b1ca01df 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -71,6 +71,8 @@ var ( utils.NoUSBFlag, utils.DirectBroadcastFlag, utils.DisableSnapProtocolFlag, + utils.DisableDiffProtocolFlag, + utils.EnableTrustProtocolFlag, utils.DiffSyncFlag, utils.RangeLimitFlag, utils.USBFlag, @@ -97,6 +99,7 @@ var ( utils.TxPoolLifetimeFlag, utils.TxPoolReannounceTimeFlag, utils.SyncModeFlag, + utils.TriesVerifyModeFlag, utils.ExitWhenSyncedFlag, utils.GCModeFlag, utils.SnapshotFlag, @@ -114,7 +117,6 @@ var ( utils.WhitelistFlag, utils.BloomFilterSizeFlag, utils.TriesInMemoryFlag, - utils.AllowInsecureNoTriesFlag, utils.CacheFlag, utils.CacheDatabaseFlag, utils.CacheTrieFlag, diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index fba14530b5..8cc4b5bd31 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -41,6 +41,8 @@ var AppHelpFlagGroups = []flags.FlagGroup{ utils.NoUSBFlag, utils.DirectBroadcastFlag, utils.DisableSnapProtocolFlag, + utils.DisableDiffProtocolFlag, + utils.EnableTrustProtocolFlag, utils.RangeLimitFlag, utils.SmartCardDaemonPathFlag, utils.NetworkIdFlag, @@ -50,6 +52,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{ utils.YoloV3Flag, utils.RopstenFlag, utils.SyncModeFlag, + utils.TriesVerifyModeFlag, utils.ExitWhenSyncedFlag, utils.GCModeFlag, utils.TxLookupLimitFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index e7336d73a9..2741b4ad21 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -121,6 +121,14 @@ var ( Name: "disablesnapprotocol", Usage: "Disable snap protocol", } + DisableDiffProtocolFlag = cli.BoolFlag{ + Name: "disablediffprotocol", + Usage: "Disable diff protocol", + } + EnableTrustProtocolFlag = cli.BoolFlag{ + Name: "enabletrustprotocol", + Usage: "Enable trust protocol", + } DiffSyncFlag = cli.BoolFlag{ Name: "diffsync", Usage: "Enable diffy sync, Please note that enable diffsync will improve the syncing speed, " + @@ -259,9 +267,11 @@ var ( Usage: "The layer of tries trees that keep in memory", Value: 128, } - AllowInsecureNoTriesFlag = cli.BoolTFlag{ - Name: "allow-insecure-no-tries", - Usage: `Disable the tries state root verification, the state consistency is no longer 100% guaranteed, diffsync is not allowed if enabled. Do not enable it unless you know exactly what the consequence it will cause.`, + defaultVerifyMode = ethconfig.Defaults.TriesVerifyMode + TriesVerifyModeFlag = TextMarshalerFlag{ + Name: "tries-verify-mode", + Usage: `tries verify mode: "local", "full", "light", "insecure"`, + Value: &defaultVerifyMode, } OverrideBerlinFlag = cli.Uint64Flag{ Name: "override.berlin", @@ -1622,6 +1632,12 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if ctx.GlobalIsSet(DisableSnapProtocolFlag.Name) { cfg.DisableSnapProtocol = ctx.GlobalBool(DisableSnapProtocolFlag.Name) } + if ctx.GlobalIsSet(DisableDiffProtocolFlag.Name) { + cfg.DisableDiffProtocol = ctx.GlobalIsSet(DisableDiffProtocolFlag.Name) + } + if ctx.GlobalIsSet(EnableTrustProtocolFlag.Name) { + cfg.EnableTrustProtocol = ctx.GlobalIsSet(EnableTrustProtocolFlag.Name) + } if ctx.GlobalIsSet(DiffSyncFlag.Name) { cfg.DiffSync = ctx.GlobalBool(DiffSyncFlag.Name) } @@ -1652,8 +1668,18 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if ctx.GlobalIsSet(TriesInMemoryFlag.Name) { cfg.TriesInMemory = ctx.GlobalUint64(TriesInMemoryFlag.Name) } - if ctx.GlobalIsSet(AllowInsecureNoTriesFlag.Name) { - cfg.NoTries = ctx.GlobalBool(AllowInsecureNoTriesFlag.Name) + if ctx.GlobalIsSet(TriesVerifyModeFlag.Name) { + cfg.TriesVerifyMode = *GlobalTextMarshaler(ctx, TriesVerifyModeFlag.Name).(*core.VerifyMode) + //If a node sets verify mode to full or light, it's a fast node and need + //to verify blocks from verify nodes, then it should enable trust protocol. + if cfg.TriesVerifyMode == core.FullVerify || cfg.TriesVerifyMode == core.LightVerify { + cfg.EnableTrustProtocol = true + } + //If a node sets verify node but not local, it's a fast node whose difflayer is not integral. + //So fast node should disable diff protocol. + if cfg.TriesVerifyMode != core.LocalVerify { + cfg.DisableDiffProtocol = true + } } if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheSnapshotFlag.Name) { cfg.SnapshotCache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheSnapshotFlag.Name) / 100 diff --git a/core/blockchain.go b/core/blockchain.go index dcac331149..3bffd1872d 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -239,6 +239,7 @@ type BlockChain struct { engine consensus.Engine validator Validator // Block and state validator interface processor Processor // Block transaction processor interface + verifyManager *VerifyManager vmConfig vm.Config shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. @@ -462,6 +463,16 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par return bc, nil } +func (bc *BlockChain) StartVerify(peers VerifyPeers, allowUntrustedVerify bool) { + bc.verifyManager.peers = peers + bc.verifyManager.allowUntrustedVerify = allowUntrustedVerify + bc.verifyManager.Start() +} + +func (bc *BlockChain) VerifyManger() *VerifyManager { + return bc.verifyManager +} + // GetVMConfig returns the block chain VM config. func (bc *BlockChain) GetVMConfig() *vm.Config { return &bc.vmConfig @@ -1191,6 +1202,9 @@ func (bc *BlockChain) Stop() { close(bc.quit) bc.StopInsert() bc.wg.Wait() + if bc.verifyManager != nil { + bc.verifyManager.Stop() + } // Ensure that the entirety of the state snapshot is journalled to disk. var snapBase common.Hash @@ -2009,6 +2023,11 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er log.Debug("Abort during block processing") break } + //For fast node, if the H-11 block has not been verified, stop processing blocks. + if bc.verifyManager != nil && bc.verifyManager.CheckAncestorVerified(block.Header()) { + log.Debug("Block ancestor has not been verified", "hash", block.Hash(), "number", block.Number()) + break + } // If the header is a banned one, straight out abort if BadHashes[block.Hash()] { bc.reportBlock(block, nil, ErrBlacklistedHash) @@ -2052,6 +2071,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er lastCanon = block continue } + // Retrieve the parent block and it's state to execute on top start := time.Now() @@ -2122,6 +2142,11 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er blockWriteTimer.Update(time.Since(substart)) blockInsertTimer.UpdateSince(start) + //Start a routine to verify this block. + if bc.verifyManager != nil { + bc.verifyManager.NewBlockVerifyTask(block.Header()) + } + switch status { case CanonStatTy: log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), @@ -3020,8 +3045,15 @@ func EnablePersistDiff(limit uint64) BlockChainOption { } } -func (bc *BlockChain) GetRootByDiffHash(blockNumber uint64, blockHash common.Hash, diffHash common.Hash) (*types.VerifyResult, error) { - var res types.VerifyResult +func EnableVerifyManager() BlockChainOption { + return func(chain *BlockChain) *BlockChain { + chain.verifyManager = NewVerifyManager(chain) + return chain + } +} + +func (bc *BlockChain) GetRootByDiffHash(blockNumber uint64, blockHash common.Hash, diffHash common.Hash) (*VerifyResult, error) { + var res VerifyResult res.BlockNumber = blockNumber res.BlockHash = blockHash @@ -3076,6 +3108,10 @@ func (bc *BlockChain) GetRootByDiffHash(blockNumber uint64, blockHash common.Has return &res, nil } +func (bc *BlockChain) GenerateDiffLayer(blockHash common.Hash) (*types.DiffLayer, error) { + +} + func (bc *BlockChain) GetTrustedDiffLayer(blockHash common.Hash) *types.DiffLayer { var diff *types.DiffLayer if cached, ok := bc.diffLayerCache.Get(blockHash); ok { diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 6489a600fb..912a19f6ff 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -64,6 +64,26 @@ func DeleteCanonicalHash(db ethdb.KeyValueWriter, number uint64) { } } +func ReadTrustBlockHash(db ethdb.Reader, hash common.Hash) bool { + data, _ := db.Get(trustBlockHashKey(hash)) + if len(data) == 0 { + return false + } + return bytes.Equal(data,[]byte{0x01}) +} + +func WriteTrustBlockHash(db ethdb.KeyValueWriter, hashkey common.Hash) { + if err := db.Put(trustBlockHashKey(hashkey),[]byte{0x01}); err != nil { + log.Crit("Failed to store trust block hash") + } +} + +func DeleteTrustBlockHash(db ethdb.KeyValueWriter, hash common.Hash) { + if err := db.Delete(trustBlockHashKey(hash)); err != nil { + log.Crit("Failed to delete trust block hash") + } +} + // ReadAllHashes retrieves all the hashes assigned to blocks at a certain heights, // both canonical and reorged forks included. func ReadAllHashes(db ethdb.Iteratee, number uint64) []common.Hash { diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index b4fb99e451..525614173c 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -93,6 +93,9 @@ var ( // difflayer database diffLayerPrefix = []byte("d") // diffLayerPrefix + hash -> diffLayer + // trust block database + trustBlockPrefix = []byte("tb") // trustBlockPrefix + hash -> verify result + preimagePrefix = []byte("secure-key-") // preimagePrefix + hash -> preimage configPrefix = []byte("ethereum-config-") // config prefix for the db @@ -164,6 +167,10 @@ func headerTDKey(number uint64, hash common.Hash) []byte { func headerHashKey(number uint64) []byte { return append(append(headerPrefix, encodeBlockNumber(number)...), headerHashSuffix...) } +// trustBlockHashKey = trustBlockPrefix + hash +func trustBlockHashKey(hash common.Hash) []byte { + return append(append(trustBlockPrefix, hash.Bytes()...)) +} // headerNumberKey = headerNumberPrefix + hash func headerNumberKey(hash common.Hash) []byte { diff --git a/core/types/block.go b/core/types/block.go index f3c487b684..7230c65007 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -67,13 +67,6 @@ var ( StatusUnexpectedError = VerifyStatus{Code: 0x400, Msg: "can’t verify because of unexpected internal error"} ) -type VerifyResult struct { - Status VerifyStatus - BlockNumber uint64 - BlockHash common.Hash - Root common.Hash -} - // A BlockNonce is a 64-bit hash which proves (combined with the // mix-hash) that a sufficient amount of computation has been carried // out on a block. diff --git a/core/verify_manager.go b/core/verify_manager.go new file mode 100644 index 0000000000..2137376bfa --- /dev/null +++ b/core/verify_manager.go @@ -0,0 +1,150 @@ +package core + +import ( + "fmt" + "github.com/ethereum/go-ethereum/log" + "time" + + lru "github.com/hashicorp/golang-lru" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" +) + +const verifiedCacheSize = 256 + +type VerifyManager struct { + bc *BlockChain + tasks map[common.Hash]*VerifyTask + peers VerifyPeers + verifiedCache *lru.Cache + allowUntrustedVerify bool + verifyCh chan common.Hash + exitCh chan struct{} +} + +func NewVerifyManager(blockchain *BlockChain) *VerifyManager { + verifiedCache, _ := lru.New(verifiedCacheSize) + vm := &VerifyManager{ + bc: blockchain, + tasks: make(map[common.Hash]*VerifyTask), + verifiedCache: verifiedCache, + verifyCh: make(chan common.Hash), + exitCh: make(chan struct{}), + } + return vm +} + +func (vm *VerifyManager) Start() { + //read disk store to initial verified cache + //load unverified blocks in a normalized chain and start a batch of verify task + header := vm.bc.CurrentHeader() + go vm.mainLoop(header) +} + +func (vm *VerifyManager) Stop() { + //stop all the tasks + close(vm.exitCh) +} + +func (vm *VerifyManager) mainLoop(header *types.Header) { + //Start verify task from H to H-11 if need. + vm.NewBlockVerifyTask(header) + prune := time.NewTicker(time.Second) + defer prune.Stop() + for { + select { + case hash := <-vm.verifyCh: + vm.cacheBlockVerified(hash) + rawdb.WriteTrustBlockHash(vm.bc.db, hash) + delete(vm.tasks, hash) + case <-prune.C: + for hash, task := range vm.tasks { + if vm.bc.CurrentHeader().Number.Uint64()-task.blockHeader.Number.Uint64() > 15 { + delete(vm.tasks, hash) + close(task.terminalCh) + } + } + case <-vm.exitCh: + return + } + } +} + +func (vm *VerifyManager) NewBlockVerifyTask(header *types.Header) { + for i := 0; i <= 11; i++ { + hash := header.Hash() + //if verified cache record that this block has been verified, skip. + if _, ok := vm.verifiedCache.Get(hash); ok { + header = vm.bc.GetHeaderByHash(header.ParentHash) + continue + } + //if there already has a verify task for this block, skip. + if _, ok := vm.tasks[hash]; ok { + header = vm.bc.GetHeaderByHash(header.ParentHash) + continue + } + //if verified storage record that this block has been verified, skip. + if rawdb.ReadTrustBlockHash(vm.bc.db, hash) { + vm.cacheBlockVerified(hash) + header = vm.bc.GetHeaderByHash(header.ParentHash) + continue + } + diffLayer := vm.bc.GetTrustedDiffLayer(hash) + //if this block has no diff, there is no need to verify it. + var err error + if diffLayer == nil { + if diffLayer, err = vm.bc.GenerateDiffLayer(hash); err != nil { + log.Error("failed to get diff layer", "block", hash, "number", header.Number, "error", err) + header = vm.bc.GetHeaderByHash(header.ParentHash) + continue + } + } + diffHash, err := GetTrustedDiffHash(diffLayer) + if err != nil { + log.Error("failed to get diff hash", "block", hash, "number", header.Number, "error", err) + header = vm.bc.GetHeaderByHash(header.ParentHash) + continue + } + verifyTask := NewVerifyTask(diffHash, header, vm.peers, vm.bc.db, vm.verifyCh, vm.allowUntrustedVerify) + vm.tasks[hash] = verifyTask + header = vm.bc.GetHeaderByHash(header.ParentHash) + } +} + +func (vm *VerifyManager) cacheBlockVerified(hash common.Hash) { + if vm.verifiedCache.Len() >= verifiedCacheSize { + vm.verifiedCache.RemoveOldest() + } + vm.verifiedCache.Add(hash, true) +} + +//CheckAncestorVerified function check whether H-11 block has been verified or it's a empty block. +//If not, the blockchain should stop to insert new block. +func (vm *VerifyManager) CheckAncestorVerified(header *types.Header) bool { + //find header of H-11 block. + header = vm.bc.GetHeaderByNumber(header.Number.Uint64() - 11) + //If start from genesis block, there has not a H-11 block. + if header == nil { + return true + } + hash := header.Hash() + //check whether H-11 block is a empty block. + parent := vm.bc.GetHeaderByHash(hash) + if header.TxHash == (common.Hash{}) && header.Root == parent.Root { + return true + } + if _, ok := vm.verifiedCache.Get(hash); ok { + return true + } + return rawdb.ReadTrustBlockHash(vm.bc.db, hash) +} + +func (vm *VerifyManager) HandleRootResponse(vr *VerifyResult, pid string) error { + if vt, ok := vm.tasks[vr.BlockHash]; ok { + vt.messageCh <- VerifyMessage{verifyResult: vr, peerId: pid} + return nil + } + return fmt.Errorf("") +} diff --git a/core/verify_modes.go b/core/verify_modes.go new file mode 100644 index 0000000000..8d9b05547f --- /dev/null +++ b/core/verify_modes.go @@ -0,0 +1,69 @@ +package core + +import "fmt" + +type VerifyMode uint32 + +const ( + LocalVerify VerifyMode = iota // + FullVerify + LightVerify + InsecureVerify +) + +func (mode VerifyMode) IsValid() bool { + return mode >= LocalVerify && mode <= InsecureVerify +} + +func (mode VerifyMode) String() string { + switch mode { + case LocalVerify: + return "local" + case FullVerify: + return "full" + case LightVerify: + return "light" + case InsecureVerify: + return "insecure" + default: + return "unknown" + } +} + +func (mode VerifyMode) MarshalText() ([]byte, error) { + switch mode { + case LocalVerify: + return []byte("local"), nil + case FullVerify: + return []byte("full"), nil + case LightVerify: + return []byte("light"), nil + case InsecureVerify: + return []byte("insecure"), nil + default: + return nil, fmt.Errorf("unknown verify mode %d", mode) + } +} + +func (mode *VerifyMode) UnmarshalText(text []byte) error { + switch string(text) { + case "local": + *mode = LocalVerify + case "full": + *mode = FullVerify + case "light": + *mode = LightVerify + case "insecure": + *mode = InsecureVerify + default: + return fmt.Errorf(`unknown sync mode %q, want "full", "light" or "insecure"`, text) + } + return nil +} + +func (mode *VerifyMode) NeedRemoteVerify() bool { + if *mode == FullVerify || *mode == LightVerify { + return true + } + return false +} diff --git a/core/verify_peers.go b/core/verify_peers.go new file mode 100644 index 0000000000..a1ea2eaa9c --- /dev/null +++ b/core/verify_peers.go @@ -0,0 +1,12 @@ +package core + +import "github.com/ethereum/go-ethereum/common" + +type VerifyPeer interface { + RequestRoot(blockNumber uint64, blockHash common.Hash, diffHash common.Hash) error + ID() string +} + +type VerifyPeers interface { + GetVerifyPeers() []VerifyPeer +} diff --git a/core/verify_task.go b/core/verify_task.go new file mode 100644 index 0000000000..59b63b836c --- /dev/null +++ b/core/verify_task.go @@ -0,0 +1,125 @@ +package core + +import ( + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + + "math/rand" + "time" +) + +type VerifyResult struct { + Status types.VerifyStatus + BlockNumber uint64 + BlockHash common.Hash + Root common.Hash +} + + +type VerifyMessage struct { + verifyResult *VerifyResult + peerId string +} + +type VerifyTask struct { + diffhash common.Hash + blockHeader *types.Header + candidatePeers VerifyPeers + BadPeers map[string]struct{} + startAt time.Time + db ethdb.Database + allowUntrustedVerify bool + + messageCh chan VerifyMessage + terminalCh chan struct{} +} + +func NewVerifyTask(diffhash common.Hash, header *types.Header, peers VerifyPeers, db ethdb.Database, verifyCh chan common.Hash, allowUntrustedVerify bool) *VerifyTask { + vt := &VerifyTask{ + diffhash: diffhash, + blockHeader: header, + candidatePeers: peers, + BadPeers: make(map[string]struct{}), + db: db, + allowUntrustedVerify: allowUntrustedVerify, + messageCh: make(chan VerifyMessage), + terminalCh: make(chan struct{}), + } + go vt.Start(verifyCh) + return vt +} + +func (vt *VerifyTask) Start(verifyCh chan common.Hash) { + vt.startAt = time.Now() + + vt.selectPeersToVerify(vt.candidatePeers.GetVerifyPeers(), 3) + resend := time.NewTicker(2 * time.Second) + defer resend.Stop() + for { + select { + case msg := <-vt.messageCh: + switch msg.verifyResult.Status { + case types.StatusFullVerified: + vt.compareRootHashAndWrite(msg, verifyCh) + case types.StatusUntrustedVerified: + log.Warn("block %s , num= %s is untrusted verified", msg.verifyResult.BlockHash, msg.verifyResult.BlockNumber) + if vt.allowUntrustedVerify { + vt.compareRootHashAndWrite(msg, verifyCh) + } + case types.StatusDiffHashMismatch, types.StatusImpossibleFork, types.StatusUnexpectedError: + vt.BadPeers[msg.peerId] = struct{}{} + case types.StatusBlockTooNew, types.StatusBlockNewer, types.StatusPossibleFork: + log.Info("return msg from peer %s for block %s is %s", msg.peerId, msg.verifyResult.BlockHash, msg.verifyResult.Status.Msg) + } + case <-resend.C: + //if a task has run over 300s, try all the vaild peers to verify. + if time.Now().Second()-vt.startAt.Second() < 300 { + vt.selectPeersToVerify(vt.candidatePeers.GetVerifyPeers(), 1) + } else { + vt.selectPeersToVerify(vt.candidatePeers.GetVerifyPeers(), -1) + } + case <-vt.terminalCh: + return + } + } +} + +// selectPeersAndVerify func select at most n peers from (candidatePeers-badPeers) randomly and send verify request. +//when n<0, send to all the peers exclude badPeers. +func (vt *VerifyTask) selectPeersToVerify(candidatePeers []VerifyPeer, n int) { + var validPeers []VerifyPeer + for _, p := range candidatePeers { + if _, ok := vt.BadPeers[p.ID()]; !ok { + validPeers = append(validPeers, p) + } + } + if n < 0 || n >= len(validPeers) { + for _, p := range validPeers { + p.RequestRoot(vt.blockHeader.Number.Uint64(), vt.blockHeader.Hash(), vt.diffhash) + } + return + } + + //if n < len(validPeers), select n peers from validPeers randomly. + for i := 0; i < n; i++ { + s := rand.NewSource(time.Now().Unix()) + r := rand.New(s) + p := validPeers[r.Intn(len(validPeers))] + p.RequestRoot(vt.blockHeader.Number.Uint64(), vt.blockHeader.Hash(), vt.diffhash) + } +} + +func (vt *VerifyTask) compareRootHashAndWrite(msg VerifyMessage, verifyCh chan common.Hash) { + if msg.verifyResult.Root == vt.blockHeader.Root { + blockhash := msg.verifyResult.BlockHash + rawdb.WriteTrustBlockHash(vt.db, blockhash) + //write back to manager so that manager can cache the result and delete this task. + verifyCh <- blockhash + vt.terminalCh <- struct{}{} + } else { + vt.BadPeers[msg.peerId] = struct{}{} + } +} diff --git a/eth/backend.go b/eth/backend.go index 873980fc14..197eab608b 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -26,8 +26,6 @@ import ( "sync/atomic" "time" - "github.com/ethereum/go-ethereum/eth/protocols/trust" - "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -47,6 +45,7 @@ import ( "github.com/ethereum/go-ethereum/eth/protocols/diff" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" + "github.com/ethereum/go-ethereum/eth/protocols/trust" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/internal/ethapi" @@ -111,6 +110,9 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { if !config.SyncMode.IsValid() { return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode) } + if !config.TriesVerifyMode.IsValid() { + return nil, fmt.Errorf("invalid tries verify mode %d", config.TriesVerifyMode) + } if config.Miner.GasPrice == nil || config.Miner.GasPrice.Cmp(common.Big0) <= 0 { log.Warn("Sanitizing invalid miner gas price", "provided", config.Miner.GasPrice, "updated", ethconfig.Defaults.Miner.GasPrice) config.Miner.GasPrice = new(big.Int).Set(ethconfig.Defaults.Miner.GasPrice) @@ -196,19 +198,22 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { TrieDirtyLimit: config.TrieDirtyCache, TrieDirtyDisabled: config.NoPruning, TrieTimeLimit: config.TrieTimeout, - NoTries: config.NoTries, + NoTries: config.TriesVerifyMode != core.LocalVerify, SnapshotLimit: config.SnapshotCache, TriesInMemory: config.TriesInMemory, Preimages: config.Preimages, } ) bcOps := make([]core.BlockChainOption, 0) - if config.DiffSync && !config.NoTries { + if config.DiffSync && config.TriesVerifyMode == core.LocalVerify { bcOps = append(bcOps, core.EnableLightProcessor) } if config.PersistDiff { bcOps = append(bcOps, core.EnablePersistDiff(config.DiffBlock)) } + if config.TriesVerifyMode.NeedRemoteVerify() { + bcOps = append(bcOps, core.EnableVerifyManager()) + } eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve, &config.TxLookupLimit, bcOps...) if err != nil { return nil, err @@ -250,6 +255,10 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { return nil, err } + if config.TriesVerifyMode.NeedRemoteVerify() { + eth.blockchain.StartVerify(eth.handler.peers, config.TriesVerifyMode == core.LightVerify) + } + eth.miner = miner.New(eth, &config.Miner, chainConfig, eth.EventMux(), eth.engine, eth.isLocalBlock) eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData)) @@ -552,8 +561,12 @@ func (s *Ethereum) Protocols() []p2p.Protocol { protos = append(protos, snap.MakeProtocols((*snapHandler)(s.handler), s.snapDialCandidates)...) } // diff protocol can still open without snap protocol - protos = append(protos, diff.MakeProtocols((*diffHandler)(s.handler), s.snapDialCandidates)...) - protos = append(protos, trust.MakeProtocols((*trustHandler)(s.handler), s.snapDialCandidates)...) + if !s.config.DisableDiffProtocol { + protos = append(protos, diff.MakeProtocols((*diffHandler)(s.handler), s.snapDialCandidates)...) + } + if s.config.EnableTrustProtocol { + protos = append(protos, trust.MakeProtocols((*trustHandler)(s.handler), s.snapDialCandidates)...) + } return protos } diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index d3e146aaaf..1ae713cfd1 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -79,7 +79,7 @@ var Defaults = Config{ TrieDirtyCache: 256, TrieTimeout: 60 * time.Minute, TriesInMemory: 128, - NoTries: false, + TriesVerifyMode: core.LocalVerify, SnapshotCache: 102, DiffBlock: uint64(86400), Miner: miner.Config{ @@ -137,6 +137,8 @@ type Config struct { NoPruning bool // Whether to disable pruning and flush everything to disk DirectBroadcast bool DisableSnapProtocol bool //Whether disable snap protocol + DisableDiffProtocol bool //Whether disable diff protocol + EnableTrustProtocol bool //Whether enable trust protocol DiffSync bool // Whether support diff sync RangeLimit bool @@ -175,7 +177,7 @@ type Config struct { TrieTimeout time.Duration SnapshotCache int TriesInMemory uint64 - NoTries bool + TriesVerifyMode core.VerifyMode Preimages bool // Mining options diff --git a/eth/ethconfig/gen_config.go b/eth/ethconfig/gen_config.go index ba2996279d..d26f206d83 100644 --- a/eth/ethconfig/gen_config.go +++ b/eth/ethconfig/gen_config.go @@ -28,6 +28,8 @@ func (c Config) MarshalTOML() (interface{}, error) { NoPrefetch bool DirectBroadcast bool DisableSnapProtocol bool + DisableDiffProtocol bool + EnableTrustProtocol bool DiffSync bool RangeLimit bool TxLookupLimit uint64 `toml:",omitempty"` @@ -56,7 +58,7 @@ func (c Config) MarshalTOML() (interface{}, error) { TrieTimeout time.Duration SnapshotCache int TriesInMemory uint64 - NoTries bool + TriesVerifyMode core.VerifyMode Preimages bool Miner miner.Config Ethash ethash.Config `toml:",omitempty"` @@ -82,6 +84,8 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.NoPruning = c.NoPruning enc.DirectBroadcast = c.DirectBroadcast enc.DisableSnapProtocol = c.DisableSnapProtocol + enc.DisableDiffProtocol = c.DisableDiffProtocol + enc.EnableTrustProtocol = c.EnableTrustProtocol enc.DiffSync = c.DiffSync enc.RangeLimit = c.RangeLimit enc.TxLookupLimit = c.TxLookupLimit @@ -110,7 +114,7 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.TrieTimeout = c.TrieTimeout enc.SnapshotCache = c.SnapshotCache enc.TriesInMemory = c.TriesInMemory - enc.NoTries = c.NoTries + enc.TriesVerifyMode = c.TriesVerifyMode enc.Preimages = c.Preimages enc.Miner = c.Miner enc.Ethash = c.Ethash @@ -141,6 +145,8 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { NoPrefetch *bool DirectBroadcast *bool DisableSnapProtocol *bool + DisableDiffProtocol *bool + EnableTrustProtocol *bool DiffSync *bool RangeLimit *bool TxLookupLimit *uint64 `toml:",omitempty"` @@ -169,7 +175,7 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { TrieTimeout *time.Duration SnapshotCache *int TriesInMemory *uint64 - NoTries *bool + TriesVerifyMode *core.VerifyMode Preimages *bool Miner *miner.Config Ethash *ethash.Config `toml:",omitempty"` @@ -216,6 +222,12 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.DisableSnapProtocol != nil { c.DisableSnapProtocol = *dec.DisableSnapProtocol } + if dec.DisableDiffProtocol != nil { + c.DisableDiffProtocol = *dec.DisableDiffProtocol + } + if dec.EnableTrustProtocol != nil { + c.EnableTrustProtocol = *dec.EnableTrustProtocol + } if dec.DiffSync != nil { c.DiffSync = *dec.DiffSync } @@ -300,8 +312,8 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.TriesInMemory != nil { c.TriesInMemory = *dec.TriesInMemory } - if dec.NoTries != nil { - c.NoTries = *dec.NoTries + if dec.TriesVerifyMode != nil { + c.TriesVerifyMode = *dec.TriesVerifyMode } if dec.Preimages != nil { c.Preimages = *dec.Preimages diff --git a/eth/handler_trust.go b/eth/handler_trust.go index 6df630a2e8..c990aeb8b0 100644 --- a/eth/handler_trust.go +++ b/eth/handler_trust.go @@ -2,7 +2,6 @@ package eth import ( "fmt" - "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/eth/protocols/trust" "github.com/ethereum/go-ethereum/p2p/enode" @@ -34,6 +33,13 @@ func (h *trustHandler) PeerInfo(id enode.ID) interface{} { func (h *trustHandler) Handle(peer *trust.Peer, packet trust.Packet) error { switch packet := packet.(type) { case *trust.RootResponsePacket: + verifyResult := &core.VerifyResult{ + Status: packet.Status, + BlockNumber: packet.BlockNumber, + BlockHash: packet.BlockHash, + Root: packet.Root, + } + h.Chain().VerifyManger().HandleRootResponse(verifyResult, peer.ID()) // TODO: h.bc.VerifyManager().HandleRootResponse(peer.ID(), *packet) return nil diff --git a/eth/peerset.go b/eth/peerset.go index dc1d7da45e..71b8e109ed 100644 --- a/eth/peerset.go +++ b/eth/peerset.go @@ -22,13 +22,13 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/eth/protocols/trust" - "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/protocols/diff" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" + "github.com/ethereum/go-ethereum/eth/protocols/trust" "github.com/ethereum/go-ethereum/p2p" ) @@ -334,11 +334,11 @@ func (ps *peerSet) GetDiffPeer(pid string) downloader.IDiffPeer { } // GetVerifyPeers returns an array of verify nodes. -func (ps *peerSet) GetVerifyPeers() []*trustPeer { - res := make([]*trustPeer, 0) +func (ps *peerSet) GetVerifyPeers() []core.VerifyPeer { + res := make([]core.VerifyPeer, 0) for _, p := range ps.peers { - if p.trustExt != nil { - res = append(res, p.trustExt) + if p.trustExt != nil && p.trustExt.Peer != nil { + res = append(res, p.trustExt.Peer) } } return res diff --git a/ethclient/ethclient.go b/ethclient/ethclient.go index 395e87fe1d..78f5c6bb14 100644 --- a/ethclient/ethclient.go +++ b/ethclient/ethclient.go @@ -22,6 +22,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/ethereum/go-ethereum/core" "math/big" "github.com/ethereum/go-ethereum" @@ -200,8 +201,8 @@ func (ec *Client) GetDiffAccountsWithScope(ctx context.Context, number *big.Int, return &result, err } -func (ec *Client) GetRootByDiffHash(ctx context.Context, blockNr *big.Int, blockHash common.Hash, diffHash common.Hash) (*types.VerifyResult, error) { - var result types.VerifyResult +func (ec *Client) GetRootByDiffHash(ctx context.Context, blockNr *big.Int, blockHash common.Hash, diffHash common.Hash) (*core.VerifyResult, error) { + var result core.VerifyResult err := ec.c.CallContext(ctx, &result, "eth_getRootByDiffHash", toBlockNumArg(blockNr), blockHash, diffHash) return &result, err } diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index b01ec8b2e5..311b7b88c6 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1287,7 +1287,7 @@ func (s *PublicBlockChainAPI) GetDiffAccountsWithScope(ctx context.Context, bloc return result, err } -func (s *PublicBlockChainAPI) GetRootByDiffHash(ctx context.Context, blockNr rpc.BlockNumber, blockHash common.Hash, diffHash common.Hash) (*types.VerifyResult, error) { +func (s *PublicBlockChainAPI) GetRootByDiffHash(ctx context.Context, blockNr rpc.BlockNumber, blockHash common.Hash, diffHash common.Hash) (*core.VerifyResult, error) { return s.b.Chain().GetRootByDiffHash(uint64(blockNr), blockHash, diffHash) } From 3bc42621f6cc8751ea2d2656abf8c9ef3f1363c4 Mon Sep 17 00:00:00 2001 From: kyrie-yl Date: Thu, 10 Feb 2022 17:22:23 +0800 Subject: [PATCH 2/4] resolve comments Signed-off-by: kyrie-yl --- cmd/utils/flags.go | 12 +- core/block_validator.go | 41 +++- core/blockchain.go | 36 +--- core/blockchain_insert.go | 6 +- core/rawdb/accessors_chain.go | 8 +- core/rawdb/schema.go | 4 +- core/trie_verify.go | 353 ++++++++++++++++++++++++++++++++++ core/types.go | 11 ++ core/verify_manager.go | 150 --------------- core/verify_modes.go | 69 ------- core/verify_peers.go | 12 -- core/verify_task.go | 125 ------------ eth/backend.go | 9 +- eth/handler_trust.go | 8 +- 14 files changed, 437 insertions(+), 407 deletions(-) create mode 100644 core/trie_verify.go delete mode 100644 core/verify_manager.go delete mode 100644 core/verify_modes.go delete mode 100644 core/verify_peers.go delete mode 100644 core/verify_task.go diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 2741b4ad21..f18b5104d1 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -270,7 +270,7 @@ var ( defaultVerifyMode = ethconfig.Defaults.TriesVerifyMode TriesVerifyModeFlag = TextMarshalerFlag{ Name: "tries-verify-mode", - Usage: `tries verify mode: "local", "full", "light", "insecure"`, + Usage: `tries verify mode: "local", "full", "insecure", "none"`, Value: &defaultVerifyMode, } OverrideBerlinFlag = cli.Uint64Flag{ @@ -1670,13 +1670,13 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { } if ctx.GlobalIsSet(TriesVerifyModeFlag.Name) { cfg.TriesVerifyMode = *GlobalTextMarshaler(ctx, TriesVerifyModeFlag.Name).(*core.VerifyMode) - //If a node sets verify mode to full or light, it's a fast node and need - //to verify blocks from verify nodes, then it should enable trust protocol. - if cfg.TriesVerifyMode == core.FullVerify || cfg.TriesVerifyMode == core.LightVerify { + // If a node sets verify mode to full or light, it's a fast node and need + // to verify blocks from verify nodes, then it should enable trust protocol. + if cfg.TriesVerifyMode.NeedRemoteVerify() { cfg.EnableTrustProtocol = true } - //If a node sets verify node but not local, it's a fast node whose difflayer is not integral. - //So fast node should disable diff protocol. + // If a node sets verify mode but not local, it's a fast node whose difflayer is not integral. + // So fast node should disable diff protocol. if cfg.TriesVerifyMode != core.LocalVerify { cfg.DisableDiffProtocol = true } diff --git a/core/block_validator.go b/core/block_validator.go index bf2fb40260..a1d66d72be 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -34,18 +34,26 @@ type BlockValidator struct { config *params.ChainConfig // Chain configuration options bc *BlockChain // Canonical block chain engine consensus.Engine // Consensus engine used for validating + remoteValidator *VerifyManager } // NewBlockValidator returns a new block validator which is safe for re-use -func NewBlockValidator(config *params.ChainConfig, blockchain *BlockChain, engine consensus.Engine) *BlockValidator { +func NewBlockValidator(config *params.ChainConfig, blockchain *BlockChain, engine consensus.Engine, mode *VerifyMode) *BlockValidator { validator := &BlockValidator{ config: config, engine: engine, bc: blockchain, } + if mode.NeedRemoteVerify() { + validator.remoteValidator = NewVerifyManager(blockchain, *mode == InsecureVerify) + } return validator } +func(v *BlockValidator) RemoteVerifyManager() *VerifyManager{ + return v.remoteValidator +} + // ValidateBody validates the given block's uncles and verifies the block // header's transaction and uncle roots. The headers are assumed to be already // validated at this point. @@ -149,6 +157,37 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD return err } +func (v *BlockValidator) StartRemoteVerify(peers VerifyPeers) { + v.remoteValidator.peers = peers + if v.remoteValidator != nil { + go v.remoteValidator.verifyManagerLoop() + } +} + +func (v * BlockValidator) StopRemoteVerify() { + if v.remoteValidator != nil { + v.remoteValidator.Stop() + } +} + +func (v * BlockValidator) VerifyBlock(header *types.Header) { + if v.remoteValidator != nil { + v.remoteValidator.newTaskCh <- header + } +} + +// ValidateBlockVerify validate that weather the H-11 ancestor of the block has been verified by peers. +// If not, the blockchain should halt. +func (v * BlockValidator) ValidateBlockVerify(block *types.Block) error { + if v.remoteValidator != nil { + header := block.Header() + if !v.remoteValidator.AncestorVerified(v.bc.GetHeaderByNumber(header.Number.Uint64())) { + return fmt.Errorf("block's ancessor %x has not been verified", block.Hash()) + } + } + return nil +} + // CalcGasLimit computes the gas limit of the next block after parent. It aims // to keep the baseline gas above the provided floor, and increase it towards the // ceil if the blocks are full. If the ceil is exceeded, it will always decrease diff --git a/core/blockchain.go b/core/blockchain.go index 3bffd1872d..414be33852 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -239,7 +239,6 @@ type BlockChain struct { engine consensus.Engine validator Validator // Block and state validator interface processor Processor // Block transaction processor interface - verifyManager *VerifyManager vmConfig vm.Config shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block. @@ -300,7 +299,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par diffNumToBlockHashes: make(map[uint64]map[common.Hash]struct{}), diffPeersToDiffHashes: make(map[string]map[common.Hash]struct{}), } - bc.validator = NewBlockValidator(chainConfig, bc, engine) bc.processor = NewStateProcessor(chainConfig, bc, engine) var err error @@ -463,16 +461,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par return bc, nil } -func (bc *BlockChain) StartVerify(peers VerifyPeers, allowUntrustedVerify bool) { - bc.verifyManager.peers = peers - bc.verifyManager.allowUntrustedVerify = allowUntrustedVerify - bc.verifyManager.Start() -} - -func (bc *BlockChain) VerifyManger() *VerifyManager { - return bc.verifyManager -} - // GetVMConfig returns the block chain VM config. func (bc *BlockChain) GetVMConfig() *vm.Config { return &bc.vmConfig @@ -1202,9 +1190,7 @@ func (bc *BlockChain) Stop() { close(bc.quit) bc.StopInsert() bc.wg.Wait() - if bc.verifyManager != nil { - bc.verifyManager.Stop() - } + bc.validator.StopRemoteVerify() // Ensure that the entirety of the state snapshot is journalled to disk. var snapBase common.Hash @@ -2023,11 +2009,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er log.Debug("Abort during block processing") break } - //For fast node, if the H-11 block has not been verified, stop processing blocks. - if bc.verifyManager != nil && bc.verifyManager.CheckAncestorVerified(block.Header()) { - log.Debug("Block ancestor has not been verified", "hash", block.Hash(), "number", block.Number()) - break - } // If the header is a banned one, straight out abort if BadHashes[block.Hash()] { bc.reportBlock(block, nil, ErrBlacklistedHash) @@ -2071,7 +2052,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er lastCanon = block continue } - // Retrieve the parent block and it's state to execute on top start := time.Now() @@ -2143,9 +2123,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er blockInsertTimer.UpdateSince(start) //Start a routine to verify this block. - if bc.verifyManager != nil { - bc.verifyManager.NewBlockVerifyTask(block.Header()) - } + bc.validator.VerifyBlock(block.Header()) switch status { case CanonStatTy: @@ -3045,10 +3023,10 @@ func EnablePersistDiff(limit uint64) BlockChainOption { } } -func EnableVerifyManager() BlockChainOption { - return func(chain *BlockChain) *BlockChain { - chain.verifyManager = NewVerifyManager(chain) - return chain +func EnableBlockValidator(chainConfig *params.ChainConfig, engine consensus.Engine, mode *VerifyMode) BlockChainOption { + return func(bc *BlockChain) *BlockChain { + bc.validator = NewBlockValidator(chainConfig, bc, engine, mode) + return bc } } @@ -3109,7 +3087,7 @@ func (bc *BlockChain) GetRootByDiffHash(blockNumber uint64, blockHash common.Has } func (bc *BlockChain) GenerateDiffLayer(blockHash common.Hash) (*types.DiffLayer, error) { - + return &types.DiffLayer{}, nil } func (bc *BlockChain) GetTrustedDiffLayer(blockHash common.Hash) *types.DiffLayer { diff --git a/core/blockchain_insert.go b/core/blockchain_insert.go index cb8473c084..3bd51825d2 100644 --- a/core/blockchain_insert.go +++ b/core/blockchain_insert.go @@ -118,7 +118,11 @@ func (it *insertIterator) next() (*types.Block, error) { return it.chain[it.index], it.errors[it.index] } // Block header valid, run body validation and return - return it.chain[it.index], it.validator.ValidateBody(it.chain[it.index]) + if err := it.validator.ValidateBody(it.chain[it.index]); err != nil { + return it.chain[it.index], err + } + // Block body valid, run remote verify and return + return it.chain[it.index], it.validator.ValidateBlockVerify(it.chain[it.index]) } // peek returns the next block in the iterator, along with any potential validation diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 912a19f6ff..66f6abbae5 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -64,16 +64,16 @@ func DeleteCanonicalHash(db ethdb.KeyValueWriter, number uint64) { } } -func ReadTrustBlockHash(db ethdb.Reader, hash common.Hash) bool { +func IsTrustBlock(db ethdb.Reader, hash common.Hash) bool { data, _ := db.Get(trustBlockHashKey(hash)) if len(data) == 0 { return false } - return bytes.Equal(data,[]byte{0x01}) + return bytes.Equal(data,[]byte{byteTrue}) } -func WriteTrustBlockHash(db ethdb.KeyValueWriter, hashkey common.Hash) { - if err := db.Put(trustBlockHashKey(hashkey),[]byte{0x01}); err != nil { +func MarkTrustBlock(db ethdb.KeyValueWriter, hashkey common.Hash) { + if err := db.Put(trustBlockHashKey(hashkey),[]byte{byteTrue}); err != nil { log.Crit("Failed to store trust block hash") } } diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 525614173c..5c9950866a 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -94,7 +94,7 @@ var ( diffLayerPrefix = []byte("d") // diffLayerPrefix + hash -> diffLayer // trust block database - trustBlockPrefix = []byte("tb") // trustBlockPrefix + hash -> verify result + trustBlockPrefix = []byte("trust-block-") // trustBlockPrefix + hash -> verify result preimagePrefix = []byte("secure-key-") // preimagePrefix + hash -> preimage configPrefix = []byte("ethereum-config-") // config prefix for the db @@ -121,6 +121,8 @@ const ( // freezerDifficultyTable indicates the name of the freezer total difficulty table. freezerDifficultyTable = "diffs" + // + byteTrue = 0x01 ) // FreezerNoSnappy configures whether compression is disabled for the ancient-tables. diff --git a/core/trie_verify.go b/core/trie_verify.go new file mode 100644 index 0000000000..1c3f96678d --- /dev/null +++ b/core/trie_verify.go @@ -0,0 +1,353 @@ +package core + +import ( + "fmt" + "math/rand" + "time" + + lru "github.com/hashicorp/golang-lru" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" +) + +const ( + verifiedCacheSize = 256 + maxForkHeight = 11 + resendInterval = 2 * time.Second + // defaultPeerNumber is default number of verify peers + defaultPeerNumber = 3 + // tryAllPeersTime is the time that a block has not been verified and then try all the valid verify peers. + tryAllPeersTime = 15 * time.Second +) + +type VerifyManager struct { + bc *BlockChain + tasks map[common.Hash]*VerifyTask + peers VerifyPeers + verifiedCache *lru.Cache + allowUntrustedVerify bool + newTaskCh chan *types.Header + verifyCh chan common.Hash + messageCh chan VerifyMessage + exitCh chan struct{} +} + +func NewVerifyManager(blockchain *BlockChain, allowUntrustedVerify bool) *VerifyManager { + verifiedCache, _ := lru.New(verifiedCacheSize) + vm := &VerifyManager{ + bc: blockchain, + tasks: make(map[common.Hash]*VerifyTask), + verifiedCache: verifiedCache, + newTaskCh: make(chan *types.Header), + verifyCh: make(chan common.Hash), + messageCh: make(chan VerifyMessage), + exitCh: make(chan struct{}), + allowUntrustedVerify: allowUntrustedVerify, + } + return vm +} + +func (vm *VerifyManager) verifyManagerLoop() { + // read disk store to initial verified cache + // load unverified blocks in a normalized chain and start a batch of verify task + header := vm.bc.CurrentHeader() + // Start verify task from H to H-11 if need. + vm.NewBlockVerifyTask(header) + prune := time.NewTicker(time.Second) + defer prune.Stop() + for { + select { + case h := <-vm.newTaskCh: + vm.NewBlockVerifyTask(h) + case hash := <-vm.verifyCh: + vm.cacheBlockVerified(hash) + rawdb.MarkTrustBlock(vm.bc.db, hash) + if task, ok := vm.tasks[hash]; ok { + delete(vm.tasks, hash) + close(task.terminalCh) + } + case <-prune.C: + for hash, task := range vm.tasks { + if vm.bc.CurrentHeader().Number.Uint64()-task.blockHeader.Number.Uint64() > 15 { + delete(vm.tasks, hash) + close(task.terminalCh) + } + } + case message := <-vm.messageCh: + if vt, ok := vm.tasks[message.verifyResult.BlockHash]; ok { + vt.messageCh <- message + } + case <-vm.exitCh: + return + } + } +} + +func (vm *VerifyManager) Stop() { + // stop all the tasks + for _, task := range vm.tasks { + close(task.terminalCh) + } + close(vm.exitCh) +} + +func (vm *VerifyManager) NewBlockVerifyTask(header *types.Header) { + for i := 0; header != nil && i <= maxForkHeight; i++ { + func(hash common.Hash){ + // if verified cache record that this block has been verified, skip. + if _, ok := vm.verifiedCache.Get(hash); ok { + return + } + // if there already has a verify task for this block, skip. + if _, ok := vm.tasks[hash]; ok { + return + } + // if verified storage record that this block has been verified, skip. + if rawdb.IsTrustBlock(vm.bc.db, hash) { + vm.cacheBlockVerified(hash) + return + } + diffLayer := vm.bc.GetTrustedDiffLayer(hash) + // if this block has no diff, there is no need to verify it. + var err error + if diffLayer == nil { + if diffLayer, err = vm.bc.GenerateDiffLayer(hash); err != nil { + log.Error("failed to get diff layer", "block", hash, "number", header.Number, "error", err) + return + } + } + diffHash, err := GetTrustedDiffHash(diffLayer) + if err != nil { + log.Error("failed to get diff hash", "block", hash, "number", header.Number, "error", err) + return + } + verifyTask := NewVerifyTask(diffHash, header, vm.peers, vm.bc.db, vm.verifyCh, vm.allowUntrustedVerify) + vm.tasks[hash] = verifyTask + }(header.Hash()) + header = vm.bc.GetHeaderByHash(header.ParentHash) + } +} + +func (vm *VerifyManager) cacheBlockVerified(hash common.Hash) { + if vm.verifiedCache.Len() >= verifiedCacheSize { + vm.verifiedCache.RemoveOldest() + } + vm.verifiedCache.Add(hash, true) +} + +// AncestorVerified function check block has been verified or it's a empty block. +func (vm *VerifyManager) AncestorVerified(header *types.Header) bool { + // find header of H-11 block. + header = vm.bc.GetHeaderByNumber(header.Number.Uint64() - maxForkHeight) + // If start from genesis block, there has not a H-11 block. + if header == nil { + return true + } + // check whether H-11 block is a empty block. + if header.TxHash == types.EmptyRootHash { + parent := vm.bc.GetHeaderByHash(header.ParentHash) + if header.Root == parent.Root { + return true + } + } + hash := header.Hash() + if _, ok := vm.verifiedCache.Get(hash); ok { + return true + } + return rawdb.IsTrustBlock(vm.bc.db, hash) +} + +func (vm *VerifyManager) HandleRootResponse(vr *VerifyResult, pid string) error { + vm.messageCh <- VerifyMessage{verifyResult: vr, peerId: pid} + return nil +} + +type VerifyResult struct { + Status types.VerifyStatus + BlockNumber uint64 + BlockHash common.Hash + Root common.Hash +} + +type VerifyMessage struct { + verifyResult *VerifyResult + peerId string +} + +type VerifyTask struct { + diffhash common.Hash + blockHeader *types.Header + candidatePeers VerifyPeers + BadPeers map[string]struct{} + startAt time.Time + db ethdb.Database + allowUntrustedVerify bool + + messageCh chan VerifyMessage + terminalCh chan struct{} +} + +func NewVerifyTask(diffhash common.Hash, header *types.Header, peers VerifyPeers, db ethdb.Database, verifyCh chan common.Hash, allowUntrustedVerify bool) *VerifyTask { + vt := &VerifyTask{ + diffhash: diffhash, + blockHeader: header, + candidatePeers: peers, + BadPeers: make(map[string]struct{}), + db: db, + allowUntrustedVerify: allowUntrustedVerify, + messageCh: make(chan VerifyMessage), + terminalCh: make(chan struct{}), + } + go vt.Start(verifyCh) + return vt +} + +func (vt *VerifyTask) Start(verifyCh chan common.Hash) { + vt.startAt = time.Now() + + vt.selectPeersToVerify(defaultPeerNumber) + resend := time.NewTicker(resendInterval) + defer resend.Stop() + for { + select { + case msg := <-vt.messageCh: + switch msg.verifyResult.Status { + case types.StatusFullVerified: + vt.compareRootHashAndWrite(msg, verifyCh) + case types.StatusUntrustedVerified: + log.Warn("block %s , num= %s is untrusted verified", msg.verifyResult.BlockHash, msg.verifyResult.BlockNumber) + if vt.allowUntrustedVerify { + vt.compareRootHashAndWrite(msg, verifyCh) + } + case types.StatusDiffHashMismatch, types.StatusImpossibleFork, types.StatusUnexpectedError: + vt.BadPeers[msg.peerId] = struct{}{} + log.Debug("peer %s is not available: %s", msg.peerId, msg.verifyResult.Status.Msg) + case types.StatusBlockTooNew, types.StatusBlockNewer, types.StatusPossibleFork: + log.Info("return msg from peer %s for block %s is %s", msg.peerId, msg.verifyResult.BlockHash, msg.verifyResult.Status.Msg) + } + case <-resend.C: + // if a task has run over 15s, try all the vaild peers to verify. + if time.Since(vt.startAt) < tryAllPeersTime { + vt.selectPeersToVerify(1) + } else { + vt.selectPeersToVerify(-1) + } + case <-vt.terminalCh: + return + } + } +} + +// selectPeersAndVerify func select at most n peers from (candidatePeers-badPeers) randomly and send verify request. +// when n<0, send to all the peers exclude badPeers. +func (vt *VerifyTask) selectPeersToVerify(n int) { + var validPeers []VerifyPeer + candidatePeers := vt.candidatePeers.GetVerifyPeers() + for _, p := range candidatePeers { + if _, ok := vt.BadPeers[p.ID()]; !ok { + validPeers = append(validPeers, p) + } + } + // if + if n < 0 || n >= len(validPeers) { + for _, p := range validPeers { + p.RequestRoot(vt.blockHeader.Number.Uint64(), vt.blockHeader.Hash(), vt.diffhash) + } + return + } + + // if n < len(validPeers), select n peers from validPeers randomly. + rand.Seed(time.Now().UnixNano()) + rand.Shuffle(len(validPeers), func(i, j int) { validPeers[i], validPeers[j] = validPeers[j], validPeers[i] }) + for i := 0; i < n; i++ { + p := validPeers[i] + p.RequestRoot(vt.blockHeader.Number.Uint64(), vt.blockHeader.Hash(), vt.diffhash) + } +} + +func (vt *VerifyTask) compareRootHashAndWrite(msg VerifyMessage, verifyCh chan common.Hash) { + if msg.verifyResult.Root == vt.blockHeader.Root { + blockhash := msg.verifyResult.BlockHash + rawdb.MarkTrustBlock(vt.db, blockhash) + // write back to manager so that manager can cache the result and delete this task. + verifyCh <- blockhash + } else { + vt.BadPeers[msg.peerId] = struct{}{} + } +} + +type VerifyPeer interface { + RequestRoot(blockNumber uint64, blockHash common.Hash, diffHash common.Hash) error + ID() string +} + +type VerifyPeers interface { + GetVerifyPeers() []VerifyPeer +} + +type VerifyMode uint32 + +const ( + LocalVerify VerifyMode = iota // + FullVerify + InsecureVerify + NoneVerify +) + +func (mode VerifyMode) IsValid() bool { + return mode >= LocalVerify && mode <= NoneVerify +} + +func (mode VerifyMode) String() string { + switch mode { + case LocalVerify: + return "local" + case FullVerify: + return "full" + case InsecureVerify: + return "insecure" + case NoneVerify: + return "none" + default: + return "unknown" + } +} + +func (mode VerifyMode) MarshalText() ([]byte, error) { + switch mode { + case LocalVerify: + return []byte("local"), nil + case FullVerify: + return []byte("full"), nil + case InsecureVerify: + return []byte("insecure"), nil + case NoneVerify: + return []byte("none"), nil + default: + return nil, fmt.Errorf("unknown verify mode %d", mode) + } +} + +func (mode *VerifyMode) UnmarshalText(text []byte) error { + switch string(text) { + case "local": + *mode = LocalVerify + case "full": + *mode = FullVerify + case "insecure": + *mode = InsecureVerify + case "none": + *mode = NoneVerify + default: + return fmt.Errorf(`unknown sync mode %q, want "full", "light" or "insecure"`, text) + } + return nil +} + +func (mode *VerifyMode) NeedRemoteVerify() bool { + return *mode == FullVerify || *mode == InsecureVerify +} diff --git a/core/types.go b/core/types.go index 49bd58e086..fb4b215926 100644 --- a/core/types.go +++ b/core/types.go @@ -32,6 +32,17 @@ type Validator interface { // ValidateState validates the given statedb and optionally the receipts and // gas used. ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64) error + + // StartRemoteVerify start the remote verify routine for given peers. + StartRemoteVerify(peers VerifyPeers) + // StopRemoteVerify stop the remote verify routine. + StopRemoteVerify() + // VerifyBlock verify the given blcok. + VerifyBlock(header *types.Header) + // ValidateBlockVerify validate the given block has been verified. + ValidateBlockVerify(block *types.Block) error + // RemoteVerifyManager return remoteVerifyManager of validator. + RemoteVerifyManager() *VerifyManager } // Prefetcher is an interface for pre-caching transaction signatures and state. diff --git a/core/verify_manager.go b/core/verify_manager.go deleted file mode 100644 index 2137376bfa..0000000000 --- a/core/verify_manager.go +++ /dev/null @@ -1,150 +0,0 @@ -package core - -import ( - "fmt" - "github.com/ethereum/go-ethereum/log" - "time" - - lru "github.com/hashicorp/golang-lru" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/rawdb" - "github.com/ethereum/go-ethereum/core/types" -) - -const verifiedCacheSize = 256 - -type VerifyManager struct { - bc *BlockChain - tasks map[common.Hash]*VerifyTask - peers VerifyPeers - verifiedCache *lru.Cache - allowUntrustedVerify bool - verifyCh chan common.Hash - exitCh chan struct{} -} - -func NewVerifyManager(blockchain *BlockChain) *VerifyManager { - verifiedCache, _ := lru.New(verifiedCacheSize) - vm := &VerifyManager{ - bc: blockchain, - tasks: make(map[common.Hash]*VerifyTask), - verifiedCache: verifiedCache, - verifyCh: make(chan common.Hash), - exitCh: make(chan struct{}), - } - return vm -} - -func (vm *VerifyManager) Start() { - //read disk store to initial verified cache - //load unverified blocks in a normalized chain and start a batch of verify task - header := vm.bc.CurrentHeader() - go vm.mainLoop(header) -} - -func (vm *VerifyManager) Stop() { - //stop all the tasks - close(vm.exitCh) -} - -func (vm *VerifyManager) mainLoop(header *types.Header) { - //Start verify task from H to H-11 if need. - vm.NewBlockVerifyTask(header) - prune := time.NewTicker(time.Second) - defer prune.Stop() - for { - select { - case hash := <-vm.verifyCh: - vm.cacheBlockVerified(hash) - rawdb.WriteTrustBlockHash(vm.bc.db, hash) - delete(vm.tasks, hash) - case <-prune.C: - for hash, task := range vm.tasks { - if vm.bc.CurrentHeader().Number.Uint64()-task.blockHeader.Number.Uint64() > 15 { - delete(vm.tasks, hash) - close(task.terminalCh) - } - } - case <-vm.exitCh: - return - } - } -} - -func (vm *VerifyManager) NewBlockVerifyTask(header *types.Header) { - for i := 0; i <= 11; i++ { - hash := header.Hash() - //if verified cache record that this block has been verified, skip. - if _, ok := vm.verifiedCache.Get(hash); ok { - header = vm.bc.GetHeaderByHash(header.ParentHash) - continue - } - //if there already has a verify task for this block, skip. - if _, ok := vm.tasks[hash]; ok { - header = vm.bc.GetHeaderByHash(header.ParentHash) - continue - } - //if verified storage record that this block has been verified, skip. - if rawdb.ReadTrustBlockHash(vm.bc.db, hash) { - vm.cacheBlockVerified(hash) - header = vm.bc.GetHeaderByHash(header.ParentHash) - continue - } - diffLayer := vm.bc.GetTrustedDiffLayer(hash) - //if this block has no diff, there is no need to verify it. - var err error - if diffLayer == nil { - if diffLayer, err = vm.bc.GenerateDiffLayer(hash); err != nil { - log.Error("failed to get diff layer", "block", hash, "number", header.Number, "error", err) - header = vm.bc.GetHeaderByHash(header.ParentHash) - continue - } - } - diffHash, err := GetTrustedDiffHash(diffLayer) - if err != nil { - log.Error("failed to get diff hash", "block", hash, "number", header.Number, "error", err) - header = vm.bc.GetHeaderByHash(header.ParentHash) - continue - } - verifyTask := NewVerifyTask(diffHash, header, vm.peers, vm.bc.db, vm.verifyCh, vm.allowUntrustedVerify) - vm.tasks[hash] = verifyTask - header = vm.bc.GetHeaderByHash(header.ParentHash) - } -} - -func (vm *VerifyManager) cacheBlockVerified(hash common.Hash) { - if vm.verifiedCache.Len() >= verifiedCacheSize { - vm.verifiedCache.RemoveOldest() - } - vm.verifiedCache.Add(hash, true) -} - -//CheckAncestorVerified function check whether H-11 block has been verified or it's a empty block. -//If not, the blockchain should stop to insert new block. -func (vm *VerifyManager) CheckAncestorVerified(header *types.Header) bool { - //find header of H-11 block. - header = vm.bc.GetHeaderByNumber(header.Number.Uint64() - 11) - //If start from genesis block, there has not a H-11 block. - if header == nil { - return true - } - hash := header.Hash() - //check whether H-11 block is a empty block. - parent := vm.bc.GetHeaderByHash(hash) - if header.TxHash == (common.Hash{}) && header.Root == parent.Root { - return true - } - if _, ok := vm.verifiedCache.Get(hash); ok { - return true - } - return rawdb.ReadTrustBlockHash(vm.bc.db, hash) -} - -func (vm *VerifyManager) HandleRootResponse(vr *VerifyResult, pid string) error { - if vt, ok := vm.tasks[vr.BlockHash]; ok { - vt.messageCh <- VerifyMessage{verifyResult: vr, peerId: pid} - return nil - } - return fmt.Errorf("") -} diff --git a/core/verify_modes.go b/core/verify_modes.go deleted file mode 100644 index 8d9b05547f..0000000000 --- a/core/verify_modes.go +++ /dev/null @@ -1,69 +0,0 @@ -package core - -import "fmt" - -type VerifyMode uint32 - -const ( - LocalVerify VerifyMode = iota // - FullVerify - LightVerify - InsecureVerify -) - -func (mode VerifyMode) IsValid() bool { - return mode >= LocalVerify && mode <= InsecureVerify -} - -func (mode VerifyMode) String() string { - switch mode { - case LocalVerify: - return "local" - case FullVerify: - return "full" - case LightVerify: - return "light" - case InsecureVerify: - return "insecure" - default: - return "unknown" - } -} - -func (mode VerifyMode) MarshalText() ([]byte, error) { - switch mode { - case LocalVerify: - return []byte("local"), nil - case FullVerify: - return []byte("full"), nil - case LightVerify: - return []byte("light"), nil - case InsecureVerify: - return []byte("insecure"), nil - default: - return nil, fmt.Errorf("unknown verify mode %d", mode) - } -} - -func (mode *VerifyMode) UnmarshalText(text []byte) error { - switch string(text) { - case "local": - *mode = LocalVerify - case "full": - *mode = FullVerify - case "light": - *mode = LightVerify - case "insecure": - *mode = InsecureVerify - default: - return fmt.Errorf(`unknown sync mode %q, want "full", "light" or "insecure"`, text) - } - return nil -} - -func (mode *VerifyMode) NeedRemoteVerify() bool { - if *mode == FullVerify || *mode == LightVerify { - return true - } - return false -} diff --git a/core/verify_peers.go b/core/verify_peers.go deleted file mode 100644 index a1ea2eaa9c..0000000000 --- a/core/verify_peers.go +++ /dev/null @@ -1,12 +0,0 @@ -package core - -import "github.com/ethereum/go-ethereum/common" - -type VerifyPeer interface { - RequestRoot(blockNumber uint64, blockHash common.Hash, diffHash common.Hash) error - ID() string -} - -type VerifyPeers interface { - GetVerifyPeers() []VerifyPeer -} diff --git a/core/verify_task.go b/core/verify_task.go deleted file mode 100644 index 59b63b836c..0000000000 --- a/core/verify_task.go +++ /dev/null @@ -1,125 +0,0 @@ -package core - -import ( - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/rawdb" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/log" - - "math/rand" - "time" -) - -type VerifyResult struct { - Status types.VerifyStatus - BlockNumber uint64 - BlockHash common.Hash - Root common.Hash -} - - -type VerifyMessage struct { - verifyResult *VerifyResult - peerId string -} - -type VerifyTask struct { - diffhash common.Hash - blockHeader *types.Header - candidatePeers VerifyPeers - BadPeers map[string]struct{} - startAt time.Time - db ethdb.Database - allowUntrustedVerify bool - - messageCh chan VerifyMessage - terminalCh chan struct{} -} - -func NewVerifyTask(diffhash common.Hash, header *types.Header, peers VerifyPeers, db ethdb.Database, verifyCh chan common.Hash, allowUntrustedVerify bool) *VerifyTask { - vt := &VerifyTask{ - diffhash: diffhash, - blockHeader: header, - candidatePeers: peers, - BadPeers: make(map[string]struct{}), - db: db, - allowUntrustedVerify: allowUntrustedVerify, - messageCh: make(chan VerifyMessage), - terminalCh: make(chan struct{}), - } - go vt.Start(verifyCh) - return vt -} - -func (vt *VerifyTask) Start(verifyCh chan common.Hash) { - vt.startAt = time.Now() - - vt.selectPeersToVerify(vt.candidatePeers.GetVerifyPeers(), 3) - resend := time.NewTicker(2 * time.Second) - defer resend.Stop() - for { - select { - case msg := <-vt.messageCh: - switch msg.verifyResult.Status { - case types.StatusFullVerified: - vt.compareRootHashAndWrite(msg, verifyCh) - case types.StatusUntrustedVerified: - log.Warn("block %s , num= %s is untrusted verified", msg.verifyResult.BlockHash, msg.verifyResult.BlockNumber) - if vt.allowUntrustedVerify { - vt.compareRootHashAndWrite(msg, verifyCh) - } - case types.StatusDiffHashMismatch, types.StatusImpossibleFork, types.StatusUnexpectedError: - vt.BadPeers[msg.peerId] = struct{}{} - case types.StatusBlockTooNew, types.StatusBlockNewer, types.StatusPossibleFork: - log.Info("return msg from peer %s for block %s is %s", msg.peerId, msg.verifyResult.BlockHash, msg.verifyResult.Status.Msg) - } - case <-resend.C: - //if a task has run over 300s, try all the vaild peers to verify. - if time.Now().Second()-vt.startAt.Second() < 300 { - vt.selectPeersToVerify(vt.candidatePeers.GetVerifyPeers(), 1) - } else { - vt.selectPeersToVerify(vt.candidatePeers.GetVerifyPeers(), -1) - } - case <-vt.terminalCh: - return - } - } -} - -// selectPeersAndVerify func select at most n peers from (candidatePeers-badPeers) randomly and send verify request. -//when n<0, send to all the peers exclude badPeers. -func (vt *VerifyTask) selectPeersToVerify(candidatePeers []VerifyPeer, n int) { - var validPeers []VerifyPeer - for _, p := range candidatePeers { - if _, ok := vt.BadPeers[p.ID()]; !ok { - validPeers = append(validPeers, p) - } - } - if n < 0 || n >= len(validPeers) { - for _, p := range validPeers { - p.RequestRoot(vt.blockHeader.Number.Uint64(), vt.blockHeader.Hash(), vt.diffhash) - } - return - } - - //if n < len(validPeers), select n peers from validPeers randomly. - for i := 0; i < n; i++ { - s := rand.NewSource(time.Now().Unix()) - r := rand.New(s) - p := validPeers[r.Intn(len(validPeers))] - p.RequestRoot(vt.blockHeader.Number.Uint64(), vt.blockHeader.Hash(), vt.diffhash) - } -} - -func (vt *VerifyTask) compareRootHashAndWrite(msg VerifyMessage, verifyCh chan common.Hash) { - if msg.verifyResult.Root == vt.blockHeader.Root { - blockhash := msg.verifyResult.BlockHash - rawdb.WriteTrustBlockHash(vt.db, blockhash) - //write back to manager so that manager can cache the result and delete this task. - verifyCh <- blockhash - vt.terminalCh <- struct{}{} - } else { - vt.BadPeers[msg.peerId] = struct{}{} - } -} diff --git a/eth/backend.go b/eth/backend.go index 197eab608b..7b00e43448 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -211,9 +211,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { if config.PersistDiff { bcOps = append(bcOps, core.EnablePersistDiff(config.DiffBlock)) } - if config.TriesVerifyMode.NeedRemoteVerify() { - bcOps = append(bcOps, core.EnableVerifyManager()) - } + + bcOps = append(bcOps, core.EnableBlockValidator(chainConfig, eth.engine, &config.TriesVerifyMode)) eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve, &config.TxLookupLimit, bcOps...) if err != nil { return nil, err @@ -255,9 +254,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { return nil, err } - if config.TriesVerifyMode.NeedRemoteVerify() { - eth.blockchain.StartVerify(eth.handler.peers, config.TriesVerifyMode == core.LightVerify) - } + eth.blockchain.Validator().StartRemoteVerify(eth.handler.peers) eth.miner = miner.New(eth, &config.Miner, chainConfig, eth.EventMux(), eth.engine, eth.isLocalBlock) eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData)) diff --git a/eth/handler_trust.go b/eth/handler_trust.go index c990aeb8b0..52100144e2 100644 --- a/eth/handler_trust.go +++ b/eth/handler_trust.go @@ -39,9 +39,11 @@ func (h *trustHandler) Handle(peer *trust.Peer, packet trust.Packet) error { BlockHash: packet.BlockHash, Root: packet.Root, } - h.Chain().VerifyManger().HandleRootResponse(verifyResult, peer.ID()) - // TODO: h.bc.VerifyManager().HandleRootResponse(peer.ID(), *packet) - return nil + if vm := h.Chain().Validator().RemoteVerifyManager(); vm != nil { + vm.HandleRootResponse(verifyResult, peer.ID()) + return nil + } + return fmt.Errorf("verify manager is nil which is unexpected") default: return fmt.Errorf("unexpected trust packet type: %T", packet) From 1af60b32bf488f5d28489ad2e6b199489bbba230 Mon Sep 17 00:00:00 2001 From: kyrie-yl Date: Fri, 11 Feb 2022 17:36:25 +0800 Subject: [PATCH 3/4] resolve comments from second review Signed-off-by: kyrie-yl --- cmd/utils/flags.go | 5 - core/block_validator.go | 55 ++--- core/blockchain.go | 8 +- core/blockchain_insert.go | 6 +- core/rawdb/accessors_chain.go | 20 -- core/rawdb/schema.go | 9 - ...rie_verify.go => remote_state_verifier.go} | 192 +++++++++--------- core/types.go | 11 +- eth/backend.go | 6 +- eth/handler.go | 3 +- 10 files changed, 123 insertions(+), 192 deletions(-) rename core/{trie_verify.go => remote_state_verifier.go} (63%) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index f18b5104d1..3f10a75fb3 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1675,11 +1675,6 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if cfg.TriesVerifyMode.NeedRemoteVerify() { cfg.EnableTrustProtocol = true } - // If a node sets verify mode but not local, it's a fast node whose difflayer is not integral. - // So fast node should disable diff protocol. - if cfg.TriesVerifyMode != core.LocalVerify { - cfg.DisableDiffProtocol = true - } } if ctx.GlobalIsSet(CacheFlag.Name) || ctx.GlobalIsSet(CacheSnapshotFlag.Name) { cfg.SnapshotCache = ctx.GlobalInt(CacheFlag.Name) * ctx.GlobalInt(CacheSnapshotFlag.Name) / 100 diff --git a/core/block_validator.go b/core/block_validator.go index a1d66d72be..eee9b89cf8 100644 --- a/core/block_validator.go +++ b/core/block_validator.go @@ -31,29 +31,26 @@ import ( // // BlockValidator implements Validator. type BlockValidator struct { - config *params.ChainConfig // Chain configuration options - bc *BlockChain // Canonical block chain - engine consensus.Engine // Consensus engine used for validating - remoteValidator *VerifyManager + config *params.ChainConfig // Chain configuration options + bc *BlockChain // Canonical block chain + engine consensus.Engine // Consensus engine used for validating + remoteValidator *remoteVerifyManager } // NewBlockValidator returns a new block validator which is safe for re-use -func NewBlockValidator(config *params.ChainConfig, blockchain *BlockChain, engine consensus.Engine, mode *VerifyMode) *BlockValidator { +func NewBlockValidator(config *params.ChainConfig, blockchain *BlockChain, engine consensus.Engine, mode VerifyMode, peers verifyPeers) *BlockValidator { validator := &BlockValidator{ config: config, engine: engine, bc: blockchain, } if mode.NeedRemoteVerify() { - validator.remoteValidator = NewVerifyManager(blockchain, *mode == InsecureVerify) + validator.remoteValidator = NewVerifyManager(blockchain, peers, mode == InsecureVerify) + go validator.remoteValidator.mainLoop() } return validator } -func(v *BlockValidator) RemoteVerifyManager() *VerifyManager{ - return v.remoteValidator -} - // ValidateBody validates the given block's uncles and verifies the block // header's transaction and uncle roots. The headers are assumed to be already // validated at this point. @@ -87,6 +84,13 @@ func (v *BlockValidator) ValidateBody(block *types.Block) error { } return nil }, + // for fast node which verify trie from remote verify peers, a block's H-11 ancestor should have been verify. + func() error { + if v.remoteValidator != nil && !v.remoteValidator.AncestorVerified(v.bc.GetHeaderByNumber(header.Number.Uint64())) { + return fmt.Errorf("block's ancessor %x has not been verified", block.Hash()) + } + return nil + }, } validateRes := make(chan error, len(validateFuns)) for _, f := range validateFuns { @@ -157,35 +161,8 @@ func (v *BlockValidator) ValidateState(block *types.Block, statedb *state.StateD return err } -func (v *BlockValidator) StartRemoteVerify(peers VerifyPeers) { - v.remoteValidator.peers = peers - if v.remoteValidator != nil { - go v.remoteValidator.verifyManagerLoop() - } -} - -func (v * BlockValidator) StopRemoteVerify() { - if v.remoteValidator != nil { - v.remoteValidator.Stop() - } -} - -func (v * BlockValidator) VerifyBlock(header *types.Header) { - if v.remoteValidator != nil { - v.remoteValidator.newTaskCh <- header - } -} - -// ValidateBlockVerify validate that weather the H-11 ancestor of the block has been verified by peers. -// If not, the blockchain should halt. -func (v * BlockValidator) ValidateBlockVerify(block *types.Block) error { - if v.remoteValidator != nil { - header := block.Header() - if !v.remoteValidator.AncestorVerified(v.bc.GetHeaderByNumber(header.Number.Uint64())) { - return fmt.Errorf("block's ancessor %x has not been verified", block.Hash()) - } - } - return nil +func (v *BlockValidator) RemoteVerifyManager() *remoteVerifyManager { + return v.remoteValidator } // CalcGasLimit computes the gas limit of the next block after parent. It aims diff --git a/core/blockchain.go b/core/blockchain.go index 414be33852..5fc56a8ff4 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1190,7 +1190,6 @@ func (bc *BlockChain) Stop() { close(bc.quit) bc.StopInsert() bc.wg.Wait() - bc.validator.StopRemoteVerify() // Ensure that the entirety of the state snapshot is journalled to disk. var snapBase common.Hash @@ -2122,9 +2121,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er blockWriteTimer.Update(time.Since(substart)) blockInsertTimer.UpdateSince(start) - //Start a routine to verify this block. - bc.validator.VerifyBlock(block.Header()) - switch status { case CanonStatTy: log.Debug("Inserted new block", "number", block.Number(), "hash", block.Hash(), @@ -3023,9 +3019,9 @@ func EnablePersistDiff(limit uint64) BlockChainOption { } } -func EnableBlockValidator(chainConfig *params.ChainConfig, engine consensus.Engine, mode *VerifyMode) BlockChainOption { +func EnableBlockValidator(chainConfig *params.ChainConfig, engine consensus.Engine, mode VerifyMode, peers verifyPeers) BlockChainOption { return func(bc *BlockChain) *BlockChain { - bc.validator = NewBlockValidator(chainConfig, bc, engine, mode) + bc.validator = NewBlockValidator(chainConfig, bc, engine, mode, peers) return bc } } diff --git a/core/blockchain_insert.go b/core/blockchain_insert.go index 3bd51825d2..cb8473c084 100644 --- a/core/blockchain_insert.go +++ b/core/blockchain_insert.go @@ -118,11 +118,7 @@ func (it *insertIterator) next() (*types.Block, error) { return it.chain[it.index], it.errors[it.index] } // Block header valid, run body validation and return - if err := it.validator.ValidateBody(it.chain[it.index]); err != nil { - return it.chain[it.index], err - } - // Block body valid, run remote verify and return - return it.chain[it.index], it.validator.ValidateBlockVerify(it.chain[it.index]) + return it.chain[it.index], it.validator.ValidateBody(it.chain[it.index]) } // peek returns the next block in the iterator, along with any potential validation diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 66f6abbae5..6489a600fb 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -64,26 +64,6 @@ func DeleteCanonicalHash(db ethdb.KeyValueWriter, number uint64) { } } -func IsTrustBlock(db ethdb.Reader, hash common.Hash) bool { - data, _ := db.Get(trustBlockHashKey(hash)) - if len(data) == 0 { - return false - } - return bytes.Equal(data,[]byte{byteTrue}) -} - -func MarkTrustBlock(db ethdb.KeyValueWriter, hashkey common.Hash) { - if err := db.Put(trustBlockHashKey(hashkey),[]byte{byteTrue}); err != nil { - log.Crit("Failed to store trust block hash") - } -} - -func DeleteTrustBlockHash(db ethdb.KeyValueWriter, hash common.Hash) { - if err := db.Delete(trustBlockHashKey(hash)); err != nil { - log.Crit("Failed to delete trust block hash") - } -} - // ReadAllHashes retrieves all the hashes assigned to blocks at a certain heights, // both canonical and reorged forks included. func ReadAllHashes(db ethdb.Iteratee, number uint64) []common.Hash { diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 5c9950866a..b4fb99e451 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -93,9 +93,6 @@ var ( // difflayer database diffLayerPrefix = []byte("d") // diffLayerPrefix + hash -> diffLayer - // trust block database - trustBlockPrefix = []byte("trust-block-") // trustBlockPrefix + hash -> verify result - preimagePrefix = []byte("secure-key-") // preimagePrefix + hash -> preimage configPrefix = []byte("ethereum-config-") // config prefix for the db @@ -121,8 +118,6 @@ const ( // freezerDifficultyTable indicates the name of the freezer total difficulty table. freezerDifficultyTable = "diffs" - // - byteTrue = 0x01 ) // FreezerNoSnappy configures whether compression is disabled for the ancient-tables. @@ -169,10 +164,6 @@ func headerTDKey(number uint64, hash common.Hash) []byte { func headerHashKey(number uint64) []byte { return append(append(headerPrefix, encodeBlockNumber(number)...), headerHashSuffix...) } -// trustBlockHashKey = trustBlockPrefix + hash -func trustBlockHashKey(hash common.Hash) []byte { - return append(append(trustBlockPrefix, hash.Bytes()...)) -} // headerNumberKey = headerNumberPrefix + hash func headerNumberKey(hash common.Hash) []byte { diff --git a/core/trie_verify.go b/core/remote_state_verifier.go similarity index 63% rename from core/trie_verify.go rename to core/remote_state_verifier.go index 1c3f96678d..cc4e42a6cb 100644 --- a/core/trie_verify.go +++ b/core/remote_state_verifier.go @@ -2,77 +2,88 @@ package core import ( "fmt" + "github.com/ethereum/go-ethereum/event" "math/rand" "time" lru "github.com/hashicorp/golang-lru" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" ) const ( verifiedCacheSize = 256 maxForkHeight = 11 - resendInterval = 2 * time.Second + // defaultPeerNumber is default number of verify peers defaultPeerNumber = 3 + // pruneHeightDiff indicates that if the height difference between current block and task's + // corresponding block is larger than it, the task should be pruned. + pruneHeightDiff = 15 + pruneInterval = 5 * time.Second + resendInterval = 2 * time.Second // tryAllPeersTime is the time that a block has not been verified and then try all the valid verify peers. tryAllPeersTime = 15 * time.Second ) -type VerifyManager struct { - bc *BlockChain - tasks map[common.Hash]*VerifyTask - peers VerifyPeers - verifiedCache *lru.Cache - allowUntrustedVerify bool - newTaskCh chan *types.Header - verifyCh chan common.Hash - messageCh chan VerifyMessage - exitCh chan struct{} +type remoteVerifyManager struct { + bc *BlockChain + tasks map[common.Hash]*verifyTask + peers verifyPeers + verifiedCache *lru.Cache + allowInsecure bool + + // Subscription + chainHeadCh chan ChainHeadEvent + chainHeadSub event.Subscription + + // Channels + verifyCh chan common.Hash + messageCh chan verifyMessage } -func NewVerifyManager(blockchain *BlockChain, allowUntrustedVerify bool) *VerifyManager { +func NewVerifyManager(blockchain *BlockChain, peers verifyPeers, allowUntrusted bool) *remoteVerifyManager { verifiedCache, _ := lru.New(verifiedCacheSize) - vm := &VerifyManager{ - bc: blockchain, - tasks: make(map[common.Hash]*VerifyTask), - verifiedCache: verifiedCache, - newTaskCh: make(chan *types.Header), - verifyCh: make(chan common.Hash), - messageCh: make(chan VerifyMessage), - exitCh: make(chan struct{}), - allowUntrustedVerify: allowUntrustedVerify, + vm := &remoteVerifyManager{ + bc: blockchain, + tasks: make(map[common.Hash]*verifyTask), + peers: peers, + verifiedCache: verifiedCache, + allowInsecure: allowUntrusted, + + chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), + verifyCh: make(chan common.Hash, maxForkHeight), + messageCh: make(chan verifyMessage), } + vm.chainHeadSub = blockchain.SubscribeChainHeadEvent(vm.chainHeadCh) return vm } -func (vm *VerifyManager) verifyManagerLoop() { - // read disk store to initial verified cache +func (vm *remoteVerifyManager) mainLoop() { + defer vm.chainHeadSub.Unsubscribe() + // load unverified blocks in a normalized chain and start a batch of verify task header := vm.bc.CurrentHeader() // Start verify task from H to H-11 if need. vm.NewBlockVerifyTask(header) - prune := time.NewTicker(time.Second) - defer prune.Stop() + pruneTicker := time.NewTicker(pruneInterval) + defer pruneTicker.Stop() for { select { - case h := <-vm.newTaskCh: - vm.NewBlockVerifyTask(h) + case h := <-vm.chainHeadCh: + vm.NewBlockVerifyTask(h.Block.Header()) case hash := <-vm.verifyCh: vm.cacheBlockVerified(hash) - rawdb.MarkTrustBlock(vm.bc.db, hash) if task, ok := vm.tasks[hash]; ok { delete(vm.tasks, hash) close(task.terminalCh) } - case <-prune.C: + case <-pruneTicker.C: for hash, task := range vm.tasks { - if vm.bc.CurrentHeader().Number.Uint64()-task.blockHeader.Number.Uint64() > 15 { + if vm.bc.CurrentHeader().Number.Cmp(task.blockHeader.Number) == 1 && + vm.bc.CurrentHeader().Number.Uint64()-task.blockHeader.Number.Uint64() > pruneHeightDiff { delete(vm.tasks, hash) close(task.terminalCh) } @@ -81,23 +92,22 @@ func (vm *VerifyManager) verifyManagerLoop() { if vt, ok := vm.tasks[message.verifyResult.BlockHash]; ok { vt.messageCh <- message } - case <-vm.exitCh: + + // System stopped + case <-vm.bc.quit: + for _, task := range vm.tasks { + close(task.terminalCh) + } + return + case <-vm.chainHeadSub.Err(): return } } } -func (vm *VerifyManager) Stop() { - // stop all the tasks - for _, task := range vm.tasks { - close(task.terminalCh) - } - close(vm.exitCh) -} - -func (vm *VerifyManager) NewBlockVerifyTask(header *types.Header) { +func (vm *remoteVerifyManager) NewBlockVerifyTask(header *types.Header) { for i := 0; header != nil && i <= maxForkHeight; i++ { - func(hash common.Hash){ + func(hash common.Hash) { // if verified cache record that this block has been verified, skip. if _, ok := vm.verifiedCache.Get(hash); ok { return @@ -106,11 +116,6 @@ func (vm *VerifyManager) NewBlockVerifyTask(header *types.Header) { if _, ok := vm.tasks[hash]; ok { return } - // if verified storage record that this block has been verified, skip. - if rawdb.IsTrustBlock(vm.bc.db, hash) { - vm.cacheBlockVerified(hash) - return - } diffLayer := vm.bc.GetTrustedDiffLayer(hash) // if this block has no diff, there is no need to verify it. var err error @@ -118,6 +123,9 @@ func (vm *VerifyManager) NewBlockVerifyTask(header *types.Header) { if diffLayer, err = vm.bc.GenerateDiffLayer(hash); err != nil { log.Error("failed to get diff layer", "block", hash, "number", header.Number, "error", err) return + } else { + log.Info("this is an empty block:", "block", hash, "number", header.Number) + return } } diffHash, err := GetTrustedDiffHash(diffLayer) @@ -125,14 +133,14 @@ func (vm *VerifyManager) NewBlockVerifyTask(header *types.Header) { log.Error("failed to get diff hash", "block", hash, "number", header.Number, "error", err) return } - verifyTask := NewVerifyTask(diffHash, header, vm.peers, vm.bc.db, vm.verifyCh, vm.allowUntrustedVerify) + verifyTask := NewVerifyTask(diffHash, header, vm.peers, vm.verifyCh, vm.allowInsecure) vm.tasks[hash] = verifyTask }(header.Hash()) header = vm.bc.GetHeaderByHash(header.ParentHash) } } -func (vm *VerifyManager) cacheBlockVerified(hash common.Hash) { +func (vm *remoteVerifyManager) cacheBlockVerified(hash common.Hash) { if vm.verifiedCache.Len() >= verifiedCacheSize { vm.verifiedCache.RemoveOldest() } @@ -140,7 +148,7 @@ func (vm *VerifyManager) cacheBlockVerified(hash common.Hash) { } // AncestorVerified function check block has been verified or it's a empty block. -func (vm *VerifyManager) AncestorVerified(header *types.Header) bool { +func (vm *remoteVerifyManager) AncestorVerified(header *types.Header) bool { // find header of H-11 block. header = vm.bc.GetHeaderByNumber(header.Number.Uint64() - maxForkHeight) // If start from genesis block, there has not a H-11 block. @@ -150,19 +158,15 @@ func (vm *VerifyManager) AncestorVerified(header *types.Header) bool { // check whether H-11 block is a empty block. if header.TxHash == types.EmptyRootHash { parent := vm.bc.GetHeaderByHash(header.ParentHash) - if header.Root == parent.Root { - return true - } + return header.Root == parent.Root } hash := header.Hash() - if _, ok := vm.verifiedCache.Get(hash); ok { - return true - } - return rawdb.IsTrustBlock(vm.bc.db, hash) + _, exist := vm.verifiedCache.Get(hash) + return exist } -func (vm *VerifyManager) HandleRootResponse(vr *VerifyResult, pid string) error { - vm.messageCh <- VerifyMessage{verifyResult: vr, peerId: pid} +func (vm *remoteVerifyManager) HandleRootResponse(vr *VerifyResult, pid string) error { + vm.messageCh <- verifyMessage{verifyResult: vr, peerId: pid} return nil } @@ -173,43 +177,41 @@ type VerifyResult struct { Root common.Hash } -type VerifyMessage struct { +type verifyMessage struct { verifyResult *VerifyResult peerId string } -type VerifyTask struct { - diffhash common.Hash - blockHeader *types.Header - candidatePeers VerifyPeers - BadPeers map[string]struct{} - startAt time.Time - db ethdb.Database - allowUntrustedVerify bool +type verifyTask struct { + diffhash common.Hash + blockHeader *types.Header + candidatePeers verifyPeers + BadPeers map[string]struct{} + startAt time.Time + allowUntrusted bool - messageCh chan VerifyMessage + messageCh chan verifyMessage terminalCh chan struct{} } -func NewVerifyTask(diffhash common.Hash, header *types.Header, peers VerifyPeers, db ethdb.Database, verifyCh chan common.Hash, allowUntrustedVerify bool) *VerifyTask { - vt := &VerifyTask{ - diffhash: diffhash, - blockHeader: header, - candidatePeers: peers, - BadPeers: make(map[string]struct{}), - db: db, - allowUntrustedVerify: allowUntrustedVerify, - messageCh: make(chan VerifyMessage), - terminalCh: make(chan struct{}), +func NewVerifyTask(diffhash common.Hash, header *types.Header, peers verifyPeers, verifyCh chan common.Hash, allowUntrusted bool) *verifyTask { + vt := &verifyTask{ + diffhash: diffhash, + blockHeader: header, + candidatePeers: peers, + BadPeers: make(map[string]struct{}), + allowUntrusted: allowUntrusted, + messageCh: make(chan verifyMessage), + terminalCh: make(chan struct{}), } go vt.Start(verifyCh) return vt } -func (vt *VerifyTask) Start(verifyCh chan common.Hash) { +func (vt *verifyTask) Start(verifyCh chan common.Hash) { vt.startAt = time.Now() - vt.selectPeersToVerify(defaultPeerNumber) + vt.sendVerifyRequest(defaultPeerNumber) resend := time.NewTicker(resendInterval) defer resend.Stop() for { @@ -220,7 +222,7 @@ func (vt *VerifyTask) Start(verifyCh chan common.Hash) { vt.compareRootHashAndWrite(msg, verifyCh) case types.StatusUntrustedVerified: log.Warn("block %s , num= %s is untrusted verified", msg.verifyResult.BlockHash, msg.verifyResult.BlockNumber) - if vt.allowUntrustedVerify { + if vt.allowUntrusted { vt.compareRootHashAndWrite(msg, verifyCh) } case types.StatusDiffHashMismatch, types.StatusImpossibleFork, types.StatusUnexpectedError: @@ -232,9 +234,9 @@ func (vt *VerifyTask) Start(verifyCh chan common.Hash) { case <-resend.C: // if a task has run over 15s, try all the vaild peers to verify. if time.Since(vt.startAt) < tryAllPeersTime { - vt.selectPeersToVerify(1) + vt.sendVerifyRequest(1) } else { - vt.selectPeersToVerify(-1) + vt.sendVerifyRequest(-1) } case <-vt.terminalCh: return @@ -242,9 +244,9 @@ func (vt *VerifyTask) Start(verifyCh chan common.Hash) { } } -// selectPeersAndVerify func select at most n peers from (candidatePeers-badPeers) randomly and send verify request. +// sendVerifyRequest func select at most n peers from (candidatePeers-badPeers) randomly and send verify request. // when n<0, send to all the peers exclude badPeers. -func (vt *VerifyTask) selectPeersToVerify(n int) { +func (vt *verifyTask) sendVerifyRequest(n int) { var validPeers []VerifyPeer candidatePeers := vt.candidatePeers.GetVerifyPeers() for _, p := range candidatePeers { @@ -252,7 +254,10 @@ func (vt *VerifyTask) selectPeersToVerify(n int) { validPeers = append(validPeers, p) } } - // if + // if has not valid peer, log warning. + if len(validPeers) == 0 { + log.Warn("there is no valid peer for block", vt.blockHeader.Number) + } if n < 0 || n >= len(validPeers) { for _, p := range validPeers { p.RequestRoot(vt.blockHeader.Number.Uint64(), vt.blockHeader.Hash(), vt.diffhash) @@ -269,10 +274,9 @@ func (vt *VerifyTask) selectPeersToVerify(n int) { } } -func (vt *VerifyTask) compareRootHashAndWrite(msg VerifyMessage, verifyCh chan common.Hash) { +func (vt *verifyTask) compareRootHashAndWrite(msg verifyMessage, verifyCh chan common.Hash) { if msg.verifyResult.Root == vt.blockHeader.Root { blockhash := msg.verifyResult.BlockHash - rawdb.MarkTrustBlock(vt.db, blockhash) // write back to manager so that manager can cache the result and delete this task. verifyCh <- blockhash } else { @@ -285,14 +289,14 @@ type VerifyPeer interface { ID() string } -type VerifyPeers interface { +type verifyPeers interface { GetVerifyPeers() []VerifyPeer } type VerifyMode uint32 const ( - LocalVerify VerifyMode = iota // + LocalVerify VerifyMode = iota FullVerify InsecureVerify NoneVerify @@ -348,6 +352,6 @@ func (mode *VerifyMode) UnmarshalText(text []byte) error { return nil } -func (mode *VerifyMode) NeedRemoteVerify() bool { - return *mode == FullVerify || *mode == InsecureVerify +func (mode VerifyMode) NeedRemoteVerify() bool { + return mode == FullVerify || mode == InsecureVerify } diff --git a/core/types.go b/core/types.go index fb4b215926..8cd0b5ca63 100644 --- a/core/types.go +++ b/core/types.go @@ -32,17 +32,8 @@ type Validator interface { // ValidateState validates the given statedb and optionally the receipts and // gas used. ValidateState(block *types.Block, state *state.StateDB, receipts types.Receipts, usedGas uint64) error - - // StartRemoteVerify start the remote verify routine for given peers. - StartRemoteVerify(peers VerifyPeers) - // StopRemoteVerify stop the remote verify routine. - StopRemoteVerify() - // VerifyBlock verify the given blcok. - VerifyBlock(header *types.Header) - // ValidateBlockVerify validate the given block has been verified. - ValidateBlockVerify(block *types.Block) error // RemoteVerifyManager return remoteVerifyManager of validator. - RemoteVerifyManager() *VerifyManager + RemoteVerifyManager() *remoteVerifyManager } // Prefetcher is an interface for pre-caching transaction signatures and state. diff --git a/eth/backend.go b/eth/backend.go index 7b00e43448..7d196b60e0 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -208,11 +208,12 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { if config.DiffSync && config.TriesVerifyMode == core.LocalVerify { bcOps = append(bcOps, core.EnableLightProcessor) } + peers := newPeerSet() if config.PersistDiff { bcOps = append(bcOps, core.EnablePersistDiff(config.DiffBlock)) } - bcOps = append(bcOps, core.EnableBlockValidator(chainConfig, eth.engine, &config.TriesVerifyMode)) + bcOps = append(bcOps, core.EnableBlockValidator(chainConfig, eth.engine, config.TriesVerifyMode, peers)) eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve, &config.TxLookupLimit, bcOps...) if err != nil { return nil, err @@ -250,12 +251,11 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { DirectBroadcast: config.DirectBroadcast, DiffSync: config.DiffSync, DisablePeerTxBroadcast: config.DisablePeerTxBroadcast, + PeerSet: peers, }); err != nil { return nil, err } - eth.blockchain.Validator().StartRemoteVerify(eth.handler.peers) - eth.miner = miner.New(eth, &config.Miner, chainConfig, eth.EventMux(), eth.engine, eth.isLocalBlock) eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData)) diff --git a/eth/handler.go b/eth/handler.go index e47d3eee8d..f9854766c4 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -96,6 +96,7 @@ type handlerConfig struct { Whitelist map[uint64]common.Hash // Hard coded whitelist for sync challenged DirectBroadcast bool DisablePeerTxBroadcast bool + PeerSet *peerSet } type handler struct { @@ -155,7 +156,7 @@ func newHandler(config *handlerConfig) (*handler, error) { database: config.Database, txpool: config.TxPool, chain: config.Chain, - peers: newPeerSet(), + peers: config.PeerSet, whitelist: config.Whitelist, directBroadcast: config.DirectBroadcast, diffSync: config.DiffSync, From 85a5c20218a675c614ef4725e57ea13d075c45b8 Mon Sep 17 00:00:00 2001 From: kyrie-yl Date: Mon, 21 Feb 2022 15:40:45 +0800 Subject: [PATCH 4/4] add metrics Signed-off-by: kyrie-yl --- core/remote_state_verifier.go | 39 +++++++++++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/core/remote_state_verifier.go b/core/remote_state_verifier.go index cc4e42a6cb..d586414d2d 100644 --- a/core/remote_state_verifier.go +++ b/core/remote_state_verifier.go @@ -3,6 +3,7 @@ package core import ( "fmt" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/metrics" "math/rand" "time" @@ -28,6 +29,32 @@ const ( tryAllPeersTime = 15 * time.Second ) +var ( + remoteVerifyTaskCounter = metrics.NewRegisteredCounter("remote/state/verify/task/total", nil) + + statusFullVerifiedMeter = metrics.NewRegisteredMeter("status/full/verified/messages/total", nil) + statusUntrustedVerifiedMeter = metrics.NewRegisteredMeter("status/untrusted/verified/messages/total", nil) + + statusDiffHashMismatchMeter = metrics.NewRegisteredMeter("status/diffhash/mismatch/messages/total", nil) + statusImpossibleForkMeter = metrics.NewRegisteredMeter("status/impossible/fork/messages/total", nil) + + statusBlockTooNewMeter = metrics.NewRegisteredMeter("status/block/too/new/messages/total", nil) + statusBlockNewerMeter = metrics.NewRegisteredMeter("status/block/newer/messages/total", nil) + statusPossibleForkMeter = metrics.NewRegisteredMeter("status/possible/fork/messages/total", nil) + statusUnexpectedErrorMeter = metrics.NewRegisteredMeter("status/unexpected/error/total", nil) + + codeMap = map[uint16]metrics.Meter{ + 0x101: statusFullVerifiedMeter, + 0x102: statusUntrustedVerifiedMeter, + 0x201: statusDiffHashMismatchMeter, + 0x202: statusImpossibleForkMeter, + 0x301: statusBlockTooNewMeter, + 0x302: statusBlockNewerMeter, + 0x303: statusPossibleForkMeter, + 0x400: statusUnexpectedErrorMeter, + } +) + type remoteVerifyManager struct { bc *BlockChain tasks map[common.Hash]*verifyTask @@ -78,6 +105,7 @@ func (vm *remoteVerifyManager) mainLoop() { vm.cacheBlockVerified(hash) if task, ok := vm.tasks[hash]; ok { delete(vm.tasks, hash) + remoteVerifyTaskCounter.Dec(1) close(task.terminalCh) } case <-pruneTicker.C: @@ -85,6 +113,7 @@ func (vm *remoteVerifyManager) mainLoop() { if vm.bc.CurrentHeader().Number.Cmp(task.blockHeader.Number) == 1 && vm.bc.CurrentHeader().Number.Uint64()-task.blockHeader.Number.Uint64() > pruneHeightDiff { delete(vm.tasks, hash) + remoteVerifyTaskCounter.Dec(1) close(task.terminalCh) } } @@ -135,6 +164,7 @@ func (vm *remoteVerifyManager) NewBlockVerifyTask(header *types.Header) { } verifyTask := NewVerifyTask(diffHash, header, vm.peers, vm.verifyCh, vm.allowInsecure) vm.tasks[hash] = verifyTask + remoteVerifyTaskCounter.Inc(1) }(header.Hash()) header = vm.bc.GetHeaderByHash(header.ParentHash) } @@ -217,17 +247,22 @@ func (vt *verifyTask) Start(verifyCh chan common.Hash) { for { select { case msg := <-vt.messageCh: + if metric, exist := codeMap[msg.verifyResult.Status.Code]; exist { + metric.Mark(1) + } switch msg.verifyResult.Status { case types.StatusFullVerified: + statusFullVerifiedMeter.Mark(1) vt.compareRootHashAndWrite(msg, verifyCh) case types.StatusUntrustedVerified: + statusUntrustedVerifiedMeter.Mark(1) log.Warn("block %s , num= %s is untrusted verified", msg.verifyResult.BlockHash, msg.verifyResult.BlockNumber) if vt.allowUntrusted { vt.compareRootHashAndWrite(msg, verifyCh) } - case types.StatusDiffHashMismatch, types.StatusImpossibleFork, types.StatusUnexpectedError: + case types.StatusUnexpectedError, types.StatusImpossibleFork, types.StatusDiffHashMismatch: vt.BadPeers[msg.peerId] = struct{}{} - log.Debug("peer %s is not available: %s", msg.peerId, msg.verifyResult.Status.Msg) + log.Info("peer %s is not available: code %d, msg %s,", msg.peerId, msg.verifyResult.Status.Code, msg.verifyResult.Status.Msg) case types.StatusBlockTooNew, types.StatusBlockNewer, types.StatusPossibleFork: log.Info("return msg from peer %s for block %s is %s", msg.peerId, msg.verifyResult.BlockHash, msg.verifyResult.Status.Msg) }