apache/beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: unexpected duplicate outputs triggered by quota exceeded exception in finish_bundle() #27156

Open fathom-zhou opened 1 year ago

fathom-zhou commented 1 year ago

What happened?

Beam version: 2.46.0
Language: Python
Runner: Dataflow

I am encountering an issue with my Beam pipeline, which performs transformations on a set of documents and saves them as TFRecords. Within the pipeline, I have a DoFn that logs transformation info using Stackdriver Logging. However, when a quota-exceeded exception is raised in the finish_bundle phase of that DoFn, the pipeline does not fail, and some documents end up being saved multiple times, leading to unexpected duplication. This behaviour may be a bug in Beam or Dataflow.

My current workaround is to add a retry in finish_bundle whenever the quota issue is encountered, and this successfully prevents the duplicated documents.
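
Roughly, the workaround looks like the sketch below. The actual DoFn is proprietary; write_log_batch, the retry count, and the backoff are illustrative stand-ins, not the real code:

import logging
import time

import apache_beam as beam


def write_log_batch(entries):
  # Placeholder for the real Stackdriver / Cloud Logging call.
  for entry in entries:
    logging.info(entry)


class LogBatchDoFn(beam.DoFn):
  """Gathers log lines in process() and flushes them in finish_bundle()."""

  def start_bundle(self):
    self._batch = []

  def process(self, element):
    self._batch.append(str(element))
    yield element

  def finish_bundle(self):
    # Retry the flush so that a transient quota error does not fail the
    # bundle (and therefore does not trigger a bundle retry).
    for attempt in range(5):
      try:
        write_log_batch(self._batch)
        return
      except Exception:
        logging.warning("Log flush failed on attempt %d, retrying", attempt + 1)
        time.sleep(2 ** attempt)
    raise RuntimeError("Giving up after repeated log flush failures")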

Due to proprietary restrictions, I am not able to share the source code here. Please let me know if you need more details about this issue.

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

tvalentyn commented 1 year ago

It sounds like your process and/or finish_bundle methods may not be idempotent. Idempotency is an important consideration when writing IO code. Are you using TFRecordIO by chance? If not, where does the document-saving part happen? https://github.com/apache/beam/blob/ca674aad086e4d1a40e8c01598073030a22484fa/sdks/python/apache_beam/io/tfrecordio.py
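
To make the idempotency concern concrete, here is a deliberately generic sketch (not your code; the in-memory stores stand in for whatever external system is being written to). A blind append in finish_bundle is repeated whenever the runner retries the bundle, while a write keyed by the buffer's contents is safe to retry:

import apache_beam as beam

# In-memory stand-ins for an external store, purely for illustration.
_APPEND_ONLY_STORE = []
_KEYED_STORE = {}


class NonIdempotentFlushFn(beam.DoFn):
  """Blindly appends its buffer in finish_bundle(); if the runner retries
  the bundle, the same records are appended a second time."""

  def start_bundle(self):
    self._buffer = []

  def process(self, element):
    self._buffer.append(element)
    yield element

  def finish_bundle(self):
    _APPEND_ONLY_STORE.extend(self._buffer)


class IdempotentFlushFn(NonIdempotentFlushFn):
  """Writes the buffer under a key derived from its contents, so a retried
  bundle overwrites the earlier attempt instead of duplicating it."""

  def finish_bundle(self):
    key = hash(tuple(str(e) for e in self._buffer))
    _KEYED_STORE[key] = list(self._buffer)

Beam's file-based sinks (including TFRecordIO) are designed to tolerate bundle retries by writing to temporary files and only finalizing them once, so duplicates in the final output would still be unexpected.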

tvalentyn commented 1 year ago

Friendly reminder re: the above question. Thanks!

kvudata commented 1 year ago

Yes, we are using TFRecordIO (specifically beam.io.WriteToTFRecord).

It sounds like your process and/or finish_bundle methods may not be idempotent. Idempotency is an important consideration when writing IO code.

Note that we're only logging to Stackdriver (a.k.a. Google Cloud Logging) in our DoFn (we gather logs in process() and then log the batch in finish_bundle()), and we're OK with duplicate logs being generated. We're observing that Dataflow seems to perform some kind of retry if our finish_bundle() fails, and the later WriteToTFRecord (which doesn't depend on this DoFn in our pipeline) ends up writing duplicates.

Another interesting observation: the metrics for the number of items processed/written in the Dataflow UI show the number of elements we would expect if duplicates were not being written; the duplicates are only apparent from inspecting the output TFRecord(s).
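
One way to check for the duplicates (a sketch, assuming each record's byte payload is expected to appear only once in the output) is to read the shards back with the direct runner and count repeated records:

import apache_beam as beam


def count_duplicate_records(pattern):
  # Read the written shards back and count how many distinct records occur
  # more than once.
  with beam.Pipeline() as p:
    (p
     | beam.io.tfrecordio.ReadFromTFRecord(pattern)
     | beam.Map(lambda record: (record, 1))
     | beam.CombinePerKey(sum)
     | beam.Filter(lambda kv: kv[1] > 1)
     | beam.combiners.Count.Globally()
     | beam.Map(lambda n: print("records with duplicates:", n)))

# e.g. count_duplicate_records("gs://<bucket>/<output-prefix>*")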

tvalentyn commented 9 months ago

The behavior you are describing is surprising. A failure in finish_bundle results in a bundle retry, but that should not result in elements being written twice in the IO step. I was not able to reproduce the behavior described here. I tried running the following pipeline on Dataflow:

import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.synthetic_pipeline import SyntheticSource

_LOGGER = logging.getLogger(__name__)
tf_out_path = 'gs://clouddfe-valentyn/finish_repro_10m'

def tf_record_finish_bundle(p):
  class SometimesFailFn(beam.DoFn):
    def start_bundle(self):
      _LOGGER.warning("Start processing bundle")

    def finish_bundle(self):
      # Fail roughly half of the bundles so that the runner retries them.
      from random import random
      if random() < 0.5:
        raise RuntimeError("Intentionally failing finish_bundle.")

      _LOGGER.warning("Finished processing bundle")

    def process(self, element):
      yield element[1]

  (
    p
    | beam.io.Read(
              SyntheticSource({
                  "numRecords": 1000000, "keySizeBytes": 1, "valueSizeBytes": 1
              }))

   | beam.Reshuffle()  # checkpoint / fusion break: a bundle retry won't re-run the source
   | beam.ParDo(SometimesFailFn())
   | beam.io.tfrecordio.WriteToTFRecord(tf_out_path)
  )

def verify_tfrecord_output(p):
  (p
   | beam.io.tfrecordio.ReadFromTFRecord(tf_out_path+"*")
   | beam.combiners.Count.Globally()
   | "LogCount" >> beam.LogElements(prefix="TotalCount ", level=logging.WARNING)
  )

def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)
  print(known_args)
  print(pipeline_args)

  with beam.Pipeline(argv=pipeline_args) as p:
    tf_record_finish_bundle(p)

  with beam.Pipeline(argv=pipeline_args) as p:
    verify_tfrecord_output(p)
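
The paste above omits the entry point; for completeness, a minimal launcher would be the usual boilerplate (the flags in the comment are the standard Dataflow options, with placeholder project and bucket names):

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  # Typical Dataflow invocation (project and bucket are placeholders):
  #   python repro.py --runner=DataflowRunner --project=<project> \
  #       --region=us-central1 --temp_location=gs://<bucket>/tmp
  run()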

I verified that there was at least 1 "Intentionally failing finish_bundle" error, but the total number of elements written by TFRecordIO was as expected.

...
INFO:apache_beam.runners.dataflow.internal.apiclient:Submitted job: 2023-12-08_23_25_33-8586414311134949410
INFO:apache_beam.runners.dataflow.internal.apiclient:To access the Dataflow monitoring console, please navigate to https://console.cloud.google.com/dataflow/jobs/us-central1/2023-12-08_23_25_33-8586414311134949410?project=google.com:clouddfe

...

ERROR:apache_beam.runners.dataflow.dataflow_runner:2023-12-09T07:30:14.196Z: JOB_MESSAGE_ERROR: SDK harness sdk-0-0 disconnected. This usually means that the process running the pipeline code has crashed. Inspect the Worker Logs and the Diagnostics tab to determine the cause of the crash.
ERROR:apache_beam.runners.dataflow.dataflow_runner:2023-12-09T07:30:23.465Z: JOB_MESSAGE_ERROR: SDK harness sdk-0-0 disconnected. This usually means that the process running the pipeline code has crashed. Inspect the Worker Logs and the Diagnostics tab to determine the cause of the crash.
ERROR:apache_beam.runners.dataflow.dataflow_runner:2023-12-09T07:30:32.533Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1493, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 574, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 580, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "/home/valentyn/projects/pipes/simple.py", line 23, in finish_bundle
RuntimeError: Intentionally failing finish_bundle.

...

INFO:apache_beam.runners.dataflow.dataflow_runner:Job 2023-12-08_23_25_33-8586414311134949410 is in state JOB_STATE_DONE

...

Count outputs with direct runner: 
...

INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x7f76fed13ca0> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
INFO:apache_beam.io.gcp.gcsio:Finished listing 5 files in 0.27153849601745605 seconds.
WARNING:root:TotalCount 1000000
tvalentyn commented 9 months ago

Would you be able to provide a repro that demonstrates that duplicate elements are written?