apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Failing Test]: GroupByKeyTest$BasicTests$.testAfterProcessingTimeContinuationTriggerUsingState() #25485

Open becketqin opened 1 year ago

becketqin commented 1 year ago

What happened?

When the validation test for the Flink runner runs, GroupByKeyTest$BasicTests$.testAfterProcessingTimeContinuationTriggerUsingState() fails intermittently. I read through the test but had trouble understanding the expected behavior.

As far as I understand, the expected (simplified) input and output sequence of the PTransforms is as follows:

  1. Source
     a. emit 0
     b. emit Watermark.MAX_VALUE
  2. WithKeys
     a. [in] 0
     b. [out] KV{"dummy key", 0}
     c. [in] Watermark.MAX_VALUE
     d. [out] Watermark.MAX_VALUE
  3. ParDo
     a. [in] KV{"dummy key", 0}
     b. [out] 42
     c. [in] Watermark.MAX_VALUE
     d. [out] Watermark.MAX_VALUE (which has a delay of 500 ms after it receives the Watermark.MAX_VALUE)
  4. First Sum
     a. [in] 42
     b. [out] {42, Timing=EARLY} (emitted 1 ms after it receives the input of 42)
     c. [in] Watermark.MAX_VALUE (roughly 500 ms after it emits {42, Timing=EARLY})
     d. [out] {42, Timing=ON_TIME}
     e. [out] Watermark.MAX_VALUE
  5. Second Sum
     a. [in] {42, Timing=EARLY}
     b. [out] {42, Timing=EARLY} (emitted 1 ms after it receives the input of {42, Timing=EARLY})
     c. [in] {42, Timing=ON_TIME}
     d. [out] {84, Timing=EARLY} (emitted 1 ms after it receives the input of {42, Timing=ON_TIME}, if the Watermark.MAX_VALUE hasn't arrived by then. Otherwise, this output will be skipped.)
     e. [in] Watermark.MAX_VALUE
     f. [out] {84, Timing=ON_TIME}

I am wondering if the race condition between 5.d and 5.e causes the test to be flaky.
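To make the suspected race between 5.d and 5.e concrete, here is a minimal sketch in plain Java. It is a toy model, not Beam code: the class name, the method, and the millisecond arguments are all hypothetical, and it assumes the first {42, Timing=EARLY} pane has already fired before {42, Timing=ON_TIME} arrives.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the second Sum's panes; hypothetical, not Beam code. */
final class SecondSumRace {

    /**
     * All arguments are processing-time milliseconds (assumed values for illustration).
     *
     * @param onTimeArrival    when {42, ON_TIME} from the first Sum arrives (step 5.c)
     * @param watermarkArrival when Watermark.MAX_VALUE arrives (step 5.e)
     * @param triggerDelay     delay before the processing-time trigger fires
     */
    static List<String> panes(long onTimeArrival, long watermarkArrival, long triggerDelay) {
        List<String> out = new ArrayList<>();
        // Steps 5.a and 5.b: assumed to have completed long before step 5.c.
        out.add("42 EARLY");
        // Step 5.d: the timer set by the second input only produces a pane
        // if it fires strictly before the watermark closes the window.
        long timerFires = onTimeArrival + triggerDelay;
        if (timerFires < watermarkArrival) {
            out.add("84 EARLY");
        }
        // Step 5.f: the final pane is driven by the watermark, not by the trigger.
        out.add("84 ON_TIME");
        return out;
    }

    public static void main(String[] args) {
        // Watermark arrives 2 ms after the ON_TIME pane: the extra EARLY pane appears.
        System.out.println(panes(500, 502, 1)); // [42 EARLY, 84 EARLY, 84 ON_TIME]
        // Watermark arrives together with the ON_TIME pane: the EARLY pane is skipped.
        System.out.println(panes(500, 500, 1)); // [42 EARLY, 84 ON_TIME]
    }
}
```

In this model the output depends only on whether the watermark arrives within triggerDelay of the ON_TIME pane, which matches the kind of timing sensitivity described in step 5.d.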

However, when I tweaked the delay of AfterProcessingTime.pastFirstElementInPane() a little, the test still seemed to fail in ~50% of the runs, so I am not sure my theory above holds.

@kennknowles It looks like this test was added by you. Could you help check? Thanks.

Issue Failure

Failure: Test is flaky

Issue Priority

Priority: 2 (backlog / disabled test but we think the product is healthy)


kennknowles commented 1 year ago

I spent a while tracing your logic, and it makes sense to me. I'm not sure of the best way to proceed. I hate tests with delays or inherent races in them, but of course this one is specifically testing processing time and also specifically avoiding a dependency on TestStream.

Couple thoughts:

Nice analysis!

becketqin commented 1 year ago

@kennknowles Thanks for the quick reply.

I was experimenting with the delay time a little, and the semantics of the following code block are unclear to me.

Repeatedly.forever(
    AfterProcessingTime.pastFirstElementInPane()
        .plusDelayOf(Duration.millis(triggerMillis)))

I printed out the timestamps of each element being processed, each watermark being processed, and each emission. From what I see, plusDelayOf() was only honored for the first firing of the trigger. The remaining firings seem to be triggered immediately upon record arrival, without any delay. Is that expected?

If we want to make 5.e happen before 5.d, I think we need to increase triggerMillis. But if that only works for the first firing, it won't help either.

I will temporarily exclude this test from the Flink runner's validation tests. Behavior triggered by processing time is indeed tricky to test. Some kind of manually triggered executor might be helpful. Flink relies heavily on that for processing-time-related tests.
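As a rough illustration of the "manually triggered executor" idea, here is a minimal sketch in plain Java. The class ManualTimerService and all of its methods are hypothetical names invented for this example; this is neither Flink's nor Beam's actual test utility. Timers fire only when the test advances the clock, so the ordering of events no longer depends on wall-clock timing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical test helper: timers fire only when the test advances the clock. */
final class ManualTimerService {

    private static final class Timer implements Comparable<Timer> {
        final long fireAt;
        final long seq; // tie-breaker: keeps registration order for equal fire times
        final Runnable callback;

        Timer(long fireAt, long seq, Runnable callback) {
            this.fireAt = fireAt;
            this.seq = seq;
            this.callback = callback;
        }

        @Override
        public int compareTo(Timer other) {
            int byTime = Long.compare(fireAt, other.fireAt);
            return byTime != 0 ? byTime : Long.compare(seq, other.seq);
        }
    }

    private final PriorityQueue<Timer> timers = new PriorityQueue<>();
    private long now = 0;
    private long nextSeq = 0;

    long now() {
        return now;
    }

    /** Registers a callback to run once the clock has advanced by delayMillis. */
    void schedule(long delayMillis, Runnable callback) {
        timers.add(new Timer(now + delayMillis, nextSeq++, callback));
    }

    /** Advances the clock and runs every timer that becomes due, in order. */
    void advanceBy(long millis) {
        long target = now + millis;
        while (!timers.isEmpty() && timers.peek().fireAt <= target) {
            Timer due = timers.poll();
            now = due.fireAt; // callbacks observe the time at which they were due
            due.callback.run();
        }
        now = target;
    }

    public static void main(String[] args) {
        ManualTimerService clock = new ManualTimerService();
        List<String> events = new ArrayList<>();
        clock.schedule(1, () -> events.add("trigger fired at " + clock.now()));
        clock.schedule(500, () -> events.add("watermark released at " + clock.now()));
        clock.advanceBy(1);   // only the 1 ms trigger timer is due
        clock.advanceBy(499); // now the 500 ms watermark timer is due
        System.out.println(events); // [trigger fired at 1, watermark released at 500]
    }
}
```

A test built on something like this could decide explicitly whether the trigger timer or the watermark is processed first, instead of relying on sleeps.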

kennknowles commented 1 year ago

The delay should apply for each triggering. It is not expected that the next triggering happens immediately. However, note that the ON_TIME pane is not governed by the trigger. It is governed only by the watermark passing the end of the window.

becketqin commented 1 year ago

I dug a little more into the code. It looks like one potential issue here is that GroupByKey applies to its output PCollection a Trigger whose behavior differs from the one in the WindowingStrategy of its input PCollection.

In this case, the input of the first sum has a WindowingStrategy whose trigger is AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 second), but the output WindowingStrategy uses AfterSynchronizedProcessingTime.ofFirstElement(), so the delay is dropped and not passed on to the second sum. This comes from the getContinuationTrigger(List<Trigger> continuationTriggers) implementation in AfterProcessingTime, which always returns AfterSynchronizedProcessingTime.ofFirstElement() and ignores the TimestampTransform. As a result, the second sum in this test does not honor the trigger delay.
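The effect described above can be sketched with a toy model in plain Java. These classes only mimic the shape of the behavior; they are hypothetical stand-ins and not Beam's actual Trigger classes. In particular, the delayMillis() accessor, the continuation() method name, and the synchronized trigger returning itself as its own continuation are assumptions made for illustration.

```java
/** Toy model only: hypothetical stand-ins, not Beam's actual trigger classes. */
final class ContinuationTriggerSketch {

    interface Trigger {
        long delayMillis();

        /** The trigger applied to the output of a GroupByKey that uses this trigger. */
        Trigger continuation();
    }

    /** Stands in for AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay). */
    static final class ProcessingTimeTrigger implements Trigger {
        private final long delayMillis;

        ProcessingTimeTrigger(long delayMillis) {
            this.delayMillis = delayMillis;
        }

        @Override
        public long delayMillis() {
            return delayMillis;
        }

        @Override
        public Trigger continuation() {
            // The configured delay is not carried over to the continuation.
            return new SynchronizedProcessingTimeTrigger();
        }
    }

    /** Stands in for AfterSynchronizedProcessingTime.ofFirstElement(). */
    static final class SynchronizedProcessingTimeTrigger implements Trigger {
        @Override
        public long delayMillis() {
            return 0; // fires "as fast as reasonable" in this toy model
        }

        @Override
        public Trigger continuation() {
            return this; // assumption of this sketch
        }
    }

    public static void main(String[] args) {
        Trigger firstSum = new ProcessingTimeTrigger(1000);
        Trigger secondSum = firstSum.continuation();
        System.out.println("first Sum delay:  " + firstSum.delayMillis() + " ms");
        System.out.println("second Sum delay: " + secondSum.delayMillis() + " ms");
    }
}
```

In this model the first Sum keeps its 1000 ms delay while the second Sum ends up with none, which is the asymmetry observed in the test.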

Is this expected or is it a bug?

kennknowles commented 1 year ago

This is expected. The intent is that the trigger controls the first GBK, and later GBKs trigger "as fast as reasonable". Synchronized processing time ensures that if someone sets a trigger aligned to, say, a minute, downstream outputs wait for all the upstream triggers to fire.

kennknowles commented 1 year ago

It is a bit esoteric, but that is the way it is.

kennknowles commented 1 year ago

So when I said "The delay should apply for each triggering", I meant two different trigger firings of the first GBK.