itinycheng / flink-connector-clickhouse

Flink SQL connector for ClickHouse. Supports ClickHouseCatalog and reading/writing primitive data, maps, and arrays in ClickHouse.
Apache License 2.0

The clickhouse exception stack can only be viewed in the taskmanager log #28

Closed: qiunanx closed this issue 2 years ago

qiunanx commented 2 years ago

Code that executes the SQL

ClickHouseExecutor#attemptExecuteBatch

    default void attemptExecuteBatch(ClickHousePreparedStatement stmt, int maxRetries)
            throws SQLException {
        for (int i = 0; i < maxRetries; i++) {
            try {
                stmt.executeBatch();
                return;
            } catch (Exception exception) {
                LOG.error("ClickHouse executeBatch error, retry times = {}", i, exception);
                try {
                    Thread.sleep(1000L * i);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    throw new SQLException(
                            "Unable to flush; interrupted while doing another attempt", ex);
                }
            }
        }
        throw new SQLException(
                String.format(
                        "Attempt to execute batch failed, exhausted retry times = %d", maxRetries));
    }

The Flink UI Exceptions tab shows no specific error information

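From the error shown there, we can only see that the batch was retried three times, and meanwhile the job keeps restarting internally. From a business perspective, users generally only look at the Flink UI Exceptions tab.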

Checking taskmanager.log

If the job runs on YARN and fails, the error cannot be found in the JobManager log either.

qiunanx commented 2 years ago

Proposed change

    default void attemptExecuteBatch(ClickHousePreparedStatement stmt, int maxRetries)
            throws SQLException {
        for (int i = 0; i <= maxRetries; i++) {
            try {
                stmt.executeBatch();
                return;
            } catch (Exception exception) {
                LOG.error("ClickHouse executeBatch error, retry times = {}", i, exception);
                if (i >= maxRetries) {
                    throw new SQLException(
                            String.format(
                                    "Attempt to execute batch failed, exhausted retry times = %d", maxRetries), exception);
                }
                try {
                    Thread.sleep(1000L * i);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    throw new SQLException(
                            "Unable to flush; interrupted while doing another attempt", ex);
                }
            }
        }
    }
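
The effect of passing the caught exception as the cause can be checked outside Flink with a small standalone sketch. This is only an illustration under assumptions: `RetryWithCause`, `BatchAction`, and the `backoffMillis` parameter are hypothetical names introduced here (the connector itself calls `stmt.executeBatch()` and sleeps `1000L * i`), and the failing action is simulated.

```java
import java.sql.SQLException;

public class RetryWithCause {

    /** Hypothetical stand-in for stmt.executeBatch(); not part of the connector. */
    @FunctionalInterface
    public interface BatchAction {
        void run() throws Exception;
    }

    /**
     * Runs the action once plus up to maxRetries retries. When the last attempt
     * fails, the original exception is attached as the cause, so whoever reports
     * the SQLException (for example a UI exception view) can show the root error.
     * backoffMillis is an assumed parameter that keeps the demo fast.
     */
    public static void attemptWithRetries(BatchAction action, int maxRetries, long backoffMillis)
            throws SQLException {
        for (int i = 0; i <= maxRetries; i++) {
            try {
                action.run();
                return;
            } catch (Exception exception) {
                if (i >= maxRetries) {
                    // Chain the root cause instead of dropping it.
                    throw new SQLException(
                            String.format(
                                    "Attempt to execute batch failed, exhausted retry times = %d",
                                    maxRetries),
                            exception);
                }
                try {
                    // Linear backoff: no wait before the first retry.
                    Thread.sleep(backoffMillis * i);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    throw new SQLException(
                            "Unable to flush; interrupted while doing another attempt", ex);
                }
            }
        }
    }

    public static void main(String[] args) {
        try {
            attemptWithRetries(
                    () -> {
                        throw new IllegalStateException("simulated ClickHouse failure");
                    },
                    2,
                    10L);
        } catch (SQLException e) {
            // Both the wrapper message and the underlying error are available.
            System.out.println(e.getMessage());
            System.out.println("cause: " + e.getCause());
        }
    }
}
```

With the original loop, the final `SQLException` carried no cause, so only the "exhausted retry times" message could propagate; here `getCause()` returns the underlying failure.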
qiunanx commented 2 years ago

@itinycheng

itinycheng commented 2 years ago

@v-qiunan

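Got it, thank you. This is indeed a problem. It has been fixed in https://github.com/itinycheng/flink-connector-clickhouse/commit/8f482dd37b973d0bfd3f5dfec1bbb715337daf65, which I just pushed to master; I will merge it into the other branches later.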