ververica / flink-jdbc-driver


Invalid SQL query #2

Closed likaiqing closed 4 years ago

likaiqing commented 4 years ago

Hi, is the Kafka source supported with flink-jdbc-driver? Below are my code and the exception message; could you give me some pointers?

pom:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_${scala.version}</artifactId>
  <version>${flink.version}</version>
</dependency>
<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-jdbc-driver</artifactId>
  <version>0.1-SNAPSHOT</version>
</dependency>
```

code:

```java
public static void main(String[] args) throws SQLException {
    try {
        Class.forName("com.ververica.flink.table.jdbc.FlinkDriver");
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
    }
    Connection connection = DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
    Statement statement = connection.createStatement();
    statement.executeUpdate("CREATE TABLE user_behavior (\n" +
            " userId BIGINT,\n" +
            " itemId BIGINT,\n" +
            " categoryId BIGINT,\n" +
            " behavior STRING,\n" +
            " ts TIMESTAMP\n" +
            ") WITH (\n" +
            " 'connector.type' = 'kafka',\n" +
            " 'connector.version' = 'universal',\n" +
            " 'connector.topic' = 'user_behavior',\n" +
            " 'connector.startup-mode' = 'earliest-offset',\n" +
            " 'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
            " 'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
            " 'format.type' = 'json'\n" +
            ")");
    ResultSet resultSet = statement.executeQuery("select userId,itemId from user_behavior");
    while (resultSet.next()) {
        System.out.println(resultSet.getLong(1) + ", " + resultSet.getLong(2));
    }
    statement.close();
    connection.close();
}
```

Exception: Exception in thread "main" java.sql.SQLException: Failed to submit statement select userId,itemId from user_behavior to server at com.ververica.flink.table.jdbc.rest.SessionClient.submitStatement(SessionClient.java:160) at com.ververica.flink.table.jdbc.rest.SessionClient.submitStatement(SessionClient.java:145) at com.ververica.flink.table.jdbc.FlinkStatement$AtomicResultSetStatements.runNext(FlinkStatement.java:513) at com.ververica.flink.table.jdbc.FlinkStatement.execute(FlinkStatement.java:185) at com.ververica.flink.table.jdbc.FlinkStatement.executeQuery(FlinkStatement.java:85) at com.xiaomi.bigdata.learn.sql.jdbc.Sample.main(Sample.java:70) Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side: com.ververica.flink.table.gateway.SqlExecutionException: Invalid SQL query. at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:253) at com.ververica.flink.table.gateway.operation.SelectOperation.execute(SelectOperation.java:89) ...... at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:

FlinkLogicalSink(name=[default_catalog.default_database._tmp_table_b27a218f0b104fc384d1a93b894cf2fc], fields=[userId, itemId]) +- FlinkLogicalCalc(select=[userId, itemId]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, user_behavior, source: [KafkaTableSource(userId, itemId, categoryId, behavior, ts)]]], fields=[userId, itemId, categoryId, behavior, ts])

This exception indicates that the query uses an unsupported SQL feature. Please check the documentation for the set of currently supported SQL features. at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:72) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) ..... at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:241) ... 46 more Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=BATCH_PHYSICAL, FlinkRelDistributionTraitDef=any, sort=[]. Missing conversion is FlinkLogicalTableSourceScan[convention: LOGICAL -> BATCH_PHYSICAL] There is 1 empty subset: rel#1712:Subset#3.BATCH_PHYSICAL.any.[], the relevant part of the original plan is as follows 1695:FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, user_behavior, source: [KafkaTableSource(userId, itemId, categoryId, behavior, ts)]]], fields=[userId, itemId, categoryId, behavior, ts])

Root: rel#1708:Subset#5.BATCH_PHYSICAL.any.[] Original rel: FlinkLogicalSink(subset=[rel#1688:Subset#2.LOGICAL.any.[]], name=[default_catalog.default_database._tmp_table_b27a218f0b104fc384d1a93b894cf2fc], fields=[userId, itemId]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1691 ..... Sets: Set#3, type: RecordType(BIGINT userId, BIGINT itemId, BIGINT categoryId, VARCHAR(2147483647) behavior, TIMESTAMP(6) ts) rel#1703:Subset#3.LOGICAL.any.[], best=rel#1695, importance=0.7290000000000001 rel#1695:FlinkLogicalTableSourceScan.LOGICAL.any.[](table=[default_catalog, default_database, user_behavior, source: [KafkaTableSource(userId, itemId, categoryId, behavior, ts)]],fields=userId, itemId, categoryId, behavior, ts), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 4.8E9 io, 0.0 network, 0.0 memory} rel#1712:Subset#3.BATCH_PHYSICAL.any.[], best=null, importance=0.81 Set#4, type: RecordType(BIGINT userId, BIGINT itemId) rel#1705:Subset#4.LOGICAL.any.[], best=rel#1704, importance=0.81 rel#1704:FlinkLogicalCalc.LOGICAL.any.[](input=RelSubset#1703,select=userId, itemId), rowcount=1.0E8, cumulative cost={2.0E8 rows, 1.0E8 cpu, 4.8E9 io, 0.0 network, 0.0 memory} rel#1710:Subset#4.BATCH_PHYSICAL.any.[], best=null, importance=0.9 rel#1713:BatchExecCalc.BATCH_PHYSICAL.any.[](input=RelSubset#1712,select=userId, itemId), rowcount=1.0E8, cumulative cost={inf} Set#5, type: RecordType:peek_no_expand(BIGINT userId, BIGINT itemId) rel#1707:Subset#5.LOGICAL.any.[], best=rel#1706, importance=0.9 rel#1706:FlinkLogicalSink.LOGICAL.any.[](input=RelSubset#1705,name=default_catalog.default_database._tmp_table_b27a218f0b104fc384d1a93b894cf2fc,fields=userId, itemId), rowcount=1.0E8, cumulative cost={3.0E8 rows, 2.0E8 cpu, 4.8E9 io, 0.0 network, 0.0 memory} rel#1708:Subset#5.BATCH_PHYSICAL.any.[], best=null, importance=1.0 rel#1709:AbstractConverter.BATCH_PHYSICAL.any., rowcount=1.0E8, cumulative cost={inf} rel#1711:BatchExecSink.BATCH_PHYSICAL.any.[](input=RelSubset#1710,name=default_catalog.default_database._tmp_table_b27a218f0b104fc384d1a93b894cf2fc,fields=userId, itemId), rowcount=1.0E8, cumulative cost={inf}

Graphviz: digraph G { root [style=filled,label="Root"]; subgraph cluster3{ label="Set 3 RecordType(BIGINT userId, BIGINT itemId, BIGINT categoryId, VARCHAR(2147483647) behavior, TIMESTAMP(6) ts)"; rel1695 [label="rel#1695:FlinkLogicalTableSourceScan\ntable=[default_catalog, default_database, user_behavior, source: [KafkaTableSource(userId, itemId, categoryId, behavior, ts)]],fields=userId, itemId, categoryId, behavior, ts\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 4.8E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box] subset1703 [label="rel#1703:Subset#3.LOGICAL.any.[]"] subset1712 [label="rel#1712:Subset#3.BATCH_PHYSICAL.any.[]",color=red] } subgraph cluster4{ label="Set 4 RecordType(BIGINT userId, BIGINT itemId)"; rel1704 [label="rel#1704:FlinkLogicalCalc\ninput=RelSubset#1703,select=userId, itemId\nrows=1.0E8, cost={2.0E8 rows, 1.0E8 cpu, 4.8E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box] rel1713 [label="rel#1713:BatchExecCalc\ninput=RelSubset#1712,select=userId, itemId\nrows=1.0E8, cost={inf}",shape=box] subset1705 [label="rel#1705:Subset#4.LOGICAL.any.[]"] subset1710 [label="rel#1710:Subset#4.BATCH_PHYSICAL.any.[]"] } subgraph cluster5{ label="Set 5 RecordType:peek_no_expand(BIGINT userId, BIGINT itemId)"; rel1706 [label="rel#1706:FlinkLogicalSink\ninput=RelSubset#1705,name=default_catalog.default_database._tmp_table_b27a218f0b104fc384d1a93b894cf2fc,fields=userId, itemId\nrows=1.0E8, cost={3.0E8 rows, 2.0E8 cpu, 4.8E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box] rel1709 [label="rel#1709:AbstractConverter\ninput=RelSubset#1707,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=any,sort=[]\nrows=1.0E8, cost={inf}",shape=box] rel1711 [label="rel#1711:BatchExecSink\ninput=RelSubset#1710,name=default_catalog.default_database._tmp_table_b27a218f0b104fc384d1a93b894cf2fc,fields=userId, itemId\nrows=1.0E8, cost={inf}",shape=box] subset1707 [label="rel#1707:Subset#5.LOGICAL.any.[]"] subset1708 [label="rel#1708:Subset#5.BATCH_PHYSICAL.any.[]"] } root -> subset1708; subset1703 -> rel1695[color=blue]; subset1705 -> rel1704[color=blue]; rel1704 -> subset1703[color=blue]; subset1710 -> rel1713; rel1713 -> subset1712; subset1707 -> rel1706[color=blue]; rel1706 -> subset1705[color=blue]; subset1708 -> rel1709; rel1709 -> subset1707; subset1708 -> rel1711; rel1711 -> subset1710; } at org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:587) at org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:304) at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:647) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327) at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) ... 74 more

End of exception on server side>] at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1887) at com.ververica.flink.table.jdbc.rest.SessionClient.submitStatement(SessionClient.java:158) ... 5 more Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side: com.ververica.flink.table.gateway.SqlExecutionException: Invalid SQL query. at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:253) .... at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918) at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:

FlinkLogicalSink(name=[default_catalog.default_database._tmp_table_b27a218f0b104fc384d1a93b894cf2fc], fields=[userId, itemId]) +- FlinkLogicalCalc(select=[userId, itemId]) +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, user_behavior, source: [KafkaTableSource(userId, itemId, categoryId, behavior, ts)]]], fields=[userId, itemId, categoryId, behavior, ts])

This exception indicates that the query uses an unsupported SQL feature. Please check the documentation for the set of currently supported SQL features. at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:72) ..... at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:241) ... 46 more Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=BATCH_PHYSICAL, FlinkRelDistributionTraitDef=any, sort=[]. Missing conversion is FlinkLogicalTableSourceScan[convention: LOGICAL -> BATCH_PHYSICAL] There is 1 empty subset: rel#1712:Subset#3.BATCH_PHYSICAL.any.[], the relevant part of the original plan is as follows 1695:FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, user_behavior, source: [KafkaTableSource(userId, itemId, categoryId, behavior, ts)]]], fields=[userId, itemId, categoryId, behavior, ts])

Root: rel#1708:Subset#5.BATCH_PHYSICAL.any.[] Original rel: FlinkLogicalSink(subset=[rel#1688:Subset#2.LOGICAL.any.[]], name=[default_catalog.default_database._tmp_table_b27a218f0b104fc384d1a93b894cf2fc], fields=[userId, itemId]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1691 FlinkLogicalCalc(subset=[rel#1690:Subset#1.LOGICAL.any.[]], select=[userId, itemId]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 1694 FlinkLogicalTableSourceScan(subset=[rel#1693:Subset#0.LOGICAL.any.[]], table=[[default_catalog, default_database, user_behavior, source: [KafkaTableSource(userId, itemId, categoryId, behavior, ts)]]], fields=[userId, itemId, categoryId, behavior, ts]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 4.8E9 io, 0.0 network, 0.0 memory}, id = 1695

Sets: Set#3, type: RecordType(BIGINT userId, BIGINT itemId, BIGINT categoryId, VARCHAR(2147483647) behavior, TIMESTAMP(6) ts) rel#1703:Subset#3.LOGICAL.any.[], best=rel#1695, importance=0.7290000000000001 rel#1695:FlinkLogicalTableSourceScan.LOGICAL.any.[](table=[default_catalog, default_database, user_behavior, source: [KafkaTableSource(userId, itemId, categoryId, behavior, ts)]],fields=userId, itemId, categoryId, behavior, ts), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 4.8E9 io, 0.0 network, 0.0 memory} rel#1712:Subset#3.BATCH_PHYSICAL.any.[], best=null, importance=0.81 Set#4, type: RecordType(BIGINT userId, BIGINT itemId) rel#1705:Subset#4.LOGICAL.any.[], best=rel#1704, importance=0.81 rel#1704:FlinkLogicalCalc.LOGICAL.any.[](input=RelSubset#1703,select=userId, itemId), rowcount=1.0E8, cumulative cost={2.0E8 rows, 1.0E8 cpu, 4.8E9 io, 0.0 network, 0.0 memory} rel#1710:Subset#4.BATCH_PHYSICAL.any.[], best=null, importance=0.9 rel#1713:BatchExecCalc.BATCH_PHYSICAL.any.[](input=RelSubset#1712,select=userId, itemId), rowcount=1.0E8, cumulative cost={inf} Set#5, type: RecordType:peek_no_expand(BIGINT userId, BIGINT itemId) rel#1707:Subset#5.LOGICAL.any.[], best=rel#1706, importance=0.9 rel#1706:FlinkLogicalSink.LOGICAL.any.[](input=RelSubset#1705,name=default_catalog.default_database._tmp_table_b27a218f0b104fc384d1a93b894cf2fc,fields=userId, itemId), rowcount=1.0E8, cumulative cost={3.0E8 rows, 2.0E8 cpu, 4.8E9 io, 0.0 network, 0.0 memory} rel#1708:Subset#5.BATCH_PHYSICAL.any.[], best=null, importance=1.0 rel#1709:AbstractConverter.BATCH_PHYSICAL.any., rowcount=1.0E8, cumulative cost={inf} rel#1711:BatchExecSink.BATCH_PHYSICAL.any.[](input=RelSubset#1710,name=default_catalog.default_database._tmp_table_b27a218f0b104fc384d1a93b894cf2fc,fields=userId, itemId), rowcount=1.0E8, cumulative cost={inf}

Graphviz: digraph G { root [style=filled,label="Root"]; subgraph cluster3{ label="Set 3 RecordType(BIGINT userId, BIGINT itemId, BIGINT categoryId, VARCHAR(2147483647) behavior, TIMESTAMP(6) ts)"; rel1695 [label="rel#1695:FlinkLogicalTableSourceScan\ntable=[default_catalog, default_database, user_behavior, source: [KafkaTableSource(userId, itemId, categoryId, behavior, ts)]],fields=userId, itemId, categoryId, behavior, ts\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 4.8E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box] subset1703 [label="rel#1703:Subset#3.LOGICAL.any.[]"] subset1712 [label="rel#1712:Subset#3.BATCH_PHYSICAL.any.[]",color=red] } subgraph cluster4{ label="Set 4 RecordType(BIGINT userId, BIGINT itemId)"; rel1704 [label="rel#1704:FlinkLogicalCalc\ninput=RelSubset#1703,select=userId, itemId\nrows=1.0E8, cost={2.0E8 rows, 1.0E8 cpu, 4.8E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box] rel1713 [label="rel#1713:BatchExecCalc\ninput=RelSubset#1712,select=userId, itemId\nrows=1.0E8, cost={inf}",shape=box] subset1705 [label="rel#1705:Subset#4.LOGICAL.any.[]"] subset1710 [label="rel#1710:Subset#4.BATCH_PHYSICAL.any.[]"] } subgraph cluster5{ label="Set 5 RecordType:peek_no_expand(BIGINT userId, BIGINT itemId)"; rel1706 [label="rel#1706:FlinkLogicalSink\ninput=RelSubset#1705,name=default_catalog.default_database._tmp_table_b27a218f0b104fc384d1a93b894cf2fc,fields=userId, itemId\nrows=1.0E8, cost={3.0E8 rows, 2.0E8 cpu, 4.8E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box] rel1709 [label="rel#1709:AbstractConverter\ninput=RelSubset#1707,convention=BATCH_PHYSICAL,FlinkRelDistributionTraitDef=any,sort=[]\nrows=1.0E8, cost={inf}",shape=box] rel1711 [label="rel#1711:BatchExecSink\ninput=RelSubset#1710,name=default_catalog.default_database._tmp_table_b27a218f0b104fc384d1a93b894cf2fc,fields=userId, itemId\nrows=1.0E8, cost={inf}",shape=box] subset1707 [label="rel#1707:Subset#5.LOGICAL.any.[]"] subset1708 [label="rel#1708:Subset#5.BATCH_PHYSICAL.any.[]"] } root -> subset1708; subset1703 -> rel1695[color=blue]; subset1705 -> rel1704[color=blue]; rel1704 -> subset1703[color=blue]; subset1710 -> rel1713; rel1713 -> subset1712; subset1707 -> rel1706[color=blue]; rel1706 -> subset1705[color=blue]; subset1708 -> rel1709; rel1709 -> subset1707; subset1708 -> rel1711; rel1711 -> subset1710; } at org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:587) at org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:304) at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:647) at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327) at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64) ... 74 more

End of exception on server side>] at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390) at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374) at org.apache.flink.runtime.rest.RestClient$$Lambda$19/96406857.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

godfreyhe commented 4 years ago

@likaiqing Thanks for reporting this. Currently flink-jdbc-driver only supports batch execution; support for streaming is under discussion. You can try the hive, csv, or jdbc connectors instead.
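
For reference, here is a minimal sketch of the same JDBC flow against a bounded source that the batch-only driver can plan, using a filesystem/CSV table in place of Kafka. The file path, table name, and CSV format options are illustrative assumptions (not taken from the report above) and may need adjusting for the Flink version in use:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class BatchCsvSample {
    public static void main(String[] args) throws SQLException {
        // Same driver and gateway endpoint as in the report above.
        Connection connection =
                DriverManager.getConnection("jdbc:flink://localhost:8083?planner=blink");
        Statement statement = connection.createStatement();
        // A filesystem/CSV table is bounded, so the batch planner can execute the scan.
        // The path and format options below are placeholders, not part of the original report.
        statement.executeUpdate(
                "CREATE TABLE user_behavior_csv (\n" +
                "  userId BIGINT,\n" +
                "  itemId BIGINT\n" +
                ") WITH (\n" +
                "  'connector.type' = 'filesystem',\n" +
                "  'connector.path' = 'file:///tmp/user_behavior.csv',\n" +
                "  'format.type' = 'csv'\n" +
                ")");
        ResultSet resultSet = statement.executeQuery("SELECT userId, itemId FROM user_behavior_csv");
        while (resultSet.next()) {
            System.out.println(resultSet.getLong(1) + ", " + resultSet.getLong(2));
        }
        statement.close();
        connection.close();
    }
}
```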

likaiqing commented 4 years ago

> @likaiqing Thanks for reporting this. Currently flink-jdbc-driver only supports batch execution; support for streaming is under discussion. You can try the hive, csv, or jdbc connectors instead.

Thanks very much. Yes, I saw the execution_type parameter in the URL when reading the source code. But does flink-sql-gateway support streaming?
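
(For illustration only: if that URL parameter is used alongside the planner option the way the report above does, a connection string selecting the execution mode explicitly might look like the sketch below. The exact parameter name and accepted values are assumptions and should be checked against the driver source.)

```java
// Hypothetical URL, assuming the execution_type parameter mentioned above
// accepts a batch/streaming value; verify against the driver source before use.
String url = "jdbc:flink://localhost:8083?planner=blink&execution_type=batch";
Connection connection = DriverManager.getConnection(url);
```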

godfreyhe commented 4 years ago

flink-sql-gateway supports streaming, and users will be able to submit streaming jobs through the REST API in the future.