From a59af4a56b5a3ce3d4b2c56cc825d4f6e74420cb Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 11 Oct 2022 15:28:30 -0700 Subject: [PATCH 01/11] avoid using pd.DataFrame and pd.Series throughout dask.dataframe.core --- dask/dataframe/core.py | 61 +++++++++++++++++++++++++---------------- dask/dataframe/utils.py | 20 ++++++++++++++ 2 files changed, 58 insertions(+), 23 deletions(-) diff --git a/dask/dataframe/core.py b/dask/dataframe/core.py index 49ba5c15384..fa3f0b09978 100644 --- a/dask/dataframe/core.py +++ b/dask/dataframe/core.py @@ -63,6 +63,8 @@ is_series_like, make_meta, raise_on_meta_error, + serial_frame_constructor, + serial_series_constructor, valid_divisions, ) from dask.delayed import Delayed, delayed, unpack_collections @@ -2482,8 +2484,8 @@ def _convert_time_cols_to_numeric(self, time_cols, axis, meta, skipna): # since each standard deviation will just be NaN needs_time_conversion = False numeric_dd = from_pandas( - pd.DataFrame( - {"_": pd.Series([np.nan])}, + serial_frame_constructor(self)( + {"_": serial_series_constructor(self)([np.nan])}, index=self.index, ), npartitions=self.npartitions, @@ -3062,7 +3064,7 @@ def _cum_agg( _take_last, cumpart, skipna, - meta=pd.Series([], dtype="float"), + meta=serial_series_constructor(self)([], dtype="float"), token=name2, ) @@ -3260,7 +3262,7 @@ def dot(self, other, meta=no_default): def _dot_series(*args, **kwargs): # .sum() is invoked on each partition before being applied to all # partitions. The return type is expected to be a series, not a numpy object - return pd.Series(M.dot(*args, **kwargs)) + return serial_series_constructor(self)(M.dot(*args, **kwargs)) return self.map_partitions(_dot_series, other, token="dot", meta=meta).sum( skipna=False @@ -3532,7 +3534,7 @@ def __array_wrap__(self, array, context=None): f"{method_name} is not implemented for `dask.dataframe.Series`." ) - return pd.Series(array, index=index, name=self.name) + return serial_series_constructor(self)(array, index=index, name=self.name) @property def axes(self): @@ -3675,7 +3677,9 @@ def rename(self, index=None, inplace=False, sorted_index=False): res = self.map_partitions(M.rename, index, enforce_metadata=False) if self.known_divisions: if sorted_index and (callable(index) or is_dict_like(index)): - old = pd.Series(range(self.npartitions + 1), index=self.divisions) + old = serial_series_constructor(self)( + range(self.npartitions + 1), index=self.divisions + ) new = old.rename(index).index if not new.is_monotonic_increasing: msg = ( @@ -4398,7 +4402,9 @@ def map(self, arg, na_action=None, meta=no_default, is_monotonic=False): applied = super().map(arg, na_action=na_action, meta=meta) if is_monotonic and self.known_divisions: applied.divisions = tuple( - pd.Series(self.divisions).map(arg, na_action=na_action) + serial_series_constructor(self)(self.divisions).map( + arg, na_action=na_action + ) ) else: applied = applied.clear_divisions() @@ -4513,7 +4519,7 @@ def __array_wrap__(self, array, context=None): f"{method_name} is not implemented for `dask.dataframe.DataFrame`." ) - return pd.DataFrame(array, index=index, columns=self.columns) + return serial_frame_constructor(self)(array, index=index, columns=self.columns) @property def axes(self): @@ -5973,7 +5979,9 @@ def _repr_data(self): index = self._repr_divisions cols = meta.columns if len(cols) == 0: - series_df = pd.DataFrame([[]] * len(index), columns=cols, index=index) + series_df = serial_frame_constructor(self)( + [[]] * len(index), columns=cols, index=index + ) else: series_df = pd.concat( [_repr_data_series(s, index=index) for _, s in meta.items()], axis=1 @@ -6345,7 +6353,7 @@ def split_evenly(df, k): def split_out_on_index(df): h = df.index if isinstance(h, pd.MultiIndex): - h = pd.DataFrame([], index=h).reset_index() + h = serial_frame_constructor(df)([], index=h).reset_index() return h @@ -7070,6 +7078,7 @@ def cov_corr(df, min_periods=None, corr=False, scalar=False, split_every=False): min_periods, corr, scalar, + df._meta, ) graph = HighLevelGraph.from_collections(name, dsk, dependencies=[df]) if scalar: @@ -7150,7 +7159,7 @@ def cov_corr_combine(data_in, corr=False): return out -def cov_corr_agg(data, cols, min_periods=2, corr=False, scalar=False): +def cov_corr_agg(data, cols, min_periods=2, corr=False, scalar=False, like_df=None): out = cov_corr_combine(data, corr) counts = out["count"] C = out["cov"] @@ -7164,7 +7173,7 @@ def cov_corr_agg(data, cols, min_periods=2, corr=False, scalar=False): mat = C / den if scalar: return float(mat[0, 1]) - return pd.DataFrame(mat, columns=cols, index=cols) + return serial_frame_constructor(like_df)(mat, columns=cols, index=cols) def pd_split(df, p, random_state=None, shuffle=False): @@ -7477,7 +7486,7 @@ def repartition_size(df, size): split_mem_usages = [] for n, usage in zip(nsplits, mem_usages): split_mem_usages.extend([usage / n] * n) - mem_usages = pd.Series(split_mem_usages) + mem_usages = serial_series_constructor(df)(split_mem_usages) # 2. now that all partitions are less than size, concat them up to size assert np.all(mem_usages <= size) @@ -7509,7 +7518,9 @@ def repartition_npartitions(df, npartitions): else: # Drop duplcates in case last partition has same # value for min and max division - original_divisions = divisions = pd.Series(df.divisions).drop_duplicates() + original_divisions = divisions = serial_series_constructor(df)( + df.divisions + ).drop_duplicates() if df.known_divisions and ( np.issubdtype(divisions.dtype, np.datetime64) or np.issubdtype(divisions.dtype, np.number) @@ -7528,7 +7539,9 @@ def repartition_npartitions(df, npartitions): ) if np.issubdtype(original_divisions.dtype, np.datetime64): divisions = methods.tolist( - pd.Series(divisions).astype(original_divisions.dtype) + serial_series_constructor(df)(divisions).astype( + original_divisions.dtype + ) ) elif np.issubdtype(original_divisions.dtype, np.integer): divisions = divisions.astype(original_divisions.dtype) @@ -7679,10 +7692,10 @@ def idxmaxmin_chunk(x, fn=None, skipna=True): idx = getattr(x, fn)(skipna=skipna) value = getattr(x, minmax)(skipna=skipna) else: - idx = value = pd.Series([], dtype="i8") + idx = value = serial_series_constructor(x)([], dtype="i8") if is_series_like(idx): - return pd.DataFrame({"idx": idx, "value": value}) - return pd.DataFrame({"idx": [idx], "value": [value]}) + return serial_frame_constructor(x)({"idx": idx, "value": value}) + return serial_frame_constructor(x)({"idx": [idx], "value": [value]}) def idxmaxmin_row(x, fn=None, skipna=True): @@ -7692,8 +7705,8 @@ def idxmaxmin_row(x, fn=None, skipna=True): idx = [getattr(x.value, fn)(skipna=skipna)] value = [getattr(x.value, minmax)(skipna=skipna)] else: - idx = value = pd.Series([], dtype="i8") - return pd.DataFrame({"idx": idx, "value": value}) + idx = value = serial_series_constructor(x)([], dtype="i8") + return serial_frame_constructor(x)({"idx": idx, "value": value}) def idxmaxmin_combine(x, fn=None, skipna=True): @@ -7787,7 +7800,7 @@ def to_datetime(arg, meta=None, **kwargs): "non-index-able arguments (like scalars)" ) else: - meta = pd.Series([pd.Timestamp("2000", **tz_kwarg)]) + meta = serial_series_constructor(arg)([pd.Timestamp("2000", **tz_kwarg)]) meta.index = meta.index.astype(arg.index.dtype) meta.index.name = arg.index.name return map_partitions(pd.to_datetime, arg, meta=meta, **kwargs) @@ -7797,7 +7810,7 @@ def to_datetime(arg, meta=None, **kwargs): def to_timedelta(arg, unit=None, errors="raise"): if not PANDAS_GT_110 and unit is None: unit = "ns" - meta = pd.Series([pd.Timedelta(1, unit=unit)]) + meta = serial_series_constructor(arg)([pd.Timedelta(1, unit=unit)]) return map_partitions(pd.to_timedelta, arg, unit=unit, errors=errors, meta=meta) @@ -7818,7 +7831,9 @@ def _repr_data_series(s, index): dtype = "category[unknown]" else: dtype = str(s.dtype) - return pd.Series([dtype] + ["..."] * npartitions, index=index, name=s.name) + return serial_series_constructor(s)( + [dtype] + ["..."] * npartitions, index=index, name=s.name + ) def has_parallel_type(x): diff --git a/dask/dataframe/utils.py b/dask/dataframe/utils.py index f66212de4f0..97ae632bd31 100644 --- a/dask/dataframe/utils.py +++ b/dask/dataframe/utils.py @@ -734,3 +734,23 @@ def drop_by_shallow_copy(df, columns, errors="raise"): class AttributeNotImplementedError(NotImplementedError, AttributeError): """NotImplementedError and AttributeError""" + + +def serial_frame_constructor(like=None): + """Return a serial DataFrame constructor""" + if is_dask_collection(like): + like = like._meta + if hasattr(like, "to_frame"): + # `like` is a Series rather than a DataFrame + like = like.iloc[:1].to_frame() + return pd.DataFrame if like is None else like._constructor + + +def serial_series_constructor(like=None): + """Return a serial Series constructor""" + if is_dask_collection(like): + like = like._meta + if not hasattr(like, "to_frame"): + # `like` is a DataFrame rather than a Series + return like._constructor_sliced + return pd.Series if like is None else like._constructor From 5ec150d6ba27bf86322a818b4f8757d974710cc0 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 11 Oct 2022 16:39:41 -0700 Subject: [PATCH 02/11] add basic test coverage --- dask/dataframe/core.py | 4 +-- dask/dataframe/tests/test_dataframe.py | 36 ++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/dask/dataframe/core.py b/dask/dataframe/core.py index fa3f0b09978..411e2fbcf42 100644 --- a/dask/dataframe/core.py +++ b/dask/dataframe/core.py @@ -7831,9 +7831,7 @@ def _repr_data_series(s, index): dtype = "category[unknown]" else: dtype = str(s.dtype) - return serial_series_constructor(s)( - [dtype] + ["..."] * npartitions, index=index, name=s.name - ) + return pd.Series([dtype] + ["..."] * npartitions, index=index, name=s.name) def has_parallel_type(x): diff --git a/dask/dataframe/tests/test_dataframe.py b/dask/dataframe/tests/test_dataframe.py index 8ce96e750a4..598f54ccbd2 100644 --- a/dask/dataframe/tests/test_dataframe.py +++ b/dask/dataframe/tests/test_dataframe.py @@ -3202,6 +3202,24 @@ def test_cov(): assert res._name != res3._name +@pytest.mark.gpu +def test_cov_gpu(): + cudf = pytest.importorskip("cudf") + + # cudf DataFrame + df = cudf.from_pandas(_compat.makeDataFrame()) + ddf = dd.from_pandas(df, npartitions=6) + + res = ddf.cov() + res2 = ddf.cov(split_every=2) + sol = df.cov() + # TODO: dtype of res.index is wrong before compute + assert_eq(res.compute(), sol) + assert_eq(res2.compute(), sol) + assert res._name == ddf.cov()._name + assert res._name != res2._name + + def test_corr(): # DataFrame df = _compat.makeMissingDataframe() @@ -3249,6 +3267,24 @@ def test_corr(): pytest.raises(TypeError, lambda: da.corr(ddf)) +@pytest.mark.gpu +def test_corr_gpu(): + cudf = pytest.importorskip("cudf") + + # cudf DataFrame + df = cudf.from_pandas(_compat.makeDataFrame()) + ddf = dd.from_pandas(df, npartitions=6) + + res = ddf.corr() + res2 = ddf.corr(split_every=2) + sol = df.corr() + # TODO: dtype of res.index is wrong before compute + assert_eq(res.compute(), sol) + assert_eq(res2.compute(), sol) + assert res._name == ddf.corr()._name + assert res._name != res2._name + + def test_corr_same_name(): # Series with same names (see https://github.com/dask/dask/issues/4906) From 91538c8c246d92598a0ba97c41572df89ed12e0e Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 13 Oct 2022 08:38:21 -0700 Subject: [PATCH 03/11] simplify logic --- dask/dataframe/utils.py | 48 ++++++++++++++++++++++++++++++----------- 1 file changed, 35 insertions(+), 13 deletions(-) diff --git a/dask/dataframe/utils.py b/dask/dataframe/utils.py index 97ae632bd31..ed66ca625d5 100644 --- a/dask/dataframe/utils.py +++ b/dask/dataframe/utils.py @@ -736,21 +736,43 @@ class AttributeNotImplementedError(NotImplementedError, AttributeError): """NotImplementedError and AttributeError""" -def serial_frame_constructor(like=None): - """Return a serial DataFrame constructor""" +def serial_frame_constructor(like): + """Return a serial DataFrame constructor + + Parameters + ---------- + like : + Any series-like or dataframe-like object. + """ if is_dask_collection(like): - like = like._meta - if hasattr(like, "to_frame"): - # `like` is a Series rather than a DataFrame - like = like.iloc[:1].to_frame() - return pd.DataFrame if like is None else like._constructor + try: + like = like._meta + except AttributeError: + pass # TODO: Handle array-like + if is_dataframe_like(like): + return like._constructor + elif is_series_like(like): + return like._constructor_expanddim + else: + raise TypeError(f"{type(like)} not supported by serial_frame_constructor") -def serial_series_constructor(like=None): - """Return a serial Series constructor""" +def serial_series_constructor(like): + """Return a serial Series constructor + + Parameters + ---------- + like : + Any series-like or dataframe-like object. + """ if is_dask_collection(like): - like = like._meta - if not hasattr(like, "to_frame"): - # `like` is a DataFrame rather than a Series + try: + like = like._meta + except AttributeError: + pass # TODO: Handle array-like + if is_dataframe_like(like): return like._constructor_sliced - return pd.Series if like is None else like._constructor + elif is_series_like(like): + return like._constructor + else: + raise TypeError(f"{type(like)} not supported by serial_frame_constructor") From dfe11140f2bd99020a16e33315d421ff3371b7d5 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 13 Oct 2022 09:26:00 -0700 Subject: [PATCH 04/11] handle index --- dask/dataframe/utils.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dask/dataframe/utils.py b/dask/dataframe/utils.py index ed66ca625d5..74e2209a90d 100644 --- a/dask/dataframe/utils.py +++ b/dask/dataframe/utils.py @@ -753,6 +753,8 @@ def serial_frame_constructor(like): return like._constructor elif is_series_like(like): return like._constructor_expanddim + elif is_index_like(like): + return like.to_frame()._constructor else: raise TypeError(f"{type(like)} not supported by serial_frame_constructor") @@ -774,5 +776,7 @@ def serial_series_constructor(like): return like._constructor_sliced elif is_series_like(like): return like._constructor + elif is_index_like(like): + return like.to_frame()._constructor_sliced else: raise TypeError(f"{type(like)} not supported by serial_frame_constructor") From 1ba03535ea5b864b7086faeb5de15d522ba11411 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 13 Oct 2022 09:45:54 -0700 Subject: [PATCH 05/11] add experimental serial_constructor_from_array dispatch - but don't actually use it yet --- dask/dataframe/backends.py | 8 ++++++++ dask/dataframe/core.py | 2 ++ dask/dataframe/dispatch.py | 1 + dask/dataframe/utils.py | 10 ++++++++-- 4 files changed, 19 insertions(+), 2 deletions(-) diff --git a/dask/dataframe/backends.py b/dask/dataframe/backends.py index f26b5603fd7..3fb7cf09654 100644 --- a/dask/dataframe/backends.py +++ b/dask/dataframe/backends.py @@ -31,6 +31,7 @@ make_meta_obj, meta_nonempty, pyarrow_schema_dispatch, + serial_constructor_from_array, tolist_dispatch, union_categoricals_dispatch, ) @@ -85,6 +86,13 @@ def _(x, index=None): pass +@serial_constructor_from_array.register(np.ndarray) +def serial_constructor_from_array_numpy(obj, series=False): + if series: + return pd.Series + return pd.DataFrame + + @pyarrow_schema_dispatch.register((pd.DataFrame,)) def get_pyarrow_schema_pandas(obj): import pyarrow as pa diff --git a/dask/dataframe/core.py b/dask/dataframe/core.py index 411e2fbcf42..c7e37cc686d 100644 --- a/dask/dataframe/core.py +++ b/dask/dataframe/core.py @@ -7160,6 +7160,8 @@ def cov_corr_combine(data_in, corr=False): def cov_corr_agg(data, cols, min_periods=2, corr=False, scalar=False, like_df=None): + # TODO: Remove like_df when a serial_constructor_from_array + # dispatch definition is added in dask_cudf out = cov_corr_combine(data, corr) counts = out["count"] C = out["cov"] diff --git a/dask/dataframe/dispatch.py b/dask/dataframe/dispatch.py index 4c6badb5f23..ff759aca1fe 100644 --- a/dask/dataframe/dispatch.py +++ b/dask/dataframe/dispatch.py @@ -13,6 +13,7 @@ make_meta_dispatch = Dispatch("make_meta_dispatch") make_meta_obj = Dispatch("make_meta_obj") meta_nonempty = Dispatch("meta_nonempty") +serial_constructor_from_array = Dispatch("serial_constructor_from_array") hash_object_dispatch = Dispatch("hash_object_dispatch") group_split_dispatch = Dispatch("group_split_dispatch") get_parallel_type = Dispatch("get_parallel_type") diff --git a/dask/dataframe/utils.py b/dask/dataframe/utils.py index 74e2209a90d..33d451dd37e 100644 --- a/dask/dataframe/utils.py +++ b/dask/dataframe/utils.py @@ -25,10 +25,12 @@ make_meta, make_meta_obj, meta_nonempty, + serial_constructor_from_array, ) from dask.dataframe.extensions import make_scalar from dask.utils import ( asciitable, + is_arraylike, is_dataframe_like, is_index_like, is_series_like, @@ -748,13 +750,15 @@ def serial_frame_constructor(like): try: like = like._meta except AttributeError: - pass # TODO: Handle array-like + like = like.meta if is_dataframe_like(like): return like._constructor elif is_series_like(like): return like._constructor_expanddim elif is_index_like(like): return like.to_frame()._constructor + elif is_arraylike(like): + return serial_constructor_from_array(like) else: raise TypeError(f"{type(like)} not supported by serial_frame_constructor") @@ -771,12 +775,14 @@ def serial_series_constructor(like): try: like = like._meta except AttributeError: - pass # TODO: Handle array-like + like = like.meta if is_dataframe_like(like): return like._constructor_sliced elif is_series_like(like): return like._constructor elif is_index_like(like): return like.to_frame()._constructor_sliced + elif is_arraylike(like): + return serial_constructor_from_array(like, series=True) else: raise TypeError(f"{type(like)} not supported by serial_frame_constructor") From ee35931e9e6657bafd16acc35b7ed1a22d074bfe Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 18 Oct 2022 10:27:55 -0700 Subject: [PATCH 06/11] remove from_array dispatch and change names from serial_ to meta_ --- dask/dataframe/backends.py | 8 ------- dask/dataframe/core.py | 48 ++++++++++++++++++-------------------- dask/dataframe/dispatch.py | 1 - dask/dataframe/utils.py | 18 +++++--------- 4 files changed, 29 insertions(+), 46 deletions(-) diff --git a/dask/dataframe/backends.py b/dask/dataframe/backends.py index b6d83dc80ed..fcbf1c99bd4 100644 --- a/dask/dataframe/backends.py +++ b/dask/dataframe/backends.py @@ -32,7 +32,6 @@ make_meta_obj, meta_nonempty, pyarrow_schema_dispatch, - serial_constructor_from_array, tolist_dispatch, union_categoricals_dispatch, ) @@ -211,13 +210,6 @@ def _(x, index=None): pass -@serial_constructor_from_array.register(np.ndarray) -def serial_constructor_from_array_numpy(obj, series=False): - if series: - return pd.Series - return pd.DataFrame - - @pyarrow_schema_dispatch.register((pd.DataFrame,)) def get_pyarrow_schema_pandas(obj): import pyarrow as pa diff --git a/dask/dataframe/core.py b/dask/dataframe/core.py index 0a66d343ff6..ef252edb8a4 100644 --- a/dask/dataframe/core.py +++ b/dask/dataframe/core.py @@ -62,9 +62,9 @@ is_index_like, is_series_like, make_meta, + meta_frame_constructor, + meta_series_constructor, raise_on_meta_error, - serial_frame_constructor, - serial_series_constructor, valid_divisions, ) from dask.delayed import Delayed, delayed, unpack_collections @@ -2484,8 +2484,8 @@ def _convert_time_cols_to_numeric(self, time_cols, axis, meta, skipna): # since each standard deviation will just be NaN needs_time_conversion = False numeric_dd = from_pandas( - serial_frame_constructor(self)( - {"_": serial_series_constructor(self)([np.nan])}, + meta_frame_constructor(self)( + {"_": meta_series_constructor(self)([np.nan])}, index=self.index, ), npartitions=self.npartitions, @@ -3064,7 +3064,7 @@ def _cum_agg( _take_last, cumpart, skipna, - meta=serial_series_constructor(self)([], dtype="float"), + meta=meta_series_constructor(self)([], dtype="float"), token=name2, ) @@ -3262,7 +3262,7 @@ def dot(self, other, meta=no_default): def _dot_series(*args, **kwargs): # .sum() is invoked on each partition before being applied to all # partitions. The return type is expected to be a series, not a numpy object - return serial_series_constructor(self)(M.dot(*args, **kwargs)) + return meta_series_constructor(self)(M.dot(*args, **kwargs)) return self.map_partitions(_dot_series, other, token="dot", meta=meta).sum( skipna=False @@ -3534,7 +3534,7 @@ def __array_wrap__(self, array, context=None): f"{method_name} is not implemented for `dask.dataframe.Series`." ) - return serial_series_constructor(self)(array, index=index, name=self.name) + return meta_series_constructor(self)(array, index=index, name=self.name) @property def axes(self): @@ -3677,7 +3677,7 @@ def rename(self, index=None, inplace=False, sorted_index=False): res = self.map_partitions(M.rename, index, enforce_metadata=False) if self.known_divisions: if sorted_index and (callable(index) or is_dict_like(index)): - old = serial_series_constructor(self)( + old = meta_series_constructor(self)( range(self.npartitions + 1), index=self.divisions ) new = old.rename(index).index @@ -4402,7 +4402,7 @@ def map(self, arg, na_action=None, meta=no_default, is_monotonic=False): applied = super().map(arg, na_action=na_action, meta=meta) if is_monotonic and self.known_divisions: applied.divisions = tuple( - serial_series_constructor(self)(self.divisions).map( + meta_series_constructor(self)(self.divisions).map( arg, na_action=na_action ) ) @@ -4519,7 +4519,7 @@ def __array_wrap__(self, array, context=None): f"{method_name} is not implemented for `dask.dataframe.DataFrame`." ) - return serial_frame_constructor(self)(array, index=index, columns=self.columns) + return meta_frame_constructor(self)(array, index=index, columns=self.columns) @property def axes(self): @@ -5979,7 +5979,7 @@ def _repr_data(self): index = self._repr_divisions cols = meta.columns if len(cols) == 0: - series_df = serial_frame_constructor(self)( + series_df = meta_frame_constructor(self)( [[]] * len(index), columns=cols, index=index ) else: @@ -6328,7 +6328,7 @@ def split_evenly(df, k): def split_out_on_index(df): h = df.index if isinstance(h, pd.MultiIndex): - h = serial_frame_constructor(df)([], index=h).reset_index() + h = meta_frame_constructor(df)([], index=h).reset_index() return h @@ -7135,8 +7135,6 @@ def cov_corr_combine(data_in, corr=False): def cov_corr_agg(data, cols, min_periods=2, corr=False, scalar=False, like_df=None): - # TODO: Remove like_df when a serial_constructor_from_array - # dispatch definition is added in dask_cudf out = cov_corr_combine(data, corr) counts = out["count"] C = out["cov"] @@ -7150,7 +7148,7 @@ def cov_corr_agg(data, cols, min_periods=2, corr=False, scalar=False, like_df=No mat = C / den if scalar: return float(mat[0, 1]) - return serial_frame_constructor(like_df)(mat, columns=cols, index=cols) + return meta_frame_constructor(like_df)(mat, columns=cols, index=cols) def pd_split(df, p, random_state=None, shuffle=False): @@ -7463,7 +7461,7 @@ def repartition_size(df, size): split_mem_usages = [] for n, usage in zip(nsplits, mem_usages): split_mem_usages.extend([usage / n] * n) - mem_usages = serial_series_constructor(df)(split_mem_usages) + mem_usages = meta_series_constructor(df)(split_mem_usages) # 2. now that all partitions are less than size, concat them up to size assert np.all(mem_usages <= size) @@ -7495,7 +7493,7 @@ def repartition_npartitions(df, npartitions): else: # Drop duplcates in case last partition has same # value for min and max division - original_divisions = divisions = serial_series_constructor(df)( + original_divisions = divisions = meta_series_constructor(df)( df.divisions ).drop_duplicates() if df.known_divisions and ( @@ -7516,7 +7514,7 @@ def repartition_npartitions(df, npartitions): ) if np.issubdtype(original_divisions.dtype, np.datetime64): divisions = methods.tolist( - serial_series_constructor(df)(divisions).astype( + meta_series_constructor(df)(divisions).astype( original_divisions.dtype ) ) @@ -7669,10 +7667,10 @@ def idxmaxmin_chunk(x, fn=None, skipna=True): idx = getattr(x, fn)(skipna=skipna) value = getattr(x, minmax)(skipna=skipna) else: - idx = value = serial_series_constructor(x)([], dtype="i8") + idx = value = meta_series_constructor(x)([], dtype="i8") if is_series_like(idx): - return serial_frame_constructor(x)({"idx": idx, "value": value}) - return serial_frame_constructor(x)({"idx": [idx], "value": [value]}) + return meta_frame_constructor(x)({"idx": idx, "value": value}) + return meta_frame_constructor(x)({"idx": [idx], "value": [value]}) def idxmaxmin_row(x, fn=None, skipna=True): @@ -7682,8 +7680,8 @@ def idxmaxmin_row(x, fn=None, skipna=True): idx = [getattr(x.value, fn)(skipna=skipna)] value = [getattr(x.value, minmax)(skipna=skipna)] else: - idx = value = serial_series_constructor(x)([], dtype="i8") - return serial_frame_constructor(x)({"idx": idx, "value": value}) + idx = value = meta_series_constructor(x)([], dtype="i8") + return meta_frame_constructor(x)({"idx": idx, "value": value}) def idxmaxmin_combine(x, fn=None, skipna=True): @@ -7777,7 +7775,7 @@ def to_datetime(arg, meta=None, **kwargs): "non-index-able arguments (like scalars)" ) else: - meta = serial_series_constructor(arg)([pd.Timestamp("2000", **tz_kwarg)]) + meta = meta_series_constructor(arg)([pd.Timestamp("2000", **tz_kwarg)]) meta.index = meta.index.astype(arg.index.dtype) meta.index.name = arg.index.name return map_partitions(pd.to_datetime, arg, meta=meta, **kwargs) @@ -7787,7 +7785,7 @@ def to_datetime(arg, meta=None, **kwargs): def to_timedelta(arg, unit=None, errors="raise"): if not PANDAS_GT_110 and unit is None: unit = "ns" - meta = serial_series_constructor(arg)([pd.Timedelta(1, unit=unit)]) + meta = meta_series_constructor(arg)([pd.Timedelta(1, unit=unit)]) return map_partitions(pd.to_timedelta, arg, unit=unit, errors=errors, meta=meta) diff --git a/dask/dataframe/dispatch.py b/dask/dataframe/dispatch.py index ff759aca1fe..4c6badb5f23 100644 --- a/dask/dataframe/dispatch.py +++ b/dask/dataframe/dispatch.py @@ -13,7 +13,6 @@ make_meta_dispatch = Dispatch("make_meta_dispatch") make_meta_obj = Dispatch("make_meta_obj") meta_nonempty = Dispatch("meta_nonempty") -serial_constructor_from_array = Dispatch("serial_constructor_from_array") hash_object_dispatch = Dispatch("hash_object_dispatch") group_split_dispatch = Dispatch("group_split_dispatch") get_parallel_type = Dispatch("get_parallel_type") diff --git a/dask/dataframe/utils.py b/dask/dataframe/utils.py index 33d451dd37e..ba73bcdd9a2 100644 --- a/dask/dataframe/utils.py +++ b/dask/dataframe/utils.py @@ -25,12 +25,10 @@ make_meta, make_meta_obj, meta_nonempty, - serial_constructor_from_array, ) from dask.dataframe.extensions import make_scalar from dask.utils import ( asciitable, - is_arraylike, is_dataframe_like, is_index_like, is_series_like, @@ -738,7 +736,7 @@ class AttributeNotImplementedError(NotImplementedError, AttributeError): """NotImplementedError and AttributeError""" -def serial_frame_constructor(like): +def meta_frame_constructor(like): """Return a serial DataFrame constructor Parameters @@ -750,20 +748,18 @@ def serial_frame_constructor(like): try: like = like._meta except AttributeError: - like = like.meta + raise TypeError(f"{type(like)} not supported by meta_frame_constructor") if is_dataframe_like(like): return like._constructor elif is_series_like(like): return like._constructor_expanddim elif is_index_like(like): return like.to_frame()._constructor - elif is_arraylike(like): - return serial_constructor_from_array(like) else: - raise TypeError(f"{type(like)} not supported by serial_frame_constructor") + raise TypeError(f"{type(like)} not supported by meta_frame_constructor") -def serial_series_constructor(like): +def meta_series_constructor(like): """Return a serial Series constructor Parameters @@ -775,14 +771,12 @@ def serial_series_constructor(like): try: like = like._meta except AttributeError: - like = like.meta + raise TypeError(f"{type(like)} not supported by meta_series_constructor") if is_dataframe_like(like): return like._constructor_sliced elif is_series_like(like): return like._constructor elif is_index_like(like): return like.to_frame()._constructor_sliced - elif is_arraylike(like): - return serial_constructor_from_array(like, series=True) else: - raise TypeError(f"{type(like)} not supported by serial_frame_constructor") + raise TypeError(f"{type(like)} not supported by meta_series_constructor") From 8ad7e726da9f3d80e9e8912bf8457c3ad6605e0a Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 18 Oct 2022 11:08:48 -0700 Subject: [PATCH 07/11] fix index problem --- dask/dataframe/core.py | 8 +++++--- dask/dataframe/tests/test_dataframe.py | 10 ++++------ 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/dask/dataframe/core.py b/dask/dataframe/core.py index ef252edb8a4..03c37e94df7 100644 --- a/dask/dataframe/core.py +++ b/dask/dataframe/core.py @@ -2815,7 +2815,7 @@ def quantile(self, q=0.5, axis=0, numeric_only=True, method="default"): graph = HighLevelGraph.from_collections( keyname, layer, dependencies=quantiles ) - return DataFrame(graph, keyname, meta, quantiles[0].divisions) + return new_dd_object(graph, keyname, meta, quantiles[0].divisions) @derived_from(pd.DataFrame) def describe( @@ -7059,9 +7059,11 @@ def cov_corr(df, min_periods=None, corr=False, scalar=False, split_every=False): if scalar: return Scalar(graph, name, "f8") meta = make_meta( - [(c, "f8") for c in df.columns], index=df.columns, parent_meta=df._meta + [(c, "f8") for c in df.columns], + index=meta_series_constructor(df)(df.columns), + parent_meta=df._meta, ) - return DataFrame(graph, name, meta, (df.columns.min(), df.columns.max())) + return new_dd_object(graph, name, meta, (df.columns.min(), df.columns.max())) def cov_corr_chunk(df, corr=False): diff --git a/dask/dataframe/tests/test_dataframe.py b/dask/dataframe/tests/test_dataframe.py index 598f54ccbd2..a284eb3a9d8 100644 --- a/dask/dataframe/tests/test_dataframe.py +++ b/dask/dataframe/tests/test_dataframe.py @@ -3213,9 +3213,8 @@ def test_cov_gpu(): res = ddf.cov() res2 = ddf.cov(split_every=2) sol = df.cov() - # TODO: dtype of res.index is wrong before compute - assert_eq(res.compute(), sol) - assert_eq(res2.compute(), sol) + assert_eq(res, sol) + assert_eq(res2, sol) assert res._name == ddf.cov()._name assert res._name != res2._name @@ -3278,9 +3277,8 @@ def test_corr_gpu(): res = ddf.corr() res2 = ddf.corr(split_every=2) sol = df.corr() - # TODO: dtype of res.index is wrong before compute - assert_eq(res.compute(), sol) - assert_eq(res2.compute(), sol) + assert_eq(res, sol) + assert_eq(res2, sol) assert res._name == ddf.corr()._name assert res._name != res2._name From 5c49d0763c9adc1ea6d61b3ceedece6691ce170d Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 18 Oct 2022 11:54:05 -0700 Subject: [PATCH 08/11] cleanup and narrow scope --- dask/dataframe/core.py | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/dask/dataframe/core.py b/dask/dataframe/core.py index 03c37e94df7..1b95913246c 100644 --- a/dask/dataframe/core.py +++ b/dask/dataframe/core.py @@ -4402,9 +4402,7 @@ def map(self, arg, na_action=None, meta=no_default, is_monotonic=False): applied = super().map(arg, na_action=na_action, meta=meta) if is_monotonic and self.known_divisions: applied.divisions = tuple( - meta_series_constructor(self)(self.divisions).map( - arg, na_action=na_action - ) + pd.Series(self.divisions).map(arg, na_action=na_action) ) else: applied = applied.clear_divisions() @@ -5979,9 +5977,7 @@ def _repr_data(self): index = self._repr_divisions cols = meta.columns if len(cols) == 0: - series_df = meta_frame_constructor(self)( - [[]] * len(index), columns=cols, index=index - ) + series_df = pd.DataFrame([[]] * len(index), columns=cols, index=index) else: series_df = pd.concat( [_repr_data_series(s, index=index) for _, s in meta.items()], axis=1 @@ -7150,7 +7146,11 @@ def cov_corr_agg(data, cols, min_periods=2, corr=False, scalar=False, like_df=No mat = C / den if scalar: return float(mat[0, 1]) - return meta_frame_constructor(like_df)(mat, columns=cols, index=cols) + if like_df is None: + like_df = pd.DataFrame + return (pd.DataFrame if like_df is None else meta_frame_constructor(like_df))( + mat, columns=cols, index=cols + ) def pd_split(df, p, random_state=None, shuffle=False): @@ -7495,9 +7495,7 @@ def repartition_npartitions(df, npartitions): else: # Drop duplcates in case last partition has same # value for min and max division - original_divisions = divisions = meta_series_constructor(df)( - df.divisions - ).drop_duplicates() + original_divisions = divisions = pd.Series(df.divisions).drop_duplicates() if df.known_divisions and ( np.issubdtype(divisions.dtype, np.datetime64) or np.issubdtype(divisions.dtype, np.number) @@ -7516,9 +7514,7 @@ def repartition_npartitions(df, npartitions): ) if np.issubdtype(original_divisions.dtype, np.datetime64): divisions = methods.tolist( - meta_series_constructor(df)(divisions).astype( - original_divisions.dtype - ) + pd.Series(divisions).astype(original_divisions.dtype) ) elif np.issubdtype(original_divisions.dtype, np.integer): divisions = divisions.astype(original_divisions.dtype) From 8b88469f0c8e05d38ac626979f7b23dc32134d68 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 18 Oct 2022 12:04:46 -0700 Subject: [PATCH 09/11] narrow scope further --- dask/dataframe/core.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/dask/dataframe/core.py b/dask/dataframe/core.py index 1b95913246c..76527763988 100644 --- a/dask/dataframe/core.py +++ b/dask/dataframe/core.py @@ -3677,9 +3677,7 @@ def rename(self, index=None, inplace=False, sorted_index=False): res = self.map_partitions(M.rename, index, enforce_metadata=False) if self.known_divisions: if sorted_index and (callable(index) or is_dict_like(index)): - old = meta_series_constructor(self)( - range(self.npartitions + 1), index=self.divisions - ) + old = pd.Series(range(self.npartitions + 1), index=self.divisions) new = old.rename(index).index if not new.is_monotonic_increasing: msg = ( @@ -7146,8 +7144,6 @@ def cov_corr_agg(data, cols, min_periods=2, corr=False, scalar=False, like_df=No mat = C / den if scalar: return float(mat[0, 1]) - if like_df is None: - like_df = pd.DataFrame return (pd.DataFrame if like_df is None else meta_frame_constructor(like_df))( mat, columns=cols, index=cols ) @@ -7463,7 +7459,7 @@ def repartition_size(df, size): split_mem_usages = [] for n, usage in zip(nsplits, mem_usages): split_mem_usages.extend([usage / n] * n) - mem_usages = meta_series_constructor(df)(split_mem_usages) + mem_usages = pd.Series(split_mem_usages) # 2. now that all partitions are less than size, concat them up to size assert np.all(mem_usages <= size) From a2613f5f29b76f219934c1ace06f3b1bcc9c3b99 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 10 Nov 2022 11:17:36 -0800 Subject: [PATCH 10/11] add test coverage --- dask/dataframe/tests/test_utils_dataframe.py | 32 ++++++++++++++++++++ dask/dataframe/utils.py | 4 +-- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/dask/dataframe/tests/test_utils_dataframe.py b/dask/dataframe/tests/test_utils_dataframe.py index f5e46a229f6..8039800fc96 100644 --- a/dask/dataframe/tests/test_utils_dataframe.py +++ b/dask/dataframe/tests/test_utils_dataframe.py @@ -20,7 +20,9 @@ is_index_like, is_series_like, make_meta, + meta_frame_constructor, meta_nonempty, + meta_series_constructor, raise_on_meta_error, shard_df_on_index, ) @@ -618,3 +620,33 @@ def check_custom_scheduler(part: pd.DataFrame) -> pd.DataFrame: assert_eq(ddf2, ddf2, scheduler=custom_scheduler) with dask.config.set(scheduler=custom_scheduler): assert_eq(ddf2, ddf2, scheduler=None) + + +@pytest.mark.parametrize( + "data", + [ + pd.DataFrame([0]), + pd.Series([0]), + pd.Index([0]), + dd.from_dict({"x": [0]}, npartitions=1), + dd.from_dict({"x": [0]}, npartitions=1).x, + dd.from_dict({"x": [0]}, npartitions=1).index, + ], +) +def test_meta_constructor_utilities(data): + assert meta_series_constructor(data) == pd.Series + assert meta_frame_constructor(data) == pd.DataFrame + + +@pytest.mark.parametrize( + "data", + [ + dd.from_dict({"x": [0]}, npartitions=1).x.values, + np.array([0]), + ], +) +def test_meta_constructor_utilities_raise(data): + with pytest.raises(TypeError, match="not supported by meta_series"): + meta_series_constructor(data) + with pytest.raises(TypeError, match="not supported by meta_frame"): + meta_frame_constructor(data) diff --git a/dask/dataframe/utils.py b/dask/dataframe/utils.py index ba73bcdd9a2..a52a53619fb 100644 --- a/dask/dataframe/utils.py +++ b/dask/dataframe/utils.py @@ -742,7 +742,7 @@ def meta_frame_constructor(like): Parameters ---------- like : - Any series-like or dataframe-like object. + Any series-like, Index-like or dataframe-like object. """ if is_dask_collection(like): try: @@ -765,7 +765,7 @@ def meta_series_constructor(like): Parameters ---------- like : - Any series-like or dataframe-like object. + Any series-like, Index-like or dataframe-like object. """ if is_dask_collection(like): try: From ed3620f5c63f3e4eaeb9c04f9807b7d9af0c86bf Mon Sep 17 00:00:00 2001 From: "Richard (Rick) Zamora" Date: Thu, 10 Nov 2022 14:49:40 -0600 Subject: [PATCH 11/11] Update dask/dataframe/tests/test_utils_dataframe.py Co-authored-by: James Bourbeau --- dask/dataframe/tests/test_utils_dataframe.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dask/dataframe/tests/test_utils_dataframe.py b/dask/dataframe/tests/test_utils_dataframe.py index 8039800fc96..4e0fc4502ad 100644 --- a/dask/dataframe/tests/test_utils_dataframe.py +++ b/dask/dataframe/tests/test_utils_dataframe.py @@ -634,8 +634,8 @@ def check_custom_scheduler(part: pd.DataFrame) -> pd.DataFrame: ], ) def test_meta_constructor_utilities(data): - assert meta_series_constructor(data) == pd.Series - assert meta_frame_constructor(data) == pd.DataFrame + assert meta_series_constructor(data) is pd.Series + assert meta_frame_constructor(data) is pd.DataFrame @pytest.mark.parametrize(