apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: `beam.transforms.util.LogElements(with_timestamp=True, with_window=True)` does not work with GlobalWindows #27036

Open. liferoad opened this issue 1 year ago.

liferoad commented 1 year ago

What happened?

With this test code,

import time

import apache_beam as beam
from apache_beam.transforms import trigger, window
from apache_beam.transforms.periodicsequence import PeriodicImpulse

with beam.Pipeline() as p:
    _ = (
        p
        # Emit one impulse per second for roughly six seconds.
        | PeriodicImpulse(
            start_timestamp=time.time(),
            stop_timestamp=time.time() + 6,
            fire_interval=1,
            apply_windowing=False,
        )
        | beam.Map(lambda x: "test")
        # Plain logging (no timestamp or window info) of each element.
        | beam.transforms.util.LogElements()
        # One global window; fire a pane after every element and discard
        # elements that have already been emitted.
        | beam.WindowInto(
            window.GlobalWindows(),
            trigger=trigger.Repeatedly(trigger.AfterCount(1)),
            accumulation_mode=trigger.AccumulationMode.DISCARDING,
        )
        | beam.combiners.Count.Globally()
        # This step raises the OverflowError shown in the traceback.
        | "Print Windows"
        >> beam.transforms.util.LogElements(with_timestamp=True, with_window=True)
    )

it crashes with:

File ~/PlayGround/venv-beam-2-43-0/lib/python3.9/site-packages/apache_beam/utils/timestamp.py:163, in Timestamp.to_utc_datetime(self)
    159 def to_utc_datetime(self):
    160   # type: () -> datetime.datetime
    161   # We can't easily construct a datetime object from microseconds, so we
    162   # create one at the epoch and add an appropriate timedelta interval.
--> 163   return self._epoch_datetime_utc().replace(tzinfo=None) + datetime.timedelta(
    164       microseconds=self.micros)

OverflowError: date value out of range [while running '[4]: Print Windows/ParDo(_LoggingFn)']
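The failing frame adds a `timedelta` of the timestamp's microseconds to the epoch. The stdlib-only sketch below reproduces that arithmetic outside Beam. It assumes a stand-in value of 2**63 - 1 microseconds for a timestamp at the far end of Beam's range; the exact end-of-global-window value is not read from Beam here, and `to_utc_datetime` is a hypothetical mirror of the frame, not Beam's method.

```python
import datetime

# Assumed stand-in for a timestamp near the upper end of Beam's range
# (2**63 - 1 microseconds since the Unix epoch); not read from Beam itself.
FAR_FUTURE_MICROS = 2**63 - 1


def to_utc_datetime(micros):
    """Hypothetical mirror of the failing frame: epoch + timedelta(micros)."""
    epoch = datetime.datetime(1970, 1, 1)
    return epoch + datetime.timedelta(microseconds=micros)


# An ordinary timestamp converts without trouble.
print(to_utc_datetime(0))  # 1970-01-01 00:00:00

# The far-future value lands past datetime.MAXYEAR (9999) and raises.
try:
    to_utc_datetime(FAR_FUTURE_MICROS)
except OverflowError as exc:
    print("OverflowError:", exc)
```

The `timedelta` itself is constructible (about 106 million days); it is the addition to the epoch that overflows, which matches the line flagged in the traceback.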

Removing `with_timestamp=True, with_window=True` from the final `LogElements` call avoids the crash.

Issue Priority

Priority: 3 (minor)

Issue Components

liferoad commented 1 year ago

.add-labels 'good first issue'

github-actions[bot] commented 1 year ago

Label "good cannot be managed because it does not exist in the repo. Please check your spelling.

TristanCrudge commented 1 year ago

Is this not expected behaviour? The global window extends to (effectively) the end of time, and the resulting timestamp value cannot be represented as an RFC 3339 datetime.
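To make the range argument concrete: Python's `datetime` covers years 1 to 9999, which fits RFC 3339's four-digit year. A minimal sketch, assuming timestamps are held as integer microseconds since the Unix epoch (as in the traceback), computes the convertible range; `is_representable` is a hypothetical helper, not part of Beam.

```python
import datetime

EPOCH = datetime.datetime(1970, 1, 1)
ONE_MICRO = datetime.timedelta(microseconds=1)

# Microsecond offsets from the epoch of datetime.min (0001-01-01) and
# datetime.max (9999-12-31 23:59:59.999999).
MIN_DATETIME_MICROS = (datetime.datetime.min - EPOCH) // ONE_MICRO
MAX_DATETIME_MICROS = (datetime.datetime.max - EPOCH) // ONE_MICRO


def is_representable(micros):
    """Hypothetical helper: True if `micros` since the epoch fits in a datetime."""
    return MIN_DATETIME_MICROS <= micros <= MAX_DATETIME_MICROS


print(MAX_DATETIME_MICROS)         # 253402300799999999
print(is_representable(0))         # True
print(is_representable(2**63 - 1)) # False
```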

liferoad commented 1 year ago

Does `trigger=trigger.Repeatedly(trigger.AfterCount(1))` trigger a new window? If so, the timestamps for that window should be finite.

TristanCrudge commented 1 year ago

I see your point, but the global window is a single window. Every pane emitted by the trigger belongs to that one window.
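The pane/window distinction can be illustrated with a toy model. This is a hypothetical simulation, not Beam's trigger machinery: it ignores bundling, watermarks, and processing-time effects, and `GLOBAL_WINDOW`, `Pane`, and `fire_repeatedly_after_count` are illustrative names. It shows that a repeated count trigger produces many panes, all tagged with the same window.

```python
from dataclasses import dataclass

# Toy stand-in for the single global window; NOT Beam's GlobalWindow class.
GLOBAL_WINDOW = "GlobalWindow"


@dataclass
class Pane:
    window: str  # every pane carries the same window
    index: int   # 0, 1, 2, ... in firing order
    count: int   # number of elements in this pane's contents


def fire_repeatedly_after_count(elements, n=1, discarding=True):
    """Toy model of Repeatedly(AfterCount(n)) over one global window.

    The trigger fires after every n new elements. In discarding mode the
    pane contents are cleared after each firing; in accumulating mode
    they keep growing.
    """
    panes, since_fire, contents = [], 0, 0
    for _ in elements:
        since_fire += 1
        contents += 1
        if since_fire >= n:
            panes.append(Pane(GLOBAL_WINDOW, len(panes), contents))
            since_fire = 0
            if discarding:
                contents = 0
    return panes


panes = fire_repeatedly_after_count(["test"] * 6)
print({p.window for p in panes})  # {'GlobalWindow'}
print([p.count for p in panes])   # [1, 1, 1, 1, 1, 1]
```

Because every pane shares one window, any end-of-window timestamp attached to the panes is the global window's far-future bound rather than a per-firing value.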

liferoad commented 1 year ago

Well, what you said is also true. At the very least, I think we should report a clearer error message instead of crashing.
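One possible shape for that friendlier behaviour is a formatter that catches the overflow and logs a descriptive placeholder. The sketch below is hypothetical: `describe_timestamp` and its fallback text are illustrative, not Beam's `LogElements` code, and it assumes timestamps arrive as integer microseconds since the Unix epoch.

```python
import datetime

EPOCH = datetime.datetime(1970, 1, 1)


def describe_timestamp(micros):
    """Hypothetical helper: format micros-since-epoch for a log line.

    Returns an RFC 3339-style UTC string when the value fits in a datetime,
    and a descriptive placeholder instead of raising when it does not.
    """
    try:
        return (EPOCH + datetime.timedelta(microseconds=micros)).isoformat() + "Z"
    except OverflowError:
        # Either the timedelta or the datetime addition was out of range.
        return f"<timestamp {micros} us is outside the representable datetime range>"


print(describe_timestamp(0))          # 1970-01-01T00:00:00Z
print(describe_timestamp(1_500_000))  # 1970-01-01T00:00:01.500000Z
print(describe_timestamp(2**63 - 1))
```

Catching `OverflowError` at the formatting boundary keeps the pipeline running while still telling the user why no calendar date was printed.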