apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Flaky]: apache_beam/runners/portability/portable_runner_test.py::PortableRunnerTestWithSubprocesses::test_pardo_state_with_custom_key_coder #24313

Open BjornPrime opened 1 year ago

BjornPrime commented 1 year ago

What happened?

A transient failure occurred when running the post-commit tests in GitHub Actions. See the output below.

self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_pardo_state_with_custom_key_coder>

    def test_pardo_state_with_custom_key_coder(self):
      """Tests that state requests work correctly when the key coder is an
      SDK-specific coder, i.e. non standard coder. This is additionally enforced
      by Java's ProcessBundleDescriptorsTest and by Flink's
      ExecutableStageDoFnOperator which detects invalid encoding by checking for
      the correct key group of the encoded key."""
      index_state_spec = userstate.CombiningValueStateSpec('index', sum)

      # Test params
      # Ensure decent amount of elements to serve all partitions
      n = 200
      duplicates = 1

      split = n // (duplicates + 1)
      inputs = [(i % split, str(i % split)) for i in range(0, n)]

      # Use a DoFn which has to use FastPrimitivesCoder because the type cannot
      # be inferred
      class Input(beam.DoFn):
        def process(self, impulse):
          for i in inputs:
            yield i

      class AddIndex(beam.DoFn):
        def process(self, kv, index=beam.DoFn.StateParam(index_state_spec)):
          k, v = kv
          index.add(1)
          yield k, v, index.read()

      expected = [(i % split, str(i % split), i // split + 1)
                  for i in range(0, n)]

      with self.create_pipeline() as p:
        assert_that(
            p
            | beam.Impulse()
            | beam.ParDo(Input())
            | beam.ParDo(AddIndex()),
>           equal_to(expected))
INFO:apache_beam.runners.portability.local_job_service:Worker: severity: INFO timestamp {   seconds: 1667837893   nanos: 328462600 } message: "Pipeline_options: {\'job_name\': \'test_pardo_state_with_custom_key_coder_1667837883.477778\', \'gcp_oauth_scopes\': [\'https://www.googleapis.com/auth/bigquery\', \'https://www.googleapis.com/auth/cloud-platform\', \'https://www.googleapis.com/auth/devstorage.full_control\', \'https://www.googleapis.com/auth/userinfo.email\', \'https://www.googleapis.com/auth/datastore\', \'https://www.googleapis.com/auth/spanner.admin\', \'https://www.googleapis.com/auth/spanner.data\', \'https://www.googleapis.com/auth/bigquery\', \'https://www.googleapis.com/auth/cloud-platform\', \'https://www.googleapis.com/auth/devstorage.full_control\', \'https://www.googleapis.com/auth/userinfo.email\', \'https://www.googleapis.com/auth/datastore\', \'https://www.googleapis.com/auth/spanner.admin\', \'https://www.googleapis.com/auth/spanner.data\'], \'experiments\': [\'state_cache_size=100\', \'data_buffer_time_limit_ms=1000\', \'beam_fn_api\'], \'sdk_location\': \'container\', \'job_endpoint\': \'localhost:63832\', \'environment_type\': \'beam:env:harness_subprocess_python:v1\', \'environment_config\': \'D:\\\\a\\\\beam\\\\beam\\\\sdks\\\\python\\\\target\\\\.tox\\\\py37-win\\\\Scripts\\\\python.exe -m apache_beam.runners.worker.sdk_worker_main\', \'sdk_worker_parallelism\': \'1\', \'environment_cache_millis\': \'0\'}" log_location: "D:\\a\\beam\\beam\\sdks\\python\\apache_beam\\runners\\worker\\sdk_worker_main.py:128" thread: "MainThread"
FAILED apache_beam/runners/portability/portable_runner_test.py::PortableRunnerTestWithSubprocesses::test_pardo_state_with_custom_key_coder
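For reference, the arithmetic behind the test's `expected` list can be checked without Beam. The sketch below is a hypothetical model (the helper names `simulate_add_index` and `expected_output` are not part of Beam or the test), assuming the per-key `CombiningValueStateSpec('index', sum)` behaves like a running counter that `AddIndex` increments and then reads. With `n = 200` and `duplicates = 1`, `split` is 100, so every key appears twice and should be emitted once with index 1 and once with index 2. Because both elements for a key carry identical values, processing order within a key does not change the resulting multiset, which is what the order-insensitive `equal_to` compares. This only validates the expected output; it does not reproduce the flaky failure.

```python
from collections import defaultdict


def simulate_add_index(n=200, duplicates=1):
    """Hypothetical pure-Python model of the test's AddIndex DoFn.

    Each key keeps a list of added values that is combined with sum() on
    read, mimicking CombiningValueStateSpec('index', sum).
    """
    split = n // (duplicates + 1)
    inputs = [(i % split, str(i % split)) for i in range(0, n)]

    state = defaultdict(list)  # per-key accumulated values
    outputs = []
    for k, v in inputs:
        state[k].append(1)                      # models index.add(1)
        outputs.append((k, v, sum(state[k])))   # models index.read()
    return outputs


def expected_output(n=200, duplicates=1):
    """Same formula as the `expected` list in the test."""
    split = n // (duplicates + 1)
    return [(i % split, str(i % split), i // split + 1) for i in range(0, n)]


if __name__ == "__main__":
    actual = simulate_add_index()
    # equal_to ignores order, so compare sorted copies.
    print(sorted(actual) == sorted(expected_output()))  # True
    print(len(actual))  # 200
```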

Issue Failure

Failure: Test is flaky

Issue Priority

Priority: 1

Issue Component

Component: sdk-py-core

BjornPrime commented 1 year ago

Full job output can be found here: https://github.com/apache/beam/actions/runs/3411967870/jobs/5676856627

See also the fuller output of the test failure, which was truncated in the initial post:

================================== FAILURES ===================================
__ PortableRunnerTestWithSubprocesses.test_pardo_state_with_custom_key_coder __
[gw3] win32 -- Python 3.7.9 D:\a\beam\beam\sdks\python\target\.tox\py37-win\Scripts\python.exe

self = <apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses testMethod=test_pardo_state_with_custom_key_coder>

    def test_pardo_state_with_custom_key_coder(self):
      """Tests that state requests work correctly when the key coder is an
      SDK-specific coder, i.e. non standard coder. This is additionally enforced
      by Java's ProcessBundleDescriptorsTest and by Flink's
      ExecutableStageDoFnOperator which detects invalid encoding by checking for
      the correct key group of the encoded key."""
      index_state_spec = userstate.CombiningValueStateSpec('index', sum)

      # Test params
      # Ensure decent amount of elements to serve all partitions
      n = 200
      duplicates = 1

      split = n // (duplicates + 1)
      inputs = [(i % split, str(i % split)) for i in range(0, n)]

      # Use a DoFn which has to use FastPrimitivesCoder because the type cannot
      # be inferred
      class Input(beam.DoFn):
        def process(self, impulse):
          for i in inputs:
            yield i

      class AddIndex(beam.DoFn):
        def process(self, kv, index=beam.DoFn.StateParam(index_state_spec)):
          k, v = kv
          index.add(1)
          yield k, v, index.read()

      expected = [(i % split, str(i % split), i // split + 1)
                  for i in range(0, n)]

      with self.create_pipeline() as p:
        assert_that(
            p
            | beam.Impulse()
            | beam.ParDo(Input())
            | beam.ParDo(AddIndex()),
>           equal_to(expected))

apache_beam\runners\portability\portable_runner_test.py:207: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
apache_beam\pipeline.py:598: in __exit__
    self.result.wait_until_finish()
apache_beam\runners\portability\portable_runner.py:607: in wait_until_finish
    raise self._runtime_exception
apache_beam\runners\portability\portable_runner.py:613: in _observe_state
    for state_response in self._state_stream:
target\.tox\py37-win\lib\site-packages\grpc\_channel.py:426: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.DEADLINE_EXCEEDED
    details = "Deadline Exc...%5B::1%5D:63832 {created_time:"2022-11-07T16:19:05.79894806+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

    def _next(self):
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(self._state,
                                               self._response_deserializer)
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler)
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self

            def _response_ready():
                return (self._state.response is not None or
                        (cygrpc.OperationType.receive_message
                         not in self._state.due and
                         self._state.code is not None))

            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                       status = StatusCode.DEADLINE_EXCEEDED
E                       details = "Deadline Exceeded"
E                       debug_error_string = "UNKNOWN:Error received from peer ipv6:%5B::1%5D:63832 {created_time:"2022-11-07T16:19:05.79894806+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E                   >

target\.tox\py37-win\lib\site-packages\grpc\_channel.py:826: _MultiThreadedRendezvous
---------------------------- Captured stderr call -----------------------------
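The `_next` method in the traceback decides between three outcomes once it wakes up: return a buffered response, end iteration cleanly if the RPC finished with `OK`, or raise the call object itself if the RPC finished with any other status. `grpc_status:4` in the debug string is `DEADLINE_EXCEEDED` in the gRPC status code table, so the job-state stream consumed by `for state_response in self._state_stream` in `_observe_state` ends with an exception instead of a `StopIteration`, and that exception is what `wait_until_finish` re-raises. The following stdlib-only sketch models just that branch. It is not gRPC's API: `StatusCode`, `RpcTerminated`, `CallState`, `next_message` and `drain` are hypothetical names chosen for illustration.

```python
import enum
from dataclasses import dataclass
from typing import Optional


class StatusCode(enum.Enum):
    # Values follow the gRPC status code table; only a few are modeled here.
    OK = 0
    CANCELLED = 1
    DEADLINE_EXCEEDED = 4


class RpcTerminated(Exception):
    """Hypothetical stand-in for the _MultiThreadedRendezvous gRPC raises."""

    def __init__(self, code, details):
        super().__init__(f"status = {code.name}, details = {details!r}")
        self.code = code
        self.details = details


@dataclass
class CallState:
    # What _next inspects after waiting: a buffered response, and the final
    # status code once the RPC has terminated (None while still running).
    response: Optional[object] = None
    code: Optional[StatusCode] = None
    details: str = ""


def next_message(state):
    """Simplified model of the post-wait branch of grpc's _next()."""
    if state.response is not None:
        response, state.response = state.response, None
        return response
    if state.code is StatusCode.OK:
        raise StopIteration()
    if state.code is not None:
        raise RpcTerminated(state.code, state.details)
    # In real gRPC, the _response_ready() wait predicate rules this case out.
    raise RuntimeError("woken with neither a response nor a final status")


def drain(snapshots):
    """Collect messages the way a `for ... in stream` loop would."""
    received = []
    for snapshot in snapshots:
        try:
            received.append(next_message(snapshot))
        except StopIteration:
            break
    return received


if __name__ == "__main__":
    finished = [CallState(response="RUNNING"), CallState(response="DONE"),
                CallState(code=StatusCode.OK)]
    print(drain(finished))  # ['RUNNING', 'DONE']

    timed_out = [CallState(response="RUNNING"),
                 CallState(code=StatusCode.DEADLINE_EXCEEDED,
                           details="Deadline Exceeded")]
    try:
        drain(timed_out)
    except RpcTerminated as err:
        print(err.code.name, err.code.value)  # DEADLINE_EXCEEDED 4
```

The model makes the distinction visible: a clean `OK` termination simply stops the loop, while a deadline on the same stream propagates as an error even if earlier messages were delivered normally.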