From 5f8aeb0dcae841b08f68025c4763a93a396aa5be Mon Sep 17 00:00:00 2001 From: setunapo Date: Fri, 28 Oct 2022 12:01:18 +0800 Subject: [PATCH 01/11] improve: a hot fix for fair mine code: hot fix part2 code: set DelayLeftOver to 400ms, for block finalize and broadcast fair: return when no time left fix: init txs2Ch fair: add txs2HotfixLoop to avoid flush txsCh on mainLoop fair: subscribe txs event before commitTransactions log: improve print log --- consensus/parlia/parlia.go | 5 ---- eth/ethconfig/config.go | 2 +- miner/worker.go | 50 +++++++++++++++++++++++++++++++++++--- 3 files changed, 48 insertions(+), 9 deletions(-) diff --git a/consensus/parlia/parlia.go b/consensus/parlia/parlia.go index fee0fe1293..79a24769a2 100644 --- a/consensus/parlia/parlia.go +++ b/consensus/parlia/parlia.go @@ -800,11 +800,6 @@ func (p *Parlia) Delay(chain consensus.ChainReader, header *types.Header) *time. return nil } delay := p.delayForRamanujanFork(snap, header) - // The blocking time should be no more than half of period - half := time.Duration(p.config.Period) * time.Second / 2 - if delay > half { - delay = half - } return &delay } diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 03104c6109..85b748b289 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -89,7 +89,7 @@ var Defaults = Config{ GasCeil: 8000000, GasPrice: big.NewInt(params.GWei), Recommit: 3 * time.Second, - DelayLeftOver: 50 * time.Millisecond, + DelayLeftOver: 400 * time.Millisecond, }, TxPool: core.DefaultTxPoolConfig, RPCGasCap: 50000000, diff --git a/miner/worker.go b/miner/worker.go index dca827f6d6..992cbcc70c 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -869,12 +869,16 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP } } - var coalescedLogs []*types.Log var stopTimer *time.Timer delay := w.engine.Delay(w.chain, env.header) if delay != nil { - stopTimer = time.NewTimer(*delay - w.config.DelayLeftOver) - log.Debug("Time left for mining work", "left", (*delay - w.config.DelayLeftOver).String(), "leftover", w.config.DelayLeftOver) + left := *delay - w.config.DelayLeftOver + if left <= 0 { + log.Info("Not enough time for further commitTransactions", "delay", *delay, "DelayLeftOver", w.config.DelayLeftOver) + return true + } + stopTimer = time.NewTimer(left) + log.Debug("Time left for commitTransactions", "left", left.String(), "leftover", w.config.DelayLeftOver) defer stopTimer.Stop() } // initilise bloom processors @@ -892,6 +896,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP txCurr := &tx w.prefetcher.PrefetchMining(txsPrefetch, env.header, env.gasPool.Gas(), env.state.CopyDoPrefetch(), *w.chain.GetVMConfig(), interruptCh, txCurr) + var coalescedLogs []*types.Log LOOP: for { // In the following three cases, we will interrupt the execution of the transaction. @@ -1107,6 +1112,23 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) { // into the given sealing block. The transaction selection and ordering strategy can // be customized with the plugin in the future. func (w *worker) fillTransactions(interrupt *int32, env *environment) { + var stopTimer *time.Timer + delay := w.engine.Delay(w.chain, env.header) + if delay != nil { + left := *delay - w.config.DelayLeftOver + if left <= 0 { + log.Info("Not enough time for fillTransactions", "delay", *delay, "DelayLeftOver", w.config.DelayLeftOver) + return + } + stopTimer = time.NewTimer(left) + log.Debug("Time left for mining work", "left", left.String(), "leftover", w.config.DelayLeftOver) + defer stopTimer.Stop() + } + + txsCh := make(chan core.NewTxsEvent, txChanSize) + sub := w.eth.TxPool().SubscribeNewTxsEvent(txsCh) + defer sub.Unsubscribe() + // Split the pending transactions into locals and remotes // Fill the block with all available pending transactions. pending := w.eth.TxPool().Pending(true) @@ -1129,6 +1151,28 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) { return } } + + if stopTimer == nil { + return + } + for { + select { + case <-stopTimer.C: + log.Info("Not enough time for further transactions", "txs", len(env.txs)) + return + case txEv := <-txsCh: + newTxs := make(map[common.Address]types.Transactions) + for _, tx := range txEv.Txs { + acc, _ := types.Sender(w.current.signer, tx) + newTxs[acc] = append(newTxs[acc], tx) + } + newTxSet := types.NewTransactionsByPriceAndNonce(env.signer, newTxs, env.header.BaseFee) + if w.commitTransactions(env, newTxSet, interrupt) { + return + } + continue + } + } } // generateWork generates a sealing block based on the given parameters. From da965c6ff3b6737f17b0c24c2f9955e70000560a Mon Sep 17 00:00:00 2001 From: setunapo Date: Fri, 28 Oct 2022 22:55:48 +0800 Subject: [PATCH 02/11] fair: cancel seal on new block imported --- miner/worker.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/miner/worker.go b/miner/worker.go index 992cbcc70c..c5f91b34db 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -209,6 +209,7 @@ type worker struct { newWorkCh chan *newWorkReq getWorkCh chan *getWorkReq taskCh chan *task + taskinterruptCh chan struct{} resultCh chan *types.Block startCh chan struct{} exitCh chan struct{} @@ -275,6 +276,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus newWorkCh: make(chan *newWorkReq), getWorkCh: make(chan *getWorkReq), taskCh: make(chan *task), + taskinterruptCh: make(chan struct{}, 1), resultCh: make(chan *types.Block, resultQueueSize), exitCh: make(chan struct{}), startCh: make(chan struct{}, 1), @@ -468,6 +470,10 @@ func (w *worker) newWorkLoop(recommit time.Duration) { continue } clearPending(head.Block.NumberU64()) + select { + case w.taskinterruptCh <- struct{}{}: // cancel sealing task if any + default: + } timestamp = time.Now().Unix() if p, ok := w.engine.(*parlia.Parlia); ok { signedRecent, err := p.SignRecently(w.chain, head.Block) @@ -671,6 +677,8 @@ func (w *worker) taskLoop() { } for { select { + case <-w.taskinterruptCh: + interrupt() case task := <-w.taskCh: if w.newTaskHook != nil { w.newTaskHook(task) From 04f15613c98745a1d9b31e5ec97db3899a7aacea Mon Sep 17 00:00:00 2001 From: setunapo Date: Sat, 29 Oct 2022 07:06:22 +0800 Subject: [PATCH 03/11] Revert "fair: cancel seal on new block imported" This reverts commit 47b28c8ef6dea6de55892754a4ca8c5bcf94ed74. --- miner/worker.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index c5f91b34db..992cbcc70c 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -209,7 +209,6 @@ type worker struct { newWorkCh chan *newWorkReq getWorkCh chan *getWorkReq taskCh chan *task - taskinterruptCh chan struct{} resultCh chan *types.Block startCh chan struct{} exitCh chan struct{} @@ -276,7 +275,6 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus newWorkCh: make(chan *newWorkReq), getWorkCh: make(chan *getWorkReq), taskCh: make(chan *task), - taskinterruptCh: make(chan struct{}, 1), resultCh: make(chan *types.Block, resultQueueSize), exitCh: make(chan struct{}), startCh: make(chan struct{}, 1), @@ -470,10 +468,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) { continue } clearPending(head.Block.NumberU64()) - select { - case w.taskinterruptCh <- struct{}{}: // cancel sealing task if any - default: - } timestamp = time.Now().Unix() if p, ok := w.engine.(*parlia.Parlia); ok { signedRecent, err := p.SignRecently(w.chain, head.Block) @@ -677,8 +671,6 @@ func (w *worker) taskLoop() { } for { select { - case <-w.taskinterruptCh: - interrupt() case task := <-w.taskCh: if w.newTaskHook != nil { w.newTaskHook(task) From 189c849e121a8ff5aa9fed800da02aa3894d2eeb Mon Sep 17 00:00:00 2001 From: setunapo Date: Sat, 29 Oct 2022 07:38:04 +0800 Subject: [PATCH 04/11] mine: drop current mining block on new block imported --- consensus/beacon/consensus.go | 4 ++++ consensus/clique/clique.go | 5 +++++ consensus/consensus.go | 6 ++++++ consensus/ethash/consensus.go | 4 ++++ consensus/parlia/parlia.go | 5 +++++ miner/worker.go | 4 ++++ 6 files changed, 28 insertions(+) diff --git a/consensus/beacon/consensus.go b/consensus/beacon/consensus.go index cd493f3d65..ac6602f170 100644 --- a/consensus/beacon/consensus.go +++ b/consensus/beacon/consensus.go @@ -363,6 +363,10 @@ func (beacon *Beacon) SetThreads(threads int) { } } +func (p *Beacon) DropOnNewBlock(*types.Header) bool { + return true +} + // IsTTDReached checks if the TotalTerminalDifficulty has been surpassed on the `parentHash` block. // It depends on the parentHash already being stored in the database. // If the parentHash is not stored in the database a UnknownAncestor error is returned. diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index 75ed916a86..c61608f754 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -705,6 +705,11 @@ func (c *Clique) APIs(chain consensus.ChainHeaderReader) []rpc.API { }} } +func (p *Clique) DropOnNewBlock(header *types.Header) bool { + // drop the block if it is not in turn. + return header.Difficulty.Cmp(diffNoTurn) == 0 +} + // SealHash returns the hash of a block prior to it being sealed. func SealHash(header *types.Header) (hash common.Hash) { hasher := sha3.NewLegacyKeccak256() diff --git a/consensus/consensus.go b/consensus/consensus.go index c3e7b4870a..74cbe84f54 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -130,6 +130,12 @@ type Engine interface { // Close terminates any background threads maintained by the consensus engine. Close() error + + // DropOnNewBlock determine the action of mining when it is interrupted by new imported block. + // Return + // true: the mining result will be dropped + // false: the mining result will be kept and move on to the next mine step. + DropOnNewBlock(header *types.Header) bool } // PoW is a consensus engine based on proof-of-work. diff --git a/consensus/ethash/consensus.go b/consensus/ethash/consensus.go index be6085c713..d606913419 100644 --- a/consensus/ethash/consensus.go +++ b/consensus/ethash/consensus.go @@ -647,6 +647,10 @@ var ( big32 = big.NewInt(32) ) +func (p *Ethash) DropOnNewBlock(*types.Header) bool { + return true +} + // AccumulateRewards credits the coinbase of the given block with the mining // reward. The total reward consists of the static block reward and rewards for // included uncles. The coinbase of each uncle block is also rewarded. diff --git a/consensus/parlia/parlia.go b/consensus/parlia/parlia.go index 79a24769a2..b320795815 100644 --- a/consensus/parlia/parlia.go +++ b/consensus/parlia/parlia.go @@ -959,6 +959,11 @@ func (p *Parlia) CalcDifficulty(chain consensus.ChainHeaderReader, time uint64, return CalcDifficulty(snap, p.val) } +func (p *Parlia) DropOnNewBlock(header *types.Header) bool { + // drop the block if it is not in turn. + return header.Difficulty.Cmp(diffNoTurn) == 0 +} + // CalcDifficulty is the difficulty adjustment algorithm. It returns the difficulty // that a new block should have based on the previous blocks in the chain and the // current signer. diff --git a/miner/worker.go b/miner/worker.go index 992cbcc70c..5a50508cd9 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1217,6 +1217,10 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { } // Fill pending transactions from the txpool w.fillTransactions(interrupt, work) + if atomic.LoadInt32(interrupt) == commitInterruptNewHead && w.engine.DropOnNewBlock(work.header) { + log.Info("drop the block, when new block is imported") + return + } w.commit(work, w.fullTaskHook, true, start) // Swap out the old work with the new one, terminating any leftover From bd3afbce91a066597d50ec8149dadb84ed7a2997 Mon Sep 17 00:00:00 2001 From: setunapo Date: Sat, 29 Oct 2022 21:09:01 +0800 Subject: [PATCH 05/11] mine: add commitTransactions return reason --- miner/worker.go | 32 +++++++++++++++++++++++++------- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index 5a50508cd9..8ef42c0630 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -77,6 +77,13 @@ const ( // staleThreshold is the maximum depth of the acceptable stale block. staleThreshold = 11 + + // the reason that commitTransactions returned. + commitTxsDone = 0 + commitTxsNoTime = 1 + commitTxsNoGas = 2 + commitTxsNewHead = 3 + commitTxsResubmit = 4 ) var ( @@ -858,7 +865,8 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction, rece return receipt.Logs, nil } -func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) bool { +func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) (reason int) { + reason = commitTxsDone gasLimit := env.header.GasLimit if env.gasPool == nil { env.gasPool = new(core.GasPool).AddGas(gasLimit) @@ -875,7 +883,8 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP left := *delay - w.config.DelayLeftOver if left <= 0 { log.Info("Not enough time for further commitTransactions", "delay", *delay, "DelayLeftOver", w.config.DelayLeftOver) - return true + reason = commitTxsNoTime + return } stopTimer = time.NewTimer(left) log.Debug("Time left for commitTransactions", "left", left.String(), "leftover", w.config.DelayLeftOver) @@ -916,18 +925,24 @@ LOOP: ratio: ratio, inc: true, } + reason = commitTxsResubmit } - return atomic.LoadInt32(interrupt) == commitInterruptNewHead + if atomic.LoadInt32(interrupt) == commitInterruptNewHead { + reason = commitTxsNewHead + } + return } // If we don't have enough gas for any further transactions then we're done if env.gasPool.Gas() < params.TxGas { log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas) + reason = commitTxsNoGas break } if stopTimer != nil { select { case <-stopTimer.C: log.Info("Not enough time for further transactions", "txs", len(env.txs)) + reason = commitTxsNoTime break LOOP default: } @@ -1008,7 +1023,7 @@ LOOP: if interrupt != nil { w.resubmitAdjustCh <- &intervalAdjust{inc: false} } - return false + return } // generateParams wraps various of settings for generating sealing task. @@ -1141,13 +1156,15 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) { } if len(localTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee) - if w.commitTransactions(env, txs, interrupt) { + reason := w.commitTransactions(env, txs, interrupt) + if reason == commitTxsNewHead || reason == commitTxsNoGas || reason == commitTxsNoTime { return } } if len(remoteTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee) - if w.commitTransactions(env, txs, interrupt) { + reason := w.commitTransactions(env, txs, interrupt) + if reason == commitTxsNewHead || reason == commitTxsNoGas || reason == commitTxsNoTime { return } } @@ -1167,7 +1184,8 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) { newTxs[acc] = append(newTxs[acc], tx) } newTxSet := types.NewTransactionsByPriceAndNonce(env.signer, newTxs, env.header.BaseFee) - if w.commitTransactions(env, newTxSet, interrupt) { + reason := w.commitTransactions(env, newTxSet, interrupt) + if reason == commitTxsNewHead || reason == commitTxsNoGas || reason == commitTxsNoTime { return } continue From 58a74c87954beecf6c1d92e7c225a562f1eca75d Mon Sep 17 00:00:00 2001 From: setunapo Date: Mon, 7 Nov 2022 17:05:03 +0800 Subject: [PATCH 06/11] fix: crash for nil current --- miner/worker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/miner/worker.go b/miner/worker.go index 8ef42c0630..d303f7b02e 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1180,7 +1180,7 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) { case txEv := <-txsCh: newTxs := make(map[common.Address]types.Transactions) for _, tx := range txEv.Txs { - acc, _ := types.Sender(w.current.signer, tx) + acc, _ := types.Sender(env.signer, tx) newTxs[acc] = append(newTxs[acc], tx) } newTxSet := types.NewTransactionsByPriceAndNonce(env.signer, newTxs, env.header.BaseFee) From 187a7b6f9457d9048d241529b629c7613f20f026 Mon Sep 17 00:00:00 2001 From: setunapo Date: Tue, 8 Nov 2022 22:31:32 +0800 Subject: [PATCH 07/11] fair: try fillTransactions several times for the best --- miner/worker.go | 138 +++++++++++++++++++++++++++--------------------- 1 file changed, 78 insertions(+), 60 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index d303f7b02e..a2e768be09 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -19,6 +19,7 @@ package miner import ( "errors" "fmt" + "math/big" "sync" "sync/atomic" "time" @@ -1126,24 +1127,7 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) { // fillTransactions retrieves the pending transactions from the txpool and fills them // into the given sealing block. The transaction selection and ordering strategy can // be customized with the plugin in the future. -func (w *worker) fillTransactions(interrupt *int32, env *environment) { - var stopTimer *time.Timer - delay := w.engine.Delay(w.chain, env.header) - if delay != nil { - left := *delay - w.config.DelayLeftOver - if left <= 0 { - log.Info("Not enough time for fillTransactions", "delay", *delay, "DelayLeftOver", w.config.DelayLeftOver) - return - } - stopTimer = time.NewTimer(left) - log.Debug("Time left for mining work", "left", left.String(), "leftover", w.config.DelayLeftOver) - defer stopTimer.Stop() - } - - txsCh := make(chan core.NewTxsEvent, txChanSize) - sub := w.eth.TxPool().SubscribeNewTxsEvent(txsCh) - defer sub.Unsubscribe() - +func (w *worker) fillTransactions(interrupt *int32, env *environment) (reason int) { // Split the pending transactions into locals and remotes // Fill the block with all available pending transactions. pending := w.eth.TxPool().Pending(true) @@ -1156,41 +1140,21 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) { } if len(localTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee) - reason := w.commitTransactions(env, txs, interrupt) + reason = w.commitTransactions(env, txs, interrupt) if reason == commitTxsNewHead || reason == commitTxsNoGas || reason == commitTxsNoTime { return } } if len(remoteTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee) - reason := w.commitTransactions(env, txs, interrupt) + reason = w.commitTransactions(env, txs, interrupt) if reason == commitTxsNewHead || reason == commitTxsNoGas || reason == commitTxsNoTime { return } } - if stopTimer == nil { - return - } - for { - select { - case <-stopTimer.C: - log.Info("Not enough time for further transactions", "txs", len(env.txs)) - return - case txEv := <-txsCh: - newTxs := make(map[common.Address]types.Transactions) - for _, tx := range txEv.Txs { - acc, _ := types.Sender(env.signer, tx) - newTxs[acc] = append(newTxs[acc], tx) - } - newTxSet := types.NewTransactionsByPriceAndNonce(env.signer, newTxs, env.header.BaseFee) - reason := w.commitTransactions(env, newTxSet, interrupt) - if reason == commitTxsNewHead || reason == commitTxsNoGas || reason == commitTxsNoTime { - return - } - continue - } - } + reason = commitTxsDone + return } // generateWork generates a sealing block based on the given parameters. @@ -1220,33 +1184,87 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { } coinbase = w.coinbase // Use the preset address as the fee recipient } - work, err := w.prepareWork(&generateParams{ - timestamp: uint64(timestamp), - coinbase: coinbase, - }) - if err != nil { - return - } - // Create an empty block based on temporary copied state for - // sealing in advance without waiting block execution finished. - if !noempty && atomic.LoadUint32(&w.noempty) == 0 { - w.commit(work, nil, false, start) + var stopTimer *time.Timer + doPreSeal := !noempty && atomic.LoadUint32(&w.noempty) == 0 + // validator can try several times to get the most profitable block, + // as long as the timestamp is not reached. + workList := make([]*environment, 0, 10) + for { + work, err := w.prepareWork(&generateParams{ + timestamp: uint64(timestamp), + coinbase: coinbase, + }) + if err != nil { + return + } + // Create an empty block based on temporary copied state for + // sealing in advance without waiting block execution finished. + if doPreSeal { + doPreSeal = false + w.commit(work, nil, false, start) + } + workList = append(workList, work) + + if stopTimer == nil { + delay := w.engine.Delay(w.chain, work.header) + if delay != nil { + left := *delay - w.config.DelayLeftOver + if left <= 0 { + log.Info("Not enough time for commitWork", "delay", *delay, "DelayLeftOver", w.config.DelayLeftOver) + break + } + stopTimer = time.NewTimer(left) + defer stopTimer.Stop() + } + } + // subscribe before fillTransactions + txsCh := make(chan core.NewTxsEvent, txChanSize) + sub := w.eth.TxPool().SubscribeNewTxsEvent(txsCh) + + // Fill pending transactions from the txpool + reason := w.fillTransactions(interrupt, work) + if reason == commitTxsNewHead && w.engine.DropOnNewBlock(work.header) { + log.Info("drop the block, when new block is imported") + sub.Unsubscribe() + return + } + if reason == commitTxsNoGas || reason == commitTxsNoTime { + log.Info("commitWork, filled reason", reason) + sub.Unsubscribe() + break + } + done := false + select { + case <-stopTimer.C: + done = true + case <-txsCh: + } + sub.Unsubscribe() // not prefer to `defer sub.Unsubscribe()` + if done { + break + } } - // Fill pending transactions from the txpool - w.fillTransactions(interrupt, work) - if atomic.LoadInt32(interrupt) == commitInterruptNewHead && w.engine.DropOnNewBlock(work.header) { - log.Info("drop the block, when new block is imported") - return + // get the most profitable work + bestWork := workList[0] + bestReward := new(big.Int) + for i, w := range workList { + balance := w.state.GetBalance(consensus.SystemAddress) + log.Info("commitWork", "work", i, "balance", balance, "bestReward", bestReward) + if balance.Cmp(bestReward) > 0 { + bestWork = w + bestReward = balance + } } - w.commit(work, w.fullTaskHook, true, start) + + w.commit(bestWork, w.fullTaskHook, true, start) // Swap out the old work with the new one, terminating any leftover // prefetcher processes in the mean time and starting a new one. if w.current != nil { w.current.discard() } - w.current = work + w.current = bestWork } // commit runs any post-transaction state modifications, assembles the final block From f772cb620d597d2110c3d0385e405840d97c7154 Mon Sep 17 00:00:00 2001 From: setunapo Date: Wed, 9 Nov 2022 16:32:00 +0800 Subject: [PATCH 08/11] debug: add worker log --- consensus/parlia/parlia.go | 4 +++- miner/worker.go | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/consensus/parlia/parlia.go b/consensus/parlia/parlia.go index b320795815..30af91aa1e 100644 --- a/consensus/parlia/parlia.go +++ b/consensus/parlia/parlia.go @@ -847,7 +847,9 @@ func (p *Parlia) Seal(chain consensus.ChainHeaderReader, block *types.Block, res // Sweet, the protocol permits us to sign the block, wait for our time delay := p.delayForRamanujanFork(snap, header) - log.Info("Sealing block with", "number", number, "delay", delay, "headerDifficulty", header.Difficulty, "val", val.Hex()) + log.Info("Sealing block with", "number", number, "delay", delay, + "header.Time Second", time.Unix(int64(header.Time), 0).Second(), + "headerDifficulty", header.Difficulty, "val", val.Hex()) // Sign all the things! sig, err := signFn(accounts.Account{Address: val}, accounts.MimetypeParlia, ParliaRLP(header, p.chainConfig.ChainID)) diff --git a/miner/worker.go b/miner/worker.go index a2e768be09..84d040b04e 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -472,6 +472,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) { commit(true, commitInterruptNewHead) case head := <-w.chainHeadCh: + log.Info("newWorkLoop chainHeadCh", "block number", head.Block.NumberU64()) if !w.isRunning() { continue } @@ -559,6 +560,7 @@ func (w *worker) mainLoop() { for { select { case req := <-w.newWorkCh: + log.Info("mainLoop newWorkCh") w.commitWork(req.interrupt, req.noempty, req.timestamp) case req := <-w.getWorkCh: @@ -1230,7 +1232,7 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { return } if reason == commitTxsNoGas || reason == commitTxsNoTime { - log.Info("commitWork, filled reason", reason) + log.Info("commitWork", "fill done, reason", reason) sub.Unsubscribe() break } From b90caf520b4c6f932b64b95dc446b95b5dc2e892 Mon Sep 17 00:00:00 2001 From: setunapo Date: Wed, 9 Nov 2022 16:58:51 +0800 Subject: [PATCH 09/11] debug: more worker log --- miner/worker.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index 84d040b04e..1db418dcaa 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1186,7 +1186,6 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { } coinbase = w.coinbase // Use the preset address as the fee recipient } - var stopTimer *time.Timer doPreSeal := !noempty && atomic.LoadUint32(&w.noempty) == 0 // validator can try several times to get the most profitable block, @@ -1200,6 +1199,7 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { if err != nil { return } + log.Info("commitWork for", "block", work.header.Number, "timestamp", time.Unix(int64(timestamp), 0).Second) // Create an empty block based on temporary copied state for // sealing in advance without waiting block execution finished. if doPreSeal { @@ -1216,6 +1216,7 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { log.Info("Not enough time for commitWork", "delay", *delay, "DelayLeftOver", w.config.DelayLeftOver) break } + log.Info("commitWork stopTimer", "delay", *delay, "DelayLeftOver", w.config.DelayLeftOver) stopTimer = time.NewTimer(left) defer stopTimer.Stop() } @@ -1239,8 +1240,10 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { done := false select { case <-stopTimer.C: + log.Info("commitWork stopTimer expired") done = true case <-txsCh: + log.Info("commitWork txsCh arrived") } sub.Unsubscribe() // not prefer to `defer sub.Unsubscribe()` if done { @@ -1252,7 +1255,7 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { bestReward := new(big.Int) for i, w := range workList { balance := w.state.GetBalance(consensus.SystemAddress) - log.Info("commitWork", "work", i, "balance", balance, "bestReward", bestReward) + log.Info("Get the best work", "index", i, "balance", balance, "bestReward", bestReward) if balance.Cmp(bestReward) > 0 { bestWork = w bestReward = balance From d36469d52d879147b4e5ffea5ff1cd6638694d28 Mon Sep 17 00:00:00 2001 From: setunapo Date: Wed, 9 Nov 2022 17:50:45 +0800 Subject: [PATCH 10/11] fair: interruptChan and debug log --- miner/worker.go | 35 ++++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index 1db418dcaa..3af1c54c3b 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -173,9 +173,10 @@ const ( // newWorkReq represents a request for new sealing work submitting with relative interrupt notifier. type newWorkReq struct { - interrupt *int32 - noempty bool - timestamp int64 + interrupt *int32 + interruptChan chan struct{} + noempty bool + timestamp int64 } // getWorkReq represents a request for getting a new sealing work with provided parameters. @@ -430,9 +431,10 @@ func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) t func (w *worker) newWorkLoop(recommit time.Duration) { defer w.wg.Done() var ( - interrupt *int32 - minRecommit = recommit // minimal resubmit interval specified by user. - timestamp int64 // timestamp for each round of sealing. + interrupt *int32 + interruptChan = make(chan struct{}) + minRecommit = recommit // minimal resubmit interval specified by user. + timestamp int64 // timestamp for each round of sealing. ) timer := time.NewTimer(0) @@ -443,10 +445,12 @@ func (w *worker) newWorkLoop(recommit time.Duration) { commit := func(noempty bool, s int32) { if interrupt != nil { atomic.StoreInt32(interrupt, s) + close(interruptChan) } interrupt = new(int32) + interruptChan = make(chan struct{}) select { - case w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}: + case w.newWorkCh <- &newWorkReq{interrupt: interrupt, interruptChan: interruptChan, noempty: noempty, timestamp: timestamp}: case <-w.exitCh: return } @@ -561,7 +565,7 @@ func (w *worker) mainLoop() { select { case req := <-w.newWorkCh: log.Info("mainLoop newWorkCh") - w.commitWork(req.interrupt, req.noempty, req.timestamp) + w.commitWork(req.interrupt, req.interruptChan, req.noempty, req.timestamp) case req := <-w.getWorkCh: block, err := w.generateWork(req.params) @@ -645,7 +649,7 @@ func (w *worker) mainLoop() { // by clique. Of course the advance sealing(empty submission) is disabled. if (w.chainConfig.Clique != nil && w.chainConfig.Clique.Period == 0) || (w.chainConfig.Parlia != nil && w.chainConfig.Parlia.Period == 0) { - w.commitWork(nil, true, time.Now().Unix()) + w.commitWork(nil, nil, true, time.Now().Unix()) } } atomic.AddInt32(&w.newTxs, int32(len(ev.Txs))) @@ -1174,7 +1178,7 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) { // commitWork generates several new sealing tasks based on the parent block // and submit them to the sealer. -func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { +func (w *worker) commitWork(interrupt *int32, interruptChan chan struct{}, noempty bool, timestamp int64) { start := time.Now() // Set the coinbase if the worker is running or it's required @@ -1199,7 +1203,7 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { if err != nil { return } - log.Info("commitWork for", "block", work.header.Number, "timestamp", time.Unix(int64(timestamp), 0).Second) + log.Info("commitWork for", "block", work.header.Number, "timestamp", time.Unix(int64(timestamp), 0).Second()) // Create an empty block based on temporary copied state for // sealing in advance without waiting block execution finished. if doPreSeal { @@ -1237,6 +1241,11 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { sub.Unsubscribe() break } + if interruptChan == nil { + log.Info("commitWork interruptChan is nil") + sub.Unsubscribe() // not prefer to `defer sub.Unsubscribe()` + break + } done := false select { case <-stopTimer.C: @@ -1244,6 +1253,10 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { done = true case <-txsCh: log.Info("commitWork txsCh arrived") + case <-interruptChan: + log.Info("commitWork interruptChan closed, new block imported") + sub.Unsubscribe() // not prefer to `defer sub.Unsubscribe()` + return } sub.Unsubscribe() // not prefer to `defer sub.Unsubscribe()` if done { From c458e055a5aca80d2f7a7c0088555548c745074e Mon Sep 17 00:00:00 2001 From: setunapo Date: Wed, 9 Nov 2022 18:38:04 +0800 Subject: [PATCH 11/11] fair: less debug log --- miner/worker.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index 3af1c54c3b..737db89f9a 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1203,7 +1203,7 @@ func (w *worker) commitWork(interrupt *int32, interruptChan chan struct{}, noemp if err != nil { return } - log.Info("commitWork for", "block", work.header.Number, "timestamp", time.Unix(int64(timestamp), 0).Second()) + // log.Info("commitWork for", "block", work.header.Number, "timestamp", time.Unix(int64(timestamp), 0).Second()) // Create an empty block based on temporary copied state for // sealing in advance without waiting block execution finished. if doPreSeal { @@ -1249,10 +1249,10 @@ func (w *worker) commitWork(interrupt *int32, interruptChan chan struct{}, noemp done := false select { case <-stopTimer.C: - log.Info("commitWork stopTimer expired") + // log.Info("commitWork stopTimer expired") done = true case <-txsCh: - log.Info("commitWork txsCh arrived") + // log.Info("commitWork txsCh arrived") case <-interruptChan: log.Info("commitWork interruptChan closed, new block imported") sub.Unsubscribe() // not prefer to `defer sub.Unsubscribe()` @@ -1268,7 +1268,7 @@ func (w *worker) commitWork(interrupt *int32, interruptChan chan struct{}, noemp bestReward := new(big.Int) for i, w := range workList { balance := w.state.GetBalance(consensus.SystemAddress) - log.Info("Get the best work", "index", i, "balance", balance, "bestReward", bestReward) + log.Debug("Get the best work", "index", i, "balance", balance, "bestReward", bestReward) if balance.Cmp(bestReward) > 0 { bestWork = w bestReward = balance