diff --git a/dask/dataframe/core.py b/dask/dataframe/core.py
index 568e39de5e2..a043085ebff 100644
--- a/dask/dataframe/core.py
+++ b/dask/dataframe/core.py
@@ -62,6 +62,8 @@
     is_index_like,
     is_series_like,
     make_meta,
+    meta_frame_constructor,
+    meta_series_constructor,
     raise_on_meta_error,
     valid_divisions,
 )
@@ -2482,8 +2484,8 @@ def _convert_time_cols_to_numeric(self, time_cols, axis, meta, skipna):
             # since each standard deviation will just be NaN
             needs_time_conversion = False
             numeric_dd = from_pandas(
-                pd.DataFrame(
-                    {"_": pd.Series([np.nan])},
+                meta_frame_constructor(self)(
+                    {"_": meta_series_constructor(self)([np.nan])},
                     index=self.index,
                 ),
                 npartitions=self.npartitions,
@@ -2813,7 +2815,7 @@ def quantile(self, q=0.5, axis=0, numeric_only=True, method="default"):
             graph = HighLevelGraph.from_collections(
                 keyname, layer, dependencies=quantiles
             )
-            return DataFrame(graph, keyname, meta, quantiles[0].divisions)
+            return new_dd_object(graph, keyname, meta, quantiles[0].divisions)
 
     @derived_from(pd.DataFrame)
     def describe(
@@ -3062,7 +3064,7 @@ def _cum_agg(
             _take_last,
             cumpart,
             skipna,
-            meta=pd.Series([], dtype="float"),
+            meta=meta_series_constructor(self)([], dtype="float"),
             token=name2,
         )
 
@@ -3260,7 +3262,7 @@ def dot(self, other, meta=no_default):
         def _dot_series(*args, **kwargs):
             # .sum() is invoked on each partition before being applied to all
             # partitions. The return type is expected to be a series, not a numpy object
-            return pd.Series(M.dot(*args, **kwargs))
+            return meta_series_constructor(self)(M.dot(*args, **kwargs))
 
         return self.map_partitions(_dot_series, other, token="dot", meta=meta).sum(
             skipna=False
@@ -3532,7 +3534,7 @@ def __array_wrap__(self, array, context=None):
                 f"{method_name} is not implemented for `dask.dataframe.Series`."
             )
 
-        return pd.Series(array, index=index, name=self.name)
+        return meta_series_constructor(self)(array, index=index, name=self.name)
 
     @property
     def axes(self):
@@ -4513,7 +4515,7 @@ def __array_wrap__(self, array, context=None):
                 f"{method_name} is not implemented for `dask.dataframe.DataFrame`."
             )
 
-        return pd.DataFrame(array, index=index, columns=self.columns)
+        return meta_frame_constructor(self)(array, index=index, columns=self.columns)
 
     @property
     def axes(self):
@@ -6323,7 +6325,7 @@ def split_evenly(df, k):
 def split_out_on_index(df):
     h = df.index
     if isinstance(h, pd.MultiIndex):
-        h = pd.DataFrame([], index=h).reset_index()
+        h = meta_frame_constructor(df)([], index=h).reset_index()
     return h
 
 
@@ -7048,14 +7050,17 @@ def cov_corr(df, min_periods=None, corr=False, scalar=False, split_every=False):
         min_periods,
         corr,
         scalar,
+        df._meta,
     )
     graph = HighLevelGraph.from_collections(name, dsk, dependencies=[df])
     if scalar:
         return Scalar(graph, name, "f8")
     meta = make_meta(
-        [(c, "f8") for c in df.columns], index=df.columns, parent_meta=df._meta
+        [(c, "f8") for c in df.columns],
+        index=meta_series_constructor(df)(df.columns),
+        parent_meta=df._meta,
     )
-    return DataFrame(graph, name, meta, (df.columns.min(), df.columns.max()))
+    return new_dd_object(graph, name, meta, (df.columns.min(), df.columns.max()))
 
 
 def cov_corr_chunk(df, corr=False):
@@ -7128,7 +7133,7 @@ def cov_corr_combine(data_in, corr=False):
     return out
 
 
-def cov_corr_agg(data, cols, min_periods=2, corr=False, scalar=False):
+def cov_corr_agg(data, cols, min_periods=2, corr=False, scalar=False, like_df=None):
     out = cov_corr_combine(data, corr)
     counts = out["count"]
     C = out["cov"]
@@ -7142,7 +7147,9 @@ def cov_corr_agg(data, cols, min_periods=2, corr=False, scalar=False):
     mat = C / den
     if scalar:
         return float(mat[0, 1])
-    return pd.DataFrame(mat, columns=cols, index=cols)
+    return (pd.DataFrame if like_df is None else meta_frame_constructor(like_df))(
+        mat, columns=cols, index=cols
+    )
 
 
 def pd_split(df, p, random_state=None, shuffle=False):
@@ -7657,10 +7664,10 @@ def idxmaxmin_chunk(x, fn=None, skipna=True):
         idx = getattr(x, fn)(skipna=skipna)
         value = getattr(x, minmax)(skipna=skipna)
     else:
-        idx = value = pd.Series([], dtype="i8")
+        idx = value = meta_series_constructor(x)([], dtype="i8")
     if is_series_like(idx):
-        return pd.DataFrame({"idx": idx, "value": value})
-    return pd.DataFrame({"idx": [idx], "value": [value]})
+        return meta_frame_constructor(x)({"idx": idx, "value": value})
+    return meta_frame_constructor(x)({"idx": [idx], "value": [value]})
 
 
 def idxmaxmin_row(x, fn=None, skipna=True):
@@ -7670,8 +7677,8 @@ def idxmaxmin_row(x, fn=None, skipna=True):
         idx = [getattr(x.value, fn)(skipna=skipna)]
         value = [getattr(x.value, minmax)(skipna=skipna)]
     else:
-        idx = value = pd.Series([], dtype="i8")
-    return pd.DataFrame({"idx": idx, "value": value})
+        idx = value = meta_series_constructor(x)([], dtype="i8")
+    return meta_frame_constructor(x)({"idx": idx, "value": value})
 
 
 def idxmaxmin_combine(x, fn=None, skipna=True):
@@ -7765,7 +7772,7 @@ def to_datetime(arg, meta=None, **kwargs):
                 "non-index-able arguments (like scalars)"
             )
         else:
