
Fix batchedsend restart #6272

Closed · wants to merge 2 commits

Conversation

@fjetter (Member) commented May 4, 2022

Supersedes #6266

@github-actions bot (Contributor) commented May 5, 2022

Unit Test Results

16 files ±0    16 suites ±0    7h 17m 26s ⏱️ (-16m 6s)
 2 761 tests  +4:   2 677 ✔️ passed +2,     78 💤 skipped -1,    6 failed +3
22 050 runs  +32:  21 018 ✔️ passed +20, 1 017 💤 skipped +1,  15 failed +11

For more details on these failures, see this check.

Results for commit c93894f. ± Comparison against base commit 2286896.

♻️ This comment has been updated with latest results.

@fjetter (Member, Author) commented May 5, 2022

Locally, it looks like the failing cases of test_flight_to_executing_via_cancelled_resumed are caused by #5910.

@gjoseph92 (Collaborator) left a comment:

I think this PR will be an improvement as a hotfix for the current deadlock #6228.

However, it introduces a new potential bug. (And BatchedSend already has many existing bugs and foot-guns, enumerated in my review on #5457.)

Are we okay treating this as a hotfix and leaving this multiple-start bug for future work, or do we want to fix it here?


def start(self, comm):
    self.please_stop = False
@gjoseph92 (Collaborator):

If the BatchedSend isn't fully stopped, it's possible for restarting to create a second _background_send coroutine racing against the first. I would imagine this is a problem, though I'm not certain:

  • If you call BatchedSend.abort() and then BatchedSend.start() without a (long enough) await in between
  • If you call BatchedSend.start() and the BatchedSend has just aborted, but the _background_send coroutine is currently waiting on self.waker.wait or self.comm.write.
  • If you just (accidentally) call BatchedSend.start() multiple times in a row when it hasn't been stopped already.

Again, this is the sort of thing I'd like to be checking for and validating against. Even if we don't currently have code that can trigger this error condition, it's all too easy for future maintainers to use the API in ways we don't currently expect, and then subtly break things in ways that are hard to debug and cause significant pain for users (just like this issue was). See #5481 (comment).

I don't think we can validate this properly without having a handle on the _background_send coroutine to check whether it's still running. We also need to decide what the desired behavior is if you call start() while the coroutine is still running and BatchedSend isn't closed yet, or if it's currently trying to close. Do we just error? Or do we need start() to be idempotent as long as it's passed the same comm each time?
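For concreteness, a minimal sketch of the kind of guard being discussed. The attribute names (_background_task, please_stop, comm) follow this PR, but the asyncio.ensure_future call and the overall shape are assumptions for illustration, not the PR's actual implementation:

import asyncio

def start(self, comm):
    # Hypothetical guard: refuse to spawn a second _background_send
    # coroutine while a previous one is still alive.
    if self._background_task is not None and not self._background_task.done():
        raise RuntimeError(f"Background task still running for {self!r}")
    self.please_stop = False
    self.comm = comm
    self._background_task = asyncio.ensure_future(self._background_send())

An idempotent variant could instead return early when the task is still alive and the same comm is passed again, and only raise for a different comm.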

yield self.stopped.wait(timeout=timeout)
if not self.comm.closed():
if self._background_task:
    await self._background_task
@gjoseph92 (Collaborator):

If the background task failed, we'll raise an exception here instead of trying to flush the buffer. I assume that's okay, but should be documented.

@fjetter (Member, Author):

The background task doesn't raise any exceptions.

This currently deadlocks
@gjoseph92 (Collaborator) left a comment:

A couple of race conditions I'd like to fix. I find WorkerStatusPlugin weird, but I can't decide whether it's problematic enough to justify delaying this PR if everything else is otherwise fixed.

Also, can you remove this now-irrelevant block:

except CommClosedError:
    # Batched stream send might raise if it was already closed
    pass

Comment on lines 136 to 137
# To propagate exceptions, we rely on subsequent `BatchedSend.send`
# calls to raise CommClosedErrors.
@gjoseph92 (Collaborator):

Suggested change
# To propagate exceptions, we rely on subsequent `BatchedSend.send`
# calls to raise CommClosedErrors.
# The exception will not be propagated. Instead, users of `BatchedSend` are expected
# to be implementing explicit reconnection logic when the comm closes. Reconnection often
# involves application logic reconciling state (because messages buffered on the
# `BatchedSend` may have been lost), then calling `start` again with a new comm object.

This is no longer true.
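For illustration, the caller-side pattern that this suggested comment describes might look roughly like the sketch below; open_comm and reconcile_state are placeholder coroutines supplied by the caller, not part of distributed's API:

async def reconnect(bcomm, open_comm, reconcile_state):
    # Hypothetical reconnection flow around a BatchedSend-like object.
    await bcomm.close()          # let the old background task finish
    comm = await open_comm()     # establish a fresh comm
    await reconcile_state()      # messages buffered on `bcomm` may have been lost
    bcomm.start(comm)            # resume batched sending on the new comm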

@@ -125,50 +135,43 @@ def _background_send(self):
# This means that any messages in our buffer our lost.
# To propagate exceptions, we rely on subsequent `BatchedSend.send`
# calls to raise CommClosedErrors.
@gjoseph92 (Collaborator):

Also, are we confident nothing was relying on this behavior? Some places where we might be:

Probably important:

def worker_send(self, worker, msg):
    """Send message to worker

    This also handles connection failures by adding a callback to remove
    the worker on the next cycle.
    """
    stream_comms: dict = self.stream_comms
    try:
        stream_comms[worker].send(msg)
    except (CommClosedError, AttributeError):
        self.loop.add_callback(
            self.remove_worker,
            address=worker,
            stimulus_id=f"worker-send-comm-fail-{time()}",
        )

for worker, msgs in worker_msgs.items():
    try:
        w = stream_comms[worker]
        w.send(*msgs)
    except KeyError:
        # worker already gone
        pass
    except (CommClosedError, AttributeError):
        self.loop.add_callback(
            self.remove_worker,
            address=worker,
            stimulus_id=f"send-all-comm-fail-{time()}",
        )

try:
    self.scheduler.client_comms[c].send(
        {"op": "pubsub-msg", "name": name, "msg": msg}
    )
except (KeyError, CommClosedError):
    self.remove_subscriber(name=name, client=c)

Less important:

try:
    c.send(msg)
    # logger.debug("Scheduler sends message to client %s", msg)
except CommClosedError:
    if self.status == Status.running:
        logger.critical(
            "Closed comm %r while trying to write %s", c, msg, exc_info=True
        )

def client_send(self, client, msg):
    """Send message to client"""
    client_comms: dict = self.client_comms
    c = client_comms.get(client)
    if c is None:
        return
    try:
        c.send(msg)
    except CommClosedError:
        if self.status == Status.running:
            logger.critical(
                "Closed comm %r while trying to write %s", c, msg, exc_info=True
            )

try:
    c.send(*msgs)
except CommClosedError:
    if self.status == Status.running:
        logger.critical(
            "Closed comm %r while trying to write %s",
            c,
            msgs,
            exc_info=True,
        )

def add_worker(self, worker=None, **kwargs):
    ident = self.scheduler.workers[worker].identity()
    del ident["metrics"]
    del ident["last_seen"]
    try:
        self.bcomm.send(["add", {"workers": {worker: ident}}])
    except CommClosedError:
        self.scheduler.remove_plugin(name=self.name)

except CommClosedError:
    logger.info("Worker comm %r closed while stealing: %r", victim, ts)
    return "comm-closed"

I have a feeling that removing these "probably important" code paths is actually a good thing (besides PubSub). It centralizes the logic for handling worker disconnects into handle_worker. If any of these places were previously raising CommClosedError, handle_worker should already have been calling remove_worker in its finally block. Now we're not duplicating that.
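A simplified sketch of that centralization (not the real scheduler code): however the stream handler exits, the finally block is the single place that removes the worker.

async def handle_worker(self, comm, worker):
    # Sketch: any disconnect surfaces here, so callers of worker_send
    # no longer need their own CommClosedError handling.
    try:
        await self.handle_stream(comm=comm, extra={"worker": worker})
    finally:
        if worker in self.stream_comms:
            await self.remove_worker(
                address=worker, stimulus_id=f"handle-worker-cleanup-{time()}"
            )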

@@ -72,13 +75,15 @@ def __repr__(self):

__str__ = __repr__
@gjoseph92 (Collaborator):

Can you update __repr__ to include self.name?
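For example, something along these lines (a sketch assuming the name attribute exists and keeping the buffer-length information from the current repr):

def __repr__(self) -> str:
    # Include the name so logs identify which stream this BatchedSend serves.
    return f"<BatchedSend {self.name!r}: {len(self.buffer)} in buffer>"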


def start(self, comm):
    if self._background_task and not self._background_task.done():
        raise RuntimeError("Background task still running")
@gjoseph92 (Collaborator):

Suggested change
raise RuntimeError("Background task still running")
raise RuntimeError("Background task still running for {self!r}")

Comment on lines 3503 to +3504
for comm in self.client_comms.values():
    comm.abort()
    await comm.close()
@gjoseph92 (Collaborator):

Should this be an asyncio.gather instead of serial?
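For illustration, the concurrent form could look roughly like this; return_exceptions=True is one possible choice so that a single failing close doesn't abort the others:

await asyncio.gather(
    *(comm.close() for comm in self.client_comms.values()),
    return_exceptions=True,
)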

def teardown(self):
    self.bcomm.close()
async def close(self) -> None:
    await self.bcomm.close()
@gjoseph92 (Collaborator):

I believe teardown was never called before (it's not part of the SchedulerPlugin interface).

bcomm.close() is going to close the underlying comm object. I believe that's a comm to a client? Is that something WorkerStatusPlugin should actually be closing unilaterally, or should add_client, for example, be responsible for that here:

finally:
    if not comm.closed():
        self.client_comms[client].send({"op": "stream-closed"})
    try:
        if not sys.is_finalizing():
            await self.client_comms[client].close()
            del self.client_comms[client]
            if self.status == Status.running:
                logger.info("Close client connection: %s", client)

Just want to point out that you're exposing a previously-dead codepath, which has some questionable behavior regardless of your changes in this PR.

I think it'll be okay as things stand right now (only because close won't be called in practice until the cluster is shutting down), but it feels a little weird. I don't get the sense that multiple BatchedSends are meant to be multiplexed onto a single comm object, which is what WorkerStatusPlugin is doing. Multiplexing the send side is probably okay, it's just odd to multiplex the close side.

@@ -2448,7 +2444,8 @@ def fast_on_a(lock):

await s.stream_comms[a.address].close()

assert len(s.workers) == 1
while len(s.workers) == 1:
@gjoseph92 (Collaborator):

Suggested change
while len(s.workers) == 1:
while len(s.workers) != 1:

Is this what you meant? I feel like we do want to confirm that from the scheduler's perspective, the worker is gone at this point. Since a.heartbeat_active is True, the worker shouldn't try to reconnect to the scheduler yet. So I'm surprised len(s.workers) ever becomes >1 right now.
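Spelled out, the suggested polling form would read like the short-sleep loop used elsewhere in these tests:

while len(s.workers) != 1:
    await asyncio.sleep(0.01)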

@@ -201,8 +200,7 @@ def wrapper(self, *args, **kwargs):
        },
    )
    logger.exception(e)
else:
    self.loop.add_callback(_force_close, self)
self.loop.add_callback(_force_close, self)
@gjoseph92 (Collaborator):

xref: also handled in #6269

("", 1),
],
)
async def test_worker_batched_send_broken(
@gjoseph92 (Collaborator):

xref #6268

@@ -2399,10 +2399,6 @@ async def test_hold_on_to_replicas(c, s, *workers):
    await asyncio.sleep(0.01)


@pytest.mark.xfail(
@gjoseph92 (Collaborator):

Exciting if this actually fixes these flaky tests! I'm not surprised they were failing before.
