Avoid pandas constructors in dask.dataframe.core #9570
Conversation
That's a very cool solution to this problem -- and mirrors what @pentschev did for NEP37.
Thanks @rjzamora. Just left a few quick comments while passing by; I'll take a more detailed look soon.
```diff
@@ -7150,7 +7159,7 @@ def cov_corr_combine(data_in, corr=False):
     return out


-def cov_corr_agg(data, cols, min_periods=2, corr=False, scalar=False):
+def cov_corr_agg(data, cols, min_periods=2, corr=False, scalar=False, like_df=None):
```
What is `data` here? Can we get the DataFrame type from that?
`data` is `list[dict[str, np/cupy.ndarray]]` here. Therefore, we would need to extend `serial_frame_constructor` to handle array-like data if we want to avoid the `like_df` argument.
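For context, here is a minimal sketch of what a `like`-based frame-constructor helper could look like. This is an illustration only, not the PR's actual implementation; the real helper also needs to cover more input types.

```python
# Hypothetical sketch: the name matches the PR, but the body is an
# assumption based on the pandas ``_constructor`` protocol, which
# cudf objects also implement.
import pandas as pd


def serial_frame_constructor(like=None):
    """Return the DataFrame constructor matching ``like`` (default: pandas)."""
    if like is None:
        return pd.DataFrame
    # Dask collections carry a serial ``_meta`` object; unwrap it first.
    like = getattr(like, "_meta", like)
    if hasattr(like, "to_frame"):
        # A Series-like input maps to its corresponding frame type.
        like = like.to_frame()
    return getattr(like, "_constructor", pd.DataFrame)
```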
1ba0353 adds a `serial_constructor_from_array` dispatch (but doesn't actually use it yet) to illustrate what it would probably look like.
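For readers unfamiliar with dask's dispatch machinery, the commit presumably sketches something along these lines. `dask.utils.Dispatch` is real; the specific registrations below are assumptions for illustration, not the commit's verified contents.

```python
# Rough illustration of an array-based constructor dispatch.
import numpy as np
import pandas as pd

from dask.utils import Dispatch

serial_constructor_from_array = Dispatch("serial_constructor_from_array")


@serial_constructor_from_array.register(np.ndarray)
def _numpy_constructor(arr):
    # Plain numpy arrays map to pandas constructors.
    return pd.DataFrame


@serial_constructor_from_array.register_lazy("cupy")
def _register_cupy():
    # Only registered the first time a cupy object is dispatched.
    import cupy

    @serial_constructor_from_array.register(cupy.ndarray)
    def _cupy_constructor(arr):
        import cudf

        return cudf.DataFrame
```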
```python
@pytest.mark.gpu
def test_cov_gpu():
```
If the backend PR goes in before this PR, could we just reuse `test_cov` with the backend engine parametrized?
Perhaps. We would need to change the `_compat` dataframe-creation machinery to use dispatchable functions to do this.
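Reusing `test_cov` would presumably look something like the sketch below once the `_compat` helpers are dispatchable. The `dataframe.backend` config key follows the backend PR's direction, but the fixture shape here is an assumption and the test body is elided.

```python
# Sketch of a backend-parametrized test; an illustration of the idea
# being discussed, not code from this PR.
import pytest

import dask


@pytest.mark.parametrize(
    "backend",
    ["pandas", pytest.param("cudf", marks=pytest.mark.gpu)],
)
def test_cov(backend):
    with dask.config.set({"dataframe.backend": backend}):
        ...  # shared test body built on dispatchable _compat helpers
```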
```python
@pytest.mark.gpu
def test_corr_gpu():
```
Similar comment here.
I wonder if there's any way to add a CI check to ensure that future invocations of frame-like constructors go through this path. Nothing lightweight springs to mind, but I imagine it would be fairly easy for me or others to accidentally introduce a pandas constructor when one of these utilities would be more appropriate.
Yea, I also struggled to come up with a clever solution to this problem (beyond the systematic expansion and maintenance of gpuci). One challenge is that …
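One lightweight option (purely a suggestion, not something in this PR) would be a "ratchet"-style test that counts direct pandas constructor calls in `core.py` and fails when new ones appear:

```python
# Hypothetical ratchet check: fail CI if the number of direct pandas
# constructor calls in core.py grows past the current allowed count.
import pathlib
import re

ALLOWED_CALLS = 0  # lower this as remaining call sites are converted


def test_no_new_pandas_constructors():
    source = pathlib.Path("dask/dataframe/core.py").read_text()
    hits = re.findall(r"\bpd\.(?:DataFrame|Series)\(", source)
    assert len(hits) <= ALLOWED_CALLS, (
        f"found {len(hits)} direct pandas constructor calls; "
        "prefer the serial_*_constructor utilities"
    )
```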
Circling back here, @rjzamora: are there any additional changes you'd like to include? It'd be nice to get this into tomorrow's release if feasible.
```diff
@@ -7775,7 +7782,7 @@ def to_datetime(arg, meta=None, **kwargs):
 def to_timedelta(arg, unit=None, errors="raise"):
     if not PANDAS_GT_110 and unit is None:
         unit = "ns"
-    meta = pd.Series([pd.Timedelta(1, unit=unit)])
+    meta = meta_series_constructor(arg)([pd.Timedelta(1, unit=unit)])
```
(Not a blocking comment, just meant for other reviewers.) Noting that this will fail for some inputs (e.g. `numpy.ndarray`s) that are supported by `pandas.to_timedelta`. However, it looks like things already fail with `dd.to_timedelta` for such inputs on `main`, and we arguably get a better error message with the changes in this PR.

On `main`:
```python
In [1]: import numpy as np

In [2]: import dask.dataframe as dd

In [3]: dd.to_timedelta(np.arange(5), unit='s')
---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
Input In [3], in <cell line: 1>()
----> 1 dd.to_timedelta(np.arange(5), unit='s')

File ~/projects/dask/dask/dask/dataframe/core.py:7779, in to_timedelta(arg, unit, errors)
   7777     unit = "ns"
   7778 meta = pd.Series([pd.Timedelta(1, unit=unit)])
-> 7779 return map_partitions(pd.to_timedelta, arg, unit=unit, errors=errors, meta=meta)

File ~/projects/dask/dask/dask/dataframe/core.py:6668, in map_partitions(func, meta, enforce_metadata, transform_divisions, align_dataframes, *args, **kwargs)
   6665 if collections:
   6666     simple = False
-> 6668 divisions = _get_divisions_map_partitions(
   6669     align_dataframes, transform_divisions, dfs, func, args, kwargs
   6670 )
   6672 if has_keyword(func, "partition_info"):
   6673     partition_info = {
   6674         (i,): {"number": i, "division": division}
   6675         for i, division in enumerate(divisions[:-1])
   6676     }

File ~/projects/dask/dask/dask/dataframe/core.py:6711, in _get_divisions_map_partitions(align_dataframes, transform_divisions, dfs, func, args, kwargs)
   6707 """
   6708 Helper to get divisions for map_partitions and map_overlap output.
   6709 """
   6710 if align_dataframes:
-> 6711     divisions = dfs[0].divisions
   6712 else:
   6713     # Unaligned, dfs is a mix of 1 partition and 1+ partition dataframes,
   6714     # use longest divisions found
   6715     divisions = max((d.divisions for d in dfs), key=len)

IndexError: list index out of range
```
With this PR:
```python
In [4]: dd.to_timedelta(np.arange(5), unit='s')
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Input In [4], in <cell line: 1>()
----> 1 dd.to_timedelta(np.arange(5), unit='s')

File ~/projects/dask/dask/dask/dataframe/core.py:7785, in to_timedelta(arg, unit, errors)
   7783 if not PANDAS_GT_110 and unit is None:
   7784     unit = "ns"
-> 7785 meta = meta_series_constructor(arg)([pd.Timedelta(1, unit=unit)])
   7786 return map_partitions(pd.to_timedelta, arg, unit=unit, errors=errors, meta=meta)

File ~/projects/dask/dask/dask/dataframe/utils.py:782, in meta_series_constructor(like)
    780     return like.to_frame()._constructor_sliced
    781 else:
--> 782     raise TypeError(f"{type(like)} not supported by meta_series_constructor")

TypeError: <class 'numpy.ndarray'> not supported by meta_series_constructor
```
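If `numpy.ndarray` support were wanted later, one could imagine `meta_series_constructor` growing a branch like the sketch below. This is an assumption for illustration: only the `to_frame()` branch and the `TypeError` are visible in the traceback above.

```python
# Hedged sketch: the to_frame() branch and the TypeError mirror the
# traceback above; the ndarray branch is hypothetical.
import numpy as np
import pandas as pd


def meta_series_constructor(like):
    like = getattr(like, "_meta", like)  # unwrap a Dask collection
    if isinstance(like, np.ndarray):
        return pd.Series  # a cupy branch could map to cudf.Series
    if hasattr(like, "_constructor_sliced"):
        return like._constructor_sliced  # DataFrame-like input
    if hasattr(like, "to_frame"):
        return like.to_frame()._constructor_sliced  # Series-like input
    raise TypeError(f"{type(like)} not supported by meta_series_constructor")
```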
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
Thanks @rjzamora!
There are many places in `dask.dataframe` where `pd.DataFrame`/`pd.Series` constructors are used explicitly. This PR proposes the addition of `serial_frame_constructor` and `serial_series_constructor` utilities that take in an optional `like` parameter to determine which `DataFrame`/`Series` constructor to use (i.e. `pandas` or `cudf`). The defaults are `pandas.DataFrame` and `pandas.Series`.

The optional `like` argument is currently expected to be a serial `DataFrame`, serial `Series`, `DataFrame` collection, or `Series` collection. It may also make sense to handle a numpy/cupy array or array collection (along the lines of @quasiben's suggestion in #11889). However, that feature will probably require the addition of a new `array_to_frame` dispatch as well (or something similar).

Passes `pre-commit run --all-files`
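As a usage illustration of the proposed helpers, following the description above (the import path and exact return behavior are assumptions; the PR may differ):

```python
# Assumed usage; the module path below is hypothetical.
import pandas as pd

from dask.dataframe.utils import serial_frame_constructor  # hypothetical path

pdf = pd.DataFrame({"a": [1, 2, 3]})

constructor = serial_frame_constructor(like=pdf)  # returns pandas.DataFrame
assert constructor is pd.DataFrame

default = serial_frame_constructor()  # no ``like``: defaults to pandas
# With a cudf DataFrame (or a Dask collection backed by one) as ``like``,
# the same call would return cudf.DataFrame instead.
```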