-            meta = pd.Series([pd.Timestamp("2000", **tz_kwarg)])
+            meta = meta_series_constructor(arg)([pd.Timestamp("2000", **tz_kwarg)])
             meta.index = meta.index.astype(arg.index.dtype)
             meta.index.name = arg.index.name
     return map_partitions(pd.to_datetime, arg, meta=meta, **kwargs)
@@ -7775,7 +7782,7 @@ def to_timedelta(arg, unit=None, errors="raise"):
     if not PANDAS_GT_110 and unit is None:
         unit = "ns"
-    meta = pd.Series([pd.Timedelta(1, unit=unit)])
+    meta = meta_series_constructor(arg)([pd.Timedelta(1, unit=unit)])
     return map_partitions(pd.to_timedelta, arg, unit=unit, errors=errors, meta=meta)
diff --git a/dask/dataframe/tests/test_dataframe.py b/dask/dataframe/tests/test_dataframe.py
index 4d6a0f0a43f..52627f34efe 100644
--- a/dask/dataframe/tests/test_dataframe.py
+++ b/dask/dataframe/tests/test_dataframe.py
@@ -3201,6 +3201,23 @@ def test_cov():
     assert res._name != res3._name
 
 
+@pytest.mark.gpu
+def test_cov_gpu():
+    cudf = pytest.importorskip("cudf")
+
+    # cudf DataFrame
+    df = cudf.from_pandas(_compat.makeDataFrame())
+    ddf = dd.from_pandas(df, npartitions=6)
+
+    res = ddf.cov()
+    res2 = ddf.cov(split_every=2)
+    sol = df.cov()
+    assert_eq(res, sol)
+    assert_eq(res2, sol)
+    assert res._name == ddf.cov()._name
+    assert res._name != res2._name
+
+
 def test_corr():
     # DataFrame
     df = _compat.makeMissingDataframe()
@@ -3248,6 +3265,23 @@ def test_corr():
     pytest.raises(TypeError, lambda: da.corr(ddf))
 
 
