
Shuffle-based groupby aggregation for high-cardinality groups #9302

Merged
merged 14 commits into dask:main from shuffle-groupby on Aug 17, 2022

Conversation

rjzamora
Member

@rjzamora rjzamora commented Jul 25, 2022

Note: Related to performance challenges described in #9292

Dask uses apply-concat-apply (ACA) for most DataFrame reductions and groupby aggregations. This algorithm is typically effective when the final output fits comfortably within a single DataFrame/Series partition. However, performance often degrades quickly as the size of the output grows (and split_out is set to something greater than ~4-8). We can refer to the case where split_out must be >1 as a "high-cardinality" groupby aggregation.

The high-cardinality case is currently handled by splitting the initial blockwise groupby aggregation by hash. A distinct ACA tree reduction is then performed for each hash-based "split". For example, the groupby-aggregation graph for a 6-partition DataFrame with split_out=2 will look something like this:

[diagram: hash-split ACA groupby-aggregation graph for a 6-partition DataFrame with split_out=2]

This PR improves the performance of high-cardinality groupby aggregations by leveraging the fact that a shuffle operation will typically require fewer overall tasks than a duplicated tree reduction once split_out becomes large enough. That is, for cases where a rough task-count estimate begins to favor a shuffle, the task graph will now look something like this:

[diagram: shuffle-based groupby-aggregation graph]
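As a back-of-envelope illustration of the task-count argument (the functions and coefficients below are illustrative assumptions for intuition only, not the heuristic implemented in this PR): the split-then-tree-reduce graph grows roughly linearly with split_out, while the shuffle-based graph is dominated by a fixed number of shuffle stages.

```python
# Purely illustrative task-count estimates; the coefficients are rough
# assumptions, not dask's actual bookkeeping.
def approx_split_tree_tasks(npartitions, split_out, split_every=8):
    # blockwise chunk per partition, plus (when split_out > 1) a hash-split
    # step producing split_out shards per partition, then one tree reduction
    # per output split
    split = npartitions * (1 + split_out) if split_out > 1 else 0
    trees = split_out * (npartitions // split_every + 1)
    return npartitions + split + trees

def approx_shuffle_tasks(npartitions, split_out, stages=2):
    # blockwise chunk per partition, then a multi-stage task shuffle
    # (roughly two tasks per partition per stage) and a final blockwise
    # aggregation on each output partition
    return npartitions + 2 * stages * npartitions + split_out

for split_out in (1, 2, 4, 8, 16):
    print(split_out,
          approx_split_tree_tasks(1000, split_out),
          approx_shuffle_tasks(1000, split_out))
```

The quick benchmark below compares the two approaches end-to-end on a high-cardinality timeseries: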

import time
import dask.dataframe as dd
from dask.datasets import timeseries

from dask.distributed import LocalCluster, Client, wait

cluster = LocalCluster(local_directory="/raid/my-dask-space/")
client = Client(cluster)

ddf = timeseries(
    id_lam=int(1e18),
    start='2000-01-01',
    end='2017-01-01',
    freq='1s',
    seed=15,
)
agg = ddf.groupby(
    "id",
    dropna=False,
).agg(
    {"x": "sum", "y": "mean"},
    split_every=16,  # Note that the default split_every is a bit small when split_out>1
    split_out=8, 
)
result = agg.persist()

t0 = time.time()
wait(result)
total_time = time.time() - t0
print(total_time, len(result))

This PR: 42.99, 53101633 (24354 tasks)
Main: 71.16, 53101633 (53016 tasks)

(Note that this algorithm makes it possible to sort the global order of groups, while the current approach does not. However, sorting isn't targeted in this PR.)

Another Alternative

Although it is clear that the shuffle-based groupby is superior to "split-then-tree reduce" for large values of split_out, it should require even fewer tasks to perform a single-split tree reduction down to ~split_out partitions, and then perform the shuffle:

[diagram: tree-reduce-then-shuffle groupby-aggregation graph]

I have been exploring this algorithm in a separate branch, but have not had any luck outperforming the shuffle-based groupby yet.

@rjzamora
Member Author

Another fun note: The shuffle-based groupby algorithm can use the "p2p" option. For example:

ddf.groupby(
    "id", dropna=False
).agg(
    {"x": "sum", "y": "mean"},  split_every=16, split_out=8,  shuffle="p2p"
)

@gjoseph92
Collaborator

This is awesome @rjzamora, great thinking!

This seems vaguely related to #8361.

It also makes me wonder if we could simplify/do away with the split_out (and maybe split_every?) parameters with this approach, since it seems like shuffling is almost always going to be better when split_out > 1, so perhaps we don't even need to support multi-output tree-reduction cases anymore. Maybe we could just have a flag like high_cardinality: bool = False or single_partition_output: bool = True, to switch between shuffle-based and tree-agg based. That would be so much easier to document and explain to users.

@rjzamora
Member Author

This seems vaguely related to #8361.

Good call - Thanks for linking that!

It also makes me wonder if we could simplify/do away with the split_out (and maybe split_every?) parameters with this approach, since it seems like shuffling is almost always going to be better when split_out > 1, so perhaps we don't even need to support multi-output tree-reduction cases anymore. Maybe we could just have a flag like high_cardinality: bool = False or single_partition_output: bool = True, to switch between shuffle-based and tree-agg based. That would be so much easier to document and explain to users.

Right - I was thinking that something like an estimated (or optionally inferred from the first partition) ‘cardinality_ratio’ could be used to set reasonable defaults for everything. A ratio of unique records to all records gives you a rough idea of how much your data should “compress” after the global aggregation.
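For illustration, a minimal sketch of what an inferred ratio could look like (the estimate_cardinality_ratio helper and sampling only the first partition are hypothetical, not part of this PR):

```python
# Hypothetical sketch, not part of this PR: estimate the unique-to-total
# ratio of the groupby key from the first partition only.
def estimate_cardinality_ratio(ddf, by):
    first = ddf.get_partition(0).compute()
    return first[by].nunique() / max(len(first), 1)

# A ratio near 1 suggests the output is about as large as the input, so a
# shuffle-based aggregation (and split_out > 1) is probably warranted; a
# ratio near 0 means the classic ACA tree reduction should be fine.
```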

@mrocklin
Member

Cool. Do you have a sense for how this performs in a few relevant scenarios? I'd be curious about high cardinality / low cardinality and larger than memory / not larger than memory.

@rjzamora
Member Author

Cool. Do you have a sense for how this performs in a few relevant scenarios? I'd be curious about high cardinality / low cardinality and larger than memory / not larger than memory.

Good question. This is not a silver bullet by any means, and is primarily a performance booster for the “high-cardinality & smaller than memory” case. It may also be effective for “high-cardinality & larger than memory” in some cases, but I would expect heavy spilling to be an issue for task-based shuffling. I suppose the “p2p” option could make things “work” in this case. However, I was hoping that something like the “ACA-then-shuffle” alternative mentioned above would be a better option for both smaller- and larger-than memory.

For any case where cardinality is low enough that you don’t need to use split_out>1, the existing ACA algorithm is often better (even if you do use something like split_out=2-4).

Overall, it seems that the shuffle-based groupby probably has a place, but this PR will need to remain WIP until we can figure out exactly what that place is.

@mrocklin
Member

mrocklin commented Jul 26, 2022 via email

@rjzamora
Member Author

This doesn't have to be perfect to be useful. Please don't take my comments as negative.

No worries at all - I'm pretty sure we are on the same page here :)

@ian-r-rose ian-r-rose self-requested a review August 1, 2022 20:50
@mrocklin
Member

mrocklin commented Aug 9, 2022

This maybe stalled? Anything we can do to unstall it?

@jrbourbeau
Member

@ian-r-rose mentioned he's going to take a detailed look at this PR (also, just FYI, @rjzamora is OOO this week)

@rjzamora
Member Author

This maybe stalled? Anything we can do to unstall it?

Apologies for not leaving a note in this PR - I was distracted by other groupby improvements, and then took some time off.

I do think it is worthwhile (and easy) to add the shuffle approach to groupby aggregations. However, I would like to roll back any changes in this PR that use the shuffle algorithm automatically. My reasoning is that the shuffle algorithm is typically slower for larger-than-memory data, and so the user should probably need to opt in explicitly for now.

Side Note: I intend to submit something separate for larger-than-memory groupby aggregations that essentially closes the gap between the algorithms used by dask-dataframe and dask-cudf. It seems that the dask-cudf algorithm is typically more efficient for large-scale/large-memory data (but slower in most other cases). I am not completely sure of the reason for this yet, but I do know that one "low-hanging fruit" is that Dask will currently use sort=True (the pandas default) during intermediate aggregations, even if the global setting is sort=False.
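As a toy pandas-level illustration of that last point (the data sizes here are arbitrary and this is not the Dask code path itself): sorting group keys at every intermediate aggregation adds avoidable work when the global result does not need to be sorted.

```python
import numpy as np
import pandas as pd

# Arbitrary toy data with many groups, so sorting the group keys is not free
df = pd.DataFrame({
    "id": np.random.randint(0, 1_000_000, size=5_000_000),
    "x": np.random.random(5_000_000),
})

# pandas defaults to sort=True, which orders the group keys in the output
sorted_partial = df.groupby("id", sort=True)["x"].sum()

# sort=False skips that ordering, which is all an intermediate
# (per-partition) aggregation needs when the global setting is sort=False
unsorted_partial = df.groupby("id", sort=False)["x"].sum()
```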

@ian-r-rose
Collaborator

ian-r-rose commented Aug 16, 2022

I did some experiments on this PR yesterday to try to draw out a bit more about when a shuffle-based approach is a good idea (warning: highly-faceted charts incoming to capture multiple dimensions). I ran a number of groupby-aggs on a ~60GiB timeseries-based dataset where I dialed the number of unique groups up and down to investigate the effects of cardinality on the outputs.

Rough code, based on @rjzamora's example above:

@pytest.mark.parametrize("shuffle", [False, "tasks", "p2p"])
@pytest.mark.parametrize("n_groups", [1000, 10_000, 100_000, 1_000_000, 10_000_000])
@pytest.mark.parametrize("split_out", [1, 4, 8])
def test_groupby(groupby_client, split_out, n_groups, shuffle):
    # timeseries_of_size, cluster_memory, and wait are helpers from the
    # benchmarking harness, not dask.dataframe API
    ddf = timeseries_of_size(
        cluster_memory(groupby_client) // 4,
        id_maximum=n_groups,
    )
    print(len(ddf))
    agg = ddf.groupby("id").agg(
        {"x": "sum", "y": "mean"},
        split_every=16,  # Note that the default split_every is a bit small when split_out>1
        split_out=split_out,
        shuffle=shuffle,
    )
    wait(agg, groupby_client, 180)

Constants:

  • Overall dataset size (~60 GiB, ~650 million rows)
  • Overall dataset structure
  • Aggregation (sum/mean on some non-grouped-by columns)

Variables:

  • Shuffle method (False, "tasks", "p2p")
  • split_out: (1, 4, 8)
  • Number of unique groups (1e3, 1e4, 1e5, 1e6, 1e7, 1e8)

I ran the above on a 15-worker, 60 worker-thread, 240 GiB cluster on AWS, and tracked duration, peak memory usage, and average memory usage.

Duration

[chart: wall-clock duration by number of groups, faceted by split_out and shuffle method]

Peak memory

[chart: peak cluster-wide memory usage, same facets]

Average memory

[chart: average cluster-wide memory usage, same facets]

So, some interpretation:

First of all, from a broad perspective, ACA-based groupbys/aggs (shuffle=False) are faster for low-cardinality (when groups << len(ddf)) cases with split_out=1. When split_out > 1, however, or when groups starts to approach the same order of magnitude as the dataframe length, then the story changes, and shuffle-based approaches start to win. The difference is not drastic for cases where they all fit nicely in memory. It's on the 20% level, which is nice!

However: ACA-based groupby-aggs start to fall over pretty dramatically for high-cardinality cases because the last few partitions are really large and can knock over workers. This worker-killing is not super visible in the average/peak memory charts, as those are cluster-wide, rather than an acute problem on one or a few workers. But this worker-killing is predictable, and it's easy to see there is a problem in the duration charts for the 1e8 case. Normally, the advice here would be to tune split_out, but it's also easy to see in the duration and memory charts that higher split_out for ACA-based groupby/aggs runs into its own problems due to the much more complex graphs with more intermediate results in memory. So that's not a hugely satisfying approach for this case.

There is also some useful structure in the memory charts. As @rjzamora mentioned above: task-based shuffling generally has worse memory usage than ACA-based groupby/agg. But in the high-cardinality case, that advantage disappears. So at least in that case, I think we can call the shuffle an unambiguous win. Also, it's very nice to see that p2p shuffling does indeed have roughly constant memory usage, beating the other two hands-down in the high-cardinality cases.

TL;DR

This largely confirms @rjzamora's points above and in conversations I've had with him:

  1. ACA-based groupby/aggs do fine for low-cardinality cases (say, where the number of groups is less than a few percent of the dataframe size).
  2. Once the cardinality gets larger, it does indeed make sense to start using the shuffle-based algorithm. To me, the range where split_out > 1 and where it's still better to use ACA looks pretty darn narrow. A rule of thumb where shuffle is on-by-default when split_out>1 doesn't sound like the worst idea in the world to me (a minimal sketch of such a default follows this list).
  3. Memory usage is indeed higher for task-based shuffling than ACA in the lower-cardinality cases. But that advantage is wiped out in higher-cardinality cases, so it actually doesn't worry me much. And task-based shuffling is less likely to knock over a worker than the last few aggregations of the ACA-based approach.
  4. p2p shuffling has much better memory performance. I ran into a couple of problems when using it here that I'll open as separate issues, but the results are encouraging.
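A minimal sketch of the rule of thumb from point 2, using a hypothetical helper name; this is not the behavior merged in this PR (where the shuffle path stays opt-in):

```python
# Hypothetical default-selection logic (not dask's actual code): prefer a
# shuffle-based aggregation whenever split_out > 1, unless the user chose
# explicitly via the shuffle= keyword.
def choose_groupby_shuffle(split_out, shuffle=None):
    if shuffle is not None:
        return shuffle  # explicit user choice always wins
    return "tasks" if split_out > 1 else False
```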

@rjzamora
Member Author

Thank you for working through these benchmarks and writing this up @ian-r-rose !

It seems safe to conclude that we should add a shuffle= option to GroupBy.aggregate, and that we may be able to use the shuffle-based algorithm by default for larger values of split_out in the future. I would prefer if we kept shuffle=False as the default for this PR, but I am also open to the default behavior changing in follow-on work.

@rjzamora
Member Author

Note that I added support for sort=True when split_out>1 (and shuffle is not False). I mostly did this here to check that the current design doesn't prevent global sorting in any way. Therefore, I can roll back sort-related changes if it makes this PR easier to review.
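For example, a globally sorted, multi-partition result would look something like this (a sketch based on the options discussed in this PR; exact keyword support may differ):

```python
# sort=True orders groups globally across the 8 output partitions; this
# requires the shuffle-based path (split_out > 1 and shuffle is not False).
sorted_agg = ddf.groupby("id", sort=True, dropna=False).agg(
    {"x": "sum", "y": "mean"},
    split_out=8,
    shuffle="tasks",
)
```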

@mrocklin
Member

Cool results. I encourage us to move forward with this quickly.

@mrocklin
Member

Also, major kudos to @rjzamora for the work and to @ian-r-rose for the additional perspective

@rjzamora rjzamora marked this pull request as ready for review August 16, 2022 21:01
@rjzamora rjzamora changed the title [WIP] Shuffle-based groupby aggregation for high-cardinality groups Shuffle-based groupby aggregation for high-cardinality groups Aug 16, 2022
Collaborator

@ian-r-rose ian-r-rose left a comment


Thanks @rjzamora, this is looking great! I have some minor comments, but I think this looks mostly good-to-go.

Review threads (resolved): dask/dataframe/groupby.py, dask/dataframe/tests/test_groupby.py
@jrbourbeau jrbourbeau mentioned this pull request Aug 16, 2022
Collaborator

@ian-r-rose ian-r-rose left a comment


Thanks for all your work on this @rjzamora!

@ian-r-rose
Collaborator

Everything looks green except for gpuCI, which is unrelated: #9391

@ian-r-rose ian-r-rose merged commit 097decd into dask:main Aug 17, 2022
@rjzamora rjzamora deleted the shuffle-groupby branch August 17, 2022 20:29
@jrbourbeau
Member

Woo -- great work @rjzamora @ian-r-rose

rapids-bot bot pushed a commit to rapidsai/cudf that referenced this pull request Aug 19, 2022
Dask-cudf groupby tests *should* be failing as a result of dask/dask#9302 (see [failures](https://gpuci.gpuopenanalytics.com/job/rapidsai/job/gpuci/job/cudf/job/prb/job/cudf-gpu-test/CUDA=11.5,GPU_LABEL=driver-495,LINUX_VER=ubuntu20.04,PYTHON=3.9/9946/) in #11565 - where dask/main is being installed correctly). This PR updates the dask_cudf groupby code to fix these failures.

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #11561
rapids-bot bot pushed a commit to rapidsai/cudf that referenced this pull request Sep 28, 2022
This PR corresponds to the `dask_cudf` version of dask/dask#9302 (adding a shuffle-based algorithm for high-cardinality groupby aggregations). The benefits of this algorithm are most significant for cases where `split_out>1` is necessary:

```python
agg = ddf.groupby("id").agg({"x": "mean", "y": "max"}, split_out=4, shuffle=True)
```
**NOTES**:

- ~`shuffle="explicit-comms"` is also supported (when `dask_cuda` is installed)~
- It should be possible to refactor and remove some of this code in the future. However, due to some subtle differences between the groupby code in `dask.dataframe` and `dask_cudf`, the specialized `_shuffle_aggregate` is currently necessary.

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Benjamin Zaitlen (https://github.com/quasiben)
  - Lawrence Mitchell (https://github.com/wence-)

URL: #11800
@ian-r-rose ian-r-rose mentioned this pull request Oct 11, 2022