apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: Pickling error in save_main_session with STORAGE_WRITE_API in Dataflow Python pipeline #31587

Status: Open. baeminbo opened this issue 1 month ago.

baeminbo commented 1 month ago

What happened?

WriteToBigQuery with the STORAGE_WRITE_API method can cause a pickling error [1] when all of the following hold:

  1. The WriteToBigQuery step is applied after a multi-output step.
  2. The pipeline variable is in the "main" scope.
  3. --save_main_session is True.
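The failure mode can be illustrated with the standard-library pickle module alone. This is a minimal sketch, not Beam's code: `dump_session_like` and `fake_main` are hypothetical names, and `threading.Lock` stands in for the `grpc._cython.cygrpc.Channel` from the traceback, because the issue does not say which object ends up holding the channel. It mimics the general shape of [1]: a dict of module globals is pickled, one value cannot be reduced, and the whole dump fails with `TypeError`.

```python
import pickle
import threading
import types


def dump_session_like(module):
    """Hypothetical stand-in for a main-session dump: pickle non-dunder globals."""
    state = {k: v for k, v in vars(module).items() if not k.startswith("__")}
    return pickle.dumps(state)


# Hypothetical __main__ whose globals include an unpicklable object, the way
# module-scope pipeline code can leave one behind. threading.Lock stands in
# for the grpc Channel seen in the traceback.
fake_main = types.ModuleType("fake_main")
fake_main.table = "project:dataset.table"
fake_main.channel = threading.Lock()

error = None
try:
    dump_session_like(fake_main)
except TypeError as err:
    # One unpicklable value makes the entire globals dict fail to pickle.
    error = err
print("pickling failed:", error)
```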

See this example code and run script to reproduce the error.

There are three ways to mitigate this:

  1. Apply WriteToBigQuery after a single-output step (example).
  2. Define the pipeline graph inside a function (example).
  3. Use --pickle_library=cloudpickle.
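Why the second mitigation helps can be sketched without Beam, under the same simplifying assumption that saving the main session amounts to pickling the main module's globals. In this sketch `dump_session_like`, `fake_main`, and `run` are hypothetical names, and `threading.Lock` again stands in for the gRPC channel. Because the unpicklable object is created inside a function, it is a local variable rather than a module global, so dumping the globals succeeds. In a real pipeline, `run` would build and run the pipeline and be called under `if __name__ == "__main__":`.

```python
import pickle
import threading
import types


def dump_session_like(module):
    """Hypothetical stand-in for a main-session dump: pickle non-dunder globals."""
    state = {k: v for k, v in vars(module).items() if not k.startswith("__")}
    return pickle.dumps(state)


# Hypothetical module whose globals hold only picklable values.
fake_main = types.ModuleType("fake_main")
fake_main.table = "project:dataset.table"


def run(module):
    # The unpicklable object is local to this function, so it never
    # appears among the module's globals when they are dumped.
    channel = threading.Lock()
    with channel:
        payload = dump_session_like(module)
    return payload


payload = run(fake_main)
print(pickle.loads(payload))  # {'table': 'project:dataset.table'}
```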

[1]

  File "/Users/baeminbo/.pyenv/versions/3.11.4/lib/python3.11/pickle.py", line 972, in save_dict
    self._batch_setitems(obj.items())
  File "/Users/baeminbo/.pyenv/versions/3.11.4/lib/python3.11/pickle.py", line 998, in _batch_setitems
    save(v)
  File "/Users/baeminbo/.pyenv/versions/3.11.4/lib/python3.11/pickle.py", line 578, in save
    rv = reduce(self.proto)
         ^^^^^^^^^^^^^^^^^^
  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__

See the full output at https://gist.github.com/baeminbo/bd23df65e5604cf24213c2e1d6a46a25

Issue Priority

Priority: 3 (minor)

Issue Components

tvalentyn commented 1 month ago

Thanks @baeminbo for a detailed repro.

Eventually, we plan to switch to the cloudpickle pickler, which doesn't require saving the main session.

Structuring a pipeline as a package is the best way to avoid having to pass --save_main_session and can also help provide better structure for complex pipelines. A few examples:

https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/#multiple-file-dependencies
https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/dataflow/flex-templates/pipeline_with_dependencies/main.py
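One general reason why packaging helps can be shown with the standard-library pickle module, not Beam's picklers. A function defined at module level in an importable module is pickled by reference, as its module name plus qualified name. The receiving side then only needs that module to be importable, which for a packaged pipeline means installed on the workers. Code that exists only in `__main__` has no such importable home, and saving the main session works around that. `json.dumps` is only a convenient importable function here and is not part of the issue.

```python
import json
import pickle

# Pickling a module-level function from an importable module stores a
# reference (module name plus qualified name), not the function's code.
payload = pickle.dumps(json.dumps)

# Unpickling imports the module and looks the name up again.
restored = pickle.loads(payload)
print(restored is json.dumps)  # True
```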