anyoptimization / pymoo

NSGA2, NSGA3, R-NSGA3, MOEAD, Genetic Algorithms (GA), Differential Evolution (DE), CMAES, PSO
https://pymoo.org
Apache License 2.0

Minimize function stuck forever when using CPU parallelization #610

Open muazhari opened 3 months ago

muazhari commented 3 months ago

My tests with ElementwiseProblem fail when using dask, ray, starmap multiprocessing, and a futures process pool executor in a Jupyter notebook. Every runner except starmap shows the same outcome: the run gets stuck forever and uses only one CPU core instead of all of them. Starmap, even when configured with more than one worker (up to 24), only makes the execution take longer and still uses one core. That leaves only the default runner, LoopedElementwiseEvaluation. Unexpectedly, the default runner is the fastest and actually works compared to all the parallelized runners, although it also uses only one core. I have also tested the futures executor, dask, and ray separately with a similar pymoo runner implementation: ray is very slow and does not utilize all CPU cores, dask can utilize all CPU cores but is slower than the futures executor, and the futures executor is the fastest.

from pymoo.core.problem import ElementwiseProblem

import ray

ray.shutdown()
ray.init(dashboard_host="0.0.0.0")
ray.available_resources()

from distributed import LocalCluster
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# GPU-backed cluster; superseded by the CPU-only LocalCluster below
cluster = LocalCUDACluster()

cluster = LocalCluster(n_workers=24, threads_per_worker=1)
client = Client(cluster)
client

from pymoo.core.variable import Binary, Choice, Integer, Real


class MultiObjectiveMixedVariableProblem(ElementwiseProblem):

    def __init__(self, **kwargs):
        vars = {
            "b": Binary(),
            "x": Choice(options=["nothing", "multiply"]),
            "y": Integer(bounds=(-2 * 10 ** 5, 2 * 10 ** 5)),
            "z": Real(bounds=(-5 * 10 ** 3, 5 * 10 ** 3)),
        }
        super().__init__(vars=vars, n_obj=6, n_ieq_constr=0, **kwargs)

    def _evaluate(self, X, out, *args, **kwargs):
        b, x, z, y = X["b"], X["x"], X["z"], X["y"]
        f1 = z ** 2 + y ** 2
        f2 = (z + 2) ** 2 + (y - 1) ** 2
        f3 = (z ** 2) / 2 + (y + 1)
        f4 = -z ** 2
        f5 = z ** 2
        f6 = z / 2 - y - y / z

        if b:
            f2 = 100 * f2

        if x == "multiply":
            f2 = 10 * f2

        out["F"] = [f1, f2, f3, f4, f5, f6]

# Option 1: ray
runner = RayParallelization(
    job_resources={
        "num_gpus": 1,
        "num_cpus": 24,
    }
)

# Option 2: dask
runner = DaskParallelization(client=client)

# Option 3: multiprocessing starmap
import multiprocessing

pool = multiprocessing.Pool(24)
runner = StarmapParallelization(pool.starmap)

# Option 4: default serial runner
runner = LoopedElementwiseEvaluation()

from concurrent import futures


class ConcurrentParallelization:

    def __init__(self, max_workers) -> None:
        super().__init__()
        self.max_workers = max_workers

    def __call__(self, f, X):
        with futures.ProcessPoolExecutor(max_workers=self.max_workers) as executor:
            function_futures = [executor.submit(f, x) for x in X]
            futures.wait(function_futures, return_when=futures.ALL_COMPLETED)
            return [function_future.result() for function_future in function_futures]

    def __getstate__(self):
        # drop max_workers when pickling (mirrors the pattern of pymoo's built-in runners)
        state = self.__dict__.copy()
        state.pop("max_workers", None)
        return state


runner = ConcurrentParallelization(max_workers=24)

from pymoo.core.mixed import MixedVariableGA
from pymoo.operators.survival.rank_and_crowding import RankAndCrowding
from pymoo.optimize import minimize

problem = MultiObjectiveMixedVariableProblem(elementwise_runner=runner)

algorithm = MixedVariableGA(survival=RankAndCrowding())

res = minimize(problem, algorithm, seed=1)
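For reference, one way to check whether a runner is actually using multiple cores (this is not from the original post; psutil is an extra dependency and the one-second sampling interval is arbitrary) is to sample per-core utilization from a separate cell or process while minimize is running:

import psutil

# with a working parallel runner, several entries should be near 100% while minimize() executes
print(psutil.cpu_percent(interval=1.0, percpu=True))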

blankjul commented 3 months ago

Looking at your code, my assumption is that parallelization with dask and ray introduces a significant amount of overhead (because of serialization). In my opinion, the main advantage is using parallelization with a cloud service (e.g. AWS Lambda functions) on a larger scale. For instance, launching 200 instances in parallel will certainly beat running this on 4 cores.

Can you confirm your results with a computationally heavier problem as well? Let us say a (time-discrete) simulation that requires a minute or so per evaluation? Happy to discuss this a little more here.
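To make that comparison concrete, here is a minimal sketch of such a test (not from the thread; the sleep duration, pool size, population size, and generation count are arbitrary assumptions). It is meant to be run as a plain script, once with the starmap pool and once with LoopedElementwiseEvaluation, so the wall times can be compared:

import multiprocessing
import time

from pymoo.core.mixed import MixedVariableGA
from pymoo.core.problem import ElementwiseProblem, StarmapParallelization
from pymoo.core.variable import Real
from pymoo.optimize import minimize


class SlowProblem(ElementwiseProblem):

    def __init__(self, **kwargs):
        # single variable, single objective; the sleep below stands in for a ~1 s simulation
        super().__init__(vars={"z": Real(bounds=(-5.0, 5.0))}, n_obj=1, **kwargs)

    def _evaluate(self, X, out, *args, **kwargs):
        time.sleep(1.0)
        out["F"] = [X["z"] ** 2]


if __name__ == "__main__":
    with multiprocessing.Pool(8) as pool:
        runner = StarmapParallelization(pool.starmap)
        problem = SlowProblem(elementwise_runner=runner)
        start = time.perf_counter()
        minimize(problem, MixedVariableGA(pop_size=20), ("n_gen", 3), seed=1, verbose=True)
        print(f"wall time with starmap pool: {time.perf_counter() - start:.1f} s")

If the per-evaluation cost dominates the serialization overhead, the parallel run should finish in a fraction of the serial time.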

muazhari commented 3 months ago

I don't think it is caused by overhead. Strangely, when using ray, it works when I do not configure the computing resources (using the default ray.init configuration with 32 logical cores)*. However, it sometimes crashes.

class OptimizationProblemRunner:
    def __init__(self):
        pass

    def __call__(self, f, X):
        # wrap the unbound __call__ of the elementwise evaluation function as a ray task
        runnable = ray.remote(f.__call__.__func__)
        # submit one task per solution and block until all results are available
        futures = [runnable.remote(f, x) for x in X]
        return ray.get(futures)

    def __getstate__(self):
        state = self.__dict__.copy()
        return state

*Update: somehow it no longer works, even without supplying any resource configuration.
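In case it helps to narrow this down, a minimal ray-only check (outside pymoo, not part of the original report; the task count and duration are arbitrary) can show whether ray by itself distributes work across cores on this machine:

import os
import time

import ray

ray.init()  # default configuration, no explicit resource limits


@ray.remote
def burn(seconds):
    # purely CPU-bound busy loop so core usage shows up in a system monitor
    end = time.time() + seconds
    while time.time() < end:
        pass
    return os.getpid()


start = time.perf_counter()
pids = ray.get([burn.remote(5) for _ in range(24)])
print("distinct worker processes:", len(set(pids)))
print(f"wall time: {time.perf_counter() - start:.1f} s")

If this script pins many cores and finishes in roughly the task duration, ray itself is fine and the problem is more likely in how the runner and problem are serialized from the notebook.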