dask / dask-xgboost

BSD 3-Clause "New" or "Revised" License
164 stars 43 forks source link

[FEATURE DISCUSSION] Adding dedicated support for Dask-cuDF #36

Open mtjrider opened 5 years ago

mtjrider commented 5 years ago

Currently, RAPIDS deploys a forked version of this repository with additions that support a few things:

  1. New Random Forests (RF) interface in XGBoost
  2. Ingest of dask-cudf objects
  3. Ingest of GPU device-resident XGBoost.DMatrix objects.

We're considering abstracting these additional features and their associated dependencies into a compat.py so that we can contribute these feature additions back to this code repository without breaking its current pipeline, or requiring any additional code dependencies (in case users want the default experience).

We're wondering what the thoughts around this are, and if the community would be receptive to these additional features.

mrocklin commented 5 years ago

Thanks for raising this issue @mt-jones !

In general contributions would be welcome. I think the only thing we would want to be careful about would be explicit dependencies on cuda-specific software, just because that might not be common case. My guess/hope is that we can achieve this with some creativity.

New Random Forests (RF) interface in XGBoost

This seems like an obvious win regardless. This project depends on XGBoost, so any code in XGBoost we can expect to be around.

Ingest of dask-cudf objects

It would be useful to see exactly what is necessary here. There has been a lot of work in recent months to make dask-cudf dataframes just dask dataframes with a couple extra methods. I wonder if we might be able to make the dask-xgboost code similarly agnostic.

If we did have to import dask-cudf if would have to be very very protected. We can't expect dask-xgboost users to have cudf installed and, even if they did, the multi-second import time of cudf probably means that we can't import it at startup.

It would be useful to see exactly what code you would need here within XGBoost. Maybe we can find a clever solution that works around this. We've done a lot of this in dask.dataframe already and workarounds have always been possible so far.

Ingest of GPU device-resident XGBoost.DMatrix objects.

As above, it would be good to get code examples of what is necessary here.

Just my current thoughts. Folks like @ogrisel or @TomAugspurger might want to chime in.

mrocklin commented 5 years ago

Also, just to be clear, I'm looking forward to including the GPU support to this project. I think that it will be a great contribution. Please don't interpret my pickiness above as a lack of enthusiasm :)

Thanks for leading on this @mt-jones

mtjrider commented 5 years ago

@mrocklin it is clear to me that you were enthusiastic from your immediate detailed response. Thank you!

Regarding RF integration

This seems like an obvious win regardless. This project depends on XGBoost, so any code in XGBoost we can expect to be around.

To clarify, this feature is already integrated into dmlc/xgboost, and required additional class infrastructure to accomplish.

Regarding dask-cudf integration necessary for functioning

It would be useful to see exactly what is necessary here. There has been a lot of work in recent months to make dask-cudf dataframes just dask dataframes with a couple extra methods. I wonder if we might be able to make the dask-xgboost code similarly agnostic.

The requirements are neatly summarized as follows:

In core.py:

from .compat import (CudfDataFrame, CudfColumn)

in compat.py:

...

# cudf
    try:
        from cudf.dataframe import DataFrame as CudfDataFrame
        from cudf.dataframe.column import Column as CudfColumn
        CUDF_INSTALLED = True
    except ImportError:

        class CudfDataFrame(object):
            """ dummy object for cudf.dataframe.DataFrame """
            pass

        class CudfColumn(object):
            """ dummy object for cudf.dataframe.column.Column """
            pass

        CUDF_INSTALLED = False
...

core.py reference here compat.py reference here

I didn't find this to be excessively onerous, but it does imply that the labels and the training data are already properly concatenated into a single DMatrix object. Error handling can be a bit tricky, here. As it wouldn't necessarily show up in the training phase, but in the prediction/test phase. There are considerable performance implications for being able to do this cleanly.

Other than that, we should ensure that these changes are done consistently with other elements of the Dask ecosystem.

mrocklin commented 5 years ago

The proper concat function needs to be called

Dask dataframe handles this with the dask.dataframe.methods.concat function, which will call the appropriate function based on the inputs provided.

that (dask-)cudf would be imported with the other pertinent packages so as not to arbitrarily inflate processing time during this step

I'm not sure I understand this. Can you elaborate?

I think that ideally it should be dask-xgboost's job to gather dataframes on the appropriate node, concatenate them together, and then hand that dataframe off to XGBoost. Ideally we don't have to handle whether or not it is a Pandas, cuDF, or other Dataframe-like object. That shouldn't matter at this high administrative level. Hopefully none of those operations require type checking. I hope that by using the dask.dataframe.methods.concat function referred to above should maybe avoid any explicit mentions of cudf?

mtjrider commented 5 years ago

Dask dataframe handles this with the dask.dataframe.methods.concat function, which will call the appropriate function based on the inputs provided.

Are you suggesting that Dask can replace the logical concat here?

Namely, XGBoost cannot be given multiple parts (e.g. a list, array, etc.) in data or labels. This is reason for the concatenation, here.

Ideally, we would move away from type-checking, and rely on Dask abstraction.

