apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

OVER clause converted from ZetaSQL to Logical SQL plan #20965

Open damccorm opened 2 years ago

damccorm commented 2 years ago

The OVER clause isn't supported by our ZetaSQL to Calcite translator. It can be trivially enabled in the parser with the example below, but there is some work required to convert the parsed ZetaSQL proto to Calcite logical operators (mostly in AggregateScanConverter).

This is the "over clause" TODO here: https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/AggregateScanConverter.java#L147


--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
@@ -144,6 +144,7 @@ public class SqlAnalyzer {
         .setEnabledLanguageFeatures(
             new HashSet<>(
                 Arrays.asList(
+                    LanguageFeature.FEATURE_ANALYTIC_FUNCTIONS,
                     LanguageFeature.FEATURE_NUMERIC_TYPE,
                     LanguageFeature.FEATURE_DISALLOW_GROUP_BY_FLOAT,
                     LanguageFeature.FEATURE_V_1_2_CIVIL_TIME,
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
index 33889f34884..fd107ac5721 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
@@ -3461,6 +3461,16 @@ public class ZetaSqlDialectSpecTest extends ZetaSqlTestBase {
     zetaSQLQueryPlanner.convertToBeamRel(sql);
   }
 
+  @Test
+  public void testAnalyticOver() {
+    String sql = "select sum(Key) over () From KeyValue";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("Does not support sub-queries");
+
+    zetaSQLQueryPlanner.convertToBeamRel(sql);
+  }
+
   @Test
   public void testSubstr() {
     String sql = "SELECT substr(@p0, @p1, @p2)";

In its current state, the test fails:


java.lang.UnsupportedOperationException: Conversion of RESOLVED_ANALYTIC_SCAN is not supported
        at org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.getConverterRule(QueryStatementConverter.java:108)
        at org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convertNode(QueryStatementConverter.java:99)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at java.base/java.util.Collections$2.tryAdvance(Collections.java:4756)
        at java.base/java.util.Collections$2.forEachRemaining(Collections.java:4764)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
        at org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convertNode(QueryStatementConverter.java:101)
        at org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convert(QueryStatementConverter.java:89)
        at org.apache.beam.sdk.extensions.sql.zetasql.translation.QueryStatementConverter.convertRootQuery(QueryStatementConverter.java:55)
        at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLPlannerImpl.rel(ZetaSQLPlannerImpl.java:98)
        at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRelInternal(ZetaSQLQueryPlanner.java:313)
        at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRel(ZetaSQLQueryPlanner.java:301)
        at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRel(ZetaSQLQueryPlanner.java:285)
        at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSqlDialectSpecTest.testAnalyticOver(ZetaSqlDialectSpecTest.java:3471)
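
For reference, here is a rough plain-Java sketch of what the query in the test should compute once conversion is supported. It does not use any Beam, ZetaSQL, or Calcite API. The class and method names (OverClauseSketch, sumOverAllRows) are made up for illustration, and the key values in main are assumed sample data, not the contents of the test's KeyValue table. It only illustrates the standard SQL meaning of an empty OVER (): every row falls into one window, and the aggregate is repeated on every output row.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not part of the Beam codebase.
public class OverClauseSketch {

  /** Plain-Java stand-in for: SELECT SUM(Key) OVER () FROM KeyValue. */
  static List<Long> sumOverAllRows(List<Long> keys) {
    // An empty OVER () places every input row in a single window,
    // so the aggregate is computed once over the whole input.
    long total = 0;
    for (long key : keys) {
      total += key;
    }

    // Unlike GROUP BY, the analytic form keeps one output row per input row,
    // each carrying the same window-wide sum.
    List<Long> result = new ArrayList<>(keys.size());
    for (int i = 0; i < keys.size(); i++) {
      result.add(total);
    }
    return result;
  }

  public static void main(String[] args) {
    // Assumed sample keys.
    System.out.println(sumOverAllRows(List.of(14L, 15L))); // prints [29, 29]
  }
}
```

The row-preserving behaviour is why this cannot simply reuse the existing aggregate path: an aggregate collapses rows, whereas the analytic scan has to emit one result per input row.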

Imported from Jira BEAM-12097. Original Jira may contain additional context. Reported by: apilloud.

damccorm commented 2 years ago

Unable to assign user @roger-mike. If able, self-assign, otherwise tag @damccorm so that he can assign you. Because of GitHub's spam prevention system, your activity is required to enable assignment in this repo.

roger-mike commented 2 years ago

Hi @damccorm, could you assign this to me? Thanks

damccorm commented 2 years ago

Sure! With https://github.com/apache/beam/pull/21719 merged you should be able to self assign with the command .take-issue though, so you should be able to do that going forward! Let me know if you run into any issues with that