You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Describe the bug
After some discussions in dask-contrib/dask-sql#838, we found that dd.to_datetime doesn't work completely with cuDF-backed Series. More specifically, it looks like errors arise when compute() is called.
The fix may lie in Dask itself, but since it's cuDF raising the error, I'm opening it here for now.
Steps/Code to reproduce bug @ayushdg provided this example:
import cudf
import dask_cudf
import dask.dataframe as dd
a = cudf.Series(['2022-01-01', '2022-02-02'])
dser = dask_cudf.from_cudf(a, 1)
res = dd.to_datetime(dser).compute() # note that there are no errors if we don't call compute()
which errors with:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Cell In [19], line 6
4 a = cudf.Series(['2022-01-01', '2022-02-02'])
5 dser = dask_cudf.from_cudf(a, 1)
----> 6 res = dd.to_datetime(dser).compute()
File ~/miniconda3/envs/dsql_rapids-22.12/lib/python3.9/site-packages/dask/base.py:315, in DaskMethodsMixin.compute(self, **kwargs)
291 def compute(self, **kwargs):
292 """Compute this dask collection
293
294 This turns a lazy Dask collection into its in-memory equivalent.
(...)
313 dask.base.compute
314 """
--> 315 (result,) = compute(self, traverse=False, **kwargs)
316 return result
File ~/miniconda3/envs/dsql_rapids-22.12/lib/python3.9/site-packages/dask/base.py:600, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
597 keys.append(x.__dask_keys__())
598 postcomputes.append(x.__dask_postcompute__())
--> 600 results = schedule(dsk, keys, **kwargs)
601 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File ~/miniconda3/envs/dsql_rapids-22.12/lib/python3.9/site-packages/dask/threaded.py:89, in get(dsk, keys, cache, num_workers, pool, **kwargs)
86 elif isinstance(pool, multiprocessing.pool.Pool):
87 pool = MultiprocessingPoolExecutor(pool)
---> 89 results = get_async(
90 pool.submit,
91 pool._max_workers,
92 dsk,
93 keys,
94 cache=cache,
95 get_id=_thread_get_id,
96 pack_exception=pack_exception,
97 **kwargs,
98 )
100 # Cleanup pools associated to dead threads
101 with pools_lock:
File ~/miniconda3/envs/dsql_rapids-22.12/lib/python3.9/site-packages/dask/local.py:511, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
509 _execute_task(task, data) # Re-execute locally
510 else:
--> 511 raise_exception(exc, tb)
512 res, worker_id = loads(res_info)
513 state["cache"][key] = res
File ~/miniconda3/envs/dsql_rapids-22.12/lib/python3.9/site-packages/dask/local.py:319, in reraise(exc, tb)
317 if exc.__traceback__ is not tb:
318 raise exc.with_traceback(tb)
--> 319 raise exc
File ~/miniconda3/envs/dsql_rapids-22.12/lib/python3.9/site-packages/dask/local.py:224, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
222 try:
223 task, data = loads(task_info)
--> 224 result = _execute_task(task, data)
225 id = get_id()
226 result = dumps((result, id))
File ~/miniconda3/envs/dsql_rapids-22.12/lib/python3.9/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File ~/miniconda3/envs/dsql_rapids-22.12/lib/python3.9/site-packages/dask/optimization.py:990, in SubgraphCallable.__call__(self, *args)
988 if not len(args) == len(self.inkeys):
989 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 990 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
File ~/miniconda3/envs/dsql_rapids-22.12/lib/python3.9/site-packages/dask/core.py:149, in get(dsk, out, cache)
147 for key in toposort(dsk):
148 task = dsk[key]
--> 149 result = _execute_task(task, cache)
150 cache[key] = result
151 result = _execute_task(out, cache)
File ~/miniconda3/envs/dsql_rapids-22.12/lib/python3.9/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File ~/miniconda3/envs/dsql_rapids-22.12/lib/python3.9/site-packages/dask/utils.py:71, in apply(func, args, kwargs)
40 """Apply a function given its positional and keyword arguments.
41
42 Equivalent to ``func(*args, **kwargs)``
(...)
68 >>> dsk = {'task-name': task} # adds the task to a low level Dask task graph
69 """
70 if kwargs:
---> 71 return func(*args, **kwargs)
72 else:
73 return func(*args)
File ~/miniconda3/envs/dsql_rapids-22.12/lib/python3.9/site-packages/dask/dataframe/core.py:6770, in apply_and_enforce(*args, **kwargs)
6768 func = kwargs.pop("_func")
6769 meta = kwargs.pop("_meta")
-> 6770 df = func(*args, **kwargs)
6771 if is_dataframe_like(df) or is_series_like(df) or is_index_like(df):
6772 if not len(df):
File ~/miniconda3/envs/dsql_rapids-22.12/lib/python3.9/site-packages/pandas/core/tools/datetimes.py:1100, in to_datetime(arg, errors, dayfirst, yearfirst, utc, format, exact, unit, infer_datetime_format, origin, cache)
1098 result = _convert_and_box_cache(argc, cache_array)
1099 else:
-> 1100 result = convert_listlike(argc, format)
1101 else:
1102 result = convert_listlike(np.array([arg]), format)[0]
File ~/miniconda3/envs/dsql_rapids-22.12/lib/python3.9/site-packages/pandas/core/tools/datetimes.py:413, in _convert_listlike_datetimes(arg, format, name, tz, unit, errors, infer_datetime_format, dayfirst, yearfirst, exact)
410 return idx
411 raise
--> 413 arg = ensure_object(arg)
414 require_iso8601 = False
416 if infer_datetime_format and format is None:
File pandas/_libs/algos_common_helper.pxi:33, in pandas._libs.algos.ensure_object()
File ~/miniconda3/envs/dsql_rapids-22.12/lib/python3.9/site-packages/cudf/core/frame.py:447, in Frame.__array__(self, dtype)
446 def __array__(self, dtype=None):
--> 447 raise TypeError(
448 "Implicit conversion to a host NumPy array via __array__ is not "
449 "allowed, To explicitly construct a GPU matrix, consider using "
450 ".to_cupy()\nTo explicitly construct a host matrix, consider "
451 "using .to_numpy()."
452 )
TypeError: Implicit conversion to a host NumPy array via __array__ is not allowed, To explicitly construct a GPU matrix, consider using .to_cupy()
To explicitly construct a host matrix, consider using .to_numpy().
The text was updated successfully, but these errors were encountered:
Describe the bug
After some discussions in dask-contrib/dask-sql#838, we found that
dd.to_datetime
doesn't work completely with cuDF-backed Series. More specifically, it looks like errors arise when compute()
is called. The fix may lie in Dask itself, but since it's cuDF raising the error, I'm opening it here for now.
Steps/Code to reproduce bug
@ayushdg provided this example:
which errors with:
The text was updated successfully, but these errors were encountered: