apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: apache_beam.dataframe.convert.to_pcollection() fails on deferred dataframe of csv with only header row #30449

Open elderpinzon opened 8 months ago

elderpinzon commented 8 months ago

What happened?

After reading a CSV file that contains only the header row with apache_beam.dataframe.io.read_csv, the to_pcollection method fails with the following error: OverflowError: cannot convert float infinity to integer [while running 'Unbatch 'placeholder_DataFrame_6102733264''].

The following Python code reproduces the issue:

import logging
import argparse
import sys
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from apache_beam.dataframe.io import read_csv
from apache_beam.dataframe import convert

logging.getLogger().setLevel(logging.INFO)

parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(sys.argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True

p = beam.Pipeline(options=pipeline_options)

beam_df = p | "Read csv" >> read_csv("only_header.csv")

pcol1 = (
    # Convert the Beam DataFrame to a PCollection.
    convert.to_pcollection(beam_df)
    | beam.Map(print)
)

p.run().wait_until_finish()

To reiterate, in this example the file only_header.csv contains only the header row and no data rows.
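For anyone who wants to recreate the input, a header-only file like the one described above can be produced with the standard library. This is a minimal sketch: the column names are hypothetical (the report does not list them), and the file is written to a temporary directory rather than next to the pipeline script.

```python
import csv
import os
import tempfile

# Hypothetical column names; the original report does not say which columns
# only_header.csv contains.
header = ["id", "name", "value"]

# Write a CSV file that has a header row and nothing else.
path = os.path.join(tempfile.mkdtemp(), "only_header.csv")
with open(path, "w", newline="") as f:
    csv.writer(f).writerow(header)

# Read it back: the header is parsed, but there are zero data rows.
with open(path, newline="") as f:
    reader = csv.DictReader(f)
    rows = list(reader)
    fieldnames = reader.fieldnames

print(fieldnames)
print(len(rows))
```

Copying the resulting file into the working directory of the script above should give read_csv the same kind of empty input.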

I ran into this issue while migrating from version 2.41.0 to 2.51.0, and confirmed that it also occurs with 2.54.0.

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

elderpinzon commented 8 months ago

As the error message suggests, the issue is caused by a division by zero: the batch produced from the header-only file is empty, so its length is 0. I found a quick workaround by modifying the following line: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/opcounters.py#L225

As follows:

mean_element_size = self.producer_batch_converter.estimate_byte_size(
    windowed_batch.values) / batch_length if batch_length != 0 else 0

I confirmed that my test code above runs after this change, and that all the unit tests in that folder still pass. (I also tried to run all tests in the runners folder, but a number of them failed due to gcloud authentication issues.)
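To illustrate the idea behind the workaround outside of Beam, here is a minimal standalone sketch. The helper name mean_element_size and its parameters are hypothetical and are not part of the Beam API. It also shows that int() rejects infinity with the same message as in the report. Note that dividing plain Python numbers by zero raises ZeroDivisionError rather than producing infinity, so the infinity in the real pipeline presumably comes from a NumPy-style scalar division; that part is an assumption and was not verified here.

```python
import math


def mean_element_size(total_bytes, batch_length):
    """Hypothetical helper: average bytes per element, or 0 for an empty batch."""
    if batch_length == 0:
        # Guard against the empty batch produced by a header-only CSV.
        return 0
    return total_bytes / batch_length


# Converting infinity to an integer fails with the message seen in the report.
try:
    int(math.inf)
except OverflowError as exc:
    message = str(exc)

print(message)
print(mean_element_size(0, 0))    # empty batch: guarded, no division happens
print(mean_element_size(120, 4))  # non-empty batch: ordinary mean
```

Returning 0 for an empty batch mirrors the conditional expression in the patch above; whether 0 is the right estimate for the counters downstream is a design question for the maintainers.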