Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/downloader: implement beacon sync #23982

Merged
merged 22 commits into from Mar 11, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
0f78c4f
eth/downloader: implement beacon sync
karalabe Sep 30, 2021
0d22561
eth/downloader: fix a crash if the beacon chain is reduced in length
karalabe Jan 11, 2022
786308d
eth/downloader: fix beacon sync start/stop thrashing data race
karalabe Jan 11, 2022
49b7771
eth/downloader: use a non-nil pivot even in degenerate sync requests
karalabe Jan 11, 2022
3f16180
eth/downloader: don't touch internal state on beacon Head retrieval
karalabe Jan 11, 2022
1c5a698
eth/downloader: fix spelling mistakes
karalabe Jan 14, 2022
344d81a
eth/downloader: fix some typos
karalabe Jan 18, 2022
944e050
eth: integrate legacy/beacon sync switchover and UX
karalabe Nov 2, 2021
0a50767
eth: handle UX wise being stuck on post-merge TTD
karalabe Feb 1, 2022
f6a427d
core, eth: integrate the beacon client with the beacon sync
karalabe Feb 2, 2022
ba9e91f
eth/catalyst: make some warning messages nicer
karalabe Feb 2, 2022
ef4d465
eth/downloader: remove Ethereum 1&2 notions in favor of merge
karalabe Feb 2, 2022
92020c9
core/beacon, eth: clean up engine API returns a bit
karalabe Feb 3, 2022
46c0e3f
eth/downloader: add skeleton extension tests
karalabe Feb 3, 2022
336b5df
eth/catalyst: keep non-kiln spec, handle mining on ttd
karalabe Feb 3, 2022
38114ad
eth/downloader: add beacon header retrieval tests
karalabe Feb 23, 2022
71861e7
eth: fixed spelling, commented failing tests out
MariusVanDerWijden Mar 7, 2022
7ac324b
eth/downloader: review fixes
karalabe Mar 9, 2022
4f2c4e9
eth/downloader: drop peers failing to deliver beacon headers
karalabe Mar 11, 2022
2cb92eb
core/rawdb: track beacon sync data in db inspect
karalabe Mar 11, 2022
bec4fd6
eth: fix review concerns
karalabe Mar 11, 2022
7c12053
internal/web3ext: nit
karalabe Mar 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/rawdb/schema.go
Expand Up @@ -95,7 +95,7 @@ var (
SnapshotAccountPrefix = []byte("a") // SnapshotAccountPrefix + account hash -> account trie value
SnapshotStoragePrefix = []byte("o") // SnapshotStoragePrefix + account hash + storage hash -> storage trie value
CodePrefix = []byte("c") // CodePrefix + code hash -> account code
skeletonHeaderPrefix = []byte("S") // skeletonHeaderPrefox + num (uint64 big endian) -> header
skeletonHeaderPrefix = []byte("S") // skeletonHeaderPrefix + num (uint64 big endian) -> header
karalabe marked this conversation as resolved.
Show resolved Hide resolved

PreimagePrefix = []byte("secure-key-") // PreimagePrefix + hash -> preimage
configPrefix = []byte("ethereum-config-") // config prefix for the db
Expand Down
43 changes: 30 additions & 13 deletions eth/downloader/downloader.go
Expand Up @@ -476,12 +476,9 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
if err != nil {
return err
}
// Opposed to legacy mode, in beacon mode we trust the chain we've been
// told to sync to, so no need to leave a gap between the pivot and head
// to full sync. Still, the downloader's been architected to do a full
// block import after the pivot, so make it off by one to avoid having
// to special case everything internally.
pivot = d.skeleton.Header(latest.Number.Uint64() - 1)
if latest.Number.Uint64() > uint64(fsMinFullBlocks) {
pivot = d.skeleton.Header(latest.Number.Uint64() - uint64(fsMinFullBlocks))
}
}
// If no pivot block was returned, the head is below the min full block
// threshold (i.e. new chain). In that case we won't really snap sync
Expand Down Expand Up @@ -1346,18 +1343,31 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
// Although the received headers might be all valid, a legacy
// PoW/PoA sync must not accept post-merge headers. Make sure
// that any transition is rejected at this point.
var (
rejected []*types.Header
td *big.Int
)
if !beaconMode && ttd != nil {
ptd := d.lightchain.GetTd(chunkHeaders[0].ParentHash, chunkHeaders[0].Number.Uint64()-1)
if ptd == nil {
td = d.blockchain.GetTd(chunkHeaders[0].ParentHash, chunkHeaders[0].Number.Uint64()-1)
if td == nil {
// This should never really happen, but handle gracefully for now
log.Error("Failed to retrieve parent header TD", "number", chunkHeaders[0].Number.Uint64()-1, "hash", chunkHeaders[0].ParentHash)
return fmt.Errorf("%w: parent TD missing", errInvalidChain)
}
for _, header := range chunkHeaders {
ptd = new(big.Int).Add(ptd, header.Difficulty)
if ptd.Cmp(ttd) >= 0 {
log.Info("Legacy sync reached merge threshold", "number", header.Number, "hash", header.Hash(), "td", ptd, "ttd", ttd)
return ErrMergeTransition
for i, header := range chunkHeaders {
td = new(big.Int).Add(td, header.Difficulty)
if td.Cmp(ttd) >= 0 {
// Terminal total difficulty reached, allow the last header in
if new(big.Int).Sub(td, header.Difficulty).Cmp(ttd) < 0 {
chunkHeaders, rejected = chunkHeaders[:i+1], chunkHeaders[i+1:]
if len(rejected) > 0 {
// Make a nicer user log as to the first TD truly rejected
td = new(big.Int).Add(td, rejected[0].Difficulty)
}
} else {
chunkHeaders, rejected = chunkHeaders[:i], chunkHeaders[i:]
}
break
}
}
}
Expand All @@ -1380,6 +1390,13 @@ func (d *Downloader) processHeaders(origin uint64, td, ttd *big.Int, beaconMode
rollback = 1
}
}
if len(rejected) != 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the last chunkHeader is the final PoW-block, then len(rejected) will be 0, but we still would have wanted to enter here, no?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My gut feeling is that we don't want to, since everything can be successfully imported. The error should be triggered only if you attempt to import something after the merge block. It should get triggered when the next batch of something is attempted to be imported.

// Merge threshold reached, stop importing, but don't roll back
rollback = 0

log.Info("Legacy sync reached merge threshold", "number", rejected[0].Number, "hash", rejected[0].Hash(), "td", td, "ttd", ttd)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The number reported here is the first rejected, so,
block-n, transition-block, invalid-block-1,..., the invalid-block-1 is what it will output. Seems not the right thing to output?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, that's exactly what I wanted to output. The block that's imported is not that relevant since we have import logs too as well as it's part of the chain so can be retrieved. What I'm interested to see is the details of the block that is rejected so I can look it up or investigate.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But -- if all our peers are well-behaving, they won't send us PoS blocks over the network, will they? Just want to avoid the scenario where we wait for the first non-PoW header to abort our cycle and "officially enter beacon mode", but our peers aren't delivering it because it's PoS.

return ErrMergeTransition
}
}
// Unless we're doing light chains, schedule the headers for associated content retrieval
if mode == FullSync || mode == SnapSync {
Expand Down
12 changes: 9 additions & 3 deletions eth/downloader/skeleton.go
Expand Up @@ -177,7 +177,7 @@ type backfiller interface {
// concurrently with the sync cycle, since extensions arrive from an API surface,
// not from within (vs. legacy Ethereum sync).
//
// Since the skeleton tracks the entire header chain until it is cosumed by the
// Since the skeleton tracks the entire header chain until it is consumed by the
// forward block filling, it needs 0.5KB/block storage. At current mainnet sizes
// this is only possible with a disk backend. Since the skeleton is separate from
// the node's header chain, storing the headers ephemerally until sync finishes
Expand Down Expand Up @@ -748,13 +748,13 @@ func (s *skeleton) executeTask(peer *peerConnection, req *headerRequest) {
res.Done <- errors.New("invalid header batch anchor")
s.scheduleRevertRequest(req)

case headers[0].Number.Uint64() >= requestHeaders && len(headers) != requestHeaders:
case req.head >= requestHeaders && len(headers) != requestHeaders:
// Invalid number of non-genesis headers delivered, reject the response and reschedule
peer.log.Debug("Invalid non-genesis header count", "have", len(headers), "want", requestHeaders)
res.Done <- errors.New("not enough non-genesis headers delivered")
s.scheduleRevertRequest(req)

case headers[0].Number.Uint64() < requestHeaders && uint64(len(headers)) != headers[0].Number.Uint64():
case req.head < requestHeaders && uint64(len(headers)) != req.head:
// Invalid number of genesis headers delivered, reject the response and reschedule
peer.log.Debug("Invalid genesis header count", "have", len(headers), "want", headers[0].Number.Uint64())
res.Done <- errors.New("not enough genesis headers delivered")
Expand Down Expand Up @@ -953,6 +953,12 @@ func (s *skeleton) processResponse(res *headerResponse) bool {
merged = true
}
}
// If subchains were merged, all further available headers in the scratch
// space are invalid since we skipped ahead. Stop processing the scratch
// space to avoid dropping peers thinking they delivered invalid data.
if merged {
break
}
}
s.saveSyncStatus(batch)
if err := batch.Write(); err != nil {
Expand Down
86 changes: 71 additions & 15 deletions eth/downloader/skeleton_test.go
Expand Up @@ -79,20 +79,33 @@ func (hf *hookedBackfiller) resume() {
type skeletonTestPeer struct {
id string // Unique identifier of the mock peer
headers []*types.Header // Headers to serve when requested
served uint64 // Number of headers served by this peer
dropped uint64 // Flag whether the peer was dropped (stop responding)

serve func(origin uint64) []*types.Header // Hook to allow custom responses

served uint64 // Number of headers served by this peer
dropped uint64 // Flag whether the peer was dropped (stop responding)
}

// newSkeletonTestPeer creates a new mock peer to test the skeleton sync with.
// The only purpose of the constructor is to ensure we don't forget to set some
// mandatory field vs a struct literal initialization.
func newSkeletonTestPeer(id string, headers []*types.Header) *skeletonTestPeer {
return &skeletonTestPeer{
id: id,
headers: headers,
}
}

// newSkeletonTestPeer creates a new mock peer to test the skeleton sync with,
// and sets an optional serve hook that can return headers for delivery instead
// of the predefined chain. Useful for emulating malicious behavior that would
// otherwise require dedicated peer types.
func newSkeletonTestPeerWithHook(id string, headers []*types.Header, serve func(origin uint64) []*types.Header) *skeletonTestPeer {
return &skeletonTestPeer{
id: id,
headers: headers,
serve: serve,
}
}

// RequestHeadersByNumber constructs a GetBlockHeaders function based on a numbered
// origin; associated with a particular peer in the download tester. The returned
// function can be used to retrieve batches of headers from the particular peer.
Expand Down Expand Up @@ -126,18 +139,26 @@ func (p *skeletonTestPeer) RequestHeadersByNumber(origin uint64, amount int, ski
if amount > requestHeaders || (amount < requestHeaders && origin > uint64(amount)) {
panic(fmt.Sprintf("non-chunk size header batch requested: requested %d, want %d, origin %d", amount, requestHeaders, origin))
}
// Simple reverse header retrieval. Fill from the peer's chain and return
headers := make([]*types.Header, 0, amount)
if len(p.headers) > int(origin) { // Don't serve headers if we're missing the origin
for i := 0; i < amount; i++ {
// Consider nil headers as a form of attack and withhold them. Nil
// cannot be decoded from RLP, so it's not possible to produce an
// attack by sending/receiving those over eth.
header := p.headers[int(origin)-i]
if header == nil {
continue
// Simple reverse header retrieval. Fill from the peer's chain and return.
// If the tester has a serve hook set, try to use that before falling back
// to the default behavior.
var headers []*types.Header
if p.serve != nil {
headers = p.serve(origin)
}
if headers == nil {
headers = make([]*types.Header, 0, amount)
if len(p.headers) > int(origin) { // Don't serve headers if we're missing the origin
for i := 0; i < amount; i++ {
// Consider nil headers as a form of attack and withhold them. Nil
// cannot be decoded from RLP, so it's not possible to produce an
// attack by sending/receiving those over eth.
header := p.headers[int(origin)-i]
if header == nil {
continue
}
headers = append(headers, header)
}
headers = append(headers, header)
}
}
atomic.AddUint64(&p.served, uint64(len(headers)))
Expand Down Expand Up @@ -705,6 +726,41 @@ func TestSkeletonSyncRetrievals(t *testing.T) {
endserve: (requestHeaders + 101 - 2) + (100 - 1), // midserve + lenrest - genesis
enddrop: 1, // no new drops
},
// This test reproduces a bug caught during review (kudos to @holiman)
// where a subchain is merged with a previously interrupted one, causing
// pending data in the scratch space to become "invalid" (since we jump
// ahead during subchain merge). In that case it is expected to ignore
// the queued up data instead of trying to process on top of a shifted
// task set.
//
// The test is a bit convoluted since it needs to trigger a concurrency
// issue. First we sync up an initial chain of 2x512 items. Then announce
// 2x512+2 as head and delay delivering the head batch to fill the scratch
// space first. The delivery head should merge with the previous download
// and the scratch space must not be consumed further.
{
head: chain[2*requestHeaders],
peers: []*skeletonTestPeer{
newSkeletonTestPeerWithHook("peer-1", chain, func(origin uint64) []*types.Header {
if origin == chain[2*requestHeaders+2].Number.Uint64() {
time.Sleep(100 * time.Millisecond)
}
return nil // Fallback to default behavior, just delayed
}),
newSkeletonTestPeerWithHook("peer-2", chain, func(origin uint64) []*types.Header {
if origin == chain[2*requestHeaders+2].Number.Uint64() {
time.Sleep(100 * time.Millisecond)
}
return nil // Fallback to default behavior, just delayed
}),
},
midstate: []*subchain{{Head: 2 * requestHeaders, Tail: 1}},
midserve: 2*requestHeaders - 1, // len - head - genesis

newHead: chain[2*requestHeaders+2],
endstate: []*subchain{{Head: 2*requestHeaders + 2, Tail: 1}},
endserve: 4 * requestHeaders,
},
}
for i, tt := range tests {
// Create a fresh database and initialize it with the starting state
Expand Down