apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: ValueError: Invalid DisplayDataItem when using AsSingleton for side input (need better parameter validation before it hits this) #25860

Open redfungus opened 1 year ago

redfungus commented 1 year ago

What happened?

Using a side input (for example beam.pvalue.AsSingleton) as a parameter of WriteToText with Dataflow as the runner causes an error.

  File ".venv\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 877, in run_ParDo
    step = self._add_step(
  File ".venv\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 652, in _add_step
    [
  File ".venv\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 653, in <listcomp>
    item.get_dict()
  File ".venv\lib\site-packages\apache_beam\transforms\display.py", line 370, in get_dict
    self.is_valid()
  File ".venv\lib\site-packages\apache_beam\transforms\display.py", line 336, in is_valid
    raise ValueError(
ValueError: Invalid DisplayDataItem. Value <apache_beam.pvalue.AsDict object at 0x000001AA22BED610> is of an unsupported type.

The pipeline works fine when running locally but fails when using the Dataflow runner. I also tried all the other beam.pvalue.As... wrappers and the error still happens.
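As the traceback shows, the Dataflow runner calls get_dict() on each display data item while translating a step, and is_valid() rejects any value whose type is not a supported display type. A side-input wrapper passed as num_shards presumably ends up in WriteToText's display data, which would explain why the same pipeline runs locally but fails at Dataflow job submission. The sketch below is a simplified, hypothetical illustration of that kind of type check, not Beam's code: the real check lives in apache_beam/transforms/display.py and uses Beam's own Timestamp and Duration types, while this version substitutes datetime/timedelta to stay self-contained, and FakeSideInput is a made-up stand-in for a wrapper such as AsSingleton.

```python
from datetime import datetime, timedelta

# Hypothetical, simplified mapping from Python types to display-data type
# names. datetime/timedelta stand in for Beam's Timestamp/Duration here.
SUPPORTED_DISPLAY_TYPES = {
    str: "STRING",
    int: "INTEGER",
    float: "FLOAT",
    bool: "BOOLEAN",
    datetime: "TIMESTAMP",
    timedelta: "DURATION",
    type: "JAVA_CLASS",
}


def display_type_of(value):
    """Return the display-data type name for value, or raise ValueError."""
    # Exact type lookup, so bool maps to BOOLEAN rather than INTEGER.
    value_type = type(value)
    if value_type not in SUPPORTED_DISPLAY_TYPES:
        raise ValueError(
            f"Invalid DisplayDataItem. Value {value!r} is of an unsupported type.")
    return SUPPORTED_DISPLAY_TYPES[value_type]


class FakeSideInput:
    """Hypothetical stand-in for a side-input wrapper such as AsSingleton."""


print(display_type_of(4))
try:
    display_type_of(FakeSideInput())
except ValueError as err:
    print(err)
```

An int num_shards passes this kind of check, while any deferred side-input object fails it, no matter which As... wrapper is used.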

SDK version with the error: 2.46.0
Python version used: 3.9.13

Code of the whole pipeline:

import argparse
import json
import logging
from typing import Any, Dict, List

import apache_beam as beam
import apache_beam.io.fileio
from apache_beam.options.pipeline_options import PipelineOptions

def is_valid(json_file):
    """
    This function is used as a filter.

    Args:
        json_file: json file corresponding to one episode.

    Returns:
        True if the episode is valid, False otherwise.
    """
    with json_file.open() as f:
        first_line = f.readline()
        first_line_json = json.loads(first_line)
        return first_line_json['username'] == "username"

def run(
    input_raw_data: str,
    output_path: str,
    beam_args: List[str] = None,
) -> None:
    """Build and run the pipeline."""
    options = PipelineOptions(beam_args, save_main_session=True, streaming=False)

    with beam.Pipeline(options=options) as pipeline:
        filtered_files = (
            pipeline
            | 'Match file paths' >> beam.io.fileio.MatchFiles(input_raw_data)
            | 'Read file paths' >> beam.io.fileio.ReadMatches()
            | 'Filter correct episodes' >> beam.Filter(is_valid)
        )
        file_count = (
            filtered_files
            | 'Count files' >> beam.combiners.Count.Globally()
        )
        # Removing the file_count_integer side input fixes the error.
        file_count_integer = beam.pvalue.AsSingleton(file_count)
        (   
            filtered_files
            | 'Read files' >> beam.Map(lambda x: x.read_utf8())
            | 'Write to files' >> beam.io.WriteToText(
                output_path,
                file_name_suffix='.json',
                num_shards=file_count_integer)
        )

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--output-path",
        required=True,
        help="Name of the folder to output the cleaned data files.",
    )
    parser.add_argument(
        "--input-raw-data",
        required=True,
        help="Name of the folder containing the raw data files.",
    )
    args, beam_args = parser.parse_known_args()

    run(
        input_raw_data=args.input_raw_data,
        output_path=args.output_path,
        beam_args=beam_args,
    )

Command used to run:

python script.py --region europe-west1 --input gs://path-to-files/*.json --output gs://path-to-utputs/results/outputs --runner DataflowRunner --project google_project --temp_location gs://temp_bucket/tmp/
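The command passes --input and --output even though the parser defines --input-raw-data and --output-path. This works because argparse accepts unambiguous prefixes of long options by default (allow_abbrev=True), and parse_known_args returns every flag it does not define, here the Dataflow options, as a list that the script forwards to PipelineOptions. A minimal stdlib sketch of just that parsing step, assuming the same two flags as the script; split_args is a hypothetical helper and the gs:// paths are placeholders:

```python
import argparse


def split_args(argv):
    """Parse the script's own flags and return everything else for Beam."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--output-path", required=True)
    parser.add_argument("--input-raw-data", required=True)
    # parse_known_args returns (namespace, leftovers) instead of failing on
    # flags it does not define, such as --runner or --region.
    return parser.parse_known_args(argv)


args, beam_args = split_args([
    "--region", "europe-west1",
    "--input", "gs://path-to-files/*.json",  # unambiguous prefix of --input-raw-data
    "--output", "gs://bucket/outputs",       # unambiguous prefix of --output-path
    "--runner", "DataflowRunner",
])
print(args.input_raw_data)
print(args.output_path)
print(beam_args)
```

If another option starting with --input were ever added, the abbreviation would become ambiguous and argparse would reject it, so spelling out the full flag names in the command is the more robust choice.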

Issue Priority

Priority: 1 (data loss / total loss of function)


redfungus commented 1 year ago

It seems the main issue is with using the side input as num_shards in WriteToText. Switching to WriteToFiles (apache_beam.io.fileio) fixed the issue.

Abacn commented 1 year ago

Thanks for reporting. Did this pipeline work before, i.e. did the error start after upgrading the Beam version? If so, which version did it work with?

kennknowles commented 1 year ago

For num_shards you have to pass an int, so you cannot pass a side input. This error really should be caught earlier by parameter validation.

Reference: https://beam.apache.org/releases/pydoc/2.46.0/apache_beam.io.textio.html?highlight=writetotext#apache_beam.io.textio.WriteToText
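The parameter validation suggested above can be sketched as an up-front check that turns the late, Dataflow-only DisplayDataItem error into an immediate, descriptive one. validate_num_shards is a hypothetical helper, not part of Beam. It treats 0 as "let the runner decide", matching WriteToText's default of num_shards=0, and since Beam is not imported, AsSingletonStandIn only imitates a side-input wrapper:

```python
def validate_num_shards(num_shards):
    """Hypothetical construction-time check for a num_shards argument.

    Accepts a non-negative int and rejects anything else, including
    deferred values such as side-input wrappers, with an actionable message.
    """
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(num_shards, bool) or not isinstance(num_shards, int):
        raise TypeError(
            "num_shards must be an int known at pipeline construction time, "
            f"got {type(num_shards).__name__}; side inputs such as "
            "beam.pvalue.AsSingleton cannot be used here.")
    if num_shards < 0:
        raise ValueError(f"num_shards must be >= 0, got {num_shards}")
    return num_shards


class AsSingletonStandIn:
    """Hypothetical stand-in for beam.pvalue.AsSingleton (Beam not imported)."""


print(validate_num_shards(4))
try:
    validate_num_shards(AsSingletonStandIn())
except TypeError as err:
    print(err)
```

Running a check like this in the transform's constructor would make the pipeline fail on every runner, including locally, at the line where the bad argument is passed, rather than only at Dataflow job submission.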