Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fail_hard decorator for worker methods #6210

Merged
merged 18 commits into from
Apr 29, 2022
Merged

Conversation

mrocklin
Copy link
Member

@mrocklin mrocklin commented Apr 26, 2022

We add a decorator that we can apply to a worker method that closes the
worker if an exception is encountered in that method.

We add a decorator that we can apply to a worker method that closes the
worker if an exception is encountered in that method.

This is still a work in progress, and is just up here as a proof of
concept.
@mrocklin mrocklin changed the title Add fail_hard decorator for worker methods WIP: Add fail_hard decorator for worker methods Apr 26, 2022
Copy link
Collaborator

@gjoseph92 gjoseph92 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any other methods that we think aren't covered here, but are common causes of issues

This should probably be on everything that gets called via loop.add_callback. In worker.py, I see that that's:

I'd also like to see this in core.py, but that's a bigger story:

It's interesting that transitions isn't on that list, because it just gets called indirectly, likely via handle_scheduler, gather_dep, etc. If we just catch the concurrency entrypoints, we wouldn't actually need to wrap these internal functions like transitions or validate_state.

In fact, I wonder if doing that could make things even worse. For example, a transitions error stemming from a compute-task op will propagate up to handle_scheduler. handle_scheduler will already log and reraise the error... but then try to reconnect to the scheduler (or close!).

If we add a @fail_hard to transitions, this code path will still exist. But while handle_scheduler is trying to reconnect or await self.close, a separate self.close callback will already be running in the background. So we're trying to reconnect and close at the same time. Which one is even proper behavior? (A while ago we discovered this reconnection logic is also broken #5480, so it's extra unclear what should happen.) But brittle as it is, reconnection usually works right now. If there's also a self.close running in the background, I bet it won't. And what if the attempt to reconnect causes the self.close to also throw an error? Now the worker is in an even more broken state, and it's still not closed.

I assume this isn't a special case to handle_scheduler, but that many other places could have similar race conditions.

I do want to see more structure to our concurrency. I do think errors happening at all these places you've annotated should cause the worker to close. But I'm worried that, with how tangled things are, adding more unstructured concurrency, well-meaning and reasonable as it seems, could sometimes make things even worse.

except Exception as e:
logger.exception(e)
# TODO: send event to scheduler
self.loop.add_callback(self.close, nanny=False, executor_wait=False)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if self.close encounters an error? :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we probably wouldn't close? I'm not sure that we can guarnatee against this case. Do you have some recommendation?

async def wrapper(self, *args, **kwargs):
try:
return await method(self, *args, **kwargs)
except Exception as e:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
except Exception as e:
except BaseException as e:

This would fix #5958 if you also wrapped execute (you should wrap execute either way I think).

The fact that BaseExceptions in callbacks aren't propagated by Tornado is pretty crazy. If we're going to add manual support for propagating exceptions like this, I don't see why we'd let BaseExceptions be ignored.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm OK with catching BaseException for tasks, i.e. in apply_function, et al. to fix #5958

However, I would be worried to close workers upon a asyncio.CancelledError. While I don't think we're using cancellation in many places right now, this would be a very confusing behavior if that ever changes.

@mrocklin
Copy link
Member Author

I do want to see more structure to our concurrency. I do think errors happening at all these places you've annotated should cause the worker to close. But I'm worried that, with how tangled things are, adding more unstructured concurrency, well-meaning and reasonable as it seems, could sometimes make things even worse

@gjoseph92 to be clear, are you suggesting that we shouldn't make this change? Or are you just bringing up the general concern that we're not certain about this change, but still think that something in this direction probably makes sense to do?

@fjetter
Copy link
Member

fjetter commented Apr 26, 2022

I think we might be mixing up a few things. I agree that dealing with all add_callbacks properly would be great but I would not go as far as to close the worker upon every exception, e.g. is a failed heartbeat cause for this escalation?

Putting this stuff into core.py / Server is also an interesting approach but I think I would prefer a more structured approach there that thinks about how we deal with asyncio overall. the same is true for PeriodicCallbacks. That all related obviously to #6201

I consider this specific change as a "very crude exception handler for state validation exceptions". For this I would consider it useful to cover

  • Worker.transitions
  • Worker.transition
  • Worker.validate_*
  • Worker.handle_*

@gjoseph92
Copy link
Collaborator

are you suggesting that we shouldn't make this change?

I don't think we should make the change currently in this PR. I think the sorts of race conditions I pointed out have the potential to make things even worse.

