aai-institute / pyDVL

pyDVL is a library of stable implementations of algorithms for data valuation and influence function computation
https://pydvl.org
GNU Lesser General Public License v3.0
94 stars 8 forks source link

Improve MapReduceJob and replace Futures interface (in e.g. TMCS) with joblib #387

Open AnesBenmerzoug opened 1 year ago

AnesBenmerzoug commented 1 year ago

Thanks to a new feature in joblib 1.3.0, namely the option to return the results as generator instead of waiting for all of them to finish, I think that we can get rid of the concurrent.Futures interface and replace it with joblib.

The main loop for truncated_montecarlo_shapley can be rewritten using joblib as follows:

from joblib import delayed, effective_n_jobs, Parallel

def truncated_montecarlo_shapley(
    u: Utility,
    *,
    done: StoppingCriterion,
    truncation: TruncationPolicy,
    n_jobs: int = 1,
) -> ValuationResult:
    algorithm = "truncated_montecarlo_shapley"
    # This represents the number of jobs that are running
    n_jobs = effective_n_jobs(n_jobs, config)
    # This determines the total number of submitted jobs
    # including the ones that are running
    n_submitted_jobs = 2 * n_jobs

    accumulated_result = ValuationResult.zeros(algorithm=algorithm)

    with Parallel(n_jobs=n_jobs, return_as="generator") as parallel:
        while not done(accumulated_result):
            delayed_calls = [
                delayed(_permutation_montecarlo_one_step)(u, truncation, algorithm)
                for _ in range(n_submitted_jobs)
            ]
            for result in parallel(delayed_calls):
                accumulated_result += result
                if done(accumulated_result):
                     break
    return accumulated_result

This should be almost the same as the version with the concurrent.Futures interface but with way less custom code. We can rewrite the MapReduceJob class to use this pattern instead and have the default stopping criterion be a dummy one.

We could take some inspiration from this recent open PR in joblib to implement a similar, but not quite the same, pattern.

Configuration

The users could configure the backend using using joblib's context managers parallel_backend or the newer and better parallel_config:

import joblib

with joblib.parallel_config(backend="loky"):
     truncated_montecarlo_shapley(u, done, truncation)

For ray, the user would initialize it directly if they need to otherwise it will be done automatically:

import joblib
import ray

ray.init()

with joblib.parallel_config(backend="ray"):
     truncated_montecarlo_shapley(u, done, truncation)

This would resolve #360 (handled by joblib's parallel_config function) and allow us to resolve #385

Worker Signalling Mechanism

To make this change even more worthwhile it should also resolve #363

For this we can use Event synchronization variable. I think this is a better approach than relying on yet another service e.g. redis or queue because it is simpler.

Here's how we could do it for the different backends:

Cancelling Running Jobs

Unfortunately, 'Loky' the default joblib backend does not terminate workers when we exit the Parallel context manager because it tries to reuse worker processes. We may not want that because the individual tasks may take a long time to finish and cause a memory leak.

Other backends do not suffer from this issue.

There are 2 ways to resolve this:

EDIT: Added more details and split it into different sections.

AnesBenmerzoug commented 1 year ago

@mdbenito opinions? I know I took a long while to write the RayExecutor code but I think it will just get in the way in the future (pun not intended).

AnesBenmerzoug commented 1 year ago

@mdbenito, @schroedk, @Xuzzo I have updated the description with more details and split it into different sections. What do you think? The only thing I am not sure about is how this fits in MapReduceJob and whether we should try to make it fit at all.

fcharras commented 1 year ago

Hello, I'm the contributor that proposed https://github.com/joblib/joblib/pull/1485 , I'd be interested to hear more feedback about real-world use-cases for those kind of features in joblib.

Would you mind giving more context, what features would you like to see in joblib that could help you with said usecases ?

Regarding https://github.com/joblib/joblib/pull/1485 there are some limitations that might prevent it for being extended or adapted much further than what the example already show:

Beware the dask backend does not support return_as=generator yet, I don't think the ray backend does either ? (in theory there's no blocker just need some developer time to adapt them)

Do you think an Executor-like interface in joblib, similar to concurrent.futures (i.e with Futures, wait, etc...), with joblib nice features (ability to swap backends, error surfacing, ...) would be more interesting for this kind of use cases ?

AnesBenmerzoug commented 1 year ago

Hello @fcharras, thanks for taking interest in this issue.

The pattern we use in pyDVL is a MapReduce like pattern in which we want want to be able to submit several jobs (Map) that may take a while to finish and to then combine the results (Reduce) and check a stopping criterion. If the criterion is satisfied then we do not submit more jobs and we ideally terminate any currently running jobs because they may take a long time to finish and we do want that.

The pattern implemented in joblib/joblib#1485 can fit our use case:

I didn't know that other backends didn't support returning the results as a generator out of the box, that's kind of a show blocker for this issue.

We currently the Executor interface from concurrent.futures only for one of the algorithms: we have a custom RayExecutor class for working with ray and we use the executor implementation from loky (get_reusable_executor).

We are considering to fully switch to joblib because it would simplify our code and remove the need to handle the configuration for the different backends ourselves. If there was a builtin Executor-like interface in joblib that would allow to do the same and has support for task cancellation than that would be perfect for us.

fcharras commented 1 year ago

TY for the added information.

Indeed after 1.3 backends do not support return_as out of the box, backends must be adapted to a change of logic (results are retrieved in callbacks threads, as opposed to being retrieved in the main thread before 1.3 or with non-adapted backends). Feel free to open an issue to track support for other backends, we are looking for this type of feedback.

For pattern such as https://github.com/joblib/joblib/pull/1485 , or the map-reduce you're describing, the only risk (to my knowledge) is indeed the one that is already documented in the PR example, a callback thread can't be asked to wait for completion of pending jobs in other workers. It can be worked around by pre-running enough tasks at the start, and ensuring that a cycle in the feedback loop creates as much task as it consumed results.

Since it's implicit, maybe joblib shouldn't encourage towards this kind of patterns, maybe anExecutor-like interface could be considered instead, if it can be seen to fit in joblib scope.

kosmitive commented 10 months ago

@AnesBenmerzoug

What about?

import joblib

@contextmanager
def Parallel(*args, **kwargs):
    with joblib.Parallel(*args, **kwargs) as parallel:
        yield parallel
        parallel._backend.abort_everything(ensure_ready=False)

MapReduce is in general a problem due to the StoppingCriterion. So we should actually try to get rid of MapReduce for Shapley-like algorithms

mdbenito commented 10 months ago

MapReduce is in general a problem due to the StoppingCriterion. So we should actually try to get rid of MapReduce for Shapley-like algorithms

Agreed. A central dispatcher is also simpler for handling randomness (although we already have the seed sequences). Unless there is some use case I'm forgetting, we can think of dropping mapreduce

fcharras commented 10 months ago

I didn't know that other backends didn't support returning the results as a generator out of the box, that's kind of a show blocker for this issue.

@AnesBenmerzoug FYI support with dask backend was just merged , will be available from the next release on.

For support with the ray backend I've also opened a PR at the ray project at https://github.com/ray-project/ray/pull/41028, the merge depends on if ray maintainers notice / are interested in the feature. But the diff is trivial, in fact users can unlock the feature already by adding one attribute to the RayBackend class.

I think with those two on top all major backends are now adressed ?