Regarding the inflation comment: if the concatenation function for the cuDF is not available to the worker, and it has to import it at call time, it will inflate the time taken to actually perform the concatenation. This concatenation time is already quite fast when compared with typical workloads. Does that make sense?

mrocklin commented 5 years ago

Are you suggesting that Dask can replace the logical concat here?

Yes. I recommend taking a look at dask.dataframe.methods.concat, which has a dispatched solution.

Namely, XGBoost cannot be given multiple parts (e.g. a list, array, etc.) in data or labels. This is reason for the concatenation, here.

Yes, I agree, hence the suggestion to use dask.dataframe.methods.concat from before.

Regarding the inflation comment: if the concatenation function for the cuDF is not available to the worker, and it has to import it at call time, it will inflate the time taken to actually perform the concatenation. This concatenation time is already quite fast when compared with typical workloads. Does that make sense?

You're going to have to suffer the import time at some point. Might as well put it off until we're sure we're going to need it?

Also, I suspect that there is no fundamental reason for cudf to take so long to import. If this affects you as well then you might want to chime in here: https://github.com/rapidsai/cudf/issues/627

mtjrider commented 5 years ago

You're going to have to suffer the import time at some point. Might as well put it off until we're sure we're going to need it?

Where performance is of principal concern, it would be important to have the import done once, rather than incurring it multiple times. It's not about paying the cost, it's about ensuring that the cost to import is properly segregated from any particular working function, and that these imports occur only once.

It's not that the import takes long, it's that the concat time is very short. In most workloads, it's less than 1s.

mrocklin commented 5 years ago

Imports only happen once per Python process

In [1]: %time import cudf
CPU times: user 804 ms, sys: 1.45 s, total: 2.25 s
Wall time: 4.51 s

In [2]: %time import cudf
CPU times: user 0 ns, sys: 0 ns, total: 0 ns
Wall time: 6.2 µs

it's about ensuring that the cost to import is properly segregated from any particular working function

I agree that this screws with benchmarking and such, but I'm not sure how feasible it is. We can't predict and pre-import every module that the user might want to use.

mtjrider commented 5 years ago

I agree that this screws with benchmarking and such, but I'm not sure how feasible it is. We can't predict and pre-import every module that the user might want to use.

This is a fair observation. Your design suggestion also implies a cleaner interface overall. I think I would be in support of it. Really keen to hear from @TomAugspurger @ogrisel

mrocklin commented 5 years ago

In a side channel conversation @mt-jones asks

If you could provide me a sample of the method you referred to in order to abstract the concat method, that would be helpful.

I think that you just need to use the dask.dataframe.methods.concat function. I don't think that there is any other work for you to do. You don't need to implement anything, it's already implemented

Additionally, it would be great to see an example of what you have in mind regarding minimizing the dependencies.

I hope that you won't need to do any work here. Hopefully if you use the concat function above you won't ever have to import cudf explicitly, and so this problem goes away

mtjrider commented 5 years ago

Yes, I believe I miscommunicated @mrocklin! Sorry about that. Let me clarify:

There is a section of code which type-checks inputs for concat. This is done primarily to ensure that any other datatype is not used. See, here.

def concat(L):
    if isinstance(L[0], np.ndarray):
        return np.concatenate(L, axis=0)
    elif isinstance(L[0], (pd.DataFrame, pd.Series)):
        return pd.concat(L, axis=0)
    elif ss and isinstance(L[0], ss.spmatrix):
        return ss.vstack(L, format='csr')
    elif sparse and isinstance(L[0], sparse.SparseArray):
        return sparse.concatenate(L, axis=0)
    else:
        raise TypeError("Data must be either numpy arrays or pandas dataframes"
                        ". Got %s" % type(L[0]))

It is clear that this section of code would be reduced by dask.dataframe.methods.concat

My view is that we need to do the following:

  1. Reduce the import dependencies from our fork:
    
    from collections import defaultdict
    import logging
    from threading import Thread

import time import numpy as np import pandas as pd import cudf as gd from libgdf_cffi import libgdf from toolz import first, assoc from tornado import gen

try: import sparse import scipy.sparse as ss except ImportError: sparse = False ss = False

from dask import delayed from dask.delayed import Delayed, delayed from dask.base import tokenize, normalize_token, DaskMethodsMixin from dask.utils import funcname, M, OperatorMethodMixin from dask.context import _globals from dask.core import flatten from dask.threaded import get as threaded_get from dask.optimization import cull, fuse from toolz import merge, partition_all from dask.distributed import wait, default_client import dask.dataframe as dd import dask.array as da import dask_cudf as dgd

import xgboost as xgb

from .tracker import RabitTracker


2. Ensure that the correct datatypes are passed to the worker to provide more transparent/easier to understand error messages. As I understand it, this will require that we probe the datatype on each worker, prompting an error where appropriate.

What are your thoughts on this?
mrocklin commented 5 years ago

I would nuke the concat function defined in this repository and replace it with dask.dataframe.methods.concat. If there are cases not handled by dask.dataframe.methods.concat then I would add them to that function as extra dispatches (feel free to ask about this should it become necessary).

this will require that we probe the datatype on each worker, prompting an error where appropriate

I would just rely on dask.dataframe.methods.concat to raise the error. I don't think that we should do any explicit type checking.