Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/catalyst: evict old payloads, type PayloadID #24236

Merged
merged 7 commits into from Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
45 changes: 22 additions & 23 deletions eth/catalyst/api.go
Expand Up @@ -26,7 +26,6 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/beacon"
"github.com/ethereum/go-ethereum/consensus/misc"
Expand All @@ -50,9 +49,10 @@ var (
GenericServerError = rpc.CustomError{Code: -32000, ValidationError: "Server error"}
UnknownPayload = rpc.CustomError{Code: -32001, ValidationError: "Unknown payload"}
InvalidTB = rpc.CustomError{Code: -32002, ValidationError: "Invalid terminal block"}
InvalidPayloadID = rpc.CustomError{Code: 1, ValidationError: "invalid payload id"}
)

const preparedPayloadsCacheSize = 10
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pls delete this, it's dead code. Otherwise LGTM


// Register adds catalyst APIs to the full node.
func Register(stack *node.Node, backend *eth.Ethereum) error {
log.Warn("Catalyst mode enabled", "protocol", "eth")
Expand Down Expand Up @@ -86,7 +86,7 @@ type ConsensusAPI struct {
eth *eth.Ethereum
les *les.LightEthereum
engine consensus.Engine // engine is the post-merge consensus engine, only for block creation
preparedBlocks map[uint64]*ExecutableDataV1
preparedBlocks *payloadQueue // preparedBlocks caches payloads (*ExecutableDataV1) by payload ID (PayloadID)
}

func NewConsensusAPI(eth *eth.Ethereum, les *les.LightEthereum) *ConsensusAPI {
Expand All @@ -110,12 +110,13 @@ func NewConsensusAPI(eth *eth.Ethereum, les *les.LightEthereum) *ConsensusAPI {
engine = beacon.New(eth.Engine())
}
}

return &ConsensusAPI{
light: eth == nil,
eth: eth,
les: les,
engine: engine,
preparedBlocks: make(map[uint64]*ExecutableDataV1),
preparedBlocks: newPayloadQueue(),
}
}

Expand Down Expand Up @@ -175,20 +176,17 @@ func (api *ConsensusAPI) makeEnv(parent *types.Block, header *types.Header) (*bl
return env, nil
}

