From d9c0890b66e911f2a0cebfad730f757e4699b53e Mon Sep 17 00:00:00 2001
From: istae <14264581+istae@users.noreply.github.com>
Date: Wed, 16 Nov 2022 18:09:09 +0300
Subject: [PATCH 1/2] fix: puller retry internal on error

---
 pkg/puller/puller.go          | 45 +++++++++++++------------------
 pkg/pullsync/pullsync.go      | 51 +++++++++++------------------
 pkg/pullsync/pullsync_test.go |  4 +--
 3 files changed, 36 insertions(+), 64 deletions(-)

diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go
index 8a8d3c9cf59..cc956b768ae 100644
--- a/pkg/puller/puller.go
+++ b/pkg/puller/puller.go
@@ -259,21 +259,16 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin
 		if err != nil {
 			p.metrics.HistWorkerErrCounter.Inc()
 			p.logger.Error(err, "histSyncWorker syncing interval failed", "peer_address", peer, "bin", bin, "cursor", cur, "start", s, "topmost", top)
-		}
-		if top == 0 {
-			// we did not receive an offer from the peer
-			// this is due to a connection error, read interval timing out, or peer disconnecting
-			// as such only for this case the sync sleeps before a new attempt
 			sleep = true
-		} else {
-			err = p.addPeerInterval(peer, bin, s, top)
-			if err != nil {
-				p.metrics.HistWorkerErrCounter.Inc()
-				p.logger.Error(err, "histSyncWorker could not persist interval for peer, quitting...", "peer_address", peer)
-				return
-			}
-			loggerV2.Debug("histSyncWorker pulled", "bin", bin, "start", s, "topmost", top, "peer_address", peer)
+			continue
 		}
+		err = p.addPeerInterval(peer, bin, s, top)
+		if err != nil {
+			p.metrics.HistWorkerErrCounter.Inc()
+			p.logger.Error(err, "histSyncWorker could not persist interval for peer, quitting...", "peer_address", peer)
+			return
+		}
+		loggerV2.Debug("histSyncWorker pulled", "bin", bin, "start", s, "topmost", top, "peer_address", peer)
 	}
 }
 
@@ -310,26 +305,22 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin
 		if err != nil {
 			p.metrics.LiveWorkerErrCounter.Inc()
 			p.logger.Error(err, "liveSyncWorker sync error", "peer_address", peer, "bin", bin, "from", from, "topmost", top)
+			sleep = true
+			continue
 		}
 		if top == math.MaxUint64 {
 			p.metrics.MaxUintErrCounter.Inc()
 			p.logger.Error(nil, "liveSyncWorker max uint64 encountered", "peer_address", peer, "bin", bin, "from", from, "topmost", top)
 			return
-		} else if top == 0 {
-			// we did not receive an offer from the peer
-			// this is due to a connection error, read interval timing out, or peer disconnecting
-			// as such only for this case the sync sleeps before a new attempt
-			sleep = true
-		} else {
-			err = p.addPeerInterval(peer, bin, from, top)
-			if err != nil {
-				p.metrics.LiveWorkerErrCounter.Inc()
-				p.logger.Error(err, "liveSyncWorker exit on add peer interval", "peer_address", peer, "bin", bin, "from", from, "error", err)
-				return
-			}
-			loggerV2.Debug("liveSyncWorker pulled bin", "bin", bin, "from", from, "topmost", top, "peer_address", peer)
-			from = top + 1
 		}
+		err = p.addPeerInterval(peer, bin, from, top)
+		if err != nil {
+			p.metrics.LiveWorkerErrCounter.Inc()
+			p.logger.Error(err, "liveSyncWorker exit on add peer interval", "peer_address", peer, "bin", bin, "from", from, "error", err)
+			return
+		}
+		loggerV2.Debug("liveSyncWorker pulled bin", "bin", bin, "from", from, "topmost", top, "peer_address", peer)
+		from = top + 1
 	}
 }
 
diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go
index db10c0dd2f5..ee42422ce76 100644
--- a/pkg/pullsync/pullsync.go
+++ b/pkg/pullsync/pullsync.go
@@ -118,7 +118,7 @@ func (s *Syncer) Protocol() p2p.ProtocolSpec {
 // It returns the BinID of highest chunk that was synced from the given interval.
 // If the requested interval is too large, the downstream peer has the liberty to
 // provide less chunks than requested.
-func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8, from, to uint64) (topmost uint64, err error) {
+func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8, from, to uint64) (uint64, error) {
 	isLiveSync := to == MaxCursor
 	loggerV2 := s.logger.V(2).Register()
 
@@ -157,7 +157,7 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8
 		return offer.Topmost, nil
 	}
 
-	topmost = offer.Topmost
+	topmost := offer.Topmost
 
 	var (
 		bvLen      = len(offer.Hashes) / swarm.HashSize
@@ -168,24 +168,21 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8
 
 	bv, err := bitvector.New(bvLen)
 	if err != nil {
-		err = fmt.Errorf("new bitvector: %w", err)
-		return
+		return topmost, fmt.Errorf("new bitvector: %w", err)
 	}
 
 	for i := 0; i < len(offer.Hashes); i += swarm.HashSize {
 		a := swarm.NewAddress(offer.Hashes[i : i+swarm.HashSize])
 		if a.Equal(swarm.ZeroAddress) {
 			// i'd like to have this around to see we don't see any of these in the logs
-			s.logger.Error(nil, "syncer got a zero address hash on offer")
-			err = fmt.Errorf("zero address on offer")
-			return
+			s.logger.Debug("syncer got a zero address hash on offer")
+			continue
 		}
 		s.metrics.Offered.Inc()
 		s.metrics.DbOps.Inc()
 		have, err = s.storage.Has(ctx, a)
 		if err != nil {
-			err = fmt.Errorf("storage has: %w", err)
-			return
+			s.logger.Debug("storage has", "error", err)
 		}
 		if !have {
 			wantChunks[a.String()] = struct{}{}
@@ -197,32 +194,21 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8
 
 	wantMsg := &pb.Want{BitVector: bv.Bytes()}
 	if err = w.WriteMsgWithContext(ctx, wantMsg); err != nil {
-		err = fmt.Errorf("write want: %w", err)
-		return
+		return topmost, fmt.Errorf("write want: %w", err)
 	}
 
-	// if ctr is zero, it means we don't want any chunk in the batch
-	// thus, the following loop will not get executed and the method
-	// returns immediately with the topmost value on the offer, which
-	// will seal the interval and request the next one
-	err = nil
 	var chunksToPut []swarm.Chunk
 
 	for ; ctr > 0; ctr-- {
 		var delivery pb.Delivery
 		if err = r.ReadMsgWithContext(ctx, &delivery); err != nil {
-			// this is not a fatal error and we should write
-			// a partial batch if some chunks have been received.
-			err = fmt.Errorf("read delivery: %w", err)
-			break
+			return topmost, fmt.Errorf("read delivery: %w", err)
 		}
 
 		addr := swarm.NewAddress(delivery.Address)
 		if _, ok := wantChunks[addr.String()]; !ok {
-			// this is fatal for the entire batch, return the
-			// error and don't write the partial batch.
-			err = ErrUnsolicitedChunk
-			return
+			s.logger.Debug("want chunks", "error", ErrUnsolicitedChunk)
+			continue
 		}
 		delete(wantChunks, addr.String())
 
@@ -237,13 +223,12 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8
 		if cac.Valid(chunk) {
 			go s.unwrap(chunk)
 		} else if !soc.Valid(chunk) {
-			// this is fatal for the entire batch, return the
-			// error and don't write the partial batch.
-			err = swarm.ErrInvalidChunk
-			return
+			s.logger.Debug("valid chunk", "error", swarm.ErrInvalidChunk)
+			continue
 		}
 		chunksToPut = append(chunksToPut, chunk)
 	}
+
 	if len(chunksToPut) > 0 {
 		if !isLiveSync {
 			s.rate.Add(len(chunksToPut))
@@ -252,16 +237,12 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8
 
 		ctx, cancel := context.WithTimeout(ctx, storagePutTimeout)
 		defer cancel()
-		if ierr := s.storage.Put(ctx, storage.ModePutSync, chunksToPut...); ierr != nil {
-			if err != nil {
-				ierr = fmt.Errorf(", sync err: %w", err)
-			}
-			err = fmt.Errorf("delivery put: %w", ierr)
-			return
+		if err := s.storage.Put(ctx, storage.ModePutSync, chunksToPut...); err != nil {
+			return topmost, fmt.Errorf("delivery put: %w", err)
 		}
 	}
 
-	return
+	return topmost, nil
 }
 
 // handler handles an incoming request to sync an interval
diff --git a/pkg/pullsync/pullsync_test.go b/pkg/pullsync/pullsync_test.go
index 333bbaacd59..29a7817638d 100644
--- a/pkg/pullsync/pullsync_test.go
+++ b/pkg/pullsync/pullsync_test.go
@@ -163,8 +163,8 @@ func TestIncoming_UnsolicitedChunk(t *testing.T) {
 	)
 
 	_, err := psClient.SyncInterval(context.Background(), swarm.ZeroAddress, 0, 0, 5)
-	if !errors.Is(err, pullsync.ErrUnsolicitedChunk) {
-		t.Fatalf("expected ErrUnsolicitedChunk but got %v", err)
+	if err != nil {
+		t.Fatalf("expected nil but got %v", err)
 	}
 }
 

From 97404e0b4b6a877a9fdd5356b986dbd5d6032cd9 Mon Sep 17 00:00:00 2001
From: istae <14264581+istae@users.noreply.github.com>
Date: Wed, 16 Nov 2022 20:39:20 +0300
Subject: [PATCH 2/2] fix: logs

---
 pkg/pullsync/pullsync.go | 9 ++-------
 1 file changed, 2 insertions(+), 7 deletions(-)

diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go
index ee42422ce76..345bf1ad689 100644
--- a/pkg/pullsync/pullsync.go
+++ b/pkg/pullsync/pullsync.go
@@ -120,7 +120,7 @@ func (s *Syncer) Protocol() p2p.ProtocolSpec {
 // provide less chunks than requested.
 func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8, from, to uint64) (uint64, error) {
 	isLiveSync := to == MaxCursor
-	loggerV2 := s.logger.V(2).Register()
+	// loggerV2 := s.logger.V(2).Register()
 
 	stream, err := s.streamer.NewStream(ctx, peer, nil, protocolName, protocolVersion, streamName)
 	if err != nil {
@@ -129,7 +129,7 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8
 	defer func() {
 		if err != nil {
 			_ = stream.Reset()
-			loggerV2.Debug("error syncing peer", "peer_address", peer, "bin", bin, "from", from, "to", to, "error", err)
+			s.logger.Debug("error syncing peer", "peer_address", peer, "bin", bin, "from", from, "to", to, "error", err)
 		} else {
 			stream.FullClose()
 		}
 	}()
@@ -285,17 +285,12 @@ func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Strea
 		return fmt.Errorf("read get range: %w", err)
 	}
 
-	s.logger.Debug("make offer start", "bin", rn.Bin, "from", rn.From, "to", rn.To, "peer_address", p.Address)
-	t := time.Now()
-
 	// make an offer to the upstream peer in return for the requested range
 	offer, _, err := s.makeOffer(ctx, rn)
 	if err != nil {
 		return fmt.Errorf("make offer: %w", err)
 	}
 
-	s.logger.Debug("make offer end", "bin", rn.Bin, "from", rn.From, "topmost", offer.Topmost, "peer_address", p.Address, "duration", time.Since(t))
-
 	// recreate the reader to allow the first one to be garbage collected
 	// before the makeOffer function call, to reduce the total memory allocated
 	// while makeOffer is executing (waiting for the new chunks)
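
The retry shape introduced by PATCH 1/2 can be summarized with the standalone sketch below. It is illustrative only and not code from the patches; syncOnce and the fixed attempt loop are hypothetical stand-ins for pullsync.SyncInterval and the workers' long-running loops in pkg/puller/puller.go. The point it shows: a failed sync now reports an error, the worker sleeps and retries the same interval, and only a successful call persists the interval and advances the cursor.

// retry_sketch.go - illustrative sketch, not part of the bee codebase.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// syncOnce stands in for pullsync.SyncInterval: it returns the topmost synced
// BinID or an error. After PATCH 1/2, callers react to the error alone instead
// of inspecting a sentinel topmost value of zero.
func syncOnce(ctx context.Context, attempt int) (uint64, error) {
	if attempt < 2 {
		return 0, errors.New("simulated sync failure")
	}
	return uint64(attempt) * 10, nil
}

func main() {
	ctx := context.Background()
	start := uint64(0)

	for attempt := 0; attempt < 5; attempt++ {
		top, err := syncOnce(ctx, attempt)
		if err != nil {
			// On any error: log, back off briefly, and retry the interval,
			// mirroring the new `sleep = true; continue` branch in the workers.
			fmt.Println("sync failed, retrying:", err)
			time.Sleep(10 * time.Millisecond) // stands in for the worker's sleep
			continue
		}
		// Only a successful call persists the interval and advances the cursor,
		// as addPeerInterval and `from = top + 1` do in the patch.
		fmt.Printf("synced interval [%d, %d], advancing\n", start, top)
		start = top + 1
	}
}

Because failure is now carried by the error value rather than by topmost == 0, an unsolicited or invalid chunk no longer aborts the whole batch, which is also why TestIncoming_UnsolicitedChunk expects a nil error after PATCH 1/2.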