rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0

[QST] convert column of datetime string to column of datetime object #13040

Open stucash opened 1 year ago

stucash commented 1 year ago

I am a new user of Dask and RapidsAI. An excerpt of my data (in CSV format):

Symbol,Date,Open,High,Low,Close,Volume
AADR,17-Oct-2017 09:00,57.47,58.3844,57.3645,58.3844,2094
AADR,17-Oct-2017 10:00,57.27,57.2856,57.25,57.27,627
AADR,17-Oct-2017 11:00,56.99,56.99,56.99,56.99,100
AADR,17-Oct-2017 12:00,56.98,57.05,56.98,57.05,200
AADR,17-Oct-2017 13:00,57.14,57.16,57.14,57.16,700
AADR,17-Oct-2017 14:00,57.13,57.13,57.13,57.13,100
AADR,17-Oct-2017 15:00,57.07,57.07,57.07,57.07,200
AAMC,17-Oct-2017 09:00,87,87,87,87,100
AAU,17-Oct-2017 09:00,1.1,1.13,1.0832,1.121,67790
AAU,17-Oct-2017 10:00,1.12,1.12,1.12,1.12,100
AAU,17-Oct-2017 11:00,1.125,1.125,1.125,1.125,200
AAU,17-Oct-2017 12:00,1.1332,1.15,1.1332,1.15,27439
AAU,17-Oct-2017 13:00,1.15,1.15,1.13,1.13,8200
AAU,17-Oct-2017 14:00,1.1467,1.1467,1.14,1.1467,1750
AAU,17-Oct-2017 15:00,1.1401,1.1493,1.1401,1.1493,4100
AAU,17-Oct-2017 16:00,1.13,1.13,1.13,1.13,100
ABE,17-Oct-2017 09:00,14.64,14.64,14.64,14.64,200
ABE,17-Oct-2017 10:00,14.67,14.67,14.66,14.66,1200
ABE,17-Oct-2017 11:00,14.65,14.65,14.65,14.65,600
ABE,17-Oct-2017 15:00,14.65,14.65,14.65,14.65,836

Note that the Date column is of type string.

I have some example stock market time-series data (i.e., DOHLCV) in CSV files, and I read them into a dask_cudf dataframe (my dask.dataframe backend is cudf, so read_csv is a creation dispatcher that conveniently gives me a cudf-backed dataframe).

import dask_cudf 
import cudf
from dask import dataframe as dd

ddf = dd.read_csv('path/to/my/data/*.csv')
ddf
# output
<dask_cudf.DataFrame | 450 tasks | 450 npartitions>

# test csv data above can be retrieved using following statements
# df = pd.read_clipboard(sep=",")
# cdf = cudf.from_pandas(df)
# ddf = dask_cudf.from_cudf(cdf, npartitions=2)
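
The backend dispatch mentioned above is configured roughly like this (a sketch; it assumes a Dask version recent enough that dask_cudf registers the "cudf" dataframe backend):

import dask
from dask import dataframe as dd

# With the cudf backend selected, dd.read_csv dispatches to cudf and
# returns a dask_cudf-backed collection instead of a pandas-backed one.
dask.config.set({"dataframe.backend": "cudf"})
ddf = dd.read_csv("path/to/my/data/*.csv")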

I then try to convert the datetime strings into real datetime objects (np.datetime64[ns], or whatever the cudf/dask equivalent is), but this fails with an error.

df["Date"] = dd.to_datetime(df["Date"], format="%d-%b-%Y %H:%M").head(5)
df.set_index("Date", inplace=True) # This failed with different error, will raise in a different SO thread.
# Following statement gives me same error.
# cudf.to_datetime(df["Date"], format="%d-%b-%Y %H:%M")

The full error log is at the end of this post.

The error message seems to suggest that I'd need to compute the dask_cudf dataframe first, turning it into a real cudf object, and only then could I do what I would do in pandas:

df["Date"] = cudf.to_datetime(df.Date)
df = df.set_index(df.Date)
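
where df would be the eagerly computed frame, i.e. something like the following (a sketch, assuming ddf is the lazy collection read above):

# Materialise every partition into a single in-memory cudf.DataFrame,
# which defeats the point of building a lazy task graph in the first place.
df = ddf.compute()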

This obviously isn't ideal, and avoiding it is exactly what Dask is for: we want to delay the conversion and only compute the final result we actually need.

What is the Dask/dask_cudf way to convert a string column to a datetime column? As far as I can see, when the backend is pandas the conversion goes smoothly and rarely causes problems.
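
One lazy workaround I can think of is mapping the cuDF converter over each partition myself, though I don't know whether this is the intended pattern (a sketch; the meta argument is my guess at the right output type, and it assumes cudf.to_datetime handles the %b specifier):

import cudf

# Apply cudf.to_datetime partition by partition instead of calling
# dd.to_datetime, which (per the traceback below) falls back to pandas.
ddf["Date"] = ddf["Date"].map_partitions(
    cudf.to_datetime,
    format="%d-%b-%Y %H:%M",
    meta=("Date", "datetime64[ns]"),
)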

Or is it that cudf, or the GPU world in general, is not meant to do much with types like datetime and string (e.g., because GPUs are ideally geared towards expensive numerical computations)?

My use case involves filtering on both strings and datetimes, so I need to set up the dataframe with a proper datetime column.
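
To illustrate the kind of filtering I mean (a sketch with example cut-off times taken from the sample data above), I'd like to write comparisons against real timestamps rather than strings:

import numpy as np

# Once Date is a true datetime column, range predicates can be combined
# with other filters and evaluated lazily across partitions.
start = np.datetime64("2017-10-17T09:00")
end = np.datetime64("2017-10-17T12:00")
morning_aau = ddf[(ddf["Date"] >= start) & (ddf["Date"] < end) & (ddf["Symbol"] == "AAU")]
morning_aau.head()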

Error Log

TypeError                                 Traceback (most recent call last)
Cell In[52], line 1
----> 1 dd.to_datetime(df["Date"], format="%d-%b-%Y %H:%M").head(2)

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/dataframe/core.py:1268, in _Frame.head(self, n, npartitions, compute)
   1266 # No need to warn if we're already looking at all partitions
   1267 safe = npartitions != self.npartitions
-> 1268 return self._head(n=n, npartitions=npartitions, compute=compute, safe=safe)

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/dataframe/core.py:1302, in _Frame._head(self, n, npartitions, compute, safe)
   1297 result = new_dd_object(
   1298     graph, name, self._meta, [self.divisions[0], self.divisions[npartitions]]
   1299 )
   1301 if compute:
-> 1302     result = result.compute()
   1303 return result

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/base.py:314, in DaskMethodsMixin.compute(self, **kwargs)
    290 def compute(self, **kwargs):
    291     """Compute this dask collection
    292 
    293     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    312     dask.base.compute
    313     """
--> 314     (result,) = compute(self, traverse=False, **kwargs)
    315     return result

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/base.py:599, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    596     keys.append(x.__dask_keys__())
    597     postcomputes.append(x.__dask_postcompute__())
--> 599 results = schedule(dsk, keys, **kwargs)
    600 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/threaded.py:89, in get(dsk, keys, cache, num_workers, pool, **kwargs)
     86     elif isinstance(pool, multiprocessing.pool.Pool):
     87         pool = MultiprocessingPoolExecutor(pool)
---> 89 results = get_async(
     90     pool.submit,
     91     pool._max_workers,
     92     dsk,
     93     keys,
     94     cache=cache,
     95     get_id=_thread_get_id,
     96     pack_exception=pack_exception,
     97     **kwargs,
     98 )
    100 # Cleanup pools associated to dead threads
    101 with pools_lock:

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/local.py:511, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    509         _execute_task(task, data)  # Re-execute locally
    510     else:
--> 511         raise_exception(exc, tb)
    512 res, worker_id = loads(res_info)
    513 state["cache"][key] = res

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/local.py:319, in reraise(exc, tb)
    317 if exc.__traceback__ is not tb:
    318     raise exc.with_traceback(tb)
--> 319 raise exc

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/local.py:224, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    222 try:
    223     task, data = loads(task_info)
--> 224     result = _execute_task(task, data)
    225     id = get_id()
    226     result = dumps((result, id))

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/optimization.py:990, in SubgraphCallable.__call__(self, *args)
    988 if not len(args) == len(self.inkeys):
    989     raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 990 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/core.py:149, in get(dsk, out, cache)
    147 for key in toposort(dsk):
    148     task = dsk[key]
--> 149     result = _execute_task(task, cache)
    150     cache[key] = result
    151 result = _execute_task(out, cache)

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/utils.py:72, in apply(func, args, kwargs)
     41 """Apply a function given its positional and keyword arguments.
     42 
     43 Equivalent to ``func(*args, **kwargs)``
   (...)
     69 >>> dsk = {'task-name': task}  # adds the task to a low level Dask task graph
     70 """
     71 if kwargs:
---> 72     return func(*args, **kwargs)
     73 else:
     74     return func(*args)

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/dataframe/core.py:6821, in apply_and_enforce(*args, **kwargs)
   6819 func = kwargs.pop("_func")
   6820 meta = kwargs.pop("_meta")
-> 6821 df = func(*args, **kwargs)
   6822 if is_dataframe_like(df) or is_series_like(df) or is_index_like(df):
   6823     if not len(df):

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/pandas/core/tools/datetimes.py:1100, in to_datetime(arg, errors, dayfirst, yearfirst, utc, format, exact, unit, infer_datetime_format, origin, cache)
   1098         result = _convert_and_box_cache(argc, cache_array)
   1099     else:
-> 1100         result = convert_listlike(argc, format)
   1101 else:
   1102     result = convert_listlike(np.array([arg]), format)[0]

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/pandas/core/tools/datetimes.py:413, in _convert_listlike_datetimes(arg, format, name, tz, unit, errors, infer_datetime_format, dayfirst, yearfirst, exact)
    410         return idx
    411     raise
--> 413 arg = ensure_object(arg)
    414 require_iso8601 = False
    416 if infer_datetime_format and format is None:

File pandas/_libs/algos_common_helper.pxi:33, in pandas._libs.algos.ensure_object()

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/cudf/core/frame.py:451, in Frame.__array__(self, dtype)
    450 def __array__(self, dtype=None):
--> 451     raise TypeError(
    452         "Implicit conversion to a host NumPy array via __array__ is not "
    453         "allowed, To explicitly construct a GPU matrix, consider using "
    454         ".to_cupy()\nTo explicitly construct a host matrix, consider "
    455         "using .to_numpy()."
    456     )

TypeError: Implicit conversion to a host NumPy array via __array__ is not allowed, To explicitly construct a GPU matrix, consider using .to_cupy()
To explicitly construct a host matrix, consider using .to_numpy().

GregoryKimball commented 1 year ago

Thank you @stucash for your message. Are you able to transform your string/datetime columns appropriately in cuDF-python? Or is the problem only at the dask_cudf layer?
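
For reference, the cuDF-only check I have in mind is roughly this (a minimal sketch built from the sample rows above; whether cudf.to_datetime accepts the %b specifier may depend on your cuDF version):

import cudf

# Convert a small cudf.Series of the sample Date strings directly,
# bypassing Dask entirely, to see whether the cuDF layer itself works.
dates = cudf.Series(["17-Oct-2017 09:00", "17-Oct-2017 10:00"])
converted = cudf.to_datetime(dates, format="%d-%b-%Y %H:%M")
print(converted.dtype)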