apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: apache.calcite CannotPlanException: All the inputs have relevant nodes, however the cost is still infinite. #30466

Open crbl1122 opened 6 months ago

crbl1122 commented 6 months ago

What happened?

I use the SqlTransform component in an Apache Beam pipeline running on Dataflow. If I add one more windowed column to the SQL query, I get the following error:

RuntimeError: org.apache.beam.sdk.extensions.sql.impl.SqlConversionException: Unable to convert query .... Caused by: org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=BEAM_LOGICAL. All the inputs have relevant nodes, however the cost is still infinite.

This query works:

windowing_query = """SELECT DATE_STR, SUBS_ID, (NUM_1 + NUM_2 + NUM_3) AS TOTAL_COST,
                    AVG(NUM_1) OVER (w ROWS 2 PRECEDING) AS NUM_1_sliding_3M
                    FROM PCOLLECTION 
                    WINDOW w AS (PARTITION BY SUBS_ID ORDER BY DATE_STR)"""

This query fails:

windowing_query = """SELECT DATE_STR, SUBS_ID, (NUM_1 + NUM_2 + NUM_3) AS TOTAL_COST,
                    AVG(NUM_1) OVER (w ROWS 2 PRECEDING) AS NUM_1_sliding_3M,
                    AVG(NUM_2) OVER (w ROWS 2 PRECEDING) AS NUM_2_sliding_3M
                    FROM PCOLLECTION 
                    WINDOW w AS (PARTITION BY SUBS_ID ORDER BY DATE_STR)"""

The only difference between the two queries is one added line: AVG(NUM_2) OVER (w ROWS 2 PRECEDING) AS NUM_2_sliding_3M
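For reference, here is a minimal plain-Python sketch of what each windowed column in the queries above is expected to compute: the average of the current row and up to two preceding rows, per SUBS_ID, ordered by DATE_STR. It does not use Beam or Calcite. The helper name `rolling_avg` and the sample rows are hypothetical and only for illustration, and the sketch assumes floating-point inputs.

```python
from collections import defaultdict


def rolling_avg(rows, value_key, partition_key="SUBS_ID",
                order_key="DATE_STR", preceding=2):
    """Mimic AVG(value_key) OVER (PARTITION BY partition_key
    ORDER BY order_key ROWS <preceding> PRECEDING) on a list of dicts."""
    # Group rows by the partition column.
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[partition_key]].append(row)

    out = []
    for part_rows in partitions.values():
        # Order each partition, then average over a trailing frame.
        ordered = sorted(part_rows, key=lambda r: r[order_key])
        for i, row in enumerate(ordered):
            frame = ordered[max(0, i - preceding): i + 1]
            avg = sum(r[value_key] for r in frame) / len(frame)
            out.append({**row, f"{value_key}_sliding_3M": avg})
    return out


# Illustrative sample data (not taken from the original report).
rows = [
    {"DATE_STR": "2024-01", "SUBS_ID": "a", "NUM_1": 10.0, "NUM_2": 1.0},
    {"DATE_STR": "2024-02", "SUBS_ID": "a", "NUM_1": 20.0, "NUM_2": 2.0},
    {"DATE_STR": "2024-03", "SUBS_ID": "a", "NUM_1": 30.0, "NUM_2": 3.0},
    {"DATE_STR": "2024-04", "SUBS_ID": "a", "NUM_1": 40.0, "NUM_2": 4.0},
]
result = rolling_avg(rows, "NUM_1")
```

Calling `rolling_avg(rows, "NUM_2")` on the same input gives the second column that the failing query asks for, so the two columns are independent of each other at the semantic level.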

Pipeline definition:

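import logging

import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform

# runner, pipeline_options, data_size, data_types, read_from_bq and
# ConvertToRow are defined elsewhere in the project.
with beam.Pipeline(runner, options=pipeline_options) as pipeline:
    # Set the log level first so the INFO message below is emitted
    logging.getLogger().setLevel(logging.INFO)
    logging.info(f'pipeline_options: {pipeline_options}')

    # Preprocess train data
    step = 'train'
    # Read raw train data from BQ
    raw_train_dataset = read_from_bq(pipeline, step, data_size)
    # Convert each record to a schema-aware Row, as SqlTransform requires
    rows_train_dataset = raw_train_dataset[0] | 'Convert to Rows' >> beam.ParDo(ConvertToRow(data_types))

    # Apply the SQL transform
    filtered_rows = rows_train_dataset | SqlTransform(windowing_query)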

Why does SqlTransform not accept more than one rolling-average computation?

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

Amar3tto commented 3 months ago

I tested several queries locally; the results are below. My guess is that the total column and the avg1/avg2 columns interact somehow, and that certain combinations of them trigger the error.

Successful:

  1. SELECT (f_int2 + f_int2) as total, AVG(f_int) OVER (w ROWS 2 PRECEDING) as avg1 FROM PCOLLECTION WINDOW w AS (ORDER BY f_double asc)
  2. SELECT AVG(f_int) OVER (w ROWS 2 PRECEDING) as avg1, AVG(f_int2) OVER (w ROWS 2 PRECEDING) as avg2 FROM PCOLLECTION WINDOW w AS (ORDER BY f_double asc)
  3. SELECT (f_int2 + f_int2) as total, AVG(f_int) OVER (w ROWS 2 PRECEDING) as avg1, AVG(f_int) OVER (w ROWS 2 PRECEDING) as avg2 FROM PCOLLECTION WINDOW w AS (ORDER BY f_double asc)

With error:

  1. SELECT (f_int + f_int) as total, AVG(f_int) OVER (w ROWS 2 PRECEDING) as avg1 FROM PCOLLECTION WINDOW w AS (ORDER BY f_double asc)
  2. SELECT (f_int + f_int) as total, AVG(f_int2) OVER (w ROWS 2 PRECEDING) as avg1 FROM PCOLLECTION WINDOW w AS (ORDER BY f_double asc)
  3. SELECT (f_int2 + f_int2) as total, AVG(f_int) OVER (w ROWS 2 PRECEDING) as avg1, AVG(f_int2) OVER (w ROWS 2 PRECEDING) as avg2 FROM PCOLLECTION WINDOW w AS (ORDER BY f_double asc)
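To look for a pattern across the cases above, the query variants could be enumerated systematically rather than written by hand. The following is a minimal sketch that only builds the query strings; it does not run them through Beam. The helper name `build_query` is hypothetical, and the column names f_int, f_int2 and f_double are the ones used in the queries above.

```python
from itertools import product

# Window clause shared by all the test queries.
WINDOW = "WINDOW w AS (ORDER BY f_double asc)"


def build_query(total_col, avg_cols):
    """Build one test query: a computed `total` column plus one
    windowed AVG column per entry in avg_cols."""
    select = [f"({total_col} + {total_col}) as total"]
    select += [
        f"AVG({col}) OVER (w ROWS 2 PRECEDING) as avg{i}"
        for i, col in enumerate(avg_cols, start=1)
    ]
    return f"SELECT {', '.join(select)} FROM PCOLLECTION {WINDOW}"


columns = ["f_int", "f_int2"]
# One or two AVG columns, in every combination of the two input columns.
avg_variants = [(c,) for c in columns] + list(product(columns, repeat=2))

queries = [
    build_query(total_col, avg_cols)
    for total_col in columns
    for avg_cols in avg_variants
]

for q in queries:
    print(q)
```

Each generated string can then be passed to SqlTransform in a local test, recording which combinations raise CannotPlanException.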