Adjust for enforced deprecations in pandas #10899

Open · wants to merge 2 commits into main
1 change: 1 addition & 0 deletions dask/dataframe/_compat.py
@@ -19,6 +19,7 @@
 PANDAS_GE_210 = PANDAS_VERSION.release >= (2, 1, 0)
 PANDAS_GE_211 = PANDAS_VERSION.release >= (2, 1, 1)
 PANDAS_GE_220 = PANDAS_VERSION.release >= (2, 2, 0)
+PANDAS_GE_300 = PANDAS_VERSION.release >= (3, 0, 0)
 
 import pandas.testing as tm
 
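
The new PANDAS_GE_300 flag mirrors the existing gates above: it compares the release tuple of the installed pandas. A minimal standalone sketch of how such a gate behaves (the packaging-based parse is an assumption about how _compat.py builds PANDAS_VERSION, not copied from it):

import pandas as pd
from packaging.version import parse

PANDAS_VERSION = parse(pd.__version__)

# True on pandas 3.x, where the deprecations handled in this PR are enforced
PANDAS_GE_300 = PANDAS_VERSION.release >= (3, 0, 0)

print(PANDAS_GE_300)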
135 changes: 69 additions & 66 deletions dask/dataframe/core.py
@@ -39,6 +39,7 @@
     PANDAS_GE_150,
     PANDAS_GE_200,
     PANDAS_GE_210,
+    PANDAS_GE_300,
     PANDAS_VERSION,
     check_convert_dtype_deprecation,
     check_nuisance_columns_warning,
@@ -3808,84 +3809,86 @@ def resample(self, rule, closed=None, label=None):
 
         return Resampler(self, rule, closed=closed, label=label)
 
-    @_deprecated(
-        message=(
-            "Will be removed in a future version. "
-            "Please create a mask and filter using .loc instead"
-        )
-    )
-    @derived_from(pd.DataFrame)
-    def first(self, offset):
-        # Let pandas error on bad args
-        self._meta_nonempty.first(offset)
-
-        if not self.known_divisions:
-            raise ValueError("`first` is not implemented for unknown divisions")
-
-        offset = pd.tseries.frequencies.to_offset(offset)
-        date = self.divisions[0] + offset
-        end = self.loc._get_partitions(date)
-
-        is_anchored = offset.is_anchored()
-
-        include_right = is_anchored or not hasattr(offset, "delta")
-
-        if end == self.npartitions - 1:
-            divs = self.divisions
-        else:
-            divs = self.divisions[: end + 1] + (date,)
-
-        name = "first-" + tokenize(self, offset)
-        dsk = {(name, i): (self._name, i) for i in range(end)}
-        dsk[(name, end)] = (
-            methods.boundary_slice,
-            (self._name, end),
-            None,
-            date,
-            include_right,
-            True,
-        )
-        graph = HighLevelGraph.from_collections(name, dsk, dependencies=[self])
-        return new_dd_object(graph, name, self, divs)
-
-    @_deprecated(
-        message=(
-            "Will be removed in a future version. "
-            "Please create a mask and filter using .loc instead"
-        )
-    )
-    @derived_from(pd.DataFrame)
-    def last(self, offset):
-        # Let pandas error on bad args
-        self._meta_nonempty.last(offset)
-
-        if not self.known_divisions:
-            raise ValueError("`last` is not implemented for unknown divisions")
-
-        offset = pd.tseries.frequencies.to_offset(offset)
-        date = self.divisions[-1] - offset
-        start = self.loc._get_partitions(date)
-
-        if start == 0:
-            divs = self.divisions
-        else:
-            divs = (date,) + self.divisions[start + 1 :]
-
-        name = "last-" + tokenize(self, offset)
-        dsk = {
-            (name, i + 1): (self._name, j + 1)
-            for i, j in enumerate(range(start, self.npartitions))
-        }
-        dsk[(name, 0)] = (
-            methods.boundary_slice,
-            (self._name, start),
-            date,
-            None,
-            True,
-            False,
-        )
-        graph = HighLevelGraph.from_collections(name, dsk, dependencies=[self])
-        return new_dd_object(graph, name, self, divs)
+    if not PANDAS_GE_300:
+
+        @_deprecated(
+            message=(
+                "Will be removed in a future version. "
+                "Please create a mask and filter using .loc instead"
+            )
+        )
+        @derived_from(pd.DataFrame)
+        def first(self, offset):
+            # Let pandas error on bad args
+            self._meta_nonempty.first(offset)
+
+            if not self.known_divisions:
+                raise ValueError("`first` is not implemented for unknown divisions")
+
+            offset = pd.tseries.frequencies.to_offset(offset)
+            date = self.divisions[0] + offset
+            end = self.loc._get_partitions(date)
+
+            is_anchored = offset.is_anchored()
+
+            include_right = is_anchored or not hasattr(offset, "delta")
+
+            if end == self.npartitions - 1:
+                divs = self.divisions
+            else:
+                divs = self.divisions[: end + 1] + (date,)
+
+            name = "first-" + tokenize(self, offset)
+            dsk = {(name, i): (self._name, i) for i in range(end)}
+            dsk[(name, end)] = (
+                methods.boundary_slice,
+                (self._name, end),
+                None,
+                date,
+                include_right,
+                True,
+            )
+            graph = HighLevelGraph.from_collections(name, dsk, dependencies=[self])
+            return new_dd_object(graph, name, self, divs)
+
+        @_deprecated(
+            message=(
+                "Will be removed in a future version. "
+                "Please create a mask and filter using .loc instead"
+            )
+        )
+        @derived_from(pd.DataFrame)
+        def last(self, offset):
+            # Let pandas error on bad args
+            self._meta_nonempty.last(offset)
+
+            if not self.known_divisions:
+                raise ValueError("`last` is not implemented for unknown divisions")
+
+            offset = pd.tseries.frequencies.to_offset(offset)
+            date = self.divisions[-1] - offset
+            start = self.loc._get_partitions(date)
+
+            if start == 0:
+                divs = self.divisions
+            else:
+                divs = (date,) + self.divisions[start + 1 :]
+
+            name = "last-" + tokenize(self, offset)
+            dsk = {
+                (name, i + 1): (self._name, j + 1)
+                for i, j in enumerate(range(start, self.npartitions))
+            }
+            dsk[(name, 0)] = (
+                methods.boundary_slice,
+                (self._name, start),
+                date,
+                None,
+                True,
+                False,
+            )
+            graph = HighLevelGraph.from_collections(name, dsk, dependencies=[self])
+            return new_dd_object(graph, name, self, divs)
 
     def nunique_approx(self, split_every=None):
         """Approximate number of unique rows.
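
The change above removes first/last from dask's API whenever pandas 3.0 is installed by guarding the method definitions with a class-body if. A minimal sketch of that mechanism with placeholder names (Frame here is illustrative, not dask's class):

PANDAS_GE_300 = False  # stand-in for the real flag from dask.dataframe._compat

class Frame:
    # A def under a false class-body condition never becomes an attribute,
    # so the exposed API matches whatever the installed pandas still provides.
    if not PANDAS_GE_300:

        def first(self, offset):
            return f"first of {offset}"

print(hasattr(Frame(), "first"))  # True only while the guard condition is False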
97 changes: 51 additions & 46 deletions dask/dataframe/groupby.py
@@ -19,6 +19,7 @@
     PANDAS_GE_200,
     PANDAS_GE_210,
     PANDAS_GE_220,
+    PANDAS_GE_300,
     check_groupby_axis_deprecation,
     check_numeric_only_deprecation,
     check_observed_deprecation,
@@ -2859,57 +2860,61 @@ def _normalize_axis(self, axis, method: str):
 
         return axis
 
-    @_deprecated(message="Please use `ffill`/`bfill` or `fillna` without a GroupBy.")
-    def fillna(self, value=None, method=None, limit=None, axis=no_default):
-        """Fill NA/NaN values using the specified method.
-
-        Parameters
-        ----------
-        value : scalar, default None
-            Value to use to fill holes (e.g. 0).
-        method : {'bfill', 'ffill', None}, default None
-            Method to use for filling holes in reindexed Series. ffill: propagate last
-            valid observation forward to next valid. bfill: use next valid observation
-            to fill gap.
-        axis : {0 or 'index', 1 or 'columns'}
-            Axis along which to fill missing values.
-        limit : int, default None
-            If method is specified, this is the maximum number of consecutive NaN values
-            to forward/backward fill. In other words, if there is a gap with more than
-            this number of consecutive NaNs, it will only be partially filled. If method
-            is not specified, this is the maximum number of entries along the entire
-            axis where NaNs will be filled. Must be greater than 0 if not None.
-
-        Returns
-        -------
-        Series or DataFrame
-            Object with missing values filled
-
-        See also
-        --------
-        pandas.core.groupby.DataFrameGroupBy.fillna
-        """
-        axis = self._normalize_axis(axis, "fillna")
-        if not np.isscalar(value) and value is not None:
-            raise NotImplementedError(
-                "groupby-fillna with value=dict/Series/DataFrame is not supported"
-            )
-
-        kwargs = dict(value=value, method=method, limit=limit, axis=axis)
-        if PANDAS_GE_220:
-            func = M.fillna
-            kwargs.update(include_groups=False)
-        else:
-            func = _drop_apply
-            kwargs.update(by=self.by, what="fillna")
-
-        meta = self._meta_nonempty.apply(func, **kwargs)
-        result = self.apply(func, meta=meta, **kwargs)
-
-        if PANDAS_GE_150 and self.group_keys:
-            return result.map_partitions(M.droplevel, self.by)
-
-        return result
+    if not PANDAS_GE_300:
+
+        @_deprecated(
+            message="Please use `ffill`/`bfill` or `fillna` without a GroupBy."
+        )
+        def fillna(self, value=None, method=None, limit=None, axis=no_default):
+            """Fill NA/NaN values using the specified method.
+
+            Parameters
+            ----------
+            value : scalar, default None
+                Value to use to fill holes (e.g. 0).
+            method : {'bfill', 'ffill', None}, default None
+                Method to use for filling holes in reindexed Series. ffill: propagate last
+                valid observation forward to next valid. bfill: use next valid observation
+                to fill gap.
+            axis : {0 or 'index', 1 or 'columns'}
+                Axis along which to fill missing values.
+            limit : int, default None
+                If method is specified, this is the maximum number of consecutive NaN values
+                to forward/backward fill. In other words, if there is a gap with more than
+                this number of consecutive NaNs, it will only be partially filled. If method
+                is not specified, this is the maximum number of entries along the entire
+                axis where NaNs will be filled. Must be greater than 0 if not None.
+
+            Returns
+            -------
+            Series or DataFrame
+                Object with missing values filled
+
+            See also
+            --------
+            pandas.core.groupby.DataFrameGroupBy.fillna
+            """
+            axis = self._normalize_axis(axis, "fillna")
+            if not np.isscalar(value) and value is not None:
+                raise NotImplementedError(
+                    "groupby-fillna with value=dict/Series/DataFrame is not supported"
+                )
+
+            kwargs = dict(value=value, method=method, limit=limit, axis=axis)
+            if PANDAS_GE_220:
+                func = M.fillna
+                kwargs.update(include_groups=False)
+            else:
+                func = _drop_apply
+                kwargs.update(by=self.by, what="fillna")
+
+            meta = self._meta_nonempty.apply(func, **kwargs)
+            result = self.apply(func, meta=meta, **kwargs)
+
+            if PANDAS_GE_150 and self.group_keys:
+                return result.map_partitions(M.droplevel, self.by)
+
+            return result
 
     @derived_from(pd.core.groupby.GroupBy)
     def ffill(self, limit=None):
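
The deprecation message points users at ffill/bfill on the GroupBy, or fillna without a GroupBy. A small plain-pandas illustration with made-up data:

import pandas as pd

df = pd.DataFrame({"g": ["a", "a", "b", "b"], "v": [1.0, None, None, 4.0]})

# Instead of the removed df.groupby("g").fillna(method="ffill"):
print(df.groupby("g").ffill())  # per-group forward fill
print(df.fillna(0.0))           # or fill without grouping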
3 changes: 2 additions & 1 deletion dask/dataframe/tests/test_dataframe.py
@@ -31,6 +31,7 @@
     PANDAS_GE_200,
     PANDAS_GE_210,
     PANDAS_GE_220,
+    PANDAS_GE_300,
     tm,
 )
 from dask.dataframe._pyarrow import to_pyarrow_string
@@ -4771,7 +4772,7 @@ def test_shift_with_freq_errors():
     pytest.raises(NotImplementedError, lambda: ddf.index.shift(2))
 
 
-@pytest.mark.skipif(DASK_EXPR_ENABLED, reason="deprecated in pandas")
+@pytest.mark.skipif(DASK_EXPR_ENABLED or PANDAS_GE_300, reason="deprecated in pandas")
 @pytest.mark.parametrize("method", ["first", "last"])
 def test_first_and_last(method):
     f = lambda x, offset: getattr(x, method)(offset)
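
With this change, test_first_and_last only runs where DataFrame.first/DataFrame.last still exist. For reference, the replacement the deprecation message recommends looks like this in plain pandas (example data is made up):

import pandas as pd

df = pd.DataFrame(
    {"x": range(6)},
    index=pd.date_range("2024-01-01", periods=6, freq="D"),
)

# Equivalent of the removed df.first("2D"): mask the index, filter with .loc
print(df.loc[df.index < df.index[0] + pd.Timedelta("2D")])

# Equivalent of the removed df.last("2D")
print(df.loc[df.index > df.index[-1] - pd.Timedelta("2D")])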
3 changes: 2 additions & 1 deletion dask/dataframe/tests/test_groupby.py
@@ -19,6 +19,7 @@
     PANDAS_GE_200,
     PANDAS_GE_210,
     PANDAS_GE_220,
+    PANDAS_GE_300,
     check_nuisance_columns_warning,
     check_numeric_only_deprecation,
     check_observed_deprecation,
@@ -1241,7 +1242,7 @@ def test_aggregate_median(spec, keys, shuffle_method):
     ddf.groupby(keys).median(shuffle_method=False).compute()
 
 
-@pytest.mark.skipif(DASK_EXPR_ENABLED, reason="deprecated in pandas")
+@pytest.mark.skipif(DASK_EXPR_ENABLED or PANDAS_GE_300, reason="deprecated in pandas")
 @pytest.mark.parametrize("axis", [0, 1])
 @pytest.mark.parametrize("group_keys", [True, False, None])
 @pytest.mark.parametrize("limit", [None, 1, 4])
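
The same gating applies here: tests exercising the removed GroupBy.fillna cannot run against pandas 3.x. A minimal sketch of the skip pattern (the test name and flag value are illustrative, not from this PR):

import pytest

PANDAS_GE_300 = True  # stand-in for the flag imported from dask.dataframe._compat

@pytest.mark.skipif(PANDAS_GE_300, reason="deprecated in pandas")
def test_groupby_fillna_legacy():
    # Body only runs on pandas < 3.0, where GroupBy.fillna still exists.
    assert True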