Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not drop BatchedSend payload if worker reconnects #5457

Closed

Conversation

fjetter
Copy link
Member

@fjetter fjetter commented Oct 22, 2021

If the scheduler<->worker stream closes during a background send or a background send is attempted although the connection was already closed, we'd lose any payload data tried to be submitted during that iteration.

I could trace the flakiness of test_worker_reconnects_mid_compute_multiple_states_on_scheduler back to this condition.

By simply prepending the payload to the buffer upon failure, there worker has a chance to resubmit this upon reconnect since the BatchedSend instance is the same

Closes #5377

  • Dedicated unit test

@fjetter
Copy link
Member Author

fjetter commented Oct 22, 2021

I spoke too soon, it seems. CI is still failing for test_worker_reconnects_mid_compute_multiple_states_on_scheduler even though I can no longer reproduce the timeout with this fix locally :(

https://github.com/dask/distributed/runs/3976598171?check_suite_focus=true

EDIT: Didn't see it again. I'm inclined to ignore this instance since other deadlocky fixes were merged on main and are part of this now

@fjetter fjetter force-pushed the dropped_payload_messages_worker_reconnect branch from b4f2855 to bad077e Compare October 25, 2021 08:23
distributed/batched.py Show resolved Hide resolved
distributed/tests/test_worker.py Outdated Show resolved Hide resolved
Comment on lines +3225 to +3227
# Payload that couldn't be submitted is prepended
assert len(after) >= len(before)
assert after[: len(before)] == before
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If everything went through before the close, then you're not testing anything useful.
From what I can read here it might as well be what happens every time.
The opposite might also be true - everything goes through AFTER the close.

Could you change to

assert before)
assert len(after) > len(before)

while possibly drastically increasing the number of messages in the pipeline to avoid flakiness?

Copy link
Member Author

@fjetter fjetter Oct 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. assert before is already asserted but as assert len(w.batched_stream.buffer) == 100 in L3218
  2. Increasing the number of messages sent should not affect anything since the batched send is implemented as all or nothing
  3. Actually the most probably scenario is after == before since you only get a difference if something in the scheduler sent another message while the background send is awaiting the write to the closed comm which is an incredibly tiny time window. At first I had the equal assert only but that caused this test to fail every now and then. I added the larger equal to avoid flakiness. Equal is fine. after > before is fine iff the first len(before) messages are the same as before.

If the payload would've been submitted before closing, the buffer would be empty before I am trying to send.

