diff --git a/consensus/beacon/consensus.go b/consensus/beacon/consensus.go index cd493f3d65..ac6602f170 100644 --- a/consensus/beacon/consensus.go +++ b/consensus/beacon/consensus.go @@ -363,6 +363,10 @@ func (beacon *Beacon) SetThreads(threads int) { } } +func (p *Beacon) DropOnNewBlock(*types.Header) bool { + return true +} + // IsTTDReached checks if the TotalTerminalDifficulty has been surpassed on the `parentHash` block. // It depends on the parentHash already being stored in the database. // If the parentHash is not stored in the database a UnknownAncestor error is returned. diff --git a/consensus/clique/clique.go b/consensus/clique/clique.go index 75ed916a86..c61608f754 100644 --- a/consensus/clique/clique.go +++ b/consensus/clique/clique.go @@ -705,6 +705,11 @@ func (c *Clique) APIs(chain consensus.ChainHeaderReader) []rpc.API { }} } +func (p *Clique) DropOnNewBlock(header *types.Header) bool { + // drop the block if it is not in turn. + return header.Difficulty.Cmp(diffNoTurn) == 0 +} + // SealHash returns the hash of a block prior to it being sealed. func SealHash(header *types.Header) (hash common.Hash) { hasher := sha3.NewLegacyKeccak256() diff --git a/consensus/consensus.go b/consensus/consensus.go index c3e7b4870a..74cbe84f54 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -130,6 +130,12 @@ type Engine interface { // Close terminates any background threads maintained by the consensus engine. Close() error + + // DropOnNewBlock determine the action of mining when it is interrupted by new imported block. + // Return + // true: the mining result will be dropped + // false: the mining result will be kept and move on to the next mine step. + DropOnNewBlock(header *types.Header) bool } // PoW is a consensus engine based on proof-of-work. diff --git a/consensus/ethash/consensus.go b/consensus/ethash/consensus.go index be6085c713..d606913419 100644 --- a/consensus/ethash/consensus.go +++ b/consensus/ethash/consensus.go @@ -647,6 +647,10 @@ var ( big32 = big.NewInt(32) ) +func (p *Ethash) DropOnNewBlock(*types.Header) bool { + return true +} + // AccumulateRewards credits the coinbase of the given block with the mining // reward. The total reward consists of the static block reward and rewards for // included uncles. The coinbase of each uncle block is also rewarded. diff --git a/consensus/parlia/parlia.go b/consensus/parlia/parlia.go index fee0fe1293..30af91aa1e 100644 --- a/consensus/parlia/parlia.go +++ b/consensus/parlia/parlia.go @@ -800,11 +800,6 @@ func (p *Parlia) Delay(chain consensus.ChainReader, header *types.Header) *time. return nil } delay := p.delayForRamanujanFork(snap, header) - // The blocking time should be no more than half of period - half := time.Duration(p.config.Period) * time.Second / 2 - if delay > half { - delay = half - } return &delay } @@ -852,7 +847,9 @@ func (p *Parlia) Seal(chain consensus.ChainHeaderReader, block *types.Block, res // Sweet, the protocol permits us to sign the block, wait for our time delay := p.delayForRamanujanFork(snap, header) - log.Info("Sealing block with", "number", number, "delay", delay, "headerDifficulty", header.Difficulty, "val", val.Hex()) + log.Info("Sealing block with", "number", number, "delay", delay, + "header.Time Second", time.Unix(int64(header.Time), 0).Second(), + "headerDifficulty", header.Difficulty, "val", val.Hex()) // Sign all the things! sig, err := signFn(accounts.Account{Address: val}, accounts.MimetypeParlia, ParliaRLP(header, p.chainConfig.ChainID)) @@ -964,6 +961,11 @@ func (p *Parlia) CalcDifficulty(chain consensus.ChainHeaderReader, time uint64, return CalcDifficulty(snap, p.val) } +func (p *Parlia) DropOnNewBlock(header *types.Header) bool { + // drop the block if it is not in turn. + return header.Difficulty.Cmp(diffNoTurn) == 0 +} + // CalcDifficulty is the difficulty adjustment algorithm. It returns the difficulty // that a new block should have based on the previous blocks in the chain and the // current signer. diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 03104c6109..85b748b289 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -89,7 +89,7 @@ var Defaults = Config{ GasCeil: 8000000, GasPrice: big.NewInt(params.GWei), Recommit: 3 * time.Second, - DelayLeftOver: 50 * time.Millisecond, + DelayLeftOver: 400 * time.Millisecond, }, TxPool: core.DefaultTxPoolConfig, RPCGasCap: 50000000, diff --git a/miner/worker.go b/miner/worker.go index dca827f6d6..737db89f9a 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -19,6 +19,7 @@ package miner import ( "errors" "fmt" + "math/big" "sync" "sync/atomic" "time" @@ -77,6 +78,13 @@ const ( // staleThreshold is the maximum depth of the acceptable stale block. staleThreshold = 11 + + // the reason that commitTransactions returned. + commitTxsDone = 0 + commitTxsNoTime = 1 + commitTxsNoGas = 2 + commitTxsNewHead = 3 + commitTxsResubmit = 4 ) var ( @@ -165,9 +173,10 @@ const ( // newWorkReq represents a request for new sealing work submitting with relative interrupt notifier. type newWorkReq struct { - interrupt *int32 - noempty bool - timestamp int64 + interrupt *int32 + interruptChan chan struct{} + noempty bool + timestamp int64 } // getWorkReq represents a request for getting a new sealing work with provided parameters. @@ -422,9 +431,10 @@ func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) t func (w *worker) newWorkLoop(recommit time.Duration) { defer w.wg.Done() var ( - interrupt *int32 - minRecommit = recommit // minimal resubmit interval specified by user. - timestamp int64 // timestamp for each round of sealing. + interrupt *int32 + interruptChan = make(chan struct{}) + minRecommit = recommit // minimal resubmit interval specified by user. + timestamp int64 // timestamp for each round of sealing. ) timer := time.NewTimer(0) @@ -435,10 +445,12 @@ func (w *worker) newWorkLoop(recommit time.Duration) { commit := func(noempty bool, s int32) { if interrupt != nil { atomic.StoreInt32(interrupt, s) + close(interruptChan) } interrupt = new(int32) + interruptChan = make(chan struct{}) select { - case w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}: + case w.newWorkCh <- &newWorkReq{interrupt: interrupt, interruptChan: interruptChan, noempty: noempty, timestamp: timestamp}: case <-w.exitCh: return } @@ -464,6 +476,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) { commit(true, commitInterruptNewHead) case head := <-w.chainHeadCh: + log.Info("newWorkLoop chainHeadCh", "block number", head.Block.NumberU64()) if !w.isRunning() { continue } @@ -551,7 +564,8 @@ func (w *worker) mainLoop() { for { select { case req := <-w.newWorkCh: - w.commitWork(req.interrupt, req.noempty, req.timestamp) + log.Info("mainLoop newWorkCh") + w.commitWork(req.interrupt, req.interruptChan, req.noempty, req.timestamp) case req := <-w.getWorkCh: block, err := w.generateWork(req.params) @@ -635,7 +649,7 @@ func (w *worker) mainLoop() { // by clique. Of course the advance sealing(empty submission) is disabled. if (w.chainConfig.Clique != nil && w.chainConfig.Clique.Period == 0) || (w.chainConfig.Parlia != nil && w.chainConfig.Parlia.Period == 0) { - w.commitWork(nil, true, time.Now().Unix()) + w.commitWork(nil, nil, true, time.Now().Unix()) } } atomic.AddInt32(&w.newTxs, int32(len(ev.Txs))) @@ -858,7 +872,8 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction, rece return receipt.Logs, nil } -func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) bool { +func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) (reason int) { + reason = commitTxsDone gasLimit := env.header.GasLimit if env.gasPool == nil { env.gasPool = new(core.GasPool).AddGas(gasLimit) @@ -869,12 +884,17 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP } } - var coalescedLogs []*types.Log var stopTimer *time.Timer delay := w.engine.Delay(w.chain, env.header) if delay != nil { - stopTimer = time.NewTimer(*delay - w.config.DelayLeftOver) - log.Debug("Time left for mining work", "left", (*delay - w.config.DelayLeftOver).String(), "leftover", w.config.DelayLeftOver) + left := *delay - w.config.DelayLeftOver + if left <= 0 { + log.Info("Not enough time for further commitTransactions", "delay", *delay, "DelayLeftOver", w.config.DelayLeftOver) + reason = commitTxsNoTime + return + } + stopTimer = time.NewTimer(left) + log.Debug("Time left for commitTransactions", "left", left.String(), "leftover", w.config.DelayLeftOver) defer stopTimer.Stop() } // initilise bloom processors @@ -892,6 +912,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP txCurr := &tx w.prefetcher.PrefetchMining(txsPrefetch, env.header, env.gasPool.Gas(), env.state.CopyDoPrefetch(), *w.chain.GetVMConfig(), interruptCh, txCurr) + var coalescedLogs []*types.Log LOOP: for { // In the following three cases, we will interrupt the execution of the transaction. @@ -911,18 +932,24 @@ LOOP: ratio: ratio, inc: true, } + reason = commitTxsResubmit } - return atomic.LoadInt32(interrupt) == commitInterruptNewHead + if atomic.LoadInt32(interrupt) == commitInterruptNewHead { + reason = commitTxsNewHead + } + return } // If we don't have enough gas for any further transactions then we're done if env.gasPool.Gas() < params.TxGas { log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas) + reason = commitTxsNoGas break } if stopTimer != nil { select { case <-stopTimer.C: log.Info("Not enough time for further transactions", "txs", len(env.txs)) + reason = commitTxsNoTime break LOOP default: } @@ -1003,7 +1030,7 @@ LOOP: if interrupt != nil { w.resubmitAdjustCh <- &intervalAdjust{inc: false} } - return false + return } // generateParams wraps various of settings for generating sealing task. @@ -1106,7 +1133,7 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) { // fillTransactions retrieves the pending transactions from the txpool and fills them // into the given sealing block. The transaction selection and ordering strategy can // be customized with the plugin in the future. -func (w *worker) fillTransactions(interrupt *int32, env *environment) { +func (w *worker) fillTransactions(interrupt *int32, env *environment) (reason int) { // Split the pending transactions into locals and remotes // Fill the block with all available pending transactions. pending := w.eth.TxPool().Pending(true) @@ -1119,16 +1146,21 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment) { } if len(localTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee) - if w.commitTransactions(env, txs, interrupt) { + reason = w.commitTransactions(env, txs, interrupt) + if reason == commitTxsNewHead || reason == commitTxsNoGas || reason == commitTxsNoTime { return } } if len(remoteTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee) - if w.commitTransactions(env, txs, interrupt) { + reason = w.commitTransactions(env, txs, interrupt) + if reason == commitTxsNewHead || reason == commitTxsNoGas || reason == commitTxsNoTime { return } } + + reason = commitTxsDone + return } // generateWork generates a sealing block based on the given parameters. @@ -1146,7 +1178,7 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) { // commitWork generates several new sealing tasks based on the parent block // and submit them to the sealer. -func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { +func (w *worker) commitWork(interrupt *int32, interruptChan chan struct{}, noempty bool, timestamp int64) { start := time.Now() // Set the coinbase if the worker is running or it's required @@ -1158,29 +1190,99 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { } coinbase = w.coinbase // Use the preset address as the fee recipient } - work, err := w.prepareWork(&generateParams{ - timestamp: uint64(timestamp), - coinbase: coinbase, - }) - if err != nil { - return + var stopTimer *time.Timer + doPreSeal := !noempty && atomic.LoadUint32(&w.noempty) == 0 + // validator can try several times to get the most profitable block, + // as long as the timestamp is not reached. + workList := make([]*environment, 0, 10) + for { + work, err := w.prepareWork(&generateParams{ + timestamp: uint64(timestamp), + coinbase: coinbase, + }) + if err != nil { + return + } + // log.Info("commitWork for", "block", work.header.Number, "timestamp", time.Unix(int64(timestamp), 0).Second()) + // Create an empty block based on temporary copied state for + // sealing in advance without waiting block execution finished. + if doPreSeal { + doPreSeal = false + w.commit(work, nil, false, start) + } + workList = append(workList, work) + + if stopTimer == nil { + delay := w.engine.Delay(w.chain, work.header) + if delay != nil { + left := *delay - w.config.DelayLeftOver + if left <= 0 { + log.Info("Not enough time for commitWork", "delay", *delay, "DelayLeftOver", w.config.DelayLeftOver) + break + } + log.Info("commitWork stopTimer", "delay", *delay, "DelayLeftOver", w.config.DelayLeftOver) + stopTimer = time.NewTimer(left) + defer stopTimer.Stop() + } + } + // subscribe before fillTransactions + txsCh := make(chan core.NewTxsEvent, txChanSize) + sub := w.eth.TxPool().SubscribeNewTxsEvent(txsCh) + + // Fill pending transactions from the txpool + reason := w.fillTransactions(interrupt, work) + if reason == commitTxsNewHead && w.engine.DropOnNewBlock(work.header) { + log.Info("drop the block, when new block is imported") + sub.Unsubscribe() + return + } + if reason == commitTxsNoGas || reason == commitTxsNoTime { + log.Info("commitWork", "fill done, reason", reason) + sub.Unsubscribe() + break + } + if interruptChan == nil { + log.Info("commitWork interruptChan is nil") + sub.Unsubscribe() // not prefer to `defer sub.Unsubscribe()` + break + } + done := false + select { + case <-stopTimer.C: + // log.Info("commitWork stopTimer expired") + done = true + case <-txsCh: + // log.Info("commitWork txsCh arrived") + case <-interruptChan: + log.Info("commitWork interruptChan closed, new block imported") + sub.Unsubscribe() // not prefer to `defer sub.Unsubscribe()` + return + } + sub.Unsubscribe() // not prefer to `defer sub.Unsubscribe()` + if done { + break + } } - - // Create an empty block based on temporary copied state for - // sealing in advance without waiting block execution finished. - if !noempty && atomic.LoadUint32(&w.noempty) == 0 { - w.commit(work, nil, false, start) + // get the most profitable work + bestWork := workList[0] + bestReward := new(big.Int) + for i, w := range workList { + balance := w.state.GetBalance(consensus.SystemAddress) + log.Debug("Get the best work", "index", i, "balance", balance, "bestReward", bestReward) + if balance.Cmp(bestReward) > 0 { + bestWork = w + bestReward = balance + } } - // Fill pending transactions from the txpool - w.fillTransactions(interrupt, work) - w.commit(work, w.fullTaskHook, true, start) + + w.commit(bestWork, w.fullTaskHook, true, start) // Swap out the old work with the new one, terminating any leftover // prefetcher processes in the mean time and starting a new one. if w.current != nil { w.current.discard() } - w.current = work + w.current = bestWork } // commit runs any post-transaction state modifications, assembles the final block