apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: Python `AfterProcessingTime` behaves different than Java #23071

Closed: InigoSJ closed this issue 2 years ago

InigoSJ commented 2 years ago

What happened?

The Python trigger `AfterProcessingTime` behaves differently from Java's `AfterProcessingTime.pastFirstElementInPane().plusDelayOf()`.

Java behaves as "fire X time after the first element in the pane", whereas Python behaves like a session window: the delay is measured from the previous element instead of from the first element in the pane, because `on_element` resets the timer for every element:

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/trigger.py#L387

  def on_element(self, element, window, context):
    context.set_timer(
        '', TimeDomain.REAL_TIME, context.get_current_time() + self.delay)
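To make the difference concrete, here is a toy model of the two timer policies. It is a minimal sketch, not Beam code: `FakeContext`, `ResettingDelay`, `FirstElementDelay`, and `deadline_after` are hypothetical names, `set_timer` takes only a deadline (unlike the three-argument call in the snippet above), and the `timer is None` check stands in for whatever per-pane state a real fix would keep. `ResettingDelay.on_element` mirrors the snippet above; `FirstElementDelay` arms the timer only for the first element of a pane, as Java's `pastFirstElementInPane()` does.

```python
class FakeContext:
    """Hypothetical stand-in for a trigger context: a clock plus one timer."""

    def __init__(self):
        self.now = 0.0
        self.timer = None  # processing time at which the pane would fire

    def get_current_time(self):
        return self.now

    def set_timer(self, deadline):
        # Setting the timer again overwrites the previous deadline.
        self.timer = deadline


class ResettingDelay:
    """Current Python behavior: every element pushes the deadline out."""

    def __init__(self, delay):
        self.delay = delay

    def on_element(self, context):
        context.set_timer(context.get_current_time() + self.delay)


class FirstElementDelay:
    """Java-like behavior: only the first element in the pane arms the timer."""

    def __init__(self, delay):
        self.delay = delay

    def on_element(self, context):
        if context.timer is None:
            context.set_timer(context.get_current_time() + self.delay)


def deadline_after(trigger, arrival_times):
    """Returns the pending deadline after elements arrive at the given times."""
    context = FakeContext()
    for t in arrival_times:
        context.now = t
        trigger.on_element(context)
    return context.timer


# The first two elements of the example arrive at processing times 0 and 1.5.
print(deadline_after(ResettingDelay(2.0), [0.0, 1.5]))     # 3.5
print(deadline_after(FirstElementDelay(2.0), [0.0, 1.5]))  # 2.0
```

With the resetting policy, elements that keep arriving less than `delay` apart postpone the firing indefinitely, which is the session-window-like behavior described above.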

You can see the difference in this example:

Python

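from apache_beam import GroupByKey, Map, WindowInto
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms import trigger
from apache_beam.transforms.window import FixedWindows, TimestampedValue


def get_input_stream():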
  stream = (
      TestStream().add_elements([
          TimestampedValue("1", timestamp=0)
      ])
      .advance_watermark_to(1.5)
      .advance_processing_time(1.5)
      .add_elements([
          TimestampedValue("2", timestamp=1)
      ])
      .advance_watermark_to(2.5)
      .advance_processing_time(1)  # Processing time is now 2.5 (it should fire here)
      .add_elements([
          TimestampedValue("3", timestamp=3)
      ])
      .advance_watermark_to(3)
      .advance_processing_time(0.5)  # Processing time is now 3
      .add_elements([
          TimestampedValue("4", timestamp=4)
      ])
      .advance_watermark_to(5)
      .advance_processing_time(2)  # Processing time is now 5; it fires here because 2s have passed since the last element
      .add_elements([
          TimestampedValue("5", timestamp=4)
      ])
      .advance_watermark_to(6)
      .advance_processing_time(1.5)
      .add_elements([
          TimestampedValue("6", timestamp=8)
      ])
      .advance_watermark_to(9)
      .advance_processing_time(1.5)
      .add_elements([
          TimestampedValue("7", timestamp=7),
      ])
      .advance_watermark_to(10)
      .advance_processing_time(1.5)
      .add_elements([
          TimestampedValue("8", timestamp=8),
      ])
      .advance_watermark_to(11)
      .advance_processing_time(1.5)
      .add_elements([
          TimestampedValue("9", timestamp=7),
      ])
      .advance_watermark_to(12)
      .advance_processing_time(1.5)  # Processing time is now 12.5; nothing has fired since time 5
      .add_elements([
          TimestampedValue("10", timestamp=9),
      ])
      .advance_processing_time(2)
      .advance_watermark_to_infinity()
  )
  return stream

options = PipelineOptions(streaming=True)

p = TestPipeline(options=options)

window_size_seconds = 10
window_allowed_lateness_seconds = 5
count_pass = 3
delay = 2

stream = get_input_stream()

(p
 | stream
 | WindowInto(
     FixedWindows(size=window_size_seconds),
     allowed_lateness=window_allowed_lateness_seconds,
     accumulation_mode=trigger.AccumulationMode.DISCARDING,
     trigger=trigger.Repeatedly(trigger.AfterProcessingTime(delay)))
 | Map(lambda e: ("key", e))
 | GroupByKey()
 | Map(print))

p.run()

Java

        Pipeline p = Pipeline.create(options);

        Integer windowLength = 10;
        Integer allowLateSize = 5;
        Integer delay = 2;

        TestStream<String> streamEvents = TestStream.create(StringUtf8Coder.of())
                .addElements(
                        TimestampedValue.of("1", new Instant(0))
                )
                .advanceWatermarkTo(new Instant(1500))
                .advanceProcessingTime(Duration.millis(1500))
                .addElements(
                        TimestampedValue.of("2", new Instant(1000))
                )
                .advanceWatermarkTo(new Instant(2500))
                .advanceProcessingTime(Duration.millis(1000))  // Processing time is now 2.5; it fires here
                .addElements(
                        TimestampedValue.of("3", new Instant(3000))
                )
                .advanceWatermarkTo(new Instant(3000))
                .advanceProcessingTime(Duration.millis(1500))
                .addElements(
                        TimestampedValue.of("4", new Instant(4000))
                )
                .advanceWatermarkTo(new Instant(5000))
                .advanceProcessingTime(Duration.standardSeconds(2))
                .addElements(
                        TimestampedValue.of("5", new Instant(4000))
                )
                .advanceWatermarkTo(new Instant(6000))
                .advanceProcessingTime(Duration.millis(1500))
                .addElements(
                        TimestampedValue.of("6", new Instant(8000))
                )
                .advanceWatermarkTo(new Instant(9000))
                .advanceProcessingTime(Duration.millis(1500))
                .addElements(
                        TimestampedValue.of("7", new Instant(7000))
                )
                .advanceWatermarkTo(new Instant(10000))
                .advanceProcessingTime(Duration.millis(1500))
                .addElements(
                        TimestampedValue.of("8", new Instant(8000))
                )
                .advanceWatermarkTo(new Instant(11000))
                .advanceProcessingTime(Duration.millis(1500))
                .addElements(
                        TimestampedValue.of("9", new Instant(7000))
                )
                .advanceWatermarkTo(new Instant(12000))
                .advanceProcessingTime(Duration.millis(1500))
                .addElements(
                        TimestampedValue.of("10", new Instant(9000))
                )
                .advanceWatermarkToInfinity();

        p.apply(streamEvents)
                .apply("KVs", ParDo.of(new DoFn<String, KV<String, String>>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        c.output(KV.of("key", c.element()));
                    }
                }))
                .apply(Window.<KV<String, String>>into(
                        FixedWindows.of(Duration.standardSeconds(windowLength)))
                            .withAllowedLateness(Duration.standardSeconds(allowLateSize))
                            .triggering(
                                Repeatedly.forever(
                                        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(delay)))
                                )
                        .discardingFiredPanes()
                )
                .apply(GroupByKey.create())
                .apply("Log", ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        LOG.info("\n TRIGGER " + c.element().getValue().toString());
                        c.output(c.pane().toString());
                    }
                }));

        p.run();

The output in Python is two panes, ['1', '2', '3', '4'] and ['5', '6', '7', '8', '9', '10'], while Java produces the "right" output: ['1', '2'], ['3', '4'], ['5', '6'], ['7', '8'], ['9', '10'].
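Both outputs can be reproduced without a runner. The sketch below is a toy model, not Beam's trigger machinery: `simulate_panes` is a hypothetical helper, and it assumes (a) a pane fires as soon as processing time reaches the pending deadline, (b) all ten elements land in the single fixed window [0, 10) (their event timestamps all fall between 0 and 9, and the 5-second allowed lateness keeps the late ones), and (c) whatever is still buffered is emitted when the window finally expires. The arrival times are the cumulative `advance_processing_time` values from the Python `TestStream` in the example.

```python
def simulate_panes(arrivals, end_time, delay, reset_on_each_element):
    """Groups elements into panes under a processing-time delay trigger.

    arrivals: (processing_time, element) pairs in arrival order.
    end_time: processing time after the last advance; anything still
      buffered afterwards is emitted as a final pane (window expiry).
    """
    panes = []
    buffered = []
    deadline = None

    def advance_to(now):
        nonlocal buffered, deadline
        if deadline is not None and now >= deadline:
            panes.append(buffered)
            buffered, deadline = [], None

    for now, element in arrivals:
        advance_to(now)  # fire first if the clock has reached the deadline
        buffered.append(element)
        if reset_on_each_element or deadline is None:
            deadline = now + delay
    advance_to(end_time)
    if buffered:
        panes.append(buffered)  # final pane when the window expires
    return panes


# Cumulative processing times from the Python TestStream in the example.
ARRIVALS = list(zip(
    [0.0, 1.5, 2.5, 3.0, 5.0, 6.5, 8.0, 9.5, 11.0, 12.5],
    [str(i) for i in range(1, 11)]))

print(simulate_panes(ARRIVALS, 14.5, 2.0, reset_on_each_element=True))
# [['1', '2', '3', '4'], ['5', '6', '7', '8', '9', '10']]
print(simulate_panes(ARRIVALS, 14.5, 2.0, reset_on_each_element=False))
# [['1', '2'], ['3', '4'], ['5', '6'], ['7', '8'], ['9', '10']]
```

Under the resetting policy a pane only fires once a gap of at least `delay` opens between arrivals (between 3.0 and 5.0 here), which is why it looks like a session window; measuring from the first element reproduces the Java panes.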


The fix doesn't seem hard (worst thing to say ever), but given that users may already be relying on this trigger, I am not sure how to proceed.

Issue Priority

Priority: 2

Issue Component

Component: sdk-py-core

InigoSJ commented 2 years ago

I have made a fix for this that changes the behavior to match Java's. I will submit the PR tomorrow.

TheNeuralBit commented 2 years ago

Is there a possibility this will be a breaking change for people that rely on the old behavior?

pabloem commented 2 years ago

Hm, there is. However, this code change only affects DirectRunner pipelines; for other runners, the trigger logic is executed by the runner itself.

InigoSJ commented 2 years ago

Just for clarification, I tested the trigger in Dataflow (both Legacy and UW) and it works as in Java (as it should).

I am not aware how this works in other runners, but I think users would expect it to work as in Java and how it's explained in the documentation.

https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.trigger.html?#apache_beam.transforms.trigger.AfterProcessingTime

Do you have any suggestion on how to approach this @TheNeuralBit ? Maybe adding notes in the release?

Let me know if you want to discuss this offline

TheNeuralBit commented 2 years ago

> Just for clarification, I tested the trigger in Dataflow (both Legacy and UW) and it works as in Java (as it should).

Do you mean it worked as in Java on Dataflow, even before your change?

> I am not aware how this works in other runners, but I think users would expect it to work as in Java and how it's explained in the documentation.

Yes but users sometimes come to rely on buggy behavior :) (obligatory xkcd: https://xkcd.com/1172/)

> Do you have any suggestion on how to approach this @TheNeuralBit ? Maybe adding notes in the release?

I don't think I have enough context here to advise on a specific action. If the change really is just affecting the DirectRunner (non-production) I think it makes sense to just mention this in the release notes. If there's a possibility it will affect production pipelines on Dataflow or other runners we might consider a deprecation cycle (add a warning that behavior will change in a future release, possibly with a new trigger option that preserves existing behavior).
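For illustration, the deprecation cycle described above could look roughly like this in Python. This is a hypothetical sketch, not Beam's API: `ProcessingTimeDelay` and its `reset_on_each_element` parameter are invented names. Leaving the option unset keeps the legacy behavior but emits a `FutureWarning`; passing it explicitly selects either behavior and silences the warning.

```python
import warnings


class ProcessingTimeDelay:
    """Hypothetical trigger illustrating a deprecation cycle.

    reset_on_each_element=None means the caller did not choose: keep the
    legacy (reset-on-every-element) behavior for now, but warn that the
    default will change in a future release.
    """

    def __init__(self, delay, reset_on_each_element=None):
        if reset_on_each_element is None:
            warnings.warn(
                "ProcessingTimeDelay currently restarts its delay on every "
                "element; a future release will measure the delay from the "
                "first element in the pane instead. Pass "
                "reset_on_each_element explicitly to keep either behavior.",
                FutureWarning,
                stacklevel=2,
            )
            reset_on_each_element = True
        self.delay = delay
        self.reset_on_each_element = reset_on_each_element


# Opting in to the new behavior explicitly raises no warning.
new_style = ProcessingTimeDelay(2.0, reset_on_each_element=False)
```

The thread below settles on recording the change as a bugfix in CHANGES.md instead, so this is only a sketch of the alternative.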

InigoSJ commented 2 years ago

> Do you mean it worked as in Java on Dataflow, even before your change?

Yes, it works as in Java on Dataflow (both Legacy and UW).

> I don't think I have enough context here to advise on a specific action. If the change really is just affecting the DirectRunner (non-production) I think it makes sense to just mention this in the release notes. If there's a possibility it will affect production pipelines on Dataflow or other runners we might consider a deprecation cycle (add a warning that behavior will change in a future release, possibly with a new trigger option that preserves existing behavior).

I don't have the capacity to test on runners other than Dataflow and the DirectRunner, so I'm not sure how to proceed.

To be fair, the documentation marks the trigger as experimental. Considering the description, how Java works, what most users would actually want (*), and the fact that it is experimental, I think this change will help users more than it can hinder them. I'd say it's worse for a customer to rely on the trigger working as described when it doesn't than for a customer to rely on something buggy.

(*) As of now (without the fix), the trigger just behaves like a session window, which doesn't really add value to the pool of triggers, whereas something like "wait X time and then trigger" does.

Of course, this is just my personal opinion and it's not my call to decide. Let me know what you want to do and I'll do my best :D

TheNeuralBit commented 2 years ago

Got it, maybe let's just record it as a bugfix in CHANGES.md

InigoSJ commented 2 years ago

Created this PR

https://github.com/apache/beam/pull/23297