# Fix bug in new shuffle-based groupby implementation (#11836)
## Description
This PR fixes a subtle bug introduced in #11800. While working on the corresponding dask-cuda benchmark for that PR (rapidsai/dask-cuda#979), we discovered that non-deterministic column ordering in `_groupby_partition_agg` and `_tree_node_agg` can trigger metadata-enforcement errors in follow-up operations. This PR sorts the output columns in those functions, so that the column ordering is always deterministic.
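
To make the failure mode concrete, here is a minimal standalone sketch (not taken from the PR; the column names are hypothetical) of how two frames holding the same columns in different orders disagree under an order-sensitive metadata check, and how sorting the columns normalizes them:

```python
import pandas as pd

# Two partitions produce the same columns, but in different orders (e.g.,
# because an aggregation mapping was iterated in a non-deterministic order).
meta = pd.DataFrame({"x_sum": [], "x_count": []})
part = pd.DataFrame({"x_count": [3], "x_sum": [6.0]})

# An order-sensitive metadata check rejects the partition:
assert list(part.columns) != list(meta.columns)

# Sorting the output columns (the fix in this PR) makes the ordering
# deterministic, so every partition agrees with the metadata:
meta = meta[sorted(meta.columns)]
part = part[sorted(part.columns)]
assert list(part.columns) == list(meta.columns)
```

Note that the diff below also drops `enforce_metadata=False` from the `map_partitions` calls, re-enabling Dask's default metadata enforcement, presumably because it is safe once the output ordering is deterministic.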

Note that this bug is difficult to reproduce in a pytest, because it rarely occurs with a small number of devices (I need to use a full DGX machine to consistently trigger the error).

## Checklist
- [ ] I am familiar with the [Contributing Guidelines](https://github.com/rapidsai/cudf/blob/HEAD/CONTRIBUTING.md).
- [ ] New or existing tests cover these changes.
- [ ] The documentation is up to date with these changes.

Authors:
   - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
   - GALI PREM SAGAR (https://github.com/galipremsagar)
   - Ashwin Srinath (https://github.com/shwina)
rjzamora committed Sep 30, 2022
1 parent 920b58f commit 3f9b3fe
Showing 1 changed file with 8 additions and 7 deletions.
python/dask_cudf/dask_cudf/groupby.py:

```diff
@@ -492,7 +492,6 @@ def _shuffle_aggregate(
     chunked = ddf.map_partitions(
         chunk,
         meta=chunk(ddf._meta, **chunk_kwargs),
-        enforce_metadata=False,
         token=chunk_name,
         **chunk_kwargs,
     )
@@ -514,7 +513,6 @@ def _shuffle_aggregate(
             .map_partitions(
                 aggregate,
                 meta=aggregate(chunked._meta, **aggregate_kwargs),
-                enforce_metadata=False,
                 **aggregate_kwargs,
             )
         )
@@ -528,7 +526,6 @@ def _shuffle_aggregate(
         ).map_partitions(
             aggregate,
             meta=aggregate(chunked._meta, **aggregate_kwargs),
-            enforce_metadata=False,
             **aggregate_kwargs,
         )
 
@@ -809,8 +806,10 @@ def _groupby_partition_agg(df, gb_cols, aggs, columns, dropna, sort, sep):
     gb = df.groupby(gb_cols, dropna=dropna, as_index=False, sort=sort).agg(
         _agg_dict
     )
-    gb.columns = [_make_name(name, sep=sep) for name in gb.columns]
-    return gb
+    output_columns = [_make_name(name, sep=sep) for name in gb.columns]
+    gb.columns = output_columns
+    # Return with deterministic column ordering
+    return gb[sorted(output_columns)]
 
 
 @_dask_cudf_nvtx_annotate
@@ -841,11 +840,13 @@ def _tree_node_agg(df, gb_cols, dropna, sort, sep):
     )
 
     # Don't include the last aggregation in the column names
-    gb.columns = [
+    output_columns = [
        _make_name(name[:-1] if isinstance(name, tuple) else name, sep=sep)
        for name in gb.columns
     ]
-    return gb
+    gb.columns = output_columns
+    # Return with deterministic column ordering
+    return gb[sorted(output_columns)]
 
 
 @_dask_cudf_nvtx_annotate
```
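For reference, here is a standalone sketch of the pattern the diff applies in `_groupby_partition_agg`, using plain pandas, hypothetical data, and a simplified stand-in for the real `_make_name` helper:

```python
import pandas as pd

def _make_name(name, sep="_"):
    # Simplified stand-in for dask_cudf's _make_name helper: flatten a
    # MultiIndex column label into a single string, dropping empty parts.
    if isinstance(name, tuple):
        return sep.join(part for part in name if part)
    return name

df = pd.DataFrame({"k": [0, 0, 1], "x": [1.0, 2.0, 3.0]})
gb = df.groupby("k", as_index=False, sort=False).agg({"x": ["count", "sum"]})

# Flatten the MultiIndex columns, then select them in sorted order so the
# result has the same column layout on every partition.
output_columns = [_make_name(name, sep="_") for name in gb.columns]
gb.columns = output_columns
gb = gb[sorted(output_columns)]
print(gb.columns.tolist())  # ['k', 'x_count', 'x_sum']
```

Selecting `gb[sorted(output_columns)]` costs only a column reindex, but guarantees an identical layout on every partition, which is what keeps the metadata check happy downstream.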
