oap-project / raydp

RayDP provides simple APIs for running Spark on Ray and integrating Spark with AI libraries.
Apache License 2.0

Add the ability to set the owner of objects when calling _save_spark_df_to_object_store() #375

Closed: max-509 closed this 7 months ago

max-509 commented 10 months ago

Hello! Thank you for this awesome library, which helps me combine the advantages of Spark and Ray.

When I convert a Spark DataFrame to a Ray Dataset, I have only two options for specifying the owner of the serialized partitions (a brief sketch of both calls follows the list):

  1. Each executor owns its own partitions (_use_owner=False).
  2. The RayDP master actor owns all serialized partitions (_use_owner=True).
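
A minimal sketch of the two existing options, assuming the conversion entry point is raydp.spark.spark_dataframe_to_ray_dataset and that it forwards _use_owner to _save_spark_df_to_object_store (names may differ in your RayDP version):

import ray
import raydp
from raydp.spark import spark_dataframe_to_ray_dataset

ray.init(address="auto")
spark = raydp.init_spark(app_name="owner_demo", num_executors=2,
                         executor_cores=2, executor_memory="2G")
df = spark.range(0, 1000)

# Option 1: each executor owns the partitions it serialized
ds_executor_owned = spark_dataframe_to_ray_dataset(df, _use_owner=False)

# Option 2: the RayDP master actor owns all serialized partitions
ds_master_owned = spark_dataframe_to_ray_dataset(df, _use_owner=True)

# In both cases the owners are Spark-side processes/actors that go away
# once the Spark session is torn down.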

Here is a usage scenario in which neither ownership option is satisfactory.

I want to do some preprocessing in Spark, convert the preprocessed DataFrame into a Ray Dataset, and then stop Spark (call raydp.stop_spark()) to free up Ray cluster resources. But after stopping Spark, I can no longer use the created Ray Dataset, because the owner of the serialized partitions has died. I suggest adding a parameter that accepts an actor which should become the owner of the serialized partitions. For example:


from dataclasses import dataclass
from typing import Callable, List, Optional

import ray
from ray import ObjectRef
from pyspark import sql


@dataclass
class ObjectsOwner:
    # Name of the Ray actor that should own the serialized partitions
    actor_name: str
    # Callable that stores the serialized partition ObjectRefs in the owner
    # actor's state and returns the ObjectRef of that .remote() call
    set_reference_as_state: Callable[[ray.actor.ActorHandle, List[ObjectRef]], ObjectRef]

def _save_spark_df_to_object_store(df: sql.DataFrame, use_batch: bool = True,
                                   objects_owner: Optional[ObjectsOwner] = None):
    # call java function from python
    jvm = df.sql_ctx.sparkSession.sparkContext._jvm
    jdf = df._jdf
    object_store_writer = jvm.org.apache.spark.sql.raydp.ObjectStoreWriter(jdf)
    if objects_owner is None:
        records = object_store_writer.save(use_batch, "")
    else:
        # ! pass the owner actor's name so the JVM writer assigns ownership to it
        records = object_store_writer.save(use_batch, objects_owner.actor_name)

    record_tuples = [(record.objectId(), record.ownerAddress(), record.numRecords())
                        for record in records]
    blocks, block_sizes = _register_objects(record_tuples)

    if objects_owner is not None:
        actor = ray.get_actor(objects_owner.actor_name)
        # ! store the block references in the owner actor's state so they stay alive
        ray.get(objects_owner.set_reference_as_state(actor, blocks))

    return blocks, block_sizes
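
A usage sketch of the proposed API; the DatasetOwner actor, its keep method, and preprocessed_df are hypothetical names used only for illustration:

import ray

@ray.remote
class DatasetOwner:
    """Long-lived actor that holds the block references so the data outlives Spark."""
    def __init__(self):
        self._blocks = []

    def keep(self, blocks):
        # Keeping the ObjectRefs in actor state pins the objects for the
        # lifetime of this actor.
        self._blocks.extend(blocks)

owner_actor = DatasetOwner.options(name="dataset_owner", lifetime="detached").remote()

owner = ObjectsOwner(
    actor_name="dataset_owner",
    set_reference_as_state=lambda actor, blocks: actor.keep.remote(blocks),
)

blocks, block_sizes = _save_spark_df_to_object_store(preprocessed_df, objects_owner=owner)
# After raydp.stop_spark(), the blocks stay available for as long as the
# "dataset_owner" actor is alive.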

I hope that my suggestion will be useful.

kira-lin commented 10 months ago

Hi @max-509, thanks for using RayDP! In this case, you can assign ownership to RayDPMaster and call raydp.stop_spark(cleanup_data=False) to stop the session and free up the resources. With cleanup_data set to False, RayDPMaster is not killed, so the data remains accessible.
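
A minimal sketch of this workaround, using the same assumed conversion-function name as in the sketches above:

import raydp
from raydp.spark import spark_dataframe_to_ray_dataset

ds = spark_dataframe_to_ray_dataset(df, _use_owner=True)  # RayDPMaster owns the blocks

# Stop the Spark session but keep RayDPMaster alive, so the blocks it owns survive
raydp.stop_spark(cleanup_data=False)

ds.count()  # still works: the owner (RayDPMaster) is alive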

That said, your suggestion makes sense: ownership should be assignable to a user-specified actor. This should be very easy to implement. Are you willing to submit a PR?