rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0

[FEA] Async mode for cudf.series operations #13087

Open lmeyerov opened 1 year ago

lmeyerov commented 1 year ago

Is your feature request related to a problem? Please describe.

We get wide dataframes in situations like machine learning (easily 1-5K columns) and genomics (10K+ columns). While cudf already gives some speedup (say 2-3X), it would be easy to reach the 10X+ level with much higher GPU utilization if we could spawn concurrent tasks for each column. Getting this all the way to the dataframe level seems tricky, but async primitives at the column level would get us far.

One Python-native idea is to do this via async/await: while one cudf operation is being scheduled, allocated, and run, we can be scheduling the next, and ideally cudf can run them independently. async/await stabilized 2-3 years ago in Python and JavaScript as a popular native choice, and has since become much more popular in pydata, e.g., LangChain just rewrote to support async versions of all its methods. Ex: https://trends.google.com/trends/explore?date=all&q=async%20await&hl=en . Separately, there's heightened value for pydata dashboarding scenarios like Plotly, Streamlit, etc., as these ecosystems increasingly build on async IO underneath as well.

(Another idea with precedent is a lazy mode similar to Haskell or Dask, discussed below as well.)

Describe the solution you'd like

I'd like to be able to do something like:


async def f(s: cudf.Series) -> cudf.Series:
    # async mode for core series operations lets other f() calls proceed while this runs
    s2 = await s.str_async.hex_to_int('AABBCC')

    # math can be clean and enable the same
    # if we're super clever, this may even unlock query plan optimizations like fusion in the future
    async with cudf.async_binop_mode:
        s3_a = s2 + 1 / 3
        s3 = await s3_a

    return s3

cols2 = await asyncio.gather(*[f(df[col]) for col in df])
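The overlap this sketch is after can be illustrated today in plain Python with asyncio. Everything below is illustrative: `hex_to_int` is a CPU stand-in for a per-column GPU kernel, not a cudf API, and `main` mimics the proposed per-column `gather`:

```python
import asyncio

# Stand-in for an async per-column kernel: while one "kernel" yields,
# the event loop can start scheduling the next column's work.
async def hex_to_int(values):
    await asyncio.sleep(0)  # yield control, as a real async kernel launch would
    return [int(v, 16) for v in values]

async def f(col):
    s2 = await hex_to_int(col)
    return [x + 1 / 3 for x in s2]

async def main(df):
    # one task per column, mirroring the proposal's gather over df columns
    return await asyncio.gather(*(f(df[name]) for name in df))

# toy "dataframe" of hex-string columns
df = {"a": ["aa", "bb"], "b": ["0f", "10"]}
cols2 = asyncio.run(main(df))
```

The point is only the control flow: each `await` is a spot where the event loop can interleave another column's scheduling, which is what the feature request asks cudf to exploit on the GPU side.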

Describe alternatives you've considered

  1. Use existing abstractions

In theory we can set up threads or multiple Dask workers, but (1) both are super awkward, and (2) underneath, cudf will not run the jobs concurrently.
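For reference, the thread-based workaround looks roughly like this. It is a sketch with a CPU stand-in for the per-column op; with real cudf, kernel launches on the default stream serialize, which is exactly the limitation noted above:

```python
from concurrent.futures import ThreadPoolExecutor

def f(col):
    # stand-in for a per-column cudf operation
    return [x * 2 for x in col]

# toy "dataframe" as a dict of columns
df = {"a": [1, 2], "b": [3, 4]}

# one thread per column; awkward, and on real cudf the GPU work
# would still largely serialize underneath
with ThreadPoolExecutor() as pool:
    cols2 = list(pool.map(f, df.values()))
```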

  2. Lazy cudf

Another thought is to create a lazy mode for cudf. This has precedent in Haskell, and in modern pydata land, more so in Polars. Dask does this too, and we'd use it if that could work, but it's awkward -- I haven't used it, but Polars sounds friendlier in practice:


def f(s: cudf.Series) -> cudf.Series:
    # explicitly lazy ops
    s2 = s.str_lazy.hex_to_int('AABBCC')

    # binops know they're lazy
    s3 = s2 + 1 / 3

    return s3

# force evaluation with async friendliness
cols2 = await cudf_client.compute_async([f(df[col]) for col in df])

Underneath, cudf can build on asyncio, Dask, or whatever.
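A minimal sketch of what such a lazy wrapper could look like, in plain Python with hypothetical names (`Lazy`, `hex_to_int_lazy`); real lazy engines like Polars additionally optimize the deferred plan (e.g., fusing ops) before executing it:

```python
class Lazy:
    """Wraps a deferred computation; binops compose lazily, .compute() runs it."""
    def __init__(self, fn):
        self.fn = fn

    def __add__(self, other):
        # build a new deferred node instead of evaluating now
        return Lazy(lambda: [x + other for x in self.fn()])

    def compute(self):
        return self.fn()

def hex_to_int_lazy(values):
    # deferred stand-in for a lazy string op like s.str_lazy.hex_to_int
    return Lazy(lambda: [int(v, 16) for v in values])

def f(col):
    s2 = hex_to_int_lazy(col)
    return s2 + 1 / 3  # the binop stays lazy

expr = f(["aa", "0f"])
result = expr.compute()  # nothing actually runs until here
```

A `compute_async` as in the snippet above would then just walk these deferred graphs and schedule the work however it likes.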

Additional context

Slack thread: https://rapids-goai.slack.com/archives/C5E06F4DC/p1680710488795869

lmeyerov commented 1 year ago

From Slack: there's thinking of starting at the level of the pylibcudf CPython bindings layer.

I can imagine there may be some worthwhile cudf internals that could be the first consumers of such things. Ex: maybe some cuML kernels, or df-level df.str.xyz ones like hashing.

GregoryKimball commented 1 year ago

Thank you @lmeyerov for raising this, and thank you for joining the Slack discussion.

Here are some points from the discussion:

lmeyerov commented 1 year ago

Yes, that'll be interesting to us wrt some of the hot loops in https://github.com/graphistry/cu-cat: parallel per-column tasks, some sparse matrix math, ...