If we want to absolutely have > the only way I see is to schedule futures trying to send smth simultaneously to the await background_send in L 3220 but that would render our buffer assert impossible since we no longer can say for sure what the state before actually was.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explicitly call batched_stream.send(..) after the comm has been closed, and test that it has the correct behavior? (As I mentioned in another comment I don't know what this behavior should be; raise an error or just add to the buffer?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's been done as part of test_send_before_close. I didn't change the behaviour but extended the test a bit. The behaviour is as follows

  1. If a send is successful, the message is appended to the buffer. The BatchedSend is now responsible for delivery. The responsibility of the server is to check periodically and restart by providing a new comm if necessary.
  2. If an exception is raised, the code should raise immediately and not append to the buffer. Whatever logic was trying to send a message can then decide how the failure should be handled.

To me, that's sensible behaviour

Copy link
Collaborator

@gjoseph92 gjoseph92 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this PR will close #5480 as it stands, which is great. However I do still think some cleanup of BatchedSend would be worthwhile to ensure other usage errors like it don't come up in the future.

What I meant about a "clear interface" in #5481 was just that the function contracts and invariants around how BatchedSend should behave are not well documented, and in most cases not validated by the functions themselves. This leads to effectively undefined behavior. Before this PR, calling start multiple times could be called undefined behavior. Now, it has the behavior we want, and there are tests for it, but I'd still say it's not that well defined, since there isn't yet:
a. documentation for it
b. validation for it within the class, to prevent future developers from misusing this behavior somehow
c. tests for the validation

In these comments I'm noting some things about BatchedSend that I think should be documented and tightened up. Most of them aren't part of the changes you've made here, but hopefully explain what I mean by wanting a clearly defined interface.

@@ -58,6 +58,7 @@ def __init__(self, interval, loop=None, serializers=None):

def start(self, comm):
self.comm = comm
self.please_stop = False
self.loop.add_callback(self._background_send)

def closed(self):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please_stop and stopped should probably factor into this as well

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think this check is insufficient for telling whether we're closed or not. While we're in the process of closing (please_stop is True, or self.stopped is set, or _background_send is no longer running, etc.), it may still return True.

From the way this is used though, in both start and write, we probably want to treat "in the process of closing" as closed, not as running. Restarting a BatchedSend that's closing should be an error. If writing to a closed BatchedSend is an error, then so should be writing to one that's in the process of closing.

payload = None # lose ref
# If anything failed we should not lose payload. If a new comm
# is provided we can still resubmit messages
self.buffer = payload + self.buffer
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure we should retain payload? Couldn't those messages have been successfully sent, and the error occurred after? Then we might be duplicating them when we reconnect.

Though maybe we'd rather duplicate messages than drop them. In that case, let's add a note saying so.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point I am assuming a certain behaviour of the comm. Either the comm writes all or nothing. That's likely not always true but I believe we cannot do much about it on this level of abstraction. imho, that guarantee should be implemented by our protocol and/or Comm interface.

Either way, I'm happy if you have any suggestions to improve this

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After reading through TCP comm code, I don't think there's anywhere for an exception to happen after all of the payload has been sent. An exception could still happen when part has been sent and part hasn't, but either way, since we have to assume something here, I think it's more useful to assume that the payload hasn't been sent.

@jcrist and I were discussing this, and given the way the BatchedSend interface works, it actually needs to implement some sort of protocol with an ack from the receiver for each sent batch to guarantee messages can't be dropped. Since send on a BatchedSend is nonblocking, the caller is basically handing off full responsibility for the message to BatchedSend. If the message fails to send, it's too late to raise an error and let the caller figure out what to do about it—once we've been given a message, we have to ensure it's delivered. So the logical thing to do would be for the receiving side to ack each message, and only when ack'd does the sender drop the payload (with some deduplication of course).

OTOH there are lots of protocols out there for doing things like this, more performantly, robustly, and with better testing than we'll ever have. As with other things (framing in serialization), maybe the better solution is to stop duplicating functionality at the application level that should be the transport layer's job.

Could we get rid of BatchedSend entirely with some well-tuned TCP buffering settings + a serialization scheme that was more efficient for many small messages?

@@ -152,6 +155,7 @@ def close(self, timeout=None):
self.please_stop = True
self.waker.set()
yield self.stopped.wait(timeout=timeout)
payload = []
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could move this below the if not self.comm.closed():

@@ -58,6 +58,7 @@ def __init__(self, interval, loop=None, serializers=None):

def start(self, comm):
self.comm = comm
self.please_stop = False
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.please_stop = False
self.please_stop = False
self.stopped.clear()
self.waker.clear()

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean by a "clear interface" is that I'd like a docstring for start saying the contract for what it can/can't do. For example, that start on a closed BatchedSend will restart it with a new comm. And that start on a currently-running BatchedSend raises an error, etc. What are the invariants for this internal state? How do all the methods ensure they are upheld?

@@ -1238,6 +1238,7 @@ async def handle_scheduler(self, comm):
comm, every_cycle=[self.ensure_communicating, self.ensure_computing]
)
except Exception as e:
self.batched_stream.please_stop = True
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we not mutate internal state of BatchedSend? This feels brittle. I see this is going to do something slightly different from batched_stream.close(). If we need the ability to do both things, let's have a method for doing whatever this is as well (request_stop?).

Though I'd prefer having just one close() function. If the behavior of the current close isn't useful to calling code, perhaps we should change its behavior to something that is useful.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That was actually not even necessary. The BatchedSend infers automatically that the Comm is closed and there is no need to close anything

