Properly support restarting BatchedSend #5481

Closed
gjoseph92 opened this issue Oct 29, 2021 · 3 comments

Comments

@gjoseph92
Collaborator

In #5480 we found that when Worker.batched_stream is restarted after a broken connection to the scheduler, it enters a broken state: send succeeds, but the data is never actually sent (it just sits in the buffer forever).
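To make the symptom concrete, here is a toy model, assuming a sender whose `send()` only appends to a buffer and whose background loop is the only thing that ever writes to the comm. All names here (`ToyBatchedSend`, `FakeComm`, `flush_once`, `buggy_restart`) are hypothetical; this is not `distributed.BatchedSend` and not necessarily the exact mechanism behind #5480. If the loop stops when the connection breaks and a "restart" only swaps in the new comm, `send()` keeps succeeding while nothing reaches the new comm.

```python
class FakeComm:
    """Hypothetical stand-in for a network comm; records what was written."""

    def __init__(self):
        self.closed = False
        self.written = []

    def write(self, msgs):
        if self.closed:
            raise ConnectionError("comm closed")
        self.written.extend(msgs)


class ToyBatchedSend:
    """Hypothetical, simplified batching sender (NOT distributed.BatchedSend)."""

    def __init__(self):
        self.buffer = []
        self.comm = None
        self.running = False  # models "the background send loop is alive"

    def start(self, comm):
        self.comm = comm
        self.running = True

    def send(self, msg):
        # send() never touches the network: it only enqueues, so it always "succeeds".
        self.buffer.append(msg)

    def flush_once(self):
        """One iteration of the (normally periodic) background loop."""
        if not self.running or not self.buffer:
            return
        batch, self.buffer = self.buffer, []
        try:
            self.comm.write(batch)
        except ConnectionError:
            # Keep the unsent batch and stop the loop, as after a broken connection.
            self.buffer = batch + self.buffer
            self.running = False

    def buggy_restart(self, comm):
        # Hypothetical faulty restart: swaps in the new comm, never restarts the loop.
        self.comm = comm


bs = ToyBatchedSend()
old = FakeComm()
bs.start(old)
bs.send("a")
bs.flush_once()          # "a" is written to the old comm

old.closed = True
bs.send("b")
bs.flush_once()          # write fails: loop stops, "b" stays buffered

new = FakeComm()
bs.buggy_restart(new)
bs.send("c")
bs.flush_once()          # send() "succeeded", but the loop is dead
print(new.written, bs.buffer)  # [] ['b', 'c']
```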

This is probably fixed by #5457, but I think it may deserve a more thorough fix. If we want BatchedSend to be restartable, it should have a clear interface and tests for this.

xref #4133 #4163 #5377

@fjetter
Member

fjetter commented Nov 2, 2021

If we want BatchedSend to be restartable, it should have a clear interface [...]

What would that look like, and what value would it deliver? So far, restarting on reconnect just starts the BatchedComm again, and the BatchedComm takes care of the rest. I'm not sure what value a "clear" interface would add; it feels like every change to this interface would make the code using it more complex.

[...] and tests for this.

I personally consider the tests added in #5457 to be sufficient, and if something is missing, let's discuss it over there.

  • Test for the worker disconnecting and restarting the batched comm: ensures the payload is not lost and is submitted in the correct order after reconnecting (test_worker.py::test_dont_lose_payload_reconnect)
  • The batched comm itself is tested to retain any payload that would otherwise be lost on disconnect; upon reconnect, that payload is properly submitted (test_batched.py::test_retain_buffer_commclosed)

What is missing?
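The two properties in that list (nothing lost while disconnected; buffered messages delivered in order after reconnecting) can be illustrated with a plain-Python analogue. This is a sketch, not the actual tests from #5457: `ToySender`, `FakeComm`, and `flush` are hypothetical names, and the toy deliberately allows `start()` to be called again with a new comm, i.e. the restartable design under discussion.

```python
class FakeComm:
    """Hypothetical stand-in for a network comm; records what was written."""

    def __init__(self):
        self.closed = False
        self.written = []

    def write(self, msgs):
        if self.closed:
            raise ConnectionError("comm closed")
        self.written.extend(msgs)


class ToySender:
    """Hypothetical batching sender that keeps unsent messages across reconnects."""

    def __init__(self):
        self.buffer = []
        self.comm = None

    def start(self, comm):
        # Restartable by design: attaching a new comm resumes flushing.
        self.comm = comm

    def send(self, msg):
        self.buffer.append(msg)

    def flush(self):
        if self.comm is None or not self.buffer:
            return
        batch, self.buffer = self.buffer, []
        try:
            self.comm.write(batch)
        except ConnectionError:
            self.buffer = batch + self.buffer  # retain, preserving order
            self.comm = None                   # wait for the next start()


def test_retain_buffer_and_order_across_reconnect():
    s = ToySender()
    old = FakeComm()
    s.start(old)
    old.closed = True
    s.send(1)
    s.send(2)
    s.flush()
    assert s.buffer == [1, 2]            # nothing lost while disconnected
    s.send(3)
    new = FakeComm()
    s.start(new)
    s.flush()
    assert new.written == [1, 2, 3]      # delivered in the original order
    assert s.buffer == []


test_retain_buffer_and_order_across_reconnect()
```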

@gjoseph92
Collaborator Author

gjoseph92 commented May 5, 2022

Just for historical understanding: I believe this was more or less introduced in #3493. Before that, we always created a new BatchedSend instead of restarting the existing one:

```python
self.batched_stream = BatchedSend(interval="2ms", loop=self.loop)
self.batched_stream.start(comm)
```

Basic assert statements and documentation in BatchedSend.start would have made it clear to future developers that BatchedSend.start cannot be called multiple times, and would have prevented this bug. That's what I meant by a clear interface. With those in place, that PR couldn't have so easily misused a 5-year-old API and introduced this subtle error condition.
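A minimal sketch of what such a guarded, documented `start()` could look like, together with the pre-#3493 reconnect pattern of creating a fresh sender per connection. `GuardedSender` and `FakeComm` are hypothetical names, not distributed's API. The sketch raises explicit exceptions instead of using bare `assert`, since assertions are stripped when Python runs with `-O`; that is a design choice of the sketch, not something the comment above prescribes.

```python
class FakeComm:
    """Hypothetical stand-in for a network comm."""

    def write(self, msgs):
        pass


class GuardedSender:
    """Hypothetical batching sender with a documented, checked start().

    start() may be called at most once per instance. To reconnect, create
    a new GuardedSender instead of restarting an existing one.
    """

    def __init__(self):
        self.comm = None
        self._started = False

    def start(self, comm):
        # Explicit checks rather than `assert`, so they survive `python -O`.
        if self._started:
            raise RuntimeError(
                "start() was already called; create a new sender to reconnect"
            )
        if comm is None:
            raise ValueError("start() requires a comm")
        self._started = True
        self.comm = comm


first = GuardedSender()
first.start(FakeComm())

rejected = False
try:
    first.start(FakeComm())  # misuse: restarting the same instance
except RuntimeError:
    rejected = True

# Reconnecting the pre-#3493 way: one fresh sender per connection.
second = GuardedSender()
second.start(FakeComm())
print(rejected, second.comm is not None)  # True True
```

With the guard in place, the misuse fails loudly at the call site instead of leaving a sender that silently buffers forever.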

@gjoseph92
Collaborator Author

With worker reconnection removed, this is no longer necessary (#6361). The API and internal validation should still be tightened up, though: #6389.

gjoseph92 closed this as not planned on May 24, 2022