apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase #25014

Open nathancCG opened 1 year ago

nathancCG commented 1 year ago

Hi Team,

We recently ran into this error: TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase

Full error log:

Error message from worker: generic::unknown: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1458, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 561, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 566, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 1747, in apache_beam.runners.common._OutputHandler.finish_bundle_outputs
  File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 239, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 198, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
  File "apache_beam/runners/worker/opcounters.py", line 215, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
  File "apache_beam/runners/worker/opcounters.py", line 267, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
  File "apache_beam/coders/coder_impl.py", line 1472, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1490, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1222, in apache_beam.coders.coder_impl.SequenceCoderImpl.estimate_size
  File "apache_beam/coders/coder_impl.py", line 1238, in apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 208, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 829, in apache_beam.coders.coder_impl.IntervalWindowCoderImpl.estimate_size
TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 287, in _execute
    response = task()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 360, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 597, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 634, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1009, in process_bundle
    op.finish()
  File "apache_beam/runners/worker/operations.py", line 939, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/worker/operations.py", line 942, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/worker/operations.py", line 943, in apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/common.py", line 1479, in apache_beam.runners.common.DoFnRunner.finish
  File "apache_beam/runners/common.py", line 1460, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 1507, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1458, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 561, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 566, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 1747, in apache_beam.runners.common._OutputHandler.finish_bundle_outputs
  File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 239, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 198, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
  File "apache_beam/runners/worker/opcounters.py", line 215, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
  File "apache_beam/runners/worker/opcounters.py", line 267, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
  File "apache_beam/coders/coder_impl.py", line 1472, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1490, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1222, in apache_beam.coders.coder_impl.SequenceCoderImpl.estimate_size
  File "apache_beam/coders/coder_impl.py", line 1238, in apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 208, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 829, in apache_beam.coders.coder_impl.IntervalWindowCoderImpl.estimate_size
TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase
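Worker logs like the one above often arrive flattened onto a single line, which makes the failing frame hard to spot. The following is a minimal sketch, using only the Python standard library, of splitting such a string into frames and picking out the innermost one. The helper name `split_frames` and the shortened `sample` string are illustrative, not part of Beam.

```python
import re

# One frame of a flattened traceback: file path, line number, qualified name.
FRAME_RE = re.compile(r'File "([^"]+)", line (\d+), in\s+(\S+)')


def split_frames(flat_traceback):
    """Return (path, line_number, function) tuples in call order."""
    return [
        (path, int(line), func)
        for path, line, func in FRAME_RE.findall(flat_traceback)
    ]


# Shortened excerpt of the log above, kept on one logical line.
sample = (
    'Traceback (most recent call last): '
    'File "apache_beam/runners/common.py", line 1747, in '
    'apache_beam.runners.common._OutputHandler.finish_bundle_outputs '
    'File "apache_beam/coders/coder_impl.py", line 829, in '
    'apache_beam.coders.coder_impl.IntervalWindowCoderImpl.estimate_size '
    'TypeError: Cannot convert GlobalWindow to '
    'apache_beam.utils.windowed_value._IntervalWindowBase'
)

frames = split_frames(sample)
innermost = frames[-1]  # the frame where the exception was raised
print(innermost)
```

Here the innermost frame is `IntervalWindowCoderImpl.estimate_size`, reached from `finish_bundle_outputs`, which matches what the full log shows.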

Our pipeline looks something like this:

`
pipeline
| "Read PubSub Messages" >> beam.io.ReadFromPubSub(
    subscription=options.input_subscription, with_attributes=False
)
| f"Group Messages into {options.window_size} seconds window" >> beam.WindowInto(
    window.FixedWindows(options.window_size),
    allowed_lateness=Duration(seconds=1 * 60 * 60),  # 1 hour
)
| "Extract data from requests" >> beam.ParDo(ExtractDataFromRequest())
`

This pipeline had been running successfully for a couple of months. These errors only showed up recently, and we don't know where to start debugging. Any ideas?
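For readers unfamiliar with the two window types named in the error, here is a plain-Python sketch of what fixed windowing does to a timestamp. It does not use Beam's own classes: `Interval` and `assign_fixed_window` are hypothetical stand-ins, and the window size of 60 seconds is an assumed value for `options.window_size`. The point is only that after `WindowInto(FixedWindows(...))` every element should carry a bounded window with a start and an end, whereas the traceback shows the interval-window coder being handed a GlobalWindow during `finish_bundle` output.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Interval:
    """Stand-in for an interval window covering [start, end) in seconds."""
    start: int
    end: int


def assign_fixed_window(timestamp, size, offset=0):
    """Map a timestamp to the fixed window of length `size` that contains it."""
    start = timestamp - (timestamp - offset) % size
    return Interval(start, start + size)


window_size = 60                # assumed value for options.window_size
allowed_lateness = 1 * 60 * 60  # 1 hour, as in the pipeline above

w = assign_fixed_window(125, window_size)
print(w, allowed_lateness)
```

With a 60-second window, timestamp 125 lands in the window [120, 180). A global window has no such bounds, which is consistent with the coder refusing to treat it as an interval window.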

Abacn commented 1 year ago

Could you please provide more details? Which version of Beam are you running? Were the succeeded and failed jobs on the same Beam version?

nathancCG commented 1 year ago

@Abacn Yes. Beam version: Python SDK 2.43.0. The succeeded and failed jobs both ran on 2.43.0. Since we started a new job with the same code, this error has not been seen again.

Abacn commented 1 year ago

Thanks for reporting. It sounds like this is intermittent, which makes it hard to investigate. I will try to look into it.