still think that something in this direction probably makes sense to do?

I think we can come up with something that works though. The real problem we're trying to solve is that Tornado drops exceptions from callbacks. Let's focus on that problem. A few ideas:

  • only use fail_hard on functions being passed into loop.add_callback. Probably via functools.partial instead of a decorator (to be sure, if the function is called directly somewhere, that it's not accidentally wrapped). Don't put it on transitions, for example. Whatever transitions error occurs will eventually propagate up to an add_callback point, if they're not handled above. Because we know anything running via add_callback cannot have error handling above it, we should be safe to implement our own.
  • use fail_hard wherever we want, but don't re-raise the exception. Even if there's exception handling logic above the call site, assume that we're always right and the worker should always close. This avoids race conditions with other error handlers that also might use add_callback. This is probably a little simpler and closer to the spirit of your PR.
  • If a synchronous function is passed to loop.add_callback and we want to fail_hard on it, fail_hard should wrap it in an async function, which just calls the sync function synchronously, but then does await self.close rather than loop.add_callback(self.close, ...). That way, we can handle errors from self.close. If the self.close call fails, we should do something drastic—probably os._exit(1) (I think sys.exit won't work because the BaseException it raises will just be ignored, though reading this in asyncio I'm not longer sure). Simply by being in fail_hard, something was already wrong, so if our last-ditch error handling is failing, we really need the worker to shut down immediately.
    • Or, if self.close is itself wrapped in fail_hard (with some recursion-preventing logic so that fail_hard won't call close again if close fails, but goes straight into the os._exit branch), then we can safely continue using loop.add_callback in fail_hard for synchronous functions, since if the close call fail eventually, we know it'll still result in an os._exit—we don't have to await around for that.

Or we could go in a different direction, and set the worker event loop's exception handler. We keep saying "Tornado drops exceptions", but actually it's not Tornado's fault. Tornado is just using asyncio, and asyncio drops exceptions by default. But we could customize that behavior, and have it shut down the worker. This the most correct and reliable thing to do, since it'll automatically apply to everything. But it also wouldn't be possible to do with gen_cluster tests where multiple workers share an event loop without first fixing #4959 (since the error handler would need to know which worker is currently "active" and needs to be closed).

@mrocklin
Copy link
Member Author

I think I agree with @fjetter here (or at least my interpretation of what @fjetter is saying) which is that I support avoiding add_callback more generally, but that I think that we can make incremental progress here. I think that it comes to this statement:

I think the sorts of race conditions I pointed out have the potential to make things even worse.

Yes, I agree that this is true. However, I also think that it is unlikely. In general worker.close calls are pretty good at closing the worker, and I don't anticipate any particular complication there. Do we have reason to anticipate something there? I think that I'm just generally a bit less concerned here (although that's a common theme 🙂 ).

I'm also trying to get something in by next release so that we can have a stop-gap measure for current state machine issues (and an easy way to provide future stop-gap solutions) while we spend time investigating.

Given that, I'm also open to other approaches. Let me go through some of the options above with the "let's get something in in the next day or two" timeline in mind.

  1. We could apply this to only gather_dep, which is the source of some issues, is already called in add_callback, and is async so we won't be calling add_callback any more than we already are. I think that this would meet Gabe's constraint of "no more add_callback" (or at least that's how I'm interpretting Gabe's concern).

    This is a special-case of what I'm hearing above in Gabe's statement,

    only use fail_hard on functions being passed into loop.add_callback

  2. Another option from Gabe:

    use fail_hard wherever we want, but don't re-raise the exception. Even if there's exception handling logic above the call site, assume that we're always right and the worker should always close. This avoids race conditions with other error handlers that also might use add_callback. This is probably a little simpler and closer to the spirit of your PR.

    I'd prefer to reject this for now. It's a large enough change that I suspect that it would have unforseen effects that I can't reason through completely in a short timeline. I think that it's a reasonable idea, but not with my current timeline constraint.

  3. If a synchronous function is passed to loop.add_callback and we want to fail_hard on it, fail_hard should wrap it in an async function, which just calls the sync function synchronously, but then does await self.close rather than loop.add_callback(self.close, ...)

    Eh, don't forget that these aren't sync in the client sense. They're normal def functions, but they're running in the event loop. There is no easy way to run async code within them.

  4. Asyncio exception handler. I like this idea. We could do a lot with it I think. I'd like to defer this to after this release though.

@mrocklin
Copy link
Member Author

To option 1, which was "just use this on gather_dep" I'll also add handle_scheduler and all of the handle methods. I think that the only cases where @fjetter and @gjoseph92 's constraints differ here is on internal functions like transitions and validation methods. I'm going to restrict to the shared subset so that, hopefully, this can go in quickly.

@mrocklin
Copy link
Member Author

OK, for now I've removed fail_hard on anything non-asynchronous. I don't actually necessarily think that this is the right approach long-term, but I'm fine starting here, and I think that it's ok to start with something agreeable and leave discussion for later.

@mrocklin mrocklin changed the title WIP: Add fail_hard decorator for worker methods Add fail_hard decorator for worker methods Apr 26, 2022
@mrocklin
Copy link
Member Author

Removing WIP title

@mrocklin
Copy link
Member Author

I've been playing with this with test_chaos_rechunk and it definitely causes it to be more noisy in ways that I think people here will appreciate.

@mrocklin
Copy link
Member Author

Hrm, nevermind. It's not capturing something failing in ensure_communicating yet (which should be captured because it's in handle_stream/handle_scheduler. 👀

@mrocklin
Copy link
Member Author

Yeah, there is still a bit of work to do here. It's fun to dive into this though. I'm hopeful that we can get both the chaos testing to be much more sensitive (with this and other recent scheduler-event signals) and hopefully also expose other things that have been going on at the same time.

@@ -1199,6 +1226,7 @@ async def heartbeat(self):
finally:
self.heartbeat_active = False

@fail_hard
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once the decorator is here, what's the point of having this block anymore?

finally:
if self.reconnect and self.status in Status.ANY_RUNNING:
logger.info("Connection to scheduler broken. Reconnecting...")
self.loop.add_callback(self.heartbeat)
else:
logger.info(
"Connection to scheduler broken. Closing without reporting. Status: %s",
self.status,
)
await self.close(report=False)

The decorator is going to close the worker as soon as the finally block completes. So what's the point of trying to reconnect if we're going to close either way? These seem like opposing behaviors.

I think we want to either remove the try/except entirely from handle_scheduler (because all we want to do on error is close the worker, and @fail_hard will do that for us anyway), or not use @fail_hard here, if we do in fact still want to reconnect in the face of errors.

@gjoseph92
Copy link
Collaborator

It's not capturing something failing in ensure_communicating yet

First off, there's this:

except OSError:
# FIXME: This is silently ignored, is this intentional?
pass

Also, sometimes ensure_communicating can be scheduled as a callback from other entrypoints you might not be accounting for. For example, busy workers in gather_dep schedule self._readd_busy_worker to run later

self.io_loop.call_later(0.15, self._readd_busy_worker, worker)

which then calls ensure_communicating
@log_errors
def _readd_busy_worker(self, worker: str) -> None:
self.busy_workers.remove(worker)
self.ensure_communicating()

Same with find_missing, which is running in a PeriodicCallback.

Additionally, ensure_communicating gets called to handle UnpauseEvent:

@handle_event.register
def _(self, ev: UnpauseEvent) -> RecsInstrs:
"""Emerge from paused status. Do not send this event directly. Instead, just set
Worker.status back to running.
"""
assert self.status == Status.running
self.ensure_communicating()

I think the fact that _handle_instructions currently only processes SendMessageToScheduler and Execute means that ensure_communicating won't be called as part of a done_callback on an asyncio task. (If that were the case, the problem would be that though _handle_stimulus_from_task does check the task's result to capture any exception the task may have raised, if it does raise an unexpected exception, that exception will just be propagated to the ether of the event loop (because _handle_stimulus_from_task is just a callback).)

But UnpauseEvent is handled in the Worker.status setter. So any place we set self.status = Status.running, we could be also transitively calling ensure_communicating.


I think all that code-tracing above makes it clear that it's really hard, and tedious, to find every entrypoint to these functions we care about and add a @fail_hard to them. We'd much rather just add @fail_hard to the places we actually care about.

I think that this would meet Gabe's constraint of "no more add_callback" (or at least that's how I'm interpretting Gabe's concern)

I'm not too concerned about add_callback, because as I mentioned we can easily avoid it. I was specifically concerned about

try:
    return await method(self, *args, **kwargs)
except Exception:
    ...
    self.loop.add_callback(self.close, nanny=False, executor_wait=False)
    # ^ what if self.close fails in the callback? then we're letting the worker continue to run in a broken state.
    # to even reach this place in code, things that we thought were "unlikely" have already happened.
    # this is our ejector seat. nobody pulls the ejector seat unless the airplane's already seriously broken.
    # we need to make sure ejection still happens, even if something goes wrong in the process.
    raise  # <-- why re-raise? we're shutting down.

We are taking the very final action here of closing the worker if an error happens. What other error-handling logic could possibly need to run? Whatever error handling logic used to happen before this PR, it's irrelevant now, because the worker will be dead in the next few seconds.

use fail_hard wherever we want, but don't re-raise the exception

It's a large enough change that I suspect that it would have unforseen effects that I can't reason through completely in a short timeline. I think that it's a reasonable idea, but not with my current timeline constraint.

I guess I'm confused by this concern, because regardless of how it's implemented, this large change is what this PR is doing. Hopefully my comment about whether handle_scheduler should try to reconnect illustrates this. We're now shutting down the whole worker. We used to not do that. I don't think there's much to reason through—the worker will be shutting down, so existing error-handling logic will be superfluous. We could reraise, but that just adds even more to reason about (what will the error handlers above us do? Could they possibly conflict with self.close?).

@gjoseph92
Copy link
Collaborator

Anyway, this is how I'd feel most comfortable implementing it:

async def _force_close(self):
    try:
        await asyncio.wait_for(self.close(nanny=False, executor_wait=False), 30)
    except (Exception, BaseException):  # <-- include BaseException here or not??
        # Worker is in a very broken state if closing fails. We need to shut down immediately,
        # to ensure things don't get even worse and this worker potentially deadlocks the cluster.
        logger.critical(
            "Error trying close worker in response to broken internal state. "
            "Forcibly exiting worker NOW",
            exc_info=True
        )
        # use `os._exit` instead of `sys.exit` because of uncertainty
        # around propagating `SystemExit` from asyncio callbacks
        os._exit(1)


def fail_hard(method):
    """
    Decorator to close the worker if this method encounters an exception.
    """
    if iscoroutinefunction(method):

        @functools.wraps(method)
        async def wrapper(self, *args, **kwargs):
            try:
                return await method(self, *args, **kwargs)
            except Exception as e:
                logger.exception(e)
                # TODO: send event to scheduler
                await _force_close(self)
                
                # Could re-raise here if we wanted, but what's the point? We're closing.
                # Main argument would be if the caller expects a return value, it'll
                # fail anyway if we return None.

    else:
        @functools.wraps(method)
        def wrapper(self, *args, **kwargs):
            try:
                return method(self, *args, **kwargs)
            except Exception as e:
                logger.exception(e)
                # TODO: send event to scheduler
                self.loop.add_callback(_force_close, self)
                # ^ Normally, you'd never know if an `add_callback` is actually going to succeed.
                # But using `_force_close`, we can be confident the worker will get closed no matter what.

                # Could re-raise here if we wanted, but what's the point? We're closing.

    return wrapper

Then we could use @fail_hard on whatever functions we wanted. Ideally, we'd also remove the dead code of any try/except blocks around functions decorated with @fail_hard.

I do realize one good reason for re-raising the exception though: if we don't, the caller might immediately error anyway because we returned None instead of a real value. I still don't like letting other error handlers do unknown things while we're trying to close, but _force_close would make me pretty confident that the worker will shut down one way or another.

@github-actions
Copy link
Contributor

github-actions bot commented Apr 27, 2022

Unit Test Results

       16 files  +       4         16 suites  +4   7h 38m 22s ⏱️ + 2h 7m 27s
  2 741 tests +       4    2 655 ✔️ +       8       80 💤  -   10  6 +6 
21 829 runs  +5 439  20 776 ✔️ +5 169  1 047 💤 +264  6 +6 

For more details on these failures, see this check.

Results for commit 50c31ef. ± Comparison against base commit b837003.

♻️ This comment has been updated with latest results.

@fjetter
Copy link
Member

fjetter commented Apr 27, 2022

I would like to repeat what I said yesterday because I am surprised that so much of the conversation here was about add_callback

I do not think this PR is a good approach to structured concurrency. It is true that we are missing any proper means of dealing with exceptions. For the record, I tried asyncio exception handlers a long time ago already and it doesn't work because when using tornado, there are no unhandled exceptions. Tornado wraps all callbacks basically with a log_error and doesn't reraise.
IMO, this issue should not be about how to capture and deal with exceptions. This issue should be about "What is the exception handler for state validation errors".

Think pseudo code

try:
    some_code()
except StateCorruptionDetected:
    handle_state_corruption()

Even with structured concurrency or an asyncio exception handler we'd need to talk about "What happens to an unhandled state corruption error, assuming we can detect it".

What I heard so far, the requirements for handle_state_corruption is:

  • Log something
  • Try to tell the scheduler
  • Try to close gracefully (e.g. evict data)
  • Make sure this worker is dead

This handle_state_corruption is the body of the fail_hard decorator.

Regarding what this body should look like, I like the approach in #6210 (comment) using

  • Log a message
  • Send an event to the scheduler
  • Close gracefully with timeout (Don't wait for the executor since this can block the event loop #XXX; there is a PR about this)
  • If this fails or timeouts, kill the interpreter.

include BaseException here or not??

I don't see a reason why it should be included but at the same time I don't see a lot of harm


To the second question of where to handle this exception (task groups/whatever would be nice but we don't have them), I think it's the same problem as for all other exception handlers. Try to be as target with exception handling as possible. The places where I see the possibility for us to detect these state violation errors are

  • Worker.transitions
  • Worker.transition
  • Worker.validate_*
  • Worker.handle_*

Once we have a better approach to exception handling overall we may pull this to an outer layer but for now these are the functions that matter. I think all concerns about race conditions are moot if we're calling sys._exit

@fjetter fjetter linked an issue Apr 27, 2022 that may be closed by this pull request
Also set the chaos_rechunk test to sys.exit,
this to focus us on situations where another worker fails,
rather than this worker.
@mrocklin
Copy link
Member Author

OK, I've used the formulation recommended above by @gjoseph92 but modified to ...

  1. Send an event to the scheduler
  2. Not trigger if we're already closing. I think that we should maybe reverse this at some point, but for now I want us to focus on failures that are in normal operation.

Every 20 runs or so on my laptop I'm now getting the following error, which is great news!

Worker: tcp://127.0.0.1:36529
InvalidTransition: ('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895) :: released->missing
  Story:
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'ensure-task-exists', 'released', 'compute-task-1651095913.7592962', 1651095913.7619393)
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'released', 'fetch', 'fetch', {}, 'compute-task-1651095913.7592962', 1651095913.763571)
    ('gather-dependencies', 'tcp://127.0.0.1:37115', {"('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1824)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1859)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1900)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1812)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1851)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1883)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1827)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1816)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1847)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1844)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1868)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1839)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1430)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1450)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1803)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1891)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1848)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1852)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1907)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1903)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1880)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1892)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1864)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1899)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1867)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1836)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1840)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1807)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1832)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1855)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1828)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1831)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1908)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1808)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1904)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1860)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1911)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1875)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1804)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1856)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1896)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1390)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1884)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1871)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1872)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1820)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1835)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1879)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1800)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1876)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1823)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1811)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1863)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1819)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1815)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1887)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1843)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1888)"}, 'ensure-communicating-1651095913.7787864', 1651095913.7789662)
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'fetch', 'flight', 'flight', {}, 'ensure-communicating-1651095913.7787864', 1651095913.7791023)
    ('request-dep', 'tcp://127.0.0.1:37115', {"('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1824)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1859)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1900)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1812)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1851)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1883)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1827)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1816)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1847)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1844)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1868)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1839)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1430)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1450)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1803)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1891)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1848)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1852)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1907)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1903)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1880)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1892)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1864)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1899)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1867)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1836)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1840)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1807)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1832)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1855)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1828)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1831)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1908)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1808)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1904)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1860)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1911)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1875)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1804)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1856)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1896)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1390)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1884)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1871)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1872)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1820)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1835)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1879)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1800)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1876)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1823)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1811)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1863)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1819)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1815)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1887)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1843)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1888)"}, 'ensure-communicating-1651095913.7787864', 1651095913.780679)
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'flight', 'released', 'cancelled', {}, 'steal-58', 1651095913.8156393)
    ('receive-dep', 'tcp://127.0.0.1:37115', {"('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1824)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1859)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1900)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1812)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1851)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1883)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1827)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1816)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1847)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1844)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1868)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1839)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1430)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1450)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1803)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1891)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1848)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1852)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1907)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1903)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1880)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1892)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1864)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1899)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1867)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1836)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1840)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1807)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1832)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1855)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1828)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1831)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1908)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1808)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1904)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1860)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1911)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1875)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1804)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1856)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1896)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1390)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1884)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1871)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1872)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1820)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1835)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1879)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1800)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1876)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1823)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1811)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1863)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1819)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1815)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1887)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1843)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1888)"}, 'ensure-communicating-1651095913.7787864', 1651095913.8474061)
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'cancelled', 'memory', 'cancelled', {"('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)": 'released'}, 'ensure-communicating-1651095913.7787864', 1651095913.8480968)
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'release-key', 'ensure-communicating-1651095913.7787864', 1651095913.8481023)
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'cancelled', 'released', 'released', {"('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)": 'forgotten'}, 'ensure-communicating-1651095913.7787864', 1651095913.8481138)
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'released', 'forgotten', 'forgotten', {}, 'ensure-communicating-1651095913.7787864', 1651095913.8481185)
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'ensure-task-exists', 'released', 'compute-task-1651095919.0555253', 1651095919.0676618)
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'released', 'fetch', 'fetch', {}, 'compute-task-1651095919.0555253', 1651095919.0678046)
    ('gather-dependencies', 'tcp://127.0.0.1:45327', {"('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1891)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)"}, 'ensure-communicating-1651095919.1077273', 1651095919.107835)
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'fetch', 'flight', 'flight', {}, 'ensure-communicating-1651095919.1077273', 1651095919.1078584)
    ('request-dep', 'tcp://127.0.0.1:45327', {"('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1891)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)"}, 'ensure-communicating-1651095919.1077273', 1651095919.1186914)
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'flight', 'released', 'cancelled', {}, 'steal-194', 1651095919.2378554)
    ('receive-dep-failed', 'tcp://127.0.0.1:45327', {"('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 395)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 369)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 389)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 387)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 368)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 392)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 412)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1891)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 373)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 377)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 385)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 363)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 370)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 372)", "('rechunk-split-2f03aa8100cc9be2cc9038da818a1e26', 337)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 366)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 398)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 371)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 367)"}, 'ensure-communicating-1651095919.1077273', 1651095919.5865068)
    ('missing-who-has', 'tcp://127.0.0.1:45327', "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'ensure-communicating-1651095919.1077273', 1651095919.5865407)
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'release-key', 'ensure-communicating-1651095919.1077273', 1651095919.5866287)
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'cancelled', 'released', 'released', {}, 'ensure-communicating-1651095919.1077273', 1651095919.5866582)
Worker: tcp://127.0.0.1:36529
InvalidTransition: ('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895) :: cancelled->missing
  Story:
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'ensure-task-exists', 'released', 'compute-task-1651095913.7592962', 1651095913.7619393)
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'released', 'fetch', 'fetch', {}, 'compute-task-1651095913.7592962', 1651095913.763571)
    ('gather-dependencies', 'tcp://127.0.0.1:37115', {"('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1824)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1859)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1900)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1812)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1851)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1883)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1827)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1816)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1847)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1844)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1868)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1839)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1430)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1450)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1803)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1891)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1848)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1852)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1907)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1903)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1880)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1892)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1864)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1899)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1867)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1836)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1840)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1807)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1832)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1855)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1828)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1831)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1908)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1808)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1904)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1860)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1911)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1875)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1804)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1856)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1896)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1390)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1884)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1871)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1872)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1820)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1835)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1879)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1800)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1876)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1823)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1811)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1863)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1819)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1815)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1887)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1843)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1888)"}, 'ensure-communicating-1651095913.7787864', 1651095913.7789662)
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'fetch', 'flight', 'flight', {}, 'ensure-communicating-1651095913.7787864', 1651095913.7791023)
    ('request-dep', 'tcp://127.0.0.1:37115', {"('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1824)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1859)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1900)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1812)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1851)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1883)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1827)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1816)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1847)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1844)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1868)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1839)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1430)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1450)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1803)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1891)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1848)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1852)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1907)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1903)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1880)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1892)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1864)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1899)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1867)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1836)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1840)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1807)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1832)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1855)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1828)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1831)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1908)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1808)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1904)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1860)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1911)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1875)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1804)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1856)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1896)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1390)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1884)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1871)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1872)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1820)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1835)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1879)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1800)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1876)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1823)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1811)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1863)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1819)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1815)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1887)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1843)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1888)"}, 'ensure-communicating-1651095913.7787864', 1651095913.780679)
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'flight', 'released', 'cancelled', {}, 'steal-58', 1651095913.8156393)
    ('receive-dep', 'tcp://127.0.0.1:37115', {"('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1824)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1859)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1900)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1812)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1851)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1883)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1827)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1816)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1847)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1844)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1868)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1839)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1430)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1450)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1803)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1891)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1848)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1852)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1907)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1903)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1880)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1892)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1864)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1899)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1867)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1836)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1840)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1807)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1832)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1855)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1828)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1831)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1908)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1808)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1904)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1860)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1911)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1875)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1804)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1856)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1896)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1390)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1884)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1871)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1872)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1820)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1835)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1879)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1800)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1876)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1823)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1811)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1863)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1819)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1815)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1887)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1843)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1888)"}, 'ensure-communicating-1651095913.7787864', 1651095913.8474061)
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'cancelled', 'memory', 'cancelled', {"('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)": 'released'}, 'ensure-communicating-1651095913.7787864', 1651095913.8480968)
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'release-key', 'ensure-communicating-1651095913.7787864', 1651095913.8481023)
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'cancelled', 'released', 'released', {"('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)": 'forgotten'}, 'ensure-communicating-1651095913.7787864', 1651095913.8481138)
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'released', 'forgotten', 'forgotten', {}, 'ensure-communicating-1651095913.7787864', 1651095913.8481185)
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'ensure-task-exists', 'released', 'compute-task-1651095919.0555253', 1651095919.0676618)
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'released', 'fetch', 'fetch', {}, 'compute-task-1651095919.0555253', 1651095919.0678046)
    ('gather-dependencies', 'tcp://127.0.0.1:45327', {"('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1891)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)"}, 'ensure-communicating-1651095919.1077273', 1651095919.107835)
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'fetch', 'flight', 'flight', {}, 'ensure-communicating-1651095919.1077273', 1651095919.1078584)
    ('request-dep', 'tcp://127.0.0.1:45327', {"('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1891)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)"}, 'ensure-communicating-1651095919.1077273', 1651095919.1186914)
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'flight', 'released', 'cancelled', {}, 'steal-194', 1651095919.2378554)
    ('receive-dep-failed', 'tcp://127.0.0.1:45327', {"('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 395)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 369)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 389)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 387)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 368)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 392)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 412)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1891)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 373)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 377)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 385)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 363)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 370)", "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 372)", "('rechunk-split-2f03aa8100cc9be2cc9038da818a1e26', 337)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 366)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 398)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 371)", "('rechunk-merge-42d027b592cefa31c0593225bdda7a8b', 0, 367)"}, 'ensure-communicating-1651095919.1077273', 1651095919.5865068)
    ('missing-who-has', 'tcp://127.0.0.1:45327', "('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'ensure-communicating-1651095919.1077273', 1651095919.5865407)
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'release-key', 'ensure-communicating-1651095919.1077273', 1651095919.5866287)
    ("('rechunk-split-42d027b592cefa31c0593225bdda7a8b', 1895)", 'cancelled', 'released', 'released', {}, 'ensure-communicating-1651095919.1077273', 1651095919.5866582)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../mambaforge/envs/dask-distributed/lib/python3.10/site-packages/tornado/ioloop.py:530: in run_sync
    return future_cell[0].result()
