Skip to content

Commit

Permalink
fix: remove puller changes
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Nov 9, 2022
1 parent 8735e8e commit 41b72f3
Showing 1 changed file with 14 additions and 19 deletions.
33 changes: 14 additions & 19 deletions pkg/puller/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const loggerName = "puller"

var errCursorsLength = errors.New("cursors length mismatch")

const DefaultSyncErrorSleepDur = time.Second * 5
const DefaultSyncSleepDur = time.Minute

type Options struct {
Bins uint8
Expand All @@ -52,7 +52,7 @@ type Puller struct {

wg sync.WaitGroup

syncErrorSleepDur time.Duration
syncSleepDur time.Duration

bins uint8 // how many bins do we support
}
Expand All @@ -66,15 +66,15 @@ func New(stateStore storage.StateStorer, topology topology.Driver, reserveState
}

p := &Puller{
statestore: stateStore,
topology: topology,
reserveState: reserveState,
syncer: pullSync,
metrics: newMetrics(),
logger: logger.WithName(loggerName).Register(),
syncPeers: make([]map[string]*syncPeer, bins),
syncErrorSleepDur: o.SyncSleepDur,
bins: bins,
statestore: stateStore,
topology: topology,
reserveState: reserveState,
syncer: pullSync,
metrics: newMetrics(),
logger: logger.WithName(loggerName).Register(),
syncPeers: make([]map[string]*syncPeer, bins),
syncSleepDur: o.SyncSleepDur,
bins: bins,
}

for i := uint8(0); i < bins; i++ {
Expand Down Expand Up @@ -233,7 +233,7 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin
case <-ctx.Done():
loggerV2.Debug("histSyncWorker context cancelled", "peer_address", peer, "bin", bin, "cursor", cur)
return
case <-time.After(p.syncErrorSleepDur):
case <-time.After(p.syncSleepDur):
}
sleep = false
}
Expand All @@ -260,12 +260,7 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin
if err != nil {
p.metrics.HistWorkerErrCounter.Inc()
p.logger.Error(err, "histSyncWorker syncing interval failed", "peer_address", peer, "bin", bin, "cursor", cur, "start", s, "topmost", top)
// we did not receive an offer from the peer
// this is due to a connection error, read interval timing out, or peer disconnecting
// as such only for this case the sync sleeps before a new attempt
if top == 0 {
sleep = true
}
sleep = true
}

err = p.addPeerInterval(peer, bin, s, top)
Expand Down Expand Up @@ -296,7 +291,7 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin
case <-ctx.Done():
loggerV2.Debug("liveSyncWorker context cancelled", "peer_address", peer, "bin", bin, "cursor", cur)
return
case <-time.After(p.syncErrorSleepDur):
case <-time.After(p.syncSleepDur):
}
sleep = false
}
Expand Down

0 comments on commit 41b72f3

Please sign in to comment.