diff --git a/eth/catalyst/api.go b/eth/catalyst/api.go index 2622c4a148f6b..0c2bcf6bf5da2 100644 --- a/eth/catalyst/api.go +++ b/eth/catalyst/api.go @@ -21,12 +21,9 @@ import ( "errors" "fmt" "math/big" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/misc" - "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/log" @@ -65,153 +62,11 @@ func newConsensusAPI(eth *eth.Ethereum) *consensusAPI { return &consensusAPI{eth: eth} } -// blockExecutionEnv gathers all the data required to execute -// a block, either when assembling it or when inserting it. -type blockExecutionEnv struct { - chain *core.BlockChain - state *state.StateDB - tcount int - gasPool *core.GasPool - - header *types.Header - txs []*types.Transaction - receipts []*types.Receipt -} - -func (env *blockExecutionEnv) commitTransaction(tx *types.Transaction, coinbase common.Address) error { - vmconfig := *env.chain.GetVMConfig() - snap := env.state.Snapshot() - receipt, err := core.ApplyTransaction(env.chain.Config(), env.chain, &coinbase, env.gasPool, env.state, env.header, tx, &env.header.GasUsed, vmconfig) - if err != nil { - env.state.RevertToSnapshot(snap) - return err - } - env.txs = append(env.txs, tx) - env.receipts = append(env.receipts, receipt) - return nil -} - -func (api *consensusAPI) makeEnv(parent *types.Block, header *types.Header) (*blockExecutionEnv, error) { - state, err := api.eth.BlockChain().StateAt(parent.Root()) - if err != nil { - return nil, err - } - env := &blockExecutionEnv{ - chain: api.eth.BlockChain(), - state: state, - header: header, - gasPool: new(core.GasPool).AddGas(header.GasLimit), - } - return env, nil -} - // AssembleBlock creates a new block, inserts it into the chain, and returns the "execution // data" required for eth2 clients to process the new block. func (api *consensusAPI) AssembleBlock(params assembleBlockParams) (*executableData, error) { log.Info("Producing block", "parentHash", params.ParentHash) - - bc := api.eth.BlockChain() - parent := bc.GetBlockByHash(params.ParentHash) - if parent == nil { - log.Warn("Cannot assemble block with parent hash to unknown block", "parentHash", params.ParentHash) - return nil, fmt.Errorf("cannot assemble block with unknown parent %s", params.ParentHash) - } - - pool := api.eth.TxPool() - - if parent.Time() >= params.Timestamp { - return nil, fmt.Errorf("child timestamp lower than parent's: %d >= %d", parent.Time(), params.Timestamp) - } - if now := uint64(time.Now().Unix()); params.Timestamp > now+1 { - wait := time.Duration(params.Timestamp-now) * time.Second - log.Info("Producing block too far in the future", "wait", common.PrettyDuration(wait)) - time.Sleep(wait) - } - - pending, err := pool.Pending(true) - if err != nil { - return nil, err - } - - coinbase, err := api.eth.Etherbase() - if err != nil { - return nil, err - } - num := parent.Number() - header := &types.Header{ - ParentHash: parent.Hash(), - Number: num.Add(num, common.Big1), - Coinbase: coinbase, - GasLimit: parent.GasLimit(), // Keep the gas limit constant in this prototype - Extra: []byte{}, - Time: params.Timestamp, - } - if config := api.eth.BlockChain().Config(); config.IsLondon(header.Number) { - header.BaseFee = misc.CalcBaseFee(config, parent.Header()) - } - err = api.eth.Engine().Prepare(bc, header) - if err != nil { - return nil, err - } - - env, err := api.makeEnv(parent, header) - if err != nil { - return nil, err - } - - var ( - signer = types.MakeSigner(bc.Config(), header.Number) - txHeap = types.NewTransactionsByPriceAndNonce(signer, pending, nil) - transactions []*types.Transaction - ) - for { - if env.gasPool.Gas() < chainParams.TxGas { - log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", chainParams.TxGas) - break - } - tx := txHeap.Peek() - if tx == nil { - break - } - - // The sender is only for logging purposes, and it doesn't really matter if it's correct. - from, _ := types.Sender(signer, tx) - - // Execute the transaction - env.state.Prepare(tx.Hash(), env.tcount) - err = env.commitTransaction(tx, coinbase) - switch err { - case core.ErrGasLimitReached: - // Pop the current out-of-gas transaction without shifting in the next from the account - log.Trace("Gas limit exceeded for current block", "sender", from) - txHeap.Pop() - - case core.ErrNonceTooLow: - // New head notification data race between the transaction pool and miner, shift - log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce()) - txHeap.Shift() - - case core.ErrNonceTooHigh: - // Reorg notification data race between the transaction pool and miner, skip account = - log.Trace("Skipping account with high nonce", "sender", from, "nonce", tx.Nonce()) - txHeap.Pop() - - case nil: - // Everything ok, collect the logs and shift in the next transaction from the same account - env.tcount++ - txHeap.Shift() - transactions = append(transactions, tx) - - default: - // Strange error, discard the transaction and get the next in line (note, the - // nonce-too-high clause will prevent us from executing in vain). - log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err) - txHeap.Shift() - } - } - - // Create the block. - block, err := api.eth.Engine().FinalizeAndAssemble(bc, header, env.state, transactions, nil /* uncles */, env.receipts) + block, err := api.eth.Miner().GetSealingBlock(params.ParentHash, params.Timestamp) if err != nil { return nil, err } @@ -255,7 +110,6 @@ func insertBlockParamsToBlock(config *chainParams.ChainConfig, parent *types.Hea if err != nil { return nil, err } - number := big.NewInt(0) number.SetUint64(params.Number) header := &types.Header{ diff --git a/miner/default_collator.go b/miner/default_collator.go new file mode 100644 index 0000000000000..236423560bb61 --- /dev/null +++ b/miner/default_collator.go @@ -0,0 +1,110 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package miner + +import ( + "errors" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" +) + +type DefaultCollator struct{} + +func submitTransactions(bs BlockState, txs *types.TransactionsByPriceAndNonce) bool { + var shouldAbort bool + cb := func(err error, receipts *types.Receipt) bool { + switch { + case errors.Is(err, core.ErrGasLimitReached): + fallthrough + case errors.Is(err, core.ErrTxTypeNotSupported): + fallthrough + case errors.Is(err, core.ErrNonceTooHigh): + txs.Pop() + case errors.Is(err, core.ErrNonceTooLow): + fallthrough + case errors.Is(err, ErrAbort): + shouldAbort = true + return false // don't need to waste time rolling back these transactions when this work will be thrown away anyways. + case errors.Is(err, nil): + fallthrough + default: + txs.Shift() + } + return false + } + + for { + // If we don't have enough gas for any further transactions then we're done + available := bs.Gas() + if available < params.TxGas { + break + } + // Retrieve the next transaction and abort if all done + tx := txs.Peek() + if tx == nil { + break + } + // Enough space for this tx? + if available < tx.Gas() { + txs.Pop() + continue + } + bs.AddTransactions(types.Transactions{tx}, cb) + if shouldAbort { + return true + } + } + return false +} + +// CollateBlock fills a block based on the highest paying transactions from the +// transaction pool, giving precedence over local transactions. +func (w *DefaultCollator) CollateBlock(bs BlockState, pool Pool) { + txs, err := pool.Pending(true) + if err != nil { + log.Error("could not get pending transactions from the pool", "err", err) + return + } + if len(txs) == 0 { + return + } + // Split the pending transactions into locals and remotes + localTxs, remoteTxs := make(map[common.Address]types.Transactions), txs + for _, account := range pool.Locals() { + if accountTxs := remoteTxs[account]; len(accountTxs) > 0 { + delete(remoteTxs, account) + localTxs[account] = accountTxs + } + } + if len(localTxs) > 0 { + if submitTransactions(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), localTxs, bs.BaseFee())) { + return + } + } + if len(remoteTxs) > 0 { + if submitTransactions(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), remoteTxs, bs.BaseFee())) { + return + } + } + + bs.Commit() + + return +} diff --git a/miner/miner.go b/miner/miner.go index a4a01b9f4ff70..a4aba2401ad00 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -66,6 +66,8 @@ type Miner struct { } func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, isLocalBlock func(block *types.Block) bool) *Miner { + activeCollators := []BlockCollator{} + activeCollators = append(activeCollators, &DefaultCollator{}) miner := &Miner{ eth: eth, mux: mux, @@ -73,7 +75,7 @@ func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *even exitCh: make(chan struct{}), startCh: make(chan common.Address), stopCh: make(chan struct{}), - worker: newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, true), + worker: newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, true, activeCollators), } go miner.update() @@ -227,6 +229,12 @@ func (miner *Miner) DisablePreseal() { miner.worker.disablePreseal() } +// GetSealingBlock retrieves a sealing block based on the given parameters. +// The returned block is not sealed but all other fields should be filled. +func (miner *Miner) GetSealingBlock(parent common.Hash, timestamp uint64) (*types.Block, error) { + return miner.worker.getSealingBlock(parent, timestamp) +} + // SubscribePendingLogs starts delivering logs from pending transactions // to the given channel. func (miner *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscription { diff --git a/miner/multi_collator.go b/miner/multi_collator.go new file mode 100644 index 0000000000000..116f74c62bfad --- /dev/null +++ b/miner/multi_collator.go @@ -0,0 +1,403 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package miner + +import ( + "errors" + "math" + "math/big" + "sync/atomic" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" +) + +// called by AddTransactions. If the provided error is not nil, none of the trasactions were added. +// If the error is nil, the receipts of all executed transactions are provided +type AddTransactionsResultFunc func(error, *types.Receipt) bool + +// BlockState represents a block-to-be-mined, which is being assembled. +// A collator can add transactions by calling AddTransactions. +// When the collator is done adding transactions to a block, it calls Commit. +// after-which, no more transactions can be added to the block +type BlockState interface { + // AddTransactions adds the sequence of transactions to the blockstate. Either all + // transactions are added, or none of them. In the latter case, the error + // describes the reason why the txs could not be included. + // If all transactions were successfully executed, the return value of the callback + // determines if the transactions are kept (true) or all reverted (false) + AddTransactions(sequence types.Transactions, cb AddTransactionsResultFunc) + // Commit is called when the collator is done adding transactions to a block + // and wants to suggest it for sealing + Commit() + // deep copy of a blockState + Copy() BlockState + Gas() (remaining uint64) + Coinbase() common.Address + BaseFee() *big.Int + Signer() types.Signer + Profit() *big.Int +} + +// collatorWork is provided by the CollatorPool to each collator goroutine +// when new work is being generated +type collatorWork struct { + env *environment + counter uint64 + interrupt *int32 +} + +func (w *collatorWork) Copy() collatorWork { + newEnv := w.env.copy() + return collatorWork{ + env: newEnv, + counter: w.counter, + interrupt: w.interrupt, + } +} + +// Pool is an interface to the transaction pool +type Pool interface { + Pending(bool) (map[common.Address]types.Transactions, error) + Locals() []common.Address +} + +var ( + ErrAbort = errors.New("abort sealing current work (resubmit/newHead interrupt)") + ErrUnsupportedEIP155Tx = errors.New("replay-protected tx when EIP155 not enabled") +) + +// collatorBlockState is an implementation of BlockState +type collatorBlockState struct { + work collatorWork + c *collator + done bool +} + +func (c *collatorBlockState) Copy() BlockState { + return &collatorBlockState{ + work: c.work.Copy(), + c: c.c, + done: c.done, + } +} + +func (bs *collatorBlockState) AddTransactions(sequence types.Transactions, cb AddTransactionsResultFunc) { + var ( + interrupt = bs.work.interrupt + header = bs.work.env.header + gasPool = bs.work.env.gasPool + signer = bs.work.env.signer + chainConfig = bs.c.chainConfig + chain = bs.c.chain + state = bs.work.env.state + snap = state.Snapshot() + curProfit = new(big.Int).Set(bs.work.env.profit) + startProfit = new(big.Int).Set(bs.work.env.profit) + tcount = bs.work.env.tcount + err error + logs []*types.Log + startTCount = bs.work.env.tcount + shouldRevert bool + ) + if bs.done { + err = ErrAbort + cb(err, nil) + return + } + + for _, tx := range sequence { + if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone { + bs.done = true + shouldRevert = true + cb(ErrAbort, nil) + break + } + if gasPool.Gas() < params.TxGas { + log.Trace("Not enough gas for further transactions", "have", gasPool, "want", params.TxGas) + err = core.ErrGasLimitReached + cb(err, nil) + shouldRevert = true + break + } + from, _ := types.Sender(signer, tx) + // Check whether the tx is replay protected. If we're not in the EIP155 hf + // phase, start ignoring the sender until we do. + if tx.Protected() && !chainConfig.IsEIP155(header.Number) { + log.Trace("encountered replay-protected transaction when chain doesn't support replay protection", "hash", tx.Hash(), "eip155", chainConfig.EIP155Block) + err = ErrUnsupportedEIP155Tx + cb(err, nil) + break + } + gasPrice, err := tx.EffectiveGasTip(bs.work.env.header.BaseFee) + if err != nil { + shouldRevert = true + cb(err, nil) + break + } + // Start executing the transaction + state.Prepare(tx.Hash(), tcount) + + var txLogs []*types.Log + txLogs, err = commitTransaction(chain, chainConfig, bs.work.env, tx, bs.Coinbase()) + if err == nil { + logs = append(logs, txLogs...) + gasUsed := new(big.Int).SetUint64(bs.work.env.receipts[len(bs.work.env.receipts)-1].GasUsed) + // TODO remove this allocation once things are working + curProfit.Add(curProfit, gasUsed.Mul(gasUsed, gasPrice)) + bs.work.env.profit.Set(curProfit) + if cb(nil, bs.work.env.receipts[len(bs.work.env.receipts)-1]) { + shouldRevert = true + break + } else { + tcount++ + } + } else { + cb(err, nil) + shouldRevert = true + log.Trace("Tx block inclusion failed", "sender", from, "nonce", tx.Nonce(), + "type", tx.Type(), "hash", tx.Hash(), "err", err) + break + } + } + if shouldRevert { + state.RevertToSnapshot(snap) + bs.work.env.txs = bs.work.env.txs[:startTCount] + bs.work.env.receipts = bs.work.env.receipts[:startTCount] + bs.work.env.profit.Set(startProfit) + } else { + bs.work.env.logs = append(bs.work.env.logs, logs...) + bs.work.env.tcount = tcount + } + return +} + +func (bs *collatorBlockState) Commit() { + if !bs.done { + bs.done = true + bs.c.workResultCh <- bs.work + } +} + +func (bs *collatorBlockState) Gas() uint64 { + return bs.work.env.gasPool.Gas() +} + +func (bs *collatorBlockState) Coinbase() common.Address { + resultCopy := common.Address{} + copy(resultCopy[:], bs.work.env.coinbase[:]) + return resultCopy +} + +func (bs *collatorBlockState) BaseFee() *big.Int { + if bs.work.env.header.BaseFee != nil { + return new(big.Int).Set(bs.work.env.header.BaseFee) + } + + return nil +} + +func (bs *collatorBlockState) Signer() types.Signer { + return bs.work.env.signer +} + +func (bs *collatorBlockState) Profit() *big.Int { + return new(big.Int).Set(bs.work.env.profit) +} + +// BlockCollator is the publicly-exposed interface +// for implementing custom block collation strategies +type BlockCollator interface { + CollateBlock(bs BlockState, pool Pool) + /* + // TODO implement these + SideChainHook(header *types.Header) + NewHeadHook(header *types.Header) + */ +} + +type collator struct { + newWorkCh chan collatorWork + workResultCh chan collatorWork + // channel signalling collator loop should exit + exitCh chan struct{} + newHeadCh chan types.Header + sideChainCh chan types.Header + blockCollatorImpl BlockCollator + + chainConfig *params.ChainConfig + chain *core.BlockChain + pool Pool +} + +// each active collator runs mainLoop() in a goroutine. +// It receives new work from the miner and listens for new blocks built from CollateBlock +// calling Commit() on the provided collatorBlockState +func (c *collator) mainLoop() { + for { + select { + case newWork := <-c.newWorkCh: + c.blockCollatorImpl.CollateBlock(&collatorBlockState{work: newWork, c: c, done: false}, c.pool) + // signal to the exitCh that the collator is done + // computing this work. + c.workResultCh <- collatorWork{env: nil, interrupt: nil, counter: newWork.counter} + case <-c.exitCh: + // TODO any cleanup needed? + return + case newHead := <-c.newHeadCh: + // TODO call hook here + _ = newHead + case sideHeader := <-c.sideChainCh: + _ = sideHeader + // TODO call hook here + } + } +} + +var ( + // collator + workResultChSize = 10 + newWorkChSize = 10 + newHeadChSize = 10 + sideChainChSize = 10 +) + +// MultiCollator manages multiple active collators +type MultiCollator struct { + counter uint64 + responsiveCollatorCount int + collators []collator + pool Pool + interrupt *int32 +} + +func NewMultiCollator(chainConfig *params.ChainConfig, chain *core.BlockChain, pool Pool, strategies []BlockCollator) MultiCollator { + collators := []collator{} + for _, s := range strategies { + collators = append(collators, collator{ + newWorkCh: make(chan collatorWork, newWorkChSize), + workResultCh: make(chan collatorWork, workResultChSize), + exitCh: make(chan struct{}), + newHeadCh: make(chan types.Header, newHeadChSize), + blockCollatorImpl: s, + chainConfig: chainConfig, + chain: chain, + pool: pool, + }) + } + return MultiCollator{ + counter: 0, + responsiveCollatorCount: 0, + collators: collators, + interrupt: nil, + } +} + +func (m *MultiCollator) Start() { + for _, c := range m.collators { + go c.mainLoop() + } +} + +func (m *MultiCollator) Close() { + for _, c := range m.collators { + select { + case c.exitCh <- struct{}{}: + default: + } + } +} + +// SuggestBlock sends a new empty block to each active collator. +// collators whose receiving channels are full are noted as "unresponsive" +// for the purpose of not expecting a response back (for this round) during +// polling performed by Collect +func (m *MultiCollator) SuggestBlock(work *environment, interrupt *int32) { + if m.counter == math.MaxUint64 { + m.counter = 0 + } else { + m.counter++ + } + m.responsiveCollatorCount = 0 + m.interrupt = interrupt + for _, c := range m.collators { + select { + case c.newWorkCh <- collatorWork{env: work.copy(), counter: m.counter, interrupt: interrupt}: + m.responsiveCollatorCount++ + default: + } + } +} + +type WorkResult func(environment) + +// Collect retrieves filled blocks returned by active collators in response to the block suggested by the previous call to SuggestBlock. +// It blocks until all responsive collators (ones which accepted the block from SuggestBlock) signal that they are done +// or the interrupt provided in SetBlock is set. +func (m *MultiCollator) Collect(cb WorkResult) { + finishedCollators := []int{} + for { + if len(finishedCollators) == m.responsiveCollatorCount { + break + } + if m.interrupt != nil && atomic.LoadInt32(m.interrupt) != commitInterruptNone { + break + } + for i, c := range m.collators { + select { + case response := <-c.workResultCh: + // ignore collators responding from old work rounds + if response.counter != m.counter { + break + } + // ignore responses from collators that have already signalled they are done + shouldIgnore := false + for _, finishedCollator := range finishedCollators { + if i == finishedCollator { + shouldIgnore = true + break + } + } + if shouldIgnore { + break + } + // nil for work signals the collator won't send back any more blocks for this round + if response.env == nil { + finishedCollators = append(finishedCollators, i) + } else { + cb(*response.env) + } + default: + } + } + } + return +} + +/* +TODO implement and hook these into the miner +func (m *MultiCollator) NewHeadHook() { + +} + +func (m *Multicollator) SideChainHook() { + +} +*/ diff --git a/miner/worker.go b/miner/worker.go index accf3dac90964..7c42033aff713 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -17,8 +17,8 @@ package miner import ( - "bytes" "errors" + "fmt" "math/big" "sync" "sync/atomic" @@ -54,14 +54,14 @@ const ( // resubmitAdjustChanSize is the size of resubmitting interval adjustment channel. resubmitAdjustChanSize = 10 - // miningLogAtDepth is the number of confirmations before logging successful mining. - miningLogAtDepth = 7 + // sealingLogAtDepth is the number of confirmations before logging successful sealing. + sealingLogAtDepth = 7 - // minRecommitInterval is the minimal time interval to recreate the mining block with + // minRecommitInterval is the minimal time interval to recreate the sealing block with // any newly arrived transactions. minRecommitInterval = 1 * time.Second - // maxRecommitInterval is the maximum time interval to recreate the mining block with + // maxRecommitInterval is the maximum time interval to recreate the sealing block with // any newly arrived transactions. maxRecommitInterval = 15 * time.Second @@ -77,20 +77,74 @@ const ( staleThreshold = 7 ) -// environment is the worker's current environment and holds all of the current state information. +// environment is the worker's current environment and holds all +// information of the sealing block generation. type environment struct { signer types.Signer state *state.StateDB // apply state changes here ancestors mapset.Set // ancestor set (used for checking uncle parent validity) family mapset.Set // family set (used for checking uncle invalidity) - uncles mapset.Set // uncle set tcount int // tx count in cycle gasPool *core.GasPool // available gas used to pack transactions header *types.Header txs []*types.Transaction receipts []*types.Receipt + uncles map[common.Hash]*types.Header + profit *big.Int + + coinbase common.Address + + logs []*types.Log +} + +// copy creates a deep copy of environment. +func (env *environment) copy() *environment { + cpy := &environment{ + signer: env.signer, + state: env.state.Copy(), + ancestors: env.ancestors.Clone(), + family: env.family.Clone(), + tcount: env.tcount, + header: types.CopyHeader(env.header), + receipts: copyReceipts(env.receipts), + logs: copyLogs(env.logs), + profit: new(big.Int).Set(env.profit), + } + if env.gasPool != nil { + cpy.gasPool = &(*env.gasPool) + } + // The content of txs and uncles are immutable, unnecessary + // to do the expensive deep copy for them. + cpy.txs = make([]*types.Transaction, len(env.txs)) + copy(cpy.txs, env.txs) + cpy.uncles = make(map[common.Hash]*types.Header) + for hash, uncle := range env.uncles { + cpy.uncles[hash] = uncle + } + cpy.coinbase = common.Address{} + copy(cpy.coinbase[:], env.coinbase[:]) + return cpy +} + +// unclelist returns the contained uncles as the list format. +func (env *environment) unclelist() []*types.Header { + var uncles []*types.Header + for _, uncle := range env.uncles { + uncles = append(uncles, uncle) + } + return uncles +} + +// discard terminates the background prefetcher go-routine. It should +// always be called for all created environment instances otherwise +// the go-routine leak can happen. +func (env *environment) discard() { + if env.state == nil { + return + } + env.state.StopPrefetcher() } // task contains all information for consensus engine sealing and result submitting. @@ -114,6 +168,13 @@ type newWorkReq struct { timestamp int64 } +// getWorkReq represents a request for getting a new sealing work with provided parameters. +type getWorkReq struct { + params *generateParams + err error + result chan *types.Block +} + // intervalAdjust represents a resubmitting interval adjustment. type intervalAdjust struct { ratio float64 @@ -143,6 +204,7 @@ type worker struct { // Channels newWorkCh chan *newWorkReq + getWorkCh chan *getWorkReq taskCh chan *task resultCh chan *types.Block startCh chan struct{} @@ -150,10 +212,11 @@ type worker struct { resubmitIntervalCh chan time.Duration resubmitAdjustCh chan *intervalAdjust - current *environment // An environment for current running cycle. - localUncles map[common.Hash]*types.Block // A set of side blocks generated locally as the possible uncle blocks. - remoteUncles map[common.Hash]*types.Block // A set of side blocks as the possible uncle blocks. - unconfirmed *unconfirmedBlocks // A set of locally mined blocks pending canonicalness confirmations. + current *environment // An environment for current running cycle. + multiCollator MultiCollator // pool of active block collation strategies + localUncles map[common.Hash]*types.Block // A set of side blocks generated locally as the possible uncle blocks. + remoteUncles map[common.Hash]*types.Block // A set of side blocks as the possible uncle blocks. + unconfirmed *unconfirmedBlocks // A set of locally mined blocks pending canonicalness confirmations. mu sync.RWMutex // The lock used to protect the coinbase and extra fields coinbase common.Address @@ -188,7 +251,7 @@ type worker struct { resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval. } -func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool) *worker { +func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool, collators []BlockCollator) *worker { worker := &worker{ config: config, chainConfig: chainConfig, @@ -199,19 +262,23 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus isLocalBlock: isLocalBlock, localUncles: make(map[common.Hash]*types.Block), remoteUncles: make(map[common.Hash]*types.Block), - unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth), + unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), sealingLogAtDepth), pendingTasks: make(map[common.Hash]*task), txsCh: make(chan core.NewTxsEvent, txChanSize), chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), newWorkCh: make(chan *newWorkReq), + getWorkCh: make(chan *getWorkReq), taskCh: make(chan *task), resultCh: make(chan *types.Block, resultQueueSize), exitCh: make(chan struct{}), startCh: make(chan struct{}, 1), resubmitIntervalCh: make(chan time.Duration), resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), + multiCollator: NewMultiCollator(chainConfig, eth.BlockChain(), eth.TxPool(), collators), } + // start collator pool listening for new work + worker.multiCollator.Start() // Subscribe NewTxsEvent for tx pool worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh) // Subscribe events for blockchain @@ -259,15 +326,18 @@ func (w *worker) setExtra(extra []byte) { // setRecommitInterval updates the interval for miner sealing work recommitting. func (w *worker) setRecommitInterval(interval time.Duration) { - w.resubmitIntervalCh <- interval + select { + case w.resubmitIntervalCh <- interval: + case <-w.exitCh: + } } -// disablePreseal disables pre-sealing mining feature +// disablePreseal disables pre-sealing feature func (w *worker) disablePreseal() { atomic.StoreUint32(&w.noempty, 1) } -// enablePreseal enables pre-sealing mining feature +// enablePreseal enables pre-sealing feature func (w *worker) enablePreseal() { atomic.StoreUint32(&w.noempty, 0) } @@ -318,9 +388,10 @@ func (w *worker) isRunning() bool { // close terminates all background threads maintained by the worker. // Note the worker does not support being closed multiple times. func (w *worker) close() { - if w.current != nil && w.current.state != nil { - w.current.state.StopPrefetcher() + if w.current != nil { + w.current.discard() } + w.multiCollator.Close() atomic.StoreInt32(&w.running, 0) close(w.exitCh) } @@ -347,12 +418,12 @@ func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) t return time.Duration(int64(next)) } -// newWorkLoop is a standalone goroutine to submit new mining work upon received events. +// newWorkLoop is a standalone goroutine to submit new sealing work upon received events. func (w *worker) newWorkLoop(recommit time.Duration) { var ( interrupt *int32 minRecommit = recommit // minimal resubmit interval specified by user. - timestamp int64 // timestamp for each round of mining. + timestamp int64 // timestamp for each round of sealing. ) timer := time.NewTimer(0) @@ -397,7 +468,7 @@ func (w *worker) newWorkLoop(recommit time.Duration) { commit(false, commitInterruptNewHead) case <-timer.C: - // If mining is running resubmit a new work cycle periodically to pull in + // If sealing is running resubmit a new work cycle periodically to pull in // higher priced transactions. Disable this overhead for pending blocks. if w.isRunning() && (w.chainConfig.Clique == nil || w.chainConfig.Clique.Period > 0) { // Short circuit if no new transaction arrives. @@ -444,16 +515,30 @@ func (w *worker) newWorkLoop(recommit time.Duration) { } } -// mainLoop is a standalone goroutine to regenerate the sealing task based on the received event. +// mainLoop is responsible for generating and submitting sealing work based on +// the received event. It can support two modes: automatically generate task and +// submit it or return task according to given parameters for various proposes. func (w *worker) mainLoop() { defer w.txsSub.Unsubscribe() defer w.chainHeadSub.Unsubscribe() defer w.chainSideSub.Unsubscribe() + cleanTicker := time.NewTicker(time.Second * 10) + defer cleanTicker.Stop() + for { select { case req := <-w.newWorkCh: - w.commitNewWork(req.interrupt, req.noempty, req.timestamp) + w.commitWork(req.interrupt, req.noempty, req.timestamp) + + case req := <-w.getWorkCh: + block, err := w.generateWork(req.params) + if err != nil { + req.err = err + req.result <- nil + } else { + req.result <- block + } case ev := <-w.chainSideCh: // Short circuit for duplicate side blocks @@ -469,36 +554,34 @@ func (w *worker) mainLoop() { } else { w.remoteUncles[ev.Block.Hash()] = ev.Block } - // If our mining block contains less than 2 uncle blocks, - // add the new uncle block if valid and regenerate a mining block. - if w.isRunning() && w.current != nil && w.current.uncles.Cardinality() < 2 { + // If our sealing block contains less than 2 uncle blocks, + // add the new uncle block if valid and regenerate a new + // sealing block for higher profit. + if w.isRunning() && w.current != nil && len(w.current.uncles) < 2 { start := time.Now() if err := w.commitUncle(w.current, ev.Block.Header()); err == nil { - var uncles []*types.Header - w.current.uncles.Each(func(item interface{}) bool { - hash, ok := item.(common.Hash) - if !ok { - return false - } - uncle, exist := w.localUncles[hash] - if !exist { - uncle, exist = w.remoteUncles[hash] - } - if !exist { - return false - } - uncles = append(uncles, uncle.Header()) - return false - }) - w.commit(uncles, nil, true, start) + w.commit(w.current.copy(), nil, true, start) + } + } + + case <-cleanTicker.C: + chainHead := w.chain.CurrentBlock() + for hash, uncle := range w.localUncles { + if uncle.NumberU64()+staleThreshold <= chainHead.NumberU64() { + delete(w.localUncles, hash) + } + } + for hash, uncle := range w.remoteUncles { + if uncle.NumberU64()+staleThreshold <= chainHead.NumberU64() { + delete(w.remoteUncles, hash) } } case ev := <-w.txsCh: - // Apply transactions to the pending state if we're not mining. + // Apply transactions to the pending state if we're not sealing // // Note all transactions received may not be continuous with transactions - // already included in the current mining block. These transactions will + // already included in the current sealing block. These transactions will // be automatically eliminated. if !w.isRunning() && w.current != nil { // If block is already full, abort @@ -516,18 +599,18 @@ func (w *worker) mainLoop() { } txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee) tcount := w.current.tcount - w.commitTransactions(txset, coinbase, nil) + w.commitTransactions(w.current, txset, coinbase, nil) // Only update the snapshot if any new transactons were added // to the pending block if tcount != w.current.tcount { - w.updateSnapshot() + w.updateSnapshot(w.current) } } else { // Special case, if the consensus engine is 0 period clique(dev mode), - // submit mining work here since all empty submission will be rejected + // submit sealing work here since all empty submission will be rejected // by clique. Of course the advance sealing(empty submission) is disabled. if w.chainConfig.Clique != nil && w.chainConfig.Clique.Period == 0 { - w.commitNewWork(nil, true, time.Now().Unix()) + w.commitWork(nil, true, time.Now().Unix()) } } atomic.AddInt32(&w.newTxs, int32(len(ev.Txs))) @@ -658,23 +741,33 @@ func (w *worker) resultLoop() { } } -// makeCurrent creates a new environment for the current cycle. -func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error { +// makeEnv creates a new environment for the sealing block. +func (w *worker) makeEnv(parent *types.Block, header *types.Header) (*environment, error) { // Retrieve the parent state to execute on top and start a prefetcher for - // the miner to speed block sealing up a bit + // the miner to speed block sealing up a bit. Note since the sealing block + // can be created upon the arbitrary parent block, but the state of parent + // block may already be pruned, so the necessary state recovery is needed + // here in the future. TODO(rjl493456442). state, err := w.chain.StateAt(parent.Root()) if err != nil { - return err + return nil, err } state.StartPrefetcher("miner") + coinbaseCopy := common.Address{} + copy(coinbaseCopy[:], w.coinbase[:]) + env := &environment{ signer: types.MakeSigner(w.chainConfig, header.Number), state: state, ancestors: mapset.NewSet(), family: mapset.NewSet(), - uncles: mapset.NewSet(), header: header, + uncles: make(map[common.Hash]*types.Header), + logs: []*types.Log{}, + profit: new(big.Int).SetUint64(0), + gasPool: new(core.GasPool).AddGas(header.GasLimit), + coinbase: coinbaseCopy, } // when 08 is processed ancestors contain 07 (quick block) for _, ancestor := range w.chain.GetBlocksFromHash(parent.Hash(), 7) { @@ -686,20 +779,13 @@ func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error { } // Keep track of transactions which return errors so they can be removed env.tcount = 0 - - // Swap out the old work with the new one, terminating any leftover prefetcher - // processes in the mean time and starting a new one. - if w.current != nil && w.current.state != nil { - w.current.state.StopPrefetcher() - } - w.current = env - return nil + return env, nil } // commitUncle adds the given block to uncle block set, returns error if failed to add. func (w *worker) commitUncle(env *environment, uncle *types.Header) error { hash := uncle.Hash() - if env.uncles.Contains(hash) { + if _, exist := env.uncles[hash]; exist { return errors.New("uncle not unique") } if env.header.ParentHash == uncle.ParentHash { @@ -711,82 +797,61 @@ func (w *worker) commitUncle(env *environment, uncle *types.Header) error { if env.family.Contains(hash) { return errors.New("uncle already included") } - env.uncles.Add(uncle.Hash()) + env.uncles[hash] = uncle return nil } // updateSnapshot updates pending snapshot block and state. // Note this function assumes the current variable is thread safe. -func (w *worker) updateSnapshot() { +func (w *worker) updateSnapshot(env *environment) { w.snapshotMu.Lock() defer w.snapshotMu.Unlock() - var uncles []*types.Header - w.current.uncles.Each(func(item interface{}) bool { - hash, ok := item.(common.Hash) - if !ok { - return false - } - uncle, exist := w.localUncles[hash] - if !exist { - uncle, exist = w.remoteUncles[hash] - } - if !exist { - return false - } - uncles = append(uncles, uncle.Header()) - return false - }) - w.snapshotBlock = types.NewBlock( - w.current.header, - w.current.txs, - uncles, - w.current.receipts, + env.header, + env.txs, + env.unclelist(), + env.receipts, trie.NewStackTrie(nil), ) - w.snapshotReceipts = copyReceipts(w.current.receipts) - w.snapshotState = w.current.state.Copy() + w.snapshotReceipts = copyReceipts(env.receipts) + w.snapshotState = env.state.Copy() } -func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) { - snap := w.current.state.Snapshot() +func commitTransaction(chain *core.BlockChain, chainConfig *params.ChainConfig, env *environment, tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) { + snap := env.state.Snapshot() - receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &w.current.header.GasUsed, *w.chain.GetVMConfig()) + receipt, err := core.ApplyTransaction(chainConfig, chain, &coinbase, env.gasPool, env.state, env.header, tx, &env.header.GasUsed, *chain.GetVMConfig()) if err != nil { - w.current.state.RevertToSnapshot(snap) + env.state.RevertToSnapshot(snap) return nil, err } - w.current.txs = append(w.current.txs, tx) - w.current.receipts = append(w.current.receipts, receipt) + env.txs = append(env.txs, tx) + env.receipts = append(env.receipts, receipt) return receipt.Logs, nil } -func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coinbase common.Address, interrupt *int32) bool { - // Short circuit if current is nil - if w.current == nil { - return true - } - - gasLimit := w.current.header.GasLimit - if w.current.gasPool == nil { - w.current.gasPool = new(core.GasPool).AddGas(gasLimit) +func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, coinbase common.Address, interrupt *int32) bool { + gasLimit := env.header.GasLimit + if env.gasPool == nil { + env.gasPool = new(core.GasPool).AddGas(gasLimit) } - var coalescedLogs []*types.Log + curProfit := new(big.Int).Set(env.profit) + for { // In the following three cases, we will interrupt the execution of the transaction. // (1) new head block event arrival, the interrupt signal is 1 // (2) worker start or restart, the interrupt signal is 1 - // (3) worker recreate the mining block with any newly arrived transactions, the interrupt signal is 2. + // (3) worker recreate the sealing block with any newly arrived transactions, the interrupt signal is 2. // For the first two cases, the semi-finished work will be discarded. // For the third case, the semi-finished work will be submitted to the consensus engine. if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone { // Notify resubmit loop to increase resubmitting interval due to too frequent commits. if atomic.LoadInt32(interrupt) == commitInterruptResubmit { - ratio := float64(gasLimit-w.current.gasPool.Gas()) / float64(gasLimit) + ratio := float64(gasLimit-env.gasPool.Gas()) / float64(gasLimit) if ratio < 0.1 { ratio = 0.1 } @@ -798,8 +863,8 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin return atomic.LoadInt32(interrupt) == commitInterruptNewHead } // If we don't have enough gas for any further transactions then we're done - if w.current.gasPool.Gas() < params.TxGas { - log.Trace("Not enough gas for further transactions", "have", w.current.gasPool, "want", params.TxGas) + if env.gasPool.Gas() < params.TxGas { + log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas) break } // Retrieve the next transaction and abort if all done @@ -811,19 +876,24 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin // during transaction acceptance is the transaction pool. // // We use the eip155 signer regardless of the current hf. - from, _ := types.Sender(w.current.signer, tx) + from, _ := types.Sender(env.signer, tx) // Check whether the tx is replay protected. If we're not in the EIP155 hf // phase, start ignoring the sender until we do. - if tx.Protected() && !w.chainConfig.IsEIP155(w.current.header.Number) { + if tx.Protected() && !w.chainConfig.IsEIP155(env.header.Number) { log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block) txs.Pop() continue } + gasPrice, err := tx.EffectiveGasTip(env.header.BaseFee) + if err != nil { + txs.Shift() + continue + } // Start executing the transaction - w.current.state.Prepare(tx.Hash(), w.current.tcount) + env.state.Prepare(tx.Hash(), env.tcount) - logs, err := w.commitTransaction(tx, coinbase) + logs, err := commitTransaction(w.chain, w.chainConfig, env, tx, coinbase) switch { case errors.Is(err, core.ErrGasLimitReached): // Pop the current out-of-gas transaction without shifting in the next from the account @@ -843,7 +913,9 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin case errors.Is(err, nil): // Everything ok, collect the logs and shift in the next transaction from the same account coalescedLogs = append(coalescedLogs, logs...) - w.current.tcount++ + gasUsed := new(big.Int).SetUint64(env.receipts[len(env.receipts)-1].GasUsed) + curProfit.Add(curProfit, gasUsed.Mul(gasUsed, gasPrice)) + env.tcount++ txs.Shift() case errors.Is(err, core.ErrTxTypeNotSupported): @@ -859,9 +931,13 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin } } + if w.isRunning() { + env.profit = curProfit + } + if !w.isRunning() && len(coalescedLogs) > 0 { - // We don't push the pendingLogsEvent while we are mining. The reason is that - // when we are mining, the worker will regenerate a mining block every 3 seconds. + // We don't push the pendingLogsEvent while we are sealing. The reason is that + // when we are sealing, the worker will regenerate a sealing block every 3 seconds. // In order to avoid pushing the repeated pendingLog, we disable the pending log pushing. // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined @@ -874,166 +950,218 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin } w.pendingLogsFeed.Send(cpy) } - // Notify resubmit loop to decrease resubmitting interval if current interval is larger - // than the user-specified one. - if interrupt != nil { - w.resubmitAdjustCh <- &intervalAdjust{inc: false} - } + return false } -// commitNewWork generates several new sealing tasks based on the parent block. -func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) { +// generateParams wraps various of settings for generating sealing task. +type generateParams struct { + timestamp uint64 // The timstamp for sealing task + forceTime bool // Flag whether the given timestamp is immutable or not + parentHash common.Hash // Parent block hash, empty means the latest chain head + coinbase bool // Flag whether the coinbase field is required + noUncle bool // Flag whether the uncle block inclusion is allowed + noExtra bool // Flag whether the extra field assignment is allowed +} + +// prepareWork constructs the sealing task according to the given parameters, +// either based on the last chain head or specified parent. In this function +// the pending transactions are not filled yet, only the empty task returned. +func (w *worker) prepareWork(genParams *generateParams) (*environment, error) { w.mu.RLock() defer w.mu.RUnlock() - tstart := time.Now() + // Find the parent block for sealing task parent := w.chain.CurrentBlock() - - if parent.Time() >= uint64(timestamp) { - timestamp = int64(parent.Time() + 1) + if genParams.parentHash != (common.Hash{}) { + parent = w.chain.GetBlockByHash(genParams.parentHash) + } + if parent == nil { + return nil, fmt.Errorf("missing parent") + } + // Sanity check the timestamp correctness, recap the timestamp + // to parent+1 if the mutation is allowed. + timestamp := genParams.timestamp + if parent.Time() >= timestamp { + if genParams.forceTime { + return nil, fmt.Errorf("invalid timestamp, parent %d given %d", parent.Time(), timestamp) + } + timestamp = parent.Time() + 1 } + // Construct the sealing block header, assign the extra field if it's allowed num := parent.Number() header := &types.Header{ ParentHash: parent.Hash(), Number: num.Add(num, common.Big1), GasLimit: core.CalcGasLimit(parent.GasUsed(), parent.GasLimit(), w.config.GasFloor, w.config.GasCeil), - Extra: w.extra, - Time: uint64(timestamp), + Time: timestamp, + } + if !genParams.noExtra && len(w.extra) != 0 { + header.Extra = w.extra } // Set baseFee and GasLimit if we are on an EIP-1559 chain if w.chainConfig.IsLondon(header.Number) { header.BaseFee = misc.CalcBaseFee(w.chainConfig, parent.Header()) parentGasLimit := parent.GasLimit() if !w.chainConfig.IsLondon(parent.Number()) { - // Bump by 2x parentGasLimit = parent.GasLimit() * params.ElasticityMultiplier } header.GasLimit = core.CalcGasLimit1559(parentGasLimit, w.config.GasCeil) } - // Only set the coinbase if our consensus engine is running (avoid spurious block rewards) - if w.isRunning() { + // Set the coinbase if the worker is running or it's required + if w.isRunning() || genParams.coinbase { if w.coinbase == (common.Address{}) { log.Error("Refusing to mine without etherbase") - return + return nil, errors.New("no etherbase specified") } header.Coinbase = w.coinbase } + // Run the consensus preparation with the default or customized consensus engine. if err := w.engine.Prepare(w.chain, header); err != nil { - log.Error("Failed to prepare header for mining", "err", err) - return - } - // If we are care about TheDAO hard-fork check whether to override the extra-data or not - if daoBlock := w.chainConfig.DAOForkBlock; daoBlock != nil { - // Check whether the block is among the fork extra-override range - limit := new(big.Int).Add(daoBlock, params.DAOForkExtraRange) - if header.Number.Cmp(daoBlock) >= 0 && header.Number.Cmp(limit) < 0 { - // Depending whether we support or oppose the fork, override differently - if w.chainConfig.DAOForkSupport { - header.Extra = common.CopyBytes(params.DAOForkBlockExtra) - } else if bytes.Equal(header.Extra, params.DAOForkBlockExtra) { - header.Extra = []byte{} // If miner opposes, don't let it use the reserved extra-data - } - } + log.Error("Failed to prepare header for sealing", "err", err) + return nil, err } // Could potentially happen if starting to mine in an odd state. - err := w.makeCurrent(parent, header) + env, err := w.makeEnv(parent, header) if err != nil { - log.Error("Failed to create mining context", "err", err) - return - } - // Create the current work task and check any fork transitions needed - env := w.current - if w.chainConfig.DAOForkSupport && w.chainConfig.DAOForkBlock != nil && w.chainConfig.DAOForkBlock.Cmp(header.Number) == 0 { - misc.ApplyDAOHardFork(env.state) + log.Error("Failed to create sealing context", "err", err) + return nil, err } - // Accumulate the uncles for the current block - uncles := make([]*types.Header, 0, 2) - commitUncles := func(blocks map[common.Hash]*types.Block) { - // Clean up stale uncle blocks first - for hash, uncle := range blocks { - if uncle.NumberU64()+staleThreshold <= header.Number.Uint64() { - delete(blocks, hash) - } - } - for hash, uncle := range blocks { - if len(uncles) == 2 { - break - } - if err := w.commitUncle(env, uncle.Header()); err != nil { - log.Trace("Possible uncle rejected", "hash", hash, "reason", err) - } else { - log.Debug("Committing new uncle to block", "hash", hash) - uncles = append(uncles, uncle.Header()) + // Accumulate the uncles for the sealing work only if it's allowed. + if !genParams.noUncle { + commitUncles := func(blocks map[common.Hash]*types.Block) { + for hash, uncle := range blocks { + if len(env.uncles) == 2 { + break + } + if err := w.commitUncle(env, uncle.Header()); err != nil { + log.Trace("Possible uncle rejected", "hash", hash, "reason", err) + } else { + log.Debug("Committing new uncle to block", "hash", hash) + } } } + // Prefer to locally generated uncle + commitUncles(w.localUncles) + commitUncles(w.remoteUncles) } - // Prefer to locally generated uncle - commitUncles(w.localUncles) - commitUncles(w.remoteUncles) + return env, nil +} - // Create an empty block based on temporary copied state for - // sealing in advance without waiting block execution finished. - if !noempty && atomic.LoadUint32(&w.noempty) == 0 { - w.commit(uncles, nil, false, tstart) +// generateWork generates a sealing block based on the given parameters. +func (w *worker) generateWork(params *generateParams) (*types.Block, error) { + work, err := w.prepareWork(params) + if err != nil { + return nil, err + } + defer work.discard() + + w.multiCollator.SuggestBlock(work, nil) + var bestWork *environment = work + profit := big.NewInt(0) + + chooseMostProfitableBlock := func(e environment) { + if e.profit.Cmp(profit) > 0 { + bestWork.discard() + bestWork = &e + profit.Set(e.profit) + } } + w.multiCollator.Collect(chooseMostProfitableBlock) + + return w.engine.FinalizeAndAssemble(w.chain, bestWork.header, bestWork.state, bestWork.txs, bestWork.unclelist(), bestWork.receipts) +} - // Fill the block with all available pending transactions. - pending, err := w.eth.TxPool().Pending(true) +// commitWork generates several new sealing tasks based on the parent block +// and submit them to the sealer. +func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { + start := time.Now() + work, err := w.prepareWork(&generateParams{timestamp: uint64(timestamp)}) if err != nil { - log.Error("Failed to fetch pending transactions", "err", err) return } - // Short circuit if there is no available pending transactions. - // But if we disable empty precommit already, ignore it. Since - // empty block is necessary to keep the liveness of the network. - if len(pending) == 0 && atomic.LoadUint32(&w.noempty) == 0 { - w.updateSnapshot() - return + // Create an empty block based on temporary copied state for + // sealing in advance without waiting block execution finished. + if !noempty && atomic.LoadUint32(&w.noempty) == 0 { + w.commit(work.copy(), nil, false, start) } - // Split the pending transactions into locals and remotes - localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending - for _, account := range w.eth.TxPool().Locals() { - if txs := remoteTxs[account]; len(txs) > 0 { - delete(remoteTxs, account) - localTxs[account] = txs + + // suggest the new block to the pool of active collators and interrupt sealing as more profitable strategies are fed back + w.multiCollator.SuggestBlock(work, interrupt) + profit := work.profit + curBest := work + cb := func(newWork environment) { + if newWork.profit.Cmp(profit) > 0 { + // probably don't need to copy work again here but do it for now just to be safe + w.commit(newWork.copy(), w.fullTaskHook, true, start) + profit.Set(newWork.profit) + curBest = &newWork } } - if len(localTxs) > 0 { - txs := types.NewTransactionsByPriceAndNonce(w.current.signer, localTxs, header.BaseFee) - if w.commitTransactions(txs, w.coinbase, interrupt) { - return + w.multiCollator.Collect(cb) + + // Swap out the old work with the new one, terminating any leftover + // prefetcher processes in the mean time and starting a new one. + if w.current != nil { + w.current.discard() + } + w.current = curBest + + if !w.isRunning() && len(w.current.logs) > 0 { + // We don't push the pendingLogsEvent while we are sealing. The reason is that + // when we are sealing, the worker will regenerate a sealing block every 3 seconds. + // In order to avoid pushing the repeated pendingLog, we disable the pending log pushing. + + // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined + // logs by filling in the block hash when the block was mined by the local miner. This can + // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed. + cpy := make([]*types.Log, len(w.current.logs)) + for i, l := range w.current.logs { + cpy[i] = new(types.Log) + *cpy[i] = *l } + w.pendingLogsFeed.Send(cpy) } - if len(remoteTxs) > 0 { - txs := types.NewTransactionsByPriceAndNonce(w.current.signer, remoteTxs, header.BaseFee) - if w.commitTransactions(txs, w.coinbase, interrupt) { - return + + // TODO how to determine how to adjust the resubmit interval here? i.e. how the ratio should be determined + if interrupt != nil { + if atomic.LoadInt32(interrupt) == commitInterruptNone { + w.resubmitAdjustCh <- &intervalAdjust{inc: false} + } else if atomic.LoadInt32(interrupt) == commitInterruptNone { + gasLimit := w.current.header.GasLimit + ratio := float64(gasLimit-w.current.gasPool.Gas()) / float64(gasLimit) + if ratio < 0.1 { + ratio = 0.1 + } + w.resubmitAdjustCh <- &intervalAdjust{ + ratio: ratio, + inc: true, + } } } - w.commit(uncles, w.fullTaskHook, true, tstart) + } // commit runs any post-transaction state modifications, assembles the final block // and commits new work if consensus engine is running. -func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error { - // Deep copy receipts here to avoid interaction between different tasks. - receipts := copyReceipts(w.current.receipts) - s := w.current.state.Copy() - block, err := w.engine.FinalizeAndAssemble(w.chain, w.current.header, s, w.current.txs, uncles, receipts) - if err != nil { - return err - } +// Note the assumption is held that the mutation is allowed to the passed env, do +// the deep copy first. +func (w *worker) commit(env *environment, interval func(), update bool, start time.Time) error { if w.isRunning() { if interval != nil { interval() } + // Deep copy receipts here to avoid interaction between different tasks. + block, err := w.engine.FinalizeAndAssemble(w.chain, env.header, env.state, env.txs, env.unclelist(), env.receipts) + if err != nil { + return err + } select { - case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now()}: + case w.taskCh <- &task{receipts: env.receipts, state: env.state, block: block, createdAt: time.Now()}: w.unconfirmed.Shift(block.NumberU64() - 1) - log.Info("Commit new mining work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()), - "uncles", len(uncles), "txs", w.current.tcount, - "gas", block.GasUsed(), "fees", totalFees(block, receipts), + log.Info("Commit new sealing work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()), + "uncles", len(env.uncles), "txs", env.tcount, + "gas", block.GasUsed(), "fees", totalFees(block, env.receipts), "elapsed", common.PrettyDuration(time.Since(start))) case <-w.exitCh: @@ -1041,11 +1169,61 @@ func (w *worker) commit(uncles []*types.Header, interval func(), update bool, st } } if update { - w.updateSnapshot() + w.updateSnapshot(env) } return nil } +// getSealingBlock generates the sealing block based on the given parameters. +func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64) (*types.Block, error) { + req := &getWorkReq{ + params: &generateParams{ + timestamp: timestamp, + forceTime: true, + parentHash: parent, + coinbase: true, + noUncle: true, + noExtra: true, + }, + result: make(chan *types.Block, 1), + } + select { + case w.getWorkCh <- req: + block := <-req.result + if block == nil { + return nil, req.err + } + return block, nil + case <-w.exitCh: + return nil, errors.New("miner closed") + } +} + +func copyLogs(logs []*types.Log) []*types.Log { + result := make([]*types.Log, len(logs)) + for _, l := range logs { + logCopy := types.Log{} + copy(logCopy.Address[:], l.Address[:]) + for _, t := range l.Topics { + topic := common.Hash{} + copy(topic[:], t[:]) + logCopy.Topics = append(logCopy.Topics, topic) + } + logCopy.Data = make([]byte, len(l.Data)) + copy(logCopy.Data[:], l.Data[:]) + logCopy.BlockNumber = l.BlockNumber + copy(logCopy.TxHash[:], l.TxHash[:]) + logCopy.TxIndex = l.TxIndex + copy(logCopy.BlockHash[:], l.BlockHash[:]) + logCopy.Index = l.Index + logCopy.Removed = l.Removed + + result = append(result, &logCopy) + } + + return result +} + // copyReceipts makes a deep copy of the given receipts. func copyReceipts(receipts []*types.Receipt) []*types.Receipt { result := make([]*types.Receipt, len(receipts)) diff --git a/miner/worker_test.go b/miner/worker_test.go index 2bb6c9407bbec..6756c22c67c71 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -196,7 +196,7 @@ func (b *testWorkerBackend) newRandomTx(creation bool) *types.Transaction { func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, blocks int) (*worker, *testWorkerBackend) { backend := newTestWorkerBackend(t, chainConfig, engine, db, blocks) backend.txPool.AddLocals(pendingTxs) - w := newWorker(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil, false) + w := newWorker(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil, false, []BlockCollator{&DefaultCollator{}}) w.setEtherbase(testBankAddress) return w, backend } @@ -520,3 +520,107 @@ func testAdjustInterval(t *testing.T, chainConfig *params.ChainConfig, engine co t.Error("interval reset timeout") } } + +func TestGetSealingWorkEthash(t *testing.T) { + testGetSealingWork(t, ethashChainConfig, ethash.NewFaker()) +} + +func TestGetSealingWorkClique(t *testing.T) { + testGetSealingWork(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase())) +} + +func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine) { + defer engine.Close() + + w, b := newTestWorker(t, chainConfig, engine, rawdb.NewMemoryDatabase(), 0) + defer w.close() + + w.setExtra([]byte{0x01, 0x02}) + w.postSideBlock(core.ChainSideEvent{Block: b.uncleBlock}) + + w.skipSealHook = func(task *task) bool { + return true + } + w.fullTaskHook = func() { + time.Sleep(100 * time.Millisecond) + } + timestamp := uint64(time.Now().Unix()) + _, clique := engine.(*clique.Clique) + assertBlock := func(block *types.Block, number uint64) { + if block.Time() != timestamp { + t.Errorf("Invalid timestamp, want %d, get %d", timestamp, block.Time()) + } + if len(block.Uncles()) != 0 { + t.Error("Unexpected uncle block") + } + if !clique { + if len(block.Extra()) != 0 { + t.Error("Unexpected extra field") + } + if block.Coinbase() != w.coinbase { + t.Errorf("Invalid coinbase, want %x, get %x", w.coinbase, block.Coinbase()) + } + } + if block.MixDigest() != (common.Hash{}) { + t.Error("Unexpected mix digest") + } + if block.Nonce() != 0 { + t.Error("Unexpected block nonce") + } + if block.NumberU64() != number { + t.Errorf("Mismatched block number, want %d got %d", number, block.NumberU64()) + } + } + var cases = []struct { + parent common.Hash + expectNumber uint64 + expectErr bool + }{ + { + b.chain.Genesis().Hash(), + uint64(1), + false, + }, + { + b.chain.CurrentBlock().Hash(), + b.chain.CurrentBlock().NumberU64() + 1, + false, + }, + { + common.HexToHash("0xdeadbeef"), + 0, + true, + }, + } + + // This API should work even when the automatic sealing is not enabled + for _, c := range cases { + block, err := w.getSealingBlock(c.parent, timestamp) + if c.expectErr { + if err == nil { + t.Error("Expect error but get nil") + } + } else { + if err != nil { + t.Errorf("Unexpected error %v", err) + } + assertBlock(block, c.expectNumber) + } + } + + // This API should work even when the automatic sealing is enabled + w.start() + for _, c := range cases { + block, err := w.getSealingBlock(c.parent, timestamp) + if c.expectErr { + if err == nil { + t.Error("Expect error but get nil") + } + } else { + if err != nil { + t.Errorf("Unexpected error %v", err) + } + assertBlock(block, c.expectNumber) + } + } +}