pass
# If we're closing and there is an error there is little we
# can do about this to recover.
logger.error("Lost %i payload messages.", len(payload))
yield self.comm.close()

def abort(self):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should abort be part of the public interface? When/how should callers use abort vs close? What can and cannot be done with a BatchedSend after abort (or close) has been called? There are all things I'd like to see documented.

@@ -121,7 +125,6 @@ def _background_send(self):
# there was an exception when using `comm`.
# We can't close gracefully via `.close()` since we can't send messages.
# So we just abort.
# This means that any messages in our buffer our lost.
# To propagate exceptions, we rely on subsequent `BatchedSend.send`
# calls to raise CommClosedErrors.
self.stopped.set()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't comment there, but this is a comment for send:

        if self.comm is not None and self.comm.closed():
            raise CommClosedError(f"Comm {self.comm!r} already closed.")

This check feels insufficient. A more thorough check would have caught #5481. Should we also check not self.closed()? not self.please_stop? That the _background_send coroutine is still running? Things like that? It seems inconsistent to prohibit sends when the underlying comm is closed, but still allow them if the BatchedSend itself is effectively closed.

On the other hand, we may be relying on the fact that messages can be enqueued to a BatchedSend even when it's closed. If we expect the BatchedSend to be restarted and reused soon, perhaps there should be no restrictions on when send can be called.

I'd like to:

  1. Figure out exactly what the behavior is we actually want
  2. Write a docstring explaining the contract for when send can and cannot be called, how it behaves in the case of an open vs a closed underlying comm, an open vs closed BatchedSend, etc.
  3. Make the code match this contract
  4. Test that it does so

@@ -58,6 +58,7 @@ def __init__(self, interval, loop=None, serializers=None):

def start(self, comm):
self.comm = comm
self.please_stop = False
self.loop.add_callback(self._background_send)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One downside of using Tornado here is that we have no handle on this coroutine. If we did asyncio.create_task or similar, we could keep a Task object here.

  1. That might let us simplify the please_stop/stopped/abort logic by just letting us do task.cancel() and handling the CancelledError within _background_send. We would probably want to shield the comm.write call from cancellation if we did so.
  2. We could assert in send and elsewhere that the _background_send Task is not done(). If we're also checking please_stop, etc. this may be overkill, but it's a nice sanity check that something hasn't gone horribly wrong. The fact that the coroutine wasn't running was the cause of Properly support restarting BatchedSend #5481; a check for this would have made the bug much easier to find.

