apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: ParDo#with_exception_handling(use_subprocess=True) doesn't work because of missing _context.process_instruction_id #25374

Open cozos opened 1 year ago

cozos commented 1 year ago

What happened?

Using ParDo#with_exception_handling(use_subprocess=True) seems to fail because it relies on _context.process_instruction_id, which doesn't exist in the subprocess.

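For reference, the transform is used roughly as in the minimal sketch below (the DoFn and pipeline are illustrative, not the actual failing code):

import apache_beam as beam

class FlakyDoFn(beam.DoFn):
    # Hypothetical DoFn that raises for some elements, so the
    # exception-handling path gets exercised.
    def process(self, element):
        if element == 'bad':
            raise ValueError(element)
        yield element

with beam.Pipeline() as p:
    good, bad = (
        p
        | beam.Create(['ok', 'bad'])
        | beam.ParDo(FlakyDoFn()).with_exception_handling(use_subprocess=True))

With use_subprocess=True the wrapped DoFn runs in a separate process, and in our pipeline this fails with:
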
Traceback (most recent call last):
  File "/data_gen_docker_pybinary.runfiles/python3_x86_64/lib/python3.7/multiprocessing/queues.py", line 236, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/data_gen_docker_pybinary.runfiles/python3_x86_64/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "apache_beam/coders/coder_impl.py", line 1035, in apache_beam.coders.coder_impl._ConcatSequence.__reduce__
  File "apache_beam/coders/coder_impl.py", line 1025, in __iter__
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 1184, in _lazy_iterator
    self._underlying.get_raw(state_key, continuation_token))
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 986, in get_raw
    continuation_token=continuation_token)))
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 1019, in _blocking_request
    req_future = self._request(request)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 1009, in _request
    request.instruction_id = self._context.process_instruction_id
AttributeError: '_thread._local' object has no attribute 'process_instruction_id'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/transforms/core.py", line 1895, in process
    result = self._fn.process(*args, **kwargs)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/transforms/core.py", line 1933, in process
    return self._call_remote(self._remote_process, *args, **kwargs)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/transforms/core.py", line 1948, in _call_remote
    return self._pool.submit(method, *args, **kwargs).result()
  File "/data_gen_docker_pybinary.runfiles/python3_x86_64/lib/python3.7/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/data_gen_docker_pybinary.runfiles/python3_x86_64/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
  File "/data_gen_docker_pybinary.runfiles/python3_x86_64/lib/python3.7/multiprocessing/queues.py", line 236, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/data_gen_docker_pybinary.runfiles/python3_x86_64/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "apache_beam/coders/coder_impl.py", line 1035, in apache_beam.coders.coder_impl._ConcatSequence.__reduce__
  File "apache_beam/coders/coder_impl.py", line 1025, in __iter__
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 1184, in _lazy_iterator
    self._underlying.get_raw(state_key, continuation_token))
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 986, in get_raw
    continuation_token=continuation_token)))
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 1019, in _blocking_request
    req_future = self._request(request)
  File "/data_gen_docker_pybinary.runfiles/cruise_ws/cruise/mlp/cfs/projects/trajectory_ranking/pipeline/data_gen_docker_pybinary_exedir/apache_beam/runners/worker/sdk_worker.py", line 1009, in _request
    request.instruction_id = self._context.process_instruction_id
AttributeError: '_thread._local' object has no attribute 'process_instruction_id'

I will come up with a reproducible example.

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

tvalentyn commented 1 year ago

Sounds like a bug.

I will come up with a reproducible example.

That would really help, thank you!

tvalentyn commented 1 year ago

The reason may be that the pipeline is using a side input (or state) and we don't plumb the required thread context through to the underlying subprocess. If confirmed, this is a functionality gap: such pipelines are currently not supported with use_subprocess.
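
If that's the cause, a repro would presumably look something like the sketch below (illustrative names, unconfirmed): a DoFn that consumes a lazily fetched side input, wrapped with use_subprocess=True.

import apache_beam as beam

def keep_if_allowed(element, allowed):
    # Touching the side input forces a state read that relies on the worker's
    # thread-local instruction context, which is what the traceback above
    # points at.
    if element not in allowed:
        raise ValueError(element)
    return element

with beam.Pipeline() as p:
    allowed = p | 'Allowed' >> beam.Create(['a', 'b'])
    good, bad = (
        p
        | 'Main' >> beam.Create(['a', 'b', 'c'])
        | beam.Map(
            keep_if_allowed,
            allowed=beam.pvalue.AsList(allowed)).with_exception_handling(
                use_subprocess=True))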

cozos commented 1 year ago

I am having trouble creating a minimal reproducible example. We have a pretty custom build system with Bazel and also link C++ pybind extensions. Extracting this out of our build system to create a generic reproducible example is a lot of work, and I don't have the bandwidth at the moment - sorry.

cozos commented 1 year ago

I think that using a fork-backed ProcessPoolExecutor is problematic for anything that uses threads/locks or things like gRPC, since fork can copy the gRPC client or threads in an inconsistent state (see https://github.com/grpc/grpc/tree/master/examples/python/multiprocessing). Using spawn helped things.
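
For illustration, the standard-library pattern that helped (plain Python, not Beam-specific; the worker function and pool size are just examples):

import multiprocessing
from concurrent.futures import ProcessPoolExecutor

def work(x):
    return x * x

if __name__ == '__main__':
    # 'spawn' starts a fresh interpreter for each worker instead of fork()ing
    # the parent, so workers don't inherit half-copied threads, locks, or
    # gRPC channels.
    ctx = multiprocessing.get_context('spawn')
    with ProcessPoolExecutor(max_workers=2, mp_context=ctx) as pool:
        print(list(pool.map(work, range(4))))  # [0, 1, 4, 9]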

ckchow commented 10 months ago

I think I might be getting bitten by this while using RunInference.with_exception_handling, even though in that code the default seems to be use_subprocess=False. I'm seeing occasional logs like the following from the Google Dataflow service:

Error processing instruction process_bundle-7777374602412410313-190. Original traceback is
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1423, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 839, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 932, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 439, in __getitem__
    self._cache[target_window] = self._side_input_data.view_fn(raw_view)
  File "/usr/local/lib/python3.8/site-packages/apache_beam/pvalue.py", line 393, in <lambda>
    lambda iterable: from_runtime_iterable(iterable, view_options))
  File "/usr/local/lib/python3.8/site-packages/apache_beam/pvalue.py", line 514, in _from_runtime_iterable
    head = list(itertools.islice(it, 2))
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 1251, in _lazy_iterator
    self._underlying.get_raw(state_key, continuation_token))
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 1060, in get_raw
    response = self._blocking_request(
  File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 1105, in _blocking_request
    raise RuntimeError(response.error)
RuntimeError: Instruction id was not registered.

tvalentyn commented 10 months ago

Thanks, @ckchow.

cc: @AnandInguva , also @damccorm FYI.

If we can get a somewhat reliable repro, that would be a good start.