Ensure map_partitions returns Series object if function returns scalar #10739

Open · wants to merge 4 commits into main
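A minimal sketch of the behavior this PR targets (plain dask/pandas usage mirroring the new test below, not part of the diff): when the mapped function returns a scalar per partition, the computed result is a pandas Series with one entry per partition rather than a bare scalar per task.

```python
import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"x": range(50)})
ddf = dd.from_pandas(pdf, npartitions=5)

# Each partition's function returns a scalar; with this change the scalar is
# wrapped in a Series, so the concatenated result has one entry per partition.
res = ddf.x.map_partitions(lambda s: s.min()).compute()
print(type(res))  # <class 'pandas.core.series.Series'>
print(len(res))   # 5, one value per partition
```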
17 changes: 14 additions & 3 deletions dask/dataframe/core.py
@@ -16,6 +16,7 @@
is_datetime64_any_dtype,
is_extension_array_dtype,
is_numeric_dtype,
is_scalar,
is_timedelta64_dtype,
)
from tlz import first, merge, partition_all, remove, unique
@@ -3530,7 +3531,12 @@ def _cum_agg(
# cumulate each partition
name1 = f"{self._token_prefix}{op_name}-map"
cumpart = map_partitions(
chunk, self, token=name1, meta=self, **chunk_kwargs
chunk,
self,
token=name1,
meta=self,
**chunk_kwargs,
enforce_metadata=False,
Member Author:
When using map_partitions internally we sometimes rely on the data not being cast to a Series. The cumulative aggregation logic becomes quite cumbersome otherwise. I hope this is not introducing any other weird artefacts 🤞
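A minimal pandas-only sketch of what this comment refers to (names are illustrative, not the actual dask internals): the value carried over from the previous partition broadcasts cleanly as a scalar, whereas a length-1 Series produced by metadata enforcement would align on the index instead.

```python
import pandas as pd

part = pd.Series([1, 2, 3]).cumsum()  # cumulated values of one partition: 1, 3, 6
carry_scalar = 10                     # last value carried over from the previous partition
print(part + carry_scalar)            # broadcasts: 11, 13, 16

carry_series = pd.Series([10])        # what enforcing Series metadata would hand us
print(part + carry_series)            # index-aligned: 11.0, NaN, NaN
```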

)

name2 = f"{self._token_prefix}{op_name}-take-last"
@@ -3540,6 +3546,7 @@
skipna,
meta=meta_series_constructor(self)([], dtype="float"),
token=name2,
enforce_metadata=False,
)

suffix = tokenize(self)
@@ -4161,7 +4168,7 @@ def rename(self, index=None, inplace=no_default, sorted_index=False):
--------
pandas.Series.rename
"""
from pandas.api.types import is_dict_like, is_list_like, is_scalar
from pandas.api.types import is_dict_like, is_list_like

import dask.dataframe as dd

@@ -5636,7 +5643,7 @@ def assign(self, **kwargs):
isinstance(v, Scalar)
or is_series_like(v)
or callable(v)
or pd.api.types.is_scalar(v)
or is_scalar(v)
or is_index_like(v)
or isinstance(v, Array)
):
@@ -7384,6 +7391,10 @@ def apply_and_enforce(*args, **kwargs):
func = kwargs.pop("_func")
meta = kwargs.pop("_meta")
df = func(*args, **kwargs)

if is_scalar(df) and is_series_like(meta):
df = type(meta)([df])
Member Author:
I ended up removing the explicit dtype since it caused issues and was quite unnecessary considering that df is already the final result data. The casting/coercing logic of the Series takes care of compat stuff with nulls, etc.
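A minimal pandas-only sketch of the point about dropping the explicit dtype: letting the Series constructor infer the dtype from the already-computed scalar avoids forcing meta's dtype onto a value it cannot hold (e.g. a NaN result with an integer meta).

```python
import numpy as np
import pandas as pd

print(pd.Series([3]).dtype)       # int64 -- inferred from the scalar
print(pd.Series([np.nan]).dtype)  # float64 -- NaN handled by normal coercion

# Forcing an integer dtype here would raise instead of coercing:
# pd.Series([np.nan], dtype="int64")  -> ValueError (cannot convert NaN to integer)
```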


if is_dataframe_like(df) or is_series_like(df) or is_index_like(df):
if not len(df):
return meta
4 changes: 2 additions & 2 deletions dask/dataframe/shuffle.py
@@ -54,8 +54,8 @@ def _calculate_divisions(
"""
sizes = df.map_partitions(sizeof) if repartition else []
divisions = partition_col._repartition_quantiles(npartitions, upsample=upsample)
mins = partition_col.map_partitions(M.min)
maxes = partition_col.map_partitions(M.max)
mins = partition_col.map_partitions(M.min, enforce_metadata=False)
maxes = partition_col.map_partitions(M.max, enforce_metadata=False)

try:
divisions, sizes, mins, maxes = compute(divisions, sizes, mins, maxes)
20 changes: 18 additions & 2 deletions dask/dataframe/tests/test_dataframe.py
@@ -1057,6 +1057,7 @@ def test_map_partitions():
d.map_partitions(lambda df: 1),
pd.Series([1, 1, 1], dtype=np.int64),
check_divisions=False,
check_index=False,
)
if not DASK_EXPR_ENABLED:
# We don't support instantiating a Scalar like this
@@ -4997,14 +4998,14 @@ def test_memory_usage_per_partition(index, deep):
for part in ddf.partitions
)
result = ddf.memory_usage_per_partition(index=index, deep=deep)
assert_eq(expected, result, check_index=not DASK_EXPR_ENABLED)
assert_eq(expected, result, check_index=False)

# Series.memory_usage_per_partition
expected = pd.Series(
part.x.compute().memory_usage(index=index, deep=deep) for part in ddf.partitions
)
result = ddf.x.memory_usage_per_partition(index=index, deep=deep)
assert_eq(expected, result, check_index=not DASK_EXPR_ENABLED)
assert_eq(expected, result, check_index=False)


@pytest.mark.parametrize(
@@ -6404,3 +6405,18 @@ def test_enforce_runtime_divisions():
RuntimeError, match="`enforce_runtime_divisions` failed for partition 1"
):
ddf.enforce_runtime_divisions().compute()


def test_map_partitions_always_series():
pdf = pd.DataFrame({"x": range(50)})
ddf = dd.from_pandas(pdf, 5)

def assert_series(x):
assert isinstance(x, pd.Series)
return x

res = ddf.x.map_partitions(M.min).map_partitions(assert_series).compute()
assert len(res) == ddf.npartitions
assert min(res) == min(pdf.x)

assert ddf.x.map_partitions(M.min).count().compute() == ddf.npartitions