apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0
7.78k stars 4.21k forks source link

[Bug]: test_sdf_with_check_done_failed throws ValueError in OffsetRestrictionTracker #23948

Open riteshghorse opened 1 year ago

riteshghorse commented 1 year ago

What happened?

In Python SDK, test_sdf_with_check_done_failed throws ValueError as

apache_beam/runners/portability/portable_runner_test.py::PortableRunnerOptimizedWithoutFusion::test_sdf_with_check_done_failed

File "/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python_Cron/src/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/io/restriction_trackers.py", line 93, in check_done
      raise ValueError(
  ValueError: OffsetRestrictionTracker is not done since work in range [0, 6) has not been claimed

Issue Priority

Priority: 2

Issue Component

Component: sdk-py-core

riteshghorse commented 1 year ago

Not an issue, confirmed

tvalentyn commented 10 months ago

saw this error again today:

apache_beam/runners/portability/portable_runner_test.py::PortableRunnerTest::test_sdf_with_check_done_failed
  /runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/_pytest/threadexception.py:73: PytestUnhandledThreadExceptionWarning: Exception in thread Thread-1078

  Traceback (most recent call last):
    File "/opt/hostedtoolcache/Python/3.9.18/x64/lib/python3.9/threading.py", line 980, in _bootstrap_inner
      self.run()
    File "/opt/hostedtoolcache/Python/3.9.18/x64/lib/python3.9/threading.py", line 917, in run
      self._target(*self._args, **self._kwargs)
    File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/local_job_service.py", line 296, in _run_job
      self.result = self._invoke_runner()
    File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/local_job_service.py", line 318, in _invoke_runner
      return fn_runner.FnApiRunner(
    File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 224, in run_via_runner_api
      return self.run_stages(stage_context, stages)
    File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 455, in run_stages
      bundle_results = self._execute_bundle(
    File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 783, in _execute_bundle
      self._run_bundle(
    File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1020, in _run_bundle
      result, splits = bundle_manager.process_bundle(
    File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1356, in process_bundle
      result_future = self._worker_handler.control_conn.push(process_bundle_req)
    File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 379, in push
      response = self.worker.do_instruction(request)
    File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 626, in do_instruction
      return getattr(self, request_type)(
    File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py", line 664, in process_bundle
      bundle_processor.process_bundle(instruction_id))
    File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1056, in process_bundle
      input_op_by_transform_id[element.transform_id].process_encoded(
    File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py", line 237, in process_encoded
      self.output(decoded_value)
    File "apache_beam/runners/worker/operations.py", line 570, in apache_beam.runners.worker.operations.Operation.output
      def output(self, windowed_value, output_index=0):
    File "apache_beam/runners/worker/operations.py", line 572, in apache_beam.runners.worker.operations.Operation.output
      _cast_to_receiver(self.receivers[output_index]).receive(windowed_value)
    File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
      def receive(self, windowed_value):
    File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
      self.consumer.process(windowed_value)
    File "apache_beam/runners/worker/operations.py", line 1067, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
      with self.scoped_process_state:
    File "apache_beam/runners/worker/operations.py", line 1076, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
      delayed_applications = self.dofn_runner.process_with_sized_restriction(
    File "apache_beam/runners/common.py", line 1467, in apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
      return self.do_fn_invoker.invoke_process(
    File "apache_beam/runners/common.py", line 831, in apache_beam.runners.common.PerWindowInvoker.invoke_process
      residual = self._invoke_process_per_window(
    File "apache_beam/runners/common.py", line 1002, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
      self.threadsafe_restriction_tracker.check_done()
    File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/runners/sdf_utils.py", line 111, in check_done
      return self._restriction_tracker.check_done()
    File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py39/build/srcs/sdks/python/target/.tox-py39-cloud/py39-cloud/lib/python3.9/site-packages/apache_beam/io/restriction_trackers.py", line 93, in check_done
      raise ValueError(
  ValueError: OffsetRestrictionTracker is not done since work in range [0, 6) has not been claimed.

    warnings.warn(pytest.PytestUnhandledThreadExceptionWarning(msg))