Huge increase in average memory in test_spilling[release] #964

Open
github-actions bot opened this issue Aug 26, 2023 · 6 comments

@github-actions

Workflow Run URL

@phofl (Contributor) commented Aug 28, 2023

Will run an A/B test to figure out whether or not the regression is real

related: #966

@hendrikmakait (Member)

A mild (10-20%) performance regression in spilling is expected as a result of merging dask/distributed#8083 (see dask/distributed#8083 (comment)).

@crusaderky (Contributor)

This was expected:
[image]

This was absolutely not:
[image]

crusaderky changed the title from "⚠️ CI failed ⚠️" to "Huge increase in average memory in test_spilling[release]" on Sep 5, 2023
crusaderky self-assigned this on Sep 5, 2023
@crusaderky (Contributor)

This looks potentially pretty bad. It seems like recursive reductions are now releasing memory later than they used to.
It's only visible in test_spilling because in all other cases the reductions in our benchmarks are too fast to observe.

Next actions:

  • figure out if it's reproducible with a custom (slow) reduction that doesn't involve disk I/O or significant CPU usage (a possible sketch is below)
  • figure out what changed
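
For the first bullet, a minimal sketch of what such a custom slow reduction could look like, using dask.array.reduction with a time.sleep stand-in for the unspill latency (my illustration, not code from the issue):

import time

import dask.array as da
import numpy as np

def slow_chunk_sum(x, axis=None, keepdims=False):
    # Stand-in for the cost of unspilling: no disk I/O, negligible CPU,
    # just latency on every reduction step
    time.sleep(0.5)
    return np.sum(x, axis=axis, keepdims=keepdims)

# Small enough that nothing actually spills; the sleep provides the slowness
a = da.random.random((20_000, 20_000)).persist()

b = da.reduction(
    a,
    chunk=slow_chunk_sum,
    aggregate=slow_chunk_sum,
    dtype=a.dtype,
    split_every=4,
)
del a
b.compute()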

The code

(runs on 5x m6i.large)
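
The snippets below assume a distributed client connected to that cluster; the issue doesn't include the setup code. A hypothetical version using Coiled (coiled.Cluster and its worker_vm_types parameter are my assumptions about the environment, not part of the issue):

import coiled
from distributed import Client

# Hypothetical setup: 5 workers on m6i.large instances
cluster = coiled.Cluster(n_workers=5, worker_vm_types=["m6i.large"])
client = Client(cluster)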

import dask.array as da
import distributed

a = da.random.random((94000, 94000))
a = a.persist()
distributed.wait(a)
# This is the midpoint of the plots below
b = a.sum().persist()
del a
b.compute()

2023.8.1

[images]

2023.9.0

[images]

@crusaderky (Contributor)

Status update: some progress, but ultimately inconclusive.

Lead 1: priority

dask/distributed#8083 changed the priority whenever you have a collection of embarrassingly parallel tasks with no dependencies (e.g. da.random.random(...).persist()).

Before the PR (priority, task name)

(0, 1, 0) ('random_sample-ca66719549ab009f26bf65492a119ef1', 0, 0)
(0, 1, 1) ('random_sample-ca66719549ab009f26bf65492a119ef1', 0, 1)
(0, 1, 2) ('random_sample-ca66719549ab009f26bf65492a119ef1', 0, 10)
(0, 1, 3) ('random_sample-ca66719549ab009f26bf65492a119ef1', 0, 11)
(0, 1, 4) ('random_sample-ca66719549ab009f26bf65492a119ef1', 0, 12)
(0, 1, 5) ('random_sample-ca66719549ab009f26bf65492a119ef1', 0, 13)
(0, 1, 6) ('random_sample-ca66719549ab009f26bf65492a119ef1', 0, 14)
(0, 1, 7) ('random_sample-ca66719549ab009f26bf65492a119ef1', 0, 15)
(0, 1, 8) ('random_sample-ca66719549ab009f26bf65492a119ef1', 0, 16)
(0, 1, 9) ('random_sample-ca66719549ab009f26bf65492a119ef1', 0, 17)
(0, 1, 10) ('random_sample-ca66719549ab009f26bf65492a119ef1', 0, 18)
(0, 1, 11) ('random_sample-ca66719549ab009f26bf65492a119ef1', 0, 19)
(0, 1, 12) ('random_sample-ca66719549ab009f26bf65492a119ef1', 0, 2)
(0, 1, 13) ('random_sample-ca66719549ab009f26bf65492a119ef1', 0, 20)
(0, 1, 14) ('random_sample-ca66719549ab009f26bf65492a119ef1', 0, 21)
(0, 1, 15) ('random_sample-ca66719549ab009f26bf65492a119ef1', 0, 22)
(0, 1, 16) ('random_sample-ca66719549ab009f26bf65492a119ef1', 0, 3)
(0, 1, 17) ('random_sample-ca66719549ab009f26bf65492a119ef1', 0, 4)
(0, 1, 18) ('random_sample-ca66719549ab009f26bf65492a119ef1', 0, 5)
(0, 1, 19) ('random_sample-ca66719549ab009f26bf65492a119ef1', 0, 6)
(0, 1, 20) ('random_sample-ca66719549ab009f26bf65492a119ef1', 0, 7)

After the PR

(0, 1, 0) ('random_sample-c661ad08ddeea013afa94c8c30e0e158', 0, 0)
(0, 1, 1) ('random_sample-c661ad08ddeea013afa94c8c30e0e158', 0, 1)
(0, 1, 2) ('random_sample-c661ad08ddeea013afa94c8c30e0e158', 0, 2)
(0, 1, 3) ('random_sample-c661ad08ddeea013afa94c8c30e0e158', 0, 3)
(0, 1, 4) ('random_sample-c661ad08ddeea013afa94c8c30e0e158', 0, 4)
(0, 1, 5) ('random_sample-c661ad08ddeea013afa94c8c30e0e158', 0, 5)
(0, 1, 6) ('random_sample-c661ad08ddeea013afa94c8c30e0e158', 0, 6)
(0, 1, 7) ('random_sample-c661ad08ddeea013afa94c8c30e0e158', 0, 7)
(0, 1, 8) ('random_sample-c661ad08ddeea013afa94c8c30e0e158', 0, 8)
(0, 1, 9) ('random_sample-c661ad08ddeea013afa94c8c30e0e158', 0, 9)
(0, 1, 10) ('random_sample-c661ad08ddeea013afa94c8c30e0e158', 0, 10)
(0, 1, 11) ('random_sample-c661ad08ddeea013afa94c8c30e0e158', 0, 11)
(0, 1, 12) ('random_sample-c661ad08ddeea013afa94c8c30e0e158', 0, 12)
(0, 1, 13) ('random_sample-c661ad08ddeea013afa94c8c30e0e158', 0, 13)

The priority of the second stage, where all the tasks of the first stage (which are in memory) are recursively aggregated, is the same before and after the PR:

(0, 4, 1) ('sum-969240e99010a2b99790ef5725d4de1e', 0, 0)
(0, 4, 3) ('sum-969240e99010a2b99790ef5725d4de1e', 0, 1)
(0, 4, 5) ('sum-969240e99010a2b99790ef5725d4de1e', 1, 0)
(0, 4, 7) ('sum-969240e99010a2b99790ef5725d4de1e', 1, 1)
(0, 4, 8) ('sum-partial-5c44d85ac5dcb3295f65812c1b6ddc4f', 0, 0)
(0, 4, 10) ('sum-969240e99010a2b99790ef5725d4de1e', 0, 2)
(0, 4, 12) ('sum-969240e99010a2b99790ef5725d4de1e', 0, 3)
(0, 4, 14) ('sum-969240e99010a2b99790ef5725d4de1e', 1, 2)
(0, 4, 16) ('sum-969240e99010a2b99790ef5725d4de1e', 1, 3)
(0, 4, 17) ('sum-partial-5c44d85ac5dcb3295f65812c1b6ddc4f', 0, 1)

In an initial map stage, every random_sample task feeds 1:1 into a sum task with matching indices. The output of each sum task is a single numpy scalar, which gets recursively aggregated in a reduce stage by sum-partial, in blocks of 4 (split_every=4). Everything after sum is trivial performance-wise.
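
As a reference for the shape of this graph, here is a minimal sketch with split_every spelled out explicitly (dask picks its own default; passing it here is purely illustrative):

import dask.array as da

a = da.random.random((94000, 94000))
# Each random_sample chunk is reduced 1:1 by a sum task to a numpy scalar;
# sum-partial then aggregates those scalars recursively, 4 at a time
b = a.sum(split_every=4)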

The order of execution of the random-sample task group determines which tasks are in memory and which are on disk when the sum task group starts - which could impact the performance of sum. So I changed the code to make sure that all random-sample data is on disk, thus making their execution order inconsequential:

import asyncio

import dask.array as da
import distributed
from distributed import span

with span("A"):
    a = da.random.random((94000, 94000))
    a = a.persist()
    distributed.wait(a)

async def spill_everything(dask_worker):
    # Evict keys from the SpillBuffer's in-memory (fast) store until
    # everything is on disk, yielding to the event loop between evictions
    while dask_worker.data.fast:
        dask_worker.data.evict()
        await asyncio.sleep(0)

client.run(spill_everything)
input()  # Pause until user hits resume

with span("B"):
    b = a.sum().persist()
    del a
    b.compute()

The two task plots remain substantially different:

Before

[image: task plot]

After

[image: task plot]

It's important to observe that the workers can hold exactly 170 tasks in memory before they're forced to spill. This number is clearly visible at all times while computing random-sample (orange ascending area below). We never reach this threshold during the sum phase (orange descending area below); it stays between 80 and 120 tasks in memory.
[image: tasks-in-memory plot]
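
For reference, a quick way to read how many keys each worker currently holds in memory vs. on disk (my own sketch; it assumes the default SpillBuffer with its fast and slow stores):

def memory_vs_spilled(dask_worker):
    # The SpillBuffer keeps unspilled keys in .fast and spilled keys in .slow
    return len(dask_worker.data.fast), len(dask_worker.data.slow)

client.run(memory_vs_spilled)  # {worker address: (in memory, on disk)}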

There is a round-trip to the scheduler:

  1. a sum task starts
  2. the matching random-sample task is loaded from disk
  3. the sum task executes and completes
  4. the worker sends a {op: task-finished, key: sum-...} message to the scheduler
  5. the scheduler responds with a {op: free-keys, keys: {random-sample-...}} for the task that was just consumed

As soon as step 4 happens, the worker starts again with the next sum task. Importantly, the memory readings tell us that the round-trip to the scheduler is fast enough that the worker is never forced to send a just-consumed random-sample task back to disk.

This is not the case without the spill_everything step in the middle of the workflow: before the PR, memory in the sum phase is not saturated; after the PR it is.
In other words: before the PR, the round-trip to the scheduler to release the random-sample tasks was faster than the worker working through more sum tasks. After the PR, it was slower.

Lead 2: round-trip duration

I wanted to measure the round-trip described above. Here's the code that calculates the delta between the worker adding {op: task-finished, key: sum-...} to the batched comms to the scheduler and the worker receiving {op: free-keys, keys: {random-sample-...}} from the scheduler:

# Pull the full stimulus log from every worker
events = client.run(lambda dask_worker: list(dask_worker.state.stimulus_log))

from dask.utils import key_split
from distributed.worker_state_machine import ExecuteSuccessEvent, FreeKeysEvent

deltas = []
for evs in events.values():
    success = {}

    for ev in evs:
        if isinstance(ev, ExecuteSuccessEvent) and key_split(ev.key) == "sum":
            # Record when each sum task finished, indexed by its chunk indices
            key = ev.key if isinstance(ev.key, tuple) else eval(ev.key)
            success[key[1:]] = ev
        elif isinstance(ev, FreeKeysEvent):
            assert len(ev.keys) == 1
            key = next(iter(ev.keys))
            if not isinstance(key, tuple):
                key = eval(key)
            if key_split(key) == "random_sample":
                # Delta between the sum task finishing and the scheduler telling
                # the worker to release the matching random_sample task
                if key[1:] in success:
                    deltas.append(ev.handled - success[key[1:]].handled)

import pandas
pandas.Series(deltas).hist(bins=50)

[screenshot: histogram of round-trip deltas (2023-09-15)]

This plot looks roughly the same before and after dask/distributed#8083.
It's surprising that the round-trip time is predominantly an exact multiple of 1 second. My first thought went to these lines of code:
https://github.com/dask/distributed/blob/b75f5da550509440222907c51b2233cb6e062437/distributed/worker_memory.py#L345-L352

However, disabling the memory monitor (dask.config.set({"distributed.worker.memory.spill": False})) had no effect whatsoever on any of the plots - in fact, we already saw above that nothing is being spilled out during the reduction phase of the workflow.
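
For completeness, a sketch of that configuration as code (note that worker memory settings are read when the workers start, so it has to be set in the workers' environment rather than only on the client):

import dask

# Disable the memory-monitor-driven spill-to-disk threshold
dask.config.set({"distributed.worker.memory.spill": False})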

I still have no clue why the round-trip time looks so precisely centered around multiples of 1 second.

Lead 3: work stealing / bad scheduler decisions

99% of the work of a sum task is to load the matching random_sample task from disk. So if the scheduler were to decide to assign the sum task to a different worker for whatever reason, it would be a very poor decision as it would just increase the disk load on an already saturated worker.

However, I've measured that the number of such cases is negligible both before and after the PR.
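
For reference, here's roughly how one could count such cases from the workers' stimulus logs (my own sketch, not the code used for the measurement; it assumes ComputeTaskEvent.who_has lists the workers holding each dependency):

from dask.utils import key_split
from distributed.worker_state_machine import ComputeTaskEvent

def count_remote_sums(dask_worker):
    # Count sum tasks whose random_sample dependency lives on another worker,
    # i.e. cases where running the task here means a network transfer instead
    # of a local unspill
    n = 0
    for ev in dask_worker.state.stimulus_log:
        if isinstance(ev, ComputeTaskEvent) and key_split(ev.key) == "sum":
            for dep_key, holders in ev.who_has.items():
                if (
                    key_split(dep_key) == "random_sample"
                    and dask_worker.address not in holders
                ):
                    n += 1
    return n

client.run(count_remote_sums)  # {worker address: count}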

Side by side

Here's how the second phase of the workload (starting from everything spilled out) looks, side by side.

left: immediately before dask/distributed#8083
right: immediately afterwards

[video: Peek.2023-09-15.18-06.mp4]

As you can see, before the PR the worker is just... faster. I still can't figure out why.

Finally, Fine Performance Metrics are inconclusive:

[image: Fine Performance Metrics]

As expected, the vast majority of the time is split between disk-read, executor, and other - the first being time spent waiting to load data from disk, and the other two being time spent waiting for the event loop to unfreeze while it's busy with a disk read for another task. All three measures are simply slower after the PR; I can't understand why.

activity                              before   after    delta
disk-read                             285s     308s     +23s
blocked event loop (executor+other)   292s     320s     +28s
idle (waiting for scheduler)          14s      48s      +34s
everything else                       23s      21s      -2s
TOTAL                                 10m14s   11m37s   +83s

Note how the total time spent on a blocked event loop roughly matches the disk-read time. This is unsurprising, as the workers have 2 threads each, so while one thread is unspilling, the other is incapable of operating. The execution time that could overlap with unspilling is too negligible in this case to matter.

@crusaderky (Contributor) commented Sep 18, 2023

TL;DR version of the above mega-post

  1. The increase in average memory is caused by a race condition between (A) the worker receiving a free-keys message from the scheduler after it has finished consuming a task and (B) the worker unspilling the next task as soon as it finishes. This is a very fragile balance, easy to tip over, particularly because the event loop apparently only becomes responsive every second or so while it is heavily busy with spilling.

Async spilling would drastically improve this, as it would drop the time during which a task remains in memory unnecessarily from 1-4 seconds to tens of milliseconds.

  2. In the absence of the race condition, the whole workflow has become ~15% slower on the worker, and I can't understand the reason. Again, it's really hard to understand what's going on when most of the time is spent waiting for the event loop to become responsive again.
