apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

`beam.CombineValues` on DataFlow runner causes ambiguous failure with python SDK #21432

Open damccorm opened 2 years ago

damccorm commented 2 years ago

 

The following Beam pipeline works correctly with DirectRunner but fails with a very vague error on DataflowRunner.


(
    pipeline
    | beam.io.ReadFromPubSub(input_topic, with_attributes=True)
    | beam.Map(pubsub_message_to_row)
    | beam.WindowInto(beam.transforms.window.FixedWindows(5))
    | beam.GroupBy(<beam.Row col name>)
    | beam.CombineValues(<instance of beam.CombineFn subclass>)
    | beam.Values()
    | beam.io.gcp.bigquery.WriteToBigQuery(. . .)
)

Stacktrace:


Traceback (most recent call last):
  File "src/read_quality_pipeline/__init__.py", line 128, in <module>
    (
  File "/home/pkg_dev/.cache/pypoetry/virtualenvs/apache-beam-poc-5nxBvN9R-py3.8/lib/python3.8/site-packages/apache_beam/pipeline.py", line 597, in __exit__
    self.result.wait_until_finish()
  File "/home/pkg_dev/.cache/pypoetry/virtualenvs/apache-beam-poc-5nxBvN9R-py3.8/lib/python3.8/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1633, in wait_until_finish
    raise DataflowRuntimeException(
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Error processing pipeline.

Log output:


2022-02-01T16:54:43.645Z: JOB_MESSAGE_WARNING: Autoscaling is enabled for Dataflow Streaming Engine. Workers will scale between 1 and 100 unless maxNumWorkers is specified.
2022-02-01T16:54:43.736Z: JOB_MESSAGE_DETAILED: Autoscaling is enabled for job 2022-02-01_08_54_40-8791019287477103665. The number of workers will be between 1 and 100.
2022-02-01T16:54:43.757Z: JOB_MESSAGE_DETAILED: Autoscaling was automatically enabled for job 2022-02-01_08_54_40-8791019287477103665.
2022-02-01T16:54:44.624Z: JOB_MESSAGE_ERROR: Error processing pipeline.

With the CombineValues step removed, this pipeline starts successfully in Dataflow.

 

I initially thought this was a server-side Dataflow issue, since the Dataflow API (v1b3.projects.locations.jobs.messages) returns only the textPayload "Error processing pipeline". But then I found BEAM-12636, where a Go SDK user gets the same error message, seemingly because of bugs in the Go SDK?

Imported from Jira BEAM-13795. Original Jira may contain additional context. Reported by: Jake_Zuliani.

mingyao1993 commented 2 years ago

Hi, I'm facing the same issue as well. Is there a workaround, or a stable version to use? Thanks!

with beam.Pipeline(options=pipeline_options) as p:
    p \
    | "Read From PubSub Subscription" >> ReadFromPubSub(
        subscription=subscription) \
    | beam.Map(lambda row: logging.info)

Traceback (most recent call last):
  File "src/main.py", line 101, in <module>
    run()
  File "src/main.py", line 54, in run
    p \
  File "/Users/name/.local/share/virtualenvs/QqobXVVz/lib/python3.8/site-packages/apache_beam/pipeline.py", line 597, in __exit__
    self.result.wait_until_finish()
  File "/Users/name/.local/share/virtualenvs/QqobXVVz/lib/python3.8/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1667, in wait_until_finish
    raise DataflowRuntimeException(
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Error processing pipeline.

viniciusdsmello commented 1 year ago

Hi, I'm facing the same issue here. Did anyone figure out a workaround?

MOscity commented 1 year ago

Hey, I was facing the same issue: the whole pipeline (all steps) worked with DirectRunner, but DataflowRunner failed after 1-3 seconds and emitted "ambiguous" logs that didn't point at the CombineValues line. Without the CountCombineFn step, it worked fine on DataflowRunner. Update: I found a workaround, see below.

Original Error Logs:

ERROR:apache_beam.runners.dataflow.dataflow_runner:Console URL: https://console.cloud.google.com/dataflow/jobs/<RegionId>/2023-01-20_06_59_03-4426498189309546663?project=<ProjectId>
Traceback (most recent call last):
  File "./path/to/file/my_python.py", line 618, in <module>
    run_pipeline()
  File "./path/to/file/my_python.py", line 598, in run_pipeline
    print(f'----- After Step: {step}.')
  File "/home/myusername/.local/share/virtualenvs/pipenv_20-Y278SNFx/lib/python3.8/site-packages/apache_beam/pipeline.py", line 598, in __exit__
    self.result.wait_until_finish()
  File "/home/myusername/.local/share/virtualenvs/pipenv_20-Y278SNFx/lib/python3.8/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1641, in wait_until_finish
    raise DataflowRuntimeException(
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Error processing pipeline.

Update: Workaround

I'm not sure whether this is related to the Jira ticket BEAM-10297, but I took inspiration from the Apache Beam documentation on "CombinePerKey" and wondered whether I could replace beam.CombineValues(beam.combiners.CountCombineFn()) with another transform. First I tried beam.CombineValues(beam.transforms.combiners.CountCombineFn()), which also failed with DataflowRunner (though it worked locally). But with CombinePerKey instead of CombineValues, it worked!

Solution: use beam.CombinePerKey with a custom CombineFn (appropriate to your application)! With beam.CombinePerKey(CustomFn()), the pipeline also runs on DataflowRunner!
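To make the difference between the two pipeline shapes concrete, here is a plain-Python model of what each one computes. It does not import Beam: `group_by_key`, `combine_values`, and `combine_per_key` are hypothetical stand-ins written for this sketch, not Beam APIs, and the sketch says nothing about why Dataflow rejects the first shape.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Stand-in for GroupByKey: (key, value) pairs -> {key: [values]}."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


def combine_values(grouped, combine):
    """Stand-in for CombineValues: reduce each already-grouped value list."""
    return {key: combine(values) for key, values in grouped.items()}


def combine_per_key(pairs, combine):
    """Stand-in for CombinePerKey: group and reduce in a single step."""
    return combine_values(group_by_key(pairs), combine)


pairs = [("a", 1), ("b", 5), ("a", 3)]

# Shape reported to fail on DataflowRunner: GroupByKey, then CombineValues.
two_step = combine_values(group_by_key(pairs), len)

# Shape reported to work: CombinePerKey applied to the ungrouped pairs.
one_step = combine_per_key(pairs, len)

print(two_step)  # {'a': 2, 'b': 1}
print(one_step)  # {'a': 2, 'b': 1}
```

In real Beam code, CombinePerKey accepts the ungrouped (key, value) PCollection directly, so a preceding GroupByKey is normally not needed.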


See the original "AverageFn" here: https://beam.apache.org/documentation/transforms/python/aggregation/combineperkey/#example-5-combining-with-a-combinefn

New custom combiner:

import apache_beam as beam


# [START COPIED CODE]
class CustomFn(beam.CombineFn):
    def create_accumulator(self):
        # The accumulator is a (total, count) pair.
        total = 0.0
        count = 0
        return total, count

    def add_input(self, accumulator, input):
        # Adds len(input), so each input is expected to be a sized
        # collection rather than a single number.
        total, count = accumulator
        return total + len(input), count + 1

    def merge_accumulators(self, accumulators):
        totals, counts = zip(*accumulators)
        return sum(totals), sum(counts)

    # [END COPIED CODE] - MODIFIED LINES:
    def extract_output(self, accumulator, MODE='AVERAGE'):
        # MODE selects the statistic to return; the default is the plain average.
        total, count = accumulator
        if count == 0:
            return float('NaN')

        if MODE == 'SUM':
            return int(total)
        elif MODE == 'COUNT':
            return int(count)
        elif MODE == 'INT_AVERAGE':
            return int(total / count)
        else:
            return total / count
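For readers new to CombineFn, the four methods above are called in a fixed order: accumulators are created and filled per bundle of input, merged, and only then turned into an output. The plain-Python sketch below imitates that order without importing Beam. `MeanFn` is a hypothetical stand-in that adds each value itself (like the documentation's AverageFn, not `len(input)` as CustomFn does), and `run_combine` is a hypothetical driver; how a real runner splits the input into bundles is an assumption here.

```python
class MeanFn:
    """Plain-Python stand-in with the four CombineFn methods (no Beam base class)."""

    def create_accumulator(self):
        return 0.0, 0  # (running total, element count)

    def add_input(self, accumulator, value):
        total, count = accumulator
        return total + value, count + 1

    def merge_accumulators(self, accumulators):
        totals, counts = zip(*accumulators)
        return sum(totals), sum(counts)

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else float("nan")


def run_combine(fn, bundles):
    """Hypothetical driver: one accumulator per bundle, then a single merge."""
    partials = []
    for bundle in bundles:
        acc = fn.create_accumulator()
        for value in bundle:
            acc = fn.add_input(acc, value)
        partials.append(acc)
    return fn.extract_output(fn.merge_accumulators(partials))


# The same six values, split two different ways, give the same mean.
print(run_combine(MeanFn(), [[1, 2, 3], [4, 5, 6]]))    # 3.5
print(run_combine(MeanFn(), [[1], [2, 3, 4, 5], [6]]))  # 3.5
```

Carrying (total, count) instead of a running mean is what lets the partial results be merged in any grouping.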

Functions to group and combine values:

# ...
def prepare_key_value_pairs(element):
    key_value_tuple = (
        (
            element['key_column_1'],
            element['key_column_...'],
            element['key_column_n'],
        ),
        element['value_column_1'],
    )
    return key_value_tuple

def transform_data(data):
    data_out = (
            data
            | 'Step 1' >> beam.Map(prepare_key_value_pairs)
            | 'Step 2' >> beam.GroupByKey()

            # This line fails with DataflowRunner, but not with DirectRunner (locally):
            # | 'Old Step 3' >> beam.CombineValues(beam.combiners.CountCombineFn())

            # This works with DataflowRunner: CombinePerKey instead of CombineValues
            # https://beam.apache.org/documentation/transforms/python/aggregation/combineperkey/#example-5-combining-with-a-combinefn
            | 'New Step 3 New' >> beam.CombinePerKey(CustomFn())
    )
    return data_out

# ...
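Because Step 2 keeps GroupByKey in front of CombinePerKey(CustomFn()), each input reaching CustomFn.add_input appears to be the whole group of values for a key rather than a single value. Under that assumption the accumulator ends up as (number of values, 1), so the default 'AVERAGE' mode returns the per-key element count as a float. The plain-Python trace below mimics this without importing Beam; `count_via_custom_fn` is a hypothetical helper, and the lists stand in for the iterables that GroupByKey emits.

```python
def add_input(accumulator, grouped_values):
    """Mirrors CustomFn.add_input: add the group's size, bump the group count."""
    total, count = accumulator
    return total + len(grouped_values), count + 1


def extract_average(accumulator):
    """Mirrors the default 'AVERAGE' branch of CustomFn.extract_output."""
    total, count = accumulator
    return float("nan") if count == 0 else total / count


def count_via_custom_fn(grouped):
    """Hypothetical helper: feed each key's single grouped list through the combiner."""
    results = {}
    for key, values in grouped.items():
        accumulator = add_input((0.0, 0), values)  # one input per key
        results[key] = extract_average(accumulator)
    return results


# Stand-in for GroupByKey output: composite key tuple -> list of values.
grouped = {("x", "y"): [10, 20, 30], ("x", "z"): [7]}
print(count_via_custom_fn(grouped))  # {('x', 'y'): 3.0, ('x', 'z'): 1.0}
```

If that reading is right, the workaround reports the same per-key counts as the original CountCombineFn step, only as floats rather than ints.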

Later in the beam pipeline:


with beam.Pipeline(options=pipeline_options) as pipeline:
    # ...
    data_raw = read_data(...)
    data_transformed = transform_data(data_raw)

    # ...

linamartensson commented 1 year ago

We started encountering this issue on Nov 2, 2022, in a job that runs daily. The job runs from a template created on June 14, 2022, and it ran just fine at first. We're able to work around it with the suggestion here, but this is odd, and it doesn't line up with anything in the Dataflow release schedule as far as I can tell.

So, how could this have happened? It's worrisome that a job that was already running could just stop working. I'm also wondering whether some Cloud change on our end might have suddenly triggered it. Also, clearly we should have discovered this issue sooner, but here we are. ;)

liferoad commented 1 year ago

.take-issue