Poor work scheduling when cluster adapts size #4471

Open
chrisroat opened this issue Jan 30, 2021 · 12 comments · May be fixed by #4925, #6115 or #4920
Labels: adaptive (All things relating to adaptive scaling), stealing

Comments

@chrisroat
Contributor

What happened:

When a cluster is autoscaled in increments while running, as can happen in a GKE cluster, the work concentrates on a few workers and the cluster is used inefficiently. This seems to be worse when there are long-running tasks. The example below simulates this by adjusting a local cluster's size while it is processing a graph of 10-second tasks.

The image below shows the final state of the task graph, and the animated gif shows the status screen as the cluster processes the graph. Many workers run none of the long tasks, and only a few workers seem to be fully utilised.

If the cluster is initially set to 20 workers with no changes, work is distributed evenly and all workers are efficiently used.

@fjetter In this example, the workers under load are green a lot.

What you expected to happen:

After the cluster scales up, the work should be evenly divided among all the workers.

Minimal Complete Verifiable Example:

import time
import distributed
import dask.array as da

client = distributed.Client(n_workers=1, threads_per_worker=1, memory_limit='1G')
print(client.dashboard_link)

# Slow task.
def func1(chunk):
    if sum(chunk.shape) != 0: # Make initialization fast
        time.sleep(10)
    return chunk

def func2(chunk):
    return chunk

data = da.zeros((30, 30, 30), chunks=5)
result = data.map_overlap(func1, depth=1, dtype=data.dtype)
result = result.map_overlap(func2, depth=1, dtype=data.dtype)
future = client.compute(result)

print('started computation')

time.sleep(11)
print('scaling to 4 workers')
client.cluster.scale(4)

time.sleep(5)
print('scaling to 20 workers')
client.cluster.scale(20)

_ = future.result()

Anything else we need to know?:

In a real cluster with my real load (not the simulation above), I also see the scheduler CPU pegged near 100% (possibly due to #3898), even when all workers are working on the long tasks. This seems odd, since nothing about the scheduling is actively changing.

Environment:

  • Dask (and distributed) version: 2021.01.1
  • Python version: 3.8.5
  • Operating System: Ubuntu 20.04
  • Install method (conda, pip, source): pip

Final task screenshot: [screenshot attached to the issue]

Movie of workload (at 10x speed): [animated gif attached to the issue]

@chrisroat
Contributor Author

@fjetter I'm starting to dig through the scheduler code. I can mitigate this specific issue by assigning tasks to workers without the smarts of decide_worker. If I just remove the first branch of this if-else and only use the else branch, the work gets scheduled very efficiently.

if ts._dependencies or valid_workers is not None:
    ws = decide_worker(
        ts,
        self._workers_dv.values(),
        valid_workers,
        partial(self.worker_objective, ts),
    )
else:
    worker_pool = self._idle or self._workers
    worker_pool_dv = cast(dict, worker_pool)
    n_workers: Py_ssize_t = len(worker_pool_dv)
    if n_workers < 20:  # smart but linear in small case
        ws = min(worker_pool.values(), key=operator.attrgetter("occupancy"))
    else:  # dumb but fast in large case
        ws = worker_pool.values()[self._n_tasks % n_workers]

[screenshot of the result after the modification, attached to the comment]

@gforsyth
Contributor

Looking at the "after" picture (following your scheduler modifications) I see a lot of red. I think this makes sense, since part of that first conditional branch is putting tasks on the same workers as their dependencies. But I suspect that the long-running-task aspect is playing a role here, too -- the (lack of) task occupancy on the unused workers should be encouraging the scheduler to send the tasks there. There may be an update from the workers missing here (or just out of place).

@chrisroat
Contributor Author

Yeah, I noticed that, too. I do see some red in the "before" plot, though it's sparser. I couldn't actually tell if things were worse -- but I suspect, as you do, that they should be.

If I use map_blocks instead of map_overlap (removing the func2 stage to be fair, since it gets fused in the map_blocks case; see the sketch at the end of this comment), the graph looks very efficient in the "before" case. Could that suggest that updates are being handled correctly?

Could it be that the dask.order ordering created for a map_overlap graph is not well-suited to the work-stealing/update algorithm (in this use case of the cluster changing size)?
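
For concreteness, the map_blocks variant is roughly the following -- just a sketch reusing data, func1 and client from the reproducer above, with the same scaling steps:

# Sketch of the map_blocks comparison (func2 dropped so the graphs are comparable).
result = data.map_blocks(func1, dtype=data.dtype)
future = client.compute(result)
_ = future.result()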

@gforsyth
Contributor

gforsyth commented Apr 1, 2021

map_blocks creates far fewer dependency chains than map_overlap (by definition), so it should get distributed round-robin(-ish), and the decide_worker call will look only at worker idleness when distributing the tasks.

And that does point to something in the stealing heuristics that doesn't perform ideally when there are a bunch of linked + long-running tasks.

edit: I said stealing but I meant scheduling, but stealing might also play a part here

@chrisroat
Contributor Author

I tried following what the scheduler is doing (with stealing turned off), but with map_overlap creating a complex graph and with this issue not manifest with small workloads, it's hard for me to make headway.

But I have found a workaround, via a tip from @shoyer. Using the graph manipulation API, I can coerce the gnarly overlap section of the graph to complete before doing any mapping. It means breaking up a map_overlap into overlap + wait_on + map_blocks + trim_overlap, like the following:

data_overlap = da.overlap.overlap(data, depth=1, boundary=None)
data_overlap_wait = dask.graph_manipulation.wait_on(data_overlap)  # New step
result_overlap = data_overlap_wait.map_blocks(func1, dtype=data.dtype)
result = da.overlap.trim_overlap(result_overlap, depth=1, boundary=None)

This seems to distribute work better on some moderate test cases.

I only have limited time to tackle my dask-related issues this week. If I can fix a memory leak, maybe I can come back to take a deeper look at this.

@chrisroat
Contributor Author

FWIW, when the solution with wait_on is deployed on a cluster with auto-scaling (at least in my hands), I do not see the same improvement. I'm not sure exactly what could be different. For now, I just manually provision/deprovision, and avoid autoscaling for these calculations.

@fjetter
Member

fjetter commented Jun 15, 2021

I started peeking into this issue. First of all, thanks @chrisroat for the reproducer, this is incredibly helpful!

My naive expectation for this workload is:

  • cluster scales up, work stealing will balance tasks to maximise parallelism and throughput.
  • Once scaled up, decisions will be reasonably good

Neither is happening :'(

Task stealing for this workload is effectively disabled since all tasks are incredibly small and the work-stealing engine doesn't consider it worthwhile to move anything: latencies would objectively make the computation of a single task much more costly than if it was computed on the worker it was initially assigned to. This is obviously a heuristic but usually works well. I think we're missing a measure of opportunity cost, i.e. "what is the cost of not moving the task?", or put differently: "how much is the entire graph delayed if this task is not moved?". I don't know yet how we could compute something like this efficiently, but I'll think about it for a bit; I think it would be a valuable addition to work stealing in general if we could approximate it efficiently.
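
To illustrate the kind of trade-off I mean, here is a rough schematic -- not the actual stealing code, the name, numbers and signature are made up:

import random  # not needed here, just keeping the sketch self-contained

def worth_stealing(compute_time, nbytes, bandwidth=100e6, latency=0.1):
    # Moving the task costs one network round trip plus shipping its inputs.
    transfer_time = latency + nbytes / bandwidth
    # Greedy view: only move the task if that is cheaper than leaving it
    # where it was originally assigned.
    return transfer_time < compute_time

For this workload the per-task compute time is tiny, so a check like this always says "don't move", even though moving work would shorten the overall runtime. That missing term is the opportunity cost mentioned above.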

The fact that the decision after scale-up is less than optimal is caused by us strictly preferring to assign a task to the workers which have their dependencies.
In one example I looked into, there was only one viable candidate for task assignment, because the task had a single dependency and it lived on one specific worker, let's call it A, out of 20 total workers. Therefore our decision picked this single worker A without even considering the others. If we were to allow all other workers, it turns out that A only ranked around position 15, since once transfer cost, occupancy, etc. are properly accounted for it was not attractive. That's a more costly decision (in terms of scheduler compute) than "assign to the worker with the dependencies", so we should be mindful of this when fixing it. This also does not take opportunity cost into account, but as I said, that's a difficult problem in general.

Both are examples where we make decisions which are arguably good micro optimizations but poor macro optimizations. That's a very interesting problem :)

@fjetter fjetter linked a pull request Jun 15, 2021 that will close this issue
@gjoseph92
Collaborator

is caused by us strictly preferring to assign a task to the workers which have their dependencies.

It's interesting that I've also had to solve this problem in #4899. See this comment:

# Since this is a root-like task, we should ignore the placement of its dependencies while selecting workers.
# Every worker is going to end up running this type of task eventually, and any dependencies will have to be
# transferred to all workers, so there's no gain from only considering workers where the dependencies already live.
# Indeed, we _must_ consider all workers, otherwise we would keep picking the same "new" worker(s) every time,
# since there are only N workers to choose from that actually have the dependency (where N <= n_deps).
ignore_deps_while_picking = True

I wonder if there's a way, in all cases, that we could consider candidates beyond just the workers that hold the dependencies. (Or at least do this when the size of the dependencies is small, by some heuristic.) The objective function should still select workers that have the dependencies when it's reasonable to do so (since they are penalized less for data transfer), so I feel like it would strictly be an improvement in worker selection, though it might slow down decide_worker a little. If there was a way to coarsely -- or even randomly -- select a subset of valid_workers to mix into the candidates set, that could still improve worker selection while capping how much extra work we're adding to decide_worker.
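
As a rough sketch of what I mean (pick_worker, candidates, all_workers and objective are placeholders here, not the real decide_worker signature):

import random

def pick_worker(candidates, all_workers, objective, extra=5):
    # Start from the workers holding the dependencies, then mix in a small
    # random sample of all workers so the objective can still look elsewhere.
    pool = set(candidates)
    pool.update(random.sample(list(all_workers), min(extra, len(all_workers))))
    return min(pool, key=objective)

The sample size caps how much extra work gets added to decide_worker while still letting occupancy and transfer cost drive the choice.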

@chrisroat
Contributor Author

Are there any parameters for work stealing or graph hinting that could mitigate this issue (even if it were in a fork where I add my own env var or something like that)?

As an example, I have tried to hint at the long tasks, hoping that would outweigh the "keep near dependencies" effect:

dask.config.set({"distributed.scheduler.default-task-durations._trim": "10s"})
dask.config.set({"distributed.scheduler.default-task-durations.trim-concatenate": "10s"})

@mrocklin
Member

OK, so there is a possible solution for this up in #6115. Comments about what I did and why are in the opening comment / commit message. I don't know yet if this should go in (there was a reason why we turned off stealing for very small tasks), but there is some reason to do it.

To summarize what was happening: map-overlap computations introduce lots of tasks with very short durations. Dask's work stealing is greedy, and strictly avoids stealing any task that takes less than 5 ms. It also sharply penalizes stealing by adding a 100 ms network latency into its heuristic. By changing these policies, this computation starts to flow smoothly again.

It's not clear though if this resolves the broader problem. @chrisroat @bnaul if this is still an active topic for you then I encourage you to try out that branch.

@mrocklin
Member

One thing to note is that as the inputs climb in size the argument to steal becomes weaker and weaker. There is still a greedy-vs-long-term-view issue that we're not resolving here. My change in that PR fixes the reproducer here, but I'd be curious to learn if it fixes other issues in practice.

@bnaul
Contributor

bnaul commented Apr 13, 2022

Very interesting... will give it a shot. A couple of interesting quirks of our usage that might also be of interest:

  1. Our inputs are all very, very small here by design; everything communicated via the scheduler is just a GCS URI, and all of the actual data transfer happens by way of cloud storage.
  2. Because of this "cache everything in GCS" behavior, the same task will sometimes return instantly ("the file already exists") and sometimes run for an hour. This could be a somewhat pathological case for something like a work-stealing heuristic.
