Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(pullsync): cancel ruid cleanup #3456

Merged
merged 2 commits into from Oct 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/puller/puller.go
Expand Up @@ -260,7 +260,7 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin
p.logger.Debug("histSyncWorker syncing finished", "bin", bin, "cursor", cur)
return
}
top, _, err := p.syncer.SyncInterval(ctx, peer, bin, s, cur)
top, err := p.syncer.SyncInterval(ctx, peer, bin, s, cur)
if err != nil {
p.metrics.HistWorkerErrCounter.Inc()
p.logger.Error(err, "histSyncWorker syncing interval failed", "peer_address", peer, "bin", bin, "cursor", cur, "start", s, "topmost", top)
Expand Down Expand Up @@ -307,7 +307,7 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin
default:
}

top, _, err := p.syncer.SyncInterval(ctx, peer, bin, from, pullsync.MaxCursor)
top, err := p.syncer.SyncInterval(ctx, peer, bin, from, pullsync.MaxCursor)
if err != nil {
p.metrics.LiveWorkerErrCounter.Inc()
p.logger.Error(err, "liveSyncWorker sync error", "peer_address", peer, "bin", bin, "from", from, "topmost", top)
Expand Down
24 changes: 12 additions & 12 deletions pkg/pullsync/mock/pullsync.go
Expand Up @@ -113,7 +113,7 @@ func NewPullSync(opts ...Option) *PullSyncMock {
return s
}

func (p *PullSyncMock) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8, from, to uint64) (topmost uint64, ruid uint32, err error) {
func (p *PullSyncMock) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8, from, to uint64) (topmost uint64, err error) {

isLive := to == math.MaxUint64

Expand All @@ -129,7 +129,7 @@ func (p *PullSyncMock) SyncInterval(ctx context.Context, peer swarm.Address, bin
p.mtx.Unlock()

if p.syncErr != nil {
return 0, 0, p.syncErr
return 0, p.syncErr
}

if isLive && p.lateReply {
Expand All @@ -141,9 +141,9 @@ func (p *PullSyncMock) SyncInterval(ctx context.Context, peer swarm.Address, bin

select {
case <-p.quit:
return 0, 1, context.Canceled
return 0, context.Canceled
case <-ctx.Done():
return 0, 1, ctx.Err()
return 0, ctx.Err()
default:
}

Expand All @@ -162,20 +162,20 @@ func (p *PullSyncMock) SyncInterval(ctx context.Context, peer swarm.Address, bin
if sr.block {
select {
case <-p.quit:
return 0, 1, context.Canceled
return 0, context.Canceled
case <-ctx.Done():
return 0, 1, ctx.Err()
return 0, ctx.Err()
}
}
return sr.topmost, 0, nil
return sr.topmost, nil
}
panic(fmt.Sprintf("bin %d from %d to %d", bin, from, to))
}

if isLive && p.blockLiveSync {
// don't respond, wait for quit
<-p.quit
return 0, 1, context.Canceled
return 0, context.Canceled
}
if isLive && len(p.liveSyncReplies) > 0 {
p.mtx.Lock()
Expand All @@ -184,12 +184,12 @@ func (p *PullSyncMock) SyncInterval(ctx context.Context, peer swarm.Address, bin
<-p.quit
// when shutting down, onthe puller side we cancel the context going into the pullsync protocol request
// this results in SyncInterval returning with a context cancelled error
return 0, 0, context.Canceled
return 0, context.Canceled
}
v := p.liveSyncReplies[p.liveSyncCalls]
p.liveSyncCalls++
p.mtx.Unlock()
return v, 1, nil
return v, nil
}

if p.autoReply {
Expand All @@ -198,9 +198,9 @@ func (p *PullSyncMock) SyncInterval(ctx context.Context, peer swarm.Address, bin
if t > to {
t = to
}
return t, 1, nil
return t, nil
}
return to, 1, nil
return to, nil
}

func (p *PullSyncMock) GetCursors(_ context.Context, peer swarm.Address) ([]uint64, error) {
Expand Down