Comment on lines +3225 to +3227
# Payload that couldn't be submitted is prepended
assert len(after) >= len(before)
assert after[: len(before)] == before
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explicitly call batched_stream.send(..) after the comm has been closed, and test that it has the correct behavior? (As I mentioned in another comment I don't know what this behavior should be; raise an error or just add to the buffer?)

assert len(after) >= len(before)
assert after[: len(before)] == before

await w.heartbeat()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this is to make the worker reconnect to the scheduler. Is there a way we could wait for that to happen naturally, to test that it does so properly? Or do we have other good tests around worker reconnection that we're confident in?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the heartbeat triggers a reconnect. There are
test_worker_reconnects_mid_compute
and
test_worker_reconnects_mid_compute_multiple_states_on_scheduler
which test that the worker reconnects and corrects the scheduler state accordingly if any tasks are on the cluster. I consider testing the reconnect itself out of scope for this.

@fjetter fjetter force-pushed the dropped_payload_messages_worker_reconnect branch from 291de5d to 5795c5b Compare November 4, 2021 17:54
Comment on lines +295 to +303
await stream.write(b"")
# FIXME: How do I test this? Why is the stream closed _sometimes_?
# Diving into tornado, so far, I can only confirm that once the
# write future has been awaited, the entire buffer has been written
# to the socket. Not sure if one loop iteration is sufficient in
# general or just sufficient for the local tests I've been running
await asyncio.sleep(0)
if stream.closed():
raise StreamClosedError()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixing this problem, reusing the buffer, etc. is relatively easy to do assuming we can rely on the Com.write to raise if something is wrong. Tornado ensures that if we await BaseIOStream.write, the data is written to the socket. I dove into the code and can confirm this. To the best of my knowledge, that's working properly.
However, there is a timewindow where the data is written to the socket, the connection is closed, the Comm.write returns control but the data was never submitted. I can catch these situations for my local unit tests by waiting a loop iteration but I'm not sure what the mechanism behind this is and whether or not this is reliable.

cc @jcrist @gjoseph92

FWIW, I think even without this asyncio.sleep(0) this fix should fix >99% of our problems. For a full fix we might need to reconsider the possibility for workers to reconnect statefully or introduce another way to guarantee message delivery.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fjetter to rephrase what you're saying:

  • awaiting the stream.write is a big improvement, because it means that comm.write doesn't return until the message has been successfully written to the socket buffer
  • That's still not enough to guarantee the message has actually been sent, because the socket could get closed after the message is handed off the buffer, but before it's actually sent over the network.

For a full fix we might need to reconsider the possibility for workers to reconnect statefully or introduce another way to guarantee message delivery.

I agree. Like I mentioned in another comment, I think we need a better protocol (either application-level or more careful use of TCP) to guarantee all messages are delivered through reconnects.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what I figured but I was confused why I never encountered this for ordinary comms.

"""
if self.closed():
if comm.closed():
raise RuntimeError("Comm already closed.")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
raise RuntimeError("Comm already closed.")
raise RuntimeError(f"Tried to start BatchedSend with an already-closed comm: {comm!r}.")

raise RuntimeError("Comm already closed.")
self.comm = comm
self.please_stop = False
self.loop.add_callback(self._background_send)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If self.closed is True, can we be certain this callback is not already running? Two _background_sends running at the same time might make a mess. I think there's currently a race condition:

  • _background_send is awaiting self.waker.wait
  • comm gets closed (nothing happens to BatchedSend state at this point; a closed comm is only detected once a write to it fails)
  • start is called with a new comm, launching a new _background_send coroutine
  • both coroutines are now blocking on self.waker.wait; it's a race condition which gets awakened first
  • both can run validly; from their perspective, nothing even happened (the comm was switched out under their noses while they were sleeping)

I'm not actually sure if, with the exact way the code is written now, two coroutines running at once can actually do something bad, but it still seems like a bad and brittle situation. "only one _background_send is running at once" feels like a key invariant of this class to me.

This is another way in which having an asyncio Task handle on the _background_send might make things easier to reason about #5457 (comment)

if comm.closed():
raise RuntimeError("Comm already closed.")
self.comm = comm
self.please_stop = False
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.please_stop = False
self.please_stop = False
self.stopped.clear()

although I also find the use of stopped a little odd, see the comment on cancel

@@ -58,6 +58,7 @@ def __init__(self, interval, loop=None, serializers=None):

def start(self, comm):
self.comm = comm
self.please_stop = False
self.loop.add_callback(self._background_send)

def closed(self):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think this check is insufficient for telling whether we're closed or not. While we're in the process of closing (please_stop is True, or self.stopped is set, or _background_send is no longer running, etc.), it may still return True.

From the way this is used though, in both start and write, we probably want to treat "in the process of closing" as closed, not as running. Restarting a BatchedSend that's closing should be an error. If writing to a closed BatchedSend is an error, then so should be writing to one that's in the process of closing.

payload = None # lose ref
# If anything failed we should not lose payload. If a new comm
# is provided we can still resubmit messages
self.buffer = payload + self.buffer
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After reading through TCP comm code, I don't think there's anywhere for an exception to happen after all of the payload has been sent. An exception could still happen when part has been sent and part hasn't, but either way, since we have to assume something here, I think it's more useful to assume that the payload hasn't been sent.

@jcrist and I were discussing this, and given the way the BatchedSend interface works, it actually needs to implement some sort of protocol with an ack from the receiver for each sent batch to guarantee messages can't be dropped. Since send on a BatchedSend is nonblocking, the caller is basically handing off full responsibility for the message to BatchedSend. If the message fails to send, it's too late to raise an error and let the caller figure out what to do about it—once we've been given a message, we have to ensure it's delivered. So the logical thing to do would be for the receiving side to ack each message, and only when ack'd does the sender drop the payload (with some deduplication of course).

OTOH there are lots of protocols out there for doing things like this, more performantly, robustly, and with better testing than we'll ever have. As with other things (framing in serialization), maybe the better solution is to stop duplicating functionality at the application level that should be the transport layer's job.

Could we get rid of BatchedSend entirely with some well-tuned TCP buffering settings + a serialization scheme that was more efficient for many small messages?


if self.comm:
self.comm.abort()
yield self.close()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why close and not abort?

  1. It's weird that calling close from user code will trigger the coroutine to call back into close
  2. Most of the things close does are already guaranteed to have happened by this point:
    • self.please_stop must be True to reach this line of code
    • self.stopped was just set a couple lines above
    • The comm was just aborted (not sure how that compares to await comm.close() though)

Not that abort is any better. The only thing it would do that hasn't already been done by this point is set self.comm = None.

I actually think we should move all shut-down/clean-up logic into the coroutine here, remove abort entirely, and have close() just be a function that tells the coroutine to please_stop and waits until it does. There's a lot of inconsistency from having shutdown logic in three different places.

self.buffer = []
self.please_stop = True
self.waker.set()
if not self.comm.closed():
self.comm.abort()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment for both here and close(): why should the BatchedSend close/abort the comm when it's closed? Does the caller expect that, when calling start, it's handed off lifecycle responsibilities for the comm to BatchedSend? If so, that should be documented ("once you call start, you must not directly use the comm object ").

@@ -290,7 +292,15 @@ async def write(self, msg, serializers=None, on_error="message"):
stream._total_write_index += each_frame_nbytes

# start writing frames
stream.write(b"")
await stream.write(b"")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused, why were we not awaiting this before?!

I traversed through the blames and it looks like https://github.com/dask/distributed/pull/661/files#diff-581957c552b88dd04319efca1429c0c0827daff509a115542e70b661c4c04914R235 5 years ago is where we stopped awaiting this future. There was even some conversation about it: #661 (comment), xref #653.

Seems like there were some issues around concurrent writes to the same stream from multiple coroutines producing orphaned futures. I wonder if that's still the case?

Comment on lines +295 to +303
await stream.write(b"")
# FIXME: How do I test this? Why is the stream closed _sometimes_?
# Diving into tornado, so far, I can only confirm that once the
# write future has been awaited, the entire buffer has been written
# to the socket. Not sure if one loop iteration is sufficient in
# general or just sufficient for the local tests I've been running
await asyncio.sleep(0)
if stream.closed():
raise StreamClosedError()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fjetter to rephrase what you're saying:

  • awaiting the stream.write is a big improvement, because it means that comm.write doesn't return until the message has been successfully written to the socket buffer
  • That's still not enough to guarantee the message has actually been sent, because the socket could get closed after the message is handed off the buffer, but before it's actually sent over the network.

For a full fix we might need to reconsider the possibility for workers to reconnect statefully or introduce another way to guarantee message delivery.

I agree. Like I mentioned in another comment, I think we need a better protocol (either application-level or more careful use of TCP) to guarantee all messages are delivered through reconnects.

@@ -333,6 +343,8 @@ def abort(self):
stream.close()

def closed(self):
if self.stream and self.stream.closed():
self.abort()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems odd to me that checking closed on a comm could have a side effect?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, technically this sets self.stream = None and sets _closed = True. I wouldn't consider this side effects

@fjetter
Copy link
Member Author

fjetter commented Nov 29, 2021

I will close this PR. I am no longer convinced that any payload should be reused. Instead, I believe that the handshake upon reconnect should settle any state drift. Either way, this requires a bit more work upfront

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

test_worker_reconnects_mid_compute_multiple_states_on_scheduler flaky
3 participants