ray-project / ray_beam_runner

Ray-based Apache Beam runner
Apache License 2.0

implement basic watermarking #24

Open iasoon opened 2 years ago

iasoon commented 2 years ago

Re-enabled the watermarking code so that the watermark assertions pass again

iasoon commented 2 years ago

R: @pabloem

pabloem commented 2 years ago

TODO @pabloem - what is the produced watermark and why do stages differentiate

iasoon commented 2 years ago

Ran into an issue when implementing this: https://github.com/ray-project/ray_beam_runner/blob/f1b8fdef8c5be1a8d3eacb5c3bbf0fbf5fbc004b/ray_beam_runner/portability/context_management.py#L174

Overwriting the payload here also mutates the transform in the original pipeline definition, and the new format differs from what the watermark manager __init__ expects. For now I fixed it by making sure the watermark manager is initialized before the RayBundleContextManager runs. How is this expected to work? Should we copy the transform before we overwrite the payload?
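To make the question concrete, here is a minimal sketch of the aliasing problem and the copy-first alternative. It uses plain dataclasses as stand-ins for the pipeline protos; `Spec`, `Transform`, and both helper functions are hypothetical names for illustration, not the runner's actual types.

```python
import copy
from dataclasses import dataclass


@dataclass
class Spec:
    # Hypothetical stand-in for a transform's function spec.
    urn: str
    payload: bytes


@dataclass
class Transform:
    # Hypothetical stand-in for a transform in the pipeline definition.
    unique_name: str
    spec: Spec


def rewrite_payload_in_place(transform, new_payload):
    """Mutates the shared object, so every holder of it sees the new payload."""
    transform.spec.payload = new_payload
    return transform


def rewrite_payload_on_copy(transform, new_payload):
    """Deep-copies first, so the original pipeline definition stays untouched."""
    clone = copy.deepcopy(transform)
    clone.spec.payload = new_payload
    return clone


original = Transform("read", Spec("example:urn", b"original"))

# Copy-first: the original keeps the payload format other components expect.
rewritten = rewrite_payload_on_copy(original, b"rewritten")
original_payload_after_copy = original.spec.payload

# In-place: the returned object is the original, now carrying the new payload.
aliased = rewrite_payload_in_place(original, b"overwritten")
```

With real protobuf messages the same idea would presumably be a fresh message populated via `CopyFrom` before the payload is changed. The trade-off is an extra copy per transform in exchange for no longer depending on initialization order.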

pabloem commented 2 years ago

Once we start parallelizing the execution of bundles, our watermarking will need to change: we need to wait for all parallel tasks of a given stage before advancing its watermark and executing the downstream stages.
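A rough sketch of that barrier, using the standard library's thread pool instead of Ray so it runs anywhere. `process_bundle`, `run_stage`, and the integer watermarks are assumptions made for illustration; none of them come from the runner.

```python
from concurrent.futures import ThreadPoolExecutor


def process_bundle(bundle):
    # Hypothetical bundle task: doubles every element in the bundle.
    return [element * 2 for element in bundle]


def run_stage(bundles, input_watermark):
    """Runs all bundles of one stage in parallel, then advances the watermark.

    The output watermark is only produced after every bundle has finished,
    so a downstream stage never observes a watermark that is ahead of
    work still in flight.
    """
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(process_bundle, bundle) for bundle in bundles]
        # result() blocks, so this list is complete only when all tasks are.
        outputs = [future.result() for future in futures]

    # Barrier passed: it is now safe to advance this stage's watermark.
    output_watermark = input_watermark
    return outputs, output_watermark


stage_outputs, stage_watermark = run_stage([[1, 2], [3]], input_watermark=10)
```

In a Ray-based version the futures would be object refs, and calling `ray.get` on the full list of refs would serve as the same barrier before the downstream stages are scheduled.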