Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/catalyst: evict old payloads, type PayloadID #24236

Merged
merged 7 commits into from Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
50 changes: 27 additions & 23 deletions eth/catalyst/api.go
Expand Up @@ -25,8 +25,9 @@ import (
"math/big"
"time"

lru "github.com/hashicorp/golang-lru"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/beacon"
"github.com/ethereum/go-ethereum/consensus/misc"
Expand All @@ -50,9 +51,10 @@ var (
GenericServerError = rpc.CustomError{Code: -32000, ValidationError: "Server error"}
UnknownPayload = rpc.CustomError{Code: -32001, ValidationError: "Unknown payload"}
InvalidTB = rpc.CustomError{Code: -32002, ValidationError: "Invalid terminal block"}
InvalidPayloadID = rpc.CustomError{Code: 1, ValidationError: "invalid payload id"}
)

const preparedPayloadsCacheSize = 10
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pls delete this, it's dead code. Otherwise LGTM


// Register adds catalyst APIs to the full node.
func Register(stack *node.Node, backend *eth.Ethereum) error {
log.Warn("Catalyst mode enabled", "protocol", "eth")
Expand Down Expand Up @@ -86,7 +88,8 @@ type ConsensusAPI struct {
eth *eth.Ethereum
les *les.LightEthereum
engine consensus.Engine // engine is the post-merge consensus engine, only for block creation
preparedBlocks map[uint64]*ExecutableDataV1
preparedBlocks *lru.Cache // preparedBlocks caches payloads (*ExecutableDataV1) by payload ID (PayloadID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is no really a good one. The role of a "cache" is to keep some recent data available-ish, but it's at the sole discretion of the implementation as to how long and when to evict. Whilst it might seem like not a bad idea at first to use a cache to store recent payloads, losing control of storage determinism can make thing in the future a lot harder to debug.

E.g. Certain caches have limits on the maximum size an item can have and will simply refuse to store something larger. In this context it could happen that if the payload exceeds a certain size, the cache would not store it anymore, making it unmineable. If all nodes would create a similarly large block they can't store, we could end u[ with a stalled network. Now I don't know if this particular cache behaves like that or not - or what the limits are - but this is exactly why a cache should not be used, because it's a disk access "optimization", not a storage data structure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed about a cache not being the right fit here after a second look. But previously it would just keep growing indefinitely, and I discussed this change and eviction number with Marius before implementing it. Least-recently-used as eviction strategy + a fixed size limit seemed reasonable enough. Can you describe what you prefer instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Depends how much work you want to put in and what the size of the set will be.

If we take the current cap of 10, then you could use a simple slice and just shift the contents on every insert and do a linear search on every retrieval. We're expected do do these ops rarely when it's our turn to sign, and even then once to create the payload and once to retrieve it within a 14 second time fame. A 10 item iteration doesn't matter in that scope.

If we were to make the set larger, the best solution would probably be a map as it was originally and an eviction order queue. That could be a channel or a circular queue (we have one in the code). But I don't see a reason to complicate it like that.

My 2c, unless there's a reason no to, just go with slice shifting / iteration.


}

func NewConsensusAPI(eth *eth.Ethereum, les *les.LightEthereum) *ConsensusAPI {
Expand All @@ -110,12 +113,16 @@ func NewConsensusAPI(eth *eth.Ethereum, les *les.LightEthereum) *ConsensusAPI {
engine = beacon.New(eth.Engine())
}
}
preparedBlocks, _ := lru.NewWithEvict(preparedPayloadsCacheSize, func(key, value interface{}) {
log.Trace("evicted payload from preparedBlocks", "payloadID", key.(PayloadID),
"blockhash", value.(*ExecutableDataV1).BlockHash)
}) // positive cache size, no error
return &ConsensusAPI{
light: eth == nil,
eth: eth,
les: les,
engine: engine,
preparedBlocks: make(map[uint64]*ExecutableDataV1),
preparedBlocks: preparedBlocks,
}
}

Expand Down Expand Up @@ -175,20 +182,17 @@ func (api *ConsensusAPI) makeEnv(parent *types.Block, header *types.Header) (*bl
return env, nil
}

func (api *ConsensusAPI) GetPayloadV1(payloadID hexutil.Bytes) (*ExecutableDataV1, error) {
hash := []byte(payloadID)
if len(hash) < 8 {
return nil, &InvalidPayloadID
}
id := binary.BigEndian.Uint64(hash[:8])
data, ok := api.preparedBlocks[id]
func (api *ConsensusAPI) GetPayloadV1(payloadID PayloadID) (*ExecutableDataV1, error) {
log.Trace("GetPayload", "payloadID", payloadID)
protolambda marked this conversation as resolved.
Show resolved Hide resolved
MariusVanDerWijden marked this conversation as resolved.
Show resolved Hide resolved
data, ok := api.preparedBlocks.Get(payloadID)
if !ok {
return nil, &UnknownPayload
}
return data, nil
return data.(*ExecutableDataV1), nil
}

func (api *ConsensusAPI) ForkchoiceUpdatedV1(heads ForkchoiceStateV1, PayloadAttributes *PayloadAttributesV1) (ForkChoiceResponse, error) {
log.Trace("ForkChoiceUpdatedV1", "head", heads.HeadBlockHash, "finalBH", heads.FinalizedBlockHash, "safeBH", heads.SafeBlockHash)
MariusVanDerWijden marked this conversation as resolved.
Show resolved Hide resolved
if heads.HeadBlockHash == (common.Hash{}) {
return ForkChoiceResponse{Status: SUCCESS.Status, PayloadID: nil}, nil
}
Expand Down Expand Up @@ -216,25 +220,23 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(heads ForkchoiceStateV1, PayloadAtt
if err != nil {
return INVALID, err
}
hash := computePayloadId(heads.HeadBlockHash, PayloadAttributes)
id := binary.BigEndian.Uint64(hash)
api.preparedBlocks[id] = data
log.Info("Created payload", "payloadid", id)
// TODO (MariusVanDerWijden) do something with the payloadID?
hex := hexutil.Bytes(hash)
return ForkChoiceResponse{Status: SUCCESS.Status, PayloadID: &hex}, nil
id := computePayloadId(heads.HeadBlockHash, PayloadAttributes)
api.preparedBlocks.Add(id, data)
log.Info("Created payload", "payloadID", id)
return ForkChoiceResponse{Status: SUCCESS.Status, PayloadID: &id}, nil
}
return ForkChoiceResponse{Status: SUCCESS.Status, PayloadID: nil}, nil
}

func computePayloadId(headBlockHash common.Hash, params *PayloadAttributesV1) []byte {
func computePayloadId(headBlockHash common.Hash, params *PayloadAttributesV1) (out PayloadID) {
protolambda marked this conversation as resolved.
Show resolved Hide resolved
// Hash
hasher := sha256.New()
hasher.Write(headBlockHash[:])
binary.Write(hasher, binary.BigEndian, params.Timestamp)
hasher.Write(params.Random[:])
hasher.Write(params.SuggestedFeeRecipient[:])
return hasher.Sum([]byte{})[:8]
copy(out[:], hasher.Sum(nil)[:8])
return
}

func (api *ConsensusAPI) invalid() ExecutePayloadResponse {
Expand All @@ -246,6 +248,7 @@ func (api *ConsensusAPI) invalid() ExecutePayloadResponse {

// ExecutePayload creates an Eth1 block, inserts it in the chain, and returns the status of the chain.
func (api *ConsensusAPI) ExecutePayloadV1(params ExecutableDataV1) (ExecutePayloadResponse, error) {
log.Trace("ExecutePayloadV1", "hash", params.BlockHash, "number", params.Number)
block, err := ExecutableDataToBlock(params)
if err != nil {
return api.invalid(), err
Expand Down Expand Up @@ -276,6 +279,7 @@ func (api *ConsensusAPI) ExecutePayloadV1(params ExecutableDataV1) (ExecutePaylo
if td.Cmp(ttd) < 0 {
return api.invalid(), fmt.Errorf("can not execute payload on top of block with low td got: %v threshold %v", td, ttd)
}
log.Trace("InsertingBlockWithoutSetHead", "hash", block.Hash(), "number", block.Number)
if err := api.eth.BlockChain().InsertBlockWithoutSetHead(block); err != nil {
return api.invalid(), err
}
Expand All @@ -301,8 +305,8 @@ func (api *ConsensusAPI) assembleBlock(parentHash common.Hash, params *PayloadAt
return nil, fmt.Errorf("cannot assemble block with unknown parent %s", parentHash)
}

if params.Timestamp < parent.Time() {
return nil, fmt.Errorf("child timestamp lower than parent's: %d < %d", params.Timestamp, parent.Time())
if params.Timestamp <= parent.Time() {
return nil, fmt.Errorf("child timestamp lower or equal to parent's: %d < %d", params.Timestamp, parent.Time())
MariusVanDerWijden marked this conversation as resolved.
Show resolved Hide resolved
}
if now := uint64(time.Now().Unix()); params.Timestamp > now+1 {
diff := time.Duration(params.Timestamp-now) * time.Second
Expand Down
49 changes: 45 additions & 4 deletions eth/catalyst/api_test.go
Expand Up @@ -17,12 +17,12 @@
package catalyst

import (
"fmt"
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/consensus/ethash"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/rawdb"
Expand Down Expand Up @@ -158,7 +158,7 @@ func TestEth2PrepareAndGetPayload(t *testing.T) {
t.Fatalf("error preparing payload, err=%v", err)
}
payloadID := computePayloadId(fcState.HeadBlockHash, &blockParams)
execData, err := api.GetPayloadV1(hexutil.Bytes(payloadID))
execData, err := api.GetPayloadV1(payloadID)
if err != nil {
t.Fatalf("error getting payload, err=%v", err)
}
Expand All @@ -185,6 +185,48 @@ func checkLogEvents(t *testing.T, logsCh <-chan []*types.Log, rmLogsCh <-chan co
}
}

func TestInvalidPayloadTimestamp(t *testing.T) {
genesis, preMergeBlocks := generatePreMergeChain(10)
n, ethservice := startEthService(t, genesis, preMergeBlocks)
ethservice.Merger().ReachTTD()
defer n.Close()
var (
api = NewConsensusAPI(ethservice, nil)
parent = ethservice.BlockChain().CurrentBlock()
)
tests := []struct {
time uint64
shouldErr bool
}{
{0, true},
{parent.Time(), true},
{parent.Time() - 1, true},
{parent.Time() + 1, false},
{uint64(time.Now().Unix()) + uint64(time.Minute), false},
}

for i, test := range tests {
t.Run(fmt.Sprintf("Timestamp test: %v", i), func(t *testing.T) {
params := PayloadAttributesV1{
Timestamp: test.time,
Random: crypto.Keccak256Hash([]byte{byte(123)}),
SuggestedFeeRecipient: parent.Coinbase(),
}
fcState := ForkchoiceStateV1{
HeadBlockHash: parent.Hash(),
SafeBlockHash: common.Hash{},
FinalizedBlockHash: common.Hash{},
}
_, err := api.ForkchoiceUpdatedV1(fcState, &params)
if test.shouldErr && err == nil {
t.Fatalf("expected error preparing payload with invalid timestamp, err=%v", err)
} else if !test.shouldErr && err != nil {
t.Fatalf("error preparing payload with valid timestamp, err=%v", err)
}
})
}
}

func TestEth2NewBlock(t *testing.T) {
genesis, preMergeBlocks := generatePreMergeChain(10)
n, ethservice := startEthService(t, genesis, preMergeBlocks)
Expand Down Expand Up @@ -391,7 +433,7 @@ func TestFullAPI(t *testing.T) {
t.Fatalf("error preparing payload, invalid status: %v", resp.Status)
}
payloadID := computePayloadId(parent.Hash(), &params)
payload, err := api.GetPayloadV1(hexutil.Bytes(payloadID))
payload, err := api.GetPayloadV1(payloadID)
if err != nil {
t.Fatalf("can't get payload: %v", err)
}
Expand All @@ -414,6 +456,5 @@ func TestFullAPI(t *testing.T) {
t.Fatalf("Chain head should be updated")
}
parent = ethservice.BlockChain().CurrentBlock()

}
}
35 changes: 22 additions & 13 deletions eth/catalyst/api_types.go
Expand Up @@ -17,6 +17,7 @@
package catalyst

import (
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -69,17 +70,6 @@ type executableDataMarshaling struct {
Transactions []hexutil.Bytes
}

//go:generate go run github.com/fjl/gencodec -type PayloadResponse -field-override payloadResponseMarshaling -out gen_payload.go

type PayloadResponse struct {
PayloadID uint64 `json:"payloadId"`
}

// JSON type overrides for payloadResponse.
type payloadResponseMarshaling struct {
PayloadID hexutil.Uint64
}

type NewBlockResponse struct {
Valid bool `json:"valid"`
}
Expand All @@ -102,9 +92,28 @@ type ConsensusValidatedParams struct {
Status string `json:"status"`
}

// PayloadID is an identifier of the payload build process
type PayloadID [8]byte

func (b PayloadID) String() string {
return hexutil.Encode(b[:])
}

func (b PayloadID) MarshalText() ([]byte, error) {
return hexutil.Bytes(b[:]).MarshalText()
}

func (b *PayloadID) UnmarshalText(input []byte) error {
err := hexutil.UnmarshalFixedText("PayloadID", input, b[:])
if err != nil {
return fmt.Errorf("invalid payload id %q: %w", input, err)
}
return nil
}
karalabe marked this conversation as resolved.
Show resolved Hide resolved

type ForkChoiceResponse struct {
Status string `json:"status"`
PayloadID *hexutil.Bytes `json:"payloadId"`
Status string `json:"status"`
PayloadID *PayloadID `json:"payloadId"`
}

type ForkchoiceStateV1 struct {
Expand Down
36 changes: 0 additions & 36 deletions eth/catalyst/gen_payload.go

This file was deleted.