dask / dask-xgboost

BSD 3-Clause "New" or "Revised" License
162 stars 43 forks source link

xgboost stop working after train finished with dask.distributed.SSHCluster #73

Closed dding3 closed 4 years ago

dding3 commented 4 years ago

What happened: I have a cluster with 3 nodes and I tried to run xgb.dask.train() with SSHCluster as:

   with SSHCluster(["localhost", "node005", "node006"],worker_options={"nthreads": 2}) as cluster:
        cluster.scale(3)
        with Client(cluster) as client:
            output = xgb.dask.train(client,
                            {'verbosity': 1,
                             'tree_method': 'hist',
                             'n_estimators': 5,
                             'max_depth': 50,
                             'n_jobs': -1,
                             'random_state': 2,
                             'learning_rate': 0.1,
                             'min_child_weight': 1,
                             'seed': 0,
                             'subsample': 0.8,
                             'colsample_bytree': 0.8,
                             'gamma': 0,
                             'reg_alpha': 0,
                             'reg_lambda': 1},
                            dtrain,
                            num_boost_round=500, evals=[(dTest, 'train')])
            print("after train")
            bst = output['booster']
            history = output['history']
            with open('/tmp/model_xgbregressor0.pkl', 'wb') as f1:
                  pickle.dump(bst, f1)
            cluster.scale(0)
            client.shutdown()

I can see the model began training on 2 nodes as cpu usage for one python process is about 100%. However after training finished(cpu usage drop under 10%), nothing happens in nodes and client as I didn't see debug log "after train" and the model is not dumped on either node. And it never finished.

Environment:

quasiben commented 4 years ago

@dding3 can you build a minimal reproducer of the problem ? Does this fail on a a single machine without using dask-ssh ?

dding3 commented 4 years ago

@dding3 can you build a minimal reproducer of the problem ? Does this fail on a a single machine without using dask-ssh ?

The program can work if using dask.distributed.LocalCluster on a single machine. Please try below code:

def main(client):
    import pandas as pd
    import numpy as np
    numpy_data = np.random.rand(3000000,10)
    train_x = pd.DataFrame(data=numpy_data, columns=["column1", "column2", "column3", "column4", "column5", "column6", "column7", "column8", "column9", "column10"])
    numpy_label = np.random.uniform(115.5, 117.5, 3000000)
    train_y = pd.DataFrame(data=numpy_label, columns=["label"])

    from dask import dataframe as dd
    X = dd.from_pandas(train_x, npartitions=3)
    y1 = dd.from_pandas(train_y, npartitions=3)

    dtrain = DaskDMatrix(client, X, y1)

    print("before train")
    output = xgb.dask.train(client,
                            {'verbosity': 1,
                             'tree_method': 'hist',
                             'n_estimators': 500,
                             'max_depth': 50,
                             'n_jobs': -1,
                             'random_state': 2,
                             'learning_rate': 0.1,
                             'min_child_weight': 1,
                             'seed': 0,
                             'subsample': 0.8,
                             'colsample_bytree': 0.8,
                             'gamma': 0,
                             'reg_alpha': 0,
                             'reg_lambda': 1},
                            dtrain,
                            num_boost_round=500, evals=[(dtrain, 'train')])
    print("after train")
    bst = output['booster']
    history = output['history']
    # you can pass output directly into `predict` too.
    prediction = xgb.dask.predict(client, bst, dtrain)
    print('Evaluation history:', history)
    return prediction

if __name__ == '__main__':
    # or use other clusters for scaling
    from dask.distributed import Client, SSHCluster
    with SSHCluster(["localhost", "aep-005", "aep-006"],worker_options={"nthreads": 2}) as cluster:
        cluster.scale(3)
        with Client(cluster) as client:
            main(client)
            cluster.scale(0)
            client.shutdown()

    #with LocalCluster(n_workers=1, threads_per_worker=1) as cluster:
    #    with Client(cluster) as client:
    #        main(client)
mrocklin commented 4 years ago

Thank you for the issue and the example @dding3

