rapidsai / dask-cuda

Utilities for Dask and CUDA interactions
https://docs.rapids.ai/api/dask-cuda/stable/
Apache License 2.0

TypeError: Unsupported type `<class 'numpy.ndarray'>` #1405

Open trivialfis opened 3 days ago

trivialfis commented 3 days ago

Script:

import dask
from dask import array as da
from dask import dataframe as dd
from dask_cuda import LocalCUDACluster
from distributed import Client, LocalCluster
import numpy as np

def main(client: Client) -> None:
    rng = da.random.default_rng(1994)
    X = rng.random(size=(2048, 4))

    df = dd.from_dask_array(X, columns=[f"f{i}" for i in range(4)])
    df["qid"] = rng.integers(low=0, high=4, size=(2048, ), dtype=np.int64)
    s = da.cumsum(df.groupby("qid").qid.count().to_dask_array(lengths=True)).compute()
    print(s)

if __name__ == "__main__":
    with LocalCUDACluster() as cluster:
        with Client(cluster) as client:
            with dask.config.set(
                {"array.backend": "cupy", "dataframe.backend": "cudf"}
            ):
                main(client)

Version:

pentschev commented 3 days ago

I'm no expert when it comes to Dask dataframes, but after some investigation I found that the problem with the code above is that to_dask_array does not automatically convert meta to match CuPy. Adding meta=cp.array(()) resolves the problem. Here's the complete code with the modification that fixes the issue:

import dask
from dask import array as da
from dask import dataframe as dd
from dask_cuda import LocalCUDACluster
from distributed import Client, LocalCluster
import numpy as np
import cupy as cp

def main(client: Client) -> None:
    rng = da.random.default_rng(1994)
    X = rng.random(size=(2048, 4))

    df = dd.from_dask_array(X, columns=[f"f{i}" for i in range(4)])
    df["qid"] = rng.integers(low=0, high=4, size=(2048, ), dtype=np.int64)
    s = da.cumsum(df.groupby("qid").qid.count().to_dask_array(lengths=True, meta=cp.array(()))).compute()
    print(s)

if __name__ == "__main__":
    with LocalCUDACluster() as cluster:
        with Client(cluster) as client:
            with dask.config.set(
                {"array.backend": "cupy", "dataframe.backend": "cudf"}
            ):
                main(client)

During the investigation I could not find many uses of to_dask_array: there are some in the Dask repository and one in a Distributed test, but none in the cuDF repository. This seems to suggest there are other, more appropriate ways of rewriting the code without to_dask_array; perhaps @rjzamora can give some ideas here.

In any case, specifying meta appropriately resolves the original issue, but perhaps not in the desired fashion.

trivialfis commented 3 days ago

Thank you for looking into this, @pentschev! I will leave this open in case this is considered a bug.

rjzamora commented 3 days ago

Thanks for raising this @trivialfis - There are some known rough edges in DataFrame <-> Array conversion. I believe this is indeed a bug caused by the ongoing removal of legacy Dask DataFrame (https://github.com/dask/dask-expr/pull/1168).

pentschev commented 3 days ago

> I believe this is indeed a bug caused by the ongoing removal of legacy Dask DataFrame (dask/dask-expr#1168).

FWIW, this is reproducible with DASK_DATAFRAME__QUERY_PLANNING=False too.

rjzamora commented 3 days ago

> FWIW, this is reproducible with DASK_DATAFRAME__QUERY_PLANNING=False too.

Okay, thanks - That's useful info. The fix will require an upstream change to to_dask_array or a custom method in Dask cuDF. So, either way we will need to target 25.02.

jakirkham commented 2 days ago

Would this just be the inverse of the logic in from_dask_array?

xref: https://github.com/dask/dask/pull/9579