rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0

Use Dask-cuDF for E2E notebook #4758

Closed mrocklin closed 4 years ago

mrocklin commented 5 years ago

Eventually it would be nice to use dask-cudf in the E2E notebook. This would be useful both to make the notebook more readable and to drive development in cudf/dask-cudf.

mrocklin commented 5 years ago

I took a look at this today and have some feedback/questions. I'm not sure who can best help answer these, but I suspect that either @randerzander or @mt-jones can help direct me to the right people.

Reading CSV files

Currently we specify the names and dtypes explicitly and use cudf.read_csv. I think that today we might be able to use dask-cudf a bit more naively.

This seems to work

cols = [
        "loan_id", "monthly_reporting_period", "servicer", "interest_rate", "current_actual_upb",
        "loan_age", "remaining_months_to_legal_maturity", "adj_remaining_months_to_maturity",
        "maturity_date", "msa", "current_loan_delinquency_status", "mod_flag", "zero_balance_code",
        "zero_balance_effective_date", "last_paid_installment_date", "foreclosed_after",
        "disposition_date", "foreclosure_costs", "prop_preservation_and_repair_costs",
        "asset_recovery_costs", "misc_holding_expenses", "holding_taxes", "net_sale_proceeds",
        "credit_enhancement_proceeds", "repurchase_make_whole_proceeds", "other_foreclosure_proceeds",
        "non_interest_bearing_upb", "principal_forgiveness_upb", "repurchase_make_whole_proceeds_flag",
        "foreclosure_principal_write_off_amount", "servicing_activity_indicator"
]
performance = dask_cudf.read_csv('/datasets/mortgage/performance/Performance_2000*.txt', delimiter='|', names=cols)

cols = [
    'loan_id', 'orig_channel', 'seller_name', 'orig_interest_rate', 'orig_upb', 'orig_loan_term', 
    'orig_date', 'first_pay_date', 'orig_ltv', 'orig_cltv', 'num_borrowers', 'dti', 'borrower_credit_score', 
    'first_home_buyer', 'loan_purpose', 'property_type', 'num_units', 'occupancy_status', 'property_state',
    'zip', 'mortgage_insurance_percent', 'product_type', 'coborrow_credit_score', 'mortgage_insurance_type', 
    'relocation_mortgage_indicator'
]

acquisition = dask_cudf.read_csv('/datasets/mortgage/acquisition/Acquisition_*.txt', delimiter='|', names=cols)

Though it may be that there were particular dtypes that we were specifically trying to target. My hope is that the dtype information is no longer necessary (it's sometimes hard to ask users to produce this information in practice)
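To illustrate what dropping the explicit dtypes relies on, here is a minimal, library-free sketch of inferring column types from a small sample of a pipe-delimited, header-less file. This is not how cudf or dask-cudf actually infer dtypes; the sample rows, the four-column subset, and the `infer_type` helper are all hypothetical.

```python
import csv
import io

# Hypothetical two-row sample in the same pipe-delimited, header-less layout.
SAMPLE = "100001|01/01/2000|6.5|\n100002|02/01/2000|7.0|R\n"
NAMES = ["loan_id", "monthly_reporting_period", "interest_rate", "mod_flag"]


def infer_type(values):
    """Guess a column type from string samples: int64, then float64, else str."""
    non_empty = [v for v in values if v != ""]
    if not non_empty:
        return "str"
    for caster, name in ((int, "int64"), (float, "float64")):
        try:
            for value in non_empty:
                caster(value)
        except ValueError:
            continue  # this type does not fit; try the next candidate
        return name
    return "str"


rows = list(csv.reader(io.StringIO(SAMPLE), delimiter="|"))
columns = zip(*rows)  # transpose rows into columns
inferred = {name: infer_type(col) for name, col in zip(NAMES, columns)}
print(inferred)
```

Inference from a sample can guess wrong when later rows look different, which is one reason a notebook might have pinned dtypes explicitly.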

create_ever_features

I actually don't know the context of this computation, but there is something that looks like the following:

def create_ever_features(gdf, **kwargs):
    everdf = gdf[['loan_id', 'current_loan_delinquency_status']]
    everdf = everdf.groupby('loan_id', method='hash').max()
    del(gdf)
    everdf['ever_30'] = (everdf['max_current_loan_delinquency_status'] >= 1).astype('int8')
    everdf['ever_90'] = (everdf['max_current_loan_delinquency_status'] >= 3).astype('int8')
    everdf['ever_180'] = (everdf['max_current_loan_delinquency_status'] >= 6).astype('int8')
    everdf.drop_column('max_current_loan_delinquency_status')
    return everdf

I think we can probably get by with something like the following on dask-cudf:

everdf = performance[['loan_id', 'current_loan_delinquency_status']]
everdf = everdf.groupby('loan_id').max()
everdf['ever_30'] = (everdf['current_loan_delinquency_status'] >= 1).astype('int8')
everdf['ever_90'] = (everdf['current_loan_delinquency_status'] >= 3).astype('int8')
everdf['ever_180'] = (everdf['current_loan_delinquency_status'] >= 6).astype('int8')

This raises the following traceback when you call everdf.compute():

```python-traceback
---------------------------------------------------------------------------
GDFError                                  Traceback (most recent call last)
in
      4 everdf['ever_90'] = (everdf['current_loan_delinquency_status'] >= 3).astype('int8')
      5 everdf['ever_180'] = (everdf['current_loan_delinquency_status'] >= 6).astype('int8')
----> 6 everdf.compute()

~/dask/dask/base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158

~/dask/dask/base.py in compute(*args, **kwargs)
    396     keys = [x.__dask_keys__() for x in collections]
    397     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398     results = schedule(dsk, keys, **kwargs)
    399     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    400

~/dask/dask/local.py in get_sync(dsk, keys, **kwargs)
    499     """
    500     kwargs.pop('num_workers', None) # if num_workers present, remove it
--> 501     return get_async(apply_sync, 1, dsk, keys, **kwargs)
    502
    503

~/dask/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    445         # Seed initial tasks into the thread pool
    446         while state['ready'] and len(state['running']) < num_workers:
--> 447             fire_task()
    448
    449         # Main loop, wait on tasks to finish, insert new ones

~/dask/dask/local.py in fire_task()
    441                 args=(key, dumps((dsk[key], data)),
    442                       dumps, loads, get_id, pack_exception),
--> 443                 callback=queue.put)
    444
    445         # Seed initial tasks into the thread pool

~/dask/dask/local.py in apply_sync(func, args, kwds, callback)
    488 def apply_sync(func, args=(), kwds={}, callback=None):
    489     """ A naive synchronous version of apply_async """
--> 490     res = func(*args, **kwds)
    491     if callback is not None:
    492         callback(res)

~/dask/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    233         failed = False
    234     except BaseException as e:
--> 235         result = pack_exception(e, dumps)
    236         failed = True
    237     return key, result, failed

~/dask/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    228     try:
    229         task, data = loads(task_info)
--> 230         result = _execute_task(task, data)
    231         id = get_id()
    232         result = dumps((result, id))

~/dask/dask/core.py in _execute_task(arg, cache, dsk)
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg

~/dask/dask/compatibility.py in apply(func, args, kwargs)
     91 def apply(func, args, kwargs=None):
     92     if kwargs:
---> 93         return func(*args, **kwargs)
     94     else:
     95         return func(*args)

~/dask/dask/dataframe/groupby.py in _apply_chunk(df, *index, **kwargs)
    235     if isinstance(columns, (tuple, list, set, pd.Index)):
    236         columns = list(columns)
--> 237     return func(g[columns])
    238
    239

~/dask/dask/utils.py in __call__(self, obj, *args, **kwargs)
    714
    715     def __call__(self, obj, *args, **kwargs):
--> 716         return getattr(obj, self.method)(*args, **kwargs)
    717
    718     def __reduce__(self):

~/cudf/python/cudf/groupby/groupby.py in max(self, sort)
    309
    310     def max(self, sort=True):
--> 311         return self._apply_basic_agg("max", sort)
    312
    313     def count(self, sort=True):

~/cudf/python/cudf/groupby/groupby.py in _apply_basic_agg(self, agg_type, sort_results)
    250             result = self._apply_agg(
    251                 agg_type, result, add_col_values, ctx, val_columns,
--> 252                 val_columns_out, sort_result=sort_results)
    253
    254         # If a Groupby has one index column and one value column

~/cudf/python/cudf/groupby/groupby.py in _apply_agg(self, agg_type, result, add_col_values, ctx, val_columns, val_columns_out, sort_result)
    194                 out_col_values,
    195                 out_col_agg,
--> 196                 ctx)
    197
    198             if (err is not None):

~/miniconda/envs/cudf/lib/python3.7/site-packages/libgdf_cffi/wrapper.py in wrap(*args)
     25             if errcode != self._api.GDF_SUCCESS:
     26                 errname, msg = self._get_error_msg(errcode)
---> 27                 raise GDFError(errname, msg)
     28
     29         wrap.__name__ = fn.__name__

GDFError: GDF_VALIDITY_UNSUPPORTED
```

Unfortunately I'm not sure what GDFError: GDF_VALIDITY_UNSUPPORTED means. I'm hoping that @jrhemstad can help explain that error. Unfortunately I don't have a minimal reproducible example here.

Delinquency over time

Then there is delinquency over time. I'm a bit confused by the use of query here and the repetition. I wonder if there is a way to make this cell a bit smaller while achieving the same result (as well as working on dask-cudf dataframes)

delinq = performance[['loan_id', 'monthly_reporting_period', 'current_loan_delinquency_status']]
delinq_30 = delinq.query('current_loan_delinquency_status >= 1')[['loan_id', 'monthly_reporting_period']].groupby('loan_id').min()
delinq_30['delinquency_30'] = delinq_30['monthly_reporting_period']
# del delinq['monthly_reporting_period']

delinq_90 = delinq.query('current_loan_delinquency_status >= 3')[['loan_id', 'monthly_reporting_period']].groupby('loan_id').min()
delinq_90['delinquency_90'] = delinq_90['monthly_reporting_period']
# del delinq_90['monthly_reporting_period']

delinq_180 = delinq.query('current_loan_delinquency_status >= 6')[['loan_id', 'monthly_reporting_period']].groupby('loan_id').min()
delinq_180['delinquency_180'] = delinq_180['monthly_reporting_period']
# del edelinq_180['monthly_reporting_period']

delinq_merge = delinq_30.merge(delinq_90, how='left', on=['loan_id'])
delinq_merge['delinquency_90'] = delinq_merge['delinquency_90'].fillna(np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]'))
delinq_merge = delinq_merge.merge(delinq_180, how='left', on=['loan_id'])
delinq_merge['delinquency_180'] = delinq_merge['delinquency_180'].fillna(np.dtype('datetime64[ms]').type('1970-01-01').astype('datetime64[ms]'))

Here was my attempt

delinq = performance[['loan_id', 'monthly_reporting_period', 'current_loan_delinquency_status']]

d30 = delinq[delinq.current_loan_delinquency_status >= 1].groupby('loan_id').min()
d90 = delinq[delinq.current_loan_delinquency_status >= 3].groupby('loan_id').min()
d180 = delinq[delinq.current_loan_delinquency_status >= 6].groupby('loan_id').min()

I would then probably call dd.concat to merge these together. However, at this point we get an error. It looks like cudf boolean-mask selection doesn't yet handle columns that contain nulls (the traceback below ends in an assertion that column.null_count == 0). I boiled this down and went to raise an issue but found that it had already been raised at https://github.com/rapidsai/cudf/issues/991

```python-traceback
---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
in
----> 1 d30.compute()

~/dask/dask/base.py in compute(self, **kwargs)
    154         dask.base.compute
    155         """
--> 156         (result,) = compute(self, traverse=False, **kwargs)
    157         return result
    158

~/dask/dask/base.py in compute(*args, **kwargs)
    396     keys = [x.__dask_keys__() for x in collections]
    397     postcomputes = [x.__dask_postcompute__() for x in collections]
--> 398     results = schedule(dsk, keys, **kwargs)
    399     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    400

~/dask/dask/local.py in get_sync(dsk, keys, **kwargs)
    499     """
    500     kwargs.pop('num_workers', None) # if num_workers present, remove it
--> 501     return get_async(apply_sync, 1, dsk, keys, **kwargs)
    502
    503

~/dask/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    466
    467         while state['ready'] and len(state['running']) < num_workers:
--> 468             fire_task()
    469
    470         succeeded = True

~/dask/dask/local.py in fire_task()
    441                 args=(key, dumps((dsk[key], data)),
    442                       dumps, loads, get_id, pack_exception),
--> 443                 callback=queue.put)
    444
    445         # Seed initial tasks into the thread pool

~/dask/dask/local.py in apply_sync(func, args, kwds, callback)
    488 def apply_sync(func, args=(), kwds={}, callback=None):
    489     """ A naive synchronous version of apply_async """
--> 490     res = func(*args, **kwds)
    491     if callback is not None:
    492         callback(res)

~/dask/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    233         failed = False
    234     except BaseException as e:
--> 235         result = pack_exception(e, dumps)
    236         failed = True
    237     return key, result, failed

~/dask/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    228     try:
    229         task, data = loads(task_info)
--> 230         result = _execute_task(task, data)
    231         id = get_id()
    232         result = dumps((result, id))

~/dask/dask/core.py in _execute_task(arg, cache, dsk)
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]
--> 118         args2 = [_execute_task(a, cache) for a in args]
    119         return func(*args2)
    120     elif not ishashable(arg):

~/dask/dask/core.py in (.0)
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]
--> 118         args2 = [_execute_task(a, cache) for a in args]
    119         return func(*args2)
    120     elif not ishashable(arg):

~/dask/dask/core.py in _execute_task(arg, cache, dsk)
    113     """
    114     if isinstance(arg, list):
--> 115         return [_execute_task(a, cache) for a in arg]
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]

~/dask/dask/core.py in (.0)
    113     """
    114     if isinstance(arg, list):
--> 115         return [_execute_task(a, cache) for a in arg]
    116     elif istask(arg):
    117         func, args = arg[0], arg[1:]

~/dask/dask/core.py in _execute_task(arg, cache, dsk)
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg

~/dask/dask/optimization.py in __call__(self, *args)
    940                              % (len(self.inkeys), len(args)))
    941         return core.get(self.dsk, self.outkey,
--> 942                         dict(zip(self.inkeys, args)))
    943
    944     def __reduce__(self):

~/dask/dask/core.py in get(dsk, out, cache)
    147     for key in toposort(dsk):
    148         task = dsk[key]
--> 149         result = _execute_task(task, cache)
    150         cache[key] = result
    151     result = _execute_task(out, cache)

~/dask/dask/core.py in _execute_task(arg, cache, dsk)
    117         func, args = arg[0], arg[1:]
    118         args2 = [_execute_task(a, cache) for a in args]
--> 119         return func(*args2)
    120     elif not ishashable(arg):
    121         return arg

~/cudf/python/cudf/dataframe/dataframe.py in __getitem__(self, arg)
    243             index = self.index.take(selinds.to_gpu_array())
    244             for col in self._cols:
--> 245                 df[col] = Series(self._cols[col][arg], index=index)
    246             df.set_index(index)
    247         else:

~/cudf/python/cudf/dataframe/series.py in __getitem__(self, arg)
    224         elif arg.dtype in [np.bool, np.bool_]:
    225             selvals, selinds = columnops.column_select_by_boolmask(
--> 226                 self._column, arg)
    227             index = self.index.take(selinds.to_gpu_array())
    228         else:

~/cudf/python/cudf/dataframe/columnops.py in column_select_by_boolmask(column, boolmask)
    107     """
    108     from cudf.dataframe.numerical import NumericalColumn
--> 109     assert column.null_count == 0 # We don't properly handle the boolmask yet
    110     boolbits = cudautils.compact_mask_bytes(boolmask.to_gpu_array())
    111     indices = cudautils.arange(len(boolmask))

AssertionError:
```
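On the question of making the delinquency cell smaller: the three blocks differ only in the threshold and the output column name, so one loop over a mapping could drive all of them. Below is a library-free sketch of that logic, assuming rows of `(loan_id, monthly_reporting_period, status)` with `None` standing in for a null status. The helper names and sample data are hypothetical, and this is plain Python rather than cudf or dask-cudf code.

```python
from datetime import date

# Same placeholder the notebook fills in when a loan never reached a threshold.
EPOCH = date(1970, 1, 1)
THRESHOLDS = {"delinquency_30": 1, "delinquency_90": 3, "delinquency_180": 6}

# Hypothetical sample: (loan_id, monthly_reporting_period, delinquency status).
rows = [
    (1, date(2000, 1, 1), 0),
    (1, date(2000, 2, 1), 1),
    (1, date(2000, 3, 1), 3),
    (1, date(2000, 4, 1), 2),
    (2, date(2000, 1, 1), 1),
    (2, date(2000, 2, 1), None),
    (3, date(2000, 1, 1), 0),
]


def first_delinquency(rows, threshold):
    """Earliest reporting period per loan with status >= threshold (nulls skipped)."""
    first = {}
    for loan_id, period, status in rows:
        if status is not None and status >= threshold:
            first[loan_id] = min(first.get(loan_id, period), period)
    return first


def delinquency_table(rows):
    """Left-join the 90- and 180-day results onto the 30-day result."""
    per_name = {name: first_delinquency(rows, t) for name, t in THRESHOLDS.items()}
    base = per_name["delinquency_30"]
    return {
        loan_id: {name: per_name[name].get(loan_id, EPOCH) for name in THRESHOLDS}
        for loan_id in base
    }


table = delinquency_table(rows)
print(table)
```

Loans that never reach the 30-day threshold are absent from the result, matching the left joins on delinq_30 in the original cell.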

A lot of this notebook looks like more of the same. I'm going to pause for now and see if others have some feedback, or if there are others that can continue in this direction.

Setup

Also, I should note that I found it much simpler when iterating to drop all of the subprocess and LocalCUDACluster stuff and just work from a single thread on a smaller sample of the data (if you look above you'll see that I'm only looking at the year 2000).

jrhemstad commented 5 years ago

> Unfortunately I'm not sure what GDFError: GDF_VALIDITY_UNSUPPORTED means. I'm hoping that @jrhemstad can help explain that error. Unfortunately I don't have a minimal reproducible example here.

This means your "key" columns or your "aggregation" columns have null values, and the libcudf groupby implementation currently does not support null values. fillna has been an appropriate workaround in the past.
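As a rough illustration of the fillna workaround applied to the ever-delinquent features, here is a library-free sketch: fill nulls with a sentinel, take the per-loan maximum, then derive the flags. It is plain Python rather than cudf code. The sentinel `FILL_VALUE = -1`, the helper names, and the sample rows are assumptions; with this sentinel, a loan whose statuses are all null is treated as never delinquent.

```python
# None stands in for a null; FILL_VALUE is a hypothetical sentinel that sorts
# below every real delinquency status.
FILL_VALUE = -1
STATUS = "current_loan_delinquency_status"

rows = [
    {"loan_id": 1, STATUS: 0},
    {"loan_id": 1, STATUS: None},
    {"loan_id": 1, STATUS: 4},
    {"loan_id": 2, STATUS: None},
    {"loan_id": 2, STATUS: 1},
    {"loan_id": 3, STATUS: None},
]


def fill_nulls(rows, column, value):
    """Return copies of the rows with None in `column` replaced by `value`."""
    return [
        {**row, column: value if row[column] is None else row[column]}
        for row in rows
    ]


def groupby_max(rows, key, column):
    """Maximum of `column` for each distinct value of `key`."""
    out = {}
    for row in rows:
        k = row[key]
        out[k] = max(out.get(k, row[column]), row[column])
    return out


def ever_features(max_status):
    """Turn each loan's maximum status into 0/1 ever_30/90/180 flags."""
    return {
        loan_id: {
            "ever_30": int(status >= 1),
            "ever_90": int(status >= 3),
            "ever_180": int(status >= 6),
        }
        for loan_id, status in max_status.items()
    }


filled = fill_nulls(rows, STATUS, FILL_VALUE)
max_status = groupby_max(filled, "loan_id", STATUS)
features = ever_features(max_status)
print(features)
```

Whether a sentinel is acceptable depends on what a null status means in the data, so the choice of fill value deserves a check against the dataset's documentation.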

mrocklin commented 5 years ago

FYI when folks talk about improving error messages, this is what we mean. The message you just provided would be an excellent (if perhaps slightly verbose) message. This may or may not be on you though.

If we wanted to not work around this, is there an appropriate thing to push on? Can we move this to some issue in rapidsai/cudf?

mrocklin commented 5 years ago

Ah, sorry, the second statement there was more about null handling, and less about error message handling, which I think we should probably also discuss somewhere other than here if there isn't already a conversation happening.

jrhemstad commented 5 years ago

> FYI when folks talk about improving error messages, this is what we mean. The message you just provided would be an excellent (if perhaps slightly verbose) message. This may or may not be on you though.

At present, all we can do on the C++ side is return an integer-valued error code. This limits the amount of context we can give you without having 10,000 different possible error codes.

That said, this can be improved by throwing exceptions from the C++ side and converting those into Python exceptions. This would allow us to provide more detailed error description strings.
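To make the difference concrete, here is a small Python-only sketch of what a caller sees when an error carries a description string instead of just a code name. Everything here is hypothetical: `DescriptiveGDFError`, the `DESCRIPTIONS` table, and `raise_for_error` are stand-ins, not the libgdf_cffi wrapper or the exception design proposed in the linked issue. The description text paraphrases the explanation given earlier in this thread.

```python
class DescriptiveGDFError(Exception):
    """Hypothetical Python-side error with a code name and a readable message."""

    def __init__(self, errname, msg=""):
        super().__init__(f"{errname}: {msg}" if msg else errname)
        self.errname = errname
        self.msg = msg


# Hypothetical lookup table; only the name GDF_VALIDITY_UNSUPPORTED and its
# meaning come from this thread.
DESCRIPTIONS = {
    "GDF_VALIDITY_UNSUPPORTED": (
        "the key or aggregation columns contain null values, which the "
        "groupby implementation does not support; fillna may be a workaround"
    ),
}


def raise_for_error(errname):
    """Raise an error whose text explains the code when a description exists."""
    raise DescriptiveGDFError(errname, DESCRIPTIONS.get(errname, ""))


try:
    raise_for_error("GDF_VALIDITY_UNSUPPORTED")
except DescriptiveGDFError as exc:
    message = str(exc)
    print(message)
```

A static table like this only helps per error code; exceptions thrown at the failure site could also say which column or operation was involved, which is the extra context a bare integer code cannot carry.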

See https://github.com/rapidsai/cudf/issues/862

> If we wanted to not work around this, is there an appropriate thing to push on? Can we move this to some issue in rapidsai/cudf?

That would be pushing on me to implement this: https://github.com/rapidsai/cudf/issues/544

It's on my list as part of a much larger refactor of the groupby implementation. https://github.com/rapidsai/cudf/issues/543

mrocklin commented 5 years ago

OK great. Thanks for the pointers. Subscribed to those issues (and my apologies for not checking earlier).

kkraus14 commented 4 years ago

Closing as this is stale.