+@pytest.mark.gpu
+def test_corr_gpu():
+    cudf = pytest.importorskip("cudf")
+
+    # cudf DataFrame
+    df = cudf.from_pandas(_compat.makeDataFrame())
+    ddf = dd.from_pandas(df, npartitions=6)
+
+    res = ddf.corr()
+    res2 = ddf.corr(split_every=2)
+    sol = df.corr()
+    assert_eq(res, sol)
+    assert_eq(res2, sol)
+    assert res._name == ddf.corr()._name
+    assert res._name != res2._name
+
+
 def test_corr_same_name():
     # Series with same names (see https://github.com/dask/dask/issues/4906)
diff --git a/dask/dataframe/tests/test_utils_dataframe.py b/dask/dataframe/tests/test_utils_dataframe.py
index f5e46a229f6..4e0fc4502ad 100644
--- a/dask/dataframe/tests/test_utils_dataframe.py
+++ b/dask/dataframe/tests/test_utils_dataframe.py
@@ -20,7 +20,9 @@
     is_index_like,
     is_series_like,
     make_meta,
+    meta_frame_constructor,
     meta_nonempty,
+    meta_series_constructor,
     raise_on_meta_error,
     shard_df_on_index,
 )
@@ -618,3 +620,33 @@ def check_custom_scheduler(part: pd.DataFrame) -> pd.DataFrame:
     assert_eq(ddf2, ddf2, scheduler=custom_scheduler)
     with dask.config.set(scheduler=custom_scheduler):
         assert_eq(ddf2, ddf2, scheduler=None)
+
+
+@pytest.mark.parametrize(
+    "data",
+    [
+        pd.DataFrame([0]),
+        pd.Series([0]),
+        pd.Index([0]),
+        dd.from_dict({"x": [0]}, npartitions=1),
+        dd.from_dict({"x": [0]}, npartitions=1).x,
+        dd.from_dict({"x": [0]}, npartitions=1).index,
+    ],
+)
+def test_meta_constructor_utilities(data):
+    assert meta_series_constructor(data) is pd.Series
+    assert meta_frame_constructor(data) is pd.DataFrame
+
+
+@pytest.mark.parametrize(
+    "data",
+    [
+        dd.from_dict({"x": [0]}, npartitions=1).x.values,
+        np.array([0]),
+    ],
+)
+def test_meta_constructor_utilities_raise(data):
+    with pytest.raises(TypeError, match="not supported by meta_series"):
+        meta_series_constructor(data)
+    with pytest.raises(TypeError, match="not supported by meta_frame"):
+        meta_frame_constructor(data)
diff --git a/dask/dataframe/utils.py b/dask/dataframe/utils.py
index f66212de4f0..a52a53619fb 100644
--- a/dask/dataframe/utils.py
+++ b/dask/dataframe/utils.py
@@ -734,3 +734,49 @@ def drop_by_shallow_copy(df, columns, errors="raise"):
 
 class AttributeNotImplementedError(NotImplementedError, AttributeError):
     """NotImplementedError and AttributeError"""
+
+
+def meta_frame_constructor(like):
+    """Return a serial DataFrame constructor
+
+    Parameters
+    ----------
+    like :
+        Any series-like, Index-like or dataframe-like object.
+    """
+    if is_dask_collection(like):
+        try:
+            like = like._meta
+        except AttributeError:
+            raise TypeError(f"{type(like)} not supported by meta_frame_constructor")
+    if is_dataframe_like(like):
+        return like._constructor
+    elif is_series_like(like):
+        return like._constructor_expanddim
+    elif is_index_like(like):
+        return like.to_frame()._constructor
+    else:
+        raise TypeError(f"{type(like)} not supported by meta_frame_constructor")
+
+
+def meta_series_constructor(like):
+    """Return a serial Series constructor
+
+    Parameters
+    ----------
+    like :
+        Any series-like, Index-like or dataframe-like object.
+    """
+    if is_dask_collection(like):
+        try:
+            like = like._meta
+        except AttributeError:
+            raise TypeError(f"{type(like)} not supported by meta_series_constructor")
+    if is_dataframe_like(like):
+        return like._constructor_sliced
+    elif is_series_like(like):
+        return like._constructor
+    elif is_index_like(like):
+        return like.to_frame()._constructor_sliced
+    else:
+        raise TypeError(f"{type(like)} not supported by meta_series_constructor")