Skip to content

Commit

Permalink
builds now
Browse files Browse the repository at this point in the history
  • Loading branch information
jwasinger committed Sep 22, 2021
1 parent f3309b3 commit 2afbb01
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 94 deletions.
10 changes: 6 additions & 4 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
if err != nil {
return nil, err
}
}
} else {
minerCollator = &miner.DefaultCollator{}
}

if config.NoPruning && config.TrieDirtyCache > 0 {
if config.SnapshotCache > 0 {
Expand Down Expand Up @@ -310,11 +312,11 @@ func (s *Ethereum) APIs() []rpc.API {
// Append any APIs exposed explicitly by the consensus engine
apis = append(apis, s.engine.APIs(s.BlockChain())...)

if s.config.Miner.UseCustomCollator && s.miner.API != nil {
if s.config.Miner.UseCustomCollator && s.miner.CollatorAPI != nil {
apis = append(apis, rpc.API{
Namespace: "minercollator",
Version: s.miner.API.Version(),
Service: s.miner.API.Service(),
Version: s.miner.CollatorAPI.Version(),
Service: s.miner.CollatorAPI.Service(),
Public: true,
})
}
Expand Down
61 changes: 43 additions & 18 deletions miner/collator.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type BlockState interface {
the account which will receive the block reward.
*/
Etherbase() common.Address
GasPool() core.GasPool
}

type collatorBlockState struct {
Expand All @@ -118,12 +119,12 @@ type collatorBlockState struct {
gasPool *core.GasPool // available gas used to pack transactions
logs []*types.Log
snapshots []int
header *types.Header
}

type MinerState interface {
IsRunning() bool
ChainConfig() *params.ChainConfig
GetNewScratchBlock() (context.Context, BlockState)
}

type minerState struct {
Expand All @@ -142,7 +143,7 @@ func (m *minerState) IsRunning() bool {

type BlockCollatorWork struct {
Ctx context.Context
Block *BlockState
Block BlockState
}

type Collator interface {
Expand All @@ -160,19 +161,25 @@ var (
ErrTxTypeNotSupported = errors.New("tx type not supported")
ErrGasFeeCapTooLow = errors.New("gas fee cap too low")
ErrZeroTxs = errors.New("zero transactions")
ErrTooManyTxs = errors.New("applying txs to block would go over the block gas limit")
// error which encompasses all other reasons a given transaction
// could not be added to the block.
ErrStrange = errors.New("strange error")
)

func (bs *collatorBlockState) Commit() {
func (bs *collatorBlockState) Commit() bool {
if bs.committed {
return
return false
}
bs.env.worker.curEnvMu.Lock()
defer bs.env.worker.curEnvMu.Unlock()
if bs.env.cycleCtx != nil && bs.env.cycleCtx.Done() {
return

if bs.env.cycleCtx != nil {
select {
case <-bs.env.cycleCtx.Done():
return false
default:
}
}

bs.env.current = bs
Expand All @@ -181,6 +188,7 @@ func (bs *collatorBlockState) Commit() {
}

bs.committed = true
return true
}

func copyLogs(logs []*types.Log) []*types.Log {
Expand Down Expand Up @@ -216,14 +224,19 @@ func copyReceipts(receipts []*types.Receipt) []*types.Receipt {
return result
}

func (bs *collatorBlockState) Copy() *collatorBlockState {
func (bs *collatorBlockState) Copy() BlockState {
return bs.copy()
}

func (bs *collatorBlockState) copy() *collatorBlockState {
cpy := collatorBlockState{
env: bs.env,
state: bs.state.Copy(),
tcount: bs.tcount,
committed: bs.committed,
logs: copyLogs(bs.logs),
receipts: copyReceipts(bs.receipts),
header: types.CopyHeader(bs.header),
}

if bs.gasPool != nil {
Expand All @@ -239,15 +252,15 @@ func (bs *collatorBlockState) Copy() *collatorBlockState {
}

func (bs *collatorBlockState) commitTransaction(tx *types.Transaction) ([]*types.Log, error) {
snap := env.state.Snapshot()
snap := bs.state.Snapshot()

receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &coinbase, env.gasPool, env.state, env.header, tx, &env.header.GasUsed, *w.chain.GetVMConfig())
receipt, err := core.ApplyTransaction(bs.env.worker.chainConfig, bs.env.worker.chain, &bs.env.coinbase, bs.gasPool, bs.state, bs.header, tx, &bs.header.GasUsed, *bs.env.worker.chain.GetVMConfig())
if err != nil {
env.state.RevertToSnapshot(snap)
bs.state.RevertToSnapshot(snap)
return nil, err
}
env.txs = append(env.txs, tx)
env.receipts = append(env.receipts, receipt)
bs.txs = append(bs.txs, tx)
bs.receipts = append(bs.receipts, receipt)

return receipt.Logs, nil
}
Expand Down Expand Up @@ -283,7 +296,7 @@ func (bs *collatorBlockState) AddTransactions(txs types.Transactions) (error, ty

snapshot := bs.state.Snapshot()
bs.state.Prepare(tx.Hash(), bs.tcount+tcount)
txLogs, err := bs.env.worker.commitTransaction(bs.env, tx, bs.etherbase)
txLogs, err := bs.commitTransaction(tx)
if err != nil {
switch {
case errors.Is(err, core.ErrGasLimitReached):
Expand All @@ -302,7 +315,7 @@ func (bs *collatorBlockState) AddTransactions(txs types.Transactions) (error, ty
}

bs.logs = bs.logs[:len(bs.logs)-tcount]
bs.env.state.RevertToSnapshot(bs.snapshots[len(bs.snapshots)-tcount])
bs.state.RevertToSnapshot(bs.snapshots[len(bs.snapshots)-tcount])
bs.snapshots = bs.snapshots[:len(bs.snapshots)-tcount]

return retErr, nil
Expand All @@ -313,15 +326,15 @@ func (bs *collatorBlockState) AddTransactions(txs types.Transactions) (error, ty
}
}

retReceipts := bs.env.receipts[bs.env.tcount:]
bs.env.tcount += tcount
retReceipts := bs.receipts[bs.tcount:]
bs.tcount += tcount

return nil, retReceipts
}

func (bs *collatorBlockState) RevertTransactions(count uint) error {
if bs.committed {
return ErrCommitted
return ErrAlreadyCommitted
} else if int(count) > len(bs.snapshots) {
return ErrTooManyTxs
} else if count == 0 {
Expand All @@ -341,5 +354,17 @@ func (bs *collatorBlockState) Signer() types.Signer {
}

func (bs *collatorBlockState) Etherbase() common.Address {
return bs.env.etherbase
return bs.env.coinbase
}

func (bs *collatorBlockState) GasPool() core.GasPool {
return *bs.gasPool
}

func (bs *collatorBlockState) discard() {
bs.state.StopPrefetcher()
}

func (bs *collatorBlockState) Header() *types.Header {
return types.CopyHeader(bs.header)
}
76 changes: 50 additions & 26 deletions miner/default_collator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,38 @@ import (
"errors"
"time"
"sync"
"context"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
)

const (
// minRecommitInterval is the minimal time interval to recreate the mining block with
// any newly arrived transactions.
minRecommitInterval = 1 * time.Second

// maxRecommitInterval is the maximum time interval to recreate the mining block with
// any newly arrived transactions.
maxRecommitInterval = 15 * time.Second

// intervalAdjustRatio is the impact a single interval adjustment has on sealing work
// resubmitting interval.
intervalAdjustRatio = 0.1

// intervalAdjustBias is applied during the new resubmit interval calculation in favor of
// increasing upper limit or decreasing lower limit so that the limit can be reachable.
intervalAdjustBias = 200 * 1000.0 * 1000.0
)

type DefaultCollator struct {
recommitMu sync.Mutex
recommit time.Duration
minRecommit time.Duration
miner MinerState
exitCh <-chan struct{}
pool Pool
}

// recalcRecommit recalculates the resubmitting interval upon feedback.
Expand Down Expand Up @@ -43,37 +63,40 @@ func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) t
func (c *DefaultCollator) adjustRecommit(bs BlockState, inc bool) {
c.recommitMu.Lock()
defer c.recommitMu.Unlock()
header := bs.Header()
gasLimit := header.GasLimit
gasPool := bs.GasPool()
if inc {
before := recommit
ratio := float64(gasLimit-w.current.gasPool.Gas()) / float64(gasLimit)
before := c.recommit
ratio := float64(gasLimit-gasPool.Gas()) / float64(gasLimit)
if ratio < 0.1 {
ratio = 0.1
}

target := float64(recommit.Nanoseconds()) / ratio
c.recommit = recalcRecommit(minRecommit, recommit, target, true)
log.Trace("Increase miner recommit interval", "from", before, "to", recommit)
target := float64(c.recommit.Nanoseconds()) / ratio
c.recommit = recalcRecommit(c.minRecommit, c.recommit, target, true)
log.Trace("Increase miner recommit interval", "from", before, "to", c.recommit)
} else {
before := recommit
c.recommit = recalcRecommit(minRecommit, recommit, float64(minRecommit.Nanoseconds()), false)
log.Trace("Decrease miner recommit interval", "from", before, "to", recommit)
before := c.recommit
c.recommit = recalcRecommit(c.minRecommit, c.recommit, float64(c.minRecommit.Nanoseconds()), false)
log.Trace("Decrease miner recommit interval", "from", before, "to", c.recommit)
}
}

func submitTransactions(ctx context.Context, bs BlockState, txs *types.TransactionsByPriceAndNonce) {
func submitTransactions(ctx context.Context, bs BlockState, txs *types.TransactionsByPriceAndNonce, timer *time.Timer) bool {
header := bs.Header()
availableGas := header.GasLimit
for {
select {
case <-ctx.Done():
return
return true
default:
}

if timer != nil {
select {
case <-timer.C:
return
return false
default:
}
}
Expand Down Expand Up @@ -124,10 +147,10 @@ func submitTransactions(ctx context.Context, bs BlockState, txs *types.Transacti
}
}

return
return false
}

func (c *DefaultCollator) fillTransactions(ctx context.Context, bs BlockState, timer time.Timer, exitch <-chan struct{}) {
func (c *DefaultCollator) fillTransactions(ctx context.Context, bs BlockState, timer *time.Timer) {
header := bs.Header()
txs, err := c.pool.Pending(true)
if err != nil {
Expand All @@ -146,30 +169,31 @@ func (c *DefaultCollator) fillTransactions(ctx context.Context, bs BlockState, t
}
}
if len(localTxs) > 0 {
if submitTransactions(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), localTxs, header.BaseFee)) {
return true
if submitTransactions(ctx, bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), localTxs, header.BaseFee), timer) {
return
}
}
if len(remoteTxs) > 0 {
if submitTransactions(bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), remoteTxs, header.BaseFee)) {
return true
if submitTransactions(ctx, bs, types.NewTransactionsByPriceAndNonce(bs.Signer(), remoteTxs, header.BaseFee), timer) {
return
}
}

bs.Commit()

return
}

func (c* DefaultCollator) workCycle() {
func (c* DefaultCollator) workCycle(work BlockCollatorWork) {
ctx := work.Ctx
emptyBs := work.Block

for {
c.recommitMu.Rlock()
c.recommitMu.Lock()
curRecommit := c.recommit
c.recommitMu.Unlock()
timer := time.NewTimer(curRecommit)

bs := emptyBs.Copy()
c.fillTransactions(bs, timer)
c.fillTransactions(ctx, bs, timer)
bs.Commit()
shouldContinue := false

Expand All @@ -193,7 +217,7 @@ func (c* DefaultCollator) workCycle() {
}

select {
case ctx.Done():
case <-ctx.Done():
return
case <-timer.C:
// If mining is running resubmit a new work cycle periodically to pull in
Expand All @@ -211,18 +235,18 @@ func (c* DefaultCollator) workCycle() {
}

func (c *DefaultCollator) SetRecommit(interval time.Duration) {
c.recommitMu.WLock()
c.recommitMu.Lock()
defer c.recommitMu.Unlock()

c.recommit, c.minRecommit = interval, interval
}

func (c *DefaultCollator) CollateBlocks(miner MinerState) {
func (c *DefaultCollator) CollateBlocks(miner MinerState, blockCh <-chan BlockCollatorWork, exitCh <-chan struct{}) {
c.miner = miner
c.exitCh = exitCh
for {
select {
case <-exitCh:
case <-c.exitCh:
return
case cycleWork := <-blockCh:
c.workCycle(cycleWork)
Expand Down
5 changes: 2 additions & 3 deletions miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type Miner struct {
exitCh chan struct{}
startCh chan common.Address
stopCh chan struct{}
API CollatorAPI
CollatorAPI CollatorAPI
}

func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *event.TypeMux, engine consensus.Engine, isLocalBlock func(block *types.Block) bool, collator Collator, collatorAPI CollatorAPI) *Miner {
Expand All @@ -77,7 +77,7 @@ func New(eth Backend, config *Config, chainConfig *params.ChainConfig, mux *even
exitCh: make(chan struct{}),
startCh: make(chan common.Address),
stopCh: make(chan struct{}),
API: collatorAPI,
CollatorAPI: collatorAPI,
worker: newWorker(config, chainConfig, collator, engine, eth, mux, isLocalBlock, true),
}
go miner.update()
Expand Down Expand Up @@ -154,7 +154,6 @@ func (miner *Miner) Start(coinbase common.Address) {
}

func (miner *Miner) Stop() {
miner.worker.collator.Close()
miner.stopCh <- struct{}{}
}

Expand Down

0 comments on commit 2afbb01

Please sign in to comment.