
AMM: Graceful Worker Retirement #5381

Merged
merged 3 commits into dask:main from AMM/RetireWorker on Feb 4, 2022

Conversation

@crusaderky (Collaborator) commented Oct 1, 2021

Scheduler.retire_workers(), whether called from the client or from the adaptive scaler, can now be invoked safely in the middle of a computation.
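As a minimal sketch of the scenario this enables (hypothetical scheduler address and workload, not code from this PR), a worker can now be retired from the client while futures are still computing:

from dask.distributed import Client

client = Client("tcp://scheduler-address:8786")  # assumed address
futures = client.map(lambda x: x ** 2, range(1000))

# Retire one worker while the computation is still running. Its unique
# in-memory results are replicated to other workers by the Active Memory
# Manager before the worker shuts down; queued tasks get rescheduled.
victim = next(iter(client.scheduler_info()["workers"]))
client.retire_workers([victim])

print(sum(client.gather(futures)))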

Done

  • Reimplement method using the Active Memory Manager
  • retire_workers sets the worker status to closing_gracefully
  • Prevent workers that are closing_gracefully from running queued tasks (they will be reassigned when the worker shuts down)
  • paused workers never receive tasks from retiring workers
  • unit tests

Out of scope

  • Steal away tasks from a worker in paused or closing_gracefully status (xref Tasks are not stolen if worker runs out of memory #3761)
  • Adaptive scaler to effectively scale down in the middle of a computation
  • Automatically transition "paused" status to "closing_gracefully" after a certain timeout
  • Reimplement rebalance() and replicate() using the AMM

CC @mrocklin @jrbourbeau @fjetter

Comment on lines 348 to 423
be retired will be bombarded with ``Worker.get_data`` calls from the rest of the
cluster. This can be a problem if most of the managed memory of the worker has been
Member

This actually should not be a problem since the worker limits concurrent incoming get_data requests with distributed.worker.connections.incoming (default 10). Of course, this might not be sufficient...
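(For reference, a minimal sketch of tuning that limit via dask config, using the key named above; it must be set before the workers start:)

import dask

# Allow more concurrent incoming get_data connections per worker (default 10)
dask.config.set({"distributed.worker.connections.incoming": 20})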

@@ -1434,6 +1432,9 @@ async def close_gracefully(self, restart=None):
            restart = self.lifetime_restart

        logger.info("Closing worker gracefully: %s", self.address)
        # Scheduler.retire_workers will set the status to closing_gracefully and push it
        # back to this worker. We're additionally setting it here immediately to prevent
        # race conditions during the round trip.
        self.status = Status.closing_gracefully
Member

This will trigger another message to the scheduler, won't it? How is this protecting from race conditions?

Collaborator Author

There are two use cases:

Use case 1: graceful shutdown initiated by nanny

  1. Worker sets Worker.status = closing_gracefully and sends the updated status to the Scheduler.
  2. Worker immediately afterwards invokes Scheduler.retire_workers
  3. retire_workers sets WorkerState.status = closing_gracefully. It's unknown whether the status update from (1) or the retire_workers call from (2) arrives at the scheduler first, and we shouldn't care.
  4. retire_workers sends notification of the updated status to the Worker, which will be superfluous as its status is already closing_gracefully
  5. The scheduler stops (a) assigning new tasks to the worker (b) rebalancing in-memory tasks onto the worker

Use case 2: graceful shutdown initiated by client or adaptive scaler

  1. retire_workers sets WorkerState.status = closing_gracefully and immediately stops (a) assigning new tasks to the worker (b) rebalancing in-memory tasks onto the worker. This is crucial to avoid accidentally closing a worker with unique in-memory tasks on it.
  2. retire_workers sends notification of the updated status to the Worker. This in turn causes the worker to stop processing queued tasks.

In both cases, queued tasks on the workers need to be stolen (still missing in this PR).
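A rough sketch (not the actual Worker code) of the gating behaviour described above: a worker that is paused or closing gracefully should not start queued tasks, and leaves them for the scheduler to reassign.

from distributed.core import Status

def may_start_queued_tasks(worker) -> bool:
    # Paused and gracefully-closing workers do not pick up new work;
    # their queued tasks are reassigned when the worker shuts down.
    return worker.status not in (Status.paused, Status.closing_gracefully)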

distributed/worker.py (outdated review thread, resolved)
@crusaderky crusaderky changed the title from "WIP Graceful Worker Retirement" to "WIP AMM: Graceful Worker Retirement" on Oct 15, 2021
@crusaderky crusaderky self-assigned this on Oct 15, 2021
@crusaderky crusaderky changed the title from "WIP AMM: Graceful Worker Retirement" to "AMM: Graceful Worker Retirement" on Oct 19, 2021
@crusaderky crusaderky force-pushed the AMM/RetireWorker branch 2 times, most recently from 1579834 to d529111, on October 19, 2021 13:50
@crusaderky crusaderky force-pushed the AMM/RetireWorker branch 3 times, most recently from 2142bb2 to 302fac5, on November 18, 2021 23:22
@crusaderky crusaderky closed this Nov 19, 2021
@crusaderky crusaderky reopened this Nov 19, 2021
@crusaderky crusaderky force-pushed the AMM/RetireWorker branch 4 times, most recently from 245978b to d19e737, on November 21, 2021 20:42
Comment on lines 700 to 730
# FIXME an unresponsive worker takes ~88s to be removed from the scheduler.
# The below setting seems to do nothing.
"distributed.scheduler.worker-ttl": "5s",
Collaborator Author

FIXME Can't find the setting that governs this timeout

@crusaderky (Collaborator Author)

Current status:

  • Code done, unit tests done, docs done
  • stress test distributed/tests/test_active_memory_manager.py::test_RetireWorker_stress is failing on desktop and in CI - which is very worrying. Most times it hangs; sometimes the computation breaks. Must investigate.
  • A few preexisting tests have become very flaky; must investigate:
    • distributed/tests/test_client.py::test_reconnect
    • distributed/tests/test_scheduler.py::test_retire_state_change
    • distributed/tests/test_steal.py::test_reschedule_concurrent_requests_deadlock
    • distributed/deploy/tests/test_adaptive.py::test_adapt_quickly
    • distributed/deploy/tests/test_adaptive.py::test_adaptive_local_cluster_multi_workers

@fjetter (Member) commented Nov 30, 2021

  • distributed/tests/test_steal.py::test_reschedule_concurrent_requests_deadlock has been a known flaky test for a while
  • I also remember distributed/tests/test_client.py::test_reconnect being flaky outside of this PR (at least I run into it frequently when running things locally)

@crusaderky crusaderky force-pushed the AMM/RetireWorker branch 3 times, most recently from 7e98474 to 9676934, on January 21, 2022 15:50
@crusaderky (Collaborator Author) commented Jan 21, 2022

I observed a random failure in test_adapt_quickly. Such failures are present in main too; it remains to be seen if this PR makes them more frequent.
See #5682

@crusaderky (Collaborator Author)

ALL GREEN except unrelated failures!
Waiting for linked PRs.
Also need to rerun the tests a few extra times to verify stability.

@crusaderky (Collaborator Author)

This PR is in draft status only because it incorporates #5653; it is otherwise ready for final review.

distributed/active_memory_manager.py (outdated review thread, resolved)
distributed/active_memory_manager.py (outdated review thread, resolved)


class RetireWorker(ActiveMemoryManagerPolicy):
    """Replicate somewhere else all unique in-memory tasks on a worker, preparing for
Collaborator

Just want to say this is excellent high-level documentation. I have a very good sense of the purpose of this and how it works without reading any code.


- At every iteration, this policy drops all tasks that have already been replicated
  somewhere else. This makes room for further tasks to be moved out of the spill
  file in order to be replicated onto another worker.
Collaborator

I find myself wondering how much of a race condition there is here between the get_data requests from peers and the drop signals from the scheduler. Might be good to discuss this in more detail?

Collaborator Author

It should be safe. get_data will fail and the requesting worker will try again on the next worker that owns the data. ActiveMemoryManagerExtension._find_dropper makes sure that you don't accidentally drop the last replica of a key.
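An illustration of that invariant (hypothetical helper, not the real _find_dropper): never pick a dropper unless at least one other replica of the key would remain.

def pick_dropper(ts, candidates, pending_drops):
    # ts.who_has: workers currently holding the key in memory
    surviving = set(ts.who_has) - pending_drops
    if len(surviving) < 2:
        return None  # dropping anywhere would remove the last replica
    # otherwise pick any candidate that still holds a replica
    return next(iter(candidates & surviving), None)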

distributed/active_memory_manager.py (outdated review thread, resolved)
@gen_cluster(
    client=True,
    Worker=Nanny,
    config={
Collaborator

Would it be helpful to use a similar config as #5697 and only look at managed memory? (I know AMM currently is hardcoded to use optimistic memory, but that seems like an easy thing to make configurable. Just thinking about any future test flakiness due to unmanaged memory changes.)

Collaborator Author

Yes, something for a future PR indeed


If ReduceReplicas is enabled,
1. On the first AMM iteration, either ReduceReplicas or RetireWorker (arbitrarily
depending on which comes first in the iteration of
Collaborator

For this particular test, won't ReduceReplicas always come first, since RetireWorker isn't added until c.retire_workers gets called? Maybe you want three possibilities for active-memory-manager.policies:

  1. [ReduceReplicas, RetireWorker]
  2. [RetireWorker, ReduceReplicas]
  3. [RetireWorker]

Collaborator Author

ActiveMemoryManager.policies is a set, so the order of the policies is pseudo-random and not controllable from a unit test.
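(A plain-Python illustration of the point, not distributed's actual classes: iterating a set gives no order a test could rely on.)

class ReduceReplicas:
    pass

class RetireWorker:
    pass

policies = {ReduceReplicas(), RetireWorker()}
for policy in policies:  # which policy comes first is not deterministic
    print(type(policy).__name__)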

distributed/tests/test_active_memory_manager.py (outdated review thread, resolved)
distributed/tests/test_active_memory_manager.py (outdated review thread, resolved)
Comment on lines +1024 to +1026
# the time to migrate. This is OK if it happens occasionally, but if this
# setting is too aggressive the cluster will get flooded with repeated comm
# requests.
Collaborator

Is this a problem in real-world clusters too?

Collaborator Author

Yes. As the comment says, it's OK if once in a while a task takes more than 2 seconds to migrate; it could be problematic if many tasks take longer than 2 seconds over and over. I don't really have a feel for how long it typically takes on a big, busy cluster between the get_data request and the actual data arriving.

Collaborator

On a busy cluster, it wouldn't surprise me if the full roundtrip (including the scheduler finding out the task had migrated) could take longer than 2s, especially when data is spilled to disk, and especially if multiple workers are retiring (scale-down).

I suppose we'll just have to try it and find out. I could see it becoming necessary to have some memory on the AMM and not drop self.pending every cycle (or store pending as prev for the next cycle, and remove any current suggestions that were in prev or something), but that will be a future PR.

Member

Can we see this when it happens? Are there logs or other traces or would we be blind if that happened?

Collaborator Author

To clarify: this is exclusively an issue if

  • a substantial amount of keys are not replicated within 2 seconds, AND
  • something other than the AMM, such as a computation or a new worker coming online, tilted the memory balance of the cluster enough that the AMM will take a substantially different decision from the previous one

In all other cases, including when some or most of the keys that the AMM asked to be replicated have already moved and increased the memory usage on the recipient, the AMM will just take the same decision as before.

Again, it is OK if this happens occasionally - you'll just end up with a bunch of superfluous replications which will be reverted shortly by ReduceReplicas.

Can we see this when it happens?

At every AMM iteration, in the debug log you can read how many keys each worker is going to receive and drop. You may witness a worker that was previously unaffected suddenly loaded with replication requests.

e.g.
FIRST ITERATION:

distributed.active_memory_manager - DEBUG - Running policy: ReduceReplicas()
distributed.active_memory_manager - DEBUG - Running policy: RetireWorker('tcp://127.0.0.1:33379')
distributed.active_memory_manager - DEBUG - Enacting suggestions for 143 tasks:
distributed.active_memory_manager - DEBUG - - <WorkerState 'tcp://127.0.0.1:34809', name: tcp://127.0.0.1:34809, status: running, memory: 1, processing: 0> to acquire 95 replicas
distributed.active_memory_manager - DEBUG - - <WorkerState 'tcp://127.0.0.1:34810', name: tcp://127.0.0.1:34810, status: running, memory: 1, processing: 0> to acquire 48 replicas
distributed.active_memory_manager - DEBUG - Active Memory Manager run in 1ms

HEALTHY second iteration: most of the tasks have been replicated within 2s

distributed.active_memory_manager - DEBUG - Running policy: ReduceReplicas()
distributed.active_memory_manager - DEBUG - Running policy: RetireWorker('tcp://127.0.0.1:33379')
distributed.active_memory_manager - DEBUG - Enacting suggestions for 12 tasks:
distributed.active_memory_manager - DEBUG - - <WorkerState 'tcp://127.0.0.1:34809', name: tcp://127.0.0.1:34809, status: running, memory: 1, processing: 0> to acquire 12 replicas
distributed.active_memory_manager - DEBUG - Active Memory Manager run in 1ms

HEALTHY second iteration: network and/or the workers' event loop are slow, but nothing besides the AMM itself substantially changed the cluster balance

distributed.active_memory_manager - DEBUG - Running policy: ReduceReplicas()
distributed.active_memory_manager - DEBUG - Running policy: RetireWorker('tcp://127.0.0.1:33379')
distributed.active_memory_manager - DEBUG - Enacting suggestions for 135 tasks:
distributed.active_memory_manager - DEBUG - - <WorkerState 'tcp://127.0.0.1:34809', name: tcp://127.0.0.1:34809, status: running, memory: 1, processing: 0> to acquire 92 replicas
distributed.active_memory_manager - DEBUG - - <WorkerState 'tcp://127.0.0.1:34810', name: tcp://127.0.0.1:34810, status: running, memory: 1, processing: 0> to acquire 43 replicas
distributed.active_memory_manager - DEBUG - Active Memory Manager run in 1ms

UNHEALTHY second iteration: network and/or the workers' event loop are slow, and a computation suddenly saturated the first recipient, so the AMM now decides to load the second recipient a lot more. When (if) eventually the replicas on the first recipient finally go through, we'll end up with a bunch of duplicates which will be removed by ReduceReplicas.

distributed.active_memory_manager - DEBUG - Running policy: ReduceReplicas()
distributed.active_memory_manager - DEBUG - Running policy: RetireWorker('tcp://127.0.0.1:33379')
distributed.active_memory_manager - DEBUG - Enacting suggestions for 135 tasks:
distributed.active_memory_manager - DEBUG - - <WorkerState 'tcp://127.0.0.1:34809', name: tcp://127.0.0.1:34809, status: running, memory: 1, processing: 0> to acquire 20 replicas
distributed.active_memory_manager - DEBUG - - <WorkerState 'tcp://127.0.0.1:34810', name: tcp://127.0.0.1:34810, status: running, memory: 1, processing: 0> to acquire 115 replicas
distributed.active_memory_manager - DEBUG - Active Memory Manager run in 1ms
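(To capture these messages outside of a test, a minimal sketch; the logger name is taken from the output above, and a DEBUG-level handler is assumed to be configured:)

import logging

logging.getLogger("distributed.active_memory_manager").setLevel(logging.DEBUG)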

Member

The only thing I'm worried about is that 2s is very small for key replication IMO. Network latencies, network bandwidth, (de-)serialization and disk IO all play together here. Especially if a single worker is required to offload a few GB of data, that will very likely take more than 2s, wouldn't it?

Debug logs are pretty hidden, but I'm not familiar enough with the system yet to make a better suggestion. I was wondering if there is a condition that would allow us to detect such a situation and then log a meaningful warning.
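(If 2s turns out to be too aggressive in practice, the cadence should be tunable through dask config; the key name below is an assumption based on the AMM config namespace, not something verified in this PR:)

import dask

# Assumed key: run the Active Memory Manager every 10s instead of the 2s default
dask.config.set({"distributed.scheduler.active-memory-manager.interval": "10s"})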

Member

Either way, we can defer this to another PR/issue. I'm mostly wondering if this is something we should follow up on

Collaborator Author

I could keep track of incomplete replications from previous iterations and not reiterate them for e.g. 30 seconds.

Collaborator Author

Discussion continues at #5759

@crusaderky crusaderky marked this pull request as ready for review February 1, 2022 14:23
@crusaderky (Collaborator Author)

All review comments have been addressed

@gjoseph92 (Collaborator) left a comment

Nice work @crusaderky! 👍

Comment on lines +6951 to +6957
# Sleep 0.01s when there are 4 tasks or less
# Sleep 0.5s when there are 200 or more
poll_interval = max(0.01, min(0.5, len(ws.has_what) / 400))
await asyncio.sleep(poll_interval)
Collaborator

Ah I see. I forgot that done depended on general scheduler state, not just the state of the RetireWorker instance. So the done state can flip even when the policy isn't actively doing anything.

Comment on lines +1024 to +1026 (duplicate of the discussion thread on the same lines above)

@github-actions (bot) commented Feb 1, 2022

Unit Test Results

15 files (-3)    15 suites (-3)    ⏱️ 8h 25m 3s (-1h 6m 43s)
2 591 tests (+9):    2 507 passed ✔️ (+7),    81 skipped 💤 (+1),    3 failed (+1)
19 229 runs (-3 875):    17 934 passed ✔️ (-3 554),    1 292 skipped 💤 (-322),    3 failed (+1)

For more details on these failures, see this check.

Results for commit 53ae7bc. ± Comparison against base commit 30ffa9c.

♻️ This comment has been updated with latest results.

@crusaderky (Collaborator Author)

All unit test failures are unrelated.
@fjetter am I clear to merge?

@fjetter (Member) commented Feb 2, 2022

There is a merge conflict. I'll have a very brief look over the code. Unless anything big jumps out, we're good to go.

**Failure condition**

There may not be any suitable workers to receive the tasks from the retiring worker.
This happens in two use cases:
Member

Does this respect resources or host restrictions?

@crusaderky (Collaborator Author) commented Feb 3, 2022

It doesn't. The current implementation of retire_workers (which in turn uses replicate) doesn't, either.

It would be good to hear from the GPU people who use a hybrid cluster whether they have a use case of GPU data that will fail to unpickle on non-GPU workers (different answers for different libraries, I guess?), but then again, nobody has complained until today, so I guess the answer is no? At any rate, this PR does not introduce a regression.

Member

Yes, I was worried about GPUs as well but I agree this is not our concern right now.

Member

cc @quasiben for visibility.

If we need "Respect resources and restrictions in retirement" we should file an extra ticket.

Collaborator Author

I don't think the current resources design is fit for this purpose.
Take for example a typical use of resources, which is to run a task that requires a huge heap and/or dependencies. This says nothing about the size of the output.
E.g. the task could be annotated with resources={"MEM": 20e9}, but its output could be a handful of MB.
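(To make that concrete, a sketch with a hypothetical task; build_summary and its argument are made up for illustration:)

from dask import annotate, delayed

def build_summary(n):
    # stand-in for a task that needs a huge heap while running
    return n * 2  # ...but returns something tiny

with annotate(resources={"MEM": 20e9}):
    summary = delayed(build_summary)(10)

# The annotation describes what the task needs in order to run; it says
# nothing about how large the output is or where it can safely live.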

Member

Yes, I'm not entirely sure if resources are perfect. Host and worker restrictions, maybe.

@fjetter (Member) commented Feb 2, 2022

LGTM. Remaining questions should not stop merging

@crusaderky (Collaborator Author)

In it goes!

@crusaderky crusaderky merged commit 3cc145b into dask:main Feb 4, 2022
@crusaderky crusaderky deleted the AMM/RetireWorker branch February 4, 2022 10:28