diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 1a27a3255adbe..bf5c488ed71eb 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -125,6 +125,9 @@ var ( utils.MinerExtraDataFlag, utils.MinerRecommitIntervalFlag, utils.MinerNoVerfiyFlag, + utils.MinerCollatorPluginEnableFlag, + utils.MinerCollatorPluginPath, + utils.MinerCollatorPluginConfigPath, utils.NATFlag, utils.NoDiscoverFlag, utils.DiscoveryV5Flag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 7ed5907dba851..108a903e67499 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -477,6 +477,19 @@ var ( Name: "miner.noverify", Usage: "Disable remote sealing verification", } + MinerCollatorPluginEnableFlag = cli.BoolFlag{ + Name: "miner.enablecollatorplugin", + Usage: "Enable custom miner collator", + } + MinerCollatorPluginPath = cli.StringFlag{ + Name: "miner.collatorpluginfile", + Usage: "Path to collator plugin compiled as shared library", + } + MinerCollatorPluginConfigPath = cli.StringFlag{ + Name: "miner.collatorpluginconfigfile", + Usage: "Path to custom collator config toml", + } + // Account settings UnlockedAccountFlag = cli.StringFlag{ Name: "unlock", @@ -1401,6 +1414,15 @@ func setMiner(ctx *cli.Context, cfg *miner.Config) { if ctx.GlobalIsSet(MinerNoVerfiyFlag.Name) { cfg.Noverify = ctx.GlobalBool(MinerNoVerfiyFlag.Name) } + if ctx.GlobalIsSet(MinerCollatorPluginEnableFlag.Name) { + cfg.UseCustomCollator = ctx.GlobalBool(MinerCollatorPluginEnableFlag.Name) + } + if ctx.GlobalIsSet(MinerCollatorPluginPath.Name) { + cfg.CollatorPath = ctx.GlobalString(MinerCollatorPluginPath.Name) + } + if ctx.GlobalIsSet(MinerCollatorPluginConfigPath.Name) { + cfg.CollatorConfigPath = ctx.GlobalString(MinerCollatorPluginConfigPath.Name) + } } func setWhitelist(ctx *cli.Context, cfg *ethconfig.Config) { diff --git a/core/vm/interface.go b/core/vm/interface.go index ad9b05d666a87..11e46a21dc983 100644 --- a/core/vm/interface.go +++ b/core/vm/interface.go @@ -76,6 +76,22 @@ type StateDB interface { ForEachStorage(common.Address, func(common.Hash, common.Hash) bool) error } +type StateReader interface { + GetBalance(common.Address) *big.Int + GetNonce(common.Address) uint64 + GetCodeHash(common.Address) common.Hash + GetCode(common.Address) []byte + GetCodeSize(common.Address) int + GetRefund() uint64 + GetCommittedState(common.Address, common.Hash) common.Hash + GetState(common.Address, common.Hash) common.Hash + HasSuicided(common.Address) bool + Exist(common.Address) bool + Empty(common.Address) bool + AddressInAccessList(addr common.Address) bool + SlotInAccessList(addr common.Address, slot common.Hash) (addressOk bool, slotOk bool) +} + // CallContext provides a basic interface for the EVM calling conventions. The EVM // depends on this context being implemented for doing subcalls and initialising new EVM contracts. type CallContext interface { diff --git a/eth/backend.go b/eth/backend.go index 793d3b81f1b1f..2f8441573b2b8 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -111,6 +111,19 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { log.Warn("Sanitizing invalid miner gas price", "provided", config.Miner.GasPrice, "updated", ethconfig.Defaults.Miner.GasPrice) config.Miner.GasPrice = new(big.Int).Set(ethconfig.Defaults.Miner.GasPrice) } + + var minerCollator miner.Collator + var minerCollatorAPI miner.CollatorAPI + + if config.Miner.UseCustomCollator { + log.Info("using custom mining collator") + var err error + minerCollator, minerCollatorAPI, err = miner.LoadCollator(config.Miner.CollatorPath, config.Miner.CollatorConfigPath) + if err != nil { + return nil, err + } + } + if config.NoPruning && config.TrieDirtyCache > 0 { if config.SnapshotCache > 0 { config.TrieCleanCache += config.TrieDirtyCache * 3 / 5 @@ -225,7 +238,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { return nil, err } - eth.miner = miner.New(eth, &config.Miner, chainConfig, eth.EventMux(), eth.engine, eth.isLocalBlock) + eth.miner = miner.New(eth, &config.Miner, chainConfig, eth.EventMux(), eth.engine, eth.isLocalBlock, minerCollator, minerCollatorAPI) eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData)) eth.APIBackend = &EthAPIBackend{stack.Config().ExtRPCEnabled(), stack.Config().AllowUnprotectedTxs, eth, nil} @@ -297,6 +310,15 @@ func (s *Ethereum) APIs() []rpc.API { // Append any APIs exposed explicitly by the consensus engine apis = append(apis, s.engine.APIs(s.BlockChain())...) + if s.config.Miner.UseCustomCollator && s.miner.API != nil { + apis = append(apis, rpc.API{ + Namespace: "minercollator", + Version: s.miner.API.Version(), + Service: s.miner.API.Service(), + Public: true, + }) + } + // Append all the local APIs and return return append(apis, []rpc.API{ { diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 0913b69d7ffd8..a46ac1830a76b 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -83,10 +83,13 @@ var Defaults = Config{ TrieTimeout: 60 * time.Minute, SnapshotCache: 102, Miner: miner.Config{ - GasFloor: 8000000, - GasCeil: 8000000, - GasPrice: big.NewInt(params.GWei), - Recommit: 3 * time.Second, + GasFloor: 8000000, + GasCeil: 8000000, + GasPrice: big.NewInt(params.GWei), + Recommit: 3 * time.Second, + UseCustomCollator: false, + CollatorPath: "", + CollatorConfigPath: "", }, TxPool: core.DefaultTxPoolConfig, RPCGasCap: 50000000, diff --git a/miner/collator.go b/miner/collator.go new file mode 100644 index 0000000000000..bc622997812e2 --- /dev/null +++ b/miner/collator.go @@ -0,0 +1,402 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package miner implements Ethereum block creation and mining. +package miner + +import ( + "errors" + // "math" + //"math/big" + "os" + "plugin" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" + // "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/params" + "github.com/naoina/toml" +) + +type CollatorAPI interface { + Version() string + Service() interface{} +} + +// Pool is an interface to the transaction pool +type Pool interface { + Pending(bool) (map[common.Address]types.Transactions, error) + Locals() []common.Address +} + +/* + BlockState represents an under-construction block. An instance of + BlockState is passed to CollateBlock where it can be filled with transactions + via BlockState.AddTransaction() and submitted for sealing via + BlockState.Commit(). + + Operations on a single BlockState instance are not threadsafe. However, + instances can be copied with BlockState.Copy(). +*/ +type BlockState interface { + /* + adds a single transaction to the blockState. Returned errors include ..,..,.. + which signify that the transaction was invalid for the current EVM/chain state. + + ErrRecommit signals that the recommit timer has elapsed. + ErrNewHead signals that the client has received a new canonical chain head. + All subsequent calls to AddTransaction fail if either newHead or the recommit timer + have occured. + + If the recommit interval has elapsed, the BlockState can still be committed to the sealer. + */ + AddTransactions(tx types.Transactions) (error, types.Receipts) + + /* + removes a number of transactions from the block resetting the state to what + it was before the transactions were added. If count is greater than the number + of transactions in the block, returns + */ + RevertTransactions(count uint) error + + /* + returns true if the Block has been made the current sealing block. + returns false if the newHead interrupt has been triggered. + can also return false if the BlockState is no longer valid (the call to CollateBlock + which the original instance was passed has returned). + */ + Commit() bool + Copy() BlockState + State() vm.StateReader + Signer() types.Signer + Header() *types.Header + /* + the account which will receive the block reward. + */ + Etherbase() common.Address +} + +const ( + InterruptNone int = iota + InterruptResubmit + InterruptNewHead +) + +/* +InterruptContext allows for active polling to detect if a new canonical chain +head was received or recommit timer elapse occured. +*/ +type InterruptContext interface { + InterruptState() int +} + +type Collator interface { + /* + the main entrypoint for constructing a block for sealing. An empty block + bs, is provided which can be modified/copied and committed to the sealer + for the duration of the call to CollateBlock. + */ + CollateBlock(bs BlockState, ctx InterruptContext) + /* + Called when the client is started after the miner worker is created. + */ + Start(pool Pool) + /* + Called when the client is closing. + */ + Close() +} + +var ( + ErrInterruptRecommit = errors.New("interrupt: recommit timer elapsed") + ErrInterruptNewHead = errors.New("interrupt: client received new canon chain head") + ErrCommitted = errors.New("can't mutate BlockState after calling Commit()") + + // errors which indicate that a given transaction cannot be + // added at a given block or chain configuration. + ErrGasLimitReached = errors.New("gas limit reached") + ErrNonceTooLow = errors.New("tx nonce too low") + ErrNonceTooHigh = errors.New("tx nonce too high") + ErrTxTypeNotSupported = errors.New("tx type not supported") + ErrGasFeeCapTooLow = errors.New("gas fee cap too low") + // error which encompasses all other reasons a given transaction + // could not be added to the block. + ErrStrange = errors.New("strange error") +) + +/* + Loads a collator plugin and configuration (toml) from disk. + Expects the plugin to export a method named PluginConstructor + which has a signature: + func(config *map[string]interface{}) (Collator, CollatorAPI, error) + + returns the result of calling the plugin constructor for the given + toml config (which is nil if a custom config filepath is not passed + via --minercollator.configfilepath) +*/ +func LoadCollator(filepath string, configPath string) (Collator, CollatorAPI, error) { + p, err := plugin.Open(filepath) + if err != nil { + return nil, nil, err + } + + v, err := p.Lookup("PluginConstructor") + if err != nil { + return nil, nil, errors.New("Symbol 'APIExport' not found") + } + + pluginConstructor, ok := v.(func(config *map[string]interface{}) (Collator, CollatorAPI, error)) + if !ok { + return nil, nil, errors.New("Expected symbol 'API' to be of type 'CollatorAPI") + } + + f, err := os.Open(configPath) + if err != nil { + return nil, nil, err + } + defer f.Close() + + config := make(map[string]interface{}) + if err := toml.NewDecoder(f).Decode(&config); err != nil { + return nil, nil, err + } + + collator, collatorAPI, err := pluginConstructor(&config) + if err != nil { + return nil, nil, err + } + + return collator, collatorAPI, nil +} + +const ( + interruptNotHandled int32 = 0 + interruptIsHandled int32 = 1 +) + +type blockState struct { + worker *worker + env *environment + start time.Time + logs []*types.Log + shouldSeal bool + snapshots []int + committed bool + + // shared values between multiple copies of a blockState + + // miner interrupt + interrupt *int32 + // mutex to make sure only one blockState is calling commit at a given time + commitMu *sync.Mutex + // this makes sure multiple copies of a blockState can only trigger + // adjustment of the recommit interval once + interruptHandled *int32 + // prevents calls to worker.commit (with a given blockState) after + // CollateBlock call on that blockState returns. examined in commit + // when commitMu is held. modified right after CollateBlock returns + done *bool + // calling Commit() copies the value of env to this value + // and forwards it to the sealer via worker.commit() if shouldSeal is true + resultEnv *environment +} + +type interruptContext struct { + interrupt *int32 +} + +func (ctx *interruptContext) InterruptState() int { + if ctx.interrupt == nil { + return InterruptNone + } + + switch atomic.LoadInt32(ctx.interrupt) { + case commitInterruptResubmit: + return InterruptResubmit + case commitInterruptNewHead: + return InterruptNewHead + default: + return InterruptNone + } +} + +func (bs *blockState) Etherbase() common.Address { + return bs.env.etherbase +} + +func (bs *blockState) Header() *types.Header { + return types.CopyHeader(bs.env.header) +} + +func (bs *blockState) AddTransactions(txs types.Transactions) (error, types.Receipts) { + tcount := 0 + var retErr error = nil + + if bs.committed { + return ErrCommitted, nil + } + + if len(txs) == 0 { + return ErrZeroTxs, nil + } + + for _, tx := range txs { + if bs.interrupt != nil && atomic.LoadInt32(bs.interrupt) != commitInterruptNone { + if atomic.LoadInt32(bs.interrupt) == commitInterruptResubmit { + if atomic.CompareAndSwapInt32(bs.interruptHandled, interruptNotHandled, interruptIsHandled) { + var ratio float64 = 0.1 + bs.worker.resubmitAdjustCh <- &intervalAdjust{ + ratio: ratio, + inc: true, + } + } + return ErrInterruptRecommit, nil + } else { + return ErrInterruptNewHead, nil + } + } + + if bs.env.gasPool.Gas() < params.TxGas { + return ErrGasLimitReached, nil + } + + // Check whether the tx is replay protected. If we're not in the EIP155 hf + // phase, start ignoring the sender until we do. + if tx.Protected() && !bs.worker.chainConfig.IsEIP155(bs.env.header.Number) { + return ErrTxTypeNotSupported, nil + } + + // TODO can this error also be returned by commitTransaction below? + _, err := tx.EffectiveGasTip(bs.env.header.BaseFee) + if err != nil { + return ErrGasFeeCapTooLow, nil + } + + snapshot := bs.env.state.Snapshot() + bs.env.state.Prepare(tx.Hash(), bs.env.tcount+tcount) + txLogs, err := bs.worker.commitTransaction(bs.env, tx, bs.env.etherbase) + if err != nil { + switch { + case errors.Is(err, core.ErrGasLimitReached): + // this should never be reached. + // should be caught above + retErr = ErrGasLimitReached + case errors.Is(err, core.ErrNonceTooLow): + retErr = ErrNonceTooLow + case errors.Is(err, core.ErrNonceTooHigh): + retErr = ErrNonceTooHigh + case errors.Is(err, core.ErrTxTypeNotSupported): + // TODO check that this unspported tx type is the same as the one caught above + retErr = ErrTxTypeNotSupported + default: + retErr = ErrStrange + } + + bs.logs = bs.logs[:len(bs.logs)-tcount] + bs.env.state.RevertToSnapshot(bs.snapshots[len(bs.snapshots)-tcount]) + bs.snapshots = bs.snapshots[:len(bs.snapshots)-tcount] + + return retErr, nil + } else { + bs.logs = append(bs.logs, txLogs...) + bs.snapshots = append(bs.snapshots, snapshot) + tcount++ + } + } + + retReceipts := bs.env.receipts[bs.env.tcount:] + bs.env.tcount += tcount + + return nil, retReceipts +} + +func (bs *blockState) State() vm.StateReader { + return bs.env.state +} + +func (bs *blockState) Signer() types.Signer { + return bs.env.signer +} + +// TODO change return from bool to error +func (bs *blockState) Commit() bool { + if bs.committed { + return false + } + + if bs.interrupt != nil && atomic.LoadInt32(bs.interrupt) != commitInterruptNone { + if atomic.CompareAndSwapInt32(bs.interruptHandled, interruptNotHandled, interruptIsHandled) && atomic.LoadInt32(bs.interrupt) == commitInterruptResubmit { + // Notify resubmit loop to increase resubmitting interval due to too frequent commits. + var ratio float64 = 0.1 + bs.worker.resubmitAdjustCh <- &intervalAdjust{ + ratio: ratio, + inc: true, + } + } + return false + } + + bs.commitMu.Lock() + defer bs.commitMu.Unlock() + if *bs.done { + return false + } + if bs.shouldSeal { + bs.worker.commit(bs.env.copy(), bs.worker.fullTaskHook, true, bs.start) + } + bs.resultEnv = bs.env + bs.committed = true + return true +} + +var ( + ErrTooManyTxs = errors.New("tried to revert more txs than exist in BlockState") + ErrZeroTxs = errors.New("tried to revert 0 transactions") +) + +func (bs *blockState) RevertTransactions(count uint) error { + if bs.committed { + return ErrCommitted + } else if int(count) > len(bs.snapshots) { + return ErrTooManyTxs + } else if count == 0 { + return ErrZeroTxs + } + bs.env.state.RevertToSnapshot(bs.snapshots[len(bs.snapshots)-int(count)]) + bs.snapshots = bs.snapshots[:len(bs.snapshots)-int(count)] + return nil +} + +func (bs *blockState) Copy() BlockState { + return &blockState{ + worker: bs.worker, + env: bs.env.copy(), + start: bs.start, + logs: copyLogs(bs.logs), + interrupt: bs.interrupt, + commitMu: bs.commitMu, + interruptHandled: bs.interruptHandled, + done: bs.done, + committed: bs.committed, + shouldSeal: bs.shouldSeal, + resultEnv: bs.resultEnv, + } +} diff --git a/miner/default_collator.go b/miner/default_collator.go new file mode 100644 index 0000000000000..0269ac5569f64 --- /dev/null +++ b/miner/default_collator.go @@ -0,0 +1,133 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package miner implements Ethereum block creation and mining. +package miner + +import ( + "errors" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" +) + +type DefaultCollator struct { + pool Pool +} + +func submitTransactions(bs BlockState, txs *types.TransactionsByPriceAndNonce) bool { + header := bs.Header() + availableGas := header.GasLimit + for { + // Retrieve the next transaction and abort if all done + tx := txs.Peek() + if tx == nil { + break + } + // Enough space for this tx? + if availableGas < tx.Gas() { + txs.Pop() + continue + } + from, _ := types.Sender(bs.Signer(), tx) + + err, receipts := bs.AddTransactions(types.Transactions{tx}) + switch { + case errors.Is(err, ErrGasLimitReached): + // Pop the current out-of-gas transaction without shifting in the next from the account + log.Trace("Gas limit exceeded for current block", "sender", from) + txs.Pop() + + case errors.Is(err, ErrNonceTooLow): + // New head notification data race between the transaction pool and miner, shift + log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce()) + txs.Shift() + + case errors.Is(err, ErrNonceTooHigh): + // Reorg notification data race between the transaction pool and miner, skip account = + log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce()) + txs.Pop() + + case errors.Is(err, nil): + availableGas = header.GasLimit - receipts[0].CumulativeGasUsed + // Everything ok, collect the logs and shift in the next transaction from the same account + txs.Shift() + + case errors.Is(err, ErrTxTypeNotSupported): + // Pop the unsupported transaction without shifting in the next from the account + log.Trace("Skipping unsupported transaction type", "sender", from, "type", tx.Type()) + txs.Pop() + case errors.Is(err, ErrInterruptRecommit): + log.Trace("interrupted: recommit interval elapsed") + return true + case errors.Is(err, ErrInterruptNewHead): + log.Trace("interrupted: new canon chain head received") + return true + default: + // Strange error, discard the transaction and get the next in line (note, the + // nonce-too-high clause will prevent us from executing in vain). + log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err) + txs.Shift() + } + } + + return false +} + +// CollateBlock fills a block based on the highest paying transactions from the +// transaction pool, giving precedence over local transactions. +func (w *DefaultCollator) CollateBlock(bs BlockState, ctx InterruptContext) { + header := bs.Header() + txs, err := w.pool.Pending(true) + if err != nil { + log.Error("could not get pending transactions from the pool", "err", err) + return + } + if len(txs) == 0 { + return + } + // Split the pending transactions into locals and remotes + localTxs, remoteTxs := make(map[common.Address]types.Transactions), txs + for _, account := range w.pool.Locals() { + if accountTxs := remoteTxs[account]; len(accountTxs) > 0 { + delete(remoteTxs, account) + localTxs[account] = accountTxs + } + } + if len(localTxs) > 0 { + if submitTransactions(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), localTxs, header.BaseFee)) { + return + } + } + if len(remoteTxs) > 0 { + if submitTransactions(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), remoteTxs, header.BaseFee)) { + return + } + } + + bs.Commit() + + return +} + +func (w *DefaultCollator) Start(pool Pool) { + w.pool = pool +} + +func (w *DefaultCollator) Close() { + +} diff --git a/miner/miner.go b/miner/miner.go index 87e809f34f124..5d64898e83933 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -42,15 +42,18 @@ type Backend interface { // Config is the configuration parameters of mining. type Config struct { - Etherbase common.Address `toml:",omitempty"` // Public address for block mining rewards (default = first account) - Notify []string `toml:",omitempty"` // HTTP URL list to be notified of new work packages (only useful in ethash). - NotifyFull bool `toml:",omitempty"` // Notify with pending block headers instead of work packages - ExtraData hexutil.Bytes `toml:",omitempty"` // Block extra data set by the miner - GasFloor uint64 // Target gas floor for mined blocks. - GasCeil uint64 // Target gas ceiling for mined blocks. - GasPrice *big.Int // Minimum gas price for mining a transaction - Recommit time.Duration // The time interval for miner to re-create mining work. - Noverify bool // Disable remote mining solution verification(only useful in ethash). + Etherbase common.Address `toml:",omitempty"` // Public address for block mining rewards (default = first account) + Notify []string `toml:",omitempty"` // HTTP URL list to be notified of new work packages (only useful in ethash). + NotifyFull bool `toml:",omitempty"` // Notify with pending block headers instead of work packages + ExtraData hexutil.Bytes `toml:",omitempty"` // Block extra data set by the miner + GasFloor uint64 // Target gas floor for mined blocks. + GasCeil uint64 // Target gas ceiling for mined blocks. + GasPrice *big.Int // Minimum gas price for mining a transaction + Recommit time.Duration // The time interval for miner to re-create mining work. + Noverify bool // Disable remote mining solution verification(only useful in ethash). + UseCustomCollator bool + CollatorPath string + CollatorConfigPath string } // Miner creates blocks and searches for proof-of-work values. @@ -63,9 +66,10 @@ type Miner struct { exitCh chan struct{} startCh chan common.Address stopCh chan struct{} + API CollatorAPI } -func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, isLocalBlock func(block *types.Block) bool) *Miner { +func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, isLocalBlock func(block *types.Block) bool, collator Collator, collatorAPI CollatorAPI) *Miner { miner := &Miner{ eth: eth, mux: mux, @@ -73,7 +77,8 @@ func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *even exitCh: make(chan struct{}), startCh: make(chan common.Address), stopCh: make(chan struct{}), - worker: newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, true), + API: collatorAPI, + worker: newWorker(config, chainConfig, collator, engine, eth, mux, isLocalBlock, true), } go miner.update() @@ -153,6 +158,7 @@ func (miner *Miner) Stop() { } func (miner *Miner) Close() { + miner.worker.collator.Close() close(miner.exitCh) } diff --git a/miner/miner_test.go b/miner/miner_test.go index da1e472dbd76f..1772357d9e46e 100644 --- a/miner/miner_test.go +++ b/miner/miner_test.go @@ -257,5 +257,5 @@ func createMiner(t *testing.T) (*Miner, *event.TypeMux) { // Create event Mux mux := new(event.TypeMux) // Create Miner - return New(backend, &config, chainConfig, mux, engine, nil), mux + return New(backend, &config, chainConfig, mux, engine, nil, &DefaultCollator{}, nil), mux } diff --git a/miner/worker.go b/miner/worker.go index 5120e58571a2a..7677910f56a79 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1,4 +1,4 @@ -// Copyright 2015 The go-ethereum Authors +// Copyright 2021 The go-ethereum Authors // This file is part of the go-ethereum library. // // The go-ethereum library is free software: you can redistribute it and/or modify @@ -88,10 +88,11 @@ type environment struct { tcount int // tx count in cycle gasPool *core.GasPool // available gas used to pack transactions - header *types.Header - txs []*types.Transaction - receipts []*types.Receipt - uncles map[common.Hash]*types.Header + header *types.Header + txs []*types.Transaction + receipts []*types.Receipt + uncles map[common.Hash]*types.Header + etherbase common.Address } // copy creates a deep copy of environment. @@ -104,9 +105,11 @@ func (env *environment) copy() *environment { tcount: env.tcount, header: types.CopyHeader(env.header), receipts: copyReceipts(env.receipts), + etherbase: env.etherbase, } if env.gasPool != nil { - cpy.gasPool = &(*env.gasPool) + cpy.gasPool = new(core.GasPool) + *cpy.gasPool = *env.gasPool } // The content of txs and uncles are immutable, unnecessary // to do the expensive deep copy for them. @@ -119,6 +122,29 @@ func (env *environment) copy() *environment { return cpy } +func copyLogs(logs []*types.Log) []*types.Log { + result := make([]*types.Log, len(logs)) + for _, l := range logs { + logCopy := types.Log{ + Address: l.Address, + BlockNumber: l.BlockNumber, + TxHash: l.TxHash, + TxIndex: l.TxIndex, + Index: l.Index, + Removed: l.Removed, + } + for _, t := range l.Topics { + logCopy.Topics = append(logCopy.Topics, t) + } + logCopy.Data = make([]byte, len(l.Data)) + copy(logCopy.Data[:], l.Data[:]) + + result = append(result, &logCopy) + } + + return result +} + // unclelist returns the contained uncles as the list format. func (env *environment) unclelist() []*types.Header { var uncles []*types.Header @@ -180,6 +206,7 @@ type worker struct { engine consensus.Engine eth Backend chain *core.BlockChain + collator Collator // Feeds pendingLogsFeed event.Feed @@ -241,7 +268,7 @@ type worker struct { resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval. } -func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool) *worker { +func newWorker(config *Config, chainConfig *params.ChainConfig, collator Collator, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool) *worker { worker := &worker{ config: config, chainConfig: chainConfig, @@ -265,12 +292,14 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus startCh: make(chan struct{}, 1), resubmitIntervalCh: make(chan time.Duration), resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize), + collator: collator, } // Subscribe NewTxsEvent for tx pool worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh) // Subscribe events for blockchain worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh) worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh) + worker.collator.Start(worker.eth.TxPool()) // Sanitize recommit interval if the user-specified one is too short. recommit := worker.config.Recommit @@ -378,6 +407,7 @@ func (w *worker) close() { if w.current != nil { w.current.discard() } + w.collator.Close() atomic.StoreInt32(&w.running, 0) close(w.exitCh) } @@ -747,6 +777,8 @@ func (w *worker) makeEnv(parent *types.Block, header *types.Header) (*environmen family: mapset.NewSet(), header: header, uncles: make(map[common.Hash]*types.Header), + etherbase: w.coinbase, + gasPool: new(core.GasPool).AddGas(header.GasLimit), } // when 08 is processed ancestors contain 07 (quick block) for _, ancestor := range w.chain.GetBlocksFromHash(parent.Hash(), 7) { @@ -828,10 +860,7 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone { // Notify resubmit loop to increase resubmitting interval due to too frequent commits. if atomic.LoadInt32(interrupt) == commitInterruptResubmit { - ratio := float64(gasLimit-env.gasPool.Gas()) / float64(gasLimit) - if ratio < 0.1 { - ratio = 0.1 - } + var ratio float64 = 0.1 w.resubmitAdjustCh <- &intervalAdjust{ ratio: ratio, inc: true, @@ -1018,51 +1047,44 @@ func (w *worker) prepareWork(genParams *generateParams) (*environment, error) { return env, nil } -// fillTransactions retrieves the pending transactions from the txpool and fills them -// into the given sealing block. The transaction selection and ordering strategy can -// be customized with the plugin in the future. -func (w *worker) fillTransactions(interrupt *int32, env *environment) error { - // Split the pending transactions into locals and remotes - // Fill the block with all available pending transactions. - pending, err := w.eth.TxPool().Pending(true) - if err != nil { - log.Error("Failed to fetch pending transactions", "err", err) - return err - } - localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending - for _, account := range w.eth.TxPool().Locals() { - if txs := remoteTxs[account]; len(txs) > 0 { - delete(remoteTxs, account) - localTxs[account] = txs - } - } - if len(localTxs) > 0 { - txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee) - if w.commitTransactions(env, txs, w.coinbase, interrupt) { - return nil - } - } - if len(remoteTxs) > 0 { - txs := types.NewTransactionsByPriceAndNonce(env.signer, remoteTxs, env.header.BaseFee) - if w.commitTransactions(env, txs, w.coinbase, interrupt) { - return nil - } - } - return nil -} - // generateWork generates a sealing block based on the given parameters. func (w *worker) generateWork(params *generateParams) (*types.Block, error) { - work, err := w.prepareWork(params) + start := time.Now() + emptyEnv, err := w.prepareWork(params) if err != nil { return nil, err } - defer work.discard() + defer emptyEnv.discard() + + bs := blockState{ + worker: w, + env: emptyEnv.copy(), + snapshots: []int{emptyEnv.state.Snapshot()}, + resultEnv: nil, + start: start, + commitMu: new(sync.Mutex), + committed: false, + interruptHandled: new(int32), + done: new(bool), + interrupt: nil, + shouldSeal: false, + } - if err := w.fillTransactions(nil, work); err != nil { - return nil, err + ctx := interruptContext{ + interrupt: nil, + } + + w.collator.CollateBlock(&bs, &ctx) + + bs.commitMu.Lock() + *bs.done = true + bs.commitMu.Unlock() + + if bs.resultEnv == nil { + bs.resultEnv = emptyEnv } - return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts) + + return w.engine.FinalizeAndAssemble(w.chain, bs.resultEnv.header, bs.resultEnv.state, bs.resultEnv.txs, bs.resultEnv.unclelist(), bs.resultEnv.receipts) } // commitWork generates several new sealing tasks based on the parent block @@ -1078,18 +1100,59 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) { if !noempty && atomic.LoadUint32(&w.noempty) == 0 { w.commit(work.copy(), nil, false, start) } - // Fill pending transactions from the txpool - if err := w.fillTransactions(interrupt, work); err != nil { + bs := blockState{ + worker: w, + snapshots: []int{work.state.Snapshot()}, + env: work.copy(), + start: start, + commitMu: new(sync.Mutex), + committed: false, + interruptHandled: new(int32), + done: new(bool), + interrupt: interrupt, + shouldSeal: true, + resultEnv: nil, + } + ctx := interruptContext{ + interrupt, + } + w.collator.CollateBlock(&bs, &ctx) + + bs.commitMu.Lock() + *bs.done = true + bs.commitMu.Unlock() + + if bs.interrupt != nil && atomic.CompareAndSwapInt32(bs.interruptHandled, interruptNotHandled, interruptIsHandled) { + if atomic.LoadInt32(bs.interrupt) != commitInterruptNewHead { + w.resubmitAdjustCh <- &intervalAdjust{inc: false} + } + } + + if bs.interrupt != nil && atomic.LoadInt32(bs.interrupt) == commitInterruptNewHead { return } - w.commit(work.copy(), w.fullTaskHook, true, start) - // Swap out the old work with the new one, terminating any leftover - // prefetcher processes in the mean time and starting a new one. - if w.current != nil { - w.current.discard() + if !w.isRunning() && len(bs.logs) > 0 { + // We don't push the pendingLogsEvent while we are mining. The reason is that + // when we are mining, the worker will regenerate a mining block every 3 seconds. + // In order to avoid pushing the repeated pendingLog, we disable the pending log pushing. + + // make a copy, the state caches the logs and these logs get "upgraded" from pending to mined + // logs by filling in the block hash when the block was mined by the local miner. This can + // cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed. + cpy := make([]*types.Log, len(bs.logs)) + for i, l := range bs.logs { + cpy[i] = new(types.Log) + *cpy[i] = *l + } + w.pendingLogsFeed.Send(cpy) + } + + if bs.resultEnv != nil { + w.current = bs.resultEnv + } else { + w.current = work } - w.current = work } // commit runs any post-transaction state modifications, assembles the final block diff --git a/miner/worker_test.go b/miner/worker_test.go index c9b2564d1c3d9..efc8bb7f4ca78 100644 --- a/miner/worker_test.go +++ b/miner/worker_test.go @@ -196,7 +196,7 @@ func (b *testWorkerBackend) newRandomTx(creation bool) *types.Transaction { func newTestWorker(t *testing.T, chainConfig *params.ChainConfig, engine consensus.Engine, db ethdb.Database, blocks int) (*worker, *testWorkerBackend) { backend := newTestWorkerBackend(t, chainConfig, engine, db, blocks) backend.txPool.AddLocals(pendingTxs) - w := newWorker(testConfig, chainConfig, engine, backend, new(event.TypeMux), nil, false) + w := newWorker(testConfig, chainConfig, &DefaultCollator{}, engine, backend, new(event.TypeMux), nil, false) w.setEtherbase(testBankAddress) return w, backend }