oap-project / raydp

RayDP provides simple APIs for running Spark on Ray and integrating Spark with AI libraries.

How to request all available resources in the ray cluster in init_spark()? #256

Closed: Hoeze closed this issue 1 year ago

Hoeze commented 2 years ago

Hi, how would I use the full Ray cluster for Spark? I.e. how do I run one executor on each node, taking all available resources?

carsonwang commented 2 years ago

You can use raydp.init_spark(..., placement_group_strategy="SPREAD") to spread the Spark executors to all nodes. Please see https://github.com/oap-project/raydp/blob/master/doc/spark_on_ray.md#placement-group
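For reference, a minimal sketch of such a call (the placement_group_strategy parameter is the one documented at the link above; the node count and sizing values below are illustrative placeholders, not taken from this thread):

import ray
import raydp

ray.init(address="auto")  # connect to the running Ray cluster

spark = raydp.init_spark(
    app_name="spread_example",
    num_executors=4,             # e.g. one executor per node in a 4-node cluster
    executor_cores=16,           # illustrative sizing
    executor_memory="32GB",      # illustrative sizing
    placement_group_strategy="SPREAD",  # ask Ray to place executors on different nodes
)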

Hoeze commented 2 years ago

Yes, but my issue is that I still need to specify the number of executor cores and the executor memory. Even with my best-effort guess I cannot get executors running on all workers:

def init_spark_on_ray(
        executor_cores=16,
        executor_memory_overhead=0.95,
        configs=None,
        spark_conf_args=None,
        **kwargs
):
    import ray
    import raydp

    if configs is None:
        configs = {}
    if spark_conf_args is None:
        spark_conf_args = {
            "max_result_size": "64G",
        }

    # _spark_conf is our own helper that expands these args into Spark config keys
    spark_conf = _spark_conf(**spark_conf_args)
    configs = {
        **configs,
        **spark_conf
    }

    spark = raydp.init_spark(
        app_name="raydp",
        # one executor per `executor_cores` CPUs currently available in the cluster
        num_executors=int(ray.available_resources()["CPU"] / executor_cores),
        executor_cores=executor_cores,
        # each executor gets its per-core share of cluster memory, minus a safety margin
        executor_memory=int(
            (ray.available_resources()["memory"] / ray.available_resources()["CPU"])
            * executor_cores
            * executor_memory_overhead
        ),
        configs=configs,
        **kwargs,
    )

    return spark

import ray
ray.init()
spark = init_spark_on_ray()

print(ray.available_resources())

Results in:

{'object_store_memory': 262143982693.0,
 'node:192.168.16.29': 1.0,
 'GPU': 8.0,
 'node:192.168.16.28': 1.0,
 'node:192.168.16.27': 1.0,
 'memory': 491520000000.0,
 'node:192.168.16.33': 1.0,
 'accelerator_type:A40': 2.0,
 'node:192.168.16.22': 1.0,
 'CPU': 128.0}

I.e. I'm wasting tons of resources.

kira-lin commented 2 years ago

When you printed the available resources, had all the executor actors started? It might take some time. Do all nodes have the same resources (at least CPU and memory)?
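One way to check is to poll until the CPU count Ray reports stops changing, which suggests the executor actors have finished starting. A sketch using only the public Ray API (the polling helper itself is hypothetical, not part of RayDP):

import time
import ray

def wait_for_stable_cpus(interval_s=5.0, stable_rounds=3):
    # Hypothetical helper: wait until ray.available_resources() reports the
    # same CPU count for `stable_rounds` consecutive polls.
    prev, stable = None, 0
    while stable < stable_rounds:
        cpus = ray.available_resources().get("CPU", 0.0)
        stable = stable + 1 if cpus == prev else 0
        prev = cpus
        time.sleep(interval_s)
    return ray.available_resources()

print(wait_for_stable_cpus())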

Hoeze commented 2 years ago

@kira-lin I found the issue: one of the hosts had only 2 GB of memory per core available. Except for this one, all our hosts have >= 4 GB/core.

Would it be possible to account for such node-specific differences somehow?

kira-lin commented 2 years ago

Hmm, I think you can use 2 GB of memory per core, so that you can use all the cores in your cluster. If that's not enough for your workload, then you will have to leave some cores unused on that node.
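To derive that sizing automatically rather than guessing, one could take the minimum memory-per-core ratio across the cluster, so the weakest node can still host a full executor. A sketch under that assumption (this is not part of RayDP; ray.nodes() is a public Ray API that reports each node's resources, with memory in bytes):

import ray

ray.init(address="auto")

# memory-per-core ratio of every alive node that advertises CPUs
per_core = [
    node["Resources"]["memory"] / node["Resources"]["CPU"]
    for node in ray.nodes()
    if node["Alive"] and node["Resources"].get("CPU")
]
bytes_per_core = int(min(per_core))  # the weakest node sets the limit

executor_cores = 16
executor_memory = bytes_per_core * executor_cores  # bytes, pass to raydp.init_spark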

kira-lin commented 1 year ago

Closing as stale.