distributed/utils_test.py:1096: in coro
    await end_cluster(s, workers)
distributed/utils_test.py:937: in end_cluster
    check_invalid_worker_transitions(s)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

s = <Scheduler 'tcp://127.0.0.1:39109', workers: 0, cores: 0, tasks: 0>

    def check_invalid_worker_transitions(s: Scheduler) -> None:
        if not s.events.get("invalid-worker-transition"):
            return
    
        for timestamp, msg in s.events["invalid-worker-transition"]:
            worker = msg.pop("worker")
            print("Worker:", worker)
            print(InvalidTransition(**msg))
    
>       raise ValueError(
            "Invalid worker transitions found", len(s.events["invalid-worker-transition"])
        )
E       ValueError: ('Invalid worker transitions found', 2)

@mrocklin
Copy link
Member Author

Hrm, so this is interesting

    @fail_hard
    async def handle_scheduler(self, comm):
        try:
            await self.handle_stream(comm, every_cycle=[self.ensure_communicating])
        except Exception as e:
            logger.exception(e)
            raise
        finally:
            if self.reconnect and self.status in Status.ANY_RUNNING:
                logger.info("Connection to scheduler broken.  Reconnecting...")
                self.loop.add_callback(self.heartbeat)
            else:
                logger.info(
                    "Connection to scheduler broken. Closing without reporting.  Status: %s",
                    self.status,
                )
                await self.close(report=False)

