From c3215c9816c0164f0286977483dbcb7dc5b23139 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 30 Sep 2021 14:51:28 +0300 Subject: [PATCH] eth/downloader: implement beacon sync --- core/rawdb/accessors_sync.go | 80 +++ core/rawdb/schema.go | 9 + eth/api.go | 10 + eth/downloader/beaconsync.go | 221 ++++++ eth/downloader/downloader.go | 176 +++-- eth/downloader/downloader_test.go | 6 +- eth/downloader/fetchers_concurrent.go | 6 +- eth/downloader/peer.go | 12 +- eth/downloader/skeleton.go | 965 ++++++++++++++++++++++++++ eth/downloader/skeleton_test.go | 257 +++++++ eth/handler.go | 22 +- eth/sync.go | 8 +- internal/web3ext/web3ext.go | 5 + 13 files changed, 1694 insertions(+), 83 deletions(-) create mode 100644 core/rawdb/accessors_sync.go create mode 100644 eth/downloader/beaconsync.go create mode 100644 eth/downloader/skeleton.go create mode 100644 eth/downloader/skeleton_test.go diff --git a/core/rawdb/accessors_sync.go b/core/rawdb/accessors_sync.go new file mode 100644 index 0000000000000..50dfb848e4e0d --- /dev/null +++ b/core/rawdb/accessors_sync.go @@ -0,0 +1,80 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package rawdb + +import ( + "bytes" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rlp" +) + +// ReadSkeletonSyncStatus retrieves the serialized sync status saved at shutdown. +func ReadSkeletonSyncStatus(db ethdb.KeyValueReader) []byte { + data, _ := db.Get(skeletonSyncStatusKey) + return data +} + +// WriteSkeletonSyncStatus stores the serialized sync status to save at shutdown. +func WriteSkeletonSyncStatus(db ethdb.KeyValueWriter, status []byte) { + if err := db.Put(skeletonSyncStatusKey, status); err != nil { + log.Crit("Failed to store skeleton sync status", "err", err) + } +} + +// DeleteSkeletonSyncStatus deletes the serialized sync status saved at the last +// shutdown +func DeleteSkeletonSyncStatus(db ethdb.KeyValueWriter) { + if err := db.Delete(skeletonSyncStatusKey); err != nil { + log.Crit("Failed to remove skeleton sync status", "err", err) + } +} + +// ReadSkeletonHeader retrieves a block header from the skeleton sync store, +func ReadSkeletonHeader(db ethdb.KeyValueReader, number uint64) *types.Header { + data, _ := db.Get(skeletonHeaderKey(number)) + if len(data) == 0 { + return nil + } + header := new(types.Header) + if err := rlp.Decode(bytes.NewReader(data), header); err != nil { + log.Error("Invalid skeleton header RLP", "number", number, "err", err) + return nil + } + return header +} + +// WriteSkeletonHeader stores a block header into the skeleton sync store. +func WriteSkeletonHeader(db ethdb.KeyValueWriter, header *types.Header) { + data, err := rlp.EncodeToBytes(header) + if err != nil { + log.Crit("Failed to RLP encode header", "err", err) + } + key := skeletonHeaderKey(header.Number.Uint64()) + if err := db.Put(key, data); err != nil { + log.Crit("Failed to store skeleton header", "err", err) + } +} + +// DeleteSkeletonHeader removes all block header data associated with a hash. +func DeleteSkeletonHeader(db ethdb.KeyValueWriter, number uint64) { + if err := db.Delete(skeletonHeaderKey(number)); err != nil { + log.Crit("Failed to delete skeleton header", "err", err) + } +} diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index b35fcba45f79a..f869f36e10405 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -63,6 +63,9 @@ var ( // snapshotSyncStatusKey tracks the snapshot sync status across restarts. snapshotSyncStatusKey = []byte("SnapshotSyncStatus") + // skeletonSyncStatusKey tracks the skeleton sync status across restarts. + skeletonSyncStatusKey = []byte("SkeletonSyncStatus") + // txIndexTailKey tracks the oldest block whose transactions have been indexed. txIndexTailKey = []byte("TransactionIndexTail") @@ -92,6 +95,7 @@ var ( SnapshotAccountPrefix = []byte("a") // SnapshotAccountPrefix + account hash -> account trie value SnapshotStoragePrefix = []byte("o") // SnapshotStoragePrefix + account hash + storage hash -> storage trie value CodePrefix = []byte("c") // CodePrefix + code hash -> account code + skeletonHeaderPrefix = []byte("S") // skeletonHeaderPrefox + num (uint64 big endian) -> header PreimagePrefix = []byte("secure-key-") // PreimagePrefix + hash -> preimage configPrefix = []byte("ethereum-config-") // config prefix for the db @@ -210,6 +214,11 @@ func bloomBitsKey(bit uint, section uint64, hash common.Hash) []byte { return key } +// skeletonHeaderKey = skeletonHeaderPrefix + num (uint64 big endian) +func skeletonHeaderKey(number uint64) []byte { + return append(skeletonHeaderPrefix, encodeBlockNumber(number)...) +} + // preimageKey = PreimagePrefix + hash func preimageKey(hash common.Hash) []byte { return append(PreimagePrefix, hash.Bytes()...) diff --git a/eth/api.go b/eth/api.go index f81dfa922b7a8..bc2a615ad1dd6 100644 --- a/eth/api.go +++ b/eth/api.go @@ -257,6 +257,16 @@ func (api *PrivateAdminAPI) ImportChain(file string) (bool, error) { return true, nil } +// NewHead requests the node to beacon-sync to the designated head header. +func (api *PrivateAdminAPI) NewHead(blob hexutil.Bytes) error { + header := new(types.Header) + if err := rlp.DecodeBytes(blob, header); err != nil { + return err + } + mode, _ := api.eth.handler.chainSync.modeAndLocalHead() + return api.eth.Downloader().BeaconSync(mode, header) +} + // PublicDebugAPI is the collection of Ethereum full node APIs exposed // over the public debugging endpoint. type PublicDebugAPI struct { diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go new file mode 100644 index 0000000000000..6c0b760160534 --- /dev/null +++ b/eth/downloader/beaconsync.go @@ -0,0 +1,221 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package downloader + +import ( + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" +) + +// beaconBackfiller is the chain and state backfilling that can be commenced once +// the skeleton syncer has successfully reverse downloaded all the headers up to +// the genesis block or an existing header in the database. Its operation is fully +// directed by the skeleton sync's head/tail events. +type beaconBackfiller struct { + downloader *Downloader // Downloader to direct via this callback implementation + syncMode SyncMode // Sync mode to use for backfilling the skeleton chains + success func() // Callback to run on successful sync cycle completion + filling bool // Flag whether the downloader is backfilling or not + lock sync.Mutex // Mutex protecting the sync lock +} + +// newBeaconBackfiller is a helper method to create the backfiller. +func newBeaconBackfiller(dl *Downloader, success func()) backfiller { + return &beaconBackfiller{ + downloader: dl, + success: success, + } +} + +// suspend cancels any background downloader threads. +func (b *beaconBackfiller) suspend() { + b.downloader.Cancel() +} + +// resume starts the downloader threads for backfilling state and chain data. +func (b *beaconBackfiller) resume() { + b.lock.Lock() + if b.filling { + // If a previous filling cycle is still running, just ignore this start + // request. // TODO(karalabe): We should make this channel driven + b.lock.Unlock() + return + } + b.filling = true + mode := b.syncMode + b.lock.Unlock() + + // Start the backfilling on its own thread since the downloader does not have + // its own lifecycle runloop. + go func() { + // Set the backfiller to non-filling when download completes + defer func() { + b.lock.Lock() + b.filling = false + b.lock.Unlock() + }() + // If the downloader fails, report an error as in beacon chain mode there + // should be no errors as long as the chain we're syncing to is valid. + if err := b.downloader.synchronise("", common.Hash{}, nil, mode, true); err != nil { + log.Error("Beacon backfilling failed", "err", err) + return + } + // Synchronization succeeded. Since this happens async, notify the outer + // context to disable snap syncing and enable transaction propagation. + if b.success != nil { + b.success() + } + }() +} + +// setMode updates the sync mode from the current one to the requested one. If +// there's an active sync in progress, it will be cancelled and restarted. +func (b *beaconBackfiller) setMode(mode SyncMode) { + // Update the old sync mode and track if it was changed + b.lock.Lock() + updated := b.syncMode != mode + filling := b.filling + b.syncMode = mode + b.lock.Unlock() + + // If the sync mode was changed mid-sync, restart. This should never ever + // really happen, we just handle it to detect programming errors. + if !updated || !filling { + return + } + log.Error("Downloader sync mode changed mid-run", "old", mode.String(), "new", mode.String()) + b.suspend() + b.resume() +} + +// BeaconSync is the Ethereum 2 version of the chain synchronization, where the +// chain is not downloaded from genesis onward, rather from trusted head announces +// backwards. +// +// Internally backfilling and state sync is done the same way, but the header +// retrieval and scheduling is replaced. +func (d *Downloader) BeaconSync(mode SyncMode, head *types.Header) error { + // When the downloader starts a sync cycle, it needs to be aware of the sync + // mode to use (full, snap). To keep the skeleton chain oblivious, inject the + // mode into the backfiller directly. + // + // Super crazy dangerous type cast. Should be fine (TM), we're only using a + // different backfiller implementation for skeleton tests. + d.skeleton.filler.(*beaconBackfiller).setMode(mode) + + // Signal the skeleton sync to switch to a new head, however it wants + if err := d.skeleton.Sync(head); err != nil { + return err + } + return nil +} + +// findBeaconAncestor tries to locate the common ancestor link of the local chain +// and the beacon chain just requested. In the general case when our node was in +// sync and on the correct chain, checking the top N links should already get us +// a match. In the rare scenario when we ended up on a long reorganisation (i.e. +// none of the head links match), we do a binary search to find the ancestor. +func (d *Downloader) findBeaconAncestor() uint64 { + // Figure out the current local head position + var head *types.Header + + switch d.getMode() { + case FullSync: + head = d.blockchain.CurrentBlock().Header() + case SnapSync: + head = d.blockchain.CurrentFastBlock().Header() + default: + head = d.lightchain.CurrentHeader() + } + number := head.Number.Uint64() + + // If the head is present in the skeleton chain, return that + if head.Hash() == d.skeleton.Header(number).Hash() { + return number + } + // Head header not present, binary search to find the ancestor + start, end := uint64(0), number + for start+1 < end { + // Split our chain interval in two, and request the hash to cross check + check := (start + end) / 2 + + h := d.skeleton.Header(check) + n := h.Number.Uint64() + + var known bool + switch d.getMode() { + case FullSync: + known = d.blockchain.HasBlock(h.Hash(), n) + case SnapSync: + known = d.blockchain.HasFastBlock(h.Hash(), n) + default: + known = d.lightchain.HasHeader(h.Hash(), n) + } + if !known { + end = check + continue + } + start = check + } + return start +} + +// fetchBeaconHeaders feeds skeleton headers to the downloader queue for scheduling +// until sync errors or is finished. +func (d *Downloader) fetchBeaconHeaders(from uint64) error { + head, err := d.skeleton.Head() + if err != nil { + return err + } + for { + // Retrieve a batch of headers and feed it to the header processor + headers := make([]*types.Header, 0, maxHeadersProcess) + for i := 0; i < maxHeadersProcess && from <= head.Number.Uint64(); i++ { + headers = append(headers, d.skeleton.Header(from)) + from++ + } + select { + case d.headerProcCh <- headers: + case <-d.cancelCh: + return errCanceled + } + // If we still have headers to import, loop and keep pushing them + if from <= head.Number.Uint64() { + continue + } + // If the pivot block is committed, signal header sync termination + if atomic.LoadInt32(&d.committed) == 1 { + d.headerProcCh <- nil + return nil + } + // State sync still going, wait a bit for new headers and retry + select { + case <-time.After(fsHeaderContCheck): + case <-d.cancelCh: + return errCanceled + } + head, err = d.skeleton.Head() + if err != nil { + return err + } + } +} diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 95b90d3b57109..50b6adbdd71a6 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -30,7 +30,6 @@ import ( "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/state/snapshot" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/eth/protocols/snap" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" @@ -118,6 +117,9 @@ type Downloader struct { // Channels headerProcCh chan []*types.Header // Channel to feed the header processor new tasks + // Skeleton sync + skeleton *skeleton // Header skeleton to backfill the chain with (eth2 mode) + // State sync pivotHeader *types.Header // Pivot block header to dynamically push the syncing state root pivotLock sync.RWMutex // Lock protecting pivot header reads from updates @@ -196,7 +198,7 @@ type BlockChain interface { } // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader { +func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, success func()) *Downloader { if lightchain == nil { lightchain = chain } @@ -215,6 +217,8 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, SnapSyncer: snap.NewSyncer(stateDb), stateSyncStart: make(chan *stateSync), } + dl.skeleton = newSkeleton(stateDb, dl.peers, dropPeer, newBeaconBackfiller(dl, success)) + go dl.stateFetcher() return dl } @@ -314,10 +318,10 @@ func (d *Downloader) UnregisterPeer(id string) error { return nil } -// Synchronise tries to sync up our local block chain with a remote peer, both +// LegacySync tries to sync up our local block chain with a remote peer, both // adding various sanity checks as well as wrapping it with various log entries. -func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error { - err := d.synchronise(id, head, td, mode) +func (d *Downloader) LegacySync(id string, head common.Hash, td *big.Int, mode SyncMode) error { + err := d.synchronise(id, head, td, mode, false) switch err { case nil, errBusy, errCanceled: @@ -343,7 +347,7 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode // synchronise will select the peer and use it for synchronising. If an empty string is given // it will use the best peer possible and synchronize if its TD is higher than our own. If any of the // checks fail an error will be returned. This method is synchronous -func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error { +func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode, beaconMode bool) error { // Mock out the synchronisation if testing if d.synchroniseMock != nil { return d.synchroniseMock(id, hash) @@ -404,11 +408,14 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode atomic.StoreUint32(&d.mode, uint32(mode)) // Retrieve the origin peer and initiate the downloading process - p := d.peers.Peer(id) - if p == nil { - return errUnknownPeer + var p *peerConnection + if !beaconMode { // Beacon mode doesn't need a peer to sync from + p = d.peers.Peer(id) + if p == nil { + return errUnknownPeer + } } - return d.syncWithPeer(p, hash, td) + return d.syncWithPeer(p, hash, td, beaconMode) } func (d *Downloader) getMode() SyncMode { @@ -417,7 +424,7 @@ func (d *Downloader) getMode() SyncMode { // syncWithPeer starts a block synchronization based on the hash chain from the // specified peer and head hash. -func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) { +func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int, beaconMode bool) (err error) { d.mux.Post(StartEvent{}) defer func() { // reset on error @@ -428,33 +435,57 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I d.mux.Post(DoneEvent{latest}) } }() - if p.version < eth.ETH66 { - return fmt.Errorf("%w: advertized %d < required %d", errTooOld, p.version, eth.ETH66) - } mode := d.getMode() - log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", mode) + if !beaconMode { + log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", mode) + } else { + log.Debug("Backfilling with the network", "mode", mode) + } defer func(start time.Time) { log.Debug("Synchronisation terminated", "elapsed", common.PrettyDuration(time.Since(start))) }(time.Now()) // Look up the sync boundaries: the common ancestor and the target block - latest, pivot, err := d.fetchHead(p) - if err != nil { - return err - } - if mode == SnapSync && pivot == nil { - // If no pivot block was returned, the head is below the min full block - // threshold (i.e. new chain). In that case we won't really snap sync - // anyway, but still need a valid pivot block to avoid some code hitting - // nil panics on an access. - pivot = d.blockchain.CurrentBlock().Header() + var latest, pivot *types.Header + if !beaconMode { + // In legacy mode, use the master peer to retrieve the headers from + latest, pivot, err = d.fetchHead(p) + if err != nil { + return err + } + if mode == SnapSync && pivot == nil { + // If no pivot block was returned, the head is below the min full block + // threshold (i.e. new chain). In that case we won't really snap sync + // anyway, but still need a valid pivot block to avoid some code hitting + // nil panics on an access. + pivot = d.blockchain.CurrentBlock().Header() + } + } else { + // In beacon mode, user the skeleton chain to retrieve the headers from + latest, err = d.skeleton.Head() + if err != nil { + return err + } + // Opposed to legacy mode, in beacon mode we trust the chain we've been + // told to sync to, so no need to leave a gap between the pivot and head + // to full sync. Still, the downloader's been architected to do a full + // block import after the pivot, so make it off by one to avoid having + // to special case everything internally. + pivot = d.skeleton.Header(latest.Number.Uint64() - 1) } height := latest.Number.Uint64() - origin, err := d.findAncestor(p, latest) - if err != nil { - return err + var origin uint64 + if !beaconMode { + // In legacy mode, reach out to the network and find the ancestor + origin, err = d.findAncestor(p, latest) + if err != nil { + return err + } + } else { + // In beacon mode, use the skeleton chain for the ancestor lookup + origin = d.findBeaconAncestor() } d.syncStatsLock.Lock() if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin { @@ -525,11 +556,19 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I if d.syncInitHook != nil { d.syncInitHook(origin, height) } + var headerFetcher func() error + if !beaconMode { + // In legacy mode, headers are retrieved from the network + headerFetcher = func() error { return d.fetchHeaders(p, origin+1, latest.Number.Uint64()) } + } else { + // In beacon mode, headers are served by the skeleton syncer + headerFetcher = func() error { return d.fetchBeaconHeaders(origin + 1) } + } fetchers := []func() error{ - func() error { return d.fetchHeaders(p, origin+1, latest.Number.Uint64()) }, // Headers are always retrieved - func() error { return d.fetchBodies(origin + 1) }, // Bodies are retrieved during normal and snap sync - func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during snap sync - func() error { return d.processHeaders(origin+1, td) }, + headerFetcher, // Headers are always retrieved + func() error { return d.fetchBodies(origin+1, beaconMode) }, // Bodies are retrieved during normal and snap sync + func() error { return d.fetchReceipts(origin+1, beaconMode) }, // Receipts are retrieved during snap sync + func() error { return d.processHeaders(origin+1, td, beaconMode) }, } if mode == SnapSync { d.pivotLock.Lock() @@ -1125,7 +1164,7 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ( log.Debug("Filling up skeleton", "from", from) d.queue.ScheduleSkeleton(from, skeleton) - err := d.concurrentFetch((*headerQueue)(d)) + err := d.concurrentFetch((*headerQueue)(d), false) if err != nil { log.Debug("Skeleton fill failed", "err", err) } @@ -1139,9 +1178,9 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ( // fetchBodies iteratively downloads the scheduled block bodies, taking any // available peers, reserving a chunk of blocks for each, waiting for delivery // and also periodically checking for timeouts. -func (d *Downloader) fetchBodies(from uint64) error { +func (d *Downloader) fetchBodies(from uint64, beaconMode bool) error { log.Debug("Downloading block bodies", "origin", from) - err := d.concurrentFetch((*bodyQueue)(d)) + err := d.concurrentFetch((*bodyQueue)(d), beaconMode) log.Debug("Block body download terminated", "err", err) return err @@ -1150,9 +1189,9 @@ func (d *Downloader) fetchBodies(from uint64) error { // fetchReceipts iteratively downloads the scheduled block receipts, taking any // available peers, reserving a chunk of receipts for each, waiting for delivery // and also periodically checking for timeouts. -func (d *Downloader) fetchReceipts(from uint64) error { +func (d *Downloader) fetchReceipts(from uint64, beaconMode bool) error { log.Debug("Downloading receipts", "origin", from) - err := d.concurrentFetch((*receiptQueue)(d)) + err := d.concurrentFetch((*receiptQueue)(d), beaconMode) log.Debug("Receipt download terminated", "err", err) return err @@ -1161,7 +1200,7 @@ func (d *Downloader) fetchReceipts(from uint64) error { // processHeaders takes batches of retrieved headers from an input channel and // keeps processing and scheduling them into the header chain and downloader's // queue until the stream ends or a failure occurs. -func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { +func (d *Downloader) processHeaders(origin uint64, td *big.Int, beaconMode bool) error { // Keep a count of uncertain headers to roll back var ( rollback uint64 // Zero means no rollback (fine as you can't unroll the genesis) @@ -1209,35 +1248,40 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { case <-d.cancelCh: } } - // If no headers were retrieved at all, the peer violated its TD promise that it had a - // better chain compared to ours. The only exception is if its promised blocks were - // already imported by other means (e.g. fetcher): - // - // R , L : Both at block 10 - // R: Mine block 11, and propagate it to L - // L: Queue block 11 for import - // L: Notice that R's head and TD increased compared to ours, start sync - // L: Import of block 11 finishes - // L: Sync begins, and finds common ancestor at 11 - // L: Request new headers up from 11 (R's TD was higher, it must have something) - // R: Nothing to give - if mode != LightSync { - head := d.blockchain.CurrentBlock() - if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 { - return errStallingPeer + // If we're in legacy sync mode, we need to check total difficulty + // violations from malicious peers. That is not needed in beacon + // mode and we can skip to terminating sync. + if !beaconMode { + // If no headers were retrieved at all, the peer violated its TD promise that it had a + // better chain compared to ours. The only exception is if its promised blocks were + // already imported by other means (e.g. fetcher): + // + // R , L : Both at block 10 + // R: Mine block 11, and propagate it to L + // L: Queue block 11 for import + // L: Notice that R's head and TD increased compared to ours, start sync + // L: Import of block 11 finishes + // L: Sync begins, and finds common ancestor at 11 + // L: Request new headers up from 11 (R's TD was higher, it must have something) + // R: Nothing to give + if mode != LightSync { + head := d.blockchain.CurrentBlock() + if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 { + return errStallingPeer + } } - } - // If snap or light syncing, ensure promised headers are indeed delivered. This is - // needed to detect scenarios where an attacker feeds a bad pivot and then bails out - // of delivering the post-pivot blocks that would flag the invalid content. - // - // This check cannot be executed "as is" for full imports, since blocks may still be - // queued for processing when the header download completes. However, as long as the - // peer gave us something useful, we're already happy/progressed (above check). - if mode == SnapSync || mode == LightSync { - head := d.lightchain.CurrentHeader() - if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 { - return errStallingPeer + // If snap or light syncing, ensure promised headers are indeed delivered. This is + // needed to detect scenarios where an attacker feeds a bad pivot and then bails out + // of delivering the post-pivot blocks that would flag the invalid content. + // + // This check cannot be executed "as is" for full imports, since blocks may still be + // queued for processing when the header download completes. However, as long as the + // peer gave us something useful, we're already happy/progressed (above check). + if mode == SnapSync || mode == LightSync { + head := d.lightchain.CurrentHeader() + if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 { + return errStallingPeer + } } } // Disable any rollback and return diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index e6e092b22f5af..566e0febee23b 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -75,7 +75,7 @@ func newTester() *downloadTester { chain: chain, peers: make(map[string]*downloadTesterPeer), } - tester.downloader = New(0, db, trie.NewSyncBloom(1, db), new(event.TypeMux), tester.chain, nil, tester.dropPeer) + tester.downloader = New(0, db, trie.NewSyncBloom(1, db), new(event.TypeMux), tester.chain, nil, tester.dropPeer, nil) return tester } @@ -96,7 +96,7 @@ func (dl *downloadTester) sync(id string, td *big.Int, mode SyncMode) error { td = dl.peers[id].chain.GetTd(head.Hash(), head.NumberU64()) } // Synchronise with the chosen peer and ensure proper cleanup afterwards - err := dl.downloader.synchronise(id, head.Hash(), td, mode) + err := dl.downloader.synchronise(id, head.Hash(), td, mode, false) select { case <-dl.downloader.cancelCh: // Ok, downloader fully cancelled after sync cycle @@ -933,7 +933,7 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol uint) { // Simulate a synchronisation and check the required result tester.downloader.synchroniseMock = func(string, common.Hash) error { return tt.result } - tester.downloader.Synchronise(id, tester.chain.Genesis().Hash(), big.NewInt(1000), FullSync) + tester.downloader.LegacySync(id, tester.chain.Genesis().Hash(), big.NewInt(1000), FullSync) if _, ok := tester.peers[id]; !ok != tt.drop { t.Errorf("test %d: peer drop mismatch for %v: have %v, want %v", i, tt.result, !ok, tt.drop) } diff --git a/eth/downloader/fetchers_concurrent.go b/eth/downloader/fetchers_concurrent.go index 4bade2b4c3dd3..a0aa197175a31 100644 --- a/eth/downloader/fetchers_concurrent.go +++ b/eth/downloader/fetchers_concurrent.go @@ -76,7 +76,7 @@ type typedQueue interface { // concurrentFetch iteratively downloads scheduled block parts, taking available // peers, reserving a chunk of fetch requests for each and waiting for delivery // or timeouts. -func (d *Downloader) concurrentFetch(queue typedQueue) error { +func (d *Downloader) concurrentFetch(queue typedQueue, beaconMode bool) error { // Create a delivery channel to accept responses from all peers responses := make(chan *eth.Response) @@ -127,7 +127,7 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error { finished := false for { // Short circuit if we lost all our peers - if d.peers.Len() == 0 { + if d.peers.Len() == 0 && !beaconMode { return errNoPeers } // If there's nothing more to fetch, wait or terminate @@ -209,7 +209,7 @@ func (d *Downloader) concurrentFetch(queue typedQueue) error { } // Make sure that we have peers available for fetching. If all peers have been tried // and all failed throw an error - if !progressed && !throttled && len(pending) == 0 && len(idles) == d.peers.Len() && queued > 0 { + if !progressed && !throttled && len(pending) == 0 && len(idles) == d.peers.Len() && queued > 0 && !beaconMode { return errPeersUnavailable } } diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 324fdb9cd51f5..d74d23e74d557 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -294,19 +294,19 @@ func (ps *peerSet) AllPeers() []*peerConnection { // peerCapacitySort implements sort.Interface. // It sorts peer connections by capacity (descending). type peerCapacitySort struct { - p []*peerConnection - tp []int + peers []*peerConnection + caps []int } func (ps *peerCapacitySort) Len() int { - return len(ps.p) + return len(ps.peers) } func (ps *peerCapacitySort) Less(i, j int) bool { - return ps.tp[i] > ps.tp[j] + return ps.caps[i] > ps.caps[j] } func (ps *peerCapacitySort) Swap(i, j int) { - ps.p[i], ps.p[j] = ps.p[j], ps.p[i] - ps.tp[i], ps.tp[j] = ps.tp[j], ps.tp[i] + ps.peers[i], ps.peers[j] = ps.peers[j], ps.peers[i] + ps.caps[i], ps.caps[j] = ps.caps[j], ps.caps[i] } diff --git a/eth/downloader/skeleton.go b/eth/downloader/skeleton.go new file mode 100644 index 0000000000000..7c604cbffac75 --- /dev/null +++ b/eth/downloader/skeleton.go @@ -0,0 +1,965 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package downloader + +import ( + "encoding/json" + "errors" + "math/rand" + "sort" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/protocols/eth" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" +) + +// scratchHeaders is the number of headers to store in a scratch space to allow +// concurrent downloads. A header is about 0.5KB in size, so there is no worry +// about using too much memory. The only catch is that we can only validate gaps +// afer they're linked to the head, so the bigger the scratch space, the larger +// potential for invalid headers. +// +// The current scratch space of 131072 headers is expected to use 64MB RAM. +const scratchHeaders = 131072 + +// requestHeaders is the number of header to request from a remote peer in a single +// network packet. Although the skeleton downloader takes into consideration peer +// capacities when picking idlers, the packet size was decided to remain constant +// since headers are relatively small and it's easier to work with fixed batches +// vs. dynamic interval fillings. +const requestHeaders = 512 + +// errSyncLinked is an internal helper error to signal that the current sync +// cycle linked up to the genesis block, this the skeleton syncer should ping +// the backfiller to resume. Since we already have that logic on sync start, +// piggie-back on that instead of 2 entrypoints. +var errSyncLinked = errors.New("sync linked") + +// errSyncMerged is an internal helper error to signal that the current sync +// cycle merged with a previously aborted subchain, thus the skeleton syncer +// should abort and restart with the new state. +var errSyncMerged = errors.New("sync merged") + +// errSyncReorged is an internal helper error to signal that the head chain of +// the current sync cycle was (partially) reorged, thus the skeleton syncer +// should abort and restart with the new state. +var errSyncReorged = errors.New("sync reorged") + +// errTerminated is returned if the sync mechanism was terminated for this run of +// the process. This is usually the case when Geth is shutting down and some events +// might still be propagating. +var errTerminated = errors.New("terminated") + +func init() { + // Tuning parameters is nice, but the scratch space must be assignable in + // full to peers. It's a useless cornercase to support a dangling half-group. + if scratchHeaders%requestHeaders != 0 { + panic("Please make scratchHeaders divisible by requestHeaders") + } +} + +// subchain is a contiguous header chain segment that is backed by the database, +// but may not be linked to the live chain. The skeleton downloader may produce +// a new one of these every time it is restarted until the subchain grows large +// enough to connect with a previous subchain. +// +// The subchains use the exact same database namespace and are not disjoint from +// each other. As such, extending one to overlap the other entails reducing the +// second one first. This combined buffer model is used to avoid having to move +// data on disk when two subchains are joined together. +type subchain struct { + Head uint64 // Block number of the newest header in the subchain + Tail uint64 // Block number of the oldest header in the subchain + Next common.Hash // Block hash of the next oldest header in the subchain +} + +// skeletonProgress is a database entry to allow suspending and resuming a chain +// sync. As the skeleton header chain is downloaded backwards, restarts can and +// will produce temporarilly disjoint subchains. There is no way to restart a +// suspended skeleton sync without prior knowlege of all prior suspension points. +type skeletonProgress struct { + Subchains []*subchain // Disjoint subchains downloaded until now +} + +// headerRequest tracks a pending header request to ensure responses are to +// actual requests and to validate any security constraints. +// +// Concurrency note: header requests and responses are handled concurrently from +// the main runloop to allow Keccak256 hash verifications on the peer's thread and +// to drop on invalid response. The request struct must contain all the data to +// construct the response without accessing runloop internals (i.e. subchains). +// That is only included to allow the runloop to match a response to the task being +// synced without having yet another set of maps. +type headerRequest struct { + peer string // Peer to which this request is assigned + id uint64 // Request ID of this request + + deliver chan *headerResponse // Channel to deliver successful response on + revert chan *headerRequest // Channel to deliver request failure on + cancel chan struct{} // Channel to track sync cancellation + stale chan struct{} // Channel to signal the request was dropped + + head uint64 // Head number of the requested batch of headers +} + +// headerResponse is an already verified remote response to a header request. +type headerResponse struct { + peer *peerConnection // Peer from which this response originates + reqid uint64 // Request ID that this response fulfils + headers []*types.Header // Chain of headers +} + +// backfiller is a callback interface through which the skeleton sync can tell +// the downloader that it should suspend or resume backfilling on specific head +// events (e.g. suspend on forks or gaps, resume on successfull linkups). +type backfiller interface { + // suspend requests the backfiller to abort any running full or snap sync + // based on the skeleton chain as it might be invalid. The backfiller should + // gracefully handle multiple consecutive suspends without a resume, even + // on initial sartup. + suspend() + + // resume requests the backfiller to start running fill or snap sync based on + // the skeleton chain as it has successfully been linked. Appending new heads + // to the end of the chain will not result in suspend/resume cycles. + resume() +} + +// skeleton represents a header chain synchronized after the Ethereum 2 merge, +// where blocks aren't validated any more via PoW in a forward fashion, rather +// are dictated and extended at the head via the beacon chain and backfilled on +// the original Ethereum 1 block sync protocol. +// +// Since the skeleton is grown backwards from head to genesis, it is handled as +// a separate entity, not mixed in with the logical sequential transition of the +// blocks. Once the skeleton is connected to an existing, validated chain, the +// headers will be moved into the main downloader for filling and execution. +// +// Opposed to the Ethereum 1 block synchronization which is trustless (and uses a +// master peer to minimize the attack surface), Ethereum 2 block synchronization +// starts from a trusted head. As such, there is no need for a master peer any +// more and headers can be requested fully concurrently (though some batches might +// be discarded if they don't link up correctly). +// +// Although a skeleton is part of a sync cycle, it is not recreated, rather stays +// alive throughout the lifetime of the downloader. This allows it to be extended +// concurrently with the sync cycle, since extensions arrive from an API surface, +// not from within (vs. Ethereum 1 sync). +// +// Since the skeleton tracks the entire header chain until it is cosumed by the +// forward block filling, it needs 0.5KB/block storage. At current mainnet sizes +// this is only possible with a disk backend. Since the skeleton is separate from +// the node's header chain, storing the headers ephemerally until sync finishes +// is wasted disk IO, but it's a price we're going to pay to keep things simple +// for now. +type skeleton struct { + db ethdb.Database // Database backing the skeleton + filler backfiller // Chain syncer suspended/resumed by head events + + peers *peerSet // Set of peers we can sync from + idles map[string]*peerConnection // Set of idle peers in the current sync cycle + drop peerDropFn // Drops a peer for misbehaving + + progress *skeletonProgress // Sync progress tracker for resumption and metrics + started time.Time // Timestamp when the skeleton syncer was created + logged time.Time // Timestamp when progress was last logged to the user + pulled uint64 // Number of headers downloaded in this run + + scratchSpace []*types.Header // Scratch space to accumulate headers in (first = recent) + scratchOwners []string // Peer IDs owning chunks of the scratch space (pend or delivered) + scratchHead uint64 // Block number of the first item in the scratch space + + requests map[uint64]*headerRequest // Header requests currently running + + headEvents chan *types.Header // Notification channel for new heads + terminate chan chan error // Termination channel to abort sync + terminated chan struct{} // Channel to signal that the syner is dead + + // Callback hooks used during testing + syncStarting func() // callback triggered after a sync cycle is inited but before started +} + +// newSkeleton creates a new sync skeleton that tracks a potentially dangling +// header chain until it's linked into an existing set of blocks. +func newSkeleton(db ethdb.Database, peers *peerSet, drop peerDropFn, filler backfiller) *skeleton { + sk := &skeleton{ + db: db, + filler: filler, + peers: peers, + drop: drop, + requests: make(map[uint64]*headerRequest), + headEvents: make(chan *types.Header), + terminate: make(chan chan error), + terminated: make(chan struct{}), + } + go sk.startup() + return sk +} + +// startup is an initial background loop which waits for an event to start or +// tear the syncer down. This is required to make the skeleton sync loop once +// per process but at the same time not start before the beacon chain announces +// a new (existing) head. +func (s *skeleton) startup() { + // Close a notification channel so anyone sending us events will know if the + // sync loop was torn down for good. + defer close(s.terminated) + + // Wait for startup or teardown + select { + case errc := <-s.terminate: + // No head was announced but Geth is shutting down + errc <- nil + return + + case head := <-s.headEvents: + // New head announced, start syncing to it, looping every time a current + // cycle is terminated due to a chain event (head reorg, old chain merge) + s.started = time.Now() + + for { + // If the sync cycle terminated or was terminated, propagate up when + // higher layers request termination. There's no fancy explicit error + // signalling as the sync loop should never terminate (TM). + newhead, err := s.sync(head) + switch { + case err == errSyncLinked: + // Sync cycle linked up to the genesis block. Tear down the loop + // and restart it so, it can properly notify the backfiller. Don't + // account a new head. + head = nil + + case err == errSyncMerged: + // Subchains were merged, we just need to reinit the internal + // start to continue on the tail of the merged chain. Don't + // announce a new head, + head = nil + + case err == errSyncReorged: + // The subchain being synced got modified at the head in a + // way that requires resyncing it. Restart sync with the new + // head to force a cleanup. + head = newhead + + case err == errTerminated: + // Sync was requested to be terminated from within, stop and + // return (no need to pass a message, was already done internally) + return + + default: + // Sync either successfully terminated or failed with an unhandled + // error. Abort and wait until Geth requests a termination. + errc := <-s.terminate + errc <- err + return + } + } + } +} + +// Terminate tears down the syncer indefinitely. +func (s *skeleton) Terminate() error { + // Request termination and fetch any errors + errc := make(chan error) + s.terminate <- errc + err := <-errc + + // Wait for full shutdown (not necessary, but cleaner) + <-s.terminated + return err +} + +// Sync starts or resumes a previous sync cycle to download and maintain a reverse +// header chain starting at the head and leading towards genesis to an available +// ancestor. +// +// This method does not block, rather it just waits until the syncer receives the +// fed header. What the syncer does with it is the syncer's problem. +func (s *skeleton) Sync(head *types.Header) error { + log.Trace("New skeleton head announced", "number", head.Number, "hash", head.Hash()) + select { + case s.headEvents <- head: + return nil + case <-s.terminated: + return errTerminated + } +} + +// sync is the internal version of Sync that executes a single sync cycle, either +// until some termination condition is reached, or until the current cycle merges +// with a previously aborted run. +func (s *skeleton) sync(head *types.Header) (*types.Header, error) { + // If we're continuing a previous merge interrupt, just access the existing + // old state without initing from disk. + if head == nil { + head = rawdb.ReadSkeletonHeader(s.db, s.progress.Subchains[0].Head) + } else { + // Otherwise, initialize the sync, trimming and previous leftovers until + // we're consistent with the newly requested chain head + s.initSync(head) + } + // Create the scratch space to fill with concurrently downloaded headers + s.scratchSpace = make([]*types.Header, scratchHeaders) + defer func() { s.scratchSpace = nil }() // don't hold on to references after sync + + s.scratchOwners = make([]string, scratchHeaders/requestHeaders) + defer func() { s.scratchOwners = nil }() // don't hold on to references after sync + + s.scratchHead = s.progress.Subchains[0].Tail - 1 // tail must not be 0! + + // If the sync is already done, resume the backfiller. When the loop stops, + // terminate the backfiller too. + if s.scratchHead == 0 { + s.filler.resume() + } + defer s.filler.suspend() + + // Create a set of unique channels for this sync cycle. We need these to be + // ephemeral so a data race doesn't accidentally deliver something stale on + // a persistent channel across syncs (yup, this happened) + var ( + requestFails = make(chan *headerRequest) + responses = make(chan *headerResponse) + ) + cancel := make(chan struct{}) + defer close(cancel) + + log.Debug("Starting reverse header sync cycle", "head", head.Number, "hash", head.Hash(), "cont", s.scratchHead) + + // Whether sync completed or not, disregard any future packets + defer func() { + log.Debug("Terminating reverse header sync cycle", "head", head.Number, "hash", head.Hash(), "cont", s.scratchHead) + s.requests = make(map[uint64]*headerRequest) + }() + + // Start tracking idle peers for task assignments + peering := make(chan *peeringEvent, 64) // arbitrary buffer, just some burst protection + + peeringSub := s.peers.SubscribeEvents(peering) + defer peeringSub.Unsubscribe() + + s.idles = make(map[string]*peerConnection) + for _, peer := range s.peers.AllPeers() { + s.idles[peer.id] = peer + } + // Nofity any tester listening for startup events + if s.syncStarting != nil { + s.syncStarting() + } + for { + // Something happened, try to assign new tasks to any idle peers + s.assingTasks(responses, requestFails, cancel) + + // Wait for something to happen + select { + case event := <-peering: + // A peer joined or left, the tasks queue and allocations need to be + // checked for potential assignment or reassignment + peerid := event.peer.id + if event.join { + s.idles[peerid] = event.peer + } else { + s.revertRequests(peerid) + delete(s.idles, peerid) + } + + case errc := <-s.terminate: + errc <- nil + return nil, errTerminated + + case head := <-s.headEvents: + // New head was announced, try to integrate it. If successful, nothing + // needs to be done as the head simply extended the last range. For now + // we don't seamlessly integrate reorgs to keep things simple. If the + // network starts doing many mini reorgs, it might be worthwhile handling + // a limited depth without an error. + if reorged := s.processNewHead(head); reorged { + return head, errSyncReorged + } + // New head was integrated into the skeleton chain. If the backfiller + // is still running, it will pick it up. If it already terminated, + // a new cycle needs to be spun up. + if s.scratchHead == 0 { + s.filler.resume() + } + + case req := <-requestFails: + s.revertRequest(req) + + case res := <-responses: + // Process the batch of headers. If though processing we managed to + // link the curret subchain to a previously downloaded one, abort the + // sync and restart with the merged subchains. We could probably hack + // the internal state to switch the scratch space over to the tail of + // the extended subchain, but since the scenario is rare, it's cleaner + // to rely on the restart mechanism than a stateful modification. + if merged := s.processResponse(res); merged { + return nil, errSyncMerged + } + // If we've just reached the genesis block, tear down the sync cycle + // and restart it to resume the backfiller. We could just as well do + // a signalling here, but it's a tad cleaner to have only one entry + // pathway to suspending/resuming it. + return nil, errSyncLinked + } + } +} + +// initSync attempts to get the skeleton sync into a consistent state wrt any +// past state on disk and the newly requested head to sync to. If the new head +// is nil, the method will return and continue from the previous head. +func (s *skeleton) initSync(head *types.Header) { + // Extract the head number, we'll need it all over + number := head.Number.Uint64() + + // Retrieve the previously saved sync progress + if status := rawdb.ReadSkeletonSyncStatus(s.db); len(status) > 0 { + s.progress = new(skeletonProgress) + if err := json.Unmarshal(status, s.progress); err != nil { + log.Error("Failed to decode skeleton sync status", "err", err) + } else { + // Previous sync was available, print some continuation logs + for _, subchain := range s.progress.Subchains { + log.Debug("Restarting skeleton subchain", "head", subchain.Head, "tail", subchain.Tail) + } + // Create a new subchain for the head (unless the last can be extended), + // trimming anything it would overwrite + headchain := &subchain{ + Head: number, + Tail: number, + Next: head.ParentHash, + } + for len(s.progress.Subchains) > 0 { + // If the last chain is above the new head, delete altogether + lastchain := s.progress.Subchains[0] + if lastchain.Tail >= headchain.Tail { + log.Debug("Dropping skeleton subchain", "head", lastchain.Head, "tail", lastchain.Tail) + s.progress.Subchains = s.progress.Subchains[1:] + continue + } + // Otherwise truncate the last chain if needed and abort trimming + if lastchain.Head >= headchain.Tail { + log.Debug("Trimming skeleton subchain", "oldhead", lastchain.Head, "newhead", headchain.Tail-1, "tail", lastchain.Tail) + lastchain.Head = headchain.Tail - 1 + } + break + } + // If the last subchain can be extended, we're lucky. Otherwise create + // a new subchain sync task. + var extended bool + if n := len(s.progress.Subchains); n > 0 { + lastchain := s.progress.Subchains[0] + if lastchain.Head == headchain.Tail-1 { + lasthead := rawdb.ReadSkeletonHeader(s.db, lastchain.Head) + if lasthead.Hash() == head.ParentHash { + log.Debug("Extended skeleton subchain with new head", "head", headchain.Tail, "tail", lastchain.Tail) + lastchain.Head = headchain.Tail + extended = true + } + } + } + if !extended { + log.Debug("Created new skeleton subchain", "head", number, "tail", number) + s.progress.Subchains = append([]*subchain{headchain}, s.progress.Subchains...) + } + // Update the database with the new sync stats and insert the new + // head header. We won't delete any trimmed skeleton headers since + // those will be outside the index space of the many subchains and + // the database space will be reclaimed eventually when processing + // blocks above the current head (TODO(karalabe): don't forget). + batch := s.db.NewBatch() + + rawdb.WriteSkeletonHeader(batch, head) + s.saveSyncStatus(batch) + + if err := batch.Write(); err != nil { + log.Crit("Failed to write skeleton sync status", "err", err) + } + return + } + } + // Either we've failed to decode the previus state, or there was none. Start + // a fresh sync with a single subchain represented by the currently sent + // chain head. + s.progress = &skeletonProgress{ + Subchains: []*subchain{ + { + Head: number, + Tail: number, + Next: head.ParentHash, + }, + }, + } + batch := s.db.NewBatch() + + rawdb.WriteSkeletonHeader(batch, head) + s.saveSyncStatus(batch) + + if err := batch.Write(); err != nil { + log.Crit("Failed to write initial skeleton sync status", "err", err) + } + log.Debug("Created initial skeleton subchain", "head", number, "tail", number) +} + +// saveSyncStatus marshals the remaining sync tasks into leveldb. +func (s *skeleton) saveSyncStatus(db ethdb.KeyValueWriter) { + status, err := json.Marshal(s.progress) + if err != nil { + panic(err) // This can only fail during implementation + } + rawdb.WriteSkeletonSyncStatus(db, status) +} + +// processNewHead does the internal shuffling for a new head marker and either +// accepts and integrates it into the skeleton or requests a reorg. Upon reorg, +// the syncer will tear itself down and restart with a fresh head. It is simpler +// to reconstruct the sync state than to mutate it and hope for the best. +func (s *skeleton) processNewHead(head *types.Header) bool { + // If the header cannot be inserted without interruption, return an error for + // the outer loop to tear down the skeleton sync and restart it + number := head.Number.Uint64() + + lastchain := s.progress.Subchains[0] + if lastchain.Tail >= number { + log.Warn("Beacon chain reorged", "tail", lastchain.Tail, "newHead", number) + return true + } + if lastchain.Head+1 < number { + log.Warn("Beacon chain gapped", "head", lastchain.Head, "newHead", number) + return true + } + if parent := rawdb.ReadSkeletonHeader(s.db, number-1); parent.Hash() != head.ParentHash { + log.Warn("Beacon chain forked", "ancestor", parent.Number, "hash", parent.Hash(), "want", head.ParentHash) + return true + } + // New header seems to be in the last subchain range. Unwind any extra headers + // from the chain tip and insert the new head. We won't delete any trimmed + // skeleton headers since those will be outside the index space of the many + // subchains and the database space will be reclaimed eventually when processing + // blocks above the current head (TODO(karalabe): don't forget). + batch := s.db.NewBatch() + + rawdb.WriteSkeletonHeader(batch, head) + lastchain.Head = number + s.saveSyncStatus(batch) + + if err := batch.Write(); err != nil { + log.Crit("Failed to write skeleton sync status", "err", err) + } + return false +} + +// assingTasks attempts to match idle peers to pending header retrievals. +func (s *skeleton) assingTasks(success chan *headerResponse, fail chan *headerRequest, cancel chan struct{}) { + // Sort the peers by download capacity to use faster ones if many available + idlers := &peerCapacitySort{ + peers: make([]*peerConnection, 0, len(s.idles)), + caps: make([]int, 0, len(s.idles)), + } + targetTTL := s.peers.rates.TargetTimeout() + for _, peer := range s.idles { + idlers.peers = append(idlers.peers, peer) + idlers.caps = append(idlers.caps, s.peers.rates.Capacity(peer.id, eth.BlockHeadersMsg, targetTTL)) + } + if len(idlers.peers) == 0 { + return + } + sort.Sort(idlers) + + // Find header regions not yet downloading and fill them + for task, owner := range s.scratchOwners { + // If we're out of idle peers, stop assigning tasks + if len(idlers.peers) == 0 { + return + } + // Skip any tasks already filling + if owner != "" { + continue + } + // If we've reached the genesis, stop assigning tasks + if uint64(task*requestHeaders) >= s.scratchHead { + return + } + // Found a task and have peers available, assign it + idle := idlers.peers[0] + + idlers.peers = idlers.peers[1:] + idlers.caps = idlers.caps[1:] + + // Matched a pending task to an idle peer, allocate a unique request id + var reqid uint64 + for { + reqid = uint64(rand.Int63()) + if reqid == 0 { + continue + } + if _, ok := s.requests[reqid]; ok { + continue + } + break + } + // Generate the network query and send it to the peer + req := &headerRequest{ + peer: idle.id, + id: reqid, + deliver: success, + revert: fail, + cancel: cancel, + stale: make(chan struct{}), + head: s.scratchHead - uint64(task*requestHeaders), + } + s.requests[reqid] = req + delete(s.idles, idle.id) + + // Generate the network query and send it to the peer + go s.executeTask(idle, req) + + // Inject the request into the task to block further assignments + s.scratchOwners[task] = idle.id + } +} + +// executeTask executes a single fetch request, blocking until either a result +// arrives or a timeouts / cancellation is triggered. The method should be run +// on its own goroutine and will deliver on the requested channels. +func (s *skeleton) executeTask(peer *peerConnection, req *headerRequest) { + start := time.Now() + resCh := make(chan *eth.Response) + + // Figure out how many headers to fetch. Usually this will be a full batch, + // but for the very tail of the chain, trim the request to the number left. + // Since nodes may or may not return the genesis header for a batch request, + // don't even request it. The parent hash of block #1 is enough to link. + requestCount := requestHeaders + if req.head < requestHeaders { + requestCount = int(req.head) + } + peer.log.Trace("Fetching skeleton headers", "from", req.head, "count", requestCount) + netreq, err := peer.peer.RequestHeadersByNumber(req.head, requestCount, 0, true, resCh) + if err != nil { + peer.log.Trace("Failed to request headers", "err", err) + s.scheduleRevertRequest(req) + return + } + defer netreq.Close() + + // Wait until the response arrives, the request is cancelled or times out + ttl := s.peers.rates.TargetTimeout() + + timeoutTimer := time.NewTimer(ttl) + defer timeoutTimer.Stop() + + select { + case <-req.cancel: + peer.log.Debug("Header request cancelled") + s.scheduleRevertRequest(req) + + case <-timeoutTimer.C: + // Header retrieval timed out, update the metrics + peer.log.Trace("Header request timed out", "elapsed", ttl) + headerTimeoutMeter.Mark(1) + s.peers.rates.Update(peer.id, eth.BlockHeadersMsg, 0, 0) + s.scheduleRevertRequest(req) + + case res := <-resCh: + // Headers successfully retrieved, update the metrics + headers := *res.Res.(*eth.BlockHeadersPacket) + + headerReqTimer.Update(time.Since(start)) + s.peers.rates.Update(peer.id, eth.BlockHeadersMsg, res.Time, len(headers)) + + // Cross validate the headers with the requests + switch { + case len(headers) == 0: + // No headers were delivered, reject the response and reschedule + peer.log.Debug("No headers delivered") + res.Done <- errors.New("no headers delivered") + s.scheduleRevertRequest(req) + + case headers[0].Number.Uint64() != req.head: + // Header batch anchored at non-requested number + peer.log.Debug("Invalid header response head", "have", headers[0].Number, "want", req.head) + res.Done <- errors.New("invalid header batch anchor") + s.scheduleRevertRequest(req) + + case headers[0].Number.Uint64() >= requestHeaders && len(headers) != requestHeaders: + // Invalid number of non-genesis headers delivered, reject the response and reschedule + peer.log.Debug("Invalid non-genesis header count", "have", len(headers), "want", requestHeaders) + res.Done <- errors.New("not enough non-genesis headers delivered") + s.scheduleRevertRequest(req) + + case headers[0].Number.Uint64() < requestHeaders && uint64(len(headers)) != headers[0].Number.Uint64(): + // Invalid number of genesis headers delivered, reject the response and reschedule + peer.log.Debug("Invalid genesis header count", "have", len(headers), "want", headers[0].Number.Uint64()) + res.Done <- errors.New("not enough genesis headers delivered") + s.scheduleRevertRequest(req) + + default: + // Packet seems structurally valid, check hash progression and if it + // is correct too, deliver for storage + for i := 0; i < len(headers)-1; i++ { + if headers[i].ParentHash != headers[i+1].Hash() { + peer.log.Debug("Invalid genesis header count", "have", len(headers), "want", headers[0].Number.Uint64()) + res.Done <- errors.New("not enough genesis headers delivered") + s.scheduleRevertRequest(req) + return + } + } + // Hash chain is valid. The delivery might still be junk as we're + // downloading batches concurrently (so no way to link the headers + // until gaps are filled); in that case, we'll nuke the peer when + // we detect the fault. + res.Done <- nil + + select { + case req.deliver <- &headerResponse{ + peer: peer, + reqid: req.id, + headers: headers, + }: + case <-req.cancel: + } + } + } +} + +// revertRequests locates all the currently pending reuqests from a particular +// peer and reverts them, rescheduling for others to fulfill. +func (s *skeleton) revertRequests(peer string) { + // Gather the requests first, revertals need the lock too + var requests []*headerRequest + for _, req := range s.requests { + if req.peer == peer { + requests = append(requests, req) + } + } + // Revert all the requests matching the peer + for _, req := range requests { + s.revertRequest(req) + } +} + +// scheduleRevertRequest asks the event loop to clean up a request and return +// all failed retrieval tasks to the scheduler for reassignment. +func (s *skeleton) scheduleRevertRequest(req *headerRequest) { + select { + case req.revert <- req: + // Sync event loop notified + case <-req.cancel: + // Sync cycle got cancelled + case <-req.stale: + // Request already reverted + } +} + +// revertRequest cleans up a request and returns all failed retrieval tasks to +// the scheduler for reassignment. +// +// Note, this needs to run on the event runloop thread to reschedule to idle peers. +// On peer threads, use scheduleRevertRequest. +func (s *skeleton) revertRequest(req *headerRequest) { + log.Trace("Reverting header request", "peer", req.peer, "reqid", req.id) + select { + case <-req.stale: + log.Trace("Header request already reverted", "peer", req.peer, "reqid", req.id) + return + default: + } + close(req.stale) + + // Remove the request from the tracked set + delete(s.requests, req.id) + + // Remove the request from the tracked set and mark the task as not-pending, + // ready for resheduling + s.scratchOwners[(s.scratchHead-req.head)/requestHeaders] = "" +} + +func (s *skeleton) processResponse(res *headerResponse) bool { + res.peer.log.Trace("Processing header response", "head", res.headers[0].Number, "hash", res.headers[0].Hash(), "count", len(res.headers)) + + // Whether or not the response is valid, we can mark the peer as idle and + // notify the scheduler to assign a new task. If the response is invalid, + // we'll drop the peer in a bit. + s.idles[res.peer.id] = res.peer + + // Ensure the response is for a valid request + if _, ok := s.requests[res.reqid]; !ok { + // Request stale, perhaps the peer timed out but came through in the end + res.peer.log.Warn("Unexpected header packet") + return false + } + delete(s.requests, res.reqid) + + // Insert the delivered headers into the scratch space independent of the + // content or continuation; those will be validated in a moment + head := res.headers[0].Number.Uint64() + copy(s.scratchSpace[s.scratchHead-head:], res.headers) + + // If there's still a gap in the head of the scratch space, abort + if s.scratchSpace[0] == nil { + return false + } + // Try to consume any head headers, validating the boundary conditions + var merged bool // Whether subchains were merged + + batch := s.db.NewBatch() + for s.scratchSpace[0] != nil { + // Next batch of headers available, cross-reference with the subchain + // we are extending and either accept or discard + if s.progress.Subchains[0].Next != s.scratchSpace[0].Hash() { + // Print a log messages to track what's going on + tail := s.progress.Subchains[0].Tail + want := s.progress.Subchains[0].Next + have := s.scratchSpace[0].Hash() + + log.Warn("Invalid skeleton headers", "peer", s.scratchOwners[0], "number", tail-1, "want", want, "have", have) + + // The peer delivered junk, or at least not the subchain we are + // syncing to. Free up the scratch space and assignment, reassign + // and drop the original peer. + for i := 0; i < requestHeaders; i++ { + s.scratchSpace[i] = nil + } + s.drop(s.scratchOwners[0]) + s.scratchOwners[0] = "" + break + } + // Scratch delivery matches required subchain, deliver the batch of + // headers and push the subchain forward + var consumed int + for _, header := range s.scratchSpace[:requestHeaders] { + if header != nil { // nil when the genesis is reached + consumed++ + + rawdb.WriteSkeletonHeader(batch, header) + s.pulled++ + + s.progress.Subchains[0].Tail-- + s.progress.Subchains[0].Next = header.ParentHash + } + } + // Batch of headers consumed, shift the download window forward + head := s.progress.Subchains[0].Head + tail := s.progress.Subchains[0].Tail + next := s.progress.Subchains[0].Next + + log.Trace("Primary subchain extended", "head", head, "tail", tail, "next", next) + + copy(s.scratchSpace, s.scratchSpace[requestHeaders:]) + for i := 0; i < requestHeaders; i++ { + s.scratchSpace[scratchHeaders-i-1] = nil + } + copy(s.scratchOwners, s.scratchOwners[1:]) + s.scratchOwners[scratchHeaders/requestHeaders-1] = "" + + s.scratchHead -= uint64(consumed) + + // If the subchain extended into the next subchain, we need to handle + // the overlap. Since there could be many overlaps (come on), do this + // in a loop. + for len(s.progress.Subchains) > 1 && s.progress.Subchains[1].Head >= s.progress.Subchains[0].Tail { + // Extract some stats from the second subchain + head := s.progress.Subchains[1].Head + tail := s.progress.Subchains[1].Tail + next := s.progress.Subchains[1].Next + + // Since we just overwrote part of the next subchain, we need to trim + // its head independent of matching or mismatching content + if s.progress.Subchains[1].Tail >= s.progress.Subchains[0].Tail { + // Fully overwritten, get rid of the subchain as a whole + log.Debug("Previous subchain fully overwritten", "head", head, "tail", tail, "next", next) + s.progress.Subchains = append(s.progress.Subchains[:1], s.progress.Subchains[2:]...) + continue + } else { + // Partially overwritten, trim the head to the overwritten size + log.Debug("Previous subchain partially overwritten", "head", head, "tail", tail, "next", next) + s.progress.Subchains[1].Head = s.progress.Subchains[0].Tail - 1 + } + // If the old subchain is an extension of the new one, merge the two + // and let the skeleton syncer restart (to clean internal state) + if rawdb.ReadSkeletonHeader(s.db, s.progress.Subchains[1].Head).Hash() == s.progress.Subchains[0].Next { + log.Debug("Previous subchain merged", "head", head, "tail", tail, "next", next) + s.progress.Subchains[0].Tail = s.progress.Subchains[1].Tail + s.progress.Subchains[0].Next = s.progress.Subchains[1].Next + + s.progress.Subchains = append(s.progress.Subchains[:1], s.progress.Subchains[2:]...) + merged = true + } + } + } + s.saveSyncStatus(batch) + if err := batch.Write(); err != nil { + log.Crit("Failed to write skeleton headers and progress", "err", err) + } + // Print a progress report to make the UX a bit nicer + left := s.progress.Subchains[0].Tail - 1 + if time.Since(s.logged) > 8*time.Second || left == 0 { + s.logged = time.Now() + + if s.pulled == 0 { + log.Info("Beacon sync starting", "left", left) + } else { + eta := float64(time.Since(s.started)) / float64(s.pulled) * float64(left) + log.Info("Syncing beacon headers", "downloaded", s.pulled, "left", left, "eta", common.PrettyDuration(eta)) + } + } + return merged +} + +// Head retrieves the current head tracked by the skeleton syncer. This method +// is meant to be used by the backfiller, whose life cycle is controlled by the +// skeleton syncer. +// +// Note, the method will not use the internal state of the skeleton, but will +// rather blindly pull stuff from the database. This is fine, because the back- +// filler will only run when the skeleton chain is fully downloaded and stable. +// There might be new heads appended, but those are stomic from the perspective +// of this method. Any head reorg will first tear down the backfiller and only +// then make the modification. +func (s *skeleton) Head() (*types.Header, error) { + // Read the current sync progress from disk and figure out the current head. + // Although there's a lot of error handling here, these are mostly as sanity + // checks to avoid crashing if a programming error happens. These should not + // happen in live code. + status := rawdb.ReadSkeletonSyncStatus(s.db) + if len(status) == 0 { + return nil, errors.New("beacon sync not yet started") + } + s.progress = new(skeletonProgress) + if err := json.Unmarshal(status, s.progress); err != nil { + return nil, err + } + if s.progress.Subchains[0].Tail != 1 { + return nil, errors.New("beacon sync not yet finished") + } + return rawdb.ReadSkeletonHeader(s.db, s.progress.Subchains[0].Head), nil +} + +// Header retrieves a specific header tracked by the skeleton syncer. This method +// is meant to be used by the backfiller, whose life cycle is controlled by the +// skeleton syncer. +// +// Note, outside the permitted runtimes, this method might return nil results and +// subsequent calls might return headers from different chains. +func (s *skeleton) Header(number uint64) *types.Header { + return rawdb.ReadSkeletonHeader(s.db, number) +} diff --git a/eth/downloader/skeleton_test.go b/eth/downloader/skeleton_test.go new file mode 100644 index 0000000000000..7c2e07a43866a --- /dev/null +++ b/eth/downloader/skeleton_test.go @@ -0,0 +1,257 @@ +// Copyright 2021 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package downloader + +import ( + "encoding/json" + "math/big" + "os" + "testing" + + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" +) + +// hookedBackfiller is a tester backfiller with all interface methods mocked and +// hooked so tests can implement only the things they need. +type hookedBackfiller struct { + // suspendHook is an optional hook to be called when the filler is requested + // to be suspended. + suspendHook func() + + // resumeHook is an optional hook to be called when the filler is requested + // to be resumed. + resumeHook func() +} + +// suspend requests the backfiller to abort any running full or snap sync +// based on the skeleton chain as it might be invalid. The backfiller should +// gracefully handle multiple consecutive suspends without a resume, even +// on initial sartup. +func (hf *hookedBackfiller) suspend() { + if hf.suspendHook != nil { + hf.suspendHook() + } +} + +// resume requests the backfiller to start running fill or snap sync based on +// the skeleton chain as it has successfully been linked. Appending new heads +// to the end of the chain will not result in suspend/resume cycles. +func (hf *hookedBackfiller) resume() { + if hf.resumeHook != nil { + hf.resumeHook() + } +} + +// newNoopBackfiller creates a hooked backfiller with all callbacks disabled, +// essentially acting as a noop. +func newNoopBackfiller() backfiller { + return new(hookedBackfiller) +} + +// Tests various sync initialzations based on previous leftovers in the database +// and announced heads. +func TestSkeletonSyncInit(t *testing.T) { + log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) + + // Create a few key headers + var ( + genesis = &types.Header{Number: big.NewInt(0)} + block49 = &types.Header{Number: big.NewInt(49)} + block49B = &types.Header{Number: big.NewInt(49), Extra: []byte("B")} + block50 = &types.Header{Number: big.NewInt(50), ParentHash: block49.Hash()} + ) + tests := []struct { + headers []*types.Header // Database content (beside the genesis) + oldstate []*subchain // Old sync state with various interrupted subchains + head *types.Header // New head header to announce to reorg to + newstate []*subchain // Expected sync state after the reorg + }{ + // Completely empty database with only the genesis set. The sync is expected + // to create a single subchain with the requested head. + { + head: block50, + newstate: []*subchain{{Head: 50, Tail: 50}}, + }, + // Empty database with only the genesis set with a leftover empty sync + // progess. This is a synthetic case, just for the sake of covering things. + { + oldstate: []*subchain{}, + head: block50, + newstate: []*subchain{{Head: 50, Tail: 50}}, + }, + // A single leftover subchain is present, older than the new head. The + // old subchain should be left as is and a new one appended to the sync + // status. + { + oldstate: []*subchain{{Head: 10, Tail: 5}}, + head: block50, + newstate: []*subchain{ + {Head: 50, Tail: 50}, + {Head: 10, Tail: 5}, + }, + }, + // Multiple leftover subchains are present, older than the new head. The + // old subchains should be left as is and a new one appended to the sync + // status. + { + oldstate: []*subchain{ + {Head: 20, Tail: 15}, + {Head: 10, Tail: 5}, + }, + head: block50, + newstate: []*subchain{ + {Head: 50, Tail: 50}, + {Head: 20, Tail: 15}, + {Head: 10, Tail: 5}, + }, + }, + // A single leftover subchain is present, newer than the new head. The + // newer subchain should be deleted and a fresh one created for the head. + { + oldstate: []*subchain{{Head: 65, Tail: 60}}, + head: block50, + newstate: []*subchain{{Head: 50, Tail: 50}}, + }, + // Multiple leftover subchain is present, newer than the new head. The + // newer subchains should be deleted and a fresh one created for the head. + { + oldstate: []*subchain{ + {Head: 75, Tail: 70}, + {Head: 65, Tail: 60}, + }, + head: block50, + newstate: []*subchain{{Head: 50, Tail: 50}}, + }, + + // Two leftover subchains are present, one fully older and one fully + // newer than the announced head. The head should delete the newer one, + // keeping the older one. + { + oldstate: []*subchain{ + {Head: 65, Tail: 60}, + {Head: 10, Tail: 5}, + }, + head: block50, + newstate: []*subchain{ + {Head: 50, Tail: 50}, + {Head: 10, Tail: 5}, + }, + }, + // Multiple leftover subchains are present, some fully older and some + // fully newer than the announced head. The head should delete the newer + // ones, keeping the older ones. + { + oldstate: []*subchain{ + {Head: 75, Tail: 70}, + {Head: 65, Tail: 60}, + {Head: 20, Tail: 15}, + {Head: 10, Tail: 5}, + }, + head: block50, + newstate: []*subchain{ + {Head: 50, Tail: 50}, + {Head: 20, Tail: 15}, + {Head: 10, Tail: 5}, + }, + }, + // A single leftover subchain is present and the new head is extending + // it with one more header. We expect the subchain head to be pushed + // forward. + { + headers: []*types.Header{block49}, + oldstate: []*subchain{{Head: 49, Tail: 5}}, + head: block50, + newstate: []*subchain{{Head: 50, Tail: 5}}, + }, + // A single leftover subchain is present and although the new head does + // extend it number wise, the hash chain does not link up. We expect a + // new subchain to be created for the dangling head. + { + headers: []*types.Header{block49B}, + oldstate: []*subchain{{Head: 49, Tail: 5}}, + head: block50, + newstate: []*subchain{ + {Head: 50, Tail: 50}, + {Head: 49, Tail: 5}, + }, + }, + // A single leftover subchain is present. A new head is announced that + // links into the middle of it, correctly anchoring into an existing + // header. We expect the old subchain to be truncated and extended with + // the new head. + { + headers: []*types.Header{block49}, + oldstate: []*subchain{{Head: 100, Tail: 5}}, + head: block50, + newstate: []*subchain{{Head: 50, Tail: 5}}, + }, + // A single leftover subchain is present. A new head is announced that + // links into the middle of it, but does not anchor into an existing + // header. We expect the old subchain to be truncated and a new chain + // be created for the dangling head. + { + headers: []*types.Header{block49B}, + oldstate: []*subchain{{Head: 100, Tail: 5}}, + head: block50, + newstate: []*subchain{ + {Head: 50, Tail: 50}, + {Head: 49, Tail: 5}, + }, + }, + } + for i, tt := range tests { + // Create a fresh database and initialize it with the starting state + db := rawdb.NewMemoryDatabase() + + rawdb.WriteHeader(db, genesis) + for _, header := range tt.headers { + rawdb.WriteSkeletonHeader(db, header) + } + if tt.oldstate != nil { + blob, _ := json.Marshal(&skeletonProgress{Subchains: tt.oldstate}) + rawdb.WriteSkeletonSyncStatus(db, blob) + } + // Create a skeleton sync and run a cycle + wait := make(chan struct{}) + + skeleton := newSkeleton(db, newPeerSet(), func(string) {}, newNoopBackfiller()) + skeleton.syncStarting = func() { close(wait) } + skeleton.Sync(tt.head) + + <-wait + skeleton.Terminate() + + // Ensure the correct resulting sync status + var progress skeletonProgress + json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress) + + if len(progress.Subchains) != len(tt.newstate) { + t.Errorf("test %d: subchain count mismatch: have %d, want %d", i, len(progress.Subchains), len(tt.newstate)) + continue + } + for j := 0; j < len(progress.Subchains); j++ { + if progress.Subchains[j].Head != tt.newstate[j].Head { + t.Errorf("test %d: subchain %d head mismatch: have %d, want %d", i, j, progress.Subchains[j].Head, tt.newstate[j].Head) + } + if progress.Subchains[j].Tail != tt.newstate[j].Tail { + t.Errorf("test %d: subchain %d tail mismatch: have %d, want %d", i, j, progress.Subchains[j].Tail, tt.newstate[j].Tail) + } + } + } +} diff --git a/eth/handler.go b/eth/handler.go index 26a90495b2ef1..8290925185691 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -173,6 +173,26 @@ func newHandler(config *handlerConfig) (*handler, error) { h.checkpointNumber = (config.Checkpoint.SectionIndex+1)*params.CHTFrequency - 1 h.checkpointHash = config.Checkpoint.SectionHead } + // If sync succeeds, pass a callback to potentially disable snap sync mode + // and enable transaction propagation. + success := func() { + // If we were running snap sync and it finished, disable doing another + // round on next sync cycle + if atomic.LoadUint32(&h.snapSync) == 1 { + log.Info("Snap sync complete, auto disabling") + atomic.StoreUint32(&h.snapSync, 0) + } + // If we've successfully finished a sync cycle and passed any required + // checkpoint, enable accepting transactions from the network + head := h.chain.CurrentBlock() + if head.NumberU64() >= h.checkpointNumber { + // Checkpoint passed, sanity check the timestamp to have a fallback mechanism + // for non-checkpointed (number = 0) private networks. + if head.Time() >= uint64(time.Now().AddDate(0, -1, 0).Unix()) { + atomic.StoreUint32(&h.acceptTxs, 1) + } + } + } // Construct the downloader (long sync) and its backing state bloom if snap // sync is requested. The downloader is responsible for deallocating the state // bloom when it's done. @@ -183,7 +203,7 @@ func newHandler(config *handlerConfig) (*handler, error) { if atomic.LoadUint32(&h.snapSync) == 1 && atomic.LoadUint32(&h.snapSync) == 0 { h.stateBloom = trie.NewSyncBloom(config.BloomCache, config.Database) } - h.downloader = downloader.New(h.checkpointNumber, config.Database, h.stateBloom, h.eventMux, h.chain, nil, h.removePeer) + h.downloader = downloader.New(h.checkpointNumber, config.Database, h.stateBloom, h.eventMux, h.chain, nil, h.removePeer, success) // Construct the fetcher (short sync) validator := func(header *types.Header) error { diff --git a/eth/sync.go b/eth/sync.go index b8ac67d3b2d19..384b42bfaa80d 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -113,9 +113,9 @@ func (cs *chainSyncer) loop() { defer cs.force.Stop() for { - if op := cs.nextSyncOp(); op != nil { - cs.startSync(op) - } + //if op := cs.nextSyncOp(); op != nil { + // cs.startSync(op) + //} select { case <-cs.peerEventCh: // Peer information changed, recheck. @@ -227,7 +227,7 @@ func (h *handler) doSync(op *chainSyncOp) error { } } // Run the sync cycle, and disable snap sync if we're past the pivot block - err := h.downloader.Synchronise(op.peer.ID(), op.head, op.td, op.mode) + err := h.downloader.LegacySync(op.peer.ID(), op.head, op.td, op.mode) if err != nil { return err } diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index c4bdbaeb8d208..acc38dcd40ee6 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -152,6 +152,11 @@ web3._extend({ call: 'admin_importChain', params: 1 }), + new web3._extend.Method({ + name: 'newHead', + call: 'admin_newHead', + params: 1 + }), new web3._extend.Method({ name: 'sleepBlocks', call: 'admin_sleepBlocks',