diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 7b6296265fc..298c0982b55 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -29,7 +29,7 @@ const loggerName = "puller" var errCursorsLength = errors.New("cursors length mismatch") -const DefaultSyncSleepDur = time.Minute +const DefaultSyncErrorSleepDur = time.Second * 5 type Options struct { Bins uint8 @@ -52,7 +52,7 @@ type Puller struct { wg sync.WaitGroup - syncSleepDur time.Duration + syncErrorSleepDur time.Duration bins uint8 // how many bins do we support } @@ -66,15 +66,15 @@ func New(stateStore storage.StateStorer, topology topology.Driver, reserveState } p := &Puller{ - statestore: stateStore, - topology: topology, - reserveState: reserveState, - syncer: pullSync, - metrics: newMetrics(), - logger: logger.WithName(loggerName).Register(), - syncPeers: make([]map[string]*syncPeer, bins), - syncSleepDur: o.SyncSleepDur, - bins: bins, + statestore: stateStore, + topology: topology, + reserveState: reserveState, + syncer: pullSync, + metrics: newMetrics(), + logger: logger.WithName(loggerName).Register(), + syncPeers: make([]map[string]*syncPeer, bins), + syncErrorSleepDur: o.SyncSleepDur, + bins: bins, } for i := uint8(0); i < bins; i++ { @@ -233,7 +233,7 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin case <-ctx.Done(): loggerV2.Debug("histSyncWorker context cancelled", "peer_address", peer, "bin", bin, "cursor", cur) return - case <-time.After(p.syncSleepDur): + case <-time.After(p.syncErrorSleepDur): } sleep = false } @@ -251,7 +251,6 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin p.logger.Error(err, "histSyncWorker nextPeerInterval failed") return } - if s > cur { p.logger.Debug("histSyncWorker syncing finished", "bin", bin, "cursor", cur) return @@ -260,17 +259,21 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin if err != nil { p.metrics.HistWorkerErrCounter.Inc() 
p.logger.Error(err, "histSyncWorker syncing interval failed", "peer_address", peer, "bin", bin, "cursor", cur, "start", s, "topmost", top) - sleep = true } - - err = p.addPeerInterval(peer, bin, s, top) - if err != nil { - p.metrics.HistWorkerErrCounter.Inc() - p.logger.Error(err, "histSyncWorker could not persist interval for peer, quitting...", "peer_address", peer) - return + if top == 0 { + // we did not receive an offer from the peer + // this is due to a connection error, read interval timing out, or peer disconnecting + // as such, only in this case does the sync sleep before a new attempt + sleep = true + } else { + err = p.addPeerInterval(peer, bin, s, top) + if err != nil { + p.metrics.HistWorkerErrCounter.Inc() + p.logger.Error(err, "histSyncWorker could not persist interval for peer, quitting...", "peer_address", peer) + return + } + loggerV2.Debug("histSyncWorker pulled", "bin", bin, "start", s, "topmost", top, "peer_address", peer) } - - loggerV2.Debug("histSyncWorker pulled", "bin", bin, "start", s, "topmost", top, "peer_address", peer) } } @@ -291,7 +294,7 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin case <-ctx.Done(): loggerV2.Debug("liveSyncWorker context cancelled", "peer_address", peer, "bin", bin, "cursor", cur) return - case <-time.After(p.syncSleepDur): + case <-time.After(p.syncErrorSleepDur): } sleep = false } @@ -307,23 +310,26 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin if err != nil { p.metrics.LiveWorkerErrCounter.Inc() p.logger.Error(err, "liveSyncWorker sync error", "peer_address", peer, "bin", bin, "from", from, "topmost", top) - sleep = true } - if top == math.MaxUint64 { p.metrics.MaxUintErrCounter.Inc() + p.logger.Error(nil, "liveSyncWorker max uint64 encountered", "peer_address", peer, "bin", bin, "from", from, "topmost", top) + return + } else if top == 0 { + // we did not receive an offer from the peer + // this is due to a connection error, read interval 
timing out, or peer disconnecting + // as such, only in this case does the sync sleep before a new attempt + sleep = true + } else { + err = p.addPeerInterval(peer, bin, from, top) + if err != nil { + p.metrics.LiveWorkerErrCounter.Inc() + p.logger.Error(err, "liveSyncWorker exit on add peer interval", "peer_address", peer, "bin", bin, "from", from, "error", err) + return + } + loggerV2.Debug("liveSyncWorker pulled bin", "bin", bin, "from", from, "topmost", top, "peer_address", peer) + from = top + 1 } - err = p.addPeerInterval(peer, bin, from, top) - if err != nil { - p.metrics.LiveWorkerErrCounter.Inc() - p.logger.Error(err, "liveSyncWorker exit on add peer interval", "peer_address", peer, "bin", bin, "from", from, "error", err) - return - } - - loggerV2.Debug("liveSyncWorker pulled bin", "bin", bin, "from", from, "topmost", top, "peer_address", peer) - - from = top + 1 } } diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go index 28eb1f0e612..4e3f5cdd376 100644 --- a/pkg/pullsync/pullsync.go +++ b/pkg/pullsync/pullsync.go @@ -303,12 +303,16 @@ func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Strea return fmt.Errorf("read get range: %w", err) } + s.logger.Debug("make offer start", "bin", rn.Bin, "from", rn.From, "to", rn.To, "peer", p.Address) + // make an offer to the upstream peer in return for the requested range offer, _, err := s.makeOffer(ctx, rn) if err != nil { return fmt.Errorf("make offer: %w", err) } + s.logger.Debug("make offer end", "bin", rn.Bin, "from", rn.From, "topmost", offer.Topmost, "peer", p.Address) + // recreate the reader to allow the first one to be garbage collected // before the makeOffer function call, to reduce the total memory allocated // while makeOffer is executing (waiting for the new chunks)