Skip to content

Commit

Permalink
Merge master to v0.26 (#2536)
Browse files Browse the repository at this point in the history
* Remove payload key comparison in Payload.Equals()

Payload key is auxiliary data and shouldn't be included in payload
equality test.

Although payload key was used in this comparison for about a year,
this is an edge case that isn't triggered by our current use of MTrie.
So this change isn't expected to cause compatibility issues.

* Add comments to say payload key shouldn't change

In Mtrie, it is expected that for a specific path,
the payload's key never changes.

* Add Payload.ValueEquals() to compare payload values

Restore Payload.Equals() to compare payload keys and values.

Add Payload.ValueEquals() to compare payload values.

Add and update tests.

* fix race condition

* skip long running test

* outdated blocks are logged as errors, despite being expected during normal operations

* add jobqueue README

* addressed reviewer feedback

* Add module that requests execution data for newly sealed blocks

* Add timeout to blob fetch

* Add edr to staked access node, and improve backfilling

* Updates to access node config

* Refactor requester to use queues and notifications

* removed old requester files

* fix imports in builders

* fixes from testing

* refactor from review

* always cache new execution datas

* throw irrecoverable on invalid blobs

* cleanup comments and logging

* merge notifications and cache and refactor into heap

* add config, add restart handling, code cleanup

* improve error handling, cleanup refactoring

* cleanup error handling, refactor status

* fix unittest import cycle, and some cleanup

* make more options configurable

* cleanup comments and naming

* add metrics

* remove metrics comment

* add tests and fix bugs for status module

* fix bug in status duplicate height processing

* fix scoping in edr setup

* use a single implementation of blob_service

* add testing for RestartableComponent

* add more comments and unittests

* update check function prototype

* convert to use jobqueue

* fix rebase conflict

* Update module/state_synchronization/requester/execution_data_requester.go

Co-authored-by: Leo Zhang <zhangchiqing@gmail.com>

* add unit tests for notification_heap

* Move ExecutionDataRequester interface to state_synchronization package

* improve requester comments

* cleanup requester bootstrapping

* add tests for ExecutionDataService

* remove unused consumer member, rename wrapped structs to readydoneaware

* add unittest for starting from halted state

* Update module/state_synchronization/requester/execution_data_requester.go

Co-authored-by: Leo Zhang <zhangchiqing@gmail.com>

* Update module/state_synchronization/requester/execution_data_requester.go

Co-authored-by: Leo Zhang <zhangchiqing@gmail.com>

* switch to storage.headers

* save halted reason, improve logging

* move pause/resume to jobqueue

* Remove min heap from status

* add unittest for ReadyDoneAwareConsumer

* remove halts, switch to stateless notification status

* refactor datastore check into separate struct, add unit tests

* Apply suggestions from code review

Co-authored-by: Leo Zhang <zhangchiqing@gmail.com>

* remove halted metric

* refactor start block to be more explicit

* move finalized block reader and sealed header reader to jobqueue namespace

* reorder consumer args, add pre/post notifiers

* remove execution data cache, move ed get into reader

* fix sealed reader test, improve comments

* fix datastore check tests and components lifecycle

* rename ReadyDoneAwareConsumer to ComponentConsumer

* fix linting issues

* increase timeouts in check datastore tests

* comment improvements from review

* remove datastore checker

* remove now used execution data service check method

* add more details to execution data service get comments

* fix tests after rebase

* fix lint errors

* cleanup unused arguments

* apply comment update from review

Co-authored-by: Leo Zhang <zhangchiqing@gmail.com>

* add requester integration test

* add missing lock in module/jobqueue/consumer.go

Co-authored-by: Leo Zhang <zhangchiqing@gmail.com>

* add lock to jobqueue consumer size()

* remove changes to blob service, and create new test blobservice for local db only

* fix potential hang in consumer startup, update cli arg description

* update storage namespaces in access_node_builder

* Apply suggestions from code review

Co-authored-by: Simon Zhu <simon.zsiyan@gmail.com>

* Apply suggestions from code review

Co-authored-by: Peter Argue <89119817+peterargue@users.noreply.github.com>

* Apply suggestions from code review

Co-authored-by: Peter Argue <89119817+peterargue@users.noreply.github.com>

* Apply suggestions from code review

Co-authored-by: Peter Argue <89119817+peterargue@users.noreply.github.com>

* update pull vs push

* Update module/component/component.go

Co-authored-by: Jordan Schalm <jordan@dapperlabs.com>

* Remove unused codeowners

* Add atree reporter to execution state extraction

Report:
- number of FVM/storage/slab payloads
- number of slab array meta/data payloads
- number of slab map meta/data payloads
- number of first level hash collision groups in slab maps
- number of first level hash collisions in each collision group
- etc.

* Fix lint error

* validate guarantors

* handle error from FindGuarantors

* add comment

* add go workspace files to .dockerignore and .gitignore

* Add Ledger.GetSingleValue for speedups and memory

- Added Ledger.GetSingleValue() to improve speed, alloc/op, and
allocs/op
- Modified delta.View.readFunc to use Ledger.GetSingleValue()
- Optimized existing mtrie batch read for single path

Bench comparision between current Ledger.Get() vs
Ledger.GetSingleValue()

Benchstat results:
name                           old time/op    new time/op    delta
LedgerGetOneValue/batch_get-4    6.54µs ± 0%    5.24µs ± 0%  -19.92%
(p=0.000 n=9+10)

name                           old alloc/op   new alloc/op   delta
LedgerGetOneValue/batch_get-4    1.74kB ± 0%    1.62kB ± 0%   -6.85%
(p=0.000 n=10+10)

name                           old allocs/op  new allocs/op  delta
LedgerGetOneValue/batch_get-4      21.0 ± 0%      15.0 ± 0%  -28.57%
(p=0.000 n=10+10)

Bench comparision between optimized new Ledger.Get() vs
Ledger.GetSingleValue()

Benchstat results:
name                           old time/op    new time/op    delta
LedgerGetOneValue/batch_get-4    5.70µs ± 0%    5.23µs ± 0%   -8.24%
(p=0.000 n=9+10)

name                           old alloc/op   new alloc/op   delta
LedgerGetOneValue/batch_get-4    1.69kB ± 0%    1.62kB ± 0%   -3.79%
(p=0.000 n=10+10)

name                           old allocs/op  new allocs/op  delta
LedgerGetOneValue/batch_get-4      18.0 ± 0%      15.0 ± 0%  -16.67%
(p=0.000 n=10+10)

* Update ledger mock

* Check Mtrie root integrity after read in tests

* add comment

* Apply suggestions from code review

Co-authored-by: Jordan Schalm <jordan@dapperlabs.com>

* Reduce allocs/op by 79% in ledger read

Change Forest.Read to return []ledger.Value without deep copying
payload keys.  This avoids 4 heap allocation per key.

This change doesn't affect the caller (Ledger.Get) because it
discards the payload keys.

name        old time/op    new time/op    delta
TrieRead-4     524µs ± 1%     420µs ± 1%  -19.77%  (p=0.000 n=10+10)

name        old alloc/op   new alloc/op   delta
TrieRead-4     190kB ± 0%      95kB ± 0%  -50.04%  (p=0.000 n=10+10)

name        old allocs/op  new allocs/op  delta
TrieRead-4     1.52k ± 0%     0.32k ± 0%  -79.17%  (p=0.000 n=10+10)

* Fix tests

* Update Forest.Read() callers to use new API

* Speedup and reduce allocs/op in ledger single read

Changed Forest.ReadSingleValue to return ledger.Value without deep
copying payload keys.  This avoid 4 heap allocation per key.

This change doesn't affect the caller (Ledger.GetSingleValue) because
it discards the payload key.

* Move a line outside for-loop in readSinglePayload

This optimization improves speed by 0.49% on Haswell CPU.

Thanks for suggestion @tarakby.

Co-authored-by: Tarak Ben Youssef <tarak.benyoussef@dapperlabs.com>

* Read memory limit from state

* Fixed memory metering crasher and tests

lint

* fixed tests

* add tests for script mutations

* update cadence dep

* mod tidy

* go mod tidy integration

* do not split always fatal error

* add tests to manager for discarding computations in scripts

* fix test

* assume service account exists

* update test for discarding storage mutations

* test fail on error

* if memory limit not set in service acct then default to max

* lint

* better fix: return was missing

* accidentally added files

* removed finished TODO

* handling unexpected error

* cleanup unused fields from loader

* add test cases

* update tests

* fix lint

* fix error wrapping

* upload log level for memory weights

* add test

* [Fix] Adding sync request message as a cluster type message (#2495)

* adds sync request as an authorized cluster channel type

* use isClusterChannel and add test

* fix test vars and comments

* Update topic_validator_test.go

* Update topic_validator_test.go

* explicitly check cluster msg codes, handle cluster sync request

Co-authored-by: Khalil Claybon <khalil.claybon@dapperlabs.com>

* upgrade cadence to secure-cadence-m8

* update error handling

* fix test case

* flaky tests fix test async uploader - Test_AsyncUploader/stopping_component_stops_retrying (#2453)

* fixed flaky test - WIP - Test_AsyncUploader/stopping_component_stops_retrying

* fix flaky test - Test_AsyncUploader/stopping_component_stops_retrying - added wait group to ensure upload started before shutting down component

* updated comments, renamed wait group to be more clear

* minor refactoring - var renaming

* added more explicit retry mechanism to better test that retry is not working when component is shutting down

* uses unit.RequireCloseBefore() to check closed channel

* added clarification comments on why test is trying to increment callCount more than once

* Review updates

* Test fix

* change test fix

* chore(tests): fix init-light

* chore(tests): mark localnet and benchmarks owned by the performance team

* fix test case

* add blockheight to transaction result response

* fix test case

* add test case

* handle guarantee.ReferenceBlockID not found

* fix assigner tests

* fix linter

* fix reader tests

* upgrade cadence to secure-cadence-m9

* tidy go mods

* fix naming in limit script

* Upgrades libp2p pubsub version (#2515)

* generate mocks, make tidy

* error fix

* go mod update

* update mocks

* Suggestion for PR #2462 (#2517)

* consolidated errors from signer indices decoding into a single error type

* fix error type and tests

* fix typo

Co-authored-by: Leo Zhang (zhangchiqing) <zhangchiqing@gmail.com>

* Apply suggestions from code review

Co-authored-by: Alexander Hentschel <alex.hentschel@axiomzen.co>

* fix linter

* exit validation early for empty collections

* lint

* Update state/cluster/badger/mutator.go

* fix tests

* update test

* extract decodeSignerIndices

* small refactor

* update to Cadence 8b113c539a2c

* block nil check

* workaround for handling hotstuff error

* check that reference block is known

otherwise the cluster committee may return an error as it is unable to
determine the identities for the block (determined by the reference
block)

* height is already set here

* add blockheight

* blockheight for historical transaction request

* test blockheight equivalence

* upgrade flow to v0.3.1

* fix linter

Co-authored-by: Faye Amacker <33205765+fxamacker@users.noreply.github.com>
Co-authored-by: Simon Zhu <simon.zsiyan@gmail.com>
Co-authored-by: Alexander Hentschel <alex.hentschel@axiomzen.co>
Co-authored-by: Peter Argue <89119817+peterargue@users.noreply.github.com>
Co-authored-by: Jordan Schalm <jordan@dapperlabs.com>
Co-authored-by: Gregor G <75445744+sideninja@users.noreply.github.com>
Co-authored-by: Tarak Ben Youssef <50252200+tarakby@users.noreply.github.com>
Co-authored-by: Alexey Ivanov <SaveTheRbtz@GMail.com>
Co-authored-by: Tarak Ben Youssef <tarak.benyoussef@dapperlabs.com>
Co-authored-by: Janez Podhostnik <janez.podhostnik@gmail.com>
Co-authored-by: robert <robert.davidson@forecastfoundation.org>
Co-authored-by: Daniel Sainati <sainatidaniel@gmail.com>
Co-authored-by: Robert E. Davidson III <45945043+robert-e-davidson3@users.noreply.github.com>
Co-authored-by: bors[bot] <26634292+bors[bot]@users.noreply.github.com>
Co-authored-by: Yahya Hassanzadeh <yahya@dapperlabs.com>
Co-authored-by: Khalil Claybon <khalil.claybon@dapperlabs.com>
Co-authored-by: Misha <misha.rybalov@dapperlabs.com>
Co-authored-by: lolpuddle <oocean.cheung@gmail.com>
Co-authored-by: Jan Bernatik <jan.bernatik@dapperlabs.com>
Co-authored-by: Bastian Müller <bastian@axiomzen.co>
  • Loading branch information
21 people committed Jun 1, 2022
1 parent 31e6f95 commit 303f0b7
Show file tree
Hide file tree
Showing 127 changed files with 8,002 additions and 886 deletions.
3 changes: 3 additions & 0 deletions .dockerignore
Expand Up @@ -4,3 +4,6 @@ integration/localnet/data/
integration/localnet/trie/
integration/localnet/docker-compose.nodes.yml
integration/localnet/targets.nodes.json

go.work
go.work.sum
4 changes: 4 additions & 0 deletions .gitignore
Expand Up @@ -58,3 +58,7 @@ language/tools/vscode-extension/out/*
read-badger
read-protocol-state
remove-execution-fork

# go workspace
go.work
go.work.sum
12 changes: 9 additions & 3 deletions CODEOWNERS
Validating CODEOWNERS rules …
Expand Up @@ -4,8 +4,8 @@
/protobuf/services/collection/** @jordanschalm

# Consensus Stream
/cmd/consensus/** @AlexHentschel @awishformore @zhangchiqing
/engine/consensus/** @AlexHentschel @awishformore @zhangchiqing
/cmd/consensus/** @AlexHentschel @zhangchiqing
/engine/consensus/** @AlexHentschel @zhangchiqing

# Execution Stream
/cmd/execution/** @m4ksio @ramtinms
Expand Down Expand Up @@ -34,4 +34,10 @@
/crypto/** @tarakby

# Bootstrap and transit scripts
/cmd/bootstrap/** @vishalchangrani @ljk662
/cmd/bootstrap/** @vishalchangrani

# Performance Stream
/integration/localnet/** @SaveTheRbtz @simonhf @Kay-Zee @zhangchiqing
/integration/loader/** @SaveTheRbtz @simonhf @Kay-Zee
/integration/benchmark/** @SaveTheRbtz @simonhf @Kay-Zee
/integration/utils/** @SaveTheRbtz @simonhf @Kay-Zee
2 changes: 1 addition & 1 deletion Makefile
Expand Up @@ -127,7 +127,7 @@ generate-mocks:
GO111MODULE=on mockgen -destination=module/mocks/network.go -package=mocks github.com/onflow/flow-go/module Local,Requester
GO111MODULE=on mockgen -destination=network/mocknetwork/engine.go -package=mocknetwork github.com/onflow/flow-go/network Engine
GO111MODULE=on mockgen -destination=network/mocknetwork/mock_network.go -package=mocknetwork github.com/onflow/flow-go/network Network
GO111MODULE=on mockery --name '(ExecutionDataService|ExecutionDataCIDCache)' --dir=module/state_synchronization --case=underscore --output="./module/state_synchronization/mock" --outpkg="state_synchronization"
GO111MODULE=on mockery --name '(ExecutionDataService|ExecutionDataCIDCache|ExecutionDataRequester)' --dir=module/state_synchronization --case=underscore --output="./module/state_synchronization/mock" --outpkg="state_synchronization"
GO111MODULE=on mockery --name 'ExecutionState' --dir=engine/execution/state --case=underscore --output="engine/execution/state/mock" --outpkg="mock"
GO111MODULE=on mockery --name 'BlockComputer' --dir=engine/execution/computation/computer --case=underscore --output="engine/execution/computation/computer/mock" --outpkg="mock"
GO111MODULE=on mockery --name 'ComputationManager' --dir=engine/execution/computation --case=underscore --output="engine/execution/computation/mock" --outpkg="mock"
Expand Down
3 changes: 3 additions & 0 deletions access/api.go
Expand Up @@ -58,6 +58,7 @@ type TransactionResult struct {
BlockID flow.Identifier
TransactionID flow.Identifier
CollectionID flow.Identifier
BlockHeight uint64
}

func TransactionResultToMessage(result *TransactionResult) *access.TransactionResultResponse {
Expand All @@ -69,6 +70,7 @@ func TransactionResultToMessage(result *TransactionResult) *access.TransactionRe
BlockId: result.BlockID[:],
TransactionId: result.TransactionID[:],
CollectionId: result.CollectionID[:],
BlockHeight: uint64(result.BlockHeight),
}
}

Expand All @@ -93,6 +95,7 @@ func MessageToTransactionResult(message *access.TransactionResultResponse) *Tran
BlockID: flow.HashToID(message.BlockId),
TransactionID: flow.HashToID(message.TransactionId),
CollectionID: flow.HashToID(message.CollectionId),
BlockHeight: message.BlockHeight,
}
}

Expand Down
178 changes: 171 additions & 7 deletions cmd/access/node_builder/access_node_builder.go
Expand Up @@ -4,16 +4,16 @@ import (
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"time"

badger "github.com/ipfs/go-ds-badger2"
"github.com/onflow/flow/protobuf/go/flow/access"
"github.com/rs/zerolog"
"github.com/spf13/pflag"

"github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/module/compliance"

"github.com/onflow/flow-go/cmd"
"github.com/onflow/flow-go/consensus"
"github.com/onflow/flow-go/consensus/hotstuff"
Expand All @@ -22,6 +22,8 @@ import (
hotsignature "github.com/onflow/flow-go/consensus/hotstuff/signature"
"github.com/onflow/flow-go/consensus/hotstuff/verification"
recovery "github.com/onflow/flow-go/consensus/recovery/protocol"
"github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/access/ingestion"
"github.com/onflow/flow-go/engine/access/rpc"
"github.com/onflow/flow-go/engine/access/rpc/backend"
Expand All @@ -30,24 +32,30 @@ import (
"github.com/onflow/flow-go/engine/common/requester"
synceng "github.com/onflow/flow-go/engine/common/synchronization"
"github.com/onflow/flow-go/model/encodable"
"github.com/onflow/flow-go/model/encoding/cbor"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/buffer"
"github.com/onflow/flow-go/module/compliance"
finalizer "github.com/onflow/flow-go/module/finalizer/consensus"
"github.com/onflow/flow-go/module/id"
"github.com/onflow/flow-go/module/mempool/stdmap"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/module/state_synchronization"
edrequester "github.com/onflow/flow-go/module/state_synchronization/requester"
"github.com/onflow/flow-go/module/synchronization"
"github.com/onflow/flow-go/network"
netcache "github.com/onflow/flow-go/network/cache"
cborcodec "github.com/onflow/flow-go/network/codec/cbor"
"github.com/onflow/flow-go/network/compressor"
"github.com/onflow/flow-go/network/p2p"
"github.com/onflow/flow-go/network/validator"
"github.com/onflow/flow-go/state/protocol"
badgerState "github.com/onflow/flow-go/state/protocol/badger"
"github.com/onflow/flow-go/state/protocol/blocktimer"
storage "github.com/onflow/flow-go/storage/badger"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
)

// AccessNodeBuilder extends cmd.NodeBuilder and declares additional functions needed to bootstrap an Access node
Expand Down Expand Up @@ -98,6 +106,10 @@ type AccessNodeConfig struct {
logTxTimeToFinalizedExecuted bool
retryEnabled bool
rpcMetricsEnabled bool
executionDataSyncEnabled bool
executionDataDir string
executionDataStartHeight uint64
executionDataConfig edrequester.ExecutionDataConfig
baseOptions []cmd.Option

PublicNetworkConfig PublicNetworkConfig
Expand All @@ -112,6 +124,7 @@ type PublicNetworkConfig struct {

// DefaultAccessNodeConfig defines all the default values for the AccessNodeConfig
func DefaultAccessNodeConfig() *AccessNodeConfig {
homedir, _ := os.UserHomeDir()
return &AccessNodeConfig{
collectionGRPCPort: 9000,
executionGRPCPort: 9000,
Expand Down Expand Up @@ -147,6 +160,16 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
Metrics: metrics.NewNoopCollector(),
},
observerNetworkingKeyPath: cmd.NotSet,
executionDataSyncEnabled: false,
executionDataDir: filepath.Join(homedir, ".flow", "execution_data"),
executionDataStartHeight: 0,
executionDataConfig: edrequester.ExecutionDataConfig{
InitialBlockHeight: 0,
MaxSearchAhead: edrequester.DefaultMaxSearchAhead,
FetchTimeout: edrequester.DefaultFetchTimeout,
RetryDelay: edrequester.DefaultRetryDelay,
MaxRetryDelay: edrequester.DefaultMaxRetryDelay,
},
}
}

Expand Down Expand Up @@ -175,6 +198,8 @@ type FlowAccessNodeBuilder struct {
Finalized *flow.Header
Pending []*flow.Header
FollowerCore module.HotStuffFollower
ExecutionDataService state_synchronization.ExecutionDataService
ExecutionDataRequester state_synchronization.ExecutionDataRequester
// for the unstaked access node, the sync engine participants provider is the libp2p peer store which is not
// available until after the network has started. Hence, a factory function that needs to be called just before
// creating the sync engine
Expand Down Expand Up @@ -277,8 +302,18 @@ func (builder *FlowAccessNodeBuilder) buildFollowerCore() *FlowAccessNodeBuilder
// initialize the verifier for the protocol consensus
verifier := verification.NewCombinedVerifier(builder.Committee, packer)

followerCore, err := consensus.NewFollower(node.Logger, builder.Committee, node.Storage.Headers, final, verifier,
builder.FinalizationDistributor, node.RootBlock.Header, node.RootQC, builder.Finalized, builder.Pending)
followerCore, err := consensus.NewFollower(
node.Logger,
builder.Committee,
node.Storage.Headers,
final,
verifier,
builder.FinalizationDistributor,
node.RootBlock.Header,
node.RootQC,
builder.Finalized,
builder.Pending,
)
if err != nil {
return nil, fmt.Errorf("could not initialize follower core: %w", err)
}
Expand All @@ -293,7 +328,7 @@ func (builder *FlowAccessNodeBuilder) buildFollowerCore() *FlowAccessNodeBuilder
func (builder *FlowAccessNodeBuilder) buildFollowerEngine() *FlowAccessNodeBuilder {
builder.Component("follower engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// initialize cleaner for DB
cleaner := storage.NewCleaner(node.Logger, node.DB, builder.Metrics.CleanCollector, flow.DefaultValueLogGCFrequency)
cleaner := bstorage.NewCleaner(node.Logger, node.DB, builder.Metrics.CleanCollector, flow.DefaultValueLogGCFrequency)
conCache := buffer.NewPendingBlocks()

followerEng, err := follower.New(
Expand Down Expand Up @@ -375,6 +410,111 @@ func (builder *FlowAccessNodeBuilder) BuildConsensusFollower() AccessNodeBuilder
return builder
}

func (builder *FlowAccessNodeBuilder) BuildExecutionDataRequester() *FlowAccessNodeBuilder {
var ds *badger.Datastore
var bs network.BlobService
var processedBlockHeight storage.ConsumerProgress
var processedNotifications storage.ConsumerProgress

builder.
Module("execution data datastore and blobstore", func(node *cmd.NodeConfig) error {
err := os.MkdirAll(builder.executionDataDir, 0700)
if err != nil {
return err
}

ds, err = badger.NewDatastore(builder.executionDataDir, &badger.DefaultOptions)
if err != nil {
return err
}

builder.ShutdownFunc(func() error {
if err := ds.Close(); err != nil {
return fmt.Errorf("could not close execution data datastore: %w", err)
}
return nil
})

return nil
}).
Module("processed block height consumer progress", func(node *cmd.NodeConfig) error {
// uses the datastore's DB
processedBlockHeight = bstorage.NewConsumerProgress(ds.DB, module.ConsumeProgressExecutionDataRequesterBlockHeight)
return nil
}).
Module("processed notifications consumer progress", func(node *cmd.NodeConfig) error {
// uses the datastore's DB
processedNotifications = bstorage.NewConsumerProgress(ds.DB, module.ConsumeProgressExecutionDataRequesterNotification)
return nil
}).
Component("execution data service", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
var err error
bs, err = node.Network.RegisterBlobService(engine.ExecutionDataService, ds)
if err != nil {
return nil, fmt.Errorf("could not register blob service: %w", err)
}

builder.ExecutionDataService = state_synchronization.NewExecutionDataService(
new(cbor.Codec),
compressor.NewLz4Compressor(),
bs,
metrics.NewExecutionDataServiceCollector(),
builder.Logger,
)

return builder.ExecutionDataService, nil
}).
Component("execution data requester", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// Validation of the start block height needs to be done after loading state
if builder.executionDataStartHeight > 0 {
if builder.executionDataStartHeight <= builder.RootBlock.Header.Height {
return nil, fmt.Errorf(
"execution data start block height (%d) must be greater than the root block height (%d)",
builder.executionDataStartHeight, builder.RootBlock.Header.Height)
}

latestSeal, err := builder.State.Sealed().Head()
if err != nil {
return nil, fmt.Errorf("failed to get latest sealed height")
}

// Note: since the root block of a spork is also sealed in the root protocol state, the
// latest sealed height is always equal to the root block height. That means that at the
// very beginning of a spork, this check will always fail. Operators should not specify
// an InitialBlockHeight when starting from the beginning of a spork.
if builder.executionDataStartHeight > latestSeal.Height {
return nil, fmt.Errorf(
"execution data start block height (%d) must be less than or equal to the latest sealed block height (%d)",
builder.executionDataStartHeight, latestSeal.Height)
}

// executionDataStartHeight is provided as the first block to sync, but the
// requester expects the initial last processed height, which is the first height - 1
builder.executionDataConfig.InitialBlockHeight = builder.executionDataStartHeight - 1
} else {
builder.executionDataConfig.InitialBlockHeight = builder.RootBlock.Header.Height
}

builder.ExecutionDataRequester = edrequester.New(
builder.Logger,
metrics.NewExecutionDataRequesterCollector(),
builder.ExecutionDataService,
processedBlockHeight,
processedNotifications,
builder.State,
builder.Storage.Headers,
builder.Storage.Results,
builder.executionDataConfig,
)

builder.FinalizationDistributor.AddOnBlockFinalizedConsumer(builder.ExecutionDataRequester.OnBlockFinalized)

return builder.ExecutionDataRequester, nil
})

return builder
}

type Option func(*AccessNodeConfig)

func WithBootStrapPeers(bootstrapNodes ...*flow.Identity) Option {
Expand Down Expand Up @@ -459,11 +599,35 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
flags.StringSliceVar(&builder.bootstrapNodePublicKeys, "bootstrap-node-public-keys", defaultConfig.bootstrapNodePublicKeys, "the networking public key of the bootstrap access node if this is an unstaked access node (in the same order as the bootstrap node addresses) e.g. \"d57a5e9c5.....\",\"44ded42d....\"")
flags.BoolVar(&builder.supportsUnstakedFollower, "supports-unstaked-node", defaultConfig.supportsUnstakedFollower, "true if this staked access node supports unstaked node")
flags.StringVar(&builder.PublicNetworkConfig.BindAddress, "public-network-address", defaultConfig.PublicNetworkConfig.BindAddress, "staked access node's public network bind address")

// ExecutionDataRequester config
flags.BoolVar(&builder.executionDataSyncEnabled, "execution-data-sync-enabled", defaultConfig.executionDataSyncEnabled, "whether to enable the execution data sync protocol")
flags.StringVar(&builder.executionDataDir, "execution-data-dir", defaultConfig.executionDataDir, "directory to use for Execution Data database")
flags.Uint64Var(&builder.executionDataStartHeight, "execution-data-start-height", defaultConfig.executionDataStartHeight, "height of first block to sync execution data from when starting with an empty Execution Data database")
flags.Uint64Var(&builder.executionDataConfig.MaxSearchAhead, "execution-data-max-search-ahead", defaultConfig.executionDataConfig.MaxSearchAhead, "max number of heights to search ahead of the lowest outstanding execution data height")
flags.DurationVar(&builder.executionDataConfig.FetchTimeout, "execution-data-fetch-timeout", defaultConfig.executionDataConfig.FetchTimeout, "timeout to use when fetching execution data from the network e.g. 300s")
flags.DurationVar(&builder.executionDataConfig.RetryDelay, "execution-data-retry-delay", defaultConfig.executionDataConfig.RetryDelay, "initial delay for exponential backoff when fetching execution data fails e.g. 10s")
flags.DurationVar(&builder.executionDataConfig.MaxRetryDelay, "execution-data-max-retry-delay", defaultConfig.executionDataConfig.MaxRetryDelay, "maximum delay for exponential backoff when fetching execution data fails e.g. 5m")
}).ValidateFlags(func() error {
if builder.supportsUnstakedFollower && (builder.PublicNetworkConfig.BindAddress == cmd.NotSet || builder.PublicNetworkConfig.BindAddress == "") {
return errors.New("public-network-address must be set if supports-unstaked-node is true")
}

if builder.executionDataSyncEnabled {
if builder.executionDataConfig.FetchTimeout <= 0 {
return errors.New("execution-data-fetch-timeout must be greater than 0")
}
if builder.executionDataConfig.RetryDelay <= 0 {
return errors.New("execution-data-retry-delay must be greater than 0")
}
if builder.executionDataConfig.MaxRetryDelay < builder.executionDataConfig.RetryDelay {
return errors.New("execution-data-max-retry-delay must be greater than or equal to execution-data-retry-delay")
}
if builder.executionDataConfig.MaxSearchAhead == 0 {
return errors.New("execution-data-max-search-ahead must be greater than 0")
}
}

return nil
})
}
Expand Down
24 changes: 22 additions & 2 deletions cmd/access/node_builder/staked_access_node_builder.go
Expand Up @@ -241,8 +241,24 @@ func (builder *StakedAccessNodeBuilder) Build() (cmd.Node, error) {
return nil, fmt.Errorf("could not create requester engine: %w", err)
}

builder.IngestEng, err = ingestion.New(node.Logger, node.Network, node.State, node.Me, builder.RequestEng, node.Storage.Blocks, node.Storage.Headers, node.Storage.Collections, node.Storage.Transactions, node.Storage.Results, node.Storage.Receipts, builder.TransactionMetrics,
builder.CollectionsToMarkFinalized, builder.CollectionsToMarkExecuted, builder.BlocksToMarkExecuted, builder.RpcEng)
builder.IngestEng, err = ingestion.New(
node.Logger,
node.Network,
node.State,
node.Me,
builder.RequestEng,
node.Storage.Blocks,
node.Storage.Headers,
node.Storage.Collections,
node.Storage.Transactions,
node.Storage.Results,
node.Storage.Receipts,
builder.TransactionMetrics,
builder.CollectionsToMarkFinalized,
builder.CollectionsToMarkExecuted,
builder.BlocksToMarkExecuted,
builder.RpcEng,
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -278,6 +294,10 @@ func (builder *StakedAccessNodeBuilder) Build() (cmd.Node, error) {
})
}

if builder.executionDataSyncEnabled {
builder.BuildExecutionDataRequester()
}

builder.Component("ping engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
ping, err := pingeng.New(
node.Logger,
Expand Down

0 comments on commit 303f0b7

Please sign in to comment.