I tried running it with LocalCluster but I ran into some issues. Verifying is DaskDMatrix from the xgboost.dask package? I did a web search and that's where it came up, but I'm not familiar with this package. I imported that and things seemed to run.

Then eventually my laptop crashed. Log messages were complaining about running out of memory so I suspect that this problem is too large for my laptop. Do you have a smaller problem that maintainers can run locally? (most of us work from laptops)

quasiben commented 4 years ago

The cluster.scale(3) is strange given you are using SSHCluster. I would not expect this to work cleanly.

As @mrocklin said, I also get an error here:

Traceback (most recent call last):
    dtrain = DaskDMatrix(client, X, y1)
NameError: name 'DaskDMatrix' is not defined
dding3 commented 4 years ago

Tried to remove cluster.scale(3), still have same issue.

Please add from xgboost.dask import DaskDMatrix before calling DaskMatrix.

I have tried to reduce the training data size and model size, however seems they are too small to repro the problem. I will do more experiements to find a appropriate size to repro the problem.

mrocklin commented 4 years ago

Maybe you are running out of memory when trying to solve this problem? XGBoost requires that the full computation fit comfortably in memory.

On Fri, Jul 10, 2020 at 7:17 PM dding3 notifications@github.com wrote:

Tried to remove cluster.scale(3), still have same issue.

Please add from xgboost.dask import DaskDMatrix before calling DaskMatrix.

I have tried to reduce the training data size and model size, however seems they are too small to repro the problem. I will do more experiements to find a appropriate size to repro the problem.

— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/dask/dask-xgboost/issues/73#issuecomment-656968934, or unsubscribe https://github.com/notifications/unsubscribe-auth/AACKZTB7YEDUG3ECSCEJ66DR27DVHANCNFSM4OTIIL3Q .

quasiben commented 4 years ago

If you setup the workers manually you can see messages like the following:

distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 19.20 GB -- Worker memory limit: 27.05 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 19.60 GB -- Worker memory limit: 27.05 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 19.99 GB -- Worker memory limit: 27.05 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 20.40 GB -- Worker memory limit: 27.05 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 20.79 GB -- Worker memory limit: 27.05 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 21.19 GB -- Worker memory limit: 27.05 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 21.59 GB -- Worker memory limit: 27.05 GB
distributed.worker - WARNING - Worker is at 81% memory usage. Pausing worker.  Process memory: 21.99 GB -- Worker memory limit: 27.05 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 21.99 GB -- Worker memory limit: 27.05 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 22.38 GB -- Worker memory limit: 27.05 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 22.78 GB -- Worker memory limit: 27.05 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 23.18 GB -- Worker memory limit: 27.05 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 23.55 GB -- Worker memory limit: 27.05 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 23.96 GB -- Worker memory limit: 27.05 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 24.35 GB -- Worker memory limit: 27.05 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 24.75 GB -- Worker memory limit: 27.05 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 25.15 GB -- Worker memory limit: 27.05 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 25.55 GB -- Worker memory limit: 27.05 GB
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - INFO - Worker process 71622 was killed by signal 15
distributed.nanny - WARNING - Restarting worker
distributed.worker - INFO -       Start worker at:  tcp://10.33.227.163:36209
distributed.worker - INFO -          Listening to:  tcp://10.33.227.163:36209
distributed.worker - INFO -          dashboard at:        10.33.227.163:44433
distributed.worker - INFO - Waiting to connect to:   tcp://10.33.227.163:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          2
distributed.worker - INFO -                Memory:                   27.05 GB
distributed.worker - INFO -       Local Directory: /home/nfs/bzaitlen/dask-worker-space/dask-worker-space/worker-ygeidf95
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to:   tcp://10.33.227.163:8786
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection

@mrocklin's suggestion if probably correct -- that is, xgb requires a fair amount of memory. In the messages above you can see the work is reset with the memory exceeds 95%. I can't close bugs in this repo but that would be my recommendation. If you feel like there is a bug please reopen with a reproducer