From 298d2c2e9b6bc5435511435ccaa0111c627862aa Mon Sep 17 00:00:00 2001 From: Chris Jarrett Date: Mon, 10 Oct 2022 13:09:19 -0700 Subject: [PATCH 1/5] Enable named aggregation syntax --- dask/dataframe/groupby.py | 54 +++++++++++++++++++++------- dask/dataframe/tests/test_groupby.py | 33 +++++++++++++++++ 2 files changed, 75 insertions(+), 12 deletions(-) diff --git a/dask/dataframe/groupby.py b/dask/dataframe/groupby.py index e2db913e786..bb76699909e 100644 --- a/dask/dataframe/groupby.py +++ b/dask/dataframe/groupby.py @@ -7,6 +7,11 @@ import numpy as np import pandas as pd +from pandas.core.apply import ( + is_multi_agg_with_relabel, + normalize_keyword_aggregation, + validate_func_kwargs +) from dask import config from dask.base import tokenize @@ -1698,7 +1703,7 @@ def get_group(self, key): ) @_aggregate_docstring() - def aggregate(self, arg, split_every=None, split_out=1, shuffle=None): + def aggregate(self, arg=None, split_every=None, split_out=1, shuffle=None, **kwargs): if split_out is None: warnings.warn( "split_out=None is deprecated, please use a positive integer, " @@ -1713,6 +1718,20 @@ def aggregate(self, arg, split_every=None, split_out=1, shuffle=None): shuffle = False column_projection = None + relabling = None + columns = None + order = None + + if isinstance(self, DataFrameGroupBy): + relabeling = arg is None and is_multi_agg_with_relabel(**kwargs) + if relabeling: + arg, columns, order = normalize_keyword_aggregation(kwargs) + + elif isinstance(self, SeriesGroupBy): + relabeling = arg is None + if relabeling: + columns, arg = validate_func_kwargs(kwargs) + if isinstance(self.obj, DataFrame): if isinstance(self.by, tuple) or np.isscalar(self.by): group_columns = {self.by} @@ -1832,7 +1851,7 @@ def aggregate(self, arg, split_every=None, split_out=1, shuffle=None): " Otherwise, try using split_out=1, or grouping with sort=False." 
) - return aca( + result = aca( chunk_args, chunk=_groupby_apply_funcs, chunk_kwargs=dict( @@ -1864,6 +1883,14 @@ def aggregate(self, arg, split_every=None, split_out=1, shuffle=None): sort=self.sort, ) + if relabeling and result is not None: + if order is not None: + result = result.iloc[:, order] + result.columns = columns + + return result + + @insert_meta_param_description(pad=12) def apply(self, func, *args, **kwargs): """Parallel version of pandas GroupBy.apply @@ -2286,18 +2313,18 @@ def __getattr__(self, key): raise AttributeError(e) from e @_aggregate_docstring(based_on="pd.core.groupby.DataFrameGroupBy.aggregate") - def aggregate(self, arg, split_every=None, split_out=1, shuffle=None): + def aggregate(self, arg=None, split_every=None, split_out=1, shuffle=None, **kwargs): if arg == "size": return self.size() return super().aggregate( - arg, split_every=split_every, split_out=split_out, shuffle=shuffle + arg=arg, split_every=split_every, split_out=split_out, shuffle=shuffle, **kwargs ) @_aggregate_docstring(based_on="pd.core.groupby.DataFrameGroupBy.agg") - def agg(self, arg, split_every=None, split_out=1, shuffle=None): + def agg(self, arg=None, split_every=None, split_out=1, shuffle=None, **kwargs): return self.aggregate( - arg, split_every=split_every, split_out=split_out, shuffle=shuffle + arg=arg, split_every=split_every, split_out=split_out, shuffle=shuffle, **kwargs ) @@ -2368,22 +2395,25 @@ def nunique(self, split_every=None, split_out=1): ) @_aggregate_docstring(based_on="pd.core.groupby.SeriesGroupBy.aggregate") - def aggregate(self, arg, split_every=None, split_out=1, shuffle=None): + def aggregate(self, arg=None, split_every=None, split_out=1, shuffle=None, **kwargs): result = super().aggregate( - arg, split_every=split_every, split_out=split_out, shuffle=shuffle + arg=arg, split_every=split_every, split_out=split_out, shuffle=shuffle, **kwargs ) if self._slice: - result = result[self._slice] + try: + result = result[self._slice] + except KeyError: + pass - if not isinstance(arg, (list, dict)) and isinstance(result, DataFrame): + if arg is not None and not isinstance(arg, (list, dict)) and isinstance(result, DataFrame): result = result[result.columns[0]] return result @_aggregate_docstring(based_on="pd.core.groupby.SeriesGroupBy.agg") - def agg(self, arg, split_every=None, split_out=1, shuffle=None): + def agg(self, arg=None, split_every=None, split_out=1, shuffle=None, **kwargs): return self.aggregate( - arg, split_every=split_every, split_out=split_out, shuffle=shuffle + arg=arg, split_every=split_every, split_out=split_out, shuffle=shuffle, **kwargs ) @derived_from(pd.core.groupby.SeriesGroupBy) diff --git a/dask/dataframe/tests/test_groupby.py b/dask/dataframe/tests/test_groupby.py index f5f9d17f153..d323797d5a3 100644 --- a/dask/dataframe/tests/test_groupby.py +++ b/dask/dataframe/tests/test_groupby.py @@ -3,6 +3,7 @@ import operator import pickle import warnings +from functools import partial import numpy as np import pandas as pd @@ -2839,6 +2840,38 @@ def agg(grp, **kwargs): ) +def test_dataframe_named_agg(): + df = pd.DataFrame({ + "a":[1,1,2,2], + "b":[1,2,5,6], + "c":[6,3,6,7], + }) + ddf = dd.from_pandas(df, npartitions=2) + + expected = df.groupby("a").agg( + x=pd.NamedAgg("b", aggfunc="sum"), + y=pd.NamedAgg("c", aggfunc=partial(np.std, ddof=1)), + ) + actual = ddf.groupby("a").agg( + x=pd.NamedAgg("b", aggfunc="sum"), + y=pd.NamedAgg("c", aggfunc=partial(np.std, ddof=1)), + ) + assert_eq(expected, actual) + + +@pytest.mark.parametrize("agg", ["count", 
np.mean, partial(np.var, ddof=1)]) +def test_series_named_agg(agg): + df = pd.DataFrame({ + "a":[5,4,3,5,4,2,3,2], + "b":[1,2,5,6,9,2,6,8], + }) + ddf = dd.from_pandas(df, npartitions=2) + + expected = df.groupby("a").b.agg(c=agg, d="sum") + actual = ddf.groupby("a").b.agg(c=agg, d="sum") + assert_eq(expected, actual) + + def test_empty_partitions_with_value_counts(): # https://github.com/dask/dask/issues/7065 df = pd.DataFrame( From 5bc8a3912c17c286d3e3813b0f4bb14a1150ae1f Mon Sep 17 00:00:00 2001 From: Chris Jarrett Date: Mon, 10 Oct 2022 13:13:12 -0700 Subject: [PATCH 2/5] Fix style --- dask/dataframe/groupby.py | 48 +++++++++++++++++++++------- dask/dataframe/tests/test_groupby.py | 22 +++++++------ 2 files changed, 49 insertions(+), 21 deletions(-) diff --git a/dask/dataframe/groupby.py b/dask/dataframe/groupby.py index bb76699909e..345676d3ba8 100644 --- a/dask/dataframe/groupby.py +++ b/dask/dataframe/groupby.py @@ -10,7 +10,7 @@ from pandas.core.apply import ( is_multi_agg_with_relabel, normalize_keyword_aggregation, - validate_func_kwargs + validate_func_kwargs, ) from dask import config @@ -1703,7 +1703,9 @@ def get_group(self, key): ) @_aggregate_docstring() - def aggregate(self, arg=None, split_every=None, split_out=1, shuffle=None, **kwargs): + def aggregate( + self, arg=None, split_every=None, split_out=1, shuffle=None, **kwargs + ): if split_out is None: warnings.warn( "split_out=None is deprecated, please use a positive integer, " @@ -1718,7 +1720,6 @@ def aggregate(self, arg=None, split_every=None, split_out=1, shuffle=None, **kwa shuffle = False column_projection = None - relabling = None columns = None order = None @@ -1887,9 +1888,8 @@ def aggregate(self, arg=None, split_every=None, split_out=1, shuffle=None, **kwa if order is not None: result = result.iloc[:, order] result.columns = columns - - return result + return result @insert_meta_param_description(pad=12) def apply(self, func, *args, **kwargs): @@ -2313,18 +2313,28 @@ def __getattr__(self, key): raise AttributeError(e) from e @_aggregate_docstring(based_on="pd.core.groupby.DataFrameGroupBy.aggregate") - def aggregate(self, arg=None, split_every=None, split_out=1, shuffle=None, **kwargs): + def aggregate( + self, arg=None, split_every=None, split_out=1, shuffle=None, **kwargs + ): if arg == "size": return self.size() return super().aggregate( - arg=arg, split_every=split_every, split_out=split_out, shuffle=shuffle, **kwargs + arg=arg, + split_every=split_every, + split_out=split_out, + shuffle=shuffle, + **kwargs, ) @_aggregate_docstring(based_on="pd.core.groupby.DataFrameGroupBy.agg") def agg(self, arg=None, split_every=None, split_out=1, shuffle=None, **kwargs): return self.aggregate( - arg=arg, split_every=split_every, split_out=split_out, shuffle=shuffle, **kwargs + arg=arg, + split_every=split_every, + split_out=split_out, + shuffle=shuffle, + **kwargs, ) @@ -2395,9 +2405,15 @@ def nunique(self, split_every=None, split_out=1): ) @_aggregate_docstring(based_on="pd.core.groupby.SeriesGroupBy.aggregate") - def aggregate(self, arg=None, split_every=None, split_out=1, shuffle=None, **kwargs): + def aggregate( + self, arg=None, split_every=None, split_out=1, shuffle=None, **kwargs + ): result = super().aggregate( - arg=arg, split_every=split_every, split_out=split_out, shuffle=shuffle, **kwargs + arg=arg, + split_every=split_every, + split_out=split_out, + shuffle=shuffle, + **kwargs, ) if self._slice: try: @@ -2405,7 +2421,11 @@ def aggregate(self, arg=None, split_every=None, split_out=1, shuffle=None, **kwa 
except KeyError: pass - if arg is not None and not isinstance(arg, (list, dict)) and isinstance(result, DataFrame): + if ( + arg is not None + and not isinstance(arg, (list, dict)) + and isinstance(result, DataFrame) + ): result = result[result.columns[0]] return result @@ -2413,7 +2433,11 @@ def aggregate(self, arg=None, split_every=None, split_out=1, shuffle=None, **kwa @_aggregate_docstring(based_on="pd.core.groupby.SeriesGroupBy.agg") def agg(self, arg=None, split_every=None, split_out=1, shuffle=None, **kwargs): return self.aggregate( - arg=arg, split_every=split_every, split_out=split_out, shuffle=shuffle, **kwargs + arg=arg, + split_every=split_every, + split_out=split_out, + shuffle=shuffle, + **kwargs, ) @derived_from(pd.core.groupby.SeriesGroupBy) diff --git a/dask/dataframe/tests/test_groupby.py b/dask/dataframe/tests/test_groupby.py index d323797d5a3..aa1d6830c6e 100644 --- a/dask/dataframe/tests/test_groupby.py +++ b/dask/dataframe/tests/test_groupby.py @@ -2841,11 +2841,13 @@ def agg(grp, **kwargs): def test_dataframe_named_agg(): - df = pd.DataFrame({ - "a":[1,1,2,2], - "b":[1,2,5,6], - "c":[6,3,6,7], - }) + df = pd.DataFrame( + { + "a": [1, 1, 2, 2], + "b": [1, 2, 5, 6], + "c": [6, 3, 6, 7], + } + ) ddf = dd.from_pandas(df, npartitions=2) expected = df.groupby("a").agg( @@ -2861,10 +2863,12 @@ def test_dataframe_named_agg(): @pytest.mark.parametrize("agg", ["count", np.mean, partial(np.var, ddof=1)]) def test_series_named_agg(agg): - df = pd.DataFrame({ - "a":[5,4,3,5,4,2,3,2], - "b":[1,2,5,6,9,2,6,8], - }) + df = pd.DataFrame( + { + "a": [5, 4, 3, 5, 4, 2, 3, 2], + "b": [1, 2, 5, 6, 9, 2, 6, 8], + } + ) ddf = dd.from_pandas(df, npartitions=2) expected = df.groupby("a").b.agg(c=agg, d="sum") From 711df356b571384d1a93d4ef206264579729de7a Mon Sep 17 00:00:00 2001 From: Chris Jarrett Date: Tue, 11 Oct 2022 09:24:57 -0700 Subject: [PATCH 3/5] Use codepath only when Pandas >= 1.4 --- dask/dataframe/groupby.py | 36 +++++++++++++++++++----------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/dask/dataframe/groupby.py b/dask/dataframe/groupby.py index 345676d3ba8..75bd31cfeeb 100644 --- a/dask/dataframe/groupby.py +++ b/dask/dataframe/groupby.py @@ -7,15 +7,14 @@ import numpy as np import pandas as pd -from pandas.core.apply import ( - is_multi_agg_with_relabel, - normalize_keyword_aggregation, - validate_func_kwargs, -) from dask import config from dask.base import tokenize -from dask.dataframe._compat import PANDAS_GT_150, check_numeric_only_deprecation +from dask.dataframe._compat import ( + PANDAS_GT_140, + PANDAS_GT_150, + check_numeric_only_deprecation, +) from dask.dataframe.core import ( GROUP_KEYS_DEFAULT, DataFrame, @@ -42,6 +41,9 @@ from dask.highlevelgraph import HighLevelGraph from dask.utils import M, _deprecated, derived_from, funcname, itemgetter +if PANDAS_GT_140: + from pandas.core.apply import reconstruct_func, validate_func_kwargs + # ############################################# # # GroupBy implementation notes @@ -1719,19 +1721,19 @@ def aggregate( else: shuffle = False - column_projection = None + relabeling = None columns = None order = None - - if isinstance(self, DataFrameGroupBy): - relabeling = arg is None and is_multi_agg_with_relabel(**kwargs) - if relabeling: - arg, columns, order = normalize_keyword_aggregation(kwargs) - - elif isinstance(self, SeriesGroupBy): - relabeling = arg is None - if relabeling: - columns, arg = validate_func_kwargs(kwargs) + column_projection = None + if PANDAS_GT_140: + if isinstance(self, 
DataFrameGroupBy): + if arg is None: + relabeling, arg, columns, order = reconstruct_func(arg, **kwargs) + + elif isinstance(self, SeriesGroupBy): + relabeling = arg is None + if relabeling: + columns, arg = validate_func_kwargs(kwargs) if isinstance(self.obj, DataFrame): if isinstance(self.by, tuple) or np.isscalar(self.by): From 316a00b35eb47089bf8c969d4599e8673c134d1d Mon Sep 17 00:00:00 2001 From: Chris Jarrett Date: Thu, 13 Oct 2022 12:59:53 -0700 Subject: [PATCH 4/5] Enable shuffle support --- dask/dataframe/groupby.py | 84 ++++++++++++++-------------- dask/dataframe/tests/test_groupby.py | 12 +++- 2 files changed, 51 insertions(+), 45 deletions(-) diff --git a/dask/dataframe/groupby.py b/dask/dataframe/groupby.py index 75bd31cfeeb..905e97ad61b 100644 --- a/dask/dataframe/groupby.py +++ b/dask/dataframe/groupby.py @@ -1819,7 +1819,7 @@ def aggregate( # for larger values of split_out. However, the shuffle # step requires that the result of `chunk` produces a # proper DataFrame type - return _shuffle_aggregate( + result = _shuffle_aggregate( chunk_args, chunk=_groupby_apply_funcs, chunk_kwargs=dict( @@ -1842,50 +1842,50 @@ def aggregate( shuffle=shuffle if isinstance(shuffle, str) else "tasks", sort=self.sort, ) + else: + if self.sort is None and split_out > 1: + warnings.warn(SORT_SPLIT_OUT_WARNING, FutureWarning) + + # Check sort behavior + if self.sort and split_out > 1: + raise NotImplementedError( + "Cannot guarantee sorted keys for `split_out>1` and `shuffle=False`" + " Try using `shuffle=True` if you are grouping on a single column." + " Otherwise, try using split_out=1, or grouping with sort=False." + ) - if self.sort is None and split_out > 1: - warnings.warn(SORT_SPLIT_OUT_WARNING, FutureWarning) - - # Check sort behavior - if self.sort and split_out > 1: - raise NotImplementedError( - "Cannot guarantee sorted keys for `split_out>1` and `shuffle=False`" - " Try using `shuffle=True` if you are grouping on a single column." - " Otherwise, try using split_out=1, or grouping with sort=False." 
+ result = aca( + chunk_args, + chunk=_groupby_apply_funcs, + chunk_kwargs=dict( + funcs=chunk_funcs, + sort=False, + **self.observed, + **self.dropna, + ), + combine=_groupby_apply_funcs, + combine_kwargs=dict( + funcs=aggregate_funcs, + level=levels, + sort=False, + **self.observed, + **self.dropna, + ), + aggregate=_agg_finalize, + aggregate_kwargs=dict( + aggregate_funcs=aggregate_funcs, + finalize_funcs=finalizers, + level=levels, + **self.observed, + **self.dropna, + ), + token="aggregate", + split_every=split_every, + split_out=split_out, + split_out_setup=split_out_on_index, + sort=self.sort, ) - result = aca( - chunk_args, - chunk=_groupby_apply_funcs, - chunk_kwargs=dict( - funcs=chunk_funcs, - sort=False, - **self.observed, - **self.dropna, - ), - combine=_groupby_apply_funcs, - combine_kwargs=dict( - funcs=aggregate_funcs, - level=levels, - sort=False, - **self.observed, - **self.dropna, - ), - aggregate=_agg_finalize, - aggregate_kwargs=dict( - aggregate_funcs=aggregate_funcs, - finalize_funcs=finalizers, - level=levels, - **self.observed, - **self.dropna, - ), - token="aggregate", - split_every=split_every, - split_out=split_out, - split_out_setup=split_out_on_index, - sort=self.sort, - ) - if relabeling and result is not None: if order is not None: result = result.iloc[:, order] diff --git a/dask/dataframe/tests/test_groupby.py b/dask/dataframe/tests/test_groupby.py index aa1d6830c6e..12184cca31c 100644 --- a/dask/dataframe/tests/test_groupby.py +++ b/dask/dataframe/tests/test_groupby.py @@ -15,6 +15,7 @@ from dask.dataframe._compat import ( PANDAS_GT_110, PANDAS_GT_130, + PANDAS_GT_140, PANDAS_GT_150, check_numeric_only_deprecation, tm, @@ -2840,7 +2841,9 @@ def agg(grp, **kwargs): ) -def test_dataframe_named_agg(): +@pytest.mark.skipif(not PANDAS_GT_140, reason="requires pandas >= 1.4.0") +@pytest.mark.parametrize("shuffle", [True, False]) +def test_dataframe_named_agg(shuffle): df = pd.DataFrame( { "a": [1, 1, 2, 2], @@ -2855,14 +2858,17 @@ def test_dataframe_named_agg(): y=pd.NamedAgg("c", aggfunc=partial(np.std, ddof=1)), ) actual = ddf.groupby("a").agg( + shuffle=shuffle, x=pd.NamedAgg("b", aggfunc="sum"), y=pd.NamedAgg("c", aggfunc=partial(np.std, ddof=1)), ) assert_eq(expected, actual) +@pytest.mark.skipif(not PANDAS_GT_140, reason="requires pandas >= 1.4.0") +@pytest.mark.parametrize("shuffle", [True, False]) @pytest.mark.parametrize("agg", ["count", np.mean, partial(np.var, ddof=1)]) -def test_series_named_agg(agg): +def test_series_named_agg(shuffle, agg): df = pd.DataFrame( { "a": [5, 4, 3, 5, 4, 2, 3, 2], @@ -2872,7 +2878,7 @@ def test_series_named_agg(agg): ddf = dd.from_pandas(df, npartitions=2) expected = df.groupby("a").b.agg(c=agg, d="sum") - actual = ddf.groupby("a").b.agg(c=agg, d="sum") + actual = ddf.groupby("a").b.agg(shuffle=shuffle, c=agg, d="sum") assert_eq(expected, actual) From caab43441157532f01f58f7646d2b9460765cb17 Mon Sep 17 00:00:00 2001 From: Chris Jarrett Date: Wed, 19 Oct 2022 10:11:39 -0700 Subject: [PATCH 5/5] Update docstring --- dask/dataframe/groupby.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/dask/dataframe/groupby.py b/dask/dataframe/groupby.py index b8e3ecff341..e5e601ed914 100644 --- a/dask/dataframe/groupby.py +++ b/dask/dataframe/groupby.py @@ -1169,13 +1169,14 @@ def wrapper(func): {based_on_str} Parameters ---------- - arg : callable, str, list or dict + arg : callable, str, list or dict, optional Aggregation spec. 
Accepted combinations are: - callable function - string function name - list of functions and/or function names, e.g. ``[np.sum, 'mean']`` - dict of column names -> function, function name or list of such. + - None only if named aggregation syntax is used split_every : int, optional Number of intermediate partitions that may be aggregated at once. This defaults to 8. If your intermediate partitions are likely to @@ -1192,6 +1193,12 @@ def wrapper(func): ``split_out = 1``. When ``split_out > 1``, it chooses the algorithm set by the ``shuffle`` option in the dask config system, or ``"tasks"`` if nothing is set. + kwargs: tuple or pd.NamedAgg, optional + Used for named aggregations where the keywords are the output column + names and the values are tuples where the first element is the input + column name and the second element is the aggregation function. + ``pandas.NamedAgg`` can also be used as the value. To use the named + aggregation syntax, arg must be set to None. """ return func
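
Usage sketch of the named-aggregation syntax this series enables, adapted from the
new tests above. It assumes these patches applied on top of dask with pandas >= 1.4;
the output labels ("b_sum", "c_mean", "total", "average") are illustrative, not
names used anywhere in the patches.

    import numpy as np
    import pandas as pd
    import dask.dataframe as dd

    df = pd.DataFrame(
        {
            "a": [1, 1, 2, 2],
            "b": [1, 2, 5, 6],
            "c": [6, 3, 6, 7],
        }
    )
    ddf = dd.from_pandas(df, npartitions=2)

    # DataFrameGroupBy: each keyword names an output column; the value is a
    # pd.NamedAgg (or 2-tuple) of input column and aggregation function.
    # `arg` must be left as None for the named syntax to take effect.
    df_result = ddf.groupby("a").agg(
        b_sum=pd.NamedAgg("b", aggfunc="sum"),
        c_mean=pd.NamedAgg("c", aggfunc="mean"),
    )

    # SeriesGroupBy: each keyword names an output column; the value is the
    # aggregation function itself (a string name or a callable).
    s_result = ddf.groupby("a").b.agg(total="sum", average=np.mean)

    print(df_result.compute())
    print(s_result.compute())

Per the tests added in patch 4, the reserved keyword parameters (`split_every`,
`split_out`, `shuffle`) are still consumed as parameters and can be passed
alongside the named aggregations, e.g. ``ddf.groupby("a").b.agg(shuffle=True,
total="sum")``.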