fix: puller rewrite and bug fixes #3437
Conversation
pkg/localstore/subscription_pull.go
Outdated
@@ -44,18 +43,16 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)

	chunkDescriptors := make(chan storage.Descriptor)

-	in, out, clean := flipflop.NewFallingEdge(flipFlopBufferDuration, flipFlopWorstCaseDuration)
+	trigger := make(chan struct{}, 1)
These triggers would be fired on Put operations. As the channel is buffered with capacity one, wouldn't the Put operation block on the pullIndex iteration completing every time? How would this work?
See line 208 in subscription_pull.go: it is a select with a default case, so it does not block.
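The non-blocking trigger pattern being described (a capacity-one channel written with a select/default) can be sketched as follows; the names here are illustrative, not the actual localstore code:

```go
package main

import "fmt"

// notify performs a non-blocking send on a buffered trigger channel.
// If a trigger is already pending, the default case is taken and the
// caller (e.g. a Put operation) never blocks; bursts of notifications
// coalesce into a single pending trigger.
func notify(trigger chan struct{}) bool {
	select {
	case trigger <- struct{}{}:
		return true // trigger queued
	default:
		return false // a trigger is already pending; coalesced
	}
}

func main() {
	trigger := make(chan struct{}, 1)
	fmt.Println(notify(trigger)) // true: buffer was empty
	fmt.Println(notify(trigger)) // false: coalesced, no blocking
	<-trigger                    // the subscriber consumes the pending trigger
	fmt.Println(notify(trigger)) // true again
}
```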
pkg/puller/puller.go
Outdated
	p.metrics.LiveWorkerErrCounter.Inc()

	if errors.Is(err, context.Canceled) {
This is risky to do if the peer is not reachable: we would stay in this retry loop, which could cause a lot of CPU usage. Maybe we should have a retry counter.
Also, I would add a log message saying which attempt it is when we call SyncInterval again; this can help us identify the problem.
A unit test also needs to be added for this behaviour.
We check for a specific error, context cancellation, and unreachable peers do not produce a context cancellation error. The loop also sleeps for 5 minutes when this error is detected, so it is not a tight spinning loop.
pkg/localstore/subscription_pull.go
Outdated
@@ -171,9 +167,9 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
	defer db.pullTriggersMu.Unlock()

	for i, t := range db.pullTriggers[bin] {
-		if t == in {
+		if t == trigger {
			db.pullTriggers[bin] = append(db.pullTriggers[bin][:i], db.pullTriggers[bin][i+1:]...)
The channel can be closed here.
That will trigger the select above; it should be okay like this.
I am not so sure about removing the falling edge detection on subscription. The reasoning in the PR description, that it is related to node bootup, does not have to be true. That detection is a protection for any situation where a large number of chunks triggers the subscription to call pullIndex.Iterate too frequently, causing extensive I/O. I think that removing this kind of protection should be backed up with measurements.
4fde41d to a6a7ca9 (Compare)
As a measure of precaution, I would suggest adding regression tests for all issues that these changes are addressing, both for validation and to protect against reintroducing the same issues in the future.
Additionally, I get this test case failure:
=== CONT TestDepthChange/move_peer_around
/Users/janos/go/projects/ethswarm.org/bee/pkg/puller/puller_test.go:440: got unexpected interval: [], want [[1 1]] bin 3
which passes on the master branch.
	}
	return
	top, _, err := p.syncer.SyncInterval(ctx, peer, bin, from, pullsync.MaxCursor)
	if err != nil {
Same as the comment above. We should ideally look for particular errors before restarting; if we get terminal errors like a stream reset, we should quit early.
see comment above
	},
	pullSync: []mockps.Option{
		mockps.WithCursors([]uint64{1}),
		mockps.WithSyncError(errors.New("sync error"))},
It would be better if this were a function which returns an error a few times and then returns success. We should also test whether syncing can restart correctly.
Regarding "test if this can restart correctly": what do you mean?
This will always return an error from SyncInterval. Instead, we should return an error and then succeed on the next call.
	// bound to fail.
	ctxC, cancelC := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancelC()
	if err := p.syncer.CancelRuid(ctxC, peer, ruid); err != nil {
Why is this being removed? This makes it a breaking change in the protocols.
@janos Can you add some brief description about this cancel ruid functionality? Maybe we are missing something?
The whole pullsync cancel protocol is a way to signal a call to a context's cancel function for a particular syncing request, to avoid unnecessary waiting in the pullsync handler if the "client" syncing peer has problems storing intervals. Since stream termination (reset) can be detected only in the stream's Read or Write methods, a mechanism was needed to terminate the other functions that happen in between when there is an error on the other peer.
I would add that if the CancelRuid functionality is removed, an alternative approach to the possible goroutine leak is required, and the functionality should be cleaned up completely, since without CancelRuid calls the whole pullsync cancel protocol can be removed.
@@ -23,7 +23,6 @@ linters:
   - importas
   - ineffassign
   - misspell
-  - nakedret
I have no preference on naked vs. non-naked returns, but I could not see a strong enough need in this PR to change the linter configuration, and consequently the coding policy. The changes with naked returns have no functional requirement, while naked returns can be a source of subtle problems.
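One example of the kind of subtle problem naked returns can cause, as a generic illustration rather than code from this PR: with named result parameters, a bare return silently returns whatever the result variables currently hold, so a forgotten error assignment goes unnoticed.

```go
package main

import "fmt"

// div has named results, so the bare `return` in the b == 0 branch
// silently returns (0, nil). The author presumably meant to set err
// there, but nothing at the return site makes the omission visible.
func div(a, b int) (q int, err error) {
	if b == 0 {
		// missing: err = errors.New("division by zero")
		return
	}
	q = a / b
	return
}

func main() {
	q, err := div(1, 0)
	fmt.Println(q, err) // 0 <nil>: the error case was silently dropped
}
```

An explicit `return 0, err` at each site would have made the missing assignment obvious, which is essentially what the nakedret linter enforces.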
Checklist
Description
Fixes bugs tagged in the reserve-bugs branch. Some bugs are:
We also remove the falling edge detector. It was originally put in place because of high CPU usage during bootup while Kademlia was establishing new connections, but the puller now waits for node warm-up.
Open API Spec Version Changes (if applicable)
Motivation and Context (Optional)
Related Issue (Optional)
Screenshots (if appropriate):