apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: FnAPI Java SDK Harness doesn't update user counters in OnTimer callback functions #29099

Open y1chi opened 1 year ago

y1chi commented 1 year ago

What happened?

This can be reproduced with a simple DoFn that uses user counters. Counter updates made in processElement are populated correctly in the MonitoringInfos sent back with the ProcessBundleInstructionResponse, but counter updates made in the onTimer callback are not recorded.

// Assumed context (not shown in the report): timerId is a String constant and
// LOG is a logger, both defined in the enclosing class.
DoFn<KV<String, Long>, KV<Long, Instant>> fn =
    new DoFn<KV<String, Long>, KV<Long, Instant>>() {

      @TimerId(timerId)
      private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

      final Counter startTimers = Metrics.counter("timers", "started_timers_count");
      final Counter firedTimers = Metrics.counter("timers", "fired_timers_count");

      @ProcessElement
      public void processElement(
          @TimerId(timerId) Timer timer,
          @Timestamp Instant timestamp,
          OutputReceiver<KV<Long, Instant>> r) {
        timer
            .align(Duration.standardMinutes(1))
            .offset(Duration.standardSeconds(1))
            .setRelative();
        LOG.info("started timer");
        // This increment is reported correctly.
        startTimers.inc();
        r.output(KV.of(3L, timestamp));
      }

      @OnTimer(timerId)
      public void onTimer(@Timestamp Instant timestamp, OutputReceiver<KV<Long, Instant>> r) {
        LOG.info("fired timer");
        // This increment is the one that goes missing.
        firedTimers.inc();
        r.output(KV.of(42L, timestamp));
      }
    };

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

chamikaramj commented 1 year ago

Changing to P1 since this sounds like a potential correctness issue.

@y1chi @kennknowles, any ideas about a fix?

kennknowles commented 1 year ago

Agree with P1. I don't immediately know the fix. Was it just forgotten in the implementation?

y1chi commented 1 year ago

The counter logic seems pretty complicated. I suspect there is a missing piece in FnApiDoFnRunner that should update the counter containers when the onTimer invocation happens: https://github.com/apache/beam/blob/ffb43321a0a4a646c6568336b8fba5787b1b64a0/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L1743
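
To make the suspicion concrete, here is a toy model of how a callback path that skips the container setup could lose counter updates. It is only a sketch of the suspected class of bug, not Beam's harness code: `ScopedCounterSketch`, `Container`, `inc`, `runScoped`, and `simulateBundle` are all hypothetical names, and it assumes that counter updates are attributed to a thread-local "current" container.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model, NOT Beam's harness code: counters write into whichever container is current. */
final class ScopedCounterSketch {

  /** Hypothetical stand-in for a per-step metrics container. */
  static final class Container {
    final Map<String, Long> counters = new HashMap<>();
  }

  // The container that counter updates are attributed to; null means no scope is set.
  private static final ThreadLocal<Container> CURRENT = new ThreadLocal<>();

  /** Increments a counter in the current container, or drops the update if none is set. */
  static void inc(String name) {
    Container c = CURRENT.get();
    if (c != null) {
      c.counters.merge(name, 1L, Long::sum);
    }
  }

  /** Runs a callback with the given container installed, restoring the previous one after. */
  static void runScoped(Container container, Runnable callback) {
    Container previous = CURRENT.get();
    CURRENT.set(container);
    try {
      callback.run();
    } finally {
      CURRENT.set(previous);
    }
  }

  /** Simulates one bundle; the timer path is scoped only when scopeTimerPath is true. */
  static Map<String, Long> simulateBundle(boolean scopeTimerPath) {
    Container step = new Container();
    Runnable processElement = () -> inc("started_timers_count");
    Runnable onTimer = () -> inc("fired_timers_count");

    runScoped(step, processElement); // element path: always scoped
    if (scopeTimerPath) {
      runScoped(step, onTimer); // timer path with the scope installed
    } else {
      onTimer.run(); // timer path without a scope: the increment is lost
    }
    return step.counters;
  }

  public static void main(String[] args) {
    System.out.println("unscoped timer path: " + simulateBundle(false));
    System.out.println("scoped timer path:   " + simulateBundle(true));
  }
}
```

In this model the unscoped run reports only `started_timers_count`, while the scoped run reports both counters, which matches the symptom described above. Whether the real harness loses the update in this way or in another way would need to be confirmed against the linked FnApiDoFnRunner code.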

yardenbm commented 11 months ago

Not 100% sure this is related, but in my pipeline @OnTimer("loopingTimer") is no longer triggered after upgrading to 2.52.0. My pipeline's logic is pretty similar to the looping-timers blog post (https://beam.apache.org/blog/looping-timers/), and it works on 2.51.0.

kennknowles commented 11 months ago

@yardenbm I don't think it is related. This bug is much older than that, and the execution does not depend on counters. Can you file a new bug for your issue so we can get details and track it?