itinycheng / flink-connector-clickhouse

Flink SQL connector for ClickHouse. Supports ClickHouseCatalog and reading/writing primitive data types, maps, and arrays to ClickHouse.
Apache License 2.0

Question about Java API usage #71

Closed. zenvzenv closed this issue 1 year ago.

zenvzenv commented 1 year ago

I am using Flink 1.13.2, flink-connector-clickhouse 1.13.2-SNAPSHOT, and ClickHouse 23.3.1.2823. My code is as follows:

public static void main(String[] args) {
    // Blink planner, streaming mode (Flink 1.13)
    EnvironmentSettings settings = EnvironmentSettings
            .newInstance()
            .inStreamingMode()
            .useBlinkPlanner()
            .build();
    TableEnvironment tEnv = TableEnvironment.create(settings);

    // Connection properties for the ClickHouseCatalog
    Map<String, String> props = new HashMap<>();
    props.put(ClickHouseConfig.DATABASE_NAME, "default");
    props.put(ClickHouseConfig.TABLE_NAME, "lineorder_flat");
    props.put(ClickHouseConfig.URL, "clickhouse://127.0.0.1:8123");
    props.put(ClickHouseConfig.USERNAME, "default");
    props.put(ClickHouseConfig.PASSWORD, "123456");
    props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, "30s");

    // Register and switch to the ClickHouse catalog
    Catalog cHcatalog = new ClickHouseCatalog("clickhouse", props);
    tEnv.registerCatalog("clickhouse", cHcatalog);
    tEnv.useCatalog("clickhouse");

    // Run the query and print every result row
    TableResult tableResult = tEnv.executeSql("select count(1) from `default`.`lineorder_flat`");
    tableResult.collect().forEachRemaining(System.out::println);
}

The lineorder_flat table was created in ClickHouse beforehand and contains data. The statement select count(1) from default.lineorder_flat runs fine in a SQL client, and select 1 also executes and returns a result. But running the program fails with the following error:

Exception in thread "main" java.lang.RuntimeException: Failed to fetch next result
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
    at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370)
    at java.util.Iterator.forEachRemaining(Iterator.java:115)
    at org.example.Main.main(Main.java:32)
Caused by: java.io.IOException: Failed to fetch job execution result
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:177)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
    ... 4 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
    ... 6 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
    at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:134)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:174)
    ... 6 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
    at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: open() failed.ClickHouse exception, code: 1002, host: 127.0.0.1, port: 8123; Code: 62. DB::Exception: Syntax error: failed at position 14 ('`default`'): `default`.`lineorder_flat` . Expected one of: token, Dot, OR, AND, BETWEEN, NOT BETWEEN, LIKE, ILIKE, NOT LIKE, NOT ILIKE, REGEXP, IN, NOT IN, GLOBAL IN, GLOBAL NOT IN, MOD, DIV, IS NULL, IS NOT NULL, alias, AS, Comma, FROM, PREWHERE, WHERE, GROUP BY, WITH, HAVING, WINDOW, ORDER BY, LIMIT, OFFSET, SETTINGS, UNION, EXCEPT, INTERSECT, INTO OUTFILE, FORMAT, end of query. (SYNTAX_ERROR) (version 23.3.1.2823 (official build))

    at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchInputFormat.open(ClickHouseBatchInputFormat.java:91)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:84)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: ru.yandex.clickhouse.except.ClickHouseUnknownException: ClickHouse exception, code: 1002, host: 127.0.0.1, port: 8123; Code: 62. DB::Exception: Syntax error: failed at position 14 ('`default`'): `default`.`lineorder_flat` . Expected one of: token, Dot, OR, AND, BETWEEN, NOT BETWEEN, LIKE, ILIKE, NOT LIKE, NOT ILIKE, REGEXP, IN, NOT IN, GLOBAL IN, GLOBAL NOT IN, MOD, DIV, IS NULL, IS NOT NULL, alias, AS, Comma, FROM, PREWHERE, WHERE, GROUP BY, WITH, HAVING, WINDOW, ORDER BY, LIMIT, OFFSET, SETTINGS, UNION, EXCEPT, INTERSECT, INTO OUTFILE, FORMAT, end of query. (SYNTAX_ERROR) (version 23.3.1.2823 (official build))

    at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.getException(ClickHouseExceptionSpecifier.java:92)
    at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:56)
    at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:29)
    at ru.yandex.clickhouse.ClickHouseStatementImpl.checkForErrorAndThrow(ClickHouseStatementImpl.java:1094)
    at ru.yandex.clickhouse.ClickHouseStatementImpl.getInputStream(ClickHouseStatementImpl.java:773)
    at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQueryStatement(ClickHouseStatementImpl.java:271)
    at ru.yandex.clickhouse.ClickHousePreparedStatementImpl.executeQuery(ClickHousePreparedStatementImpl.java:143)
    at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchInputFormat.open(ClickHouseBatchInputFormat.java:88)
    ... 4 more
Caused by: java.lang.Throwable: Code: 62. DB::Exception: Syntax error: failed at position 14 ('`default`'): `default`.`lineorder_flat` . Expected one of: token, Dot, OR, AND, BETWEEN, NOT BETWEEN, LIKE, ILIKE, NOT LIKE, NOT ILIKE, REGEXP, IN, NOT IN, GLOBAL IN, GLOBAL NOT IN, MOD, DIV, IS NULL, IS NOT NULL, alias, AS, Comma, FROM, PREWHERE, WHERE, GROUP BY, WITH, HAVING, WINDOW, ORDER BY, LIMIT, OFFSET, SETTINGS, UNION, EXCEPT, INTERSECT, INTO OUTFILE, FORMAT, end of query. (SYNTAX_ERROR) (version 23.3.1.2823 (official build))

    at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:54)
    ... 10 more

Process finished with exit code 1

Any guidance would be appreciated~

itinycheng commented 1 year ago

@zenvzenv It looks like Flink is not projecting any column here; replacing count(1) with count(column_name) should fix it. For more details on a similar issue, see: https://github.com/itinycheng/flink-connector-clickhouse/pull/65

zenvzenv commented 1 year ago

Thank you very much for your reply @itinycheng. I modified the code as you suggested and replaced count(1) with count(<a concrete column>). The code no longer throws an error, but the result does not seem to be what I expected. The modified code is shown below:

 public static void main(String[] args) {
    EnvironmentSettings settings = EnvironmentSettings
            .newInstance()
            .inStreamingMode()
            .useBlinkPlanner()
            .build();
    TableEnvironment tEnv = TableEnvironment.create(settings);
    Map<String, String> props = new HashMap<>();
    props.put(ClickHouseConfig.DATABASE_NAME, "default");
    props.put(ClickHouseConfig.TABLE_NAME, "lineorder_flat");
    props.put(ClickHouseConfig.URL, "clickhouse://20.20.44.223:8123");
    props.put(ClickHouseConfig.USERNAME, "default");
    props.put(ClickHouseConfig.PASSWORD, "123456");
    props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, "30s");
    Catalog cHcatalog = new ClickHouseCatalog("clickhouse", props);
    tEnv.registerCatalog("clickhouse", cHcatalog);
    tEnv.useCatalog("clickhouse");
    System.out.println(System.currentTimeMillis());
    TableResult tableResult = tEnv.executeSql("select count(LO_ORDERKEY) from lineorder_flat");
    tableResult.print();
    System.out.println(System.currentTimeMillis());
}

The returned result is:

| +U |             21393709 |
| -U |             21393709 |
| +U |             21393710 |

My understanding is that it should return a single number, so I don't quite understand this output. Also, reading is rather slow: my test table has 100 million rows, and the program above takes a very long time to run.

itinycheng commented 1 year ago

@zenvzenv In the current example count() is not pushed down to ClickHouse. Flink adds a count() aggregation operator on its side, the SQL sent to ClickHouse is select LO_ORDERKEY from lineorder_flat, and the returned rows are counted inside Flink.
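
Because the aggregation runs as a streaming job, the count is emitted as a changelog: each update retracts (-U) the previous value and adds (+U) the new one, which is why several rows are printed instead of one number. If only the final value is needed, running the same query in batch execution mode should help; a minimal sketch, assuming the ClickHouse source is treated as a bounded source here (the catalog setup stays exactly as in the earlier snippet):

EnvironmentSettings batchSettings = EnvironmentSettings
        .newInstance()
        .useBlinkPlanner()
        .inBatchMode() // bounded execution: prints a single final row
        .build();
TableEnvironment batchEnv = TableEnvironment.create(batchSettings);
// register and use the "clickhouse" catalog exactly as before, then:
batchEnv.executeSql("select count(LO_ORDERKEY) from lineorder_flat").print();

Note that this does not make the read itself faster: all LO_ORDERKEY values are still transferred to Flink and counted there.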

itinycheng commented 1 year ago

The current connector only implements some filter push-down; see: https://github.com/itinycheng/flink-connector-clickhouse/blob/master/src/main/java/org/apache/flink/connector/clickhouse/util/FilterPushDownHelper.java
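
For illustration, a simple predicate like the one below should be translated into the WHERE clause of the query sent to ClickHouse, so only matching rows are shipped to Flink; the count() itself is still computed on the Flink side (LO_ORDERDATE is a made-up column name used only for this sketch):

// Simple comparisons combined with AND are the kind of predicates the
// filter push-down can typically convert; the aggregation is not pushed down.
TableResult filtered = tEnv.executeSql(
        "select count(LO_ORDERKEY) from lineorder_flat"
                + " where LO_ORDERDATE >= 19960101 and LO_ORDERDATE < 19970101");
filtered.print();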

KarlManong commented 1 year ago

> It looks like Flink is not projecting any column here; replacing count(1) with count(column_name) should fix it.

With the Table API, count(column_name) still fails. The generated SQL is select from `database`.`table`, i.e. no column is projected.
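
For reference, the Table API path in question looks roughly like the sketch below (it assumes the same ClickHouseCatalog registration as in the earlier snippets and uses LO_ORDERKEY purely as an example column):

// requires: import static org.apache.flink.table.api.Expressions.$;
// Aggregation via the Table API; in this case the connector reportedly
// generates "select from `database`.`table`" with no projected column.
Table counted = tEnv.from("lineorder_flat")
        .select($("LO_ORDERKEY").count());
counted.execute().print();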