From 6239e0fdba33f54fcdfbba654c61ae936d5bbab4 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Wed, 21 Jul 2021 14:12:39 +0800 Subject: [PATCH] eth, miner: remove duplicated code --- eth/catalyst/api.go | 148 +------------ miner/miner.go | 6 + miner/worker.go | 504 ++++++++++++++++++++++++++----------------- miner/worker_test.go | 104 +++++++++ 4 files changed, 415 insertions(+), 347 deletions(-) diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index 2622c4a148f6b..0c2bcf6bf5da2 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -21,12 +21,9 @@ import ( "errors" "fmt" "math/big" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/misc" - "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/log" @@ -65,153 +62,11 @@ func newConsensusAPI(eth *eth.Ethereum) *consensusAPI { return &consensusAPI{eth: eth} } -// blockExecutionEnv gathers all the data required to execute -// a block, either when assembling it or when inserting it. -type blockExecutionEnv struct { - chain *core.BlockChain - state *state.StateDB - tcount int - gasPool *core.GasPool - - header *types.Header - txs []*types.Transaction - receipts []*types.Receipt -} - -func (env *blockExecutionEnv) commitTransaction(tx *types.Transaction, coinbase common.Address) error { - vmconfig := *env.chain.GetVMConfig() - snap := env.state.Snapshot() - receipt, err := core.ApplyTransaction(env.chain.Config(), env.chain, &coinbase, env.gasPool, env.state, env.header, tx, &env.header.GasUsed, vmconfig) - if err != nil { - env.state.RevertToSnapshot(snap) - return err - } - env.txs = append(env.txs, tx) - env.receipts = append(env.receipts, receipt) - return nil -} - -func (api *consensusAPI) makeEnv(parent *types.Block, header *types.Header) (*blockExecutionEnv, error) { - state, err := api.eth.BlockChain().StateAt(parent.Root()) - if err != nil { - return nil, err - } - env := &blockExecutionEnv{ - chain: api.eth.BlockChain(), - state: state, - header: header, - gasPool: new(core.GasPool).AddGas(header.GasLimit), - } - return env, nil -} - // AssembleBlock creates a new block, inserts it into the chain, and returns the "execution // data" required for eth2 clients to process the new block. func (api *consensusAPI) AssembleBlock(params assembleBlockParams) (*executableData, error) { log.Info("Producing block", "parentHash", params.ParentHash) - - bc := api.eth.BlockChain() - parent := bc.GetBlockByHash(params.ParentHash) - if parent == nil { - log.Warn("Cannot assemble block with parent hash to unknown block", "parentHash", params.ParentHash) - return nil, fmt.Errorf("cannot assemble block with unknown parent %s", params.ParentHash) - } - - pool := api.eth.TxPool() - - if parent.Time() >= params.Timestamp { - return nil, fmt.Errorf("child timestamp lower than parent's: %d >= %d", parent.Time(), params.Timestamp) - } - if now := uint64(time.Now().Unix()); params.Timestamp > now+1 { - wait := time.Duration(params.Timestamp-now) * time.Second - log.Info("Producing block too far in the future", "wait", common.PrettyDuration(wait)) - time.Sleep(wait) - } - - pending, err := pool.Pending(true) - if err != nil { - return nil, err - } - - coinbase, err := api.eth.Etherbase() - if err != nil { - return nil, err - } - num := parent.Number() - header := &types.Header{ - ParentHash: parent.Hash(), - Number: num.Add(num, common.Big1), - Coinbase: coinbase, - GasLimit: parent.GasLimit(), // Keep the gas limit constant in this prototype - Extra: []byte{}, - Time: params.Timestamp, - } - if config := api.eth.BlockChain().Config(); config.IsLondon(header.Number) { - header.BaseFee = misc.CalcBaseFee(config, parent.Header()) - } - err = api.eth.Engine().Prepare(bc, header) - if err != nil { - return nil, err - } - - env, err := api.makeEnv(parent, header) - if err != nil { - return nil, err - } - - var ( - signer = types.MakeSigner(bc.Config(), header.Number) - txHeap = types.NewTransactionsByPriceAndNonce(signer, pending, nil) - transactions []*types.Transaction - ) - for { - if env.gasPool.Gas() < chainParams.TxGas { - log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", chainParams.TxGas) - break - } - tx := txHeap.Peek() - if tx == nil { - break - } - - // The sender is only for logging purposes, and it doesn't really matter if it's correct. - from, _ := types.Sender(signer, tx) - - // Execute the transaction - env.state.Prepare(tx.Hash(), env.tcount) - err = env.commitTransaction(tx, coinbase) - switch err { - case core.ErrGasLimitReached: - // Pop the current out-of-gas transaction without shifting in the next from the account - log.Trace("Gas limit exceeded for current block", "sender", from) - txHeap.Pop() - - case core.ErrNonceTooLow: - // New head notification data race between the transaction pool and miner, shift - log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce()) - txHeap.Shift() - - case core.ErrNonceTooHigh: - // Reorg notification data race between the transaction pool and miner, skip account = - log.Trace("Skipping account with high nonce", "sender", from, "nonce", tx.Nonce()) - txHeap.Pop() - - case nil: - // Everything ok, collect the logs and shift in the next transaction from the same account - env.tcount++ - txHeap.Shift() - transactions = append(transactions, tx) - - default: - // Strange error, discard the transaction and get the next in line (note, the - // nonce-too-high clause will prevent us from executing in vain). - log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err) - txHeap.Shift() - } - } - - // Create the block. - block, err := api.eth.Engine().FinalizeAndAssemble(bc, header, env.state, transactions, nil /* uncles */, env.receipts) + block, err := api.eth.Miner().GetSealingBlock(params.ParentHash, params.Timestamp) if err != nil { return nil, err } @@ -255,7 +110,6 @@ func insertBlockParamsToBlock(config *chainParams.ChainConfig, parent *types.Hea if err != nil { return nil, err } - number := big.NewInt(0) number.SetUint64(params.Number) header := &types.Header{ diff --git a/miner/miner.go b/miner/miner.go index a4a01b9f4ff70..87e809f34f124 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -227,6 +227,12 @@ func (miner *Miner) DisablePreseal() { miner.worker.disablePreseal() } +// GetSealingBlock retrieves a sealing block based on the given parameters. +// The returned block is not sealed but all other fields should be filled. +func (miner *Miner) GetSealingBlock(parent common.Hash, timestamp uint64) (*types.Block, error) { + return miner.worker.getSealingBlock(parent, timestamp) +} + // SubscribePendingLogs starts delivering logs from pending transactions // to the given channel. func (miner *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscription { diff --git a/miner/worker.go b/miner/worker.go index 8bdb1eff7ca02..8ce98ed91d1df 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -17,8 +17,8 @@ package miner import ( - "bytes" "errors" + "fmt" "math/big" "sync" "sync/atomic" @@ -54,14 +54,14 @@ const ( // resubmitAdjustChanSize is the size of resubmitting interval adjustment channel. resubmitAdjustChanSize = 10 - // miningLogAtDepth is the number of confirmations before logging successful mining. - miningLogAtDepth = 7 + // sealingLogAtDepth is the number of confirmations before logging successful sealing. + sealingLogAtDepth = 7 - // minRecommitInterval is the minimal time interval to recreate the mining block with + // minRecommitInterval is the minimal time interval to recreate the sealing block with // any newly arrived transactions. minRecommitInterval = 1 * time.Second - // maxRecommitInterval is the maximum time interval to recreate the mining block with + // maxRecommitInterval is the maximum time interval to recreate the sealing block with // any newly arrived transactions. maxRecommitInterval = 15 * time.Second @@ -77,20 +77,65 @@ const ( staleThreshold = 7 ) -// environment is the worker's current environment and holds all of the current state information. +// environment is the worker's current environment and holds all +// information of the sealing block generation. type environment struct { signer types.Signer state *state.StateDB // apply state changes here ancestors mapset.Set // ancestor set (used for checking uncle parent validity) family mapset.Set // family set (used for checking uncle invalidity) - uncles mapset.Set // uncle set tcount int // tx count in cycle gasPool *core.GasPool // available gas used to pack transactions header *types.Header txs []*types.Transaction receipts []*types.Receipt + uncles map[common.Hash]*types.Header +} + +// copy creates a deep copy of environment. +func (env *environment) copy() *environment { + cpy := &environment{ + signer: env.signer, + state: env.state.Copy(), + ancestors: env.ancestors.Clone(), + family: env.family.Clone(), + tcount: env.tcount, + header: types.CopyHeader(env.header), + receipts: copyReceipts(env.receipts), + } + if env.gasPool != nil { + cpy.gasPool = &(*env.gasPool) + } + // The content of txs and uncles are immutable, unnecessary + // to do the expensive deep copy for them. + cpy.txs = make([]*types.Transaction, len(env.txs)) + copy(cpy.txs, env.txs) + cpy.uncles = make(map[common.Hash]*types.Header) + for hash, uncle := range env.uncles { + cpy.uncles[hash] = uncle + } + return cpy +} + +// unclelist returns the contained uncles as the list format. +func (env *environment) unclelist() []*types.Header { + var uncles []*types.Header + for _, uncle := range env.uncles { + uncles = append(uncles, uncle) + } + return uncles +} + +// discard terminates the background prefetcher go-routine. It should +// always be called for all created environment instances otherwise +// the go-routine leak can happen. +func (env *environment) discard() { + if env.state == nil { + return + } + env.state.StopPrefetcher() } // task contains all information for consensus engine sealing and result submitting. @@ -114,6 +159,13 @@ type newWorkReq struct { timestamp int64 } +// getWorkReq represents a request for getting a new sealing work with provided parameters. +type getWorkReq struct { + params *generateParams + err error + result chan *types.Block +} + // intervalAdjust represents a resubmitting interval adjustment. type intervalAdjust struct { ratio float64 @@ -143,6 +195,7 @@ type worker struct { // Channels newWorkCh chan *newWorkReq + getWorkCh chan *getWorkReq taskCh chan *task resultCh chan *types.Block startCh chan struct{} @@ -199,12 +252,13 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus isLocalBlock: isLocalBlock, localUncles: make(map[common.Hash]*types.Block), remoteUncles: make(map[common.Hash]*types.Block), - unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), + unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), sealingLogAtDepth), pendingTasks: make(map[common.Hash]*task), txsCh: make(chan core.NewTxsEvent, txChanSize), chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), newWorkCh: make(chan *newWorkReq), + getWorkCh: make(chan *getWorkReq), taskCh: make(chan *task), resultCh: make(chan *types.Block, resultQueueSize), exitCh: make(chan struct{}), @@ -259,15 +313,18 @@ func (w *worker) setExtra(extra []byte) { // setRecommitInterval updates the interval for miner sealing work recommitting. func (w *worker) setRecommitInterval(interval time.Duration) { - w.resubmitIntervalCh <- interval + select { + case w.resubmitIntervalCh <- interval: + case <-w.exitCh: + } } -// disablePreseal disables pre-sealing mining feature +// disablePreseal disables pre-sealing feature func (w *worker) disablePreseal() { atomic.StoreUint32(&w.noempty, 1) } -// enablePreseal enables pre-sealing mining feature +// enablePreseal enables pre-sealing feature func (w *worker) enablePreseal() { atomic.StoreUint32(&w.noempty, 0) } @@ -318,8 +375,8 @@ func (w *worker) isRunning() bool { // close terminates all background threads maintained by the worker. // Note the worker does not support being closed multiple times. func (w *worker) close() { - if w.current != nil && w.current.state != nil { - w.current.state.StopPrefetcher() + if w.current != nil { + w.current.discard() } atomic.StoreInt32(&w.running, 0) close(w.exitCh) @@ -347,12 +404,12 @@ func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) t return time.Duration(int64(next)) } -// newWorkLoop is a standalone goroutine to submit new mining work upon received events. +// newWorkLoop is a standalone goroutine to submit new sealing work upon received events. func (w *worker) newWorkLoop(recommit time.Duration) { var ( interrupt *int32 minRecommit = recommit // minimal resubmit interval specified by user. - timestamp int64 // timestamp for each round of mining. + timestamp int64 // timestamp for each round of sealing. ) timer := time.NewTimer(0) @@ -397,7 +454,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) { commit(false, commitInterruptNewHead) case <-timer.C: - // If mining is running resubmit a new work cycle periodically to pull in + // If sealing is running resubmit a new work cycle periodically to pull in // higher priced transactions. Disable this overhead for pending blocks. if w.isRunning() && (w.chainConfig.Clique == nil || w.chainConfig.Clique.Period > 0) { // Short circuit if no new transaction arrives. @@ -444,16 +501,30 @@ func (w *worker) newWorkLoop(recommit time.Duration) { } } -// mainLoop is a standalone goroutine to regenerate the sealing task based on the received event. +// mainLoop is responsible for generating and submitting sealing work based on +// the received event. It can support two modes: automatically generate task and +// submit it or return task according to given parameters for various proposes. func (w *worker) mainLoop() { defer w.txsSub.Unsubscribe() defer w.chainHeadSub.Unsubscribe() defer w.chainSideSub.Unsubscribe() + cleanTicker := time.NewTicker(time.Second * 10) + defer cleanTicker.Stop() + for { select { case req := <-w.newWorkCh: - w.commitNewWork(req.interrupt, req.noempty, req.timestamp) + w.commitWork(req.interrupt, req.noempty, req.timestamp) + + case req := <-w.getWorkCh: + block, err := w.generateWork(req.params) + if err != nil { + req.err = err + req.result <- nil + } else { + req.result <- block + } case ev := <-w.chainSideCh: // Short circuit for duplicate side blocks @@ -469,36 +540,34 @@ func (w *worker) mainLoop() { } else { w.remoteUncles[ev.Block.Hash()] = ev.Block } - // If our mining block contains less than 2 uncle blocks, - // add the new uncle block if valid and regenerate a mining block. - if w.isRunning() && w.current != nil && w.current.uncles.Cardinality() < 2 { + // If our sealing block contains less than 2 uncle blocks, + // add the new uncle block if valid and regenerate a new + // sealing block for higher profit. + if w.isRunning() && w.current != nil && len(w.current.uncles) < 2 { start := time.Now() if err := w.commitUncle(w.current, ev.Block.Header()); err == nil { - var uncles []*types.Header - w.current.uncles.Each(func(item interface{}) bool { - hash, ok := item.(common.Hash) - if !ok { - return false - } - uncle, exist := w.localUncles[hash] - if !exist { - uncle, exist = w.remoteUncles[hash] - } - if !exist { - return false - } - uncles = append(uncles, uncle.Header()) - return false - }) - w.commit(uncles, nil, true, start) + w.commit(w.current.copy(), nil, true, start) + } + } + + case <-cleanTicker.C: + chainHead := w.chain.CurrentBlock() + for hash, uncle := range w.localUncles { + if uncle.NumberU64()+staleThreshold <= chainHead.NumberU64() { + delete(w.localUncles, hash) + } + } + for hash, uncle := range w.remoteUncles { + if uncle.NumberU64()+staleThreshold <= chainHead.NumberU64() { + delete(w.remoteUncles, hash) } } case ev := <-w.txsCh: - // Apply transactions to the pending state if we're not mining. + // Apply transactions to the pending state if we're not sealing // // Note all transactions received may not be continuous with transactions - // already included in the current mining block. These transactions will + // already included in the current sealing block. These transactions will // be automatically eliminated. if !w.isRunning() && w.current != nil { // If block is already full, abort @@ -516,18 +585,18 @@ func (w *worker) mainLoop() { } txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee) tcount := w.current.tcount - w.commitTransactions(txset, coinbase, nil) + w.commitTransactions(w.current, txset, coinbase, nil) // Only update the snapshot if any new transactons were added // to the pending block if tcount != w.current.tcount { - w.updateSnapshot() + w.updateSnapshot(w.current) } } else { // Special case, if the consensus engine is 0 period clique(dev mode), - // submit mining work here since all empty submission will be rejected + // submit sealing work here since all empty submission will be rejected // by clique. Of course the advance sealing(empty submission) is disabled. if w.chainConfig.Clique != nil && w.chainConfig.Clique.Period == 0 { - w.commitNewWork(nil, true, time.Now().Unix()) + w.commitWork(nil, true, time.Now().Unix()) } } atomic.AddInt32(&w.newTxs, int32(len(ev.Txs))) @@ -658,13 +727,16 @@ func (w *worker) resultLoop() { } } -// makeCurrent creates a new environment for the current cycle. -func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error { +// makeEnv creates a new environment for the sealing block. +func (w *worker) makeEnv(parent *types.Block, header *types.Header) (*environment, error) { // Retrieve the parent state to execute on top and start a prefetcher for - // the miner to speed block sealing up a bit + // the miner to speed block sealing up a bit. Note since the sealing block + // can be created upon the arbitrary parent block, but the state of parent + // block may already be pruned, so the necessary state recovery is needed + // here in the future. TODO(rjl493456442). state, err := w.chain.StateAt(parent.Root()) if err != nil { - return err + return nil, err } state.StartPrefetcher("miner") @@ -673,8 +745,8 @@ func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error { state: state, ancestors: mapset.NewSet(), family: mapset.NewSet(), - uncles: mapset.NewSet(), header: header, + uncles: make(map[common.Hash]*types.Header), } // when 08 is processed ancestors contain 07 (quick block) for _, ancestor := range w.chain.GetBlocksFromHash(parent.Hash(), 7) { @@ -686,20 +758,13 @@ func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error { } // Keep track of transactions which return errors so they can be removed env.tcount = 0 - - // Swap out the old work with the new one, terminating any leftover prefetcher - // processes in the mean time and starting a new one. - if w.current != nil && w.current.state != nil { - w.current.state.StopPrefetcher() - } - w.current = env - return nil + return env, nil } // commitUncle adds the given block to uncle block set, returns error if failed to add. func (w *worker) commitUncle(env *environment, uncle *types.Header) error { hash := uncle.Hash() - if env.uncles.Contains(hash) { + if _, exist := env.uncles[hash]; exist { return errors.New("uncle not unique") } if env.header.ParentHash == uncle.ParentHash { @@ -711,82 +776,59 @@ func (w *worker) commitUncle(env *environment, uncle *types.Header) error { if env.family.Contains(hash) { return errors.New("uncle already included") } - env.uncles.Add(uncle.Hash()) + env.uncles[hash] = uncle return nil } // updateSnapshot updates pending snapshot block and state. // Note this function assumes the current variable is thread safe. -func (w *worker) updateSnapshot() { +func (w *worker) updateSnapshot(env *environment) { w.snapshotMu.Lock() defer w.snapshotMu.Unlock() - var uncles []*types.Header - w.current.uncles.Each(func(item interface{}) bool { - hash, ok := item.(common.Hash) - if !ok { - return false - } - uncle, exist := w.localUncles[hash] - if !exist { - uncle, exist = w.remoteUncles[hash] - } - if !exist { - return false - } - uncles = append(uncles, uncle.Header()) - return false - }) - w.snapshotBlock = types.NewBlock( - w.current.header, - w.current.txs, - uncles, - w.current.receipts, + env.header, + env.txs, + env.unclelist(), + env.receipts, trie.NewStackTrie(nil), ) - w.snapshotReceipts = copyReceipts(w.current.receipts) - w.snapshotState = w.current.state.Copy() + w.snapshotReceipts = copyReceipts(env.receipts) + w.snapshotState = env.state.Copy() } -func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) { - snap := w.current.state.Snapshot() +func (w *worker) commitTransaction(env *environment, tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) { + snap := env.state.Snapshot() - receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &w.current.header.GasUsed, *w.chain.GetVMConfig()) + receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &coinbase, env.gasPool, env.state, env.header, tx, &env.header.GasUsed, *w.chain.GetVMConfig()) if err != nil { - w.current.state.RevertToSnapshot(snap) + env.state.RevertToSnapshot(snap) return nil, err } - w.current.txs = append(w.current.txs, tx) - w.current.receipts = append(w.current.receipts, receipt) + env.txs = append(env.txs, tx) + env.receipts = append(env.receipts, receipt) return receipt.Logs, nil } -func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coinbase common.Address, interrupt *int32) bool { - // Short circuit if current is nil - if w.current == nil { - return true +func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, coinbase common.Address, interrupt *int32) bool { + gasLimit := env.header.GasLimit + if env.gasPool == nil { + env.gasPool = new(core.GasPool).AddGas(gasLimit) } - - gasLimit := w.current.header.GasLimit - if w.current.gasPool == nil { - w.current.gasPool = new(core.GasPool).AddGas(gasLimit) - } - var coalescedLogs []*types.Log for { // In the following three cases, we will interrupt the execution of the transaction. // (1) new head block event arrival, the interrupt signal is 1 // (2) worker start or restart, the interrupt signal is 1 - // (3) worker recreate the mining block with any newly arrived transactions, the interrupt signal is 2. + // (3) worker recreate the sealing block with any newly arrived transactions, the interrupt signal is 2. // For the first two cases, the semi-finished work will be discarded. // For the third case, the semi-finished work will be submitted to the consensus engine. if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone { // Notify resubmit loop to increase resubmitting interval due to too frequent commits. if atomic.LoadInt32(interrupt) == commitInterruptResubmit { - ratio := float64(gasLimit-w.current.gasPool.Gas()) / float64(gasLimit) + ratio := float64(gasLimit-env.gasPool.Gas()) / float64(gasLimit) if ratio < 0.1 { ratio = 0.1 } @@ -798,8 +840,8 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin return atomic.LoadInt32(interrupt) == commitInterruptNewHead } // If we don't have enough gas for any further transactions then we're done - if w.current.gasPool.Gas() < params.TxGas { - log.Trace("Not enough gas for further transactions", "have", w.current.gasPool, "want", params.TxGas) + if env.gasPool.Gas() < params.TxGas { + log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas) break } // Retrieve the next transaction and abort if all done @@ -811,19 +853,19 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin // during transaction acceptance is the transaction pool. // // We use the eip155 signer regardless of the current hf. - from, _ := types.Sender(w.current.signer, tx) + from, _ := types.Sender(env.signer, tx) // Check whether the tx is replay protected. If we're not in the EIP155 hf // phase, start ignoring the sender until we do. - if tx.Protected() && !w.chainConfig.IsEIP155(w.current.header.Number) { + if tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) { log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block) txs.Pop() continue } // Start executing the transaction - w.current.state.Prepare(tx.Hash(), w.current.tcount) + env.state.Prepare(tx.Hash(), env.tcount) - logs, err := w.commitTransaction(tx, coinbase) + logs, err := w.commitTransaction(env, tx, coinbase) switch { case errors.Is(err, core.ErrGasLimitReached): // Pop the current out-of-gas transaction without shifting in the next from the account @@ -843,7 +885,7 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin case errors.Is(err, nil): // Everything ok, collect the logs and shift in the next transaction from the same account coalescedLogs = append(coalescedLogs, logs...) - w.current.tcount++ + env.tcount++ txs.Shift() case errors.Is(err, core.ErrTxTypeNotSupported): @@ -860,8 +902,8 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin } if !w.isRunning() && len(coalescedLogs) > 0 { - // We don't push the pendingLogsEvent while we are mining. The reason is that - // when we are mining, the worker will regenerate a mining block every 3 seconds. + // We don't push the pendingLogsEvent while we are sealing. The reason is that + // when we are sealing, the worker will regenerate a sealing block every 3 seconds. // In order to avoid pushing the repeated pendingLog, we disable the pending log pushing. // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined @@ -882,24 +924,50 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin return false } -// commitNewWork generates several new sealing tasks based on the parent block. -func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) { +// generateParams wraps various of settings for generating sealing task. +type generateParams struct { + timestamp uint64 // The timstamp for sealing task + forceTime bool // Flag whether the given timestamp is immutable or not + parentHash common.Hash // Parent block hash, empty means the latest chain head + coinbase bool // Flag whether the coinbase field is required + noUncle bool // Flag whether the uncle block inclusion is allowed + noExtra bool // Flag whether the extra field assignment is allowed +} + +// prepareWork constructs the sealing task according to the given parameters, +// either based on the last chain head or specified parent. In this function +// the pending transactions are not filled yet, only the empty task returned. +func (w *worker) prepareWork(genParams *generateParams) (*environment, error) { w.mu.RLock() defer w.mu.RUnlock() - tstart := time.Now() + // Find the parent block for sealing task parent := w.chain.CurrentBlock() - - if parent.Time() >= uint64(timestamp) { - timestamp = int64(parent.Time() + 1) + if genParams.parentHash != (common.Hash{}) { + parent = w.chain.GetBlockByHash(genParams.parentHash) } + if parent == nil { + return nil, fmt.Errorf("missing parent") + } + // Sanity check the timestamp correctness, recap the timestamp + // to parent+1 if the mutation is allowed. + timestamp := genParams.timestamp + if parent.Time() >= timestamp { + if genParams.forceTime { + return nil, fmt.Errorf("invalid timestamp, parent %d given %d", parent.Time(), timestamp) + } + timestamp = parent.Time() + 1 + } + // Construct the sealing block header, assign the extra field if it's allowed num := parent.Number() header := &types.Header{ ParentHash: parent.Hash(), Number: num.Add(num, common.Big1), GasLimit: core.CalcGasLimit(parent.GasLimit(), w.config.GasCeil), - Extra: w.extra, - Time: uint64(timestamp), + Time: timestamp, + } + if !genParams.noExtra && len(w.extra) != 0 { + header.Extra = w.extra } // Set baseFee and GasLimit if we are on an EIP-1559 chain if w.chainConfig.IsLondon(header.Number) { @@ -909,87 +977,57 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) header.GasLimit = core.CalcGasLimit(parentGasLimit, w.config.GasCeil) } } - // Only set the coinbase if our consensus engine is running (avoid spurious block rewards) - if w.isRunning() { + // Set the coinbase if the worker is running or it's required + if w.isRunning() || genParams.coinbase { if w.coinbase == (common.Address{}) { log.Error("Refusing to mine without etherbase") - return + return nil, errors.New("no etherbase specified") } header.Coinbase = w.coinbase } + // Run the consensus preparation with the default or customized consensus engine. if err := w.engine.Prepare(w.chain, header); err != nil { - log.Error("Failed to prepare header for mining", "err", err) - return - } - // If we are care about TheDAO hard-fork check whether to override the extra-data or not - if daoBlock := w.chainConfig.DAOForkBlock; daoBlock != nil { - // Check whether the block is among the fork extra-override range - limit := new(big.Int).Add(daoBlock, params.DAOForkExtraRange) - if header.Number.Cmp(daoBlock) >= 0 && header.Number.Cmp(limit) < 0 { - // Depending whether we support or oppose the fork, override differently - if w.chainConfig.DAOForkSupport { - header.Extra = common.CopyBytes(params.DAOForkBlockExtra) - } else if bytes.Equal(header.Extra, params.DAOForkBlockExtra) { - header.Extra = []byte{} // If miner opposes, don't let it use the reserved extra-data - } - } + log.Error("Failed to prepare header for sealing", "err", err) + return nil, err } // Could potentially happen if starting to mine in an odd state. - err := w.makeCurrent(parent, header) + env, err := w.makeEnv(parent, header) if err != nil { - log.Error("Failed to create mining context", "err", err) - return - } - // Create the current work task and check any fork transitions needed - env := w.current - if w.chainConfig.DAOForkSupport && w.chainConfig.DAOForkBlock != nil && w.chainConfig.DAOForkBlock.Cmp(header.Number) == 0 { - misc.ApplyDAOHardFork(env.state) + log.Error("Failed to create sealing context", "err", err) + return nil, err } - // Accumulate the uncles for the current block - uncles := make([]*types.Header, 0, 2) - commitUncles := func(blocks map[common.Hash]*types.Block) { - // Clean up stale uncle blocks first - for hash, uncle := range blocks { - if uncle.NumberU64()+staleThreshold <= header.Number.Uint64() { - delete(blocks, hash) - } - } - for hash, uncle := range blocks { - if len(uncles) == 2 { - break - } - if err := w.commitUncle(env, uncle.Header()); err != nil { - log.Trace("Possible uncle rejected", "hash", hash, "reason", err) - } else { - log.Debug("Committing new uncle to block", "hash", hash) - uncles = append(uncles, uncle.Header()) + // Accumulate the uncles for the sealing work only if it's allowed. + if !genParams.noUncle { + commitUncles := func(blocks map[common.Hash]*types.Block) { + for hash, uncle := range blocks { + if len(env.uncles) == 2 { + break + } + if err := w.commitUncle(env, uncle.Header()); err != nil { + log.Trace("Possible uncle rejected", "hash", hash, "reason", err) + } else { + log.Debug("Committing new uncle to block", "hash", hash) + } } } + // Prefer to locally generated uncle + commitUncles(w.localUncles) + commitUncles(w.remoteUncles) } - // Prefer to locally generated uncle - commitUncles(w.localUncles) - commitUncles(w.remoteUncles) - - // Create an empty block based on temporary copied state for - // sealing in advance without waiting block execution finished. - if !noempty && atomic.LoadUint32(&w.noempty) == 0 { - w.commit(uncles, nil, false, tstart) - } + return env, nil +} +// fillTransactions retrieves the pending transactions from the txpool and fills them +// into the given sealing block. The transaction selection and ordering strategy can +// be customized with the plugin in the future. +func (w *worker) fillTransactions(interrupt *int32, env *environment) error { + // Split the pending transactions into locals and remotes // Fill the block with all available pending transactions. pending, err := w.eth.TxPool().Pending(true) if err != nil { log.Error("Failed to fetch pending transactions", "err", err) - return - } - // Short circuit if there is no available pending transactions. - // But if we disable empty precommit already, ignore it. Since - // empty block is necessary to keep the liveness of the network. - if len(pending) == 0 && atomic.LoadUint32(&w.noempty) == 0 { - w.updateSnapshot() - return + return err } - // Split the pending transactions into locals and remotes localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending for _, account := range w.eth.TxPool().Locals() { if txs := remoteTxs[account]; len(txs) > 0 { @@ -998,40 +1036,81 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) } } if len(localTxs) > 0 { - txs := types.NewTransactionsByPriceAndNonce(w.current.signer, localTxs, header.BaseFee) - if w.commitTransactions(txs, w.coinbase, interrupt) { - return + txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee) + if w.commitTransactions(env, txs, w.coinbase, interrupt) { + return nil } } if len(remoteTxs) > 0 { - txs := types.NewTransactionsByPriceAndNonce(w.current.signer, remoteTxs, header.BaseFee) - if w.commitTransactions(txs, w.coinbase, interrupt) { - return + txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee) + if w.commitTransactions(env, txs, w.coinbase, interrupt) { + return nil } } - w.commit(uncles, w.fullTaskHook, true, tstart) + return nil } -// commit runs any post-transaction state modifications, assembles the final block -// and commits new work if consensus engine is running. -func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error { - // Deep copy receipts here to avoid interaction between different tasks. - receipts := copyReceipts(w.current.receipts) - s := w.current.state.Copy() - block, err := w.engine.FinalizeAndAssemble(w.chain, w.current.header, s, w.current.txs, uncles, receipts) +// generateWork generates a sealing block based on the given parameters. +func (w *worker) generateWork(params *generateParams) (*types.Block, error) { + work, err := w.prepareWork(params) if err != nil { - return err + return nil, err } + defer work.discard() + + if err := w.fillTransactions(nil, work); err != nil { + return nil, err + } + return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts) +} + +// commitWork generates several new sealing tasks based on the parent block +// and submit them to the sealer. +func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { + start := time.Now() + work, err := w.prepareWork(&generateParams{timestamp: uint64(timestamp)}) + if err != nil { + return + } + // Create an empty block based on temporary copied state for + // sealing in advance without waiting block execution finished. + if !noempty && atomic.LoadUint32(&w.noempty) == 0 { + w.commit(work.copy(), nil, false, start) + } + // Fill pending transactions from the txpool + if err := w.fillTransactions(interrupt, work); err != nil { + return + } + w.commit(work.copy(), w.fullTaskHook, true, start) + + // Swap out the old work with the new one, terminating any leftover + // prefetcher processes in the mean time and starting a new one. + if w.current != nil { + w.current.discard() + } + w.current = work +} + +// commit runs any post-transaction state modifications, assembles the final block +// and commits new work if consensus engine is running. +// Note the assumption is held that the mutation is allowed to the passed env, do +// the deep copy first. +func (w *worker) commit(env *environment, interval func(), update bool, start time.Time) error { if w.isRunning() { if interval != nil { interval() } + // Deep copy receipts here to avoid interaction between different tasks. + block, err := w.engine.FinalizeAndAssemble(w.chain, env.header, env.state, env.txs, env.unclelist(), env.receipts) + if err != nil { + return err + } select { - case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now()}: + case w.taskCh <- &task{receipts: env.receipts, state: env.state, block: block, createdAt: time.Now()}: w.unconfirmed.Shift(block.NumberU64() - 1) - log.Info("Commit new mining work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()), - "uncles", len(uncles), "txs", w.current.tcount, - "gas", block.GasUsed(), "fees", totalFees(block, receipts), + log.Info("Commit new sealing work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()), + "uncles", len(env.uncles), "txs", env.tcount, + "gas", block.GasUsed(), "fees", totalFees(block, env.receipts), "elapsed", common.PrettyDuration(time.Since(start))) case <-w.exitCh: @@ -1039,11 +1118,36 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st } } if update { - w.updateSnapshot() + w.updateSnapshot(env) } return nil } +// getSealingBlock generates the sealing block based on the given parameters. +func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64) (*types.Block, error) { + req := &getWorkReq{ + params: &generateParams{ + timestamp: timestamp, + forceTime: true, + parentHash: parent, + coinbase: true, + noUncle: true, + noExtra: true, + }, + result: make(chan *types.Block, 1), + } + select { + case w.getWorkCh <- req: + block := <-req.result + if block == nil { + return nil, req.err + } + return block, nil + case <-w.exitCh: + return nil, errors.New("miner closed") + } +} + // copyReceipts makes a deep copy of the given receipts. func copyReceipts(receipts []*types.Receipt) []*types.Receipt { result := make([]*types.Receipt, len(receipts)) diff --git a/miner/worker_test.go b/miner/worker_test.go index 9c4dc0f3788e1..be2c770256c92 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -519,3 +519,107 @@ func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine co t.Error("interval reset timeout") } } + +func TestGetSealingWorkEthash(t *testing.T) { + testGetSealingWork(t, ethashChainConfig, ethash.NewFaker()) +} + +func TestGetSealingWorkClique(t *testing.T) { + testGetSealingWork(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase())) +} + +func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) { + defer engine.Close() + + w, b := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0) + defer w.close() + + w.setExtra([]byte{0x01, 0x02}) + w.postSideBlock(core.ChainSideEvent{Block: b.uncleBlock}) + + w.skipSealHook = func(task *task) bool { + return true + } + w.fullTaskHook = func() { + time.Sleep(100 * time.Millisecond) + } + timestamp := uint64(time.Now().Unix()) + _, clique := engine.(*clique.Clique) + assertBlock := func(block *types.Block, number uint64) { + if block.Time() != timestamp { + t.Errorf("Invalid timestamp, want %d, get %d", timestamp, block.Time()) + } + if len(block.Uncles()) != 0 { + t.Error("Unexpected uncle block") + } + if !clique { + if len(block.Extra()) != 0 { + t.Error("Unexpected extra field") + } + if block.Coinbase() != w.coinbase { + t.Errorf("Invalid coinbase, want %x, get %x", w.coinbase, block.Coinbase()) + } + } + if block.MixDigest() != (common.Hash{}) { + t.Error("Unexpected mix digest") + } + if block.Nonce() != 0 { + t.Error("Unexpected block nonce") + } + if block.NumberU64() != number { + t.Errorf("Mismatched block number, want %d got %d", number, block.NumberU64()) + } + } + var cases = []struct { + parent common.Hash + expectNumber uint64 + expectErr bool + }{ + { + b.chain.Genesis().Hash(), + uint64(1), + false, + }, + { + b.chain.CurrentBlock().Hash(), + b.chain.CurrentBlock().NumberU64() + 1, + false, + }, + { + common.HexToHash("0xdeadbeef"), + 0, + true, + }, + } + + // This API should work even when the automatic sealing is not enabled + for _, c := range cases { + block, err := w.getSealingBlock(c.parent, timestamp) + if c.expectErr { + if err == nil { + t.Error("Expect error but get nil") + } + } else { + if err != nil { + t.Errorf("Unexpected error %v", err) + } + assertBlock(block, c.expectNumber) + } + } + + // This API should work even when the automatic sealing is enabled + w.start() + for _, c := range cases { + block, err := w.getSealingBlock(c.parent, timestamp) + if c.expectErr { + if err == nil { + t.Error("Expect error but get nil") + } + } else { + if err != nil { + t.Errorf("Unexpected error %v", err) + } + assertBlock(block, c.expectNumber) + } + } +}