dmlc / xgboost

Scalable, Portable and Distributed Gradient Boosting (GBDT, GBRT or GBM) Library, for Python, R, Java, Scala, C++ and more. Runs on single machine, Hadoop, Spark, Dask, Flink and DataFlow
https://xgboost.readthedocs.io/en/stable/
Apache License 2.0

xgb.dask.inplace_predict discards index of dask dataframes in mapped partitions #6939

Closed jkc1 closed 3 years ago

jkc1 commented 3 years ago

I've noticed that the dask version of `.inplace_predict()` builds a new dataframe inside the mapped prediction function, but that dataframe is constructed with the default RangeIndex rather than the partition's original index.

This makes it impossible to add the predictions back to the original dask.dataframe if that dataframe has a non-RangeIndex.
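For reference, plain pandas shows why this breaks: column assignment aligns on index values, so predictions carrying a fresh RangeIndex never match a shifted index (toy data below, not the actual xgboost code path):

```python
import pandas as pd
import numpy as np

# a partition whose index does not start at 0 (e.g. after shifting/splitting)
part = pd.DataFrame({"feat_0": [0.1, 0.2, 0.3]}, index=[1000, 1001, 1002])

# predictions wrapped with the default RangeIndex (0, 1, 2),
# as the mapped prediction function currently produces them
preds = pd.Series(np.array([0.9, 0.1, 0.8]))

# assignment aligns on index values; nothing matches, so every row is NaN
part["pred"] = preds
print(part["pred"].isna().all())  # True
```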

https://github.com/dmlc/xgboost/blob/4ddbaeea32b598d5daf4b8a71ecd51199de1c41e/python-package/xgboost/dask.py#L1339

A simple fix would be to add `index=data.index` to the `pd.DataFrame` constructor call.
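A minimal sketch of what that change would look like; `mapped_predict_fixed` is a hypothetical stand-in for the function in `dask.py`, with the actual prediction stubbed out:

```python
import pandas as pd
import numpy as np

def mapped_predict_fixed(data: pd.DataFrame) -> pd.DataFrame:
    # stand-in for the real booster prediction; one value per row
    raw = np.zeros(len(data), dtype=np.float32)
    # proposed fix: carry the partition's index onto the result
    # instead of letting pandas assign a default RangeIndex
    return pd.DataFrame(raw, columns=["prediction"], index=data.index)

part = pd.DataFrame({"feat_0": [0.1, 0.2]}, index=[1000, 1001])
out = mapped_predict_fixed(part)
print(out.index.tolist())  # [1000, 1001]
```

With the index preserved, the later `ddf["..."] = predictions` assignment aligns row-for-row instead of producing NaN.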

edit: also, DaskDMatrix and the `.predict()` method preserve the index, so this behavior is unexpectedly different

trivialfis commented 3 years ago

Could you please provide an example of this running into error? Or would you like to open a PR? ;-)

jkc1 commented 3 years ago

happy to provide a minimal working example. it's a bit long, but it demonstrates the issue.

I actually misspoke in the original comment: only DaskDMatrix respects the index. if a Dask DataFrame is passed to either `.predict()` or `.inplace_predict()`, the index is discarded.

import dask.dataframe as dd
import pandas as pd
import xgboost as xgb
from sklearn.datasets import load_breast_cancer
from dask.distributed import Client, LocalCluster

# load a dataset
cancer = load_breast_cancer()

# create pandas dataframes
df = pd.DataFrame(cancer["data"])
df.rename(columns=lambda x: f"feat_{x}", inplace=True)
df["target"] = cancer["target"]
df.index = df.index + 1000

# train a booster
dtrain = xgb.DMatrix(
    df.drop("target", axis=1),
    label=df["target"],
)
params = {
    "objective": "binary:logistic",
    "eval_metric": "logloss",
    "max_depth": 2,
    "eta": 1,
    "seed": 1234,
}
bst = xgb.train(params, dtrain)

# create a dask dataframe
cluster = LocalCluster(n_workers=1, threads_per_worker=1, processes=False)
client = Client(cluster)
ddf = dd.from_pandas(df, npartitions=2)

# create predictions using the various methods
# ways tested are:
#   1. `.predict()` on a xgb.DaskDMatrix
#   2. `.predict()` on a dask.DataFrame
#   3. `.inplace_predict()` on a dask.DataFrame
predict_on_dmatrix = xgb.dask.predict(
    client, 
    bst, 
    xgb.dask.DaskDMatrix(
        client,
        ddf.drop("target", axis=1)
    )
)

predict_on_ddf = xgb.dask.predict(
    client, 
    bst, 
    ddf.drop("target", axis=1)
)

inplace_predict_on_ddf = xgb.dask.inplace_predict(
    client, 
    bst, 
    ddf.drop("target", axis=1)
)

# add to the dask dataframe. this implicitly does a join
# on the values in the index
ddf["dmat_predictions"] = predict_on_dmatrix
ddf["ddf_predictions"] = predict_on_ddf
ddf["ddf_inplace_predictions"] = inplace_predict_on_ddf

print(ddf.compute()[["dmat_predictions", "ddf_predictions", "ddf_inplace_predictions"]])

#       dmat_predictions  ddf_predictions  ddf_inplace_predictions
# 1000          0.067773              NaN                      NaN
# 1001          0.001170              NaN                      NaN
# 1002          0.000221              NaN                      NaN
# 1003          0.015047              NaN                      NaN
# 1004          0.024266              NaN                      NaN
# ...                ...              ...                      ...
# 1564          0.000075              NaN                      NaN
# 1565          0.000191              NaN                      NaN
# 1566          0.002958              NaN                      NaN
# 1567          0.000221              NaN                      NaN
# 1568          0.979305              NaN                      NaN
#
# [569 rows x 3 columns]

jkc1 commented 3 years ago

it may also be worth checking whether the same thing happens in the RAPIDS (cuDF) branch a few lines above. i've never used RAPIDS, but that line of code looks similar