Currently handle_scheduler tries to reconnect on an exception. This makes some sense. Comm errors are reasonable, and reconnection in that case is reasonable. However, it might be cleaner if we just bail and let the Nanny handle restart+reconnect on a failure. This would be a change in current behavior, but maybe ok.

@mrocklin
Copy link
Member Author

I should note that the error above isn't actually the error generated from these changes (the gather_dep errors are more rare) but some of the other changes, like using sys.exit rather than close_gracefully in the chaos test, and the cleanliness around closing well, seems to be letting these more relevant errors come up more easily.

If we merge this, this test is likely to start lighting up on CI. I think that I'm ok with that very short term. This seems like good pain to feel.

@gjoseph92
Copy link
Collaborator

Currently handle_scheduler tries to reconnect on an exception

Yeah, that's what I was asking about in https://github.com/dask/distributed/pull/6210/files#r859199361.

Comm errors are reasonable, and reconnection in that case is reasonable

except Exception as e is catching more than just comm errors though #5483 :) Reconnection in the case of state errors may not be as reasonable?

it might be cleaner if we just bail and let the Nanny handle restart+reconnect on a failure

I'm in favor of this, mostly because we've seen bugs with reconnection in the past: #5480 #5481 #5457.

@mrocklin
Copy link
Member Author

