ray-project / ray_beam_runner

Ray-based Apache Beam runner
Apache License 2.0

Aggregate metrics within the main thread #47

Closed — pabloem closed this 1 year ago

pabloem commented 2 years ago

When a Beam bundle is executed, a list of metrics is returned with the result. These metrics measure the occurrence of certain events. For example:

class MyDoFn(DoFn):
    def process(self, element):
        Metrics.counter('sample_dofn', 'sum_of_events').inc(element)
        Metrics.counter('sample_dofn', 'count_of_events').inc(1)
        yield element

with beam.Pipeline() as p:
    (p
     | beam.Create([1, 2, 3])
     | beam.ParDo(MyDoFn()))

p.result.metrics()  # counters: ('sample_dofn', 'sum_of_events') -> 6, ('sample_dofn', 'count_of_events') -> 3

The metrics are reported in the InstructionResult object that we recover after each bundle execution: https://github.com/ray-project/ray_beam_runner/blob/master/ray_beam_runner/portability/ray_fn_runner.py#L332

Here's the definition of the InstructionResult:

And specifically, this contains a ProcessBundleResponse, which has monitoring_data:

So - what we want to do is take those per-bundle metrics and aggregate them, so that we have a unified view of them across the whole pipeline run.
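To make the aggregation step concrete, here's a minimal sketch of the idea. It assumes the per-bundle monitoring data has already been decoded into plain dicts mapping `(namespace, name)` to an integer counter value (the real data arrives as encoded `MonitoringInfo` payloads in each `ProcessBundleResponse`); the function name `aggregate_counters` is hypothetical, not an existing API.

```python
from collections import defaultdict

def aggregate_counters(bundle_results):
    """Sum per-bundle counter values into a single unified view.

    `bundle_results`: one dict per executed bundle, mapping
    (namespace, name) -> int. A simplified stand-in for the monitoring
    data carried in each ProcessBundleResponse.
    """
    totals = defaultdict(int)
    for bundle_counters in bundle_results:
        for key, value in bundle_counters.items():
            totals[key] += value
    return dict(totals)

# Two bundles that each processed part of the input [1, 2, 3].
bundle_1 = {('sample_dofn', 'sum_of_events'): 3,
            ('sample_dofn', 'count_of_events'): 2}
bundle_2 = {('sample_dofn', 'sum_of_events'): 3,
            ('sample_dofn', 'count_of_events'): 1}

print(aggregate_counters([bundle_1, bundle_2]))
# {('sample_dofn', 'sum_of_events'): 6, ('sample_dofn', 'count_of_events'): 3}
```

The real implementation would additionally need to handle non-counter metric types (distributions, gauges), which don't aggregate by simple addition.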

Here's an example of code doing that in the local runner. We can basically copy that code and run it in our runner as well:

This would probably go around the spot where we recover the bundle result.

Finally, another code sample that is less important, but worth knowing about is the part of the SDK worker that actually fills up these metrics: https://github.com/apache/beam/blob/0e61b026ea7accd666fc443f3aeec7f93147a3b6/sdks/python/apache_beam/runners/worker/sdk_worker.py#L635-L646
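For intuition, the worker-side accumulation can be pictured as below. This is a hypothetical, simplified sketch (the class `BundleCounters` is invented for illustration): the real SDK worker accumulates cell values during processing and serializes them as encoded `MonitoringInfo` payloads into the `ProcessBundleResponse`, rather than returning a plain dict.

```python
class BundleCounters:
    """Hypothetical stand-in for the per-bundle metric accumulation the
    SDK worker performs: counters are incremented while elements are
    processed, then collected when the bundle completes."""

    def __init__(self):
        self._counters = {}

    def inc(self, namespace, name, value=1):
        key = (namespace, name)
        self._counters[key] = self._counters.get(key, 0) + value

    def monitoring_data(self):
        # The real worker encodes this into the ProcessBundleResponse;
        # here we simply return the plain mapping.
        return dict(self._counters)

counters = BundleCounters()
for element in [1, 2, 3]:
    counters.inc('sample_dofn', 'sum_of_events', element)
    counters.inc('sample_dofn', 'count_of_events')

print(counters.monitoring_data())
# {('sample_dofn', 'sum_of_events'): 6, ('sample_dofn', 'count_of_events'): 3}
```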

pabloem commented 2 years ago

hi @valiantljk - I think this issue would be a better starter issue. I've added several code pointers for it here. LMK if that's enough / reasonable, and if you'd like to take a look at tackling this issue.

Thanks!