From 58d7502fb3f3ae2d51e2b4c3ae5adf9272f3547b Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 12 Sep 2022 12:48:10 -0500 Subject: [PATCH 1/8] Add DataFrame.median method --- dask/dataframe/core.py | 17 ++++++++++++++ dask/dataframe/tests/test_dataframe.py | 31 ++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/dask/dataframe/core.py b/dask/dataframe/core.py index 184ef693802..ef8a88a7cb5 100644 --- a/dask/dataframe/core.py +++ b/dask/dataframe/core.py @@ -2160,6 +2160,23 @@ def mean( result.divisions = (self.columns.min(), self.columns.max()) return handle_out(out, result) + def median_approximate( + self, + axis=None, + method="default", + ): + return self.quantile(q=0.5, axis=axis, method=method) + + @derived_from(pd.DataFrame) + def median(self, axis=None, method="default"): + if axis == 1 or self.npartitions == 1: + # Can provide an exact median in these cases + return self.quantile(q=0.5, axis=axis, method=method) + raise NotImplementedError( + "Dask doesn't implement an exact median in all cases as this is hard to do in parallel. " + "See the `median_approximate` method instead, which uses an approximate algorithm." + ) + @_numeric_only @derived_from(pd.DataFrame) def var( diff --git a/dask/dataframe/tests/test_dataframe.py b/dask/dataframe/tests/test_dataframe.py index 932abfa0bf1..df533251d3c 100644 --- a/dask/dataframe/tests/test_dataframe.py +++ b/dask/dataframe/tests/test_dataframe.py @@ -4307,6 +4307,37 @@ def test_dataframe_mode(): assert_eq(ddf.mode(), df.mode(), check_index=False) +def test_median(): + df = pd.DataFrame({"x": [1, 2, 3, 4, 5], "y": [1.1, 2.2, 3.3, 4.4, 5.5]}) + ddf = dd.from_pandas(df, npartitions=3) + + # Exact medians work when `axis=1` or when DataFrames only have a single partition + # TODO: shouldn't need to specify `check_names=False` below, but names currently don't match + assert_eq(ddf.median(axis=1), df.median(axis=1), check_names=False) + ddf_single = dd.from_pandas(df, npartitions=1) + assert_eq(ddf_single.median(axis=1), df.median(axis=1), check_names=False) + + # `median` redirects to `median_approximate` if `axis != 1` + for axis in [None, 0]: + with pytest.raises( + NotImplementedError, match="See the `median_approximate` method instead" + ): + ddf.median(axis=axis) + + +@pytest.mark.parametrize("method", ["default", "tdigest"]) +def test_median_approximate(method): + df = pd.DataFrame({"x": range(100), "y": range(100, 200)}) + ddf = dd.from_pandas(df, npartitions=10) + # TODO: shouldn't need to specify `check_names=False` below, but names currently don't match + assert_eq( + ddf.median_approximate(method=method), + df.median(), + check_names=False, + rtol=1, + ) + + def test_datetime_loc_open_slicing(): dtRange = pd.date_range("01.01.2015", "05.05.2015") df = pd.DataFrame(np.random.random((len(dtRange), 2)), index=dtRange) From 3070a078e4d53918d2b40df2619cb0cd094f5406 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 12 Sep 2022 13:17:52 -0500 Subject: [PATCH 2/8] Add Series support --- dask/dataframe/core.py | 13 +++++++++++++ dask/dataframe/tests/test_dataframe.py | 10 ++++++++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/dask/dataframe/core.py b/dask/dataframe/core.py index ef8a88a7cb5..f804247a1c9 100644 --- a/dask/dataframe/core.py +++ b/dask/dataframe/core.py @@ -3630,6 +3630,19 @@ def quantile(self, q=0.5, method="default"): """ return quantile(self, q, method=method) + def median_approximate(self, method="default"): + return self.quantile(q=0.5, method=method) + + @derived_from(pd.Series) + def median(self, method="default"): + if self.npartitions == 1: + # Can provide an exact median in these cases + return self.quantile(q=0.5, method=method) + raise NotImplementedError( + "Dask doesn't implement an exact median in all cases as this is hard to do in parallel. " + "See the `median_approximate` method instead, which uses an approximate algorithm." + ) + def _repartition_quantiles(self, npartitions, upsample=1.0): """Approximate quantiles of Series used for repartitioning""" from dask.dataframe.partitionquantiles import partition_quantiles diff --git a/dask/dataframe/tests/test_dataframe.py b/dask/dataframe/tests/test_dataframe.py index df533251d3c..721c0ca5442 100644 --- a/dask/dataframe/tests/test_dataframe.py +++ b/dask/dataframe/tests/test_dataframe.py @@ -4311,19 +4311,25 @@ def test_median(): df = pd.DataFrame({"x": [1, 2, 3, 4, 5], "y": [1.1, 2.2, 3.3, 4.4, 5.5]}) ddf = dd.from_pandas(df, npartitions=3) - # Exact medians work when `axis=1` or when DataFrames only have a single partition + # Exact medians work when `axis=1` or when there's only have a single partition # TODO: shouldn't need to specify `check_names=False` below, but names currently don't match assert_eq(ddf.median(axis=1), df.median(axis=1), check_names=False) ddf_single = dd.from_pandas(df, npartitions=1) assert_eq(ddf_single.median(axis=1), df.median(axis=1), check_names=False) + assert_eq(ddf_single.x.median(), df.x.median(), check_names=False) - # `median` redirects to `median_approximate` if `axis != 1` + # Ensure `median` redirects to `median_approximate` appropriately for axis in [None, 0]: with pytest.raises( NotImplementedError, match="See the `median_approximate` method instead" ): ddf.median(axis=axis) + with pytest.raises( + NotImplementedError, match="See the `median_approximate` method instead" + ): + ddf.x.median() + @pytest.mark.parametrize("method", ["default", "tdigest"]) def test_median_approximate(method): From e33dada9eb8ffa44681afcf338ffc63341fb9bc5 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 12 Sep 2022 14:04:28 -0500 Subject: [PATCH 3/8] Fix mindeps build --- dask/dataframe/tests/test_dataframe.py | 70 ++++++++++++++++++++------ 1 file changed, 54 insertions(+), 16 deletions(-) diff --git a/dask/dataframe/tests/test_dataframe.py b/dask/dataframe/tests/test_dataframe.py index 721c0ca5442..5b8190917fb 100644 --- a/dask/dataframe/tests/test_dataframe.py +++ b/dask/dataframe/tests/test_dataframe.py @@ -40,6 +40,11 @@ from dask.utils import M, is_dataframe_like, is_series_like, put_lines from dask.utils_test import _check_warning, hlg_layer +try: + import crick +except ImportError: + crick = None + dsk = { ("x", 0): pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, index=[0, 1, 3]), ("x", 1): pd.DataFrame({"a": [4, 5, 6], "b": [3, 2, 1]}, index=[5, 6, 8]), @@ -391,11 +396,18 @@ def test_rename_series_method_2(): @pytest.mark.parametrize( - "method,test_values", [("tdigest", (6, 10)), ("dask", (4, 20))] + "method,test_values", + [ + ( + pytest.param( + "tdigest", marks=pytest.mark.skipif(not crick, reason="Requires crick") + ), + (6, 10), + ), + ("dask", (4, 20)), + ], ) def test_describe_numeric(method, test_values): - if method == "tdigest": - pytest.importorskip("crick") # prepare test case which approx quantiles will be the same as actuals s = pd.Series(list(range(test_values[1])) * test_values[0]) df = pd.DataFrame( @@ -1337,11 +1349,17 @@ def test_nbytes(): @pytest.mark.parametrize( "method,expected", - [("tdigest", (0.35, 3.80, 2.5, 6.5, 2.0)), ("dask", (0.0, 4.0, 1.2, 6.2, 2.0))], + [ + ( + pytest.param( + "tdigest", marks=pytest.mark.skipif(not crick, reason="Requires crick") + ), + (0.35, 3.80, 2.5, 6.5, 2.0), + ), + ("dask", (0.0, 4.0, 1.2, 6.2, 2.0)), + ], ) def test_quantile(method, expected): - if method == "tdigest": - pytest.importorskip("crick") # series / multiple result = d.b.quantile([0.3, 0.7], method=method) @@ -1380,10 +1398,16 @@ def test_quantile(method, expected): assert result == expected[4] -@pytest.mark.parametrize("method", ["tdigest", "dask"]) +@pytest.mark.parametrize( + "method", + [ + pytest.param( + "tdigest", marks=pytest.mark.skipif(not crick, reason="Requires crick") + ), + "dask", + ], +) def test_quantile_missing(method): - if method == "tdigest": - pytest.importorskip("crick") df = pd.DataFrame({"A": [0, np.nan, 2]}) # TODO: Test npartitions=2 # (see https://github.com/dask/dask/issues/9227) @@ -1397,10 +1421,16 @@ def test_quantile_missing(method): assert_eq(result, expected) -@pytest.mark.parametrize("method", ["tdigest", "dask"]) +@pytest.mark.parametrize( + "method", + [ + pytest.param( + "tdigest", marks=pytest.mark.skipif(not crick, reason="Requires crick") + ), + "dask", + ], +) def test_empty_quantile(method): - if method == "tdigest": - pytest.importorskip("crick") result = d.b.quantile([], method=method) exp = full.b.quantile([]) assert result.divisions == (None, None) @@ -1418,7 +1448,9 @@ def test_empty_quantile(method): "method,expected", [ ( - "tdigest", + pytest.param( + "tdigest", marks=pytest.mark.skipif(not crick, reason="Requires crick") + ), ( pd.Series([9.5, 29.5, 19.5], index=["A", "X", "B"]), pd.DataFrame( @@ -1442,8 +1474,6 @@ def test_empty_quantile(method): ], ) def test_dataframe_quantile(method, expected): - if method == "tdigest": - pytest.importorskip("crick") # column X is for test column order and result division df = pd.DataFrame( { @@ -4331,7 +4361,15 @@ def test_median(): ddf.x.median() -@pytest.mark.parametrize("method", ["default", "tdigest"]) +@pytest.mark.parametrize( + "method", + [ + "default", + pytest.param( + "tdigest", marks=pytest.importorskip("crick", reason="Requires crick") + ), + ], +) def test_median_approximate(method): df = pd.DataFrame({"x": range(100), "y": range(100, 200)}) ddf = dd.from_pandas(df, npartitions=10) From 0fe2ca8c73649a88505935e0f7d39ed65abef4f2 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 12 Sep 2022 14:12:58 -0500 Subject: [PATCH 4/8] Typo --- dask/dataframe/tests/test_dataframe.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dask/dataframe/tests/test_dataframe.py b/dask/dataframe/tests/test_dataframe.py index 5b8190917fb..07c02009126 100644 --- a/dask/dataframe/tests/test_dataframe.py +++ b/dask/dataframe/tests/test_dataframe.py @@ -4364,9 +4364,9 @@ def test_median(): @pytest.mark.parametrize( "method", [ - "default", + "dask", pytest.param( - "tdigest", marks=pytest.importorskip("crick", reason="Requires crick") + "tdigest", marks=pytest.mark.skipif(not crick, reason="Requires crick") ), ], ) From 14570b80d85de7b0f20910c6d31f695434d926d9 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 12 Sep 2022 14:24:29 -0500 Subject: [PATCH 5/8] API docs --- dask/dataframe/core.py | 22 +++++++++++++++++++++- dask/dataframe/tests/test_dataframe.py | 2 +- docs/source/dataframe-api.rst | 4 ++++ 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/dask/dataframe/core.py b/dask/dataframe/core.py index f804247a1c9..9c556ef224f 100644 --- a/dask/dataframe/core.py +++ b/dask/dataframe/core.py @@ -2165,11 +2165,22 @@ def median_approximate( axis=None, method="default", ): + """Return the approximate median of the values over the requested axis. + + Parameters + ---------- + axis : {0, 1, "index", "columns"} (default 0) + 0 or ``"index"`` for row-wise, 1 or ``"columns"`` for column-wise + method : {'default', 'tdigest', 'dask'}, optional + What method to use. By default will use Dask's internal custom + algorithm (``"dask"``). If set to ``"tdigest"`` will use tdigest + for floats and ints and fallback to the ``"dask"`` otherwise. + """ return self.quantile(q=0.5, axis=axis, method=method) @derived_from(pd.DataFrame) def median(self, axis=None, method="default"): - if axis == 1 or self.npartitions == 1: + if axis in (1, "columns") or self.npartitions == 1: # Can provide an exact median in these cases return self.quantile(q=0.5, axis=axis, method=method) raise NotImplementedError( @@ -3631,6 +3642,15 @@ def quantile(self, q=0.5, method="default"): return quantile(self, q, method=method) def median_approximate(self, method="default"): + """Return the approximate median of the values over the requested axis. + + Parameters + ---------- + method : {'default', 'tdigest', 'dask'}, optional + What method to use. By default will use Dask's internal custom + algorithm (``"dask"``). If set to ``"tdigest"`` will use tdigest + for floats and ints and fallback to the ``"dask"`` otherwise. + """ return self.quantile(q=0.5, method=method) @derived_from(pd.Series) diff --git a/dask/dataframe/tests/test_dataframe.py b/dask/dataframe/tests/test_dataframe.py index 07c02009126..ceb08f27059 100644 --- a/dask/dataframe/tests/test_dataframe.py +++ b/dask/dataframe/tests/test_dataframe.py @@ -4349,7 +4349,7 @@ def test_median(): assert_eq(ddf_single.x.median(), df.x.median(), check_names=False) # Ensure `median` redirects to `median_approximate` appropriately - for axis in [None, 0]: + for axis in [None, 0, "rows"]: with pytest.raises( NotImplementedError, match="See the `median_approximate` method instead" ): diff --git a/docs/source/dataframe-api.rst b/docs/source/dataframe-api.rst index f8ae7830eeb..652566ab456 100644 --- a/docs/source/dataframe-api.rst +++ b/docs/source/dataframe-api.rst @@ -73,6 +73,8 @@ Dataframe DataFrame.mask DataFrame.max DataFrame.mean + DataFrame.median + DataFrame.median_approximate DataFrame.melt DataFrame.memory_usage DataFrame.memory_usage_per_partition @@ -203,6 +205,8 @@ Series Series.mask Series.max Series.mean + Series.median + Series.median_approximate Series.memory_usage Series.memory_usage_per_partition Series.min From e224f4002c37989a342403debe35267b5a2d6cd1 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 12 Sep 2022 15:46:03 -0500 Subject: [PATCH 6/8] Fix pytest.param --- dask/dataframe/tests/test_dataframe.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/dask/dataframe/tests/test_dataframe.py b/dask/dataframe/tests/test_dataframe.py index ceb08f27059..9578cf3d627 100644 --- a/dask/dataframe/tests/test_dataframe.py +++ b/dask/dataframe/tests/test_dataframe.py @@ -398,11 +398,10 @@ def test_rename_series_method_2(): @pytest.mark.parametrize( "method,test_values", [ - ( - pytest.param( - "tdigest", marks=pytest.mark.skipif(not crick, reason="Requires crick") - ), + pytest.param( + "tdigest", (6, 10), + marks=pytest.mark.skipif(not crick, reason="Requires crick"), ), ("dask", (4, 20)), ], @@ -1350,11 +1349,10 @@ def test_nbytes(): @pytest.mark.parametrize( "method,expected", [ - ( - pytest.param( - "tdigest", marks=pytest.mark.skipif(not crick, reason="Requires crick") - ), + pytest.param( + "tdigest", (0.35, 3.80, 2.5, 6.5, 2.0), + marks=pytest.mark.skipif(not crick, reason="Requires crick"), ), ("dask", (0.0, 4.0, 1.2, 6.2, 2.0)), ], @@ -1447,10 +1445,8 @@ def test_empty_quantile(method): @pytest.mark.parametrize( "method,expected", [ - ( - pytest.param( - "tdigest", marks=pytest.mark.skipif(not crick, reason="Requires crick") - ), + pytest.param( + "tdigest", ( pd.Series([9.5, 29.5, 19.5], index=["A", "X", "B"]), pd.DataFrame( @@ -1459,6 +1455,7 @@ def test_empty_quantile(method): columns=["A", "X", "B"], ), ), + marks=pytest.mark.skipif(not crick, reason="Requires crick"), ), ( "dask", From bccfb5c476dc50713fe530fee98bfd0d7b6e26e0 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Tue, 13 Sep 2022 10:15:28 -0500 Subject: [PATCH 7/8] Workaround name --- dask/dataframe/core.py | 6 +++--- dask/dataframe/tests/test_dataframe.py | 24 +++++++++++++----------- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/dask/dataframe/core.py b/dask/dataframe/core.py index 9c556ef224f..068fcd39481 100644 --- a/dask/dataframe/core.py +++ b/dask/dataframe/core.py @@ -2176,13 +2176,13 @@ def median_approximate( algorithm (``"dask"``). If set to ``"tdigest"`` will use tdigest for floats and ints and fallback to the ``"dask"`` otherwise. """ - return self.quantile(q=0.5, axis=axis, method=method) + return self.quantile(q=0.5, axis=axis, method=method).rename(None) @derived_from(pd.DataFrame) def median(self, axis=None, method="default"): if axis in (1, "columns") or self.npartitions == 1: # Can provide an exact median in these cases - return self.quantile(q=0.5, axis=axis, method=method) + return self.median_approximate(axis=axis, method=method) raise NotImplementedError( "Dask doesn't implement an exact median in all cases as this is hard to do in parallel. " "See the `median_approximate` method instead, which uses an approximate algorithm." @@ -3657,7 +3657,7 @@ def median_approximate(self, method="default"): def median(self, method="default"): if self.npartitions == 1: # Can provide an exact median in these cases - return self.quantile(q=0.5, method=method) + return self.median_approximate(method=method) raise NotImplementedError( "Dask doesn't implement an exact median in all cases as this is hard to do in parallel. " "See the `median_approximate` method instead, which uses an approximate algorithm." diff --git a/dask/dataframe/tests/test_dataframe.py b/dask/dataframe/tests/test_dataframe.py index 9578cf3d627..60f98196bee 100644 --- a/dask/dataframe/tests/test_dataframe.py +++ b/dask/dataframe/tests/test_dataframe.py @@ -4339,11 +4339,10 @@ def test_median(): ddf = dd.from_pandas(df, npartitions=3) # Exact medians work when `axis=1` or when there's only have a single partition - # TODO: shouldn't need to specify `check_names=False` below, but names currently don't match - assert_eq(ddf.median(axis=1), df.median(axis=1), check_names=False) + assert_eq(ddf.median(axis=1), df.median(axis=1)) ddf_single = dd.from_pandas(df, npartitions=1) - assert_eq(ddf_single.median(axis=1), df.median(axis=1), check_names=False) - assert_eq(ddf_single.x.median(), df.x.median(), check_names=False) + assert_eq(ddf_single.median(axis=1), df.median(axis=1)) + assert_eq(ddf_single.x.median(), df.x.median()) # Ensure `median` redirects to `median_approximate` appropriately for axis in [None, 0, "rows"]: @@ -4370,13 +4369,16 @@ def test_median(): def test_median_approximate(method): df = pd.DataFrame({"x": range(100), "y": range(100, 200)}) ddf = dd.from_pandas(df, npartitions=10) - # TODO: shouldn't need to specify `check_names=False` below, but names currently don't match - assert_eq( - ddf.median_approximate(method=method), - df.median(), - check_names=False, - rtol=1, - ) + if PANDAS_GT_110: + assert_eq( + ddf.median_approximate(method=method), + df.median(), + atol=1, + ) + else: + result = ddf.median_approximate(method=method) + expected = df.median() + assert ((result - expected).abs() < 1).all().compute() def test_datetime_loc_open_slicing(): From 8e6584077a2d63ce1a4765a25ccd915299582526 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Tue, 13 Sep 2022 10:55:20 -0500 Subject: [PATCH 8/8] Increase test coverage for single-partition dataframes --- dask/dataframe/tests/test_dataframe.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dask/dataframe/tests/test_dataframe.py b/dask/dataframe/tests/test_dataframe.py index 60f98196bee..0eafc4e6a08 100644 --- a/dask/dataframe/tests/test_dataframe.py +++ b/dask/dataframe/tests/test_dataframe.py @@ -4342,6 +4342,7 @@ def test_median(): assert_eq(ddf.median(axis=1), df.median(axis=1)) ddf_single = dd.from_pandas(df, npartitions=1) assert_eq(ddf_single.median(axis=1), df.median(axis=1)) + assert_eq(ddf_single.median(axis=0), df.median(axis=0)) assert_eq(ddf_single.x.median(), df.x.median()) # Ensure `median` redirects to `median_approximate` appropriately