apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
Apache License 2.0
7.7k stars 4.2k forks source link

[Feature Request]: Add `with_exception_handling()` for PTransforms in Python #31193

Open jd185367 opened 2 months ago

jd185367 commented 2 months ago

What would you like to happen?

Add a way to handle uncaught runtime exceptions thrown within a transform to the Python SDK, e.g. something like this with_exception_handling() method:

def log_errors(error_item: tuple):
    item, error_info = error_item
    logging.error(f"Failed to save item: {item}")

_, errors = items | MyTransform().with_exception_handling()

This already exists for DoFns in DoFn.with_exception_handling, and the Java SDK appears to offer something similar for PTransforms: https://beam.apache.org/releases/javadoc/2.15.0/index.html?org/apache/beam/sdk/transforms/WithFailures.html


Google Cloud Dataflow will automatically re-try failed messages in streaming jobs; however, in the case of messages that cause runtime errors due to bad data/etc., this can cause messages to be retried infinitely and block other messages from being processed. The only fix we've found is to drain and re-start the pipeline to flush the bad messages, which is manual and risks losing data. There's no way to set a maximum number of retries per message. While we try to parse + validate messages up-front as much as possible, bugs have slipped through to production and caused runtime errors (and obviously, we can't prevent 100% of bugs).

Being able to add a top-level error handler to the pipeline (or a root transform) would solve this, since in a worst-case scenario we could catch any failed messages/collections, log them, and not block the rest of the pipeline.

Right now, though, adding a top-level exception handler isn't possible. For instance, this example will not catch the raised error in Apache Beam 2.56.0, which is very unintuitive:

import logging
import apache_beam as beam

class BuggedTransform(beam.PTransform):
    def expand(self, messages: beam.PCollection) -> beam.PCollection:
        return messages | "Call function w/ bug" >> beam.Map(self.raise_error)

    def raise_error(self, m: str):
        if m == "bar":
            raise ValueError("This should be caught")
        return m

class MyTransform(beam.PTransform):
    def expand(self, messages: beam.PCollection) -> beam.PCollection:
            return messages | "Run transform w/ bug" >> BuggedTransform()
        except Exception as e:      # This should catch the error, but doesn't!
            logging.error(f"Error happened {e}")
            return messages

with beam.Pipeline() as pipeline:
    result = (
        | "Create example data" >> beam.Create(["foo", "bar", "baz"])
        | "Apply my transform" >> MyTransform()


ERROR:apache_beam.runners.common:This should be caught [while running 'Apply my transform/Run transform w/ bug/Call function w/ bug']
Traceback (most recent call last):
  File "apache_beam\runners\common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 640, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\transforms\core.py", line 1969, in <lambda>
    wrapper = lambda x: [fn(x)]
  File ".\beam_exception_example.py", line 12, in raise_error
    raise ValueError("This should be caught")
ValueError: This should be caught
Traceback (most recent call last):
  File "apache_beam\runners\common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 640, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\transforms\core.py", line 1969, in <lambda>
    wrapper = lambda x: [fn(x)]
  File ".\beam_exception_example.py", line 12, in raise_error
    raise ValueError("This should be caught")
ValueError: This should be caught

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File ".\beam_exception_example.py", line 27, in <module>
    result = (
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\pipeline.py", line 613, in __exit__
    self.result = self.run()
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\pipeline.py", line 587, in run
    return self.runner.run_pipeline(self, self._options)
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\direct\direct_runner.py", line 128, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 204, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 228, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 483, in run_stages
    bundle_results = self._execute_bundle(
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 811, in _execute_bundle
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 1048, in _run_bundle
    result, splits = bundle_manager.process_bundle(
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\fn_runner.py", line 1384, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\portability\fn_api_runner\worker_handlers.py", line 384, in push
    response = self.worker.do_instruction(request)
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 656, in do_instruction
    return getattr(self, request_type)(
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\worker\sdk_worker.py", line 694, in process_bundle
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 1113, in process_bundle
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\runners\worker\bundle_processor.py", line 237, in process_encoded
  File "apache_beam\runners\worker\operations.py", line 569, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam\runners\worker\operations.py", line 571, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam\runners\worker\operations.py", line 262, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 265, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\worker\operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam\runners\common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 639, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam\runners\common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam\runners\worker\operations.py", line 265, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\worker\operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam\runners\common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 639, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam\runners\common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam\runners\worker\operations.py", line 265, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\worker\operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam\runners\common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 639, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam\runners\common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam\runners\common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam\runners\worker\operations.py", line 265, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam\runners\worker\operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\worker\operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam\runners\common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 1547, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam\runners\common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam\runners\common.py", line 640, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "C:\Users\REDACTED\.venv\lib\site-packages\apache_beam\transforms\core.py", line 1969, in <lambda>
    wrapper = lambda x: [fn(x)]
  File ".\beam_exception_example.py", line 12, in raise_error
    raise ValueError("This should be caught")
ValueError: This should be caught [while running 'Apply my transform/Run transform w/ bug/Call function w/ bug']

The only solution we've found is to add this sort of error handling separately to every pipeline step, which isn't maintainable (e.g. if we have hundreds of DoFns, adding try-except blocks to all of them individually is labor-intensive).

Related Issues

Issue Priority

Priority: 2 (default / most feature requests should be filed as P2)

Issue Components

liferoad commented 2 months ago

https://github.com/apache/beam/issues/24209 solved this for RunInference.

jd185367 commented 2 months ago

@liferoad while that's helpful as a pattern to look at, I don't think that solves the general issue of catching exceptions in transforms for 2 reasons:

  1. That error-wrapping only applies to the RunInference transform, specifically.
  2. The error wrapping was possible for RunInference since most of its work was just calling a single DoFn, so it could use the existing DoFn.with_exception_handling() method. For transforms that call other transforms, this pattern isn't possible unless every single called transform implements this pattern (which'd require all those transforms to catch their errors this way, etc.) - which basically boils down to forcing every transform to implement its own exception-handling. That doesn't give the option of just adding a top-level error-handler (like my suggestion), which'd be more maintainable IMO.
liferoad commented 2 months ago

@liferoad while that's helpful as a pattern to look at, I don't think that solves the general issue of catching exceptions in transforms for 2 reasons:

  1. That error-wrapping only applies to the RunInference transform, specifically.
  2. The error wrapping was possible for RunInference since most of its work was just calling a single DoFn, so it could use the existing DoFn.with_exception_handling() method. For transforms that call other transforms, this pattern isn't possible unless every single called transform implements this pattern (which'd require all those transforms to catch their errors this way, etc.) - which basically boils down to forcing every transform to implement its own exception-handling. That doesn't give the option of just adding a top-level error-handler (like my suggestion), which'd be more maintainable IMO.

I agree with what you said. I just want to list the current implementations to solve the error handling. And https://github.com/apache/beam/pull/29164 introduces withBadRecordHandler for Java to handle IO transforms.