itinycheng / flink-connector-clickhouse

Flink SQL connector for ClickHouse. Support ClickHouseCatalog and read/write primary data, maps, arrays to clickhouse.
Apache License 2.0
346 stars 148 forks source link

`select count(field) from table` failed when using flink table api #95

Closed KarlManong closed 10 months ago

KarlManong commented 10 months ago

错误

java.lang.RuntimeException: Failed to fetch next result

    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
    at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
    at org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:120)
    at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:153)
    at org.apache.flink.test.ClickHouseDynamicTableTest.testSink(ClickHouseDynamicTableTest.java:84)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:725)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:214)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:210)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:135)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:66)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
    at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
    at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
    at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:232)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:55)
Caused by: java.io.IOException: Failed to fetch job execution result
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:184)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:121)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
    ... 74 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:182)
    ... 76 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
    at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680)
    at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658)
    at java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2094)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:138)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:181)
    ... 76 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240)
    at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:477)
    at jdk.internal.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: java.lang.IllegalArgumentException: open() failed.ClickHouse exception, code: 1002, host: localhost, port: 59296; Code: 62. DB::Exception: Syntax error: failed at position 14 ('`default`'): `default`.`clickhouse_test_table` . Expected one of: token, Dot, OR, AND, IS NULL, IS NOT NULL, BETWEEN, NOT BETWEEN, LIKE, ILIKE, NOT LIKE, NOT ILIKE, REGEXP, IN, NOT IN, GLOBAL IN, GLOBAL NOT IN, MOD, DIV, alias, AS, Comma, FROM, PREWHERE, WHERE, GROUP BY, WITH, HAVING, WINDOW, ORDER BY, LIMIT, OFFSET, FETCH, SETTINGS, UNION, EXCEPT, INTERSECT, INTO OUTFILE, FORMAT, end of query. (SYNTAX_ERROR) (version 23.7.4.5 (official build))

    at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchInputFormat.open(ClickHouseBatchInputFormat.java:91)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:84)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333)
Caused by: ru.yandex.clickhouse.except.ClickHouseUnknownException: ClickHouse exception, code: 1002, host: localhost, port: 59296; Code: 62. DB::Exception: Syntax error: failed at position 14 ('`default`'): `default`.`clickhouse_test_table` . Expected one of: token, Dot, OR, AND, IS NULL, IS NOT NULL, BETWEEN, NOT BETWEEN, LIKE, ILIKE, NOT LIKE, NOT ILIKE, REGEXP, IN, NOT IN, GLOBAL IN, GLOBAL NOT IN, MOD, DIV, alias, AS, Comma, FROM, PREWHERE, WHERE, GROUP BY, WITH, HAVING, WINDOW, ORDER BY, LIMIT, OFFSET, FETCH, SETTINGS, UNION, EXCEPT, INTERSECT, INTO OUTFILE, FORMAT, end of query. (SYNTAX_ERROR) (version 23.7.4.5 (official build))

    at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.getException(ClickHouseExceptionSpecifier.java:92)
    at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:56)
    at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:29)
    at ru.yandex.clickhouse.ClickHouseStatementImpl.checkForErrorAndThrow(ClickHouseStatementImpl.java:1094)
    at ru.yandex.clickhouse.ClickHouseStatementImpl.getInputStream(ClickHouseStatementImpl.java:773)
    at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQueryStatement(ClickHouseStatementImpl.java:271)
    at ru.yandex.clickhouse.ClickHousePreparedStatementImpl.executeQuery(ClickHousePreparedStatementImpl.java:143)
    at org.apache.flink.connector.clickhouse.internal.ClickHouseBatchInputFormat.open(ClickHouseBatchInputFormat.java:88)
    ... 4 more
Caused by: java.lang.Throwable: Code: 62. DB::Exception: Syntax error: failed at position 14 ('`default`'): `default`.`clickhouse_test_table` . Expected one of: token, Dot, OR, AND, IS NULL, IS NOT NULL, BETWEEN, NOT BETWEEN, LIKE, ILIKE, NOT LIKE, NOT ILIKE, REGEXP, IN, NOT IN, GLOBAL IN, GLOBAL NOT IN, MOD, DIV, alias, AS, Comma, FROM, PREWHERE, WHERE, GROUP BY, WITH, HAVING, WINDOW, ORDER BY, LIMIT, OFFSET, FETCH, SETTINGS, UNION, EXCEPT, INTERSECT, INTO OUTFILE, FORMAT, end of query. (SYNTAX_ERROR) (version 23.7.4.5 (official build))

    at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:54)
    ... 10 more

简单的测试代码

package org.apache.flink.test;

