rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0

[BUG] Group by aggregate fails in distributed settings #3798

Closed VibhuJawa closed 4 years ago

VibhuJawa commented 4 years ago

Group by aggregate fails in distributed settings

The code below fails when launched with LocalCUDACluster but works without it.

from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import dask_cudf
import cudf

cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES="0,1")
client = Client(cluster)

df = cudf.DataFrame({'a': [1, 2, 3, 4], 'b': [5, 1, 2, 5]})
dask_df = dask_cudf.from_cudf(df, npartitions=2)
dask_df_g = dask_df.groupby(['a']).agg({'b': ['count', 'mean']}).reset_index().compute()

Stack Trace:

/raid/vjawa/conda/conda_installation/envs/cudf_12_15_jan/lib/python3.7/site-packages/cudf/core/dataframe.py:380: UserWarning: Columns may not be added to a DataFrame using a new attribute name. A new attribute will be created: 'multi_cols'
  UserWarning,
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-2-e4a2b9c9f7d0> in <module>
     12 df = cudf.DataFrame({'a':[1,2,3,4],'b':[5,1,2,5]})
     13 dask_df = dask_cudf.from_cudf(df,npartitions=2)
---> 14 dask_df_g = dask_df.groupby(['a']).agg({'b': ['count', 'mean']}).reset_index().compute()

/raid/vjawa/conda/conda_installation/envs/cudf_12_15_jan/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    163         dask.base.compute
    164         """
--> 165         (result,) = compute(self, traverse=False, **kwargs)
    166         return result
    167 

/raid/vjawa/conda/conda_installation/envs/cudf_12_15_jan/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    434     keys = [x.__dask_keys__() for x in collections]
    435     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 436     results = schedule(dsk, keys, **kwargs)
    437     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    438 

/raid/vjawa/conda/conda_installation/envs/cudf_12_15_jan/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2574                     should_rejoin = False
   2575             try:
-> 2576                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2577             finally:
   2578                 for f in futures.values():

/raid/vjawa/conda/conda_installation/envs/cudf_12_15_jan/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1872                 direct=direct,
   1873                 local_worker=local_worker,
-> 1874                 asynchronous=asynchronous,
   1875             )
   1876 

/raid/vjawa/conda/conda_installation/envs/cudf_12_15_jan/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    767         else:
    768             return sync(
--> 769                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    770             )
    771 

/raid/vjawa/conda/conda_installation/envs/cudf_12_15_jan/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    333     if error[0]:
    334         typ, exc, tb = error[0]
--> 335         raise exc.with_traceback(tb)
    336     else:
    337         return result[0]

/raid/vjawa/conda/conda_installation/envs/cudf_12_15_jan/lib/python3.7/site-packages/distributed/utils.py in f()
    317             if callback_timeout is not None:
    318                 future = gen.with_timeout(timedelta(seconds=callback_timeout), future)
--> 319             result[0] = yield future
    320         except Exception as exc:
    321             error[0] = sys.exc_info()

/raid/vjawa/conda/conda_installation/envs/cudf_12_15_jan/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

/raid/vjawa/conda/conda_installation/envs/cudf_12_15_jan/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1728                             exc = CancelledError(key)
   1729                         else:
-> 1730                             raise exception.with_traceback(traceback)
   1731                         raise exc
   1732                     if errors == "skip":

/raid/vjawa/conda/conda_installation/envs/cudf_12_15_jan/lib/python3.7/site-packages/dask/optimization.py in __call__()
    980         if not len(args) == len(self.inkeys):
    981             raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 982         return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
    983 
    984     def __reduce__(self):

/raid/vjawa/conda/conda_installation/envs/cudf_12_15_jan/lib/python3.7/site-packages/dask/core.py in get()
    147     for key in toposort(dsk):
    148         task = dsk[key]
--> 149         result = _execute_task(task, cache)
    150         cache[key] = result
    151     result = _execute_task(out, cache)

/raid/vjawa/conda/conda_installation/envs/cudf_12_15_jan/lib/python3.7/site-packages/dask/core.py in _execute_task()
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg

/raid/vjawa/conda/conda_installation/envs/cudf_12_15_jan/lib/python3.7/site-packages/dask/utils.py in apply()
     27 def apply(func, args, kwargs=None):
     28     if kwargs:
---> 29         return func(*args, **kwargs)
     30     else:
     31         return func(*args)

/raid/vjawa/conda/conda_installation/envs/cudf_12_15_jan/lib/python3.7/site-packages/dask/dataframe/core.py in apply_and_enforce()
   4979             return meta
   4980         if is_dataframe_like(df):
-> 4981             check_matching_columns(meta, df)
   4982             c = meta.columns
   4983         else:

/raid/vjawa/conda/conda_installation/envs/cudf_12_15_jan/lib/python3.7/site-packages/dask/dataframe/utils.py in check_matching_columns()
    676             " the columns in the provided metadata\n"
    677             "  Extra:   %s\n"
--> 678             "  Missing: %s" % (extra, missing)
    679         )
    680 

ValueError: The columns in the computed data do not match the columns in the provided metadata
  Extra:   ['index']
  Missing: ['a']

Additional context:

The following fails:

dask_df_g = dask_df.groupby(['a']).agg({'b': ['count', 'mean']}).reset_index().compute()

The following works:

dask_df_g = dask_df.groupby(['a']).agg({'b': ['count', 'mean']}).compute().reset_index() 
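
For context, the ValueError comes from dask comparing the columns of the computed data against the columns promised by the collection's metadata (`meta`). A simplified, stdlib-only sketch of that comparison (illustrative only, not dask's actual implementation; the column names are made up):

```python
def check_matching_columns(meta_columns, computed_columns):
    # Compare the columns dask expected (from `meta`) against the columns
    # that actually came back from the workers, as in dask's error message.
    extra = [c for c in computed_columns if c not in meta_columns]
    missing = [c for c in meta_columns if c not in computed_columns]
    if extra or missing:
        raise ValueError(
            "The columns in the computed data do not match the columns "
            "in the provided metadata\n"
            "  Extra:   %s\n"
            "  Missing: %s" % (extra, missing)
        )

# In the failing case, `meta` promises a column 'a' (from reset_index on an
# index named 'a'), but the computed data comes back with a generic 'index'
# column because the index name was lost along the way.
try:
    check_matching_columns(["a", "count", "mean"], ["index", "count", "mean"])
except ValueError as e:
    print(e)  # Extra: ['index']  Missing: ['a']
```

This matches the Extra: ['index'] / Missing: ['a'] pair in the traceback: the grouped index lost its name somewhere between the workers and the client, so reset_index produced 'index' instead of 'a'.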

Environment Info


# packages in environment at /raid/vjawa/conda/conda_installation/envs/cudf_12_15_jan:

cudf                      0.12.0b200116         py37_1385    rapidsai-nightly
dask-cudf                 0.12.0b200116         py37_1385    rapidsai-nightly
libcudf                   0.12.0b200116     cuda10.0_1385    rapidsai-nightly

dask                      2.9.1                      py_0    conda-forge
dask-core                 2.9.1                      py_0    conda-forge
dask-cuda                 0.12.0a200115           py37_47    rapidsai-nightly
dask-cudf                 0.12.0b200116         py37_1385    rapidsai-nightly

This environment includes https://github.com/rapidsai/cudf/pull/3741, and I can confirm that the issues in https://github.com/rapidsai/cudf/issues/3719 are resolved in my environment.

CC: @rjzamora @beckernick @shwina

kkraus14 commented 4 years ago

Confirmed, I can reproduce. It seems we're losing index names in general when computing. A more general reproducer:

from dask_cuda import LocalCUDACluster
from dask.distributed import Client
import dask_cudf
import cudf

cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES="0,1")
client = Client(cluster)

df = cudf.DataFrame({'a': [1, 2, 3, 4], 'b': [5, 1, 2, 5]})
df = df.set_index('b')
dask_df = dask_cudf.from_cudf(df, npartitions=2)
computed_index = dask_df.index.compute()
assert computed_index.name == "b"  # fails: the index name is lost

rjzamora commented 4 years ago

Good reproducer @kkraus14. It seems the problem shows up even without using dask_cudf (i.e., using LocalCluster).

rjzamora commented 4 years ago

This may have the same cause as #3420, but cudf does seem to preserve the index name in a serialization round trip. After the repro above:

(h, f) = df[:0].serialize()  # cuDF's custom serialization path
cudf.DataFrame.deserialize(h, f).index.name

Output:

'b'
shwina commented 4 years ago

Looks like the issue is that we're not hitting the DataFrame serialization/deserialization path, but rather plain pickling:

In [10]: a                                                                                                                                                                                    
Out[10]: 
   a
b   
3  1
4  2
5  3

In [11]: pickle.loads(pickle.dumps(a))                                                                                                                                                        
Out[11]: 
   a
3  1
4  2
5  3

I can submit a fix for the pickling, but it looks like there's an orthogonal problem here.
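
The failure mode described here, an attribute silently dropped during pickling, can be sketched with a toy class whose pickle hooks forget to carry the name field (a made-up stand-in for illustration, not cuDF's actual code):

```python
import pickle

class NamedIndex:
    """Toy stand-in for an index object that carries values plus a name."""
    def __init__(self, values, name=None):
        self.values = values
        self.name = name

    def __getstate__(self):
        # Bug: only the values survive pickling; the name is dropped,
        # analogous to the index name disappearing in the repro above.
        return {"values": self.values}

    def __setstate__(self, state):
        self.values = state["values"]
        self.name = state.get("name")  # never present, comes back as None

idx = NamedIndex([3, 4, 5], name="b")
roundtripped = pickle.loads(pickle.dumps(idx))
print(roundtripped.name)  # None, even though idx.name == "b"
```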

rjzamora commented 4 years ago

...it looks like there's an orthogonal problem here.

Indeed. I'm trying to figure out why we are pickling everything.
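
If the diagnosis above holds, one common fix pattern is to make pickle delegate to the object's existing serialize/deserialize path via `__reduce__`, so both transports preserve the same metadata. A toy sketch of the pattern (not cuDF's actual code; `NamedIndex` and `_rebuild_named_index` are made-up names):

```python
import pickle

class NamedIndex:
    def __init__(self, values, name=None):
        self.values = values
        self.name = name

    # Stand-in for a custom (de)serialization path that already keeps
    # all metadata, like cudf.DataFrame.serialize()/deserialize().
    def serialize(self):
        return {"name": self.name}, [self.values]

    @classmethod
    def deserialize(cls, header, frames):
        return cls(frames[0], name=header["name"])

    def __reduce__(self):
        # Route pickling through the custom path so nothing is dropped.
        return (_rebuild_named_index, self.serialize())

def _rebuild_named_index(header, frames):
    # Module-level hook so pickle can locate it by name at load time.
    return NamedIndex.deserialize(header, frames)

idx = NamedIndex([3, 4, 5], name="b")
assert pickle.loads(pickle.dumps(idx)).name == "b"
```

With this in place, pickle and the custom serializer can't drift apart, since there is only one code path that decides what state travels.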