Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

miner: support multiple custom block collation strategies evaluated concurrently #23383

Closed
wants to merge 26 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
148 changes: 1 addition & 147 deletions eth/catalyst/api.go
Expand Up @@ -21,12 +21,9 @@ import (
"errors"
"fmt"
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/misc"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/log"
Expand Down Expand Up @@ -65,153 +62,11 @@ func newConsensusAPI(eth *eth.Ethereum) *consensusAPI {
return &consensusAPI{eth: eth}
}

// blockExecutionEnv gathers all the data required to execute
// a block, either when assembling it or when inserting it.
type blockExecutionEnv struct {
chain *core.BlockChain
state *state.StateDB
tcount int
gasPool *core.GasPool

header *types.Header
txs []*types.Transaction
receipts []*types.Receipt
}

func (env *blockExecutionEnv) commitTransaction(tx *types.Transaction, coinbase common.Address) error {
vmconfig := *env.chain.GetVMConfig()
snap := env.state.Snapshot()
receipt, err := core.ApplyTransaction(env.chain.Config(), env.chain, &coinbase, env.gasPool, env.state, env.header, tx, &env.header.GasUsed, vmconfig)
if err != nil {
env.state.RevertToSnapshot(snap)
return err
}
env.txs = append(env.txs, tx)
env.receipts = append(env.receipts, receipt)
return nil
}

func (api *consensusAPI) makeEnv(parent *types.Block, header *types.Header) (*blockExecutionEnv, error) {
state, err := api.eth.BlockChain().StateAt(parent.Root())
if err != nil {
return nil, err
}
env := &blockExecutionEnv{
chain: api.eth.BlockChain(),
state: state,
header: header,
gasPool: new(core.GasPool).AddGas(header.GasLimit),
}
return env, nil
}

// AssembleBlock creates a new block, inserts it into the chain, and returns the "execution
// data" required for eth2 clients to process the new block.
func (api *consensusAPI) AssembleBlock(params assembleBlockParams) (*executableData, error) {
log.Info("Producing block", "parentHash", params.ParentHash)

bc := api.eth.BlockChain()
parent := bc.GetBlockByHash(params.ParentHash)
if parent == nil {
log.Warn("Cannot assemble block with parent hash to unknown block", "parentHash", params.ParentHash)
return nil, fmt.Errorf("cannot assemble block with unknown parent %s", params.ParentHash)
}

pool := api.eth.TxPool()

if parent.Time() >= params.Timestamp {
return nil, fmt.Errorf("child timestamp lower than parent's: %d >= %d", parent.Time(), params.Timestamp)
}
if now := uint64(time.Now().Unix()); params.Timestamp > now+1 {
wait := time.Duration(params.Timestamp-now) * time.Second
log.Info("Producing block too far in the future", "wait", common.PrettyDuration(wait))
time.Sleep(wait)
}

pending, err := pool.Pending(true)
if err != nil {
return nil, err
}

coinbase, err := api.eth.Etherbase()
if err != nil {
return nil, err
}
num := parent.Number()
header := &types.Header{
ParentHash: parent.Hash(),
Number: num.Add(num, common.Big1),
Coinbase: coinbase,
GasLimit: parent.GasLimit(), // Keep the gas limit constant in this prototype
Extra: []byte{},
Time: params.Timestamp,
}
if config := api.eth.BlockChain().Config(); config.IsLondon(header.Number) {
header.BaseFee = misc.CalcBaseFee(config, parent.Header())
}
err = api.eth.Engine().Prepare(bc, header)
if err != nil {
return nil, err
}

env, err := api.makeEnv(parent, header)
if err != nil {
return nil, err
}

var (
signer = types.MakeSigner(bc.Config(), header.Number)
txHeap = types.NewTransactionsByPriceAndNonce(signer, pending, nil)
transactions []*types.Transaction
)
for {
if env.gasPool.Gas() < chainParams.TxGas {
log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", chainParams.TxGas)
break
}
tx := txHeap.Peek()
if tx == nil {
break
}

// The sender is only for logging purposes, and it doesn't really matter if it's correct.
from, _ := types.Sender(signer, tx)

// Execute the transaction
env.state.Prepare(tx.Hash(), env.tcount)
err = env.commitTransaction(tx, coinbase)
switch err {
case core.ErrGasLimitReached:
// Pop the current out-of-gas transaction without shifting in the next from the account
log.Trace("Gas limit exceeded for current block", "sender", from)
txHeap.Pop()

case core.ErrNonceTooLow:
// New head notification data race between the transaction pool and miner, shift
log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce())
txHeap.Shift()

case core.ErrNonceTooHigh:
// Reorg notification data race between the transaction pool and miner, skip account =
log.Trace("Skipping account with high nonce", "sender", from, "nonce", tx.Nonce())
txHeap.Pop()

case nil:
// Everything ok, collect the logs and shift in the next transaction from the same account
env.tcount++
txHeap.Shift()
transactions = append(transactions, tx)

default:
// Strange error, discard the transaction and get the next in line (note, the
// nonce-too-high clause will prevent us from executing in vain).
log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
txHeap.Shift()
}
}

// Create the block.
block, err := api.eth.Engine().FinalizeAndAssemble(bc, header, env.state, transactions, nil /* uncles */, env.receipts)
block, err := api.eth.Miner().GetSealingBlock(params.ParentHash, params.Timestamp)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -255,7 +110,6 @@ func insertBlockParamsToBlock(config *chainParams.ChainConfig, parent *types.Hea
if err != nil {
return nil, err
}

number := big.NewInt(0)
number.SetUint64(params.Number)
header := &types.Header{
Expand Down
110 changes: 110 additions & 0 deletions miner/default_collator.go
@@ -0,0 +1,110 @@
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package miner

import (
"errors"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
)

type DefaultCollator struct{}

func submitTransactions(bs BlockState, txs *types.TransactionsByPriceAndNonce) bool {
var shouldAbort bool
cb := func(err error, receipts *types.Receipt) bool {
switch {
case errors.Is(err, core.ErrGasLimitReached):
fallthrough
case errors.Is(err, core.ErrTxTypeNotSupported):
fallthrough
case errors.Is(err, core.ErrNonceTooHigh):
txs.Pop()
case errors.Is(err, core.ErrNonceTooLow):
fallthrough
case errors.Is(err, ErrAbort):
shouldAbort = true
return false // don't need to waste time rolling back these transactions when this work will be thrown away anyways.
case errors.Is(err, nil):
fallthrough
default:
txs.Shift()
}
return false
}

for {
// If we don't have enough gas for any further transactions then we're done
available := bs.Gas()
if available < params.TxGas {
break
}
// Retrieve the next transaction and abort if all done
tx := txs.Peek()
if tx == nil {
break
}
// Enough space for this tx?
if available < tx.Gas() {
txs.Pop()
continue
}
bs.AddTransactions(types.Transactions{tx}, cb)
if shouldAbort {
return true
}
}
return false
}

// CollateBlock fills a block based on the highest paying transactions from the
// transaction pool, giving precedence over local transactions.
func (w *DefaultCollator) CollateBlock(bs BlockState, pool Pool) {
txs, err := pool.Pending(true)
if err != nil {
log.Error("could not get pending transactions from the pool", "err", err)
return
}
if len(txs) == 0 {
return
}
// Split the pending transactions into locals and remotes
localTxs, remoteTxs := make(map[common.Address]types.Transactions), txs
for _, account := range pool.Locals() {
if accountTxs := remoteTxs[account]; len(accountTxs) > 0 {
delete(remoteTxs, account)
localTxs[account] = accountTxs
}
}
if len(localTxs) > 0 {
if submitTransactions(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), localTxs, bs.BaseFee())) {
return
}
}
if len(remoteTxs) > 0 {
if submitTransactions(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), remoteTxs, bs.BaseFee())) {
return
}
}

bs.Commit()

return
}
10 changes: 9 additions & 1 deletion miner/miner.go
Expand Up @@ -66,14 +66,16 @@ type Miner struct {
}

func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, isLocalBlock func(block *types.Block) bool) *Miner {
activeCollators := []BlockCollator{}
activeCollators = append(activeCollators, &DefaultCollator{})
miner := &Miner{
eth: eth,
mux: mux,
engine: engine,
exitCh: make(chan struct{}),
startCh: make(chan common.Address),
stopCh: make(chan struct{}),
worker: newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, true),
worker: newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, true, activeCollators),
}
go miner.update()

Expand Down Expand Up @@ -227,6 +229,12 @@ func (miner *Miner) DisablePreseal() {
miner.worker.disablePreseal()
}

// GetSealingBlock retrieves a sealing block based on the given parameters.
// The returned block is not sealed but all other fields should be filled.
func (miner *Miner) GetSealingBlock(parent common.Hash, timestamp uint64) (*types.Block, error) {
return miner.worker.getSealingBlock(parent, timestamp)
}

// SubscribePendingLogs starts delivering logs from pending transactions
// to the given channel.
func (miner *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscription {
Expand Down