ClickHouse / clickhouse-kafka-connect

ClickHouse Kafka Connector
Apache License 2.0

ExtractTablesMapping's DESCRIBE query times out #184

Open hekike opened 10 months ago

hekike commented 10 months ago

Describe the bug

Every 5-10 minutes, we see DESCRIBE queries time out in Kafka Connect, even though system.query_log shows a 1 ms execution duration for them. We run our connector task with tableRefreshInterval=15. According to system.query_log, multiple extractTablesMapping runs overlap. I'm also seeing SHOW TABLES queries run more often than every 15 s, which shouldn't happen with this setting. I'm wondering whether we should add a check that skips starting extractTablesMapping if a previous run is still in progress.
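The check proposed above could look roughly like the following minimal sketch. `RefreshGuard` and `runIfIdle` are hypothetical names, not part of the connector, and the sketch assumes the periodic refresh calls into one shared guard object. An `AtomicBoolean` lets exactly one caller in; any tick that arrives while a refresh is still running is skipped rather than queued.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical guard: RefreshGuard and runIfIdle are illustrative names,
// not part of clickhouse-kafka-connect.
final class RefreshGuard {
    private final AtomicBoolean running = new AtomicBoolean(false);

    // Runs the refresh unless another refresh on this guard is still in progress.
    // Returns true if the refresh ran, false if this call was skipped.
    boolean runIfIdle(Runnable refresh) {
        // compareAndSet flips false -> true atomically, so only one caller gets in.
        if (!running.compareAndSet(false, true)) {
            return false;
        }
        try {
            refresh.run();
            return true;
        } finally {
            // Release the guard even if the refresh throws.
            running.set(false);
        }
    }

    public static void main(String[] args) {
        RefreshGuard guard = new RefreshGuard();
        boolean[] nestedRan = new boolean[1];
        // Simulate a second refresh starting while the first is still running.
        boolean outerRan = guard.runIfIdle(() -> nestedRan[0] = guard.runIfIdle(() -> { }));
        // prints "outer ran: true, nested ran: false"
        System.out.println("outer ran: " + outerRan + ", nested ran: " + nestedRan[0]);
    }
}
```

Such a guard only serializes refreshes that share the same guard instance; separate connector tasks or workers would each have their own and could still overlap with one another.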

Steps to reproduce

  1. Run a connector with tableRefreshInterval=15
  2. Produce test data into the Kafka topics
  3. Observe Kafka Connect logs

Expected behavior

DESCRIBE queries do not time out.

Error log

8:23:28 AM  ERROR
Exception when running describeTable DESCRIBE TABLE `my_database`.`my_table`
connect timed out, server ClickHouseNode [uri=https://{redacted}.us-east-2.aws.clickhouse.cloud:8443/mydb, options={sslmode=STRICT}]@2074393920
com.clickhouse.client.ClickHouseException: connect timed out, server ClickHouseNode [uri=https://{redacted}.us-east-2.aws.clickhouse.cloud:8443/openmeter, options={sslmode=STRICT}]@-166211039
    at com.clickhouse.client.ClickHouseException.of(ClickHouseException.java:164)
    at com.clickhouse.client.AbstractClient.lambda$execute$0(AbstractClient.java:275)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: connect timed out
    at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
    at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
    at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
    at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.base/java.net.Socket.connect(Socket.java:609)
    at java.base/sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:305)
    at java.base/sun.net.NetworkClient.doConnect(NetworkClient.java:177)
    at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:509)
    at java.base/sun.net.www.http.HttpClient.openServer(HttpClient.java:604)
    at java.base/sun.net.www.protocol.https.HttpsClient.<init>(HttpsClient.java:266)
    at java.base/sun.net.www.protocol.https.HttpsClient.New(HttpsClient.java:373)
    at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.getNewHttpClient(AbstractDelegateHttpsURLConnection.java:207)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1187)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:1081)
    at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:193)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1367)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1342)
    at java.base/sun.net.www.protocol.https.HttpsURLConnectionImpl.getOutputStream(HttpsURLConnectionImpl.java:246)
    at com.clickhouse.client.http.HttpUrlConnectionImpl.post(HttpUrlConnectionImpl.java:225)
    at com.clickhouse.client.http.ClickHouseHttpClient.send(ClickHouseHttpClient.java:124)
    at com.clickhouse.client.AbstractClient.sendAsync(AbstractClient.java:161)
    at com.clickhouse.client.AbstractClient.lambda$execute$0(AbstractClient.java:273)

Configuration

{
  "connector.client.config.override.policy": "ALL",
  "consumer.auto.offset.reset": "latest",
  "consumer.override.auto.offset.reset": "latest",
  "consumer.override.max.poll.records": "5000",
  "database": "mydb",
  "errors.deadletterqueue.context.headers.enable": "true",
  "errors.deadletterqueue.topic.name": "om_deadletterqueue",
  "errors.deadletterqueue.topic.replication.factor": "3",
  "errors.retry.timeout": "30",
  "errors.tolerance": "all",
  "exactlyOnce": "true",
  "hostname": "{redacted}.us-east-2.aws.clickhouse.cloud",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "password": "****************",
  "port": "8443",
  "schemas.enable": "false",
  "ssl": "true",
  "tableRefreshInterval": "15",
  "topics.regex": "^om_[A-Za-z0-9]+(?:_[A-Za-z0-9]+)*_events$",
  "username": "default",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}

Environment

ClickHouse server

cc @mzitnik

mzitnik commented 10 months ago

@hekike How many workers are you running in parallel? And can you share the output from the query log?
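One way to check the query log for overlapping refreshes is to export, for each refresh-related query, its query_id, start time, and query_duration_ms, then look for any interval that starts before an earlier one has ended. The plain-Java sketch below does that; `OverlapCheck` is a hypothetical name, and the `Run` shape with millisecond units is an assumption about how the rows were exported.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper: finds time intervals (e.g. queries exported from
// system.query_log) that start before an earlier interval has ended.
final class OverlapCheck {
    // One exported row: an id, a start time and a duration, both in milliseconds.
    static final class Run {
        final String queryId;
        final long startMs;
        final long durationMs;

        Run(String queryId, long startMs, long durationMs) {
            this.queryId = queryId;
            this.startMs = startMs;
            this.durationMs = durationMs;
        }

        long endMs() { return startMs + durationMs; }
    }

    // Returns "<earlier> overlaps <later>" for every run that starts while some
    // earlier run is still in progress; the earlier run named is the one ending latest.
    static List<String> overlapping(List<Run> runs) {
        List<Run> sorted = new ArrayList<>(runs);
        sorted.sort(Comparator.comparingLong((Run run) -> run.startMs));
        List<String> result = new ArrayList<>();
        long latestEnd = Long.MIN_VALUE;
        String latestId = null;
        for (Run r : sorted) {
            if (r.startMs < latestEnd) {
                result.add(latestId + " overlaps " + r.queryId);
            }
            if (r.endMs() > latestEnd) {
                latestEnd = r.endMs();
                latestId = r.queryId;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Run> runs = List.of(
                new Run("a", 0, 5_000),
                new Run("b", 15_000, 20_000),   // still running at t = 30 s
                new Run("c", 30_000, 1_000));
        System.out.println(overlapping(runs)); // prints [b overlaps c]
    }
}
```

Sorting by start time and tracking the latest end seen so far finds every run that begins while another is still in progress in O(n log n). Runs that merely touch (one ends exactly when the next starts) are not reported.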

Paultagoras commented 9 months ago

Hi @hekike - I'm closing this because we haven't heard anything in a while, but if it's still an issue please comment with the details @mzitnik was asking for and we can take another look into it. Thanks!

hekike commented 9 months ago

We chatted with @mzitnik offline but couldn't figure out the root cause.

Keremgunduz7 commented 2 weeks ago

I think the issue here is sending lots of DESCRIBE queries simultaneously over HTTP.

We are hitting a similar issue, but instead of timeouts, ClickHouse somehow starts refusing new HTTP connections, even though there is no connection limit.

But if we start the connectors one by one (in other words, without sending lots of DESCRIBE TABLE queries simultaneously), everything works fine.
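If starting connectors one at a time avoids the problem, a similar effect could be had by capping how many DESCRIBE calls are in flight at once. This is a minimal sketch, assuming the describe work can be wrapped in a `Callable`; `DescribeThrottle` is a hypothetical class, and `Thread.sleep` stands in for the real round trip to ClickHouse. A `Semaphore` with a fixed number of permits blocks extra callers until a slot frees up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical throttle: at most maxConcurrent describe calls run at once,
// no matter how many threads call describe() concurrently.
final class DescribeThrottle {
    private final Semaphore permits;

    DescribeThrottle(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    <T> T describe(Callable<T> call) throws Exception {
        permits.acquire();        // blocks while the limit is reached
        try {
            return call.call();
        } finally {
            permits.release();    // free the slot even if the call fails
        }
    }

    public static void main(String[] args) throws Exception {
        DescribeThrottle throttle = new DescribeThrottle(2);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        List<Future<String>> results = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            String table = "table_" + i;
            results.add(pool.submit(() -> throttle.describe(() -> {
                int now = inFlight.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                Thread.sleep(10); // stand-in for the DESCRIBE round trip
                inFlight.decrementAndGet();
                return table;
            })));
        }
        for (Future<String> f : results) {
            f.get();
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("peak concurrent describes: " + peak.get());
    }
}
```

With 20 calls on 8 threads and 2 permits, the printed peak never exceeds 2. A cap like this only applies within one JVM, so tasks spread across several workers would still add up on the server side.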

Here is a patched file that resolves the HTTP connection issues for us. Maybe the same thing also causes the issue reported above.

package com.clickhouse.kafka.connect.sink.db.helper;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseException;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseNodeSelector;
import com.clickhouse.client.ClickHouseProtocol;
import com.clickhouse.client.ClickHouseResponse;
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.client.config.ClickHouseProxyType;
import com.clickhouse.config.ClickHouseOption;
import com.clickhouse.data.ClickHouseFormat;
import com.clickhouse.data.ClickHouseRecord;
import com.clickhouse.data.ClickHouseValue;
import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig;
import com.clickhouse.kafka.connect.sink.db.mapping.Column;
import com.clickhouse.kafka.connect.sink.db.mapping.Table;
import com.clickhouse.kafka.connect.util.Utils;
import com.fasterxml.jackson.core.JsonProcessingException;

import lombok.Getter;

public class ClickHouseHelperClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseHelperClient.class);

    private final String hostname;
    private final int port;
    private final String username;
    @Getter
    private final String database;
    private final String password;
    private final boolean sslEnabled;
    private final String jdbcConnectionProperties;
    private final int timeout;
    @Getter
    private ClickHouseNode server = null;
    private final int retry;
    private ClickHouseProxyType proxyType = null;
    private String proxyHost = null;
    private int proxyPort = -1;

    public ClickHouseHelperClient(ClickHouseClientBuilder builder) {
        this.hostname = builder.hostname;
        this.port = builder.port;
        this.username = builder.username;
        this.password = builder.password;
        this.database = builder.database;
        this.sslEnabled = builder.sslEnabled;
        this.jdbcConnectionProperties = builder.jdbcConnectionProperties;
        this.timeout = builder.timeout;
        this.retry = builder.retry;
        this.proxyType = builder.proxyType;
        this.proxyHost = builder.proxyHost;
        this.proxyPort = builder.proxyPort;
        this.server = create();
    }

    public Map<ClickHouseOption, Serializable> getDefaultClientOptions() {
        Map<ClickHouseOption, Serializable> options = new HashMap<>();
        options.put(ClickHouseClientOption.PRODUCT_NAME, "clickhouse-kafka-connect/"+ClickHouseClientOption.class.getPackage().getImplementationVersion());
        if (proxyType != null && !proxyType.equals(ClickHouseProxyType.IGNORE)) {
            options.put(ClickHouseClientOption.PROXY_TYPE, proxyType);
            options.put(ClickHouseClientOption.PROXY_HOST, proxyHost);
            options.put(ClickHouseClientOption.PROXY_PORT, proxyPort);
        }
        return options;
    }

    private ClickHouseNode create() {
        String protocol = "http";
        if (this.sslEnabled)
            protocol += "s";

        String tmpJdbcConnectionProperties = jdbcConnectionProperties;
        if (tmpJdbcConnectionProperties != null && !tmpJdbcConnectionProperties.startsWith("?")) {
            tmpJdbcConnectionProperties = "?" + tmpJdbcConnectionProperties;
        }

        String url = String.format("%s://%s:%d/%s%s", 
                protocol, 
                hostname, 
                port, 
                database,
                tmpJdbcConnectionProperties
        );

        LOGGER.info("ClickHouse URL: {}", url);

        if (username != null && password != null) {
            LOGGER.debug(String.format("Adding username [%s]", username));
            Map<String, String> options = new HashMap<>();
            options.put("user", username);
            options.put("password", password);
            server = ClickHouseNode.of(url, options);
        } else {
            server = ClickHouseNode.of(url);
        }
        return server;
    }

    public boolean ping() {
        ClickHouseClient clientPing = ClickHouseClient.builder()
                .options(getDefaultClientOptions())
                .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
                .build();
        LOGGER.debug(String.format("Server [%s] , Timeout [%d]", server, timeout));
        int retryCount = 0;

        while (retryCount < retry) {
            if (clientPing.ping(server, timeout)) {
                LOGGER.info("Ping was successful.");
                clientPing.close();
                return true;
            }
            retryCount++;
            LOGGER.warn(String.format("Ping retry %d out of %d", retryCount, retry));
        }
        LOGGER.error("Unable to ping ClickHouse instance.");
        clientPing.close();
        return false;
    }

    public String version() {
        try (ClickHouseClient client = ClickHouseClient.builder()
                .options(getDefaultClientOptions())
                .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
                .build();
             ClickHouseResponse response = client.read(server)
                     .query("SELECT VERSION()")
                     .executeAndWait()) {
            return response.firstRecord().getValue(0).asString();
        } catch (ClickHouseException e) {
            LOGGER.error("Exception when trying to retrieve VERSION()", e);
            return null;
        }
    }

    public ClickHouseResponse query(String query) {
        return query(query, null);
    }

    public ClickHouseResponse query(String query, ClickHouseFormat clickHouseFormat) {
        int retryCount = 0;
        ClickHouseException ce = null;
        while (retryCount < retry) {
            try (ClickHouseClient client = ClickHouseClient.builder()
                    .options(getDefaultClientOptions())
                    .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
                    .build();
                 ClickHouseResponse response = client.read(server)
                         .format(clickHouseFormat)
                         .query(query)
                         .executeAndWait()) {
                return response;
            } catch (ClickHouseException e) {
                retryCount++;
                LOGGER.warn(String.format("Query retry %d out of %d", retryCount, retry), e);
                ce = e;
            }
        }
        throw new RuntimeException(ce);
    }

    public List<String> showTables(String database) {
        List<String> tablesNames = new ArrayList<>();
        try (ClickHouseClient client = ClickHouseClient.builder()
                .options(getDefaultClientOptions())
                .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
                .build();
             ClickHouseResponse response = client.read(server)
                     .query(String.format("SHOW TABLES FROM `%s`", database))
                     .executeAndWait()) {
            for (ClickHouseRecord r : response.records()) {
                ClickHouseValue v = r.getValue(0);
                String tableName = v.asString();
                tablesNames.add(tableName);
            }
        } catch (ClickHouseException e) {
            LOGGER.error("Failed in show tables", e);
        }
        return tablesNames;
    }

    public Table describeTable(String database, String tableName, ClickHouseClient client) {
        if (tableName.startsWith(".inner"))
            return null;
        String describeQuery = String.format("DESCRIBE TABLE `%s`.`%s`", database, tableName);
        LOGGER.debug(describeQuery);

        // Reuse the caller's client when one is passed in; otherwise build one here
        // and close it before returning so that standalone calls do not leak it.
        final boolean ownsClient = (client == null);
        if (ownsClient) {
            try {
                client = ClickHouseClient.builder()
                        .options(getDefaultClientOptions())
                        .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
                        .build();
            } catch (Exception e) {
                LOGGER.error("Failed to create client", e);
                return null;
            }
        }

        try (ClickHouseResponse response = client.read(server)
                     .set("describe_include_subcolumns", true)
                     .format(ClickHouseFormat.JSONEachRow)
                     .query(describeQuery)
                     .executeAndWait()) {

            Table table = new Table(database, tableName);
            for (ClickHouseRecord r : response.records()) {
                ClickHouseValue v = r.getValue(0);

                ClickHouseFieldDescriptor fieldDescriptor = ClickHouseFieldDescriptor.fromJsonRow(v.asString());
                if (fieldDescriptor.isAlias() || fieldDescriptor.isMaterialized()) {
                    LOGGER.debug("Skipping column {} as it is an alias or materialized view", fieldDescriptor.getName());
                    continue;
                }

                if (fieldDescriptor.hasDefault()) {
                    table.hasDefaults(true);
                }

                Column column = Column.extractColumn(fieldDescriptor);
                //If we run into a rare column we can't handle, just ignore the table and warn the user
                if (column == null) {
                    LOGGER.warn("Unable to handle column: {}", fieldDescriptor.getName());
                    return null;
                }
                table.addColumn(column);
            }
            return table;
        } catch (ClickHouseException | JsonProcessingException e) {
            LOGGER.error(String.format("Exception when running describeTable %s", describeQuery), e);
            return null;
        } finally {
            if (ownsClient) {
                client.close();
            }
        }
    }

    public Table describeTable(String database, String tableName) {
        return describeTable(database, tableName, null);
    }

    public List<Table> extractTablesMapping(String database, Map<String, Table> cache) {
        List<Table> tableList =  new ArrayList<>();
        try (ClickHouseClient client = ClickHouseClient.builder()
                .options(getDefaultClientOptions())
                .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
                .build()) {
            for (String tableName : showTables(database) ) {
                // (Full) Table names are escaped in the cache
                String escapedTableName = Utils.escapeTableName(database, tableName);

                // Read from cache if we already described this table before
                // This means we won't pick up edited table configs until the connector is restarted
                if (cache.containsKey(escapedTableName)) {
                    tableList.add(cache.get(escapedTableName));
                    continue;
                }

                // Describe the table in the same database that SHOW TABLES listed
                Table table = describeTable(database, tableName, client);
                if (table != null)
                    tableList.add(table);
            }
            return tableList;
        } catch (Exception e) {
            LOGGER.error("Failed to extract tables mapping", e);
            return null;
        }
    }

    public static class ClickHouseClientBuilder {
        private ClickHouseSinkConfig config = null;
        private String hostname = null;
        private int port = -1;
        private String username = "default";
        private String database = "default";
        private String password = "";
        private boolean sslEnabled = false;
        private String jdbcConnectionProperties = "";
        private int timeout = ClickHouseSinkConfig.timeoutSecondsDefault * ClickHouseSinkConfig.MILLI_IN_A_SEC;
        private int retry = ClickHouseSinkConfig.retryCountDefault;

        private ClickHouseProxyType proxyType = null;
        private String proxyHost = null;
        private int proxyPort = -1;

        public ClickHouseClientBuilder(String hostname, int port, ClickHouseProxyType proxyType, String proxyHost, int proxyPort) {
            this.hostname = hostname;
            this.port = port;
            this.proxyType = proxyType;
            this.proxyHost = proxyHost;
            this.proxyPort = proxyPort;
        }

        public ClickHouseClientBuilder setUsername(String username) {
            this.username = username;
            return this;
        }

        public ClickHouseClientBuilder setPassword(String password) {
            this.password = password;
            return this;
        }

        public ClickHouseClientBuilder setDatabase(String database) {
            this.database = database;
            return this;
        }

        public ClickHouseClientBuilder sslEnable(boolean sslEnabled) {
            this.sslEnabled = sslEnabled;
            return this;
        }

        public ClickHouseClientBuilder setJdbcConnectionProperties(String jdbcConnectionProperties) {
            this.jdbcConnectionProperties = jdbcConnectionProperties;
            return this;
        }

        public ClickHouseClientBuilder setTimeout(int timeout) {
            this.timeout = timeout;
            return this;
        }

        public ClickHouseClientBuilder setRetry(int retry) {
            this.retry = retry;
            return this;
        }

        public ClickHouseHelperClient build(){
            return new ClickHouseHelperClient(this);
        }

    }
}
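In this version of the helper, extractTablesMapping builds one ClickHouseClient per refresh and passes it to every describeTable call, while a describeTable call made without a client builds its own. The difference in how many clients get built can be shown without the ClickHouse library. `SharedClientDemo` and `FakeClient` below are hypothetical stand-ins that only count creations and closes; whether each real ClickHouseClient opens its own HTTP connection is an assumption this sketch does not test.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class SharedClientDemo {
    // Hypothetical stand-in for ClickHouseClient: it only counts creations and closes.
    static final class FakeClient implements AutoCloseable {
        static final AtomicInteger created = new AtomicInteger();
        static final AtomicInteger closed = new AtomicInteger();

        FakeClient() { created.incrementAndGet(); }

        String describe(String table) { return "columns of " + table; }

        @Override
        public void close() { closed.incrementAndGet(); }
    }

    // Mirrors describeTable(database, table, client): use the caller's client
    // if given, otherwise create one and close it before returning.
    static String describeTable(String table, FakeClient client) {
        boolean ownsClient = (client == null);
        FakeClient c = ownsClient ? new FakeClient() : client;
        try {
            return c.describe(table);
        } finally {
            if (ownsClient) {
                c.close();
            }
        }
    }

    // Mirrors extractTablesMapping: one client for the whole refresh.
    static int describeAll(List<String> tables) {
        int described = 0;
        try (FakeClient shared = new FakeClient()) {
            for (String t : tables) {
                describeTable(t, shared);
                described++;
            }
        }
        return described;
    }

    public static void main(String[] args) {
        List<String> tables = List.of("t1", "t2", "t3");
        describeAll(tables);
        System.out.println("shared: created=" + FakeClient.created.get());   // 1
        for (String t : tables) {
            describeTable(t, null);
        }
        System.out.println("per call: created=" + FakeClient.created.get()); // 4 (1 + 3)
        System.out.println("closed=" + FakeClient.closed.get());            // 4
    }
}
```

For the three tables in main, the shared path builds one client, while describing each table without a client builds three more; in both paths every client that is created also gets closed.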

@mzitnik