Yeah, I think that the question here is "should handle_scheduler handle some exceptions?" Are there some CommClosedErrors that we're actually ok with, and don't want to raise up to fail_hard, but actually want to handle locally?

Short term my guess is that we just want to cancel on everything and restart the worker. Longer term, I wouldn't be surprised if we want workers to be a little bit more resilient than this, but I also wouldn't be surprised if it goes the other direction.

@mrocklin
Copy link
Member Author

@fjetter as far as state machine failures go, I think that a lot of what I was seeing before were failures when we were shutting down. These are genuine and we should care about them, but recently I've shifted the chaos test to just use sys.exit, which makes us blind to these (which i think is ok short term).

As a result, if you're still looking at state machine issues I'd like to focus you on the failure in #6210 (comment) , which comes up consistently with this test, while the others don't. This one seems to happen to workers when other workers fail, which seems like something that we should care about much more.

@mrocklin
Copy link
Member Author

OK, so there is a behavior change here around reconnection. If we're ok with that then I think that this is good to go. I'm not feeling a huge rush to get this in, but wouldn't mind getting it in this release.

@mrocklin
Copy link
Member Author

To share my thoughts on that calculus, this does identify and resolve a state machine transition failure (and presumably resolve others as well). Because of this, I'd very much like to get it in.

