rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0
8.5k stars 908 forks source link

[FEA] Support GPU execution engine in LazyFrame profiler #16224

Open beckernick opened 4 months ago

beckernick commented 4 months ago

We should instrument the GPU physical execution engine so that it is compatible with the built-in Polars LazyFrame profiler (or similar).

# !wget https://raw.githubusercontent.com/mwaskom/seaborn-data/master/titanic.csv
ldf = (
    pl.scan_csv("titanic.csv")
    .group_by(pl.col("age").round(0))
    .agg(pl.col("embark_town").count())
    .sort("age")
)
ldf.profile(show_plot=True)
(shape: (72, 2)
 ┌──────┬─────────────┐
 │ age  ┆ embark_town │
 │ ---  ┆ ---         │
 │ f64  ┆ u32         │
 ╞══════╪═════════════╡
 │ null ┆ 177         │
 │ 0.0  ┆ 1           │
 │ 1.0  ┆ 13          │
 │ 2.0  ┆ 10          │
 │ 3.0  ┆ 6           │
 │ …    ┆ …           │
 │ 66.0 ┆ 1           │
 │ 70.0 ┆ 2           │
 │ 71.0 ┆ 3           │
 │ 74.0 ┆ 1           │
 │ 80.0 ┆ 1           │
 └──────┴─────────────┘,
 shape: (4, 3)
 ┌──────────────────┬───────┬──────┐
 │ node             ┆ start ┆ end  │
 │ ---              ┆ ---   ┆ ---  │
 │ str              ┆ u64   ┆ u64  │
 ╞══════════════════╪═══════╪══════╡
 │ optimization     ┆ 0     ┆ 16   │
 │ csv(titanic.csv) ┆ 16    ┆ 1330 │
 │ group_by(age)    ┆ 1347  ┆ 2141 │
 │ sort(age)        ┆ 2152  ┆ 6140 │
 └──────────────────┴───────┴──────┘)
Screen Shot 2024-04-04 at 10 23 06 AM
Matt711 commented 1 week ago

Notes: We want to be able to profile GPU-accelerated queries with the Polars LazyFrame profiler. I imagine the API should look like how we execute a query using collect (ie. .collect(engine="gpu")) So API would look like

lf = pl.LazyFrame(
    {
        "a": ["a", "b", "a", "b", "b", "c"],
        "b": [1, 2, 3, 4, 5, 6],
        "c": [6, 5, 4, 3, 2, 1],
    }
)
q = lf.group_by("a", maintain_order=True).agg(pl.all().sum()).sort(
    "a"
)
df, df_times = lf.profile(engine="gpu")

Currently the Polars LazyFrame profiler works like this:

  1. Logical Plan Creation: Polars builds a logical plan representing the query.
  2. Optimization: Lots of optimizations Logical plan --> Physical Plan
  3. Execution with Timing: The physical plan is executed, and execution time for each node is recorded.
  4. Result: Finally, a tuple of DataFrames is returned one with the query result and the other with the timing information

I think if we allow the profile functions in the Polars rust layer to accept a callback (in the same way we do for collect), we can get the timing information from step 3. We'd need to change these functions (copied from https://github.com/pola-rs/polars/tree/main/crates/polars-python/src/lazyframe)

// LazyFrame::profile (Can this remain unchanged?)
pub fn profile(self) -> PolarsResult<(DataFrame, DataFrame)> {
    let (mut state, mut physical_plan, _) = self.prepare_collect(false)?;
    state.time_nodes();
    let out = physical_plan.execute(&mut state)?;
    let timer_df = state.finish_timer()?;
    Ok((out, timer_df))
}
// PyLazyFrame::profile
fn profile(&self, py: Python, lambda_post_opt: Option<PyObject>) -> PyResult<(PyDataFrame, PyDataFrame)> {
    // follow the logic in collect
}

And we'd need to add an engine kwarg to profile function in polars

def profile(
    self,
    *,
    ...
    engine: EngineType = "cpu",
) -> tuple[DataFrame, DataFrame]:
    ...
    # Following the logic in collect
    callback = None
    if engine == "gpu":
        cudf_polars = import_optional(
            "cudf_polars",
            ...
        )
        if not isinstance(engine, GPUEngine):
            engine = GPUEngine()
        callback = partial(cudf_polars.execute_with_cudf, config=engine)

    df, timings = self._ldf.profile(callback)
    df, timings = wrap_df(df), wrap_df(timings)

    ...

    return df, timings

Finally, we should do a docs refresh.