import org.junit.jupiter.api.BeforeEach;
import org.testcontainers.containers.ClickHouseContainer;
import ru.yandex.clickhouse.ClickHouseDataSource;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

import static java.util.Objects.requireNonNull;

public abstract class ClickhouseSetup {

    protected static final String USERNAME = "default";
    protected static final String PASSWORD = "";
    protected static final String DATABASE = "default";
    protected static final String TABLE_NAME = "clickhouse_test_table";

    private ClickHouseContainer clickHouseContainer;

    /**
     * current has some issue
     *
     * @see <a href='https://github.com/testcontainers/testcontainers-java/commit/fb2fbbcff5040b9eee35b2322ac63171081a84a7'>commit</a>
     */
    @BeforeEach
    public void setup() {
        clickHouseContainer = new ClickHouseContainer(
                "clickhouse/clickhouse-server:23")
                .withEnv("CLICKHOUSE_USER", USERNAME)
                .withEnv("CLICKHOUSE_DB", DATABASE)
                .withEnv("CLICKHOUSE_PASSWORD", PASSWORD)
                .withEnv("CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT", "1");
        clickHouseContainer.start();
        try {
            javax.sql.DataSource dataSource = new ClickHouseDataSource(
                    clickHouseContainer.getJdbcUrl());
            try (Connection connection = dataSource.getConnection()) {
                Statement statement = connection.createStatement();
                statement.execute(initialSql());
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    protected abstract String initialSql();

    protected ClickHouseContainer getClickHouseContainer() {
        return requireNonNull(clickHouseContainer);
    }
}
package org.apache.flink.test;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

import org.junit.jupiter.api.Test;

import java.util.concurrent.ExecutionException;

public class ClickHouseDynamicTableTest extends ClickhouseSetup {
    @Test
    public void testSink() throws ExecutionException, InterruptedException {
        TableEnvironment tableEnvironment = TableEnvironment.create(new Configuration());

        String jdbcUrl = getClickHouseContainer().getJdbcUrl().substring(5);

        tableEnvironment.executeSql(String.format("CREATE TABLE t_source (\n" +
                "    `user_id` BIGINT,\n" +
                "    `user_type` INTEGER,\n" +
                "    `language` STRING,\n" +
                "    `country` STRING,\n" +
                "    `gender` STRING,\n" +
                "    `score` DOUBLE,\n" +
                "    `list` ARRAY<STRING>,\n" +
                "    `map` Map<STRING, BIGINT>,\n" +
                "    PRIMARY KEY (`user_id`) NOT ENFORCED\n" +
                ") WITH (\n" +
                "    'connector' = 'clickhouse',\n" +
                "    'url' = '%s',\n" +
                "    'username'='default',\n" +
                "    'password'='', \n" +
                "    'database-name' = 'default',\n" +
                "    'table-name' = 'clickhouse_test_table',\n" +
                "    'sink.batch-size' = '500',\n" +
                "    'sink.flush-interval' = '1000',\n" +
                "    'sink.max-retries' = '3'\n" +
                ");\n", jdbcUrl));

        TableResult tableResult = tableEnvironment.executeSql("select count(user_id) from t_source");
        tableResult.print();
    }

    @Override
    protected String initialSql() {
        return "CREATE TABLE clickhouse_test_table (\n" +
                "    `user_id` BIGINT,\n" +
                "    `user_type` Integer,\n" +
                "    `language` String,\n" +
                "    `country` String,\n" +
                "    `gender` String,\n" +
                "    `score` Double,\n" +
                "    `list` Array(String),\n" +
                "    `map` Map(String, BIGINT),\n" +
                "    PRIMARY KEY (`user_id`)\n" +
                ") ENGINE = MergeTree \n";
    }
}
itinycheng commented 10 months ago

@KarlManong Refer to: https://github.com/itinycheng/flink-connector-clickhouse/pull/65

KarlManong commented 10 months ago

@KarlManong Refer to: #65

@itinycheng 切换到flink1.17后,结果变了详情,而不是count值

+----+----------------------+
| op |               EXPR$0 |
+----+----------------------+
| +I |                    1 |
| -U |                    1 |
| +U |                    2 |
| -U |                    2 |
| +U |                    3 |
| -U |                    3 |
| +U |                    4 |
| -U |                    4 |
| +U |                    5 |
| -U |                    5 |
| +U |                    6 |
| -U |                    6 |
| +U |                    7 |
| -U |                    7 |
| +U |                    8 |
| -U |                    8 |
| +U |                    9 |
| -U |                    9 |
| +U |                   10 |
| -U |                   10 |
| +U |                   11 |
| -U |                   11 |
| +U |                   12 |
| -U |                   12 |
| +U |                   13 |
| -U |                   13 |
| +U |                   14 |
| -U |                   14 |
| +U |                   15 |
| -U |                   15 |
| +U |                   16 |
| -U |                   16 |
| +U |                   17 |
| -U |                   17 |
| +U |                   18 |
| -U |                   18 |
| +U |                   19 |
| -U |                   19 |
| +U |                   20 |
| -U |                   20 |
| +U |                   21 |
| -U |                   21 |
| +U |                   22 |
| -U |                   22 |
| +U |                   23 |
| -U |                   23 |
| +U |                   24 |
| -U |                   24 |
| +U |                   25 |
| -U |                   25 |
| +U |                   26 |
| -U |                   26 |
| +U |                   27 |
| -U |                   27 |
| +U |                   28 |
| -U |                   28 |
| +U |                   29 |
| -U |                   29 |
| +U |                   30 |
| -U |                   30 |
| +U |                   31 |
| -U |                   31 |
| +U |                   32 |
| -U |                   32 |
| +U |                   33 |
| -U |                   33 |
| +U |                   34 |
| -U |                   34 |
| +U |                   35 |
| -U |                   35 |
| +U |                   36 |
| -U |                   36 |
| +U |                   37 |
| -U |                   37 |
| +U |                   38 |
| -U |                   38 |
| +U |                   39 |
| -U |                   39 |
| +U |                   40 |
| -U |                   40 |
| +U |                   41 |
| -U |                   41 |
| +U |                   42 |
| -U |                   42 |
| +U |                   43 |
| -U |                   43 |
| +U |                   44 |
| -U |                   44 |
| +U |                   45 |
| -U |                   45 |
| +U |                   46 |
| -U |                   46 |
| +U |                   47 |
| -U |                   47 |
| +U |                   48 |
| -U |                   48 |
| +U |                   49 |
| -U |                   49 |
| +U |                   50 |
| -U |                   50 |
| +U |                   51 |
| -U |                   51 |
| +U |                   52 |
| -U |                   52 |
| +U |                   53 |
| -U |                   53 |
| +U |                   54 |
| -U |                   54 |
| +U |                   55 |
| -U |                   55 |
| +U |                   56 |
| -U |                   56 |
| +U |                   57 |
| -U |                   57 |
| +U |                   58 |
| -U |                   58 |
| +U |                   59 |
| -U |                   59 |
| +U |                   60 |
| -U |                   60 |
| +U |                   61 |
| -U |                   61 |
| +U |                   62 |
| -U |                   62 |
| +U |                   63 |
| -U |                   63 |
| +U |                   64 |
| -U |                   64 |
| +U |                   65 |
| -U |                   65 |
| +U |                   66 |
| -U |                   66 |
| +U |                   67 |
| -U |                   67 |
| +U |                   68 |
| -U |                   68 |
| +U |                   69 |
| -U |                   69 |
| +U |                   70 |
| -U |                   70 |
| +U |                   71 |
| -U |                   71 |
| +U |                   72 |
| -U |                   72 |
| +U |                   73 |
| -U |                   73 |
| +U |                   74 |
| -U |                   74 |
| +U |                   75 |
| -U |                   75 |
| +U |                   76 |
| -U |                   76 |
| +U |                   77 |
| -U |                   77 |
| +U |                   78 |
| -U |                   78 |
| +U |                   79 |
| -U |                   79 |
| +U |                   80 |
| -U |                   80 |
| +U |                   81 |
| -U |                   81 |
| +U |                   82 |
| -U |                   82 |
| +U |                   83 |
| -U |                   83 |
| +U |                   84 |
| -U |                   84 |
| +U |                   85 |
| -U |                   85 |
| +U |                   86 |
| -U |                   86 |
| +U |                   87 |
| -U |                   87 |
| +U |                   88 |
| -U |                   88 |
| +U |                   89 |
| -U |                   89 |
| +U |                   90 |
| -U |                   90 |
| +U |                   91 |
| -U |                   91 |
| +U |                   92 |
| -U |                   92 |
| +U |                   93 |
| -U |                   93 |
| +U |                   94 |
| -U |                   94 |
| +U |                   95 |
| -U |                   95 |
| +U |                   96 |
| -U |                   96 |
| +U |                   97 |
| -U |                   97 |
| +U |                   98 |
| -U |                   98 |
| +U |                   99 |
| -U |                   99 |
| +U |                  100 |
+----+----------------------+
KarlManong commented 10 months ago

flink default behaviors, close this issue

itinycheng commented 10 months ago

@KarlManong Moreover, the connector doesn't support the count function pushdown; For details, refer to: https://github.com/itinycheng/flink-connector-clickhouse/blob/master/src/main/java/org/apache/flink/connector/clickhouse/util/FilterPushDownHelper.java