oap-project / raydp

RayDP provides simple APIs for running Spark on Ray and integrating Spark with AI libraries.
Apache License 2.0

When converting spark df to ray dataset, you can choose whether to release the resources requested by spark #309

Closed · kitty-eu-org closed this issue 1 year ago

kitty-eu-org commented 1 year ago

Restore the resources occupied by Spark, so that subsequent tasks can use the CPU, memory, and other resources previously held by Spark.

When init_spark takes up most or all of the available resources, a subsequent call to ray.data.from_spark(df) blocks, and Ray emits a warning log asking us to add resources, as shown in the screenshot below: [screenshot: Ray warning about insufficient resources]
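For illustration, a minimal sketch that reproduces the blocking scenario (the app name, executor count, and memory sizes below are arbitrary examples):

```python
import ray
import raydp

ray.init(num_cpus=4)

# Spark claims all four CPUs (2 executors x 2 cores), leaving none
# free for Ray tasks.
spark = raydp.init_spark(app_name="demo",
                         num_executors=2,
                         executor_cores=2,
                         executor_memory="2GB")
df = spark.range(0, 1000)

# Blocks here: the conversion tasks cannot be scheduled because Spark's
# executor actors hold every CPU, and Ray logs a warning suggesting
# that more resources be added.
ds = ray.data.from_spark(df)
```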

Proposal: add a parameter recover_ray_resources_from_spark to ray.data.from_spark, defaulting to False (resources are not reclaimed). This lets the user choose whether to stop the SparkSession during the conversion from Spark DataFrame to Ray Dataset, releasing the resources Spark holds so the conversion can complete smoothly. The trade-off is that Spark and any existing DataFrame and RDD objects become unavailable; you then need to call raydp.stop_spark() to stop Spark, and if you need Spark again you can call init_spark again.
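Continuing the sketch above, this is how the proposed flag might be used; recover_ray_resources_from_spark is the parameter proposed in this issue, not an existing ray.data.from_spark argument:

```python
# Proposed usage (recover_ray_resources_from_spark does not exist yet;
# it is the parameter suggested in this issue).
ds = ray.data.from_spark(df, recover_ray_resources_from_spark=True)

# Spark's resources were released to let the conversion proceed, so the
# old SparkSession, DataFrames, and RDDs are now unusable.
raydp.stop_spark()

# If Spark is needed again, re-initialize it.
spark = raydp.init_spark(app_name="demo",
                         num_executors=2,
                         executor_cores=2,
                         executor_memory="2GB")
```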

kitty-eu-org commented 1 year ago

@kira-lin This is not a bug in the code I submitted; what should I do? My test case has passed.

kira-lin commented 1 year ago

You can call spark_dataset_to_ray_dataset, and then raydp.stop_spark(cleanup_data=False). Is this not enough for you? Your code would leave our code in an inconsistent state, i.e. the SparkCluster would still think its Spark session is alive.
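A sketch of this sequence, assuming spark_dataset_to_ray_dataset can be imported from raydp.spark (the exact import path may differ across RayDP versions):

```python
import ray
import raydp
# Import path is an assumption; adjust to where the helper lives in
# your RayDP version.
from raydp.spark import spark_dataset_to_ray_dataset

ray.init()
spark = raydp.init_spark(app_name="demo", num_executors=1,
                         executor_cores=1, executor_memory="1GB")
df = spark.range(0, 100)

# Convert first, then stop Spark; cleanup_data=False keeps the
# converted blocks alive in the object store after the session ends.
ds = spark_dataset_to_ray_dataset(df)
raydp.stop_spark(cleanup_data=False)
print(ds.count())
```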

kitty-eu-org commented 1 year ago

> You can call spark_dataset_to_ray_dataset, and then raydp.stop_spark(cleanup_data=False). Is this not enough for you? Your code would leave our code in an inconsistent state, i.e. the SparkCluster would still think its Spark session is alive.

Thank you very much for your reply. I have looked at the implementation of raydp.stop_spark(cleanup_data=False), but it actually stops the SparkContext, and at that point an existing Spark DataFrame can no longer be converted into a Ray Dataset. The problem I encountered is this: I only have a 4-core CPU, and init_spark used up all of the cores. When I then call from_spark to convert to a Dataset, the conversion never runs because there are no CPU resources available. Reading the source, spark_dataset_to_ray_dataset converts the Java DataFrame into Python objects and then calls ray.put to store them. At that point the SparkSession and the DataFrame are no longer needed (optionally, at the user's discretion), so the resources Spark occupies can be released for Ray to use.

kira-lin commented 1 year ago

After calling spark_dataset_to_ray_dataset, you can close the Spark session with raydp.stop_spark(cleanup_data=False). You also need to set _use_owner when calling spark_dataset_to_ray_dataset. I see, your problem is that ray.data.from_arrow_refs needs some CPU to be scheduled, but all CPUs have been taken up by Spark. What version of Ray are you using? I think Ray actors only require CPU while being scheduled; after an actor starts it should release those resources. As a workaround, if possible you can claim more CPUs than you actually have when starting the Ray cluster.
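The workaround amounts to advertising more logical CPUs to Ray than the machine physically has, for example:

```python
import ray

# On a 4-core machine, advertise 8 logical CPUs so that conversion
# tasks can still be scheduled while Spark's executors hold 4 of them.
# This oversubscribes the cores; real parallelism is still bounded by
# the hardware.
ray.init(num_cpus=8)
```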

kitty-eu-org commented 1 year ago

> After calling spark_dataset_to_ray_dataset, you can close the Spark session with raydp.stop_spark(cleanup_data=False). You also need to set _use_owner when calling spark_dataset_to_ray_dataset. I see, your problem is that ray.data.from_arrow_refs needs some CPU to be scheduled, but all CPUs have been taken up by Spark. What version of Ray are you using? I think Ray actors only require CPU while being scheduled; after an actor starts it should release those resources. As a workaround, if possible you can claim more CPUs than you actually have when starting the Ray cluster.

Ray actors do occupy resources while in use, but if Spark has already claimed all of the resources, calling ray.data.from_spark at that point cannot convert the Spark DataFrame into a Ray Dataset. Calling stop_spark does release the resources, but once they are released the ray.data.from_spark call raises an error. I read the source code and found that the conversion uses ray.put and ray.get. After the Spark DataFrame's data has been put into the object store, the SparkSession may no longer be needed; at that point the resources Spark occupies can be released so that Ray actors have schedulable resources. That is why I opened the earlier PR.
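A conceptual sketch of the conversion path described above (simplified for illustration; not RayDP's actual implementation):

```python
import ray
import pyarrow as pa

ray.init()

# 1. Partitions are materialized as Arrow tables and put into Ray's
#    object store (in RayDP this happens from the Spark side).
blocks = [ray.put(pa.table({"id": [1, 2, 3]})),
          ray.put(pa.table({"id": [4, 5, 6]}))]

# 2. Once the blocks live in the object store, the SparkSession is no
#    longer needed for the data itself, so the resources it occupies
#    could be released here, freeing CPUs for the next step.

# 3. Build a Ray Dataset from the object refs.
ds = ray.data.from_arrow_refs(blocks)
print(ds.count())  # 6
```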