ray-project / ray_beam_runner

Ray-based Apache Beam runner
Apache License 2.0

Refactor how we manage PCollections #34

Closed: jjyao closed this issue 2 years ago.

jjyao commented 2 years ago

Currently, all data (PCollections) is managed by the PcollectionBufferManager actor and is stored in the object store via ray.put() inside the ray_execute_bundle worker processes. This approach has several implications:

  1. The owner of the data is whoever calls ray.put(); in our case, that is the worker processes that execute ray_execute_bundle. This is not ideal for fault tolerance, because an object fate-shares with its owner: if the owner process dies, the objects it owns are lost. In our case, the data therefore fate-shares with all the worker processes. Instead, we want the driver program to own all the data. This means that instead of calling ray.put(), ray_execute_bundle should return the data as part of its return values, so that the driver program becomes the owner. More on the ownership design: https://docs.google.com/document/d/1lAy0Owi-vPz2jEqBSaHNQcy2IBSDEHyXNOQZlGuj93c/preview#heading=h.vjc9egi2q5aa
  2. Currently, ray_execute_bundle gets the input data references from PcollectionBufferManager and calls ray.get() to fetch the actual data. As a result, the Ray scheduler cannot do locality-aware scheduling of the ray_execute_bundle remote function: the data references do not appear in the function arguments, so the scheduler cannot tell which data the task needs. Instead, we should pass the data ObjectRefs directly as arguments to ray_execute_bundle.

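At a high level, it should look like this:

```
# Driver: pass the input ObjectRefs directly as arguments; the driver owns the returned ObjectRefs.
output_data_obj_ref1, output_data_obj_ref2, ... = ray_execute_bundle.remote(input_data_obj_ref1, input_data_obj_ref2, ...)

# Worker: Ray resolves the ObjectRef arguments to values before the function body runs.
@ray.remote(num_returns=N)  # N = number of outputs
def ray_execute_bundle(input_data_obj_1, input_data_obj_2, ...):
    return output_data_obj_1, output_data_obj_2, ...
```
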
jjyao commented 2 years ago

cc @pabloem

pabloem commented 2 years ago

The expected number of outputs from ray_execute_bundle can be computed from:

  1. result, serialized to a string
  2. len(expected_outputs) + len(stage_timers)
  3. The maximum number of input transforms in the bundle_process_descriptor: https://github.com/ray-project/ray_beam_runner/blob/master/ray_beam_runner/portability/ray_fn_runner.py#L318

@iasoon mentioned that we may be able to optimize away item 3, since it may duplicate data.
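One possible way to turn the three quantities above into a num_returns value is sketched below. It assumes the quantities are summed and that item 3 is an upper bound, so a bundle that produces fewer outputs pads the unused return slots with None (a Ray task declared with num_returns=n must return exactly n values). expected_num_returns and pad_outputs are hypothetical helpers, and the plain dicts and list stand in for the real expected_outputs, stage_timers and bundle_process_descriptor objects in ray_fn_runner.py, whose types may differ.

```python
from typing import Dict, List


def expected_num_returns(
    expected_outputs: Dict[str, object],
    stage_timers: Dict[str, object],
    input_transform_ids: List[str],
) -> int:
    """Count the return slots a bundle-execution task would need.

    Assumption: the three listed quantities are summed, and item 3 is an
    upper bound, so unused slots are padded (see pad_outputs).
    """
    result_slots = 1  # item 1: the bundle result, serialized to a string
    data_slots = len(expected_outputs) + len(stage_timers)  # item 2
    input_slots = len(input_transform_ids)  # item 3 (upper bound)
    return result_slots + data_slots + input_slots


def pad_outputs(outputs: List[object], num_returns: int) -> tuple:
    # A task declared with num_returns=n must return exactly n values.
    if len(outputs) > num_returns:
        raise ValueError(f"got {len(outputs)} outputs, expected at most {num_returns}")
    return tuple(outputs) + (None,) * (num_returns - len(outputs))


n = expected_num_returns(
    expected_outputs={"out_a": None, "out_b": None},
    stage_timers={"timer_1": None},
    input_transform_ids=["read_1"],
)
padded = pad_outputs(["<serialized result>", b"a", b"b", b"t"], n)
```

On the driver, the computed count could then be supplied per call, for example ray_execute_bundle.options(num_returns=n).remote(...).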

pdames commented 2 years ago

FYI, I've also been referring to Stephanie's Ownership paper for a more comprehensive discussion of the Object Ownership model: https://www.usenix.org/system/files/nsdi21-wang.pdf.