dask-contrib / dask-sql

Distributed SQL Engine in Python using Dask
https://dask-sql.readthedocs.io/
MIT License

[BUG] Joins on GPU tables with multiple partitions fail #610

Open · ChrisJar opened this issue 2 years ago

ChrisJar commented 2 years ago

What happened: Joining tables backed by dask_cudf dataframes with multiple partitions throws AttributeError: 'Int64Index' object has no attribute '_get_attributes_dict'.

Minimal Complete Verifiable Example:

import numpy as np
import cudf
import dask_cudf
from dask_sql import Context

df_a = cudf.DataFrame({'a':[1, 2, 3, 4, 5], 'b':[0]*5})
df_b = cudf.DataFrame({'a':[5, 4, 3, 2, 1], 'c':[1]*5})

ddf_a = dask_cudf.from_cudf(df_a, npartitions=2)
ddf_b = dask_cudf.from_cudf(df_b, npartitions=2)

c = Context()

c.create_table("dfa", ddf_a)
c.create_table("dfb", ddf_b)

print(c.sql("SELECT * FROM dfa INNER JOIN dfb ON dfa.a=dfb.a").compute())

throws

Traceback

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Input In [2], in <cell line: 17>()
     14 c.create_table("dfa", ddf_a)
     15 c.create_table("dfb", ddf_b)
---> 17 print(c.sql("SELECT * FROM dfa INNER JOIN dfb ON dfa.a=dfb.a").compute())

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/base.py:314, in DaskMethodsMixin.compute(self, **kwargs)
    290 def compute(self, **kwargs):
    291     """Compute this dask collection
    292 
    293     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    312     dask.base.compute
    313     """
--> 314     (result,) = compute(self, traverse=False, **kwargs)
    315     return result

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/base.py:602, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    599     keys.append(x.__dask_keys__())
    600     postcomputes.append(x.__dask_postcompute__())
--> 602 results = schedule(dsk, keys, **kwargs)
    603 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/local.py:557, in get_sync(dsk, keys, **kwargs)
    552 """A naive synchronous version of get_async
    553 
    554 Can be useful for debugging.
    555 """
    556 kwargs.pop("num_workers", None)  # if num_workers present, remove it
--> 557 return get_async(
    558     synchronous_executor.submit,
    559     synchronous_executor._max_workers,
    560     dsk,
    561     keys,
    562     **kwargs,
    563 )

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/local.py:500, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    498 while state["waiting"] or state["ready"] or state["running"]:
    499     fire_tasks(chunksize)
--> 500     for key, res_info, failed in queue_get(queue).result():
    501         if failed:
    502             exc, tb = loads(res_info)

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/concurrent/futures/_base.py:437, in Future.result(self, timeout)
    435     raise CancelledError()
    436 elif self._state == FINISHED:
--> 437     return self.__get_result()
    439 self._condition.wait(timeout)
    441 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/concurrent/futures/_base.py:389, in Future.__get_result(self)
    387 if self._exception:
    388     try:
--> 389         raise self._exception
    390     finally:
    391         # Break a reference cycle with the exception in self._exception
    392         self = None

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/local.py:542, in SynchronousExecutor.submit(self, fn, *args, **kwargs)
    540 fut = Future()
    541 try:
--> 542     fut.set_result(fn(*args, **kwargs))
    543 except BaseException as e:
    544     fut.set_exception(e)

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/local.py:238, in batch_execute_tasks(it)
    234 def batch_execute_tasks(it):
    235     """
    236     Batch computing of multiple tasks with `execute_task`
    237     """
--> 238     return [execute_task(*a) for a in it]

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/local.py:238, in <listcomp>(.0)
    234 def batch_execute_tasks(it):
    235     """
    236     Batch computing of multiple tasks with `execute_task`
    237     """
--> 238     return [execute_task(*a) for a in it]

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/local.py:229, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    227     failed = False
    228 except BaseException as e:
--> 229     result = pack_exception(e, dumps)
    230     failed = True
    231 return key, result, failed

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/local.py:224, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    222 try:
    223     task, data = loads(task_info)
--> 224     result = _execute_task(task, data)
    225     id = get_id()
    226     result = dumps((result, id))

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/dask/dataframe/shuffle.py:968, in shuffle_group_3(df, col, npartitions, p)
    966 g = df.groupby(col)
    967 d = {i: g.get_group(i) for i in g.groups}
--> 968 p.append(d, fsync=True)

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/partd/encode.py:23, in Encode.append(self, data, **kwargs)
     22 def append(self, data, **kwargs):
---> 23     data = valmap(self.encode, data)
     24     data = valmap(frame, data)
     25     self.partd.append(data, **kwargs)

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/toolz/dicttoolz.py:83, in valmap(func, d, factory)
     72 """ Apply function to values of dictionary
     73 
     74 >>> bills = {"Alice": [20, 15, 30], "Bob": [10, 35]}
   (...)
     80     itemmap
     81 """
     82 rv = factory()
---> 83 rv.update(zip(d.keys(), map(func, d.values())))
     84 return rv

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/partd/pandas.py:181, in serialize(df)
    176 """ Serialize and compress a Pandas DataFrame
    177 
    178 Uses Pandas blocks, snappy, and blosc to deconstruct an array into bytes
    179 """
    180 col_header, col_bytes = index_to_header_bytes(df.columns)
--> 181 ind_header, ind_bytes = index_to_header_bytes(df.index)
    182 headers = [col_header, ind_header]
    183 bytes = [col_bytes, ind_bytes]

File ~/miniconda3/envs/6-6-gpu-bdb/lib/python3.8/site-packages/partd/pandas.py:113, in index_to_header_bytes(ind)
    110     cat = None
    111     values = ind.values
--> 113 header = (type(ind), ind._get_attributes_dict(), values.dtype, cat)
    114 bytes = pnp.compress(pnp.serialize(values), values.dtype)
    115 return header, bytes

AttributeError: 'Int64Index' object has no attribute '_get_attributes_dict'

However, when the same query is performed with CPU dataframes:

import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask_sql import Context

df_a = pd.DataFrame({'a':[1, 2, 3, 4, 5], 'b':[0]*5})
df_b = pd.DataFrame({'a':[5, 4, 3, 2, 1], 'c':[1]*5})

ddf_a = dd.from_pandas(df_a, npartitions=2)
ddf_b = dd.from_pandas(df_b, npartitions=2)

c = Context()

c.create_table("dfa", ddf_a)
c.create_table("dfb", ddf_b)

print(c.sql("SELECT * FROM dfa INNER JOIN dfb ON dfa.a=dfb.a").compute())

Or GPU dataframes with a single partition:

import numpy as np
import cudf
import dask_cudf
from dask_sql import Context

df_a = cudf.DataFrame({'a':[1, 2, 3, 4, 5], 'b':[0]*5})
df_b = cudf.DataFrame({'a':[5, 4, 3, 2, 1], 'c':[1]*5})

ddf_a = dask_cudf.from_cudf(df_a, npartitions=1)
ddf_b = dask_cudf.from_cudf(df_b, npartitions=1)

c = Context()

c.create_table("dfa", ddf_a)
c.create_table("dfb", ddf_b)

print(c.sql("SELECT * FROM dfa INNER JOIN dfb ON dfa.a=dfb.a").compute())

the result is:

   a  b  a0  c
0  1  0   1  1
0  4  0   4  1
1  5  0   5  1
2  2  0   2  1
3  3  0   3  1
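
Since the single-partition run succeeds, one possible stopgap (a hedged sketch, assuming the failure is confined to the multi-partition shuffle path) is to keep each table in a single partition before registering it, at the cost of parallelism:

import cudf
import dask_cudf
from dask_sql import Context

# Workaround sketch: collapse each table to one partition so the join avoids
# the disk-based shuffle that fails above. This trades away parallelism.
df_a = cudf.DataFrame({'a': [1, 2, 3, 4, 5], 'b': [0]*5})
df_b = cudf.DataFrame({'a': [5, 4, 3, 2, 1], 'c': [1]*5})

ddf_a = dask_cudf.from_cudf(df_a, npartitions=2).repartition(npartitions=1)
ddf_b = dask_cudf.from_cudf(df_b, npartitions=2).repartition(npartitions=1)

c = Context()
c.create_table("dfa", ddf_a)
c.create_table("dfb", ddf_b)
print(c.sql("SELECT * FROM dfa INNER JOIN dfb ON dfa.a=dfb.a").compute())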

Environment:

beckernick commented 2 years ago

This comes from partd. Solving this would also help enable groupby.apply with cudf backed dask dataframes: https://github.com/rapidsai/cudf/issues/5755#issuecomment-976823896
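
To isolate the partd side, here is a minimal sketch that calls the helper named at the bottom of the traceback directly, outside of dask-sql (assumptions: index_to_header_bytes is importable from partd.pandas as the trace shows, and the cudf index built here is a reasonable stand-in for the index of a shuffled partition):

import cudf
from partd.pandas import index_to_header_bytes  # helper shown in the traceback above

# Feed an index straight to partd's pandas serializer. Depending on the
# installed partd/pandas/cudf versions, this raises the same AttributeError
# as the failing join.
idx = cudf.Index([1, 2, 3, 4, 5])
try:
    header, raw = index_to_header_bytes(idx)
    print("serialized header:", header)
except AttributeError as e:
    print("reproduced:", e)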

randerzander commented 1 year ago

> This comes from partd. Solving this would also help enable groupby.apply with cudf backed dask dataframes: rapidsai/cudf#5755 (comment)

Partd no longer depends on the deprecated pandas index classes, but the reproducer still throws the same exception.

beckernick commented 1 year ago

Nice! Are you saying the traceback you're now seeing still references the removed internal pandas functionality? Do we possibly need to update a dependency pinning somewhere?

randerzander commented 1 year ago

Oops, I missed a subtle difference in the trace:

File /opt/conda/envs/rapids/lib/python3.9/site-packages/partd/pandas.py:111, in index_to_header_bytes(ind)
    108     cat = None
    109     values = ind.values
--> 111 header = (type(ind), {k: getattr(ind, k, None) for k in ind._attributes}, values.dtype, cat)
    112 bytes = pnp.compress(pnp.serialize(values), values.dtype)
    113 return header, bytes

AttributeError: 'Int64Index' object has no attribute '_attributes'

It looks like _attributes is also gone from Pandas indexes?
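
For context, a hedged sketch of the kind of version-tolerant lookup that would avoid reaching for pandas-private hooks like _get_attributes_dict / _attributes (illustrative only, not partd's actual code or a proposed patch; it assumes name is the only attribute a plain index needs to round-trip):

def index_attributes(ind):
    # Prefer the private hook when present, otherwise fall back to the one
    # attribute a plain index carries, instead of failing hard when pandas
    # internals move between releases.
    attrs = getattr(ind, "_attributes", ["name"])
    return {k: getattr(ind, k, None) for k in attrs}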

> Do we possibly need to update a pinning somewhere

I don't think so; the latest version on PyPI is 1.3.0, which is what I have in my env.
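
One quick way to double-check which partd actually resolves in that env (a generic snippet, nothing dask-sql specific):

import importlib.metadata

import partd

# Report the installed distribution version and the module that actually
# imports at runtime, to rule out a stale copy shadowing the PyPI release.
print(importlib.metadata.version("partd"))
print(partd.__file__)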