fix: pullsync will only return an error if the interval needs to be retried #3555

Merged
merged 2 commits on Nov 16, 2022
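The contract this PR establishes, as reflected in the diff below: SyncInterval returns a non-nil error only when the caller should retry the same interval, while recoverable per-chunk problems (zero-address hashes, unsolicited or invalid chunks) are logged at debug level and skipped. The following is a minimal illustrative sketch of that caller-side retry contract, not code from this PR; syncFn, persist, and syncWithRetry are hypothetical stand-ins for the real Syncer and interval store.

package main

import (
	"errors"
	"fmt"
	"time"
)

// syncFn mimics SyncInterval's new contract: a non-nil error means the
// interval was not synced and the same range must be retried.
type syncFn func(from, to uint64) (topmost uint64, err error)

// syncWithRetry keeps retrying the same interval while sync returns an
// error (sleeping between attempts, like the puller workers), and seals
// the interval up to topmost once a call succeeds.
func syncWithRetry(sync syncFn, persist func(from, to uint64) error, from, to uint64) error {
	for {
		topmost, err := sync(from, to)
		if err != nil {
			// retriable failure: back off, then attempt the same interval again
			time.Sleep(10 * time.Millisecond)
			continue
		}
		// success: persist (seal) the interval up to the offered topmost value
		return persist(from, topmost)
	}
}

func main() {
	attempts := 0
	flaky := func(from, to uint64) (uint64, error) {
		attempts++
		if attempts < 3 {
			return 0, errors.New("read delivery: connection reset") // retried by the caller
		}
		return to, nil // synced the whole requested range
	}
	persist := func(from, to uint64) error {
		fmt.Printf("persisted interval [%d, %d] after %d attempts\n", from, to, attempts)
		return nil
	}
	_ = syncWithRetry(flaky, persist, 1, 64)
}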
Changes from 1 commit
45 changes: 18 additions & 27 deletions pkg/puller/puller.go
@@ -259,21 +259,16 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin
if err != nil {
p.metrics.HistWorkerErrCounter.Inc()
p.logger.Error(err, "histSyncWorker syncing interval failed", "peer_address", peer, "bin", bin, "cursor", cur, "start", s, "topmost", top)
}
if top == 0 {
// we did not receive an offer from the peer
// this is due to a connection error, read interval timing out, or peer disconnecting
// as such only for this case the sync sleeps before a new attempt
sleep = true
} else {
err = p.addPeerInterval(peer, bin, s, top)
if err != nil {
p.metrics.HistWorkerErrCounter.Inc()
p.logger.Error(err, "histSyncWorker could not persist interval for peer, quitting...", "peer_address", peer)
return
}
loggerV2.Debug("histSyncWorker pulled", "bin", bin, "start", s, "topmost", top, "peer_address", peer)
continue
}
err = p.addPeerInterval(peer, bin, s, top)
if err != nil {
p.metrics.HistWorkerErrCounter.Inc()
p.logger.Error(err, "histSyncWorker could not persist interval for peer, quitting...", "peer_address", peer)
return
}
loggerV2.Debug("histSyncWorker pulled", "bin", bin, "start", s, "topmost", top, "peer_address", peer)
}
}

@@ -310,26 +305,22 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin
if err != nil {
p.metrics.LiveWorkerErrCounter.Inc()
p.logger.Error(err, "liveSyncWorker sync error", "peer_address", peer, "bin", bin, "from", from, "topmost", top)
sleep = true
continue
}
if top == math.MaxUint64 {
p.metrics.MaxUintErrCounter.Inc()
p.logger.Error(nil, "liveSyncWorker max uint64 encountered", "peer_address", peer, "bin", bin, "from", from, "topmost", top)
return
} else if top == 0 {
// we did not receive an offer from the peer
// this is due to a connection error, read interval timing out, or peer disconnecting
// as such only for this case the sync sleeps before a new attempt
sleep = true
} else {
err = p.addPeerInterval(peer, bin, from, top)
if err != nil {
p.metrics.LiveWorkerErrCounter.Inc()
p.logger.Error(err, "liveSyncWorker exit on add peer interval", "peer_address", peer, "bin", bin, "from", from, "error", err)
return
}
loggerV2.Debug("liveSyncWorker pulled bin", "bin", bin, "from", from, "topmost", top, "peer_address", peer)
from = top + 1
}
err = p.addPeerInterval(peer, bin, from, top)
if err != nil {
p.metrics.LiveWorkerErrCounter.Inc()
p.logger.Error(err, "liveSyncWorker exit on add peer interval", "peer_address", peer, "bin", bin, "from", from, "error", err)
return
}
loggerV2.Debug("liveSyncWorker pulled bin", "bin", bin, "from", from, "topmost", top, "peer_address", peer)
from = top + 1
}
}

51 changes: 16 additions & 35 deletions pkg/pullsync/pullsync.go
@@ -118,7 +118,7 @@ func (s *Syncer) Protocol() p2p.ProtocolSpec {
// It returns the BinID of highest chunk that was synced from the given interval.
// If the requested interval is too large, the downstream peer has the liberty to
// provide less chunks than requested.
func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8, from, to uint64) (topmost uint64, err error) {
func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8, from, to uint64) (uint64, error) {
isLiveSync := to == MaxCursor
loggerV2 := s.logger.V(2).Register()

@@ -157,7 +157,7 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8
return offer.Topmost, nil
}

topmost = offer.Topmost
topmost := offer.Topmost

var (
bvLen = len(offer.Hashes) / swarm.HashSize
Expand All @@ -168,24 +168,21 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8

bv, err := bitvector.New(bvLen)
if err != nil {
err = fmt.Errorf("new bitvector: %w", err)
return
return topmost, fmt.Errorf("new bitvector: %w", err)
}

for i := 0; i < len(offer.Hashes); i += swarm.HashSize {
a := swarm.NewAddress(offer.Hashes[i : i+swarm.HashSize])
if a.Equal(swarm.ZeroAddress) {
// i'd like to have this around to see we don't see any of these in the logs
s.logger.Error(nil, "syncer got a zero address hash on offer")
err = fmt.Errorf("zero address on offer")
return
s.logger.Debug("syncer got a zero address hash on offer")
continue
}
s.metrics.Offered.Inc()
s.metrics.DbOps.Inc()
have, err = s.storage.Has(ctx, a)
if err != nil {
err = fmt.Errorf("storage has: %w", err)
return
s.logger.Debug("storage has", "error", err)
}
if !have {
wantChunks[a.String()] = struct{}{}
Expand All @@ -197,32 +194,21 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8

wantMsg := &pb.Want{BitVector: bv.Bytes()}
if err = w.WriteMsgWithContext(ctx, wantMsg); err != nil {
err = fmt.Errorf("write want: %w", err)
return
return topmost, fmt.Errorf("write want: %w", err)
}

// if ctr is zero, it means we don't want any chunk in the batch
// thus, the following loop will not get executed and the method
// returns immediately with the topmost value on the offer, which
// will seal the interval and request the next one
err = nil
var chunksToPut []swarm.Chunk

for ; ctr > 0; ctr-- {
var delivery pb.Delivery
if err = r.ReadMsgWithContext(ctx, &delivery); err != nil {
// this is not a fatal error and we should write
// a partial batch if some chunks have been received.
err = fmt.Errorf("read delivery: %w", err)
break
return topmost, fmt.Errorf("read delivery: %w", err)
}

addr := swarm.NewAddress(delivery.Address)
if _, ok := wantChunks[addr.String()]; !ok {
// this is fatal for the entire batch, return the
// error and don't write the partial batch.
err = ErrUnsolicitedChunk
return
s.logger.Debug("want chunks", "error", ErrUnsolicitedChunk)
continue
}

delete(wantChunks, addr.String())
Expand All @@ -237,13 +223,12 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8
if cac.Valid(chunk) {
go s.unwrap(chunk)
} else if !soc.Valid(chunk) {
// this is fatal for the entire batch, return the
// error and don't write the partial batch.
err = swarm.ErrInvalidChunk
return
s.logger.Debug("valid chunk", "error", swarm.ErrInvalidChunk)
continue
}
chunksToPut = append(chunksToPut, chunk)
}

if len(chunksToPut) > 0 {
if !isLiveSync {
s.rate.Add(len(chunksToPut))
Expand All @@ -252,16 +237,12 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8
ctx, cancel := context.WithTimeout(ctx, storagePutTimeout)
defer cancel()

if ierr := s.storage.Put(ctx, storage.ModePutSync, chunksToPut...); ierr != nil {
if err != nil {
ierr = fmt.Errorf(", sync err: %w", err)
}
err = fmt.Errorf("delivery put: %w", ierr)
return
if err := s.storage.Put(ctx, storage.ModePutSync, chunksToPut...); err != nil {
return topmost, fmt.Errorf("delivery put: %w", err)
}
}

return
return topmost, nil
}

// handler handles an incoming request to sync an interval
4 changes: 2 additions & 2 deletions pkg/pullsync/pullsync_test.go
@@ -163,8 +163,8 @@ func TestIncoming_UnsolicitedChunk(t *testing.T) {
)

_, err := psClient.SyncInterval(context.Background(), swarm.ZeroAddress, 0, 0, 5)
if !errors.Is(err, pullsync.ErrUnsolicitedChunk) {
t.Fatalf("expected ErrUnsolicitedChunk but got %v", err)
if err != nil {
t.Fatalf("expected nil but got %v", err)
}
}
