Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth: pre-process downloader responses on the peer reader thread #24032

Merged
merged 1 commit into from
Dec 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
72 changes: 45 additions & 27 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ var (
// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)

// headerTask is a set of downloaded headers to queue along with their precomputed
// hashes to avoid constant rehashing.
type headerTask struct {
headers []*types.Header
hashes []common.Hash
}

type Downloader struct {
mode uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
mux *event.TypeMux // Event multiplexer to announce sync operation events
Expand Down Expand Up @@ -116,7 +123,7 @@ type Downloader struct {
ancientLimit uint64 // The maximum block number which can be regarded as ancient data.

// Channels
headerProcCh chan []*types.Header // Channel to feed the header processor new tasks
headerProcCh chan *headerTask // Channel to feed the header processor new tasks

// State sync
pivotHeader *types.Header // Pivot block header to dynamically push the syncing state root
Expand Down Expand Up @@ -210,7 +217,7 @@ func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom,
blockchain: chain,
lightchain: lightchain,
dropPeer: dropPeer,
headerProcCh: make(chan []*types.Header, 1),
headerProcCh: make(chan *headerTask, 1),
quitCh: make(chan struct{}),
SnapSyncer: snap.NewSyncer(stateDb),
stateSyncStart: make(chan *stateSync),
Expand Down Expand Up @@ -626,7 +633,7 @@ func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *ty
if mode == SnapSync {
fetch = 2 // head + pivot headers
}
headers, err := d.fetchHeadersByHash(p, latest, fetch, fsMinFullBlocks-1, true)
headers, hashes, err := d.fetchHeadersByHash(p, latest, fetch, fsMinFullBlocks-1, true)
if err != nil {
return nil, nil, err
}
Expand All @@ -645,7 +652,7 @@ func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *ty
if mode == SnapSync && head.Number.Uint64() > uint64(fsMinFullBlocks) {
return nil, nil, fmt.Errorf("%w: no pivot included along head header", errBadPeer)
}
p.log.Debug("Remote head identified, no pivot", "number", head.Number, "hash", head.Hash())
p.log.Debug("Remote head identified, no pivot", "number", head.Number, "hash", hashes[0])
return head, nil, nil
}
// At this point we have 2 headers in total and the first is the
Expand Down Expand Up @@ -784,7 +791,7 @@ func (d *Downloader) findAncestorSpanSearch(p *peerConnection, mode SyncMode, re
from, count, skip, max := calculateRequestSpan(remoteHeight, localHeight)

p.log.Trace("Span searching for common ancestor", "count", count, "from", from, "skip", skip)
headers, err := d.fetchHeadersByNumber(p, uint64(from), count, skip, false)
headers, hashes, err := d.fetchHeadersByNumber(p, uint64(from), count, skip, false)
if err != nil {
return 0, err
}
Expand All @@ -811,7 +818,7 @@ func (d *Downloader) findAncestorSpanSearch(p *peerConnection, mode SyncMode, re
continue
}
// Otherwise check if we already know the header or not
h := headers[i].Hash()
h := hashes[i]
n := headers[i].Number.Uint64()

var known bool
Expand Down Expand Up @@ -854,7 +861,7 @@ func (d *Downloader) findAncestorBinarySearch(p *peerConnection, mode SyncMode,
// Split our chain interval in two, and request the hash to cross check
check := (start + end) / 2

headers, err := d.fetchHeadersByNumber(p, check, 1, 0, false)
headers, hashes, err := d.fetchHeadersByNumber(p, check, 1, 0, false)
if err != nil {
return 0, err
}
Expand All @@ -864,7 +871,7 @@ func (d *Downloader) findAncestorBinarySearch(p *peerConnection, mode SyncMode,
return 0, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
}
// Modify the search interval based on the response
h := headers[0].Hash()
h := hashes[0]
n := headers[0].Number.Uint64()

var known bool
Expand Down Expand Up @@ -923,6 +930,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e
// - Full header retrieval if we're near the chain head
var (
headers []*types.Header
hashes []common.Hash
err error
)
switch {
Expand All @@ -932,15 +940,15 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e
d.pivotLock.RUnlock()

p.log.Trace("Fetching next pivot header", "number", pivot+uint64(fsMinFullBlocks))
headers, err = d.fetchHeadersByNumber(p, pivot+uint64(fsMinFullBlocks), 2, fsMinFullBlocks-9, false) // move +64 when it's 2x64-8 deep
headers, hashes, err = d.fetchHeadersByNumber(p, pivot+uint64(fsMinFullBlocks), 2, fsMinFullBlocks-9, false) // move +64 when it's 2x64-8 deep

case skeleton:
p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from)
headers, err = d.fetchHeadersByNumber(p, from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
headers, hashes, err = d.fetchHeadersByNumber(p, from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)

default:
p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from)
headers, err = d.fetchHeadersByNumber(p, from, MaxHeaderFetch, 0, false)
headers, hashes, err = d.fetchHeadersByNumber(p, from, MaxHeaderFetch, 0, false)
}
switch err {
case nil:
Expand Down Expand Up @@ -1038,12 +1046,14 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e
// If we received a skeleton batch, resolve internals concurrently
var progressed bool
if skeleton {
filled, proced, err := d.fillHeaderSkeleton(from, headers)
filled, hashset, proced, err := d.fillHeaderSkeleton(from, headers)
if err != nil {
p.log.Debug("Skeleton chain invalid", "err", err)
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
headers = filled[proced:]
hashes = hashset[proced:]

progressed = proced > 0
from += uint64(proced)
} else {
Expand Down Expand Up @@ -1079,6 +1089,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e
delay = n
}
headers = headers[:n-delay]
hashes = hashes[:n-delay]
}
}
}
Expand All @@ -1098,7 +1109,10 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e
if len(headers) > 0 {
p.log.Trace("Scheduling new headers", "count", len(headers), "from", from)
select {
case d.headerProcCh <- headers:
case d.headerProcCh <- &headerTask{
headers: headers,
hashes: hashes,
}:
case <-d.cancelCh:
return errCanceled
}
Expand All @@ -1121,19 +1135,19 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e
//
// The method returns the entire filled skeleton and also the number of headers
// already forwarded for processing.
func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) {
func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, []common.Hash, int, error) {
log.Debug("Filling up skeleton", "from", from)
d.queue.ScheduleSkeleton(from, skeleton)

err := d.concurrentFetch((*headerQueue)(d))
if err != nil {
log.Debug("Skeleton fill failed", "err", err)
}
filled, proced := d.queue.RetrieveHeaders()
filled, hashes, proced := d.queue.RetrieveHeaders()
if err == nil {
log.Debug("Skeleton fill succeeded", "filled", len(filled), "processed", proced)
}
return filled, proced, err
return filled, hashes, proced, err
}

// fetchBodies iteratively downloads the scheduled block bodies, taking any
Expand Down Expand Up @@ -1199,9 +1213,9 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
rollbackErr = errCanceled
return errCanceled

case headers := <-d.headerProcCh:
case task := <-d.headerProcCh:
// Terminate header processing if we synced up
if len(headers) == 0 {
if task == nil || len(task.headers) == 0 {
// Notify everyone that headers are fully processed
for _, ch := range []chan bool{d.queue.blockWakeCh, d.queue.receiptWakeCh} {
select {
Expand Down Expand Up @@ -1245,6 +1259,8 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
return nil
}
// Otherwise split the chunk of headers into batches and process them
headers, hashes := task.headers, task.hashes

gotHeaders = true
for len(headers) > 0 {
// Terminate if something failed in between processing chunks
Expand All @@ -1259,7 +1275,8 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
if limit > len(headers) {
limit = len(headers)
}
chunk := headers[:limit]
chunkHeaders := headers[:limit]
chunkHashes := hashes[:limit]

// In case of header only syncing, validate the chunk immediately
if mode == SnapSync || mode == LightSync {
Expand All @@ -1273,22 +1290,22 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
d.pivotLock.RUnlock()

frequency := fsHeaderCheckFrequency
if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
if chunkHeaders[len(chunkHeaders)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
frequency = 1
}
if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
if n, err := d.lightchain.InsertHeaderChain(chunkHeaders, frequency); err != nil {
rollbackErr = err

// If some headers were inserted, track them as uncertain
if (mode == SnapSync || frequency > 1) && n > 0 && rollback == 0 {
rollback = chunk[0].Number.Uint64()
rollback = chunkHeaders[0].Number.Uint64()
}
log.Warn("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "parent", chunk[n].ParentHash, "err", err)
log.Warn("Invalid header encountered", "number", chunkHeaders[n].Number, "hash", chunkHashes[n], "parent", chunkHeaders[n].ParentHash, "err", err)
return fmt.Errorf("%w: %v", errInvalidChain, err)
}
// All verifications passed, track all headers within the alloted limits
if mode == SnapSync {
head := chunk[len(chunk)-1].Number.Uint64()
head := chunkHeaders[len(chunkHeaders)-1].Number.Uint64()
if head-rollback > uint64(fsHeaderSafetyNet) {
rollback = head - uint64(fsHeaderSafetyNet)
} else {
Expand All @@ -1308,13 +1325,14 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
}
}
// Otherwise insert the headers for content retrieval
inserts := d.queue.Schedule(chunk, origin)
if len(inserts) != len(chunk) {
rollbackErr = fmt.Errorf("stale headers: len inserts %v len(chunk) %v", len(inserts), len(chunk))
inserts := d.queue.Schedule(chunkHeaders, chunkHashes, origin)
if len(inserts) != len(chunkHeaders) {
rollbackErr = fmt.Errorf("stale headers: len inserts %v len(chunk) %v", len(inserts), len(chunkHeaders))
return fmt.Errorf("%w: stale headers", errBadPeer)
}
}
headers = headers[limit:]
hashes = hashes[limit:]
origin += uint64(limit)
}
// Update the highest block number we know if a higher one is found.
Expand Down
26 changes: 26 additions & 0 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,13 +177,18 @@ func (dlp *downloadTesterPeer) RequestHeadersByHash(origin common.Hash, amount i
}
}
}
hashes := make([]common.Hash, len(headers))
for i, header := range headers {
hashes[i] = header.Hash()
}
// Deliver the headers to the downloader
req := &eth.Request{
Peer: dlp.id,
}
res := &eth.Response{
Req: req,
Res: (*eth.BlockHeadersPacket)(&headers),
Meta: hashes,
Time: 1,
Done: make(chan error, 1), // Ignore the returned status
}
Expand Down Expand Up @@ -216,13 +221,18 @@ func (dlp *downloadTesterPeer) RequestHeadersByNumber(origin uint64, amount int,
}
}
}
hashes := make([]common.Hash, len(headers))
for i, header := range headers {
hashes[i] = header.Hash()
}
// Deliver the headers to the downloader
req := &eth.Request{
Peer: dlp.id,
}
res := &eth.Response{
Req: req,
Res: (*eth.BlockHeadersPacket)(&headers),
Meta: hashes,
Time: 1,
Done: make(chan error, 1), // Ignore the returned status
}
Expand All @@ -243,12 +253,22 @@ func (dlp *downloadTesterPeer) RequestBodies(hashes []common.Hash, sink chan *et
bodies[i] = new(eth.BlockBody)
rlp.DecodeBytes(blob, bodies[i])
}
var (
txsHashes = make([]common.Hash, len(bodies))
uncleHashes = make([]common.Hash, len(bodies))
)
hasher := trie.NewStackTrie(nil)
for i, body := range bodies {
txsHashes[i] = types.DeriveSha(types.Transactions(body.Transactions), hasher)
uncleHashes[i] = types.CalcUncleHash(body.Uncles)
}
req := &eth.Request{
Peer: dlp.id,
}
res := &eth.Response{
Req: req,
Res: (*eth.BlockBodiesPacket)(&bodies),
Meta: [][]common.Hash{txsHashes, uncleHashes},
Time: 1,
Done: make(chan error, 1), // Ignore the returned status
}
Expand All @@ -268,12 +288,18 @@ func (dlp *downloadTesterPeer) RequestReceipts(hashes []common.Hash, sink chan *
for i, blob := range blobs {
rlp.DecodeBytes(blob, &receipts[i])
}
hasher := trie.NewStackTrie(nil)
hashes = make([]common.Hash, len(receipts))
for i, receipt := range receipts {
hashes[i] = types.DeriveSha(types.Receipts(receipt), hasher)
}
req := &eth.Request{
Peer: dlp.id,
}
res := &eth.Response{
Req: req,
Res: (*eth.ReceiptsPacket)(&receipts),
Meta: hashes,
Time: 1,
Done: make(chan error, 1), // Ignore the returned status
}
Expand Down