apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: DirectRunner fails after grouping into batches, and applying flat map with assertion error #28716

Open damianr13 opened 1 year ago

damianr13 commented 1 year ago

What happened?

I am running the following code:

import argparse

import apache_beam as beam
import structlog
from apache_beam.options.pipeline_options import PipelineOptions

logger = structlog.getLogger()

def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()

    # Parse the arguments from the command line as defined in the options class
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)

    with (beam.Pipeline(options=pipeline_options) as p):
        (
            p
            | "Read Vertex embedder output" >> beam.Create(["a", "b", "c"])
            | "Assign dummy keys" >> beam.Map(lambda x: (None, x))
            | "Batch up to 30 elements" >> beam.GroupIntoBatches(30)
            | "Flat map" >> beam.FlatMap(lambda x: x)
            | "Write to GCS"
            >> beam.io.WriteToText(
                "gs://my_bucket/apache_beam_test/output",
                file_name_suffix=".jsonl",
            )
        )

if __name__ == "__main__":
    run()

As you can see this code generates 3 dummy values. Let's say I want to take my elements in batches of 30, so I assign the same dummy key to all of them. After working with my batches, I want to go back to single elements, and then publish the results to Google Cloud Storage.

Running the code above fails with the following error:

ERROR:apache_beam.runners.direct.executor:Giving up after 4 attempts.
WARNING:apache_beam.runners.direct.executor:A task failed with exception: 
Traceback (most recent call last):
  File "/Users/robert-andreidamian/Workspace/panprices/product_image_hasher/after_embeddings.py", line 34, in <module>
    run()
  File "/Users/robert-andreidamian/Workspace/panprices/product_image_hasher/after_embeddings.py", line 18, in run
    with (beam.Pipeline(options=pipeline_options) as p):
  File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/pipeline.py", line 601, in __exit__
    self.result.wait_until_finish()
  File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/runners/direct/direct_runner.py", line 585, in wait_until_finish
    self._executor.await_completion()
  File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/runners/direct/executor.py", line 432, in await_completion
    self._executor.await_completion()
  File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/runners/direct/executor.py", line 480, in await_completion
    raise update.exception
  File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/runners/direct/executor.py", line 370, in call
    self.attempt_call(
  File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/runners/direct/executor.py", line 413, in attempt_call
    evaluator.process_element(value)
  File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 939, in process_element
    assert not self.global_state.get_state(
AssertionError

I am running the code in a pipenv environment with Python 3.11. I attached the requirements.
requirements.txt

Here is my Pipfile:

[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]
google-cloud-bigquery-storage = "*"
google-cloud-bigquery = {extras = ["bqstorage", "pandas"], version = "*"}
google-cloud-tasks = "*"
firebase-admin = "*"
Pillow = "*"
imagehash = "*"
structlog = "*"
functions-framework = "*"
opencv-python = "*"
numpy = "*"
asyncio = "*"
pydantic = {extras = ["dotenv"], version="~=1.10"}
psycopg2-binary = "*"
google-cloud-aiplatform = "*"
gcloud-aio-storage = "*"
gcloud-rest-storage = "*"
validators = "*"
avif = {extras = ["pillow"], version="*"}
pillow-avif-plugin = "*"
wheel = "*"
apache-beam = {extras = ["gcp"], version = "*"}
cloud-sql-python-connector = {extras = ["pg8000"], version = "*"}

[dev-packages]
flask = {extras = ["async"], version="*"}

[requires]
python_version = "3.11"

Issue Priority

Priority: 3 (minor)

Issue Components

jrmccluskey commented 1 year ago

Okay, so I can repro the issue, specifically when the specified batch size is greater than the number of elements in the input PCollection and another GroupByKey (GBK) happens in the pipeline after the GroupIntoBatches call. (GBK is marked as complete here and then re-invoked here.)

This would happen if _is_final_bundle() returns true, i.e. the watermark was advanced to infinity prematurely for the next invocation of a GBK. This seems to be the case when batch size > num elements: the watermark is still at positive infinity on the next invocation instead of being reset to negative infinity for the next DoFn.
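To make the failure mode concrete, here is a minimal pure-Python sketch of it. It is a toy model, not Beam's actual evaluator: the class name, the tag string, and the plain dict standing in for the runner's state are all hypothetical. It only shows how a completion flag that is set when the watermark reaches positive infinity, and never reset, makes the next element trip the same kind of assertion.

```python
class ToyGroupByKeyEvaluator:
    """Toy stand-in for a GBK evaluator; not Beam's actual class."""

    COMPLETION_TAG = "gbk_completed"  # hypothetical tag name

    def __init__(self, global_state):
        # A plain dict stands in for the runner's per-step global state.
        self.global_state = global_state
        self.buffer = {}

    def process_element(self, key, value):
        # Same shape as the failing assertion: no element may arrive
        # once the step has been marked complete.
        assert not self.global_state.get(self.COMPLETION_TAG)
        self.buffer.setdefault(key, []).append(value)

    def finish_bundle(self, watermark):
        # A bundle is treated as final once the watermark is +infinity.
        if watermark == float("inf"):
            self.global_state[self.COMPLETION_TAG] = True
            return dict(self.buffer)
        return None


state = {}
gbk = ToyGroupByKeyEvaluator(state)
gbk.process_element("key2", "a")
# The watermark is already at +infinity, so the step is marked complete.
grouped = gbk.finish_bundle(watermark=float("inf"))

try:
    # A later bundle for the same step still sees the stale flag.
    ToyGroupByKeyEvaluator(state).process_element("key2", "b")
    failed = False
except AssertionError:
    failed = True
```

In this toy model `failed` ends up true because nothing clears the flag between the two invocations, which is the behaviour described above.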

I can demonstrate this with this pipeline + some extra logging within the GBK transform evaluator:

def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()

    # Parse the arguments from the command line as defined in the options class
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)

    with (beam.Pipeline(options=pipeline_options) as p):
        (
            p
            | "Read Vertex embedder output" >> beam.Create(["a", "b", "c"])
            | "Assign dummy keys" >> beam.Map(lambda x: ("key", x))
            | "Batch up to 30 elements" >> beam.GroupIntoBatches(3)
            | "Flat map" >> beam.FlatMap(lambda x: x[1])
            | "re-key" >> beam.Map(lambda x: ("key2", x))
            | "gbk" >> beam.GroupByKey()
        )

if __name__ == "__main__":
    run()

With batch size = 3 we get each element through both GBKs, but with batch size > 3 the second GBK invocation fails on the first element because the Completion Tag and watermark are not reset. I haven't quite found where the reset between DoFn invocations happens yet, but this is where the problem is.
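A side note on the flat map step: GroupIntoBatches emits (key, batch) pairs, which is why the pipeline above uses `x[1]` rather than `x`. The following pure-Python sketch mimics that output shape without Beam. The helper names `group_into_batches` and `flat_map` are made up for illustration and are not Beam APIs.

```python
def group_into_batches(keyed_elements, batch_size):
    """Yield (key, batch) pairs with at most batch_size values per batch."""
    buffers = {}
    for key, value in keyed_elements:
        batch = buffers.setdefault(key, [])
        batch.append(value)
        if len(batch) == batch_size:
            # A full batch is emitted and its buffer is cleared.
            yield key, buffers.pop(key)
    # Whatever is left over is emitted as a partial batch.
    for key, batch in buffers.items():
        yield key, batch


def flat_map(fn, items):
    """Apply fn to each item and flatten the resulting iterables."""
    for item in items:
        yield from fn(item)


keyed = [("key", x) for x in ["a", "b", "c"]]
batches = list(group_into_batches(keyed, 30))

# Flattening the pair itself yields the key and the whole batch.
flattened_pair = list(flat_map(lambda x: x, batches))
# Flattening x[1] yields the individual elements again.
flattened_values = list(flat_map(lambda x: x[1], batches))
```

With a batch size of 30 and three inputs, a single partial batch is produced, and only the `x[1]` variant returns the original elements.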

tvalentyn commented 1 year ago

Somehow this pipeline is using the wrong direct runner (apache_beam/runners/direct/executor.py).

The old BundleBased direct runner is no longer maintained, and this batch pipeline is not expected to be using it.

The relevant code is in https://github.com/apache/beam/blob/223dded769df48270df317868dc32144ec2fb353/sdks/python/apache_beam/runners/direct/direct_runner.py#L115-L126

We should have used the FnApi runner.

damianr13 commented 1 year ago

@tvalentyn

Thanks for your reply. Based on what you said, I added some breakpoints and discovered that this is where the decision not to use the FnApi runner is made (line 111):

https://github.com/apache/beam/blob/05861613f484a6159c21796006c52d8da5e10b2a/sdks/python/apache_beam/runners/direct/direct_runner.py#L107-L112

This happens while it checks _GroupIntoBatchesDoFn.

Here is a screenshot of the timer data from my IDE: [image: Screenshot 2023-10-13 at 11 55 01]

I could investigate further and come up with a PR maybe, but I will need a few pointers:

tvalentyn commented 1 year ago

Thanks!

The implementation of GroupIntoBatches includes a processing-time timer: https://github.com/apache/beam/blob/3a45ecf4b271997b5ce03e1181676356eaa351e1/sdks/python/apache_beam/transforms/util.py#L1067. You can find more information about timers at https://beam.apache.org/documentation/programming-guide/#processing-time-timers.

This logic was added in https://github.com/apache/beam/pull/13144

@robertwb do you know if this restriction still applies: https://github.com/apache/beam/blob/223dded769df48270df317868dc32144ec2fb353/sdks/python/apache_beam/runners/direct/direct_runner.py#L110-L111 ?

We could also try to lift it, run the tests and see what fails.
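For anyone following along, the restriction being discussed can be pictured roughly like this. This is a simplified pure-Python model, not the code in direct_runner.py: `ToyDoFn`, `choose_runner`, the enum, and the returned strings are all hypothetical names chosen for illustration.

```python
from dataclasses import dataclass, field
from enum import Enum


class TimeDomain(Enum):
    # Illustrative stand-ins for event-time and processing-time timers.
    WATERMARK = "event_time"
    REAL_TIME = "processing_time"


@dataclass
class ToyDoFn:
    name: str
    timer_domains: list = field(default_factory=list)


def choose_runner(dofns):
    """Fall back to the bundle-based runner if any DoFn uses a processing-time timer."""
    for dofn in dofns:
        if TimeDomain.REAL_TIME in dofn.timer_domains:
            return "bundle_based"
    return "fn_api"


pipeline = [
    ToyDoFn("Assign dummy keys"),
    # GroupIntoBatches is implemented with a processing-time timer.
    ToyDoFn("_GroupIntoBatchesDoFn", [TimeDomain.REAL_TIME]),
    ToyDoFn("Flat map"),
]
selected = choose_runner(pipeline)
```

Under this model a single DoFn with a processing-time timer is enough to move the whole pipeline off the FnApi runner, which matches what the breakpoint showed for _GroupIntoBatchesDoFn.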

damianr13 commented 1 year ago

I was trying to set up my environment using the following guide: https://cwiki.apache.org/confluence/display/BEAM/Python+Tips

I noticed that the build-requirements.txt file is missing. I am guessing that one can get the dependencies using the gradle build file.

Should I open a separate issue about updating the documentation?

AnandInguva commented 1 year ago

@damianr13 I am working on updating the wiki page. Thanks.

You don't need to use build-requirements.txt anymore if you rebase/merge onto the master branch.

keunhong commented 3 months ago

GroupIntoBatches still seems to cause this issue with Beam 2.58.0 when using DirectRunner, and even with TestPipeline.