coiled / feedback

A place to provide Coiled feedback

Joblib Scikit-Learn not parallelizing Resample #237

Closed maxall41 closed 1 year ago

maxall41 commented 1 year ago

Describe the bug I'm trying to parallelize the scikit-learn resample function to balance a dataset, but I can't find a good way to do this in a distributed way. I have tried the code below, but it seems to only run on one node and not the others:

df_majority = part.query("prcp < 0.4")
df_minority = part.query("prcp > 0.4")
n_samples = len(df_minority.index)

--- SEPARATE CELL ---

import joblib
with joblib.parallel_backend('dask'):
     df_majority_downsampled = delayed(resample)(df_majority, 
                                    replace=False,    
                                    n_samples=n_samples,    
                                    random_state=123) 
     # data = dd.concat([df_majority_downsampled.compute(),df_minority])
     data = dd.concat([client.gather(wait(df_majority_downsampled.persist()).done),df_minority])

Does anybody know why this might be happening or how I can fix it?

dchudz commented 1 year ago

Thanks Max! Just to be sure, I assume you created a Coiled cluster (cluster = coiled.Cluster(..)) and the client is attached to your cluster (client = cluster.get_client())?

What do you see that indicates it's on one node? (Looking at the scheduler dashboard and you see work on one node?)

maxall41 commented 1 year ago

Yes, I created the cluster and attached the client to it. In the dashboard, a single node's CPU gets pinned above 100% while the rest sit around 5–10%. The memory on the node with the high CPU usage also starts spilling to disk.

mrocklin commented 1 year ago

It looks like you're mixing a few different abstractions here:

with joblib.parallel_backend('dask'):

Is the following line dask.delayed or joblib.delayed? It looks like you're trying to use joblib, but then you treat this object as though it were a Dask thing.

     df_majority_downsampled = delayed(resample)(df_majority,   
                                    replace=False,    
                                    n_samples=n_samples,    
                                    random_state=123) 
     # data = dd.concat([df_majority_downsampled.compute(),df_minority])

It looks like you ask for one task to run on a remote machine, then you gather it locally, then you put it back into a Dask thing (but the data is on your local computer now).

     data = dd.concat([client.gather(wait(df_majority_downsampled.persist()).done),df_minority])

In short, I suspect that you are mixing joblib, dask delayed, dask futures, and dask dataframes in a way that is probably not doing what you're trying to do. I recommend trying to reduce the number of APIs active here, like just using Joblib or just using Dask futures and maybe things will work themselves out.

If you say more about what you want to achieve I suspect that we can point you towards a ready example.

maxall41 commented 1 year ago

Yeah, I was trying to avoid using dask delayed() here, but it seems to be necessary; otherwise I get:

TypeError: Truth of Delayed objects is not supported

I'm pretty new to using Dask and joblib, so I'm assuming that using joblib.delayed instead of dask.delayed would be the correct way to do this, but I'm not sure how to use joblib.delayed and then combine the under-sampled majority with the minority in a Dask dataframe. For context, I'm trying to balance a weather dataset based on the precipitation value for a given day.
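For reference, joblib.delayed behaves differently from dask.delayed: it only marks a call for joblib.Parallel to execute, and Parallel returns plain results, not lazy objects you can .compute() or .persist(). A minimal pandas-only sketch with hypothetical toy data (a pandas sample() stands in for sklearn's resample with replace=False):

```python
import pandas as pd
from joblib import Parallel, delayed

# Hypothetical toy stand-in for the majority partition in this thread.
df_majority = pd.DataFrame({"prcp": [0.1, 0.2, 0.3, 0.05, 0.15, 0.25]})

def downsample(df, n, seed=123):
    # Sample n rows without replacement, as resample(replace=False) would.
    return df.sample(n=n, random_state=seed)

# joblib.delayed wraps the call; Parallel executes it and returns real
# pandas objects, so there is nothing to compute or gather afterwards.
(df_down,) = Parallel(n_jobs=2)([delayed(downsample)(df_majority, 2)])
```

With `joblib.parallel_backend('dask')` around the Parallel call, the same code would dispatch the work to the cluster, and the result would still come back as an ordinary pandas object.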

mrocklin commented 1 year ago

Unfortunately the context you're providing doesn't give us enough information to help you effectively. For example, "For context I'm trying to balance a weather dataset based on the precipitation value for a given day" could mean many different things depending on how your data is stored, etc.

I recommend starting from an example, like one of these.

For context, there are many different APIs that use Dask, and you're kinda mixing a lot of them. As an example, if someone didn't know pandas/scikit-learn/numpy/Python, it's as though they wrote code like the following:

df = pandas.read_csv(...)

df.fit(numpy.dot(5))

It's mixing a bunch of abstractions. They each work well on their own, but mixing them doesn't make much sense. I recommend using one or two of them. Don't use joblib and dask delayed together, for example; they do the same thing.
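For comparison, the pure Dask-futures route mentioned above might look like the sketch below. The in-process client and the downsample helper are hypothetical, just for illustration; resample from sklearn could be submitted the same way:

```python
import pandas as pd
from dask.distributed import Client

def downsample(df, n):
    # Sample n rows without replacement, as resample(replace=False) would.
    return df.sample(n=n, random_state=123)

client = Client(processes=False)  # in-process cluster, for illustration only

pdf = pd.DataFrame({"prcp": [0.1, 0.2, 0.3, 0.05]})
future = client.submit(downsample, pdf, 2)  # runs on a worker
result = future.result()                    # gathers the pandas result back
client.close()
```

One API end to end: submit the function, get a future, gather the result.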

Again, I recommend starting from an example.