diff --git a/eth/downloader/skeleton.go b/eth/downloader/skeleton.go index fb51527599c05..67fe39d052ae8 100644 --- a/eth/downloader/skeleton.go +++ b/eth/downloader/skeleton.go @@ -400,8 +400,10 @@ func (s *skeleton) sync(head *types.Header) (*types.Header, error) { // checked for potential assignment or reassignment peerid := event.peer.id if event.join { + log.Debug("Joining skeleton peer", "id", peerid) s.idles[peerid] = event.peer } else { + log.Debug("Leaving skeleton peer", "id", peerid) s.revertRequests(peerid) delete(s.idles, peerid) } @@ -763,8 +765,8 @@ func (s *skeleton) executeTask(peer *peerConnection, req *headerRequest) { // is correct too, deliver for storage for i := 0; i < len(headers)-1; i++ { if headers[i].ParentHash != headers[i+1].Hash() { - peer.log.Debug("Invalid genesis header count", "have", len(headers), "want", headers[0].Number.Uint64()) - res.Done <- errors.New("not enough genesis headers delivered") + peer.log.Debug("Invalid hash progression", "index", i, "wantparenthash", headers[i].ParentHash, "haveparenthash", headers[i+1].Hash()) + res.Done <- errors.New("invalid hash progression") s.scheduleRevertRequest(req) return } diff --git a/eth/downloader/skeleton_test.go b/eth/downloader/skeleton_test.go index 53f10c66913ba..0aeabd87b8157 100644 --- a/eth/downloader/skeleton_test.go +++ b/eth/downloader/skeleton_test.go @@ -18,12 +18,18 @@ package downloader import ( "encoding/json" + "errors" + "fmt" "math/big" "os" + "sync/atomic" "testing" + "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/rawdb" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/log" ) @@ -39,6 +45,12 @@ type hookedBackfiller struct { resumeHook func() } +// newHookedBackfiller creates a hooked backfiller with all callbacks disabled, +// essentially acting as a noop. +func newHookedBackfiller() backfiller { + return new(hookedBackfiller) +} + // suspend requests the backfiller to abort any running full or snap sync // based on the skeleton chain as it might be invalid. The backfiller should // gracefully handle multiple consecutive suspends without a resume, even @@ -58,10 +70,117 @@ func (hf *hookedBackfiller) resume() { } } -// newNoopBackfiller creates a hooked backfiller with all callbacks disabled, -// essentially acting as a noop. -func newNoopBackfiller() backfiller { - return new(hookedBackfiller) +// skeletonTestPeer is a mock peer that can only serve header requests from a +// pre-perated header chain (which may be arbitrarilly wrong for testing). +// +// Requesting anything else from these peers will hard panic. Note, do *not* +// implement any other methods. We actually want to make sure that the skeleton +// syncer only depends on - and will only ever do so - on header requests. +type skeletonTestPeer struct { + id string // Unique identifier of the mock peer + headers []*types.Header // Headers to serve when requested + served uint64 // Number of headers served by this peer + dropped uint64 // Flag whether the peer was dropped (stop responding) +} + +// newSkeletonTestPeer creates a new mock peer to test the skeleton sync with. +// The only purpose of the constructor is to ensure we don't forget to set some +// mandatory field vs a struct literal initialization. +func newSkeletonTestPeer(id string, headers []*types.Header) *skeletonTestPeer { + return &skeletonTestPeer{ + id: id, + headers: headers, + } +} + +// RequestHeadersByNumber constructs a GetBlockHeaders function based on a numbered +// origin; associated with a particular peer in the download tester. The returned +// function can be used to retrieve batches of headers from the particular peer. +func (p *skeletonTestPeer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool, sink chan *eth.Response) (*eth.Request, error) { + // Since skeleton test peer are in-memory mocks, dropping the does not make + // them inaccepssible. As such, check a local `dropped` field to see if the + // peer has been dropped and should not respond any more. + if atomic.LoadUint64(&p.dropped) != 0 { + return nil, errors.New("peer already dropped") + } + // Skeleton sync retrieves batches of headers going backward without gaps. + // This ensures we can follow a clean parent progression without any reorg + // hiccups. There is no need for any other type of header retrieval, so do + // panic if there's such a request. + if !reverse || skip != 0 { + // Note, if other clients want to do these kinds of requests, it's their + // problem, it will still work. We just don't want *us* making complicated + // requests without a very strong reason to. + panic(fmt.Sprintf("invalid header retrieval: reverse %v, want true; skip %d, want 0", reverse, skip)) + } + // If the skeleton syncer requests the genesis block, panic. Whilst it could + // be considered a valid request, our code specifically should not request it + // ever since we want to link up headers to an existing local chain, which at + // worse will be the genesis. + if int64(origin)-int64(amount) < 0 { + panic(fmt.Sprintf("headers requested before (or at) genesis: origin %d, amount %d", origin, amount)) + } + // To make concurrency easier, the skeleton syncer always requests fixed size + // batches of headers. Panic if the peer is requested an amount other than the + // configured batch size (apart from the request leading to the genesis). + if amount > requestHeaders || (amount < requestHeaders && origin > uint64(amount)) { + panic(fmt.Sprintf("non-chunk size header batch requested: requested %d, want %d, origin %d", amount, requestHeaders, origin)) + } + // Simple reverse header retrieval. Fill from the peer's chain and return + headers := make([]*types.Header, 0, amount) + if len(p.headers) > int(origin) { // Don't serve headers if we're missing the origin + for i := 0; i < amount; i++ { + // Consider nil headers as a form of attack and withhold them. Nil + // cannot be decoded from RLP, so it's not possible to produce an + // attack by sending/receiving those over eth. + header := p.headers[int(origin)-i] + if header == nil { + continue + } + headers = append(headers, header) + } + } + atomic.AddUint64(&p.served, uint64(len(headers))) + + hashes := make([]common.Hash, len(headers)) + for i, header := range headers { + hashes[i] = header.Hash() + } + // Deliver the headers to the downloader + req := ð.Request{ + Peer: p.id, + } + res := ð.Response{ + Req: req, + Res: (*eth.BlockHeadersPacket)(&headers), + Meta: hashes, + Time: 1, + Done: make(chan error), + } + go func() { + sink <- res + if err := <-res.Done; err != nil { + log.Warn("Skeleton test peer response rejected", "err", err) + atomic.AddUint64(&p.dropped, 1) + } + }() + return req, nil +} + +func (p *skeletonTestPeer) Head() (common.Hash, *big.Int) { + panic("skeleton sync must not request the remote head") +} + +func (p *skeletonTestPeer) RequestHeadersByHash(common.Hash, int, int, bool, chan *eth.Response) (*eth.Request, error) { + panic("skeleton sync must not request headers by hash") +} + +func (p *skeletonTestPeer) RequestBodies([]common.Hash, chan *eth.Response) (*eth.Request, error) { + panic("skeleton sync must not request block bodies") +} + +func (p *skeletonTestPeer) RequestReceipts([]common.Hash, chan *eth.Response) (*eth.Request, error) { + panic("skeleton sync must not request receipts") } // Tests various sync initialzations based on previous leftovers in the database @@ -228,7 +347,7 @@ func TestSkeletonSyncInit(t *testing.T) { // Create a skeleton sync and run a cycle wait := make(chan struct{}) - skeleton := newSkeleton(db, newPeerSet(), func(string) {}, newNoopBackfiller()) + skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller()) skeleton.syncStarting = func() { close(wait) } skeleton.Sync(tt.head, true) @@ -257,8 +376,6 @@ func TestSkeletonSyncInit(t *testing.T) { // Tests that a running skeleton sync can be extended with properly linked up // headers but not with side chains. func TestSkeletonSyncExtend(t *testing.T) { - log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) - // Create a few key headers var ( genesis = &types.Header{Number: big.NewInt(0)} @@ -345,7 +462,7 @@ func TestSkeletonSyncExtend(t *testing.T) { // Create a skeleton sync and run a cycle wait := make(chan struct{}) - skeleton := newSkeleton(db, newPeerSet(), func(string) {}, newNoopBackfiller()) + skeleton := newSkeleton(db, newPeerSet(), nil, newHookedBackfiller()) skeleton.syncStarting = func() { close(wait) } skeleton.Sync(tt.head, true) @@ -373,3 +490,329 @@ func TestSkeletonSyncExtend(t *testing.T) { } } } + +// Tests that the skeleton sync correctly retrieves headers from one or more +// peers without duplicates or other strange side effects. +func TestSkeletonSyncRetrievals(t *testing.T) { + log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(true)))) + + // Since skeleton headers don't need to be meaningful, beyond a parent hash + // progression, create a long fake chain to test with. + chain := []*types.Header{{Number: big.NewInt(0)}} + for i := 1; i < 10000; i++ { + chain = append(chain, &types.Header{ + ParentHash: chain[i-1].Hash(), + Number: big.NewInt(int64(i)), + }) + } + tests := []struct { + headers []*types.Header // Database content (beside the genesis) + oldstate []*subchain // Old sync state with various interrupted subchains + + head *types.Header // New head header to announce to reorg to + peers []*skeletonTestPeer // Initial peer set to start the sync with + midstate []*subchain // Expected sync state after inital cycle + midserve uint64 // Expected number of header retrievals after initial cycle + middrop uint64 // Expectd number of peers dropped after initial cycle + + newHead *types.Header // New header to annount on top of the old one + newPeer *skeletonTestPeer // New peer to join the skeleton syncer + endstate []*subchain // Expected sync state after the post-init event + endserve uint64 // Expected number of header retrievals after the post-init event + enddrop uint64 // Expectd number of peers dropped after the post-init event + }{ + // Completely empty database with only the genesis set. The sync is expected + // to create a single subchain with the requested head. No peers however, so + // the sync should be stuck without any progression. + // + // When a new peer is added, it should detect the join and fill the headers + // to the genesis block. + { + head: chain[len(chain)-1], + midstate: []*subchain{{Head: uint64(len(chain) - 1), Tail: uint64(len(chain) - 1)}}, + + newPeer: newSkeletonTestPeer("test-peer", chain), + endstate: []*subchain{{Head: uint64(len(chain) - 1), Tail: 1}}, + endserve: uint64(len(chain) - 2), // len - head - genesis + }, + // Completely empty database with only the genesis set. The sync is expected + // to create a single subchain with the requested head. With one valid peer, + // the sync is expected to complete already in the initial round. + // + // Adding a second peer should not have any effect. + { + head: chain[len(chain)-1], + peers: []*skeletonTestPeer{newSkeletonTestPeer("test-peer-1", chain)}, + midstate: []*subchain{{Head: uint64(len(chain) - 1), Tail: 1}}, + midserve: uint64(len(chain) - 2), // len - head - genesis + + newPeer: newSkeletonTestPeer("test-peer-2", chain), + endstate: []*subchain{{Head: uint64(len(chain) - 1), Tail: 1}}, + endserve: uint64(len(chain) - 2), // len - head - genesis + }, + // Completely empty database with only the genesis set. The sync is expected + // to create a single subchain with the requested head. With many valid peers, + // the sync is expected to complete already in the initial round. + // + // Adding a new peer should not have any effect. + { + head: chain[len(chain)-1], + peers: []*skeletonTestPeer{ + newSkeletonTestPeer("test-peer-1", chain), + newSkeletonTestPeer("test-peer-2", chain), + newSkeletonTestPeer("test-peer-3", chain), + }, + midstate: []*subchain{{Head: uint64(len(chain) - 1), Tail: 1}}, + midserve: uint64(len(chain) - 2), // len - head - genesis + + newPeer: newSkeletonTestPeer("test-peer-4", chain), + endstate: []*subchain{{Head: uint64(len(chain) - 1), Tail: 1}}, + endserve: uint64(len(chain) - 2), // len - head - genesis + }, + // This test checks if a peer tries to withhold a header - *on* the sync + // boundary - instead of sending the requested amount. The malicious short + // package should not be accepted. + // + // Joining with a new peer should however unblock the sync. + { + head: chain[requestHeaders+100], + peers: []*skeletonTestPeer{ + newSkeletonTestPeer("header-skipper", append(append(append([]*types.Header{}, chain[:99]...), nil), chain[100:]...)), + }, + midstate: []*subchain{{Head: requestHeaders + 100, Tail: 100}}, + midserve: requestHeaders + 101 - 3, // len - head - genesis - missing + middrop: 1, // penalize shortened header deliveries + + newPeer: newSkeletonTestPeer("good-peer", chain), + endstate: []*subchain{{Head: requestHeaders + 100, Tail: 1}}, + endserve: (requestHeaders + 101 - 3) + (100 - 1), // midserve + lenrest - genesis + enddrop: 1, // no new drops + }, + // This test checks if a peer tries to withhold a header - *off* the sync + // boundary - instead of sending the requested amount. The malicious short + // package should not be accepted. + // + // Joining with a new peer should however unblock the sync. + { + head: chain[requestHeaders+100], + peers: []*skeletonTestPeer{ + newSkeletonTestPeer("header-skipper", append(append(append([]*types.Header{}, chain[:50]...), nil), chain[51:]...)), + }, + midstate: []*subchain{{Head: requestHeaders + 100, Tail: 100}}, + midserve: requestHeaders + 101 - 3, // len - head - genesis - missing + middrop: 1, // penalize shortened header deliveries + + newPeer: newSkeletonTestPeer("good-peer", chain), + endstate: []*subchain{{Head: requestHeaders + 100, Tail: 1}}, + endserve: (requestHeaders + 101 - 3) + (100 - 1), // midserve + lenrest - genesis + enddrop: 1, // no new drops + }, + // This test checks if a peer tries to duplicate a header - *on* the sync + // boundary - instead of sending the correct sequence. The malicious duped + // package should not be accepted. + // + // Joining with a new peer should however unblock the sync. + { + head: chain[requestHeaders+100], // We want to force the 100th header to be a request boundary + peers: []*skeletonTestPeer{ + newSkeletonTestPeer("header-duper", append(append(append([]*types.Header{}, chain[:99]...), chain[98]), chain[100:]...)), + }, + midstate: []*subchain{{Head: requestHeaders + 100, Tail: 100}}, + midserve: requestHeaders + 101 - 2, // len - head - genesis + middrop: 1, // penalize invalid header sequences + + newPeer: newSkeletonTestPeer("good-peer", chain), + endstate: []*subchain{{Head: requestHeaders + 100, Tail: 1}}, + endserve: (requestHeaders + 101 - 2) + (100 - 1), // midserve + lenrest - genesis + enddrop: 1, // no new drops + }, + // This test checks if a peer tries to duplicate a header - *off* the sync + // boundary - instead of sending the correct sequence. The malicious duped + // package should not be accepted. + // + // Joining with a new peer should however unblock the sync. + { + head: chain[requestHeaders+100], // We want to force the 100th header to be a request boundary + peers: []*skeletonTestPeer{ + newSkeletonTestPeer("header-duper", append(append(append([]*types.Header{}, chain[:50]...), chain[49]), chain[51:]...)), + }, + midstate: []*subchain{{Head: requestHeaders + 100, Tail: 100}}, + midserve: requestHeaders + 101 - 2, // len - head - genesis + middrop: 1, // penalize invalid header sequences + + newPeer: newSkeletonTestPeer("good-peer", chain), + endstate: []*subchain{{Head: requestHeaders + 100, Tail: 1}}, + endserve: (requestHeaders + 101 - 2) + (100 - 1), // midserve + lenrest - genesis + enddrop: 1, // no new drops + }, + // This test checks if a peer tries to inject a different header - *on* + // the sync boundary - instead of sending the correct sequence. The bad + // package should not be accepted. + // + // Joining with a new peer should however unblock the sync. + { + head: chain[requestHeaders+100], // We want to force the 100th header to be a request boundary + peers: []*skeletonTestPeer{ + newSkeletonTestPeer("header-changer", + append( + append( + append([]*types.Header{}, chain[:99]...), + &types.Header{ + ParentHash: chain[98].Hash(), + Number: big.NewInt(int64(99)), + GasLimit: 1, + }, + ), chain[100:]..., + ), + ), + }, + midstate: []*subchain{{Head: requestHeaders + 100, Tail: 100}}, + midserve: requestHeaders + 101 - 2, // len - head - genesis + middrop: 1, // different set of headers, drop // TODO(karalabe): maybe just diff sync? + + newPeer: newSkeletonTestPeer("good-peer", chain), + endstate: []*subchain{{Head: requestHeaders + 100, Tail: 1}}, + endserve: (requestHeaders + 101 - 2) + (100 - 1), // midserve + lenrest - genesis + enddrop: 1, // no new drops + }, + // This test checks if a peer tries to inject a different header - *off* + // the sync boundary - instead of sending the correct sequence. The bad + // package should not be accepted. + // + // Joining with a new peer should however unblock the sync. + { + head: chain[requestHeaders+100], // We want to force the 100th header to be a request boundary + peers: []*skeletonTestPeer{ + newSkeletonTestPeer("header-changer", + append( + append( + append([]*types.Header{}, chain[:50]...), + &types.Header{ + ParentHash: chain[49].Hash(), + Number: big.NewInt(int64(50)), + GasLimit: 1, + }, + ), chain[51:]..., + ), + ), + }, + midstate: []*subchain{{Head: requestHeaders + 100, Tail: 100}}, + midserve: requestHeaders + 101 - 2, // len - head - genesis + middrop: 1, // different set of headers, drop + + newPeer: newSkeletonTestPeer("good-peer", chain), + endstate: []*subchain{{Head: requestHeaders + 100, Tail: 1}}, + endserve: (requestHeaders + 101 - 2) + (100 - 1), // midserve + lenrest - genesis + enddrop: 1, // no new drops + }, + } + for i, tt := range tests { + // Create a fresh database and initialize it with the starting state + db := rawdb.NewMemoryDatabase() + rawdb.WriteHeader(db, chain[0]) + + // Create a peer set to feed headers through + peerset := newPeerSet() + for _, peer := range tt.peers { + peerset.Register(newPeerConnection(peer.id, eth.ETH66, peer, log.New("id", peer.id))) + } + // Create a peer dropper to track malicious peers + dropped := make(map[string]int) + drop := func(peer string) { + if p := peerset.Peer(peer); p != nil { + atomic.AddUint64(&p.peer.(*skeletonTestPeer).dropped, 1) + } + peerset.Unregister(peer) + dropped[peer]++ + } + // Create a skeleton sync and run a cycle + skeleton := newSkeleton(db, peerset, drop, newHookedBackfiller()) + skeleton.Sync(tt.head, true) + + // Wait a bit (bleah) for the initial sync loop to go to idle. This might + // be either a finish or a never-start hence why there's no event to hook. + time.Sleep(250 * time.Millisecond) + + // Check the post-init mid state if it matches the required results + var progress skeletonProgress + json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress) + + if len(progress.Subchains) != len(tt.midstate) { + t.Errorf("test %d, mid state: subchain count mismatch: have %d, want %d", i, len(progress.Subchains), len(tt.midstate)) + continue + } + for j := 0; j < len(progress.Subchains); j++ { + if progress.Subchains[j].Head != tt.midstate[j].Head { + t.Errorf("test %d, mid state: subchain %d head mismatch: have %d, want %d", i, j, progress.Subchains[j].Head, tt.midstate[j].Head) + } + if progress.Subchains[j].Tail != tt.midstate[j].Tail { + t.Errorf("test %d, mid state: subchain %d tail mismatch: have %d, want %d", i, j, progress.Subchains[j].Tail, tt.midstate[j].Tail) + } + } + var served uint64 + for _, peer := range tt.peers { + served += atomic.LoadUint64(&peer.served) + } + if served != tt.midserve { + t.Errorf("test %d, mid state: served headers mismatch: have %d, want %d", i, served, tt.midserve) + } + var drops uint64 + for _, peer := range tt.peers { + drops += atomic.LoadUint64(&peer.dropped) + } + if drops != tt.middrop { + t.Errorf("test %d, mid state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop) + } + // Apply the post-init events if there's any + if tt.newHead != nil { + skeleton.Sync(tt.newHead, true) + } + if tt.newPeer != nil { + if err := peerset.Register(newPeerConnection(tt.newPeer.id, eth.ETH66, tt.newPeer, log.New("id", tt.newPeer.id))); err != nil { + t.Errorf("test %d: failed to register new peer: %v", i, err) + } + } + // Wait a bit (bleah) for the second sync loop to go to idle. This might + // be either a finish or a never-start hence why there's no event to hook. + time.Sleep(250 * time.Millisecond) + + // Check the post-init mid state if it matches the required results + json.Unmarshal(rawdb.ReadSkeletonSyncStatus(db), &progress) + + if len(progress.Subchains) != len(tt.endstate) { + t.Errorf("test %d, end state: subchain count mismatch: have %d, want %d", i, len(progress.Subchains), len(tt.endstate)) + continue + } + for j := 0; j < len(progress.Subchains); j++ { + if progress.Subchains[j].Head != tt.endstate[j].Head { + t.Errorf("test %d, end state: subchain %d head mismatch: have %d, want %d", i, j, progress.Subchains[j].Head, tt.endstate[j].Head) + } + if progress.Subchains[j].Tail != tt.endstate[j].Tail { + t.Errorf("test %d, end state: subchain %d tail mismatch: have %d, want %d", i, j, progress.Subchains[j].Tail, tt.endstate[j].Tail) + } + } + // Check that the peers served no more headers than we actually needed + served = 0 + for _, peer := range tt.peers { + served += atomic.LoadUint64(&peer.served) + } + if tt.newPeer != nil { + served += atomic.LoadUint64(&tt.newPeer.served) + } + if served != tt.endserve { + t.Errorf("test %d, end state: served headers mismatch: have %d, want %d", i, served, tt.endserve) + } + drops = 0 + for _, peer := range tt.peers { + drops += atomic.LoadUint64(&peer.dropped) + } + if tt.newPeer != nil { + drops += atomic.LoadUint64(&tt.newPeer.dropped) + } + if drops != tt.middrop { + t.Errorf("test %d, end state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop) + } + // Clean up any leftover skeleton sync resources + skeleton.Terminate() + } +}