rapidsai / dask-cuda

Utilities for Dask and CUDA interactions
https://docs.rapids.ai/api/dask-cuda/stable/
Apache License 2.0

[BUG] Failure when performing ORDER BY desc query with JIT_UNSPILL enabled #714

Closed: ChrisJar closed 3 years ago

ChrisJar commented 3 years ago

I get an unexpected error when running an ORDER BY ... DESC query through dask-sql on a dask-cuda cluster with JIT unspilling enabled.

For example:

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

cluster = LocalCUDACluster(n_workers=16, device_memory_limit="15GB", enable_tcp_over_ucx=True, enable_nvlink=True, rmm_pool_size="29GB", jit_unspill=True)
client = Client(cluster)

import cudf, dask_cudf
from dask_sql import Context

c = Context()

df = cudf.DataFrame({"id":[1,4,4,5,3], "val":[4,6,6,3,8]})
ddf = dask_cudf.from_cudf(df, npartitions=1)
c.create_table("df", ddf)

query = "SELECT * FROM df ORDER BY id desc"

c.sql(query).compute()

raises:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/tmp/ipykernel_75923/4184803532.py in <module>
     16 query = "SELECT * FROM df ORDER BY id desc"
     17 
---> 18 c.sql(query).compute()
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask_sql/context.py in sql(self, sql, return_futures, dataframes)
    420         rel, select_names, _ = self._get_ral(sql)
    421 
--> 422         dc = RelConverter.convert(rel, context=self)
    423 
    424         if dc is None:
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
     54             f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
     55         )
---> 56         df = plugin_instance.convert(rel, context=context)
     57         logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
     58         return df
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask_sql/physical/rel/logical/project.py in convert(self, rel, context)
     25     ) -> DataContainer:
     26         # Get the input of the previous step
---> 27         (dc,) = self.assert_inputs(rel, 1, context)
     28 
     29         df = dc.df
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask_sql/physical/rel/base.py in assert_inputs(rel, n, context)
     79         from dask_sql.physical.rel.convert import RelConverter
     80 
---> 81         return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
     82 
     83     @staticmethod
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask_sql/physical/rel/base.py in <listcomp>(.0)
     79         from dask_sql.physical.rel.convert import RelConverter
     80 
---> 81         return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
     82 
     83     @staticmethod
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
     54             f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
     55         )
---> 56         df = plugin_instance.convert(rel, context=context)
     57         logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
     58         return df
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask_sql/physical/rel/logical/sort.py in convert(self, rel, context)
     42 
     43             df = df.persist()
---> 44             df = apply_sort(df, sort_columns, sort_ascending, sort_null_first)
     45 
     46         offset = rel.offset
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask_sql/physical/utils/sort.py in apply_sort(df, sort_columns, sort_ascending, sort_null_first)
     21     # As sorting is rather expensive, we bether persist here
     22     df = df.persist()
---> 23     df = _sort_first_column(
     24         df, first_sort_column, first_sort_ascending, first_null_first
     25     )
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask_sql/physical/utils/sort.py in _sort_first_column(df, first_sort_column, first_sort_ascending, first_null_first)
     84     else:
     85         df_is_na = None
---> 86         df_not_is_na = df.set_index(first_sort_column, drop=False).reset_index(
     87             drop=True
     88         )
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask_cudf/core.py in set_index(***failed resolving arguments***)
    219             return df2
    220 
--> 221         return super().set_index(
    222             other,
    223             sorted=pre_sorted,
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask/dataframe/core.py in set_index(***failed resolving arguments***)
   4235             from .shuffle import set_index
   4236 
-> 4237             return set_index(
   4238                 self,
   4239                 other,
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask/dataframe/shuffle.py in set_index(df, index, npartitions, shuffle, compute, drop, upsample, divisions, partition_size, **kwargs)
    160 
    161     if divisions is None:
--> 162         divisions, mins, maxes = _calculate_divisions(
    163             df, index2, repartition, npartitions, upsample, partition_size
    164         )
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask/dataframe/shuffle.py in _calculate_divisions(df, partition_col, repartition, npartitions, upsample, partition_size)
     33     mins = partition_col.map_partitions(M.min)
     34     maxes = partition_col.map_partitions(M.max)
---> 35     divisions, sizes, mins, maxes = base.compute(divisions, sizes, mins, maxes)
     36     divisions = methods.tolist(divisions)
     37     if type(sizes) is not list:
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    566         postcomputes.append(x.__dask_postcompute__())
    567 
--> 568     results = schedule(dsk, keys, **kwargs)
    569     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    570 
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2669                     should_rejoin = False
   2670             try:
-> 2671                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2672             finally:
   2673                 for f in futures.values():
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1946             else:
   1947                 local_worker = None
-> 1948             return self.sync(
   1949                 self._gather,
   1950                 futures,
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    843             return future
    844         else:
--> 845             return sync(
    846                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    847             )
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    323     if error[0]:
    324         typ, exc, tb = error[0]
--> 325         raise exc.with_traceback(tb)
    326     else:
    327         return result[0]
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/distributed/utils.py in f()
    306             if callback_timeout is not None:
    307                 future = asyncio.wait_for(future, callback_timeout)
--> 308             result[0] = yield future
    309         except Exception:
    310             error[0] = sys.exc_info()
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1811                             exc = CancelledError(key)
   1812                         else:
-> 1813                             raise exception.with_traceback(traceback)
   1814                         raise exc
   1815                     if errors == "skip":
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask/dataframe/partitionquantiles.py in percentiles_summary()
    419     ):
    420         interpolation = "nearest"
--> 421     vals, n = _percentile(data, qs, interpolation=interpolation)
    422     if (
    423         is_cupy_type(data)
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask/utils.py in __call__()
    572         Call the corresponding method based on type of argument.
    573         """
--> 574         meth = self.dispatch(type(arg))
    575         return meth(arg, *args, **kwargs)
    576 
~/anaconda3/envs/cudf-gpu-bdb/lib/python3.8/site-packages/dask/utils.py in dispatch()
    566                 lk[cls] = lk[cls2]
    567                 return lk[cls2]
--> 568         raise TypeError("No dispatch for {0}".format(cls))
    569 
    570     def __call__(self, arg, *args, **kwargs):
TypeError: No dispatch for <class 'dask_cuda.proxify_device_objects._register_cudf.<locals>.FrameProxyObject'>

Environment: dask 2021.8.1, dask-sql 0.3.10, cudf 21.10, dask-cudf 21.10, dask-cuda 21.10

quasiben commented 3 years ago

Thanks @ChrisJar, I can also reproduce. Here is perhaps a simpler reproducer:

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

cluster = LocalCUDACluster(n_workers=1, jit_unspill=True)
client = Client(cluster)

import cudf, dask_cudf
from dask_sql import Context

c = Context()

df = cudf.DataFrame({"id":[1,4,4,5,3], "val":[4,6,6,3,8]})
ddf = dask_cudf.from_cudf(df, npartitions=1)
c.create_table("df", ddf)

query = "SELECT * FROM df ORDER BY id desc"

c.sql(query).compute()

Seeing _percentile come up in the traceback, I'm wondering if we need to add additional dispatches in Dask. @galipremsagar, do you have thoughts here?
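
To make the "additional dispatches" idea concrete: the failing call goes through a dask.utils.Dispatch that has no handler registered for the proxy class. A rough sketch of registering one is below (it assumes percentile_lookup in dask.array.dispatch is the dispatch behind the failing _percentile call and that dask_cuda.proxy_object.unproxy is an acceptable way to unwrap; the actual fix may look different):

from dask.array.dispatch import percentile_lookup
from dask_cuda.proxy_object import ProxyObject, unproxy

@percentile_lookup.register(ProxyObject)
def _percentile_proxy(a, q, interpolation="linear"):
    # Unwrap the proxy and re-dispatch, so the handler already registered
    # for cudf objects does the actual percentile computation.
    return percentile_lookup(unproxy(a), q, interpolation=interpolation)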

galipremsagar commented 3 years ago

@galipremsagar, do you have thoughts here?

Looking into it

galipremsagar commented 3 years ago

#716 contains the fix for this issue. But what I'd like to know from the dask-cuda experts is the expected result type of c.sql(query).compute(). The reason I'm asking is that a FrameProxyObject, a proxy object that acts as a pass-through to Frame-like objects, will be returned:

>>> from dask_cuda import LocalCUDACluster
>>> from dask.distributed import Client

>>> cluster = LocalCUDACluster(n_workers=1, jit_unspill=True)
>>> client = Client(cluster)

>>> import cudf, dask_cudf
>>> from dask_sql import Context

>>> c = Context()

>>> df = cudf.DataFrame({"id":[1,4,4,5,3], "val":[4,6,6,3,8]})
>>> ddf = dask_cudf.from_cudf(df, npartitions=1)
>>> c.create_table("df", ddf)

>>> query = "SELECT * FROM df ORDER BY id desc"

>>> c.sql(query).compute()

>>> x = c.sql(query).compute()
>>> x.to_pandas()
   id  val
4   5    3
3   4    6
2   4    6
1   3    8
0   1    4
>>> type(x)
<class 'dask_cuda.proxify_device_objects.FrameProxyObject'>

madsbk commented 3 years ago

#716 contains the fix for this issue. But what I'd like to know from the dask-cuda experts is the expected result type of c.sql(query).compute(). The reason I'm asking is that a FrameProxyObject, a proxy object that acts as a pass-through to Frame-like objects, will be returned:

Yes, that is as expected. The proxy objects leak into user space unless DASK_JIT_UNSPILL_COMPATIBILITY_MODE=True is set.
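
For anyone hitting this, a minimal sketch of enabling compatibility mode (the variable name comes from the comment above; setting it in the environment before the cluster is created is an assumption about how the config gets picked up):

import os

# Set the flag before the cluster/workers start so Dask's config sees it.
os.environ["DASK_JIT_UNSPILL_COMPATIBILITY_MODE"] = "True"

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

cluster = LocalCUDACluster(n_workers=1, jit_unspill=True)
client = Client(cluster)

# Results gathered on the client should now come back as regular cudf
# objects rather than FrameProxyObject instances.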