apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: Cannot transform multiple PCollections to Dataframe using the apache_beam.dataframe.transforms.DataframeTransform function #30445

Open prakhar-dhakar opened 8 months ago

prakhar-dhakar commented 8 months ago

What happened?

To reproduce the issue, you can run the Python code below:

import apache_beam as beam
from apache_beam import Row
from apache_beam.dataframe.convert import to_dataframe, to_pcollection
from apache_beam.dataframe.transforms import DataframeTransform
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
import logging
import argparse
import sys
import pandas

logging.getLogger().setLevel(logging.INFO)

parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(sys.argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True

# A dummy dataframe transform function; ignore the logic
def transformdf(a, b):
    a["addr"] = "addr-common"
    return a

p = beam.Pipeline(options=pipeline_options)

# Schema-aware PCollections
data1 = [Row(id=1, name="abc"), Row(id=2, name="def"), Row(id=3, name="ghi")]
pcol1 = (p | "Create1" >> beam.Create(data1))

data2 = [Row(addr="addr1"), Row(addr="addr2"), Row(addr="addr3")]
pcol2 = (p | "Create2" >> beam.Create(data2))

pcol = ({"a":pcol1, "b":pcol2} | "TransformedDF" >> DataframeTransform(transformdf))
# The above throws a duplicate label error

pcol | "Map" >> beam.Map(lambda row: {"id":row.id, "name":row.name, "addr":row.addr}) | "Print" >> beam.Map(print)

p.run().wait_until_finish()

This is sourced from this Stack Overflow question: https://stackoverflow.com/questions/70937308/apache-beam-multiple-pcollection-dataframetransform-issue

To explain the issue in detail:

When we pass multiple PCollections to the DataframeTransform function, we get the following error back: RuntimeError: A transform with label "TransformedDF/BatchElements(pc)" already exists in the pipeline.

This happens even though the PCollections are schema-aware.
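
For reference, the collision can be reproduced outside DataframeTransform as well. Below is a minimal sketch that relies only on the variable-name label inference described in this issue (the Create1/Create2 labels are placeholders):

import apache_beam as beam
from apache_beam import Row
from apache_beam.dataframe import convert

p = beam.Pipeline()
pcol1 = p | "Create1" >> beam.Create([Row(id=1, name="abc")])
pcol2 = p | "Create2" >> beam.Create([Row(addr="addr1")])

frames = {}
for k, pc in {"a": pcol1, "b": pcol2}.items():
    # Without an explicit label, to_dataframe() infers one from the
    # caller's variable name, which is "pc" on both iterations, so the
    # second call raises the duplicate-label RuntimeError.
    frames[k] = convert.to_dataframe(pc)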

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

prakhar-dhakar commented 8 months ago

A quick fix for this would be the following change in the Python SDK, in the function apache_beam.dataframe.transforms.DataframeTransform.expand:

  def expand(self, input_pcolls):
    # Avoid circular import.
    from apache_beam.dataframe import convert

    # Convert inputs to a flat dict.
    input_dict = _flatten(input_pcolls)  # type: Dict[Any, PCollection]

    proxies = _flatten(self._proxy) if self._proxy is not None else {
        tag: None
        for tag in input_dict
    }

    input_frames = {
        k: convert.to_dataframe(pc, proxies[k]) 
        for k, pc in input_dict.items()
    }  # type: Dict[Any, DeferredFrame] # noqa: F821

The issue occurs because the label None is passed to convert.to_dataframe(). When no label is given, it is inferred from the caller's variable name, which is pc for every PCollection in the comprehension above, so the pipeline fails with duplicate transform labels. Passing the label str(k) when calling the function resolves the issue, as shown below.

k: convert.to_dataframe(pc, proxies[k], str(k))
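
Applied in context, the patched comprehension in expand would read (a sketch of the suggested change, using only the names already present in the function):

    input_frames = {
        k: convert.to_dataframe(pc, proxies[k], str(k))
        for k, pc in input_dict.items()
    }  # type: Dict[Any, DeferredFrame] # noqa: F821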

This is a very small change that would let the pipeline be used as intended.
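
Until a fix lands in the SDK, a possible user-side workaround is to bypass DataframeTransform and convert each PCollection with an explicit, unique label. This is only a sketch, assuming the transform function uses deferred-pandas operations supported by to_dataframe; the ToDataframeA/ToDataframeB labels are placeholders:

import apache_beam as beam
from apache_beam import Row
from apache_beam.dataframe.convert import to_dataframe, to_pcollection

def transformdf(a, b):
    a["addr"] = "addr-common"
    return a

with beam.Pipeline() as p:
    pcol1 = p | "Create1" >> beam.Create([Row(id=1, name="abc")])
    pcol2 = p | "Create2" >> beam.Create([Row(addr="addr1")])

    # Explicit labels sidestep the variable-name inference entirely,
    # so no duplicate transform labels are ever generated.
    df_a = to_dataframe(pcol1, label="ToDataframeA")
    df_b = to_dataframe(pcol2, label="ToDataframeB")

    result = to_pcollection(transformdf(df_a, df_b))
    result | "Print" >> beam.Map(print)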

damccorm commented 8 months ago

Hey @prakhar-dhakar, thanks for the report and the suggested fix. I agree that is a reasonable approach. Would you be open to creating a pull request with your suggested fix?

prakhar-dhakar commented 8 months ago

Sure, I will create a pull request with the changes.