apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Task]: Fill Runner Metrics support gap in Sources #32021

Open Abacn opened 1 month ago

Abacn commented 1 month ago

What needs to happen?

Some runners were found not to support reporting metrics in:

  1. BoundedSource.split()
    • Direct runner
    • non-portable Flink runner
    • non-portable Spark runner
    • Samza runner: unknown, since the AttemptedMetrics tests are excluded altogether

The Dataflow runner (both legacy and runner v2) supports this.

  2. BoundedReader.advance()
    • Spark Structured Streaming Runner

Notably, portable runners do support reporting metrics in split(), because they execute the Source as a splittable DoFn, so the metrics container is already present at that point.
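To make the "metrics container is present" condition concrete, here is a minimal sketch of a thread-scoped container. It is a simplified model, not Beam code: the class ThreadScopedMetricsSketch and its methods inc() and runWithContainer() are hypothetical names, and it assumes only that a metric update is recorded when the calling thread has a container installed and dropped otherwise.

```java
import java.util.concurrent.atomic.AtomicLong;

// Simplified, hypothetical model of a thread-scoped metrics container.
// None of these names are Beam APIs.
class ThreadScopedMetricsSketch {
  // Each thread sees its own container; null means nothing was installed for it.
  private static final ThreadLocal<AtomicLong> CONTAINER = new ThreadLocal<>();

  /** Increments the counter if the current thread has a container; returns whether it was recorded. */
  static boolean inc() {
    AtomicLong container = CONTAINER.get();
    if (container == null) {
      return false; // the update is dropped; this is the situation the issue tracks
    }
    container.incrementAndGet();
    return true;
  }

  /** Runs work with a container installed on the calling thread and returns the final count. */
  static long runWithContainer(Runnable work) {
    AtomicLong container = new AtomicLong();
    CONTAINER.set(container);
    try {
      work.run();
    } finally {
      CONTAINER.remove(); // leave the thread without a container again
    }
    return container.get();
  }

  public static void main(String[] args) {
    // Analogous to split() being called on a thread where no container was installed.
    System.out.println(inc());
    // Analogous to a call made inside a managed work-execution scope.
    System.out.println(runWithContainer(() -> { inc(); inc(); }));
  }
}
```

In this model, a runner that calls split() inside something like runWithContainer() records the increments, while one that calls it on a bare coordinator thread loses them.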

This task is created to track these gaps.

Issue Priority

Priority: 2 (default / most normal work should be filed as P2)

Issue Components

Abacn commented 1 month ago

By "unsupported" I mean that the following log is seen:

Direct runner:

[Test worker] ERROR org.apache.beam.sdk.metrics.MetricsEnvironment - Unable to update metrics on the current thread. Most likely caused by using metrics outside the managed work-execution thread:
  java.lang.Thread.getStackTrace(Thread.java:1564)
org.apache.beam.sdk.metrics.MetricsEnvironment.getCurrentContainer(MetricsEnvironment.java:140)
org.apache.beam.sdk.metrics.DelegatingCounter.inc(DelegatingCounter.java:76)
org.apache.beam.sdk.metrics.DelegatingCounter.inc(DelegatingCounter.java:67)
org.apache.beam.sdk.metrics.MetricsTest$CountingSourceWithMetrics.split(MetricsTest.java:495)
org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$InputProvider.getInitialInputs(BoundedReadEvaluatorFactory.java:217)
org.apache.beam.runners.direct.ReadEvaluatorFactory$InputProvider.getInitialInputs(ReadEvaluatorFactory.java:88)
org.apache.beam.runners.direct.RootProviderRegistry.getInitialInputs(RootProviderRegistry.java:80)
org.apache.beam.runners.direct.ExecutorServiceParallelExecutor.start(ExecutorServiceParallelExecutor.java:161)
org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
...

Flink runner:

[jobmanager-io-thread-1] ERROR org.apache.beam.sdk.metrics.MetricsEnvironment - Unable to update metrics on the current thread. Most likely caused by using metrics outside the managed work-execution thread:
  java.lang.Thread.getStackTrace(Thread.java:1564)
org.apache.beam.sdk.metrics.MetricsEnvironment.getCurrentContainer(MetricsEnvironment.java:140)
org.apache.beam.sdk.metrics.DelegatingCounter.inc(DelegatingCounter.java:76)
org.apache.beam.sdk.metrics.DelegatingCounter.inc(DelegatingCounter.java:67)
org.apache.beam.sdk.metrics.MetricsTest$CountingSourceWithMetrics.split(MetricsTest.java:495)
org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:135)
org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat.createInputSplits(SourceInputFormat.java:44)
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:251)
org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:894)
org.apache.flink.runtime.executiongraph.ExecutionGraph.initializeJobVertex(ExecutionGraph.java:224)
...

This is different from the log usually seen when reporting metrics in a callback thread:

https://github.com/apache/beam/blob/89d5e2f29615c1d4dba41c42a2161d5c5d5f39a8/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java#L144

Here Java core's isMetricsSupported() returns true, but metrics are not actually supported by the runner at this call site. So this is a feature gap rather than working as intended (WAI) by spec.
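The distinction drawn above can be sketched as a small decision function. This is a hypothetical model rather than the logic in MetricsEnvironment: the names MissingContainerDiagnosisSketch, Diagnosis and diagnose() are invented for illustration, and it assumes the outcome depends only on whether the thread has a container and whether metrics are flagged as supported.

```java
// Simplified, hypothetical model; these names are not Beam APIs.
class MissingContainerDiagnosisSketch {
  enum Diagnosis {
    RECORDED,      // a container is present, so the update is kept
    FEATURE_GAP,   // support is flagged, yet this thread has no container
    NOT_SUPPORTED  // support is not flagged at all
  }

  /** Classifies a metric update from the two conditions discussed in the comment. */
  static Diagnosis diagnose(boolean containerOnThread, boolean metricsSupported) {
    if (containerOnThread) {
      return Diagnosis.RECORDED;
    }
    return metricsSupported ? Diagnosis.FEATURE_GAP : Diagnosis.NOT_SUPPORTED;
  }

  public static void main(String[] args) {
    // The case in the logs above: supported flag is true, but no container in split().
    System.out.println(diagnose(false, true));
  }
}
```

Under these assumptions, the Direct and Flink runner logs above fall into the FEATURE_GAP case.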