EpistasisLab / tpot

A Python Automated Machine Learning tool that optimizes machine learning pipelines using genetic programming.
http://epistasislab.github.io/tpot/
GNU Lesser General Public License v3.0
9.68k stars 1.57k forks source link

How to optimize Dask config for TPOT #847

Open hoangthienan95 opened 5 years ago

hoangthienan95 commented 5 years ago

Hi there,

I have a question regarding using Dask-jobqueue on HPC to parallelize TPOT. I have 10 nodes, with 47 cores each. I can either do:

cluster = LSFCluster(queue='corradin_long',
                     cores= 47,
                     walltime='100000:00',
                     memory='256GB',
                     death_timeout=600

cluster.adapt(minimum= 5, maximum=10)

or

cluster = LSFCluster(queue='corradin_long',
                     cores= 2,
                     walltime='100000:00',
                     memory='10GB',
                     death_timeout=600

cluster.adapt(minimum= 200, maximum=235)

According to Dask resources: here and here:

Using few processes and many threads per process is good if you are doing mostly numeric workloads, such as are common in Numpy, Pandas, and Scikit-Learn code, which is not affected by Python's Global Interpreter Lock (GIL).

However, if you are spending most of your compute time manipulating Pure Python objects like strings or dictionaries then you may want to avoid GIL issues by having more processes with fewer threads each

So which configuration is best for TPOT ? In the example video of using TPOT with Dask from Dask, they used 40 workers and 80 cores (2 cores per worker) and it worked fine, but when I used 2 cores per worker with 200 workers it was very slow and results in a weird task graph. Although I couldn't see their cluster initialization so I might be doing it wrong (processes and ncpus keywords for the LSFCluster object initialization did not work for me) Sorry I'm new to both libraries and an inexperienced coder, so more layman terms explanation is greatly appreciated!

Also, I look at the conversation between dask and TPOT (#304) and while it's interesting, are there documentation on things to note/ limitations/ tips and tricks when using Dask with TPOT to achieve maximum efficiency? The dask example binder is a great starting point but it offers a very minimum example. I'd be willing to write up something to help people who uses the two libraries together.

Thanks so much for your work on this incredible library!!

hoangthienan95 commented 5 years ago

Update:

I tried both of the configs. Both of them fail in the first generation.

For the "less workers, more cores per worker" config, I encounter the problem where only one worker takes all the memory and tasks (up to 231GB, which is the limit for one worker!) as shown below. This results in the worker being killed multiple times and ultimately fail the computation after 3 retries. Doing this way, the command .fit() fails more quickly, with no task graphs observed on Dask UI.

image

The same thing happens when I do 2 cores per worker, here you can see that one worker takes the entire 5.12 GB and 120 tasks, while others do no work. However, it was able to build the task graph and do some computation.

image

image

What's happening here?? TPOT works fine (but incredibly slow) if I use client as local machine instead of the LSF cluster:

from dask.distributed import Client
client = Client(n_workers=4, threads_per_worker=1)
client
weixuanfu commented 5 years ago

To clarify, I don't have any experience about using LSFcluster. Thank you for submitting this issue and testing TPOT in HPC environemnt.

Hope the suggestions below are helpful.

Could you please try client object in the demo below before setting TPOT object? Also please print out the client to check the connection and configuration of the LSF cluster?

cluster = LSFCluster(queue='corradin_long',
                     cores= 2,
                     walltime='100000:00',
                     memory='10GB',
                     death_timeout=600

# Start 50 workers in 50 jobs that match the description above
cluster.scale(50)  # this may take a few seconds to launch
from dask.distributed import Client
# connect to the cluster
client = Client(cluster)

When the setting is 2 cores per worker, what is n_jobs setting in TPOT? In the latest version of TPOT (v 0.9.6), we reduced to n_jobs to control the chunk size (it should be n_jobs * 4 in this case) for building task graph (The docs for this part is out of date. We will fix that in next release.). Increasing n_jobs (e.g n_jobs=25 for population_size >= 100 and workers # =50) may help if you are using this version.

hoangthienan95 commented 5 years ago

@weixuanfu Thank you for your quick reply. Here is my newest version of notebook. Also, i'm giving you the video of a run, where I caught the moment when the program terminates (1:33 - 1:40 in the video). At no point in the execution did the memory load for any individual worker exceeds the limit I set (never >1 GB while the limit I set was 15 GB, see video). Attempt 2 section in the notebook is the code that corresponds with the run in the video.

Notebook: https://github.com/hoangthienan95/sharing/blob/master/TPOT_test_error_report.ipynb Video: https://streamable.com/zenu6

As you can see, Im using v0.9.5 of TPOT, before this attempt, I did not set n_jobs because I was afraid it would interfere with use_dask. Now I set it to half the worker# like you said, and the problem of only one worker doing all the work is gone. However, my program still terminate with alot of errors.

There are also quite alot of Worker restarted, might be due to memory errors, which really confuses me because of the observation above. I can try running using the joblib backend method, but as another user noted in #779, it doesn't really solve the problem.

This might be a dask/dask-jobqueue/dask-distributed thing, if that's the case please point me to who I can ask for help. Although I have performed other computation with Dask using this cluster and set up, so I thought it could be a problem with TPOT.

Let me know if you need any other information to diagnose the problem!