func (api *ConsensusAPI) GetPayloadV1(payloadID hexutil.Bytes) (*ExecutableDataV1, error) {
hash := []byte(payloadID)
if len(hash) < 8 {
return nil, &InvalidPayloadID
}
id := binary.BigEndian.Uint64(hash[:8])
data, ok := api.preparedBlocks[id]
if !ok {
func (api *ConsensusAPI) GetPayloadV1(payloadID PayloadID) (*ExecutableDataV1, error) {
log.Trace("Engine API request received", "method", "GetPayload", "id", payloadID)
data := api.preparedBlocks.get(payloadID)
if data == nil {
return nil, &UnknownPayload
}
return data, nil
}

func (api *ConsensusAPI) ForkchoiceUpdatedV1(heads ForkchoiceStateV1, PayloadAttributes *PayloadAttributesV1) (ForkChoiceResponse, error) {
log.Trace("Engine API request received", "method", "ForkChoiceUpdated", "head", heads.HeadBlockHash, "finalized", heads.FinalizedBlockHash, "safe", heads.SafeBlockHash)
if heads.HeadBlockHash == (common.Hash{}) {
return ForkChoiceResponse{Status: SUCCESS.Status, PayloadID: nil}, nil
}
Expand Down Expand Up @@ -216,25 +214,24 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(heads ForkchoiceStateV1, PayloadAtt
if err != nil {
return INVALID, err
}
hash := computePayloadId(heads.HeadBlockHash, PayloadAttributes)
id := binary.BigEndian.Uint64(hash)
api.preparedBlocks[id] = data
log.Info("Created payload", "payloadid", id)
// TODO (MariusVanDerWijden) do something with the payloadID?
hex := hexutil.Bytes(hash)
return ForkChoiceResponse{Status: SUCCESS.Status, PayloadID: &hex}, nil
id := computePayloadId(heads.HeadBlockHash, PayloadAttributes)
api.preparedBlocks.put(id, data)
log.Info("Created payload", "payloadID", id)
return ForkChoiceResponse{Status: SUCCESS.Status, PayloadID: &id}, nil
}
return ForkChoiceResponse{Status: SUCCESS.Status, PayloadID: nil}, nil
}

func computePayloadId(headBlockHash common.Hash, params *PayloadAttributesV1) []byte {
func computePayloadId(headBlockHash common.Hash, params *PayloadAttributesV1) PayloadID {
// Hash
hasher := sha256.New()
hasher.Write(headBlockHash[:])
binary.Write(hasher, binary.BigEndian, params.Timestamp)
hasher.Write(params.Random[:])
hasher.Write(params.SuggestedFeeRecipient[:])
return hasher.Sum([]byte{})[:8]
var out PayloadID
copy(out[:], hasher.Sum(nil)[:8])
return out
}

func (api *ConsensusAPI) invalid() ExecutePayloadResponse {
Expand All @@ -246,6 +243,7 @@ func (api *ConsensusAPI) invalid() ExecutePayloadResponse {

// ExecutePayload creates an Eth1 block, inserts it in the chain, and returns the status of the chain.
func (api *ConsensusAPI) ExecutePayloadV1(params ExecutableDataV1) (ExecutePayloadResponse, error) {
log.Trace("Engine API request received", "method", "ExecutePayload", params.BlockHash, "number", params.Number)
block, err := ExecutableDataToBlock(params)
if err != nil {
return api.invalid(), err
Expand Down Expand Up @@ -276,6 +274,7 @@ func (api *ConsensusAPI) ExecutePayloadV1(params ExecutableDataV1) (ExecutePaylo
if td.Cmp(ttd) < 0 {
return api.invalid(), fmt.Errorf("can not execute payload on top of block with low td got: %v threshold %v", td, ttd)
}
log.Trace("InsertingBlockWithoutSetHead", "hash", block.Hash(), "number", block.Number)
if err := api.eth.BlockChain().InsertBlockWithoutSetHead(block); err != nil {
return api.invalid(), err
}
Expand All @@ -301,8 +300,8 @@ func (api *ConsensusAPI) assembleBlock(parentHash common.Hash, params *PayloadAt
return nil, fmt.Errorf("cannot assemble block with unknown parent %s", parentHash)
}

if params.Timestamp < parent.Time() {
return nil, fmt.Errorf("child timestamp lower than parent's: %d < %d", params.Timestamp, parent.Time())
if params.Timestamp <= parent.Time() {
return nil, fmt.Errorf("invalid timestamp: child's %d <= parent's %d", params.Timestamp, parent.Time())
}
if now := uint64(time.Now().Unix()); params.Timestamp > now+1 {
diff := time.Duration(params.Timestamp-now) * time.Second
Expand Down
57 changes: 53 additions & 4 deletions eth/catalyst/api_test.go
Expand Up @@ -17,12 +17,12 @@
package catalyst

import (
"fmt"
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
Expand Down Expand Up @@ -158,13 +158,21 @@ func TestEth2PrepareAndGetPayload(t *testing.T) {
t.Fatalf("error preparing payload, err=%v", err)
}
payloadID := computePayloadId(fcState.HeadBlockHash, &blockParams)
execData, err := api.GetPayloadV1(hexutil.Bytes(payloadID))
execData, err := api.GetPayloadV1(payloadID)
if err != nil {
t.Fatalf("error getting payload, err=%v", err)
}
if len(execData.Transactions) != blocks[9].Transactions().Len() {
t.Fatalf("invalid number of transactions %d != 1", len(execData.Transactions))
}
// Test invalid payloadID
var invPayload PayloadID
copy(invPayload[:], payloadID[:])
invPayload[0] = ^invPayload[0]
_, err = api.GetPayloadV1(invPayload)
if err == nil {
t.Fatal("expected error retrieving invalid payload")
}
}

func checkLogEvents(t *testing.T, logsCh <-chan []*types.Log, rmLogsCh <-chan core.RemovedLogsEvent, wantNew, wantRemoved int) {
Expand All @@ -185,6 +193,48 @@ func checkLogEvents(t *testing.T, logsCh <-chan []*types.Log, rmLogsCh <-chan co
}
}

func TestInvalidPayloadTimestamp(t *testing.T) {
genesis, preMergeBlocks := generatePreMergeChain(10)
n, ethservice := startEthService(t, genesis, preMergeBlocks)
ethservice.Merger().ReachTTD()
defer n.Close()
var (
api = NewConsensusAPI(ethservice, nil)
parent = ethservice.BlockChain().CurrentBlock()
)
tests := []struct {
time uint64
shouldErr bool
}{
{0, true},
{parent.Time(), true},
{parent.Time() - 1, true},
{parent.Time() + 1, false},
{uint64(time.Now().Unix()) + uint64(time.Minute), false},
}

for i, test := range tests {
t.Run(fmt.Sprintf("Timestamp test: %v", i), func(t *testing.T) {
params := PayloadAttributesV1{
Timestamp: test.time,
Random: crypto.Keccak256Hash([]byte{byte(123)}),
SuggestedFeeRecipient: parent.Coinbase(),
}
fcState := ForkchoiceStateV1{
HeadBlockHash: parent.Hash(),
SafeBlockHash: common.Hash{},
FinalizedBlockHash: common.Hash{},
}
_, err := api.ForkchoiceUpdatedV1(fcState, &params)
if test.shouldErr && err == nil {
t.Fatalf("expected error preparing payload with invalid timestamp, err=%v", err)
} else if !test.shouldErr && err != nil {
t.Fatalf("error preparing payload with valid timestamp, err=%v", err)
}
})
}
}

func TestEth2NewBlock(t *testing.T) {
genesis, preMergeBlocks := generatePreMergeChain(10)
n, ethservice := startEthService(t, genesis, preMergeBlocks)
Expand Down Expand Up @@ -391,7 +441,7 @@ func TestFullAPI(t *testing.T) {
t.Fatalf("error preparing payload, invalid status: %v", resp.Status)
}
payloadID := computePayloadId(parent.Hash(), &params)
payload, err := api.GetPayloadV1(hexutil.Bytes(payloadID))
payload, err := api.GetPayloadV1(payloadID)
if err != nil {
t.Fatalf("can't get payload: %v", err)
}
Expand All @@ -414,6 +464,5 @@ func TestFullAPI(t *testing.T) {
t.Fatalf("Chain head should be updated")
}
parent = ethservice.BlockChain().CurrentBlock()

}
}
35 changes: 22 additions & 13 deletions eth/catalyst/api_types.go
Expand Up @@ -17,6 +17,7 @@
package catalyst

import (
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -69,17 +70,6 @@ type executableDataMarshaling struct {
Transactions []hexutil.Bytes
}

//go:generate go run github.com/fjl/gencodec -type PayloadResponse -field-override payloadResponseMarshaling -out gen_payload.go

type PayloadResponse struct {
PayloadID uint64 `json:"payloadId"`
}

// JSON type overrides for payloadResponse.
type payloadResponseMarshaling struct {
PayloadID hexutil.Uint64
}

type NewBlockResponse struct {
Valid bool `json:"valid"`
}
Expand All @@ -102,9 +92,28 @@ type ConsensusValidatedParams struct {
Status string `json:"status"`
}

// PayloadID is an identifier of the payload build process
type PayloadID [8]byte

func (b PayloadID) String() string {
return hexutil.Encode(b[:])
}

func (b PayloadID) MarshalText() ([]byte, error) {
return hexutil.Bytes(b[:]).MarshalText()
}

func (b *PayloadID) UnmarshalText(input []byte) error {
err := hexutil.UnmarshalFixedText("PayloadID", input, b[:])
if err != nil {
return fmt.Errorf("invalid payload id %q: %w", input, err)
}
return nil
}
karalabe marked this conversation as resolved.
Show resolved Hide resolved

type ForkChoiceResponse struct {
Status string `json:"status"`
PayloadID *hexutil.Bytes `json:"payloadId"`
Status string `json:"status"`
PayloadID *PayloadID `json:"payloadId"`
}

type ForkchoiceStateV1 struct {
Expand Down
74 changes: 74 additions & 0 deletions eth/catalyst/cache.go
@@ -0,0 +1,74 @@
// Copyright 2022 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package catalyst

import "sync"

// maxTrackedPayloads is the maximum number of prepared payloads the execution
// engine tracks before evicting old ones. Ideally we should only ever track the
// latest one; but have a slight wiggle room for non-ideal conditions.
const maxTrackedPayloads = 10

// payloadQueueItem represents an id->payload tuple to store until it's retrieved
// or evicted.
type payloadQueueItem struct {
id PayloadID
payload *ExecutableDataV1
}

// payloadQueue tracks the latest handful of constructed payloads to be retrieved
// by the beacon chain if block production is requested.
type payloadQueue struct {
payloads []*payloadQueueItem
lock sync.RWMutex
}

// newPayloadQueue creates a pre-initialized queue with a fixed number of slots
// all containing empty items.
func newPayloadQueue() *payloadQueue {
return &payloadQueue{
payloads: make([]*payloadQueueItem, maxTrackedPayloads),
}
}

// put inserts a new payload into the queue at the given id.
func (q *payloadQueue) put(id PayloadID, data *ExecutableDataV1) {
q.lock.Lock()
defer q.lock.Unlock()

copy(q.payloads[1:], q.payloads)
q.payloads[0] = &payloadQueueItem{
id: id,
payload: data,
}
}

// get retrieves a previously stored payload item or nil if it does not exist.
func (q *payloadQueue) get(id PayloadID) *ExecutableDataV1 {
q.lock.RLock()
defer q.lock.RUnlock()

for _, item := range q.payloads {
if item == nil {
return nil // no more items
}
if item.id == id {
return item.payload
}
}
return nil
}