oap-project / raydp

RayDP provides simple APIs for running Spark on Ray and integrating Spark with AI libraries.
Apache License 2.0

Implement Spark executor node affinity using custom resource #366

Closed pang-wu closed 11 months ago

pang-wu commented 11 months ago

What Problem Does the PR Solve?

This PR enables RayDP to pin Spark executors to a specific set of machines in a Ray cluster -- a feature we have found useful in production. It helps when the Ray cluster runs heterogeneous workloads (i.e. both Spark and native Ray workloads) and the total available resources are less than the maximum resources Spark could potentially request -- which is possible in the following cases:

In both scenarios, we want to limit the scheduling of the Spark executor actors to a subset of machines so that the Spark job does not take all resources in the Ray cluster and starve other Ray workloads. Another scenario where this feature is useful is when the Spark executors need to be scheduled on special nodes, e.g. spot vs. on-demand instances.

This feature also benefits multi-tenant Ray clusters where different users want to run their jobs on different node groups. With it, we can define a set of machines in the Ray cluster that is reserved for scheduling Spark executors, for example:

# Ray cluster config
  # ....
  spark_on_spot:  # Spark only nodes
    resources:
      spark_executor: 100 # custom resource indicating this node group is for Spark only
    min_workers: 2
    max_workers: 10  # changing this also requires changing the global max_workers
    node_config:
      # ....
  general_spot:  # Nodes for general Ray workloads
    min_workers: 2
    max_workers: 10  # changing this also requires changing the global max_workers
    node_config:
      # ...
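
For quick local experiments without a full cluster config, the same custom resource can be advertised when starting a single-node Ray instance -- a minimal sketch, assuming a local dev setup (the resource name and amount mirror the config above):

import ray

# Start a local Ray instance that advertises the custom resource used above.
# On a real cluster this is set per node group in the cluster config instead.
ray.init(resources={'spark_executor': 100})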

Then when initializing the Spark session:

spark = raydp.init_spark(app_name='RayDP Example',
                         num_executors=executor_count,
                         executor_cores=3,
                         executor_memory=1 * 1024 * 1024 * 1024,
                         configs = {
                             ...
                             'spark.ray.raydp_spark_executor.actor.resource.spark_executor': 1,  # Schedule executor on nodes with custom resource spark_executor
                         })
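
Because Ray actor resource requests can be fractional, the per-executor amount can also be set below 1 so several executors share one unit of the custom resource -- a sketch, assuming fractional values are accepted here (the test test_spark_on_fractional_custom_resource discussed below exercises this; the value 0.1 is illustrative):

spark = raydp.init_spark(app_name='RayDP Example',
                         num_executors=10,
                         executor_cores=3,
                         executor_memory=1 * 1024 * 1024 * 1024,
                         configs={
                             # Each executor actor requests only 0.1 of the custom resource,
                             # so multiple executors can share one unit of spark_executor
                             'spark.ray.raydp_spark_executor.actor.resource.spark_executor': 0.1,
                         })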
carsonwang commented 11 months ago

Thanks for contributing the PR. This will be very useful. Can you please also update the documentation? Previously you introduced node affinity for the Spark driver with a configuration like spark.ray.raydp_spark_master.actor.resource.spark_master. For this new executor configuration, should we add raydp_executor to the name for consistency, e.g. spark.ray.raydp_executor.actor.resource.spark_executor?

pang-wu commented 11 months ago

@carsonwang Thanks for reviewing. Good point, I changed the config to spark.ray.raydp_spark_executor.actor.resource.* and also updated the doc.
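
For illustration, a minimal sketch of how the renamed executor key sits alongside the existing driver-side key mentioned above (resource names and values are illustrative and assume matching custom resources are defined on the respective node groups):

spark = raydp.init_spark(app_name='RayDP Example',
                         num_executors=executor_count,
                         executor_cores=3,
                         executor_memory=1 * 1024 * 1024 * 1024,
                         configs={
                             # Existing key: pin the Spark master actor to nodes exposing spark_master
                             'spark.ray.raydp_spark_master.actor.resource.spark_master': 1,
                             # Key from this PR: pin executor actors to nodes exposing spark_executor
                             'spark.ray.raydp_spark_executor.actor.resource.spark_executor': 1,
                         })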

kira-lin commented 11 months ago

Why delete the test test_spark_on_fractional_custom_resource?

pang-wu commented 11 months ago

@kira-lin When running my local tests, I found that test interferes with the new tests -- it could be that the Spark context is partially created. I can bring it back. Another issue is that the CI is failing -- I probably need some help there.

kira-lin commented 11 months ago

@pang-wu Look at raydp.yml:108; that test is run explicitly. The CI failure may be because no tests are selected.

pang-wu commented 11 months ago

@kira-lin fixed.

kira-lin commented 11 months ago

@pang-wu thanks