Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DataFrame and Series median method #9483

Merged
merged 8 commits into from Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
50 changes: 50 additions & 0 deletions dask/dataframe/core.py
Expand Up @@ -2160,6 +2160,34 @@ def mean(
result.divisions = (self.columns.min(), self.columns.max())
return handle_out(out, result)

def median_approximate(self, axis=None, method="default"):
    """Compute an approximate median along ``axis``.

    Thin wrapper that asks ``self.quantile`` for the 0.5 quantile,
    forwarding ``axis`` and ``method`` untouched.

    Parameters
    ----------
    axis : {0, 1, "index", "columns"}, optional
        0 or ``"index"`` for row-wise, 1 or ``"columns"`` for column-wise.
        The default, ``None``, is handed to ``quantile`` as-is.
    method : {'default', 'tdigest', 'dask'}, optional
        Algorithm used for the approximation. ``"default"`` selects Dask's
        internal custom algorithm (``"dask"``). ``"tdigest"`` uses tdigest
        for floats and ints and falls back to ``"dask"`` otherwise.

    Returns
    -------
    Whatever ``self.quantile`` returns for ``q=0.5``.
    """
    # The median is by definition the 0.5 quantile.
    quantile_kwargs = {"q": 0.5, "axis": axis, "method": method}
    return self.quantile(**quantile_kwargs)

@derived_from(pd.DataFrame)
def median(self, axis=None, method="default"):
    # An exact median is only offered for axis 1 / "columns" or when the
    # collection has a single partition; every other request is refused
    # with a pointer to the approximate variant.
    exact_is_possible = axis in (1, "columns") or self.npartitions == 1
    if not exact_is_possible:
        raise NotImplementedError(
            "Dask doesn't implement an exact median in all cases as this is hard to do in parallel. "
            "See the `median_approximate` method instead, which uses an approximate algorithm."
        )
    # The median is the 0.5 quantile.
    return self.quantile(q=0.5, axis=axis, method=method)

@_numeric_only
@derived_from(pd.DataFrame)
def var(
Expand Down Expand Up @@ -3613,6 +3641,28 @@ def quantile(self, q=0.5, method="default"):
"""
return quantile(self, q, method=method)

def median_approximate(self, method="default"):
    """Compute an approximate median of the values.

    Thin wrapper that asks ``self.quantile`` for the 0.5 quantile,
    forwarding ``method`` untouched.

    Parameters
    ----------
    method : {'default', 'tdigest', 'dask'}, optional
        Algorithm used for the approximation. ``"default"`` selects Dask's
        internal custom algorithm (``"dask"``). ``"tdigest"`` uses tdigest
        for floats and ints and falls back to ``"dask"`` otherwise.

    Returns
    -------
    Whatever ``self.quantile`` returns for ``q=0.5``.
    """
    # The median is by definition the 0.5 quantile.
    quantile_kwargs = {"q": 0.5, "method": method}
    return self.quantile(**quantile_kwargs)

@derived_from(pd.Series)
def median(self, method="default"):
    # An exact median is only offered when the collection has a single
    # partition; otherwise the caller is pointed at the approximate variant.
    single_partition = self.npartitions == 1
    if not single_partition:
        raise NotImplementedError(
            "Dask doesn't implement an exact median in all cases as this is hard to do in parallel. "
            "See the `median_approximate` method instead, which uses an approximate algorithm."
        )
    # The median is the 0.5 quantile.
    return self.quantile(q=0.5, method=method)

def _repartition_quantiles(self, npartitions, upsample=1.0):
"""Approximate quantiles of Series used for repartitioning"""
from dask.dataframe.partitionquantiles import partition_quantiles
Expand Down
102 changes: 87 additions & 15 deletions dask/dataframe/tests/test_dataframe.py
Expand Up @@ -40,6 +40,11 @@
from dask.utils import M, is_dataframe_like, is_series_like, put_lines
from dask.utils_test import _check_warning, hlg_layer

try:
import crick
except ImportError:
crick = None

dsk = {
("x", 0): pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, index=[0, 1, 3]),
("x", 1): pd.DataFrame({"a": [4, 5, 6], "b": [3, 2, 1]}, index=[5, 6, 8]),
Expand Down Expand Up @@ -391,11 +396,17 @@ def test_rename_series_method_2():


@pytest.mark.parametrize(
"method,test_values", [("tdigest", (6, 10)), ("dask", (4, 20))]
"method,test_values",
[
pytest.param(
"tdigest",
(6, 10),
marks=pytest.mark.skipif(not crick, reason="Requires crick"),
),
("dask", (4, 20)),
],
)
def test_describe_numeric(method, test_values):
if method == "tdigest":
pytest.importorskip("crick")
# prepare test case which approx quantiles will be the same as actuals
s = pd.Series(list(range(test_values[1])) * test_values[0])
df = pd.DataFrame(
Expand Down Expand Up @@ -1337,11 +1348,16 @@ def test_nbytes():

@pytest.mark.parametrize(
"method,expected",
[("tdigest", (0.35, 3.80, 2.5, 6.5, 2.0)), ("dask", (0.0, 4.0, 1.2, 6.2, 2.0))],
[
pytest.param(
"tdigest",
(0.35, 3.80, 2.5, 6.5, 2.0),
marks=pytest.mark.skipif(not crick, reason="Requires crick"),
),
("dask", (0.0, 4.0, 1.2, 6.2, 2.0)),
],
)
def test_quantile(method, expected):
if method == "tdigest":
pytest.importorskip("crick")
# series / multiple
result = d.b.quantile([0.3, 0.7], method=method)

Expand Down Expand Up @@ -1380,10 +1396,16 @@ def test_quantile(method, expected):
assert result == expected[4]


@pytest.mark.parametrize("method", ["tdigest", "dask"])
@pytest.mark.parametrize(
"method",
[
pytest.param(
"tdigest", marks=pytest.mark.skipif(not crick, reason="Requires crick")
),
"dask",
],
)
def test_quantile_missing(method):
if method == "tdigest":
pytest.importorskip("crick")
df = pd.DataFrame({"A": [0, np.nan, 2]})
# TODO: Test npartitions=2
# (see https://github.com/dask/dask/issues/9227)
Expand All @@ -1397,10 +1419,16 @@ def test_quantile_missing(method):
assert_eq(result, expected)


@pytest.mark.parametrize("method", ["tdigest", "dask"])
@pytest.mark.parametrize(
"method",
[
pytest.param(
"tdigest", marks=pytest.mark.skipif(not crick, reason="Requires crick")
),
"dask",
],
)
def test_empty_quantile(method):
if method == "tdigest":
pytest.importorskip("crick")
result = d.b.quantile([], method=method)
exp = full.b.quantile([])
assert result.divisions == (None, None)
Expand All @@ -1417,7 +1445,7 @@ def test_empty_quantile(method):
@pytest.mark.parametrize(
"method,expected",
[
(
pytest.param(
"tdigest",
(
pd.Series([9.5, 29.5, 19.5], index=["A", "X", "B"]),
Expand All @@ -1427,6 +1455,7 @@ def test_empty_quantile(method):
columns=["A", "X", "B"],
),
),
marks=pytest.mark.skipif(not crick, reason="Requires crick"),
),
(
"dask",
Expand All @@ -1442,8 +1471,6 @@ def test_empty_quantile(method):
],
)
def test_dataframe_quantile(method, expected):
if method == "tdigest":
pytest.importorskip("crick")
# column X is for test column order and result division
df = pd.DataFrame(
{
Expand Down Expand Up @@ -4307,6 +4334,51 @@ def test_dataframe_mode():
assert_eq(ddf.mode(), df.mode(), check_index=False)


def test_median():
    """Check when ``median`` returns a result and when it refuses.

    ``median`` is expected to match pandas for ``axis=1`` and for
    single-partition collections, and to raise ``NotImplementedError``
    (pointing at ``median_approximate``) for every other request.
    """
    df = pd.DataFrame({"x": [1, 2, 3, 4, 5], "y": [1.1, 2.2, 3.3, 4.4, 5.5]})
    ddf = dd.from_pandas(df, npartitions=3)

    # Exact medians work when `axis=1` or when there's only a single partition
    # TODO: shouldn't need to specify `check_names=False` below, but names currently don't match
    assert_eq(ddf.median(axis=1), df.median(axis=1), check_names=False)
    ddf_single = dd.from_pandas(df, npartitions=1)
    assert_eq(ddf_single.median(axis=1), df.median(axis=1), check_names=False)
    # Also cover the single-partition, index-wise path on a DataFrame; the
    # multi-partition equivalent is asserted to raise further down.
    assert_eq(ddf_single.median(axis=0), df.median(axis=0), check_names=False)
    assert_eq(ddf_single.x.median(), df.x.median(), check_names=False)

    # Ensure `median` redirects to `median_approximate` appropriately
    for axis in [None, 0, "rows"]:
        with pytest.raises(
            NotImplementedError, match="See the `median_approximate` method instead"
        ):
            ddf.median(axis=axis)

    with pytest.raises(
        NotImplementedError, match="See the `median_approximate` method instead"
    ):
        ddf.x.median()


@pytest.mark.parametrize(
    "method",
    [
        "dask",
        pytest.param(
            "tdigest", marks=pytest.mark.skipif(not crick, reason="Requires crick")
        ),
    ],
)
def test_median_approximate(method):
    """Compare ``median_approximate`` against pandas' exact median."""
    df = pd.DataFrame({"x": range(100), "y": range(100, 200)})
    ddf = dd.from_pandas(df, npartitions=10)

    approximate = ddf.median_approximate(method=method)
    exact = df.median()

    # TODO: shouldn't need to specify `check_names=False` below, but names currently don't match
    # The result is only an approximation, hence the relative tolerance.
    assert_eq(approximate, exact, check_names=False, rtol=1)


def test_datetime_loc_open_slicing():
dtRange = pd.date_range("01.01.2015", "05.05.2015")
df = pd.DataFrame(np.random.random((len(dtRange), 2)), index=dtRange)
Expand Down
4 changes: 4 additions & 0 deletions docs/source/dataframe-api.rst
Expand Up @@ -73,6 +73,8 @@ Dataframe
DataFrame.mask
DataFrame.max
DataFrame.mean
DataFrame.median
DataFrame.median_approximate
DataFrame.melt
DataFrame.memory_usage
DataFrame.memory_usage_per_partition
Expand Down Expand Up @@ -203,6 +205,8 @@ Series
Series.mask
Series.max
Series.mean
Series.median
Series.median_approximate
Series.memory_usage
Series.memory_usage_per_partition
Series.min
Expand Down