
Consider candidates that don't hold any dependencies in decide_worker #4925

Open · wants to merge 25 commits into main

Conversation


@gjoseph92 gjoseph92 commented Jun 17, 2021

When picking which worker to run a task on, don't only consider the workers that hold that task's dependencies—also mix in up to 10 randomly-selected workers.

This helps with the (rather common) case where a task's dependencies are so small that it would be faster to transfer all of them to a new, less-occupied worker than to queue the task on a worker that holds some dependencies, but is already busy.

This should be especially helpful in cases like #4471, where new workers are added to the cluster midway through computation. In general, it should improve load balancing for most workloads.

Note that this can make errors in measuring the `sizeof` of an object more impactful. If we think an object is cheap to move but it's actually expensive, we'll be more likely to move it now than before.
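Roughly, the idea looks like the following (a simplified sketch, not the exact diff; the variable names mirror the snippets discussed in the review below):

import random

def pick_candidates(deps, valid_workers, idle_workers, all_workers):
    # Workers that already hold at least one dependency of the task
    candidates = {ws for dts in deps for ws in dts.who_has}

    # Mix in up to 10 extra workers, preferring idle ones, so a busy worker
    # holding only tiny dependencies doesn't win by default
    pool = valid_workers if valid_workers is not None else all_workers
    sample_from = idle_workers or pool
    if len(sample_from) > 10:
        candidates.update(random.sample(list(sample_from), 10))
    else:
        candidates.update(sample_from)
    return candidates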

await wait(tasks)

assert all(root.key in worker.data for worker in [a, b, c])
assert len(a.data) == len(b.data) == len(c.data) == 3
Member

Actually, now that I look at this again, maybe we do need the sleep.

If we're scheduling incs then it's not unreasonable to schedule six of these things on one worker. That's where the data is. This computation is very cheap. But in contrast, the dependency is very small. We may need the scheduler to learn that slowinc is slow, and so heavy relative to the int dependency.

Maybe

await client.submit(slowinc, 10, delay=0.1)  # learn that slowinc is slow
root = await client.scatter(1)

futures = client.map(slowinc, [root] * 6, delay=0.1, pure=False)
await wait(futures)

This way we're confident that the computational cost of the future will far outweigh the communication cost, but things are still fast-ish.

Collaborator Author

Yeah, that's why I originally went with sleep. It also feels more like the case we're trying to test for. Though the test still passes with 6d91816 (and note that that test fails on main), so I'm not sure what we're thinking about wrong?

Member

Currently 28 bytes / 100 MB/s is still smaller than however long inc took to run (a few microseconds, probably). It would be safer to bump this up to milliseconds, though.

gen_cluster tests can run in less than a second, but more than 100ms. So sleeps of around that 100ms time are more welcome than 1s sleeps.

Collaborator Author

The test is passing as-is; do you still think I should change it back to sleep?

Member

It would become a problem if the inc ran in 28 bytes / 100 MB/s ≈ 0.3 µs or less. Inc does actually run this fast.

In [1]: 28 / 100e6
Out[1]: 2.8e-07

In [2]: def inc(x):
   ...:     return x + 1
   ...: 

In [3]: %timeit inc(1)
64.2 ns ± 0.914 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)

But presumably the infrastructure around the inc is taking longer. If our timing infrastructure got much faster, then this might result in intermittent errors. It probably doesn't matter at this scale, but it's probably worth changing if it only takes a few minutes.

We might also introduce a penalty of something like 1ms for any communication (I'm surprised that we don't do this today actually) which might tip the scales in the future.

Collaborator Author

I'll switch back to a short sleep for now.

We might also introduce a penalty of something like 1ms for any communication

That seems very worth doing.

Member

We maybe used to do this? It might be worth checking logs. The worker_objective function is pretty clean if you wanted to add this. We should probably wait until we find a case where it comes up in an impactful way, though, just to keep from cluttering things.
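For reference, a rough sketch of what such a penalty could look like in a simplified objective (hypothetical code, not the scheduler's actual worker_objective; the attribute names and the 1 ms figure are stand-ins):

COMM_PENALTY = 0.001  # hypothetical flat 1 ms charge for any transfer

def objective_sketch(ts, ws, bandwidth=100e6):
    # Bytes that would need to be transferred to run task `ts` on worker `ws`
    comm_bytes = sum(dts.nbytes for dts in ts.dependencies if ws not in dts.who_has)
    comm_time = comm_bytes / bandwidth
    if comm_bytes:
        comm_time += COMM_PENALTY  # tip the scales against unnecessary moves
    # Estimated start time: queued work on the worker plus transfer time
    return ws.occupancy / ws.nthreads + comm_time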

Collaborator Author

Yeah I think it should be a separate PR for cleanliness

@gjoseph92
Collaborator Author

This is causing test_task_groups to fail:

    @gen_cluster(client=True, Worker=NoSchedulerDelayWorker)
    async def test_task_groups(c, s, a, b):
        start = time()
        da = pytest.importorskip("dask.array")
        x = da.arange(100, chunks=(20,))
        y = (x + 1).persist(optimize_graph=False)
        y = await y
        stop = time()
    
        tg = s.task_groups[x.name]
        tp = s.task_prefixes["arange"]
        repr(tg)
        repr(tp)
        assert tg.states["memory"] == 0
        assert tg.states["released"] == 5
        assert tp.states["memory"] == 0
        assert tp.states["released"] == 5
        assert tp.groups == [tg]
        assert tg.prefix is tp
        # these must be true since in this simple case there is a 1to1 mapping
        # between prefix and group
        assert tg.duration == tp.duration
        assert tg.nbytes_in_memory == tp.nbytes_in_memory
        assert tg.nbytes_total == tp.nbytes_total
        # It should map down to individual tasks
        assert tg.nbytes_total == sum(
            [ts.get_nbytes() for ts in s.tasks.values() if ts.group is tg]
        )
        in_memory_ts = sum(
            [
                ts.get_nbytes()
                for ts in s.tasks.values()
                if ts.group is tg and ts.state == "memory"
            ]
        )
>       assert tg.nbytes_in_memory == in_memory_ts
E       assert -480 == 0
E         +-480
E         -0

I'm wondering if this is exposing an existing bug where we aren't counting nbytes_in_memory correctly when a task is replicated on multiple workers. Here, the value is decremented for all workers when the key is released, but it feels like it's not being correspondingly incremented when the key gets duplicated. Stepping through the test, I haven't seen the diff in set_nbytes be 0, so maybe set_nbytes just isn't getting called when a dependency is transferred from one worker to another?
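A toy illustration of the suspected asymmetry (purely hypothetical counters, not the scheduler's actual bookkeeping): if the total is incremented once when a key is first produced, but decremented once per replica when the key is released, replication drives it negative.

class ToyCounter:
    def __init__(self):
        self.nbytes_in_memory = 0

    def key_produced(self, nbytes):
        self.nbytes_in_memory += nbytes  # counted once, when first computed

    def replica_released(self, nbytes):
        self.nbytes_in_memory -= nbytes  # counted once per worker that drops its copy

c = ToyCounter()
c.key_produced(96)          # result lands on one worker
# key is later copied to two more workers as a dependency; no increment happens
for _ in range(3):
    c.replica_released(96)  # all three replicas eventually released
print(c.nbytes_in_memory)   # -192, negative just like the assertion failure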

@gjoseph92
Collaborator Author

Good news though—this helps a lot with #4471.

  1. It runs in 2:20 vs 3:46 on my machine
  2. It's kinda hard to tell in these videos (60x sped up), but you can see that the tasks-per-worker graph stays unbalanced on main and evens out on this branch. Also (when watching at normal speed) I could tell that the sleep(10) tasks were clearly more evenly distributed across workers.

main: [60x sped-up screen recording of the dashboard on main]

This branch: [60x sped-up screen recording of the dashboard on this branch]

cc @chrisroat

Also just for my own sake, here's the incantation I used to make the GIFs from screen recordings:

ffmpeg -itsscale 0.01666 -i main.mov -filter_complex "[0:v] palettegen" palette.png
ffmpeg -itsscale 0.01666 -i main.mov -i palette.png -filter_complex "[0:v] fps=24,scale=600:-1 [i]; [i][1:v] paletteuse" -r 24 main.gif

@mrocklin
Member

cc'ing @fjetter who I think will find this interesting

@gjoseph92 gjoseph92 force-pushed the decide_worker/add-random-candidates branch from 646b12b to 346ab17 on June 18, 2021 17:40
Co-authored-by: Matthew Rocklin <mrocklin@gmail.com>
    else idle_workers or all_workers
)
candidates.update(random.sample(sample_from, min(10, len(sample_from))))
# ^ NOTE: `min` because `random.sample` errors if `len(sample) < k`
Member

import random
import sortedcontainers

d = sortedcontainers.SortedDict()
for i in range(100):
    d[str(i)] = i

candidates = set()

%timeit candidates.update(random.sample(d.values(), min(10, len(d))))
12.4 µs ± 652 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

You're not wrong to be concerned. 12us is a decent chunk of time.

It looks like random.sample isn't cheap

%timeit random.sample(d.values(), min(10, len(d)))
# 11.7 µs ± 61.8 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

%timeit d.values()[:10]
# 2.09 µs ± 27.6 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

Member

This might help with efficiency while also keeping things simple?

        candidates = {wws for dts in deps for wws in dts._who_has}

        extra = (
            list(valid_workers)
            if valid_workers is not None
            else (idle_workers or all_workers)
        )
        if len(extra) > 10:
            extra = random.sample(extra, 10)
        
        candidates.update(extra)

Member
@jakirkham jakirkham Jun 18, 2021

Some time is spent just working with sortedcontainers.

import random

d = dict()
for i in range(100):
    d[str(i)] = i

candidates = set()

%timeit candidates.update(random.sample(list(d.values()), min(10, len(d))))
7.21 µs ± 31.3 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

As a side note, it seems random.sample won't take a dict or its views directly, so this includes coercing to a list.
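For example (standard-library behavior; the exact error wording varies by Python version):

import random

d = {str(i): i for i in range(100)}

try:
    random.sample(d.values(), 3)
except TypeError as e:
    print(e)  # population must be a sequence (or, pre-3.11, a set); dict views don't qualify

random.sample(list(d.values()), 3)  # works once coerced to a list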

Collaborator Author

l = list(d.values())

%timeit candidates.update(random.sample(l, min(10, len(l))))
# 5.09 µs ± 179 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

%timeit l = list(d.values()[:10])
1.98 µs ± 34.9 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

%timeit l[:10]
# 79.3 ns ± 2.2 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)

SortedValuesView does say lookups are O(log(n)), but it seems like it's more than that.

Member

It takes any indexable, which sortedcontainers.SortedDict.values provides. Indexability is the reason why we use sortedcontainers.

Collaborator Author

For comparison:

from distributed.scheduler import TaskState, WorkerState

N = 5
extra = 2
workers = [WorkerState(str(i)) for i in range(N)]
deps = [TaskState(str(i), None) for i in range(N)]
for dts, ws in zip(deps, workers):
    dts.who_has.add(ws)

for i in range(extra):
    deps[i].who_has.add(workers[-i])
deps = set(deps)

%timeit candidates = {wws for dts in deps for wws in dts._who_has}
# 971 ns ± 24 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

# With N = 50, extra = 50:
# 12 µs ± 202 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

# With N = 100, extra = 10:
# 17.2 µs ± 214 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

So just to be clear: the random sample is the thing to try to avoid, not the who_has selection. Even in the uncommon case of many, many dependencies duplicated over many workers, that selection is still on par with the random sample.

Collaborator Author

So sampling the keys and then using them to look up values is 2x faster with SortedDict, and the same as what we'd get with a plain dict:

%timeit candidates.update(d[k] for k in random.sample(list(d), 10))
6.95 µs ± 227 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

pd = dict(d)
%timeit candidates.update(pd[k] for k in random.sample(list(pd), 10))
6.36 µs ± 95.5 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

Also random.choices is 2x faster than random.sample (makes sense since it doesn't have to hold onto any state):

%timeit candidates.update(d[k] for k in random.choices(list(d), k=10))
3.65 µs ± 11.9 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

%timeit candidates.update(pd[k] for k in random.choices(list(pd), k=10))
3.17 µs ± 36.1 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

I'd feel okay about random.choices; we'd get slightly fewer extra workers in the pool, but it's probably worth the time savings.

Collaborator Author
@gjoseph92 gjoseph92 Jun 18, 2021

If we're okay with duplicates then we can go much faster.

Iterating values of a SortedDict is slow:

%timeit list(d.values())
8.43 µs ± 85.2 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

But it's fast on a plain dict:

pd = dict(d)
%timeit list(pd.values())
691 ns ± 5.07 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

So we just have to make sure we're passing in self._workers_dv.values(), not self._workers.values().

This is decent:

%timeit candidates.update(random.choices(list(pd.values()), k=10))
2.45 µs ± 15.4 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

But we can go much faster and avoid a copy with an iterator:

import itertools
import random

def random_choices_iter(xs: "Iterator", k: int) -> "Iterator":
    return itertools.islice(itertools.takewhile(lambda _: random.random() < 0.5, xs), k)

%timeit candidates.update(random_choices_iter(pd.values(), k=10))
643 ns ± 12.5 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

EDIT: The iterator method could return fewer than k items though. So particularly if there's only one idle worker, that's not ideal. I feel like random.choices(list(pd.values()), k=10) is okay for now. If future profiling shows it's too slow, then we could do

worker_pool = valid_workers if valid_workers is not None else all_workers
if len(candidates) < len(worker_pool):
    sample_from = idle_workers or worker_pool
    candidates.update(
        random_choices_iter(idle_workers or worker_pool, 10)
        if len(sample_from) > 10
        else sample_from
    )

Member

It looks like random.sample will take d.keys(), from playing around with it locally. So we can probably skip the coercion to a list in that case. Though maybe we've already moved well beyond this at this point.

I'd probably shy away from getting too clever around random item selection. It's probably more likely to have surprising behavior that we only discover later. Just my 2 cents 🙂

Collaborator Author

I think we're beyond that, since we're only taking dict_values objects in this function now. Making it able to support both dicts and sets was a bit of a mess and not worth it IMO.

I didn't go with the too-clever itertools option because it did have surprising behavior. I did have to add another conditional though because choices can repeat items: fcb165e. So perhaps even that's too clever?
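For anyone following along, the difference is that random.choices samples with replacement while random.sample does not, e.g.:

import random

pool = ["w1", "w2", "w3"]
print(random.sample(pool, 3))           # always 3 distinct items, e.g. ['w3', 'w1', 'w2']
print(random.choices(pool, k=3))        # duplicates possible, e.g. ['w2', 'w2', 'w1']
print(set(random.choices(pool, k=10)))  # after dedup you can end up with fewer than k workers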

This reverts commit 761546db775b772a6dbbfc39218cc9e871fc3c98.
tradeoff between speed and randomness
@gjoseph92
Collaborator Author

@gforsyth @fjetter I happened to notice that this PR makes Worker.waiting_for_data_count go negative in some tests. I'd guess the logic for that count doesn't account for replicated keys, and it's getting double-decremented (much like #4927). Like in #4927, waiting_for_data_count doesn't seem to be used for any decisions, only for the worker dashboard. Should we remove it, or is there a simple way to keep this count accurate?

_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

client = <Client: No scheduler connected>, s = <Scheduler: "tcp://127.0.0.1:52603" workers: 0 cores: 0, tasks: 0>
a = <Worker: 'tcp://127.0.0.1:52604', 0, Status.closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker: 'tcp://127.0.0.1:52606', 1, Status.closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: -1>
c = <Worker: 'tcp://127.0.0.1:52608', 2, Status.closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: -2>

    @gen_cluster(
        client=True,
        nthreads=[("127.0.0.1", 1)] * 3,
        config={"distributed.scheduler.work-stealing": False},
    )
    async def test_decide_worker_select_candidate_holding_no_deps(client, s, a, b, c):
        await client.submit(slowinc, 10, delay=0.1)  # learn that slowinc is slow
        root = await client.scatter(1)
        assert sum(root.key in worker.data for worker in [a, b, c]) == 1
    
        start = time()
        tasks = client.map(slowinc, [root] * 6, delay=0.1, pure=False)
        await wait(tasks)
        elapsed = time() - start
    
        assert elapsed <= 4
        assert all(root.key in worker.data for worker in [a, b, c]), [
            list(worker.data.keys()) for worker in [a, b, c]
        ]
>       assert (
            a.waiting_for_data_count
            == b.waiting_for_data_count
            == c.waiting_for_data_count
            == 0
        )
E       assert 0 == -1
E         +0
E         --1

distributed/tests/test_scheduler.py:150: AssertionError

I'm a little concerned about why the tasks never ran on worker `a` only on Windows, though.
@gjoseph92 gjoseph92 marked this pull request as ready for review June 23, 2021 03:59
@gjoseph92
Collaborator Author

I think this is ready for final review. Note that it does need #4937 to be merged first though.

The tests failing right now are a flaky one (#4957) and test_access_semaphore_by_name, which I've never seen fail before on this PR, so I think it's flaky too.

# Add up to 10 random workers into `candidates`, preferring idle ones.
worker_pool = valid_workers if valid_workers is not None else all_workers
if len(candidates) < len(worker_pool):
    sample_from = idle_workers or worker_pool
Member

What if valid_workers and idle_workers?

Member

It seems like maybe we want to use idle_workers or all_workers above in line 7501

Collaborator Author

What if valid_workers and idle_workers?

We'd then intersect candidates with valid_workers, so at least we wouldn't ever pick an invalid worker:

else:
    candidates &= valid_workers
    if not candidates:
        candidates = valid_workers
        if not candidates:
            if ts._loose_restrictions:
                ws = decide_worker(ts, all_workers, idle_workers, None, objective)
            return ws

But it's true that if idle_workers and valid_workers are disjoint, then we haven't gained anything here.

How about (valid_workers.intersection(idle_workers) or valid_workers) if valid_workers is not None else (idle_workers or all_workers)?

Or for simplicity we could ignore idle_workers when there are valid_workers given, and just use valid_workers.
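Spelled out, that first option would be something like this (a sketch reusing the variable names from this function, not what's currently in the branch):

if valid_workers is not None:
    # prefer idle workers that are also valid; fall back to all valid workers
    sample_from = valid_workers.intersection(idle_workers) or valid_workers
else:
    sample_from = idle_workers or all_workers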

        random.choices(list(sample_from), k=min(10, len(sample_from)))
        if len(sample_from) > 10
        else sample_from
    )
if valid_workers is None:
    if not candidates:
        candidates = set(all_workers)
Member

Now that we have idle_workers in this function should this be idle_workers or all_workers?

Collaborator Author

Interesting; yes it probably should be.

if len(candidates) < len(worker_pool):
    sample_from = idle_workers or worker_pool
    candidates.update(
        random.choices(list(sample_from), k=min(10, len(sample_from)))
Member

Thoughts on reducing 10 to 2 or 5 or something lower? This might be premature, but I'm curious how much adding more workers helps here in aggregate. Part of me thinks that adding just one random worker into the mix probably does 80% of the good in aggregate.

Collaborator Author

I'd kind of like to do that later, once we've gotten more of a sense of the impact of this and hopefully have some more examples to measure that change against. When a cluster is mostly quiet, picking 2 random workers has a high chance of finding a good one, but in a large cluster with mostly busy workers, it's a big haystack in which to find the few that are underutilized (but not idle).
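A back-of-the-envelope for that intuition (hypothetical numbers): with N workers of which m are underutilized but not idle, sampling k distinct workers finds at least one of them with probability 1 - C(N-m, k) / C(N, k).

from math import comb

def p_hit(n, m, k):
    # Probability that at least one of k sampled workers is underutilized
    return 1 - comb(n - m, k) / comb(n, k)

print(p_hit(10, 5, 2))    # ~0.78: in a small, mostly quiet cluster, 2 samples usually suffice
print(p_hit(500, 5, 2))   # ~0.02: in a large, busy cluster, 2 samples almost never find one
print(p_hit(500, 5, 10))  # ~0.10: even 10 samples only help a little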

@gjoseph92
Collaborator Author

I'm leaning away from merging this currently. As it stands, this makes all the tests for #4967 fail. I think this is an indicator of what it would do in real-life scenarios too, though I haven't tested it.

We've just done this work to ensure that future tasks run on workers that minimize data transfer; this PR now breaks our assumptions by randomizing the scheduling of the future tasks.

Basically, this would work if our worker_objective function took a more holistic view of scheduling. But it's too short-sighted currently. I'd want to at least:

  1. Consider how the placement of this task would affect the placement of its dependents, and how much data would have to move in the future for the dependents to also run on the given worker. (This can be expensive to calculate.)
  2. Consider the cost of data transfer on the sender. Serialization is not free. If the worker holding the dependencies is busy, it may take a longer time to transfer them to a new idle worker.

There might be some clever, cheap heuristic for whether a future task will be a shoo-in to schedule on one particular worker, or whether it'll be fair game to run anywhere, but I haven't been able to come up with one.


Since #4471 was originally about new workers joining the cluster, we could also consider restricting the random workers just to idle ones? Note though that with that change, both the tests for #4967 and this PR fail, so it may not be a very useful change.

@mrocklin
Member

There might be some clever, cheap heuristic for whether a future task will be a shoo-in to schedule on one particular worker, or whether it'll be fair game to run anywhere, but I haven't been able to come up with one

So, let's think about this for a moment.

My read of this issue is that we like the idea in principle, but that in cases where we know there is some better-than-greedy placement of tasks (such as with rootish tasks), this greedy approach is likely to make things worse. Is that a correct read?

If so, it seems like maybe we should go ahead with this, but block doing this on any rootish tasks. Today rootish tasks are a (very useful) anomaly in our scheduling. We should be ok working around them in anomalous ways I think.

So, I'd be inclined to add a flag to TaskGroups about being rootish, and check that flag when making this decision. Thoughts?

@gjoseph92
Collaborator Author

The problem wasn't about the root-ish tasks. It was about the dependents of root-ish tasks. We might place the root tasks perfectly, but if the downstream tasks are then free to select from any random worker (instead of just workers that hold their input data), it's quite likely they'll disregard our plans and just run on whichever worker is the least busy, even if that means transferring all the data.

So really, the problem is that our worker_objective turns out to not be a very good objective, especially when the set of candidates is unrestricted. It's heavily biased towards low occupancy, and has low penalties for data transfer.
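To make that concrete (illustrative numbers, using the 100 MB/s bandwidth estimate mentioned earlier in this thread):

bandwidth = 100e6                  # bytes/s, the estimate used earlier in this thread
for dep_size in (28, 1_000_000):   # a tiny int vs. a 1 MB dependency
    transfer_cost = dep_size / bandwidth
    print(transfer_cost)           # 2.8e-07 s and 0.01 s respectively
# So a worker with even ~10 ms less queued work wins the 1 MB case despite
# holding none of the data, and the tiny-dependency case is decided almost
# entirely by occupancy.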

@gjoseph92
Collaborator Author

I'll add that though I also find this problem very interesting, and I'd love to work on it, I don't think it's what we should focus on right now. Getting lower-level stability under control is more important.

Between this, #5325, and #5326, we both spent a bit of time thinking about these problems last summer. We found that they were more challenging and subtle than we expected.

I don't think we'll find a quick tweak to solve this in a couple days that doesn't introduce unintended consequences. We should probably save these delicate scheduling changes for when we can dedicate more time to focusing carefully on them.

@mrocklin
Member

I don't think we'll find a quick tweak to solve this in a couple days

I remain, as always, more optimistic than you here :)

I'll add that though I also find this problem very interesting, and I'd love to work on it, I don't think it's what we should focus on right now. Getting lower-level stability under control is more important.

Fair point. Shifting my focus over to benchmarking.

Successfully merging this pull request may close these issues.

Poor work scheduling when cluster adapts size