Shuffle-based groupby aggregation by default #9406

Closed
Tracked by #270
jrbourbeau opened this issue Aug 19, 2022 · 5 comments · Fixed by #9453
@jrbourbeau
Member

In #9302, @rjzamora added a new shuffle-based groupby algorithm, which @ian-r-rose demonstrated yields significant performance improvements in certain cases (#9302 (comment)). Today this new shuffle-based groupby is gated behind an optional shuffle= keyword argument (off by default). We should probably turn it on by default in cases where it makes sense.
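
For reference, opting in currently looks roughly like the sketch below (illustrative only; the dataset and parameter values here are placeholders):

import dask

# Demo dataframe with an "id" column
ddf = dask.datasets.timeseries()

# Today the shuffle-based algorithm has to be requested explicitly:
agg = ddf.groupby("id").agg(
    {"x": "sum", "y": "mean"},
    split_out=8,        # number of output partitions
    shuffle="tasks",    # off (False) by default
)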

Here's a brief summary of @ian-r-rose's findings (see #9302 (comment) for more details):

TL;DR

This largely confirms @rjzamora's points above and in conversations I've had with him:

- ACA-based groupby/aggs do fine for low-cardinality cases (say, where the number of groups is less than a few percent of the dataframe size).
- Once the cardinality gets larger, it does indeed make sense to start using the shuffle-based algorithm. To me, the range where split_out > 1 and where it's still better to use ACA looks pretty darn narrow. A rule of thumb where shuffle is on-by-default when split_out>1 doesn't sound like the worst idea in the world to me.
- Memory usage is indeed higher for task-based shuffling than ACA in the lower-cardinality cases. But that advantage is wiped out in higher-cardinality cases, so it actually doesn't worry me much. And task-based shuffling is less likely to knock over a worker in the last few aggs of the ACA-based approach.
- p2p shuffling has much better memory performance. I ran into a couple of problems when using it here that I'll open as separate issues, but the results are encouraging.

Opening this issue so we don't lose track of this topic.

@jrbourbeau added the dataframe and enhancement labels Aug 19, 2022
@jrbourbeau
Member Author

Just checking in here. Based on the

Once the cardinality gets larger, it does indeed make sense to start using the shuffle-based algorithm. To me, the range where split_out > 1 and where it's still better to use ACA looks pretty darn narrow. A rule of thumb where shuffle is on-by-default when split_out>1 doesn't sound like the worst idea in the world to me.

comment from @ian-r-rose in #9302 (comment), it sounds like we should move toward turning on shuffle= by default when split_out > 1. @ian-r-rose @rjzamora does that sound right to you? Are there other times when it makes sense to default to shuffling?
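
Concretely, the default resolution being floated would amount to something like this hypothetical logic (names made up, not the actual implementation):

def _resolve_shuffle(shuffle, split_out):
    # Hypothetical sketch: if the user didn't request anything explicitly,
    # use the shuffle-based algorithm whenever split_out > 1.
    if shuffle is None:
        return "tasks" if split_out > 1 else False
    return shuffle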

@ian-r-rose
Collaborator

shuffle= by default when split_out > 1

I'm going to try to run some more targeted tests today and tomorrow, but broadly speaking, I think this is correct. It is likely not an improvement when split_out > 1 and we are low-cardinality, but in that case I also wouldn't recommend doing split_out > 1, so I'm not sure that is a big deal.

@rjzamora
Member

does that sound right to you? Are there other times when it makes sense to default to shuffling?

As @ian-r-rose mentioned, there are split_out>1 cases where the cardinality is too low to benefit from the shuffle, but this simplification does seem reasonable to me.

Another alternative may be to introduce a more intuitive knob for users to turn (and to discourage the explicit use of split_out, split_every, and shuffle). For example, we could introduce a cardinality kwarg, where the user can (optionally) specify the estimated ratio of output records to input records (i.e. len(ddf.drop_duplicates()) / len(ddf)). This ratio argument could be used within dask to set appropriate values for split_out, split_every, and shuffle. We could also implement something like cardinality="calculate" to let the user opt into an eager (but approximate) ratio calculation, and/or cardinality="infer" to estimate the ratio from the first non-empty partition.
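
To make that concrete, a hypothetical mapping might look like the following (the thresholds and return shape are invented here; only the cardinality idea comes from the proposal above):

def _plan_groupby(cardinality, npartitions):
    # Hypothetical: translate the estimated ratio of output records to input
    # records into values for split_out, split_every, and shuffle.
    if cardinality < 0.01:
        # Low cardinality: ACA tree reduction into a single output partition.
        return {"split_out": 1, "split_every": 8, "shuffle": False}
    # High cardinality: shuffle, sizing the output to the expected result.
    return {
        "split_out": max(1, round(cardinality * npartitions)),
        "split_every": None,
        "shuffle": "tasks",
    }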

@ian-r-rose
Collaborator

I took a closer look at whether it ever makes sense to have the shuffle-based groupby turned off when split_out > 1. The TL;DR is: with an important caveat, no. We should always turn it on for split_out > 1.

I tested a high-cardinality groupby on a ~50 GB, 600MM row, 7000 partition timeseries dataset using a cluster with 15 AWS m5.xlarge instances. I varied the following parameters:

  1. split_out: 1, 2, 4, 8, 16
  2. shuffle: False, "tasks"
  3. n_groups: 50MM, 100MM, 200MM (roughly speaking, 10% to 1/3 of the length of the total dataset)
@pytest.mark.parametrize("shuffle", [False, "tasks"])
@pytest.mark.parametrize("n_groups", [50_000_0000, 100_000_000, 200_000_000])
@pytest.mark.parametrize("split_out", [2, 4, 8, 16])
@pytest.mark.parametrize("partition_freq", ["1d"])
def test_groupby(groupby_client, split_out, n_groups, shuffle, partition_freq):
    ddf = timeseries_of_size(
        cluster_memory(groupby_client) // 4,
        id_maximum=n_groups,
        partition_freq=partition_freq,
    )  # ~600MM rows
    print(ddf)
    agg = ddf.groupby("id").agg(
        {"x": "sum", "y": "mean"},
        split_out=split_out,
        split_every=2,
        shuffle=shuffle,
    )
    print(len(agg))

Here are faceted charts showing the results:

[Charts omitted: wall clock; average memory]

Note that the ACA-based groupby/agg is missing for some of the lower values of split_out and higher cardinalities. That's because it failed in those cases.

The main thing to note is that the shuffle-based groupby wins in all cases. If I were to go to lower cardinalities, the ACA-based groupby would start winning, but I stand by the initial assertion that shuffling makes sense basically whenever split_out > 1 makes sense.

Caveat

Okay, there is a wrinkle to the above story, and that has to do with split_every. In #9302 @rjzamora repurposed the split_every parameter to determine how many partitions to shuffle. This sort of rhymes with the meaning of split_every for the ACA groupby, in that we are doing an initial repartitioning (read: aggregation) before shuffling, but I think there is an important difference. If we are presuming that the shuffle-based approach is good for high-cardinality cases, then the initial blockwise groupby/agg probably doesn't reduce the size of the partitions that much.

Then, we come along and repartition according to shuffle_npartitions:

dask/dask/dataframe/groupby.py, lines 2499 to 2502 at 19a5147:

shuffle_npartitions = max(
    chunked.npartitions // split_every,
    split_out,
)

split_every defaults to eight. If we have a few large partitions per worker, this can get us into some trouble. Basically, we can easily wind up in a situation where we try to repartition into split_out before we ever shuffle. This is bad because we're not yet done aggregating, and if we have more than split_out workers we have extremely imbalanced data.
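
Plugging in the numbers from the experiment above (illustrative arithmetic only):

split_every = 8   # current default
split_out = 16

# 7000 input partitions: an 8x repartition before the shuffle.
max(7000 // split_every, split_out)   # -> 875 pre-shuffle partitions

# With fewer input partitions the formula bottoms out at split_out, i.e. the
# data is squeezed into 16 partitions before the shuffle ever runs.
max(100 // split_every, split_out)    # -> 16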

Even worse, I don't think the implementation as it stands today provides a way to keep the same number of partitions before shuffling. It always repartitions to a smaller number (split_every is >= 2, so the max number of pre-shuffle partitions is 1/2 the initial number of partitions).

I think we should allow for split_every to be one in this case (even though it is a bit confusing semantically). That would mean we don't repartition before shuffling; we just proceed directly to the shuffle. In fact, I might want that as the default, with split_every > 1 being a performance tuning knob for when there are too many initial partitions.
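
As a sketch of that proposal (a hypothetical helper, not a patch):

def _shuffle_npartitions(chunked_npartitions, split_out, split_every=None):
    # Hypothetical: default split_every to 1 so there is no pre-shuffle
    # repartitioning; split_every > 1 becomes an opt-in coarsening knob.
    split_every = 1 if split_every is None else split_every
    return max(chunked_npartitions // split_every, split_out)

_shuffle_npartitions(7000, split_out=16)   # -> 7000: go straight to the shuffle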

What do you think @rjzamora?

@ian-r-rose mentioned this issue Sep 1, 2022
@rjzamora
Member

rjzamora commented Sep 2, 2022

I’ll try to take a closer look at this soon, but my intention for shuffle_npartitions was to define the number of partitions for the output of the shuffle. I don’t think we are doing any repartitioning before the shuffle, but it’s possible I made an error somewhere. [EDIT: The shuffle function itself is where the repartitioning is performed before the data is shuffled]
