apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: WriteToParquet exhibits different behavior when using return versus yield #29591

maininformer opened this issue 10 months ago (status: open)

maininformer commented 10 months ago

What happened?

Beam version is 2.52.0

Please see the test below:

import apache_beam as beam
import pyarrow as pa

data_batches = [
    [{'name': 'emin', 'age': 35}, {'name': 'kobe', 'age': 2}],
    [{'name': 'annie', 'age': 6}, {'name': 'bryant', 'age': 33}],
]

class Yielder(beam.DoFn):
    def process(self, nothing):
        for batch in data_batches:
            yield batch

class Returner(beam.DoFn):
    def process(self, nothing):
        for batch in data_batches:
            return batch

print('Running Yielder')
try:
    with beam.Pipeline() as p:
        (p
        | 'Noop' >> beam.Create([None])
        | 'Yield it' >> beam.ParDo(Yielder())
        | 'Write To Parquet' >> beam.io.parquetio.WriteToParquet(
                file_path_prefix='output-yield',
                schema=pa.schema([('name', pa.string()), ('age', pa.int64())]))
        )
except Exception as e:
    print(e)

print('\n\n')

print('Running Returner')
try:
    with beam.Pipeline() as p:
        (p
        | 'Noop' >> beam.Create([None])
        | 'Return it' >> beam.ParDo(Returner())
        | 'Write To Parquet' >> beam.io.parquetio.WriteToParquet(
                file_path_prefix='output-return',
                schema=pa.schema([('name', pa.string()), ('age', pa.int64())]))
        )
except Exception as e:
    print(e)

I expect both Yielder and Returner to have the same behavior. However, in parquetio.py:L119, row is a list for Yielder and a dict for Returner. As a result, Yielder produces:

TypeError: list indices must be integers or slices, not str
list indices must be integers or slices, not str [while running 'Write To Parquet/ParDo(_RowDictionariesToArrowTable)']

Returner, on the other hand, succeeds.

Please run the test to reproduce. Many thanks.
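A plain-Python sketch of a likely explanation, assuming (as Beam's ParDo does) that the runner iterates over whatever process() returns and emits each item as a separate element. Yielding a batch emits the whole list as one element, while returning a list emits each dict inside it. The helpers emitted_elements and to_columns are hypothetical stand-ins, not Beam or parquetio code; to_columns only mimics the kind of per-field lookup (row[name]) that fails with the reported TypeError when a row is a list.

```python
data_batches = [
    [{'name': 'emin', 'age': 35}, {'name': 'kobe', 'age': 2}],
    [{'name': 'annie', 'age': 6}, {'name': 'bryant', 'age': 33}],
]

def yielder_process(nothing):
    for batch in data_batches:
        yield batch  # each yielded value (a whole list) becomes one element

def returner_process(nothing):
    for batch in data_batches:
        return batch  # the returned list is iterated: each dict is one element

def emitted_elements(process, element):
    # Hypothetical model of a DoFn runner: iterate the result of process().
    result = process(element)
    return [] if result is None else list(result)

def to_columns(rows, field_names):
    # Hypothetical row-dict -> column conversion; it looks up each field by
    # name, which only works when every row is a dict.
    return {name: [row[name] for row in rows] for name in field_names}

fields = ['name', 'age']
print(to_columns(emitted_elements(returner_process, None), fields))
# {'name': ['emin', 'kobe'], 'age': [35, 2]}

try:
    to_columns(emitted_elements(yielder_process, None), fields)
except TypeError as e:
    # Each "row" here is a list, so row['name'] raises TypeError.
    print('TypeError:', e)
```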

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)


AnandInguva commented 10 months ago

> I expect both Yielder and Returner to have the same behavior.

This is wrong here.

yield is used in Python generators. It produces a value and pauses execution so that the function can be resumed later to produce successive values.

return exits the function. When a function returns, its execution stops completely.
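A minimal plain-Python illustration of the difference described above (the function names are illustrative only):

```python
def yield_each():
    for x in [1, 2, 3]:
        yield x  # pauses here; resumes when the next value is requested

def return_first():
    for x in [1, 2, 3]:
        return x  # exits the function on the first iteration

g = yield_each()
print(next(g))         # 1
print(next(g))         # 2
print(list(g))         # [3]
print(return_first())  # 1
```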

In the Returner class, only the first batch is returned, whereas Yielder yields one batch at a time. I haven't run the code yet to reproduce.
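Applied to the reproduction, a plain-Python sketch under the same assumption that Beam iterates over the value produced by process(): Yielder emits two elements, each a list, while Returner emits the two dicts of the first batch and the second batch is never seen downstream.

```python
data_batches = [
    [{'name': 'emin', 'age': 35}, {'name': 'kobe', 'age': 2}],
    [{'name': 'annie', 'age': 6}, {'name': 'bryant', 'age': 33}],
]

def yielder_process(nothing):
    for batch in data_batches:
        yield batch

def returner_process(nothing):
    for batch in data_batches:
        return batch

# Model of the runner: materialize whatever process() produces.
yielded = list(yielder_process(None))
returned = list(returner_process(None))

print(len(yielded), [type(e).__name__ for e in yielded])  # 2 ['list', 'list']
print(len(returned), [e['name'] for e in returned])       # 2 ['emin', 'kobe']
```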

maininformer commented 10 months ago

> I expect both Yielder and Returner to have the same behavior.
>
> This is wrong here. ...

You are correct. I should have added "in the context of this code". Both output the same type: Yielder yields one list at a time, and Returner returns one list in total. As far as the downstream transform is concerned, it is the same input.
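One possible way to get one dict per element from every batch is to yield each row individually with yield from batch. The sketch below is plain Python and assumes, as before, that each item produced by process() becomes one element; in the actual pipeline the same loop body would go in Yielder.process, or the flattening could be written with beam.FlatMap. The function name flattening_process is hypothetical.

```python
data_batches = [
    [{'name': 'emin', 'age': 35}, {'name': 'kobe', 'age': 2}],
    [{'name': 'annie', 'age': 6}, {'name': 'bryant', 'age': 33}],
]

def flattening_process(nothing):
    for batch in data_batches:
        yield from batch  # one dict per output element, across all batches

rows = list(flattening_process(None))
print([r['name'] for r in rows])  # ['emin', 'kobe', 'annie', 'bryant']
```

Unlike return, this keeps all four rows, and unlike yield batch, every element is a dict, which is the row shape the failing lookup expects.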