ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[Ray multiprocessing] ray.util.multiprocessing launches fixed size pool and doesn't support autoscaling #31128

Open robertnishihara opened 1 year ago

robertnishihara commented 1 year ago

What happened + What you expected to happen

I'm running the reproduction script included below.

The cluster adds a node (presumably due to autoscaling), but the second node never gets utilized, presumably because the pool's actors are placed on the first node and can't easily be moved. [I didn't verify that that's the case, just speculating.]
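One way to verify would be the actor state API from more recent Ray releases (this thread predates it); a rough sketch, assuming a running cluster to connect to and the pool actors still alive:

import ray
from collections import Counter
from ray.util.state import list_actors

ray.init(address="auto")

# Count alive actors per node; if they all share a single node_id,
# the autoscaled second node is sitting idle.
actors = list_actors(filters=[("state", "=", "ALIVE")], limit=10000)
print(Counter(a.node_id for a in actors))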

Versions / Dependencies

Ray 2.2, Python 3.9

Reproduction script

from smart_open import smart_open
import pandas as pd
import sklearn

def train_model(file_path: str):
    data = pd.read_csv(smart_open(file_path, "r"))

    ## Train your model here.
    from sklearn.linear_model import LinearRegression
    lr = LinearRegression()
    # (Column names are anonymized)
    lr.fit(data[["id4", "id5"]], data["v3"])

    ## Write outputs.
    # smart_open(output, "w").write(pickle.dumps(lr))

models_to_train = [
    f"s3://air-example-data/h2oai_1m_files/file_{i:07}.csv"
    for i in range(1000000)
]

from ray.util.multiprocessing import Pool
import tqdm

# Create a pool, where each worker is assigned 1 CPU by Ray.
pool = Pool(ray_remote_args={"num_cpus": 1})

# Use the pool to run `train_model` on the data, in batches of 10.
iterator = pool.imap_unordered(train_model, models_to_train, chunksize=10)

# Track the progress using tqdm and retrieve the results into a list.
list(tqdm.tqdm(iterator, total=1000000))

Issue Severity

None

robertnishihara commented 1 year ago

cc @ericl, who wrote the reproduction script

ericl commented 1 year ago

Looking into this, it seems that ray.util.multiprocessing launches a fixed-size pool of actors based on the cluster size at startup:

        ray_cpus = int(ray._private.state.cluster_resources()["CPU"])
        if processes is None:
            processes = ray_cpus

It should be possible to enhance the multiprocessing module to support autoscaling, similar to the Datasets actor pool. cc @edoakes
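In the meantime, plain Ray tasks sidestep the fixed pool size: queued tasks create pending resource demand that the autoscaler reacts to. A rough sketch of that workaround (not the eventual pool API), reusing train_model and models_to_train from the repro above:

import ray

ray.init(address="auto")

@ray.remote(num_cpus=1)
def train_model_task(file_path):
    return train_model(file_path)

# Keep a bounded window of in-flight tasks instead of submitting 1M at once;
# the pending tasks are what drive the autoscaler to add nodes.
pending, results = [], []
for path in models_to_train:
    pending.append(train_model_task.remote(path))
    if len(pending) >= 1000:
        done, pending = ray.wait(pending, num_returns=100)
        results.extend(ray.get(done))
results.extend(ray.get(pending))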

I'll tag this as a P1 for core for now.

sudhirn-anyscale commented 1 year ago

Customer LiveEO ran into this limitation. @jjyao

rupertcw commented 6 months ago

Hi - we're seeing the same issue when launching via joblib on a head-pod-only Ray cluster (which should then scale up), and we're wondering whether it has been solved?

I've currently patched it with the following:

ray_cpus = int(ray._private.state.cluster_resources().get("CPU") or processes)
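For context, the joblib route ends up in the same sizing code, since Ray's joblib backend runs on top of ray.util.multiprocessing. A minimal sketch of that setup (register_ray is the documented entry point; the fit function here is just a stand-in):

import joblib
from ray.util.joblib import register_ray

register_ray()

def fit(seed):
    # Stand-in for one training job.
    return seed

# On a head-only cluster started with num-cpus = 0, this is where the
# cluster_resources()["CPU"] lookup fails without the patch above.
with joblib.parallel_backend("ray"):
    joblib.Parallel(n_jobs=200)(joblib.delayed(fit)(i) for i in range(200))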

ruisearch42 commented 6 months ago

Hi @rupertcw, thanks for reporting. Looks like you already have a workaround? If you could provide more information about your use case and its need for this feature, it will help us prioritize. cc: @jjyao @anyscalesam

rupertcw commented 6 months ago

hi @ruisearch42 - thanks for following up.

The scenario is the following:

  1. We have an AWS EKS cluster running a Ray cluster via Docker.
  2. To avoid overspending, we want the cluster to stay at a minimal size until we need to run something on it.
  3. Minimal size means one machine with the head pod running 24x7, started with num-cpus = 0 (so no jobs run on the head pod).
  4. When we want to run many jobs, we set num-cpus = 1 on each job in the remote args and expect the head pod to automatically scale the cluster to run them all.
  5. Example: running 200 jobs with num-cpus = 1 means demand for 200 CPUs (not counting the extra init and end_batch).
  6. This may spin up one or more machines (depending on the machine type), up to a limit we have set, say 5 machines.

When using this setup, ray._private.state.cluster_resources() has no "CPU" entry, so the ["CPU"] lookup fails rather than returning a count. All I did was fall back to processes instead (200 in my example) so that it scales to that number - seems to work for now.
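To make the failure mode concrete, a small sketch (the exact contents of the resource dict vary by setup; this assumes a freshly started head-only cluster with num-cpus = 0):

import ray

ray.init(address="auto")

resources = ray.cluster_resources()
# With a zero-CPU head node and no workers yet, there is no "CPU" key,
# so the pool's cluster_resources()["CPU"] lookup raises a KeyError.
print(resources)

# Defensive variant mirroring the patch above: fall back to the requested
# process count when the cluster reports no CPUs yet.
processes = 200
ray_cpus = int(resources.get("CPU") or processes)
print(ray_cpus)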