I'm a little nervous about the reconnect change, but not that nervous.

As I think more about this, I think I'm more in favor of merging before the release. @gjoseph92 if you have time to look this over again I would welcome a 👍 or 👎

distributed/worker.py Show resolved Hide resolved
distributed/worker.py Show resolved Hide resolved
distributed/worker.py Show resolved Hide resolved
@fjetter
Copy link
Member

fjetter commented Apr 28, 2022

I'm on board with removing application layer reconnects and think we should replace it with a more robust network layer. I think there is tons of code we can clean up if we remove this feature, e.g.

if nbytes:
assert isinstance(nbytes, dict)
already_released_keys = []
for key in nbytes:
ts: TaskState = self.tasks.get(key) # type: ignore
if ts is not None and ts.state != "released":
if ts.state == "memory":
self.add_keys(worker=address, keys=[key])
else:
t: tuple = self._transition(
key,
"memory",
stimulus_id,
worker=address,
nbytes=nbytes[key],
typename=types[key],
)
recommendations, client_msgs, worker_msgs = t
self._transitions(
recommendations, client_msgs, worker_msgs, stimulus_id
)
recommendations = {}
else:
already_released_keys.append(key)
if already_released_keys:
if address not in worker_msgs:
worker_msgs[address] = []
worker_msgs[address].append(
{
"op": "remove-replicas",
"keys": already_released_keys,
"stimulus_id": stimulus_id,
}
)

However, I am nervous about this too and I would like to not rush this. I would definitely prefer a dedicated PR.

Is this absolutely required to get the fail hard in?

@mrocklin
Copy link
Member Author

Is this absolutely required to get the fail hard in?

I'll see if I can separate. I'll probably allow CommClosedErrors and OSErrors to pass through.

@mrocklin
Copy link
Member Author

However, I am nervous about this too and I would like to not rush this. I would definitely prefer a dedicated PR.
Is this absolutely required to get the fail hard in?

All is well now. The behavior is still there, and it's a bit cleaner now.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Close worker when we discover an internal error
3 participants