ververica / flink-jdbc-driver

Apache License 2.0
129 stars 61 forks source link

Is this flink jdbc just support batch process? #9

Open zjw940419 opened 4 years ago

zjw940419 commented 4 years ago

mycode is :

public static void main(String[] args) throws Exception {
        Connection connection = DriverManager.getConnection("jdbc:flink://10.33.164.97:8083?planner=old");
        Statement statement = connection.createStatement();

        String sql = "CREATE TABLE user_behavior (\n" +
                "    user_id BIGINT,\n" +
                "    item_id BIGINT,\n" +
                "    category_id BIGINT,\n" +
                "    behavior STRING,\n" +
                "    ts TIMESTAMP(3)\n" +
                ") WITH (\n" +
                "    'connector.type' = 'kafka',\n" +
                "    'connector.version' = 'universal',\n" +
                "    'connector.topic' = 'user_behavior',\n" +
                "    'connector.startup-mode' = 'earliest-offset',\n" +
                "    'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
                "    'connector.properties.bootstrap.servers' = 'localhost:9094',\n" +
                "    'format.type' = 'json'\n" +
                ")";

        int i = statement.executeUpdate(sql);
        ResultSet rs = statement.executeQuery("SELECT * FROM user_behavior");
        while (rs.next()) {
            System.out.println(rs.getString(4));
        }
        statement.close();
        connection.close();
    }

But the result has some error.

End of exception on server side>]
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at com.ververica.flink.table.jdbc.rest.SessionClient.submitStatement(SessionClient.java:158)
    ... 5 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
com.ververica.flink.table.gateway.SqlExecutionException: Invalid SQL query.
    at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:253)
    at com.ververica.flink.table.gateway.operation.SelectOperation.execute(SelectOperation.java:89)
    at com.ververica.flink.table.gateway.Session.runStatement(Session.java:84)
    at com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler.handleRequest(StatementExecuteHandler.java:81)
    at com.ververica.flink.table.gateway.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:77)
    at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:178)
    at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:75)
    at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:110)
    at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:89)
    at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:54)
    at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:174)
    at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:68)
    at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
    at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
    at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1421)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
    at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: Only BatchTableSource and InputFormatTableSource are supported in BatchTableEnvironment.
    at org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:118)
    at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:306)
    at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:281)
    at org.apache.flink.table.api.internal.BatchTableEnvImpl.writeToSink(BatchTableEnvImpl.scala:138)
    at org.apache.flink.table.api.internal.TableEnvImpl.insertInto(TableEnvImpl.scala:664)
    at org.apache.flink.table.api.internal.TableEnvImpl.insertInto(TableEnvImpl.scala:616)
    at org.apache.flink.table.api.internal.BatchTableEnvImpl.insertInto(BatchTableEnvImpl.scala:60)
    at org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:428)
    at com.ververica.flink.table.gateway.operation.SelectOperation.lambda$executeQueryInternal$0(SelectOperation.java:243)
    at com.ververica.flink.table.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:230)
    at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:241)
    ... 46 more

End of exception on server side>]
    at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:390)
    at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:374)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

So what am i doing wrong? or maybe it's just supported batch process,thanks~

egwada commented 4 years ago

Hi all,

I get the same issue when I try to query an existing table from the Flink-Sql-Training example: as already defined in the Flink-Sql-Gatway as mentioned on that blog post https://gethue.com/blog/sql-editor-for-apache-flink-sql/ and the conf file of the gateway (https://raw.githubusercontent.com/romainr/flink-sql-gateway/master/docs/demo/sql-gateway-defaults.yaml).

Could indicate please if a properties, a config parameter is needed somewhere ?

Regards