From 142de2608df2494bf11e08038aadddb544b4500c Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Tue, 13 Sep 2022 16:44:49 -0500 Subject: [PATCH] Add `DataFrame` and `Series` `median` method (#9483) --- dask/dataframe/core.py | 50 ++++++++++++ dask/dataframe/tests/test_dataframe.py | 105 +++++++++++++++++++++---- docs/source/dataframe-api.rst | 4 + 3 files changed, 144 insertions(+), 15 deletions(-) diff --git a/dask/dataframe/core.py b/dask/dataframe/core.py index 55132d43773..c11642fd064 100644 --- a/dask/dataframe/core.py +++ b/dask/dataframe/core.py @@ -2162,6 +2162,34 @@ def mean( result.divisions = (self.columns.min(), self.columns.max()) return handle_out(out, result) + def median_approximate( + self, + axis=None, + method="default", + ): + """Return the approximate median of the values over the requested axis. + + Parameters + ---------- + axis : {0, 1, "index", "columns"} (default 0) + 0 or ``"index"`` for row-wise, 1 or ``"columns"`` for column-wise + method : {'default', 'tdigest', 'dask'}, optional + What method to use. By default will use Dask's internal custom + algorithm (``"dask"``). If set to ``"tdigest"`` will use tdigest + for floats and ints and fallback to the ``"dask"`` otherwise. + """ + return self.quantile(q=0.5, axis=axis, method=method).rename(None) + + @derived_from(pd.DataFrame) + def median(self, axis=None, method="default"): + if axis in (1, "columns") or self.npartitions == 1: + # Can provide an exact median in these cases + return self.median_approximate(axis=axis, method=method) + raise NotImplementedError( + "Dask doesn't implement an exact median in all cases as this is hard to do in parallel. " + "See the `median_approximate` method instead, which uses an approximate algorithm." + ) + @_numeric_only @derived_from(pd.DataFrame) def var( @@ -3631,6 +3659,28 @@ def quantile(self, q=0.5, method="default"): """ return quantile(self, q, method=method) + def median_approximate(self, method="default"): + """Return the approximate median of the values over the requested axis. + + Parameters + ---------- + method : {'default', 'tdigest', 'dask'}, optional + What method to use. By default will use Dask's internal custom + algorithm (``"dask"``). If set to ``"tdigest"`` will use tdigest + for floats and ints and fallback to the ``"dask"`` otherwise. + """ + return self.quantile(q=0.5, method=method) + + @derived_from(pd.Series) + def median(self, method="default"): + if self.npartitions == 1: + # Can provide an exact median in these cases + return self.median_approximate(method=method) + raise NotImplementedError( + "Dask doesn't implement an exact median in all cases as this is hard to do in parallel. " + "See the `median_approximate` method instead, which uses an approximate algorithm." + ) + def _repartition_quantiles(self, npartitions, upsample=1.0): """Approximate quantiles of Series used for repartitioning""" from dask.dataframe.partitionquantiles import partition_quantiles diff --git a/dask/dataframe/tests/test_dataframe.py b/dask/dataframe/tests/test_dataframe.py index 8350e6bd857..eaa9eae6d1f 100644 --- a/dask/dataframe/tests/test_dataframe.py +++ b/dask/dataframe/tests/test_dataframe.py @@ -41,6 +41,11 @@ from dask.utils import M, is_dataframe_like, is_series_like, put_lines from dask.utils_test import _check_warning, hlg_layer +try: + import crick +except ImportError: + crick = None + dsk = { ("x", 0): pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, index=[0, 1, 3]), ("x", 1): pd.DataFrame({"a": [4, 5, 6], "b": [3, 2, 1]}, index=[5, 6, 8]), @@ -393,11 +398,17 @@ def test_rename_series_method_2(): @pytest.mark.parametrize( - "method,test_values", [("tdigest", (6, 10)), ("dask", (4, 20))] + "method,test_values", + [ + pytest.param( + "tdigest", + (6, 10), + marks=pytest.mark.skipif(not crick, reason="Requires crick"), + ), + ("dask", (4, 20)), + ], ) def test_describe_numeric(method, test_values): - if method == "tdigest": - pytest.importorskip("crick") # prepare test case which approx quantiles will be the same as actuals s = pd.Series(list(range(test_values[1])) * test_values[0]) df = pd.DataFrame( @@ -1365,11 +1376,16 @@ def test_nbytes(): @pytest.mark.parametrize( "method,expected", - [("tdigest", (0.35, 3.80, 2.5, 6.5, 2.0)), ("dask", (0.0, 4.0, 1.2, 6.2, 2.0))], + [ + pytest.param( + "tdigest", + (0.35, 3.80, 2.5, 6.5, 2.0), + marks=pytest.mark.skipif(not crick, reason="Requires crick"), + ), + ("dask", (0.0, 4.0, 1.2, 6.2, 2.0)), + ], ) def test_quantile(method, expected): - if method == "tdigest": - pytest.importorskip("crick") # series / multiple result = d.b.quantile([0.3, 0.7], method=method) @@ -1408,10 +1424,16 @@ def test_quantile(method, expected): assert result == expected[4] -@pytest.mark.parametrize("method", ["tdigest", "dask"]) +@pytest.mark.parametrize( + "method", + [ + pytest.param( + "tdigest", marks=pytest.mark.skipif(not crick, reason="Requires crick") + ), + "dask", + ], +) def test_quantile_missing(method): - if method == "tdigest": - pytest.importorskip("crick") df = pd.DataFrame({"A": [0, np.nan, 2]}) # TODO: Test npartitions=2 # (see https://github.com/dask/dask/issues/9227) @@ -1425,10 +1447,16 @@ def test_quantile_missing(method): assert_eq(result, expected) -@pytest.mark.parametrize("method", ["tdigest", "dask"]) +@pytest.mark.parametrize( + "method", + [ + pytest.param( + "tdigest", marks=pytest.mark.skipif(not crick, reason="Requires crick") + ), + "dask", + ], +) def test_empty_quantile(method): - if method == "tdigest": - pytest.importorskip("crick") result = d.b.quantile([], method=method) exp = full.b.quantile([]) assert result.divisions == (None, None) @@ -1445,7 +1473,7 @@ def test_empty_quantile(method): @pytest.mark.parametrize( "method,expected", [ - ( + pytest.param( "tdigest", ( pd.Series([9.5, 29.5, 19.5], index=["A", "X", "B"]), @@ -1455,6 +1483,7 @@ def test_empty_quantile(method): columns=["A", "X", "B"], ), ), + marks=pytest.mark.skipif(not crick, reason="Requires crick"), ), ( "dask", @@ -1470,8 +1499,6 @@ def test_empty_quantile(method): ], ) def test_dataframe_quantile(method, expected): - if method == "tdigest": - pytest.importorskip("crick") # column X is for test column order and result division df = pd.DataFrame( { @@ -4335,6 +4362,54 @@ def test_dataframe_mode(): assert_eq(ddf.mode(), df.mode(), check_index=False) +def test_median(): + df = pd.DataFrame({"x": [1, 2, 3, 4, 5], "y": [1.1, 2.2, 3.3, 4.4, 5.5]}) + ddf = dd.from_pandas(df, npartitions=3) + + # Exact medians work when `axis=1` or when there's only have a single partition + assert_eq(ddf.median(axis=1), df.median(axis=1)) + ddf_single = dd.from_pandas(df, npartitions=1) + assert_eq(ddf_single.median(axis=1), df.median(axis=1)) + assert_eq(ddf_single.median(axis=0), df.median(axis=0)) + assert_eq(ddf_single.x.median(), df.x.median()) + + # Ensure `median` redirects to `median_approximate` appropriately + for axis in [None, 0, "rows"]: + with pytest.raises( + NotImplementedError, match="See the `median_approximate` method instead" + ): + ddf.median(axis=axis) + + with pytest.raises( + NotImplementedError, match="See the `median_approximate` method instead" + ): + ddf.x.median() + + +@pytest.mark.parametrize( + "method", + [ + "dask", + pytest.param( + "tdigest", marks=pytest.mark.skipif(not crick, reason="Requires crick") + ), + ], +) +def test_median_approximate(method): + df = pd.DataFrame({"x": range(100), "y": range(100, 200)}) + ddf = dd.from_pandas(df, npartitions=10) + if PANDAS_GT_110: + assert_eq( + ddf.median_approximate(method=method), + df.median(), + atol=1, + ) + else: + result = ddf.median_approximate(method=method) + expected = df.median() + assert ((result - expected).abs() < 1).all().compute() + + def test_datetime_loc_open_slicing(): dtRange = pd.date_range("01.01.2015", "05.05.2015") df = pd.DataFrame(np.random.random((len(dtRange), 2)), index=dtRange) diff --git a/docs/source/dataframe-api.rst b/docs/source/dataframe-api.rst index f8ae7830eeb..652566ab456 100644 --- a/docs/source/dataframe-api.rst +++ b/docs/source/dataframe-api.rst @@ -73,6 +73,8 @@ Dataframe DataFrame.mask DataFrame.max DataFrame.mean + DataFrame.median + DataFrame.median_approximate DataFrame.melt DataFrame.memory_usage DataFrame.memory_usage_per_partition @@ -203,6 +205,8 @@ Series Series.mask Series.max Series.mean + Series.median + Series.median_approximate Series.memory_usage Series.memory_usage_per_partition Series.min