apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: Beam SQL ignores field aliases in some situations, causing huge data loss #30498

Closed: brachipa closed this 6 months ago

brachipa commented 7 months ago

What happened?

We have many SqlTransforms in our pipeline that run SQL queries and sink the output into an external system (Kinesis). We found that in some cases our SQL results don't match the SQL we provided. For example, the simple SQL below:

select event as event_name, count(*) as c
from PCOLLECTION
group by event

returns output with fields "event" and "c", ignoring the alias "event_name".

It happens for no apparent reason: after a simple change (e.g., removing one column from the GROUP BY), the query stopped writing data to the "event_name" column and wrote it only to the "event" column. This caused us huge data loss, since all our metrics are based on event_name, which became always null.

We need to understand what causes aliases to be ignored, since we use this pattern in most of our queries and on many different fields, all in a production-critical system.

Logs for the above query:

2024-03-05 12:41:53.351  INFO 54818 --- [o-auto-1-exec-1] o.a.b.s.e.sql.impl.CalciteQueryPlanner   : SQL:
SELECT `PCOLLECTION`.`event` AS `event_name`, COUNT(*) AS `c`
FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`
GROUP BY `PCOLLECTION`.`event`
2024-03-05 12:41:59.172  INFO 54818 --- [o-auto-1-exec-1] o.a.b.s.e.sql.impl.CalciteQueryPlanner   : SQLPlan>
LogicalAggregate(group=[{0}], c=[COUNT()])
  LogicalProject(event_name=[$2])
    BeamIOSourceRel(table=[[beam, PCOLLECTION]])

2024-03-05 12:43:06.961  INFO 54818 --- [o-auto-1-exec-1] o.a.b.s.e.sql.impl.CalciteQueryPlanner   : BEAMPlan>
BeamAggregationRel(group=[{2}], c=[COUNT()])
  BeamIOSourceRel(table=[[beam, PCOLLECTION]])

But the row we get (row.toString()):

Row: 
event:"abc"
c:1

The pipeline steps look like:

.apply(SqlTransform.query(query))
.apply("to-table-row", ParDo.of(new DoFn<Row, TableRow>() {
    @ProcessElement
    public void processElement(@Element Row row, ProcessContext context) {
        System.out.println(row.toString()); // print the output row to inspect its field names
        TableRow tableRow = BigQueryUtils.toTableRow(row);
        context.output(tableRow);
    }
}))

Adding more fields to the query sometimes helps, but not always, and we are now in a situation where we can't edit our queries because we don't know what the impact will be.

Query that works:

select event as event_name,a,b,c,d,e,f,g, count(*) as c
from PCOLLECTION
group by event,a,b,c,d,e,f,g

Logs:

2024-03-05 12:51:51.168  INFO 56085 --- [o-auto-1-exec-1] o.a.b.s.e.sql.impl.CalciteQueryPlanner   : SQL:
SELECT `PCOLLECTION`.`event` AS `event_name`, `PCOLLECTION`.`a`, `PCOLLECTION`.`b`, `PCOLLECTION`.`c`, `PCOLLECTION`.`d`, `PCOLLECTION`.`e`, `PCOLLECTION`.`f`, `PCOLLECTION`.`g`, COUNT(*) AS `c`
FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`
GROUP BY `PCOLLECTION`.`event`, `PCOLLECTION`.`a`, `PCOLLECTION`.`b`, `PCOLLECTION`.`c`, `PCOLLECTION`.`d`, `PCOLLECTION`.`e`, `PCOLLECTION`.`f`, `PCOLLECTION`.`g`
2024-03-05 12:51:55.128  INFO 56085 --- [o-auto-1-exec-1] o.a.b.s.e.sql.impl.CalciteQueryPlanner   : SQLPlan>
LogicalProject(event_name=[$0], a=[$1], b=[$2], c=[$3], d=[$4], e=[$5], f=[$6], g=[$7], c0=[$8])
  LogicalAggregate(group=[{0, 1, 2, 3, 4, 5, 6, 7}], c=[COUNT()])
    LogicalProject(event_name=[$2], a=[$3], b=[$4], c=[$5], d=[$6], e=[$7], f=[$8], g=[$9])
      BeamIOSourceRel(table=[[beam, PCOLLECTION]])

2024-03-05 12:52:17.640  INFO 56085 --- [o-auto-1-exec-1] o.a.b.s.e.sql.impl.CalciteQueryPlanner   : BEAMPlan>
BeamCalcRel(expr#0..8=[{inputs}], proj#0..8=[{exprs}])
  BeamAggregationRel(group=[{2, 3, 4, 5, 6, 7, 8, 9}], c=[COUNT()])
    BeamIOSourceRel(table=[[beam, PCOLLECTION]])

The row looks like:

Row: 
event_name:abc
a:<null>
b:<null>
c:<null>
d:<null>
e:<null>
f:<null>
g:<null>
c0:1

The event_name column is correct, but why c0 and not c?

Also, it is not always about the number of GROUP BY fields: if I send the exact same query, only with real fields (not a, b, c), it again stops emitting event_name and emits only event.

Can you explain this strange bug?

Issue Priority

Priority: 1 (data loss / total loss of function)


Polber commented 7 months ago

I am unable to reproduce the error.

I used the following pipeline:

Schema SCHEMA = Schema.of(Schema.Field.of("event", Schema.FieldType.STRING));
Pipeline pipeline = Pipeline.create();
PCollection<Row> input = pipeline.apply(
    Create.of(
        Row.withSchema(SCHEMA).withFieldValue("event", "abc").build()
    )
).setRowSchema(SCHEMA);

PCollection<Row> transformed = input.apply(
    SqlTransform.query("select event as event_name, count(*) as c from PCOLLECTION group by event"));
transformed.apply(ParDo.of(new DoFn<Row, Object>() {
  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    System.out.println(c.element());
    c.output(c.element());
  }
})).setRowSchema(
    Schema.of(
        Schema.Field.of("event", Schema.FieldType.STRING),
        Schema.Field.of("c", Schema.FieldType.INT64)
    )
);

pipeline.run();

Logs:

Mar 22, 2024 2:30:06 PM org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
INFO: SQL:
SELECT `PCOLLECTION`.`event` AS `event_name`, COUNT(*) AS `c`
FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`
GROUP BY `PCOLLECTION`.`event`
Mar 22, 2024 2:30:07 PM org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
INFO: SQLPlan>
LogicalProject(event_name=[$0], c=[$1])
  LogicalAggregate(group=[{0}], c=[COUNT()])
    BeamIOSourceRel(table=[[beam, PCOLLECTION]])

Mar 22, 2024 2:30:07 PM org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner convertToBeamRel
INFO: BEAMPlan>
BeamCalcRel(expr#0..1=[{inputs}], proj#0..1=[{exprs}])
  BeamAggregationRel(group=[{0}], c=[COUNT()])
    BeamIOSourceRel(table=[[beam, PCOLLECTION]])

Mar 22, 2024 2:30:08 PM org.apache.beam.sdk.util.construction.Environments$JavaVersion forSpecification
WARNING: Unsupported Java version: 18, falling back to: 17

Row:

Row: 
event_name:abc
c:1

If you give me more details on your pipeline, I can try to reproduce it. As a good first step, try modeling yours more like the one I posted above and see whether that resolves the issue.


As far as...

The event_name column is correct, but why c0 and not c?

This is to avoid a name collision. In your query, you select a column c and also create a column c by aggregating count(*), so the second one is renamed to c0. This is working as expected.
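As a rough illustration of that renaming, here is a simplified sketch that makes duplicate output names unique by appending a counter starting at 0. The class and method names are hypothetical and this is not Calcite's actual code; under that assumption it only reproduces the c0 seen in the plan above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class UniquifyNames {
    // Hypothetical helper (not Calcite's implementation): returns the names in
    // order, renaming any name already used by appending the smallest free
    // counter (0, 1, ...).
    public static List<String> uniquify(List<String> names) {
        Set<String> used = new HashSet<>();
        List<String> result = new ArrayList<>();
        for (String name : names) {
            String candidate = name;
            int attempt = 0;
            while (used.contains(candidate)) {
                candidate = name + attempt++;
            }
            used.add(candidate);
            result.add(candidate);
        }
        return result;
    }

    public static void main(String[] args) {
        // Output columns of the "working" query: a selected column c plus count(*) AS c.
        List<String> selected = List.of("event_name", "a", "b", "c", "d", "e", "f", "g", "c");
        System.out.println(uniquify(selected)); // [event_name, a, b, c, d, e, f, g, c0]
    }
}
```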

brachipa commented 7 months ago

Hi @Polber, thanks for checking it out. I ran your example and it works fine for me, but once I added a new field (id) to the schema, it stopped working: event is output as event and not under its alias name:

Schema SCHEMA = Schema.of(
    Schema.Field.of("event", Schema.FieldType.STRING),
    Schema.Field.of("id", Schema.FieldType.INT32)); // new field added
Pipeline pipeline = Pipeline.create();
PCollection<Row> input = pipeline.apply(
    Create.of(
        Row.withSchema(SCHEMA).withFieldValue("event", "abc")
            .withFieldValue("id", 222) // new field added
            .build()
    )
).setRowSchema(SCHEMA);

PCollection<Row> transformed = input.apply(
    SqlTransform.query("select event as event_name, count(*) as c from PCOLLECTION group by event"));
transformed.apply(ParDo.of(new DoFn<Row, Object>() {
  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    System.out.println(c.element());
    c.output(c.element());
  }
})).setRowSchema(
    Schema.of(
        Schema.Field.of("id", Schema.FieldType.INT32), // new field added
        Schema.Field.of("event", Schema.FieldType.STRING),
        Schema.Field.of("c", Schema.FieldType.INT64)
    )
);

pipeline.run();

As you can see, the Beam plan is slightly different: the failing one is missing the "BeamCalcRel" step (the SQL plan differs too: here the LogicalProject sits below the LogicalAggregate, as in my original report). I guess this is what causes it to use the original field name instead of the alias, but I can't figure out what causes the different plan.

2024-03-23 22:43:51.345  INFO 71505 --- [    Test worker] o.a.b.s.e.sql.impl.CalciteQueryPlanner   : SQLPlan>
LogicalAggregate(group=[{0}], c=[COUNT()])
  LogicalProject(event_name=[$0])
    BeamIOSourceRel(table=[[beam, PCOLLECTION]])

2024-03-23 22:43:51.478  INFO 71505 --- [    Test worker] o.a.b.s.e.sql.impl.CalciteQueryPlanner   : BEAMPlan>
BeamAggregationRel(group=[{0}], c=[COUNT()])
  BeamIOSourceRel(table=[[beam, PCOLLECTION]])

Row: 
event:abc
c:1

My original pipeline subscribes to Pub/Sub, does a 1-minute window aggregation, runs SQL, and sends the results to Pub/Sub as well. The schema can be wider than the query, and the SQL may select only a subset of its fields.

If my example works for you, can you please share your Java version and build dependencies? Thanks again!

Polber commented 6 months ago

I played around with this a bit more and realized it has to do with Calcite (the engine that ultimately decides on the SQL plan) producing a slightly different plan for these inputs, which Beam SQL does not handle properly.

I am not well-versed in the Beam SQL framework itself, so it is difficult to say what needs to be fixed on that end, but for your issue, we just need to "trick" Beam SQL into producing the same Beam plan.

Using our example above, I accomplished that by changing the query to the following:

select t.event as event_name, t.c from (select event, count(*) as c from PCOLLECTION group by event) as t

This resulted in the desired Beam Plan:

INFO: BEAMPlan>
BeamCalcRel(expr#0..1=[{inputs}], proj#0..1=[{exprs}])
  BeamAggregationRel(group=[{0}], c=[COUNT()])
    BeamIOSourceRel(table=[[beam, PCOLLECTION]])

Essentially, the workaround is to nest the aggregation "select event, count(*) as c from PCOLLECTION group by event" inside an outer select statement that does the column aliasing. This avoids having to select extra columns (which come back as null values) while preserving the expected Beam plan.
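Since the workaround has to be applied to many queries, the rewrite can be sketched as a small string-building helper. This is a hypothetical helper, not part of Beam SQL; it assumes the inner query already uses the original column names and that no identifier needs quoting. It takes an ordered map from inner column name to output alias and reproduces the nested query above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class AliasWrapper {
    // Hypothetical helper: wraps an aggregation query that uses the original
    // column names in an outer SELECT that applies the aliases.
    // Keys are inner column names, values are output aliases (iteration order is kept).
    public static String wrapWithAliases(String innerQuery, Map<String, String> aliases) {
        StringJoiner columns = new StringJoiner(", ");
        for (Map.Entry<String, String> e : aliases.entrySet()) {
            String inner = e.getKey();
            String alias = e.getValue();
            // Only emit "as alias" when the name actually changes.
            columns.add(inner.equals(alias) ? "t." + inner : "t." + inner + " as " + alias);
        }
        return "select " + columns + " from (" + innerQuery + ") as t";
    }

    public static void main(String[] args) {
        Map<String, String> aliases = new LinkedHashMap<>();
        aliases.put("event", "event_name");
        aliases.put("c", "c");
        String inner = "select event, count(*) as c from PCOLLECTION group by event";
        System.out.println(wrapWithAliases(inner, aliases));
    }
}
```

An ordered map (LinkedHashMap) is used so the output column order matches the order in which the aliases are listed.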

Polber commented 6 months ago

@kennknowles Tagging you to see if you know why BeamCalcRel(expr#0..1=[{inputs}], proj#0..1=[{exprs}]) is dropped from the BEAMPlan when not all input columns are consumed by the aggregation.

kennknowles commented 6 months ago

I think you are right that it needs a Calc or Project in order to change the name. The reason it is dropped is probably that it is a no-op except for the alias.

The rules we have implemented to convert Logical relational expressions to BeamRel are at https://github.com/apache/beam/tree/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule

We also toss in a large standard set of optimization rules that come with Calcite. There could be a bug in one of them that drops the alias.

brachipa commented 6 months ago

It looks more like a bug. I tried to debug it: the alias is present the whole time until it is dropped at some point that I couldn't identify.

In the examples above, the same query behaves differently depending on which columns exist in the schema.

Regarding the workaround, I wonder what the impact on CPU utilization will be; we operate at huge scale with many different queries.

Also, this workaround applies to this specific query, and we have many different queries written by different teams. It doesn't scale to examine each query and find a workaround for every change (even a working query can break after a minor change).

I hope this can be addressed and fixed.

kennknowles commented 6 months ago

The first step is to figure out whether it is a Beam bug or a Calcite bug. I expect it to be a Beam bug. A likely source of the problem would be something like https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamAggregateProjectMergeRule.java

This is a rule that merges aggregations and projections, which seems to be what happens here. We have our own version of the rule because Beam has special projection pushdown into IOs. That is just an example of how I am thinking about debugging this; I will now read that file and report back if I see anything obvious.

kennknowles commented 6 months ago

It has been a few years, but I believe you can somehow turn on detailed Calcite debug logging, which logs every rule that is applied. It takes some work to understand the output, but it is just rewrites of the same plans you pasted above. You could see exactly when the alias disappears and which rule caused it.

brachipa commented 6 months ago

OK, I think I found what causes it. Calcite checks whether the expression nodes are identical to the input row fields, https://github.com/apache/calcite/blob/dec167ac18272c0cd8be477d6b162d7a31a62114/core/src/main/java/org/apache/calcite/tools/RelBuilder.java#L2068C5-L2069C63, using the RexUtil.isIdentity method:

public static boolean isIdentity(List<? extends RexNode> exps,
    RelDataType inputRowType) {
  return inputRowType.getFieldCount() == exps.size()
      && containIdentity(exps, inputRowType, Litmus.IGNORE);
}

This is the problematic part: inputRowType.getFieldCount() == exps.size()
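To make the size check concrete, here is a heavily simplified, hypothetical model in which each projected expression is just the index of the input field it references. Calcite's real RexUtil.isIdentity works on RexNode and RelDataType; the class and method here are made up. It shows why projecting only event counts as an identity for the one-field schema in the working example but not for the two-field (event, id) schema.

```java
import java.util.List;

public class IdentityCheck {
    // Simplified model of the check quoted above: a projection is treated as an
    // identity only if it keeps every input field, in order. Element i of
    // `projected` is the index of the input field that output column i refers to.
    public static boolean isIdentity(List<Integer> projected, int inputFieldCount) {
        if (projected.size() != inputFieldCount) {
            return false; // the field-count comparison discussed above
        }
        for (int i = 0; i < projected.size(); i++) {
            if (projected.get(i) != i) {
                return false; // a reordered or different field is not an identity
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Schema (event): projecting $0 keeps every field, so it is an identity.
        System.out.println(isIdentity(List.of(0), 1)); // true
        // Schema (event, id): projecting only $0 drops a field, so it is not.
        System.out.println(isIdentity(List.of(0), 2)); // false
    }
}
```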

In the failing example, the row has a wider schema than what we actually select; most of our schemas contain many more fields than we select. Because the projection is not identified as an identity, a Project is created with the fields as they appear in the SELECT, that is, with their aliases rather than their original field names.

Then it is ignored later on in the "rename" method, https://github.com/apache/calcite/blob/dec167ac18272c0cd8be477d6b162d7a31a62114/core/src/main/java/org/apache/calcite/tools/RelBuilder.java#L2130

and the alias is skipped:

https://github.com/apache/calcite/blob/dec167ac18272c0cd8be477d6b162d7a31a62114/core/src/main/java/org/apache/calcite/tools/RelBuilder.java#L2142

I believe the isIdentity check can cause more issues, and we need to understand why it is enforced. Isn't it valid for the SELECT to have a different number of fields than the schema?

In our case, we have one big row and run different queries on it, each selecting different fields.

brachipa commented 6 months ago

I also tried running a Calcite unit test (same Calcite version as Beam uses) with the same query, and it works fine both with and without my fix, so it sounds like an issue in Beam. The test I added in org.apache.calcite.test.JdbcTest:

  @Test void testSimple() {
    final String sql = "select \"name\" as n1, count(*) as c\n"
        + "from \"hr\".\"emps\" group by \"name\"";
    CalciteAssert.that()
        .with(CalciteAssert.Config.REGULAR)
        .query(sql)
        .returns("N1=Theodore; C=1\nN1=Eric; C=1\nN1=Bill; C=1\nN1=Sebastian; C=1\n");
  }
kennknowles commented 6 months ago

Wow, nice detective work. If we still see it in Beam but not in Calcite, then it must be one of our optimization rules. I would look at the debug trace for when it goes wrong. Or, with a short test like that, maybe just brute-force it by removing one rule at a time. Or remove all the optimization rules and add them back one at a time.
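The "remove one rule at a time" approach can be sketched as a leave-one-out search. This is a hypothetical, JDK-only sketch: rules are plain strings, and the bugReproduces predicate stands in for a harness (not shown, and not a Beam API) that builds the planner with the given rules, runs the failing query, and checks whether the alias is lost.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

public class RuleBisect {
    // Hypothetical leave-one-out search: returns every rule whose removal
    // alone makes the bug disappear.
    public static List<String> culprits(List<String> rules, Predicate<Set<String>> bugReproduces) {
        List<String> found = new ArrayList<>();
        for (String rule : rules) {
            Set<String> withoutRule = new LinkedHashSet<>(rules);
            withoutRule.remove(rule);
            if (!bugReproduces.test(withoutRule)) {
                found.add(rule);
            }
        }
        return found;
    }

    public static void main(String[] args) {
        List<String> rules = List.of("RuleA", "RuleB", "RuleC");
        // Stand-in for a real test run: pretend the bug needs RuleB to be present.
        Predicate<Set<String>> fakeRun = set -> set.contains("RuleB");
        System.out.println(culprits(rules, fakeRun)); // [RuleB]
    }
}
```

Leave-one-out only finds rules that are individually necessary for the bug; if two rules can each trigger it on their own, removing one at a time isolates neither, which is when adding rules back one at a time from an empty set works better.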

brachipa commented 6 months ago

Yes, removing BeamAggregateProjectMergeRule helps, and so does keeping the rule and fixing isIdentity. The rule itself is valid, but because of the Calcite issue with isIdentity, the optimization produces the wrong column name.

The fix can be on the Beam side or on the Calcite side.

liferoad commented 6 months ago

@brachipa would you like to do a PR on Beam to give it a try?

brachipa commented 6 months ago

FYI, I opened a PR (from my personal account).

kennknowles commented 6 months ago

Thank you!!