Skip to content

Commit

Permalink
fix: increment from value on successful sync only
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Nov 9, 2022
1 parent 41b72f3 commit 4f4a33f
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 35 deletions.
76 changes: 41 additions & 35 deletions pkg/puller/puller.go
Expand Up @@ -29,7 +29,7 @@ const loggerName = "puller"

var errCursorsLength = errors.New("cursors length mismatch")

const DefaultSyncSleepDur = time.Minute
const DefaultSyncErrorSleepDur = time.Second * 5

type Options struct {
Bins uint8
Expand All @@ -52,7 +52,7 @@ type Puller struct {

wg sync.WaitGroup

syncSleepDur time.Duration
syncErrorSleepDur time.Duration

bins uint8 // how many bins do we support
}
Expand All @@ -66,15 +66,15 @@ func New(stateStore storage.StateStorer, topology topology.Driver, reserveState
}

p := &Puller{
statestore: stateStore,
topology: topology,
reserveState: reserveState,
syncer: pullSync,
metrics: newMetrics(),
logger: logger.WithName(loggerName).Register(),
syncPeers: make([]map[string]*syncPeer, bins),
syncSleepDur: o.SyncSleepDur,
bins: bins,
statestore: stateStore,
topology: topology,
reserveState: reserveState,
syncer: pullSync,
metrics: newMetrics(),
logger: logger.WithName(loggerName).Register(),
syncPeers: make([]map[string]*syncPeer, bins),
syncErrorSleepDur: o.SyncSleepDur,
bins: bins,
}

for i := uint8(0); i < bins; i++ {
Expand Down Expand Up @@ -233,7 +233,7 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin
case <-ctx.Done():
loggerV2.Debug("histSyncWorker context cancelled", "peer_address", peer, "bin", bin, "cursor", cur)
return
case <-time.After(p.syncSleepDur):
case <-time.After(p.syncErrorSleepDur):
}
sleep = false
}
Expand All @@ -251,7 +251,6 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin
p.logger.Error(err, "histSyncWorker nextPeerInterval failed")
return
}

if s > cur {
p.logger.Debug("histSyncWorker syncing finished", "bin", bin, "cursor", cur)
return
Expand All @@ -260,17 +259,21 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin
if err != nil {
p.metrics.HistWorkerErrCounter.Inc()
p.logger.Error(err, "histSyncWorker syncing interval failed", "peer_address", peer, "bin", bin, "cursor", cur, "start", s, "topmost", top)
sleep = true
}

err = p.addPeerInterval(peer, bin, s, top)
if err != nil {
p.metrics.HistWorkerErrCounter.Inc()
p.logger.Error(err, "histSyncWorker could not persist interval for peer, quitting...", "peer_address", peer)
return
if top == 0 {
// we did not receive an offer from the peer
// this is due to a connection error, read interval timing out, or peer disconnecting
// as such only for this case the sync sleeps before a new attempt
sleep = true
} else {
err = p.addPeerInterval(peer, bin, s, top)
if err != nil {
p.metrics.HistWorkerErrCounter.Inc()
p.logger.Error(err, "histSyncWorker could not persist interval for peer, quitting...", "peer_address", peer)
return
}
loggerV2.Debug("histSyncWorker pulled", "bin", bin, "start", s, "topmost", top, "peer_address", peer)
}

loggerV2.Debug("histSyncWorker pulled", "bin", bin, "start", s, "topmost", top, "peer_address", peer)
}
}

Expand All @@ -291,7 +294,7 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin
case <-ctx.Done():
loggerV2.Debug("liveSyncWorker context cancelled", "peer_address", peer, "bin", bin, "cursor", cur)
return
case <-time.After(p.syncSleepDur):
case <-time.After(p.syncErrorSleepDur):
}
sleep = false
}
Expand All @@ -307,23 +310,26 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin
if err != nil {
p.metrics.LiveWorkerErrCounter.Inc()
p.logger.Error(err, "liveSyncWorker sync error", "peer_address", peer, "bin", bin, "from", from, "topmost", top)
sleep = true
}

if top == math.MaxUint64 {
p.metrics.MaxUintErrCounter.Inc()
p.logger.Error(nil, "liveSyncWorker max uint64 encountered", "peer_address", peer, "bin", bin, "from", from, "topmost", top)
return
} else if top == 0 {
// we did not receive an offer from the peer
// this is due to a connection error, read interval timing out, or peer disconnecting
// as such only for this case the sync sleeps before a new attempt
sleep = true
} else {
err = p.addPeerInterval(peer, bin, from, top)
if err != nil {
p.metrics.LiveWorkerErrCounter.Inc()
p.logger.Error(err, "liveSyncWorker exit on add peer interval", "peer_address", peer, "bin", bin, "from", from, "error", err)
return
}
loggerV2.Debug("liveSyncWorker pulled bin", "bin", bin, "from", from, "topmost", top, "peer_address", peer)
from = top + 1
}
err = p.addPeerInterval(peer, bin, from, top)
if err != nil {
p.metrics.LiveWorkerErrCounter.Inc()
p.logger.Error(err, "liveSyncWorker exit on add peer interval", "peer_address", peer, "bin", bin, "from", from, "error", err)
return
}

loggerV2.Debug("liveSyncWorker pulled bin", "bin", bin, "from", from, "topmost", top, "peer_address", peer)

from = top + 1
}
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/pullsync/pullsync.go
Expand Up @@ -303,12 +303,16 @@ func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Strea
return fmt.Errorf("read get range: %w", err)
}

s.logger.Debug("make offer start", "bin", rn.Bin, "from", rn.From, "to", rn.To, "peer", p.Address)

// make an offer to the upstream peer in return for the requested range
offer, _, err := s.makeOffer(ctx, rn)
if err != nil {
return fmt.Errorf("make offer: %w", err)
}

s.logger.Debug("make offer end", "bin", rn.Bin, "from", rn.From, "topmost", offer.Topmost, "peer", p.Address)

// recreate the reader to allow the first one to be garbage collected
// before the makeOffer function call, to reduce the total memory allocated
// while makeOffer is executing (waiting for the new chunks)
Expand Down

0 comments on commit 4f4a33f

Please sign in to comment.