ClickHouse / clickhouse-java

ClickHouse Java Clients & JDBC Driver
https://clickhouse.com
Apache License 2.0

com.clickhouse.client.ClickHouseException: Read timed out after 30000 ms, server ClickHouseNode [uri=http://xxxxx:xxx/default] #1352

Closed: joeCarf closed this issue 1 year ago.

joeCarf commented 1 year ago


Hi guys, I'm currently working on ClickHouseSinkConnector for RocketMQ-Connect, which is similar to Kafka Connect. I want to pipe data into ClickHouse using the ClickHouse Java client, but I get a read timeout when writing.

My code is below. I'm using the ClickHouseFormat.JSONEachRow format.

My question is: what should the JSON string look like when calling BinaryStreamUtils.writeBytes(stream, gsonString.getBytes(StandardCharsets.UTF_8))?

My JSON looks like {"column1":"data","column2":"data"} (for a table with two columns). Is that right?

ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol());

for (ConnectRecord record : sinkRecords) {

    String table = record.getSchema().getName();

    ClickHouseRequest.Mutation request = client.connect(server)
        .write()
        .table(table)
        .format(ClickHouseFormat.JSONEachRow);

    ClickHouseConfig config = request.getConfig();
    request.option(ClickHouseClientOption.WRITE_BUFFER_SIZE, 8192);

    try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
        .createPipedOutputStream(config, (Runnable) null)) {
        CompletableFuture<ClickHouseResponse> future = request.data(stream.getInputStream()).execute();

        final List<Field> fields = record.getSchema().getFields();
        final Struct structData = (Struct) record.getData();
        Gson gson = new Gson();
        Map<String, Object> data = new HashMap<>();
        java.lang.reflect.Type gsonType = new TypeToken<HashMap>(){}.getType();

        for (Field field : fields) {
            data.put(field.getName(), structData.get(field));
        }

        String gsonString = gson.toJson(data, gsonType);

        BinaryStreamUtils.writeBytes(stream, gsonString.getBytes(StandardCharsets.UTF_8));
        try (ClickHouseResponse response = future.get()) {
            ClickHouseResponseSummary summary = response.getSummary();

        }
    }

}
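A likely cause of the timeout, not confirmed in this thread: future.get() is called inside the try block, while the piped stream is still open. The server keeps reading the request body until end-of-stream, so the insert never completes and the client gives up after 30000 ms. Closing the stream before waiting on the future should avoid this. The sketch below reproduces that ordering with plain java.io pipes standing in for ClickHousePipedOutputStream (an assumption: the real stream feeds an HTTP request body); consumeUntilEof and send are hypothetical helpers, not clickhouse-java API.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

class PipeCloseDemo {
    // Hypothetical stand-in for the server side: like an INSERT request,
    // it only finishes once it has read the body up to end-of-stream.
    static CompletableFuture<Integer> consumeUntilEof(InputStream in) {
        return CompletableFuture.supplyAsync(() -> {
            int total = 0;
            byte[] buf = new byte[1024];
            try (InputStream input = in) {
                int n;
                while ((n = input.read(buf)) != -1) {
                    total += n;
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return total;
        });
    }

    // Writes the body, closes the pipe, and only then waits for the result.
    static int send(String body) throws Exception {
        PipedInputStream in = new PipedInputStream(8192);
        CompletableFuture<Integer> future;
        try (PipedOutputStream out = new PipedOutputStream(in)) {
            future = consumeUntilEof(in);
            out.write(body.getBytes(StandardCharsets.UTF_8));
            // Calling future.get() here, before 'out' is closed, would block
            // because the reader never sees end-of-stream.
        } // closing 'out' signals end-of-stream to the reader
        return future.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(send("{\"column1\":\"data\",\"column2\":\"data\"}\n"));
    }
}
```

Applied to the code above, this would mean declaring future before the try block, writing inside it, and calling future.get() only after the try block has closed stream.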
joeCarf commented 1 year ago

BTW, I'm sure my ClickHouse server works fine: I can run queries to read data, but I cannot insert.

zhicwu commented 1 year ago

Hi @joeCarf, JSONEachRow is a text-based data format, which may not be ideal when performance is critical. BinaryStreamUtils, on the other hand, is a helper class for a completely different data format, RowBinary. Perhaps you could start with the JDBC API and ClickHouseWriter, as shown below? You just need to change the format clause and use output.writeBytes("<json>".getBytes(<charset>)).writeByte('\n') to write each row.

https://github.com/ClickHouse/clickhouse-java/blob/46914be6ef13c574a48b1691ca805692bb72f2cc/examples/jdbc/src/main/java/com/clickhouse/examples/jdbc/Basic.java#L116-L140
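A minimal sketch of the row layout described above, assuming plain JDK classes instead of Gson: JSONEachRow takes one JSON object per row, with keys matching the column names and each row terminated by '\n'. So {"column1":"data","column2":"data"} is a valid row for a two-column table. JsonEachRow, encode and quote are hypothetical helpers, not part of clickhouse-java; the returned bytes are what you would pass to output.writeBytes(...).

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class JsonEachRow {
    // Escapes a string as a JSON string literal.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 2).append('"');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    // One JSON object per row, each row terminated by '\n'.
    // Assumes numeric values are finite (NaN/Infinity are not valid JSON).
    static byte[] encode(List<Map<String, Object>> rows) {
        StringBuilder sb = new StringBuilder();
        for (Map<String, Object> row : rows) {
            sb.append('{');
            boolean first = true;
            for (Map.Entry<String, Object> e : row.entrySet()) {
                if (!first) sb.append(',');
                first = false;
                sb.append(quote(e.getKey())).append(':');
                Object v = e.getValue();
                if (v == null) sb.append("null");
                else if (v instanceof Number || v instanceof Boolean) sb.append(v);
                else sb.append(quote(v.toString()));
            }
            sb.append("}\n");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("column1", "data");
        row.put("column2", "data");
        System.out.print(new String(encode(List.of(row, row)), StandardCharsets.UTF_8));
    }
}
```

A LinkedHashMap keeps the keys in column order, which makes the output easier to read; ClickHouse matches JSONEachRow fields by name, so the order itself should not matter.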

joeCarf commented 1 year ago

Thanks for replying. What should I do if I want to insert using only the Java client? I already use the Java client for queries, so I'd rather not import the JDBC driver. Other ClickHouse formats are also acceptable, but I couldn't find examples for them. I don't want to use SQL for inserting because it isn't flexible enough for my scenario.

zhicwu commented 1 year ago

I see. If you prefer the Java client, you can drop the piped stream and use ClickHouseWriter instead, for instance:

try (ClickHouseResponse resp = request.write().format(ClickHouseFormat.RowBinary).data(output -> {
    // declare columns to write
    List<ClickHouseColumn> columns = ClickHouseColumn.parse("a Int32, b Nullable(String), ...");
    // reusable value container for each column
    ClickHouseValue[] values = ClickHouseValues.newValues(config, columns);
    // get one serializer per column
    ClickHouseDataProcessor processor = ClickHouseDataStreamFactory.getInstance().getProcessor(config, null, output, null, columns);
    ClickHouseSerializer[] serializers = processor.getSerializers();

    // write one row; repeat for each row, with <value> being the i-th column's value
    for (int i = 0, len = values.length; i < len; i++) {
        serializers[i].serialize(values[i].update(<value>), output);
    }
}).executeAndWait()) {
    ...
}

I'll raise a PR later to update examples.
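The snippet above writes a row by pairing each column with a serializer. To make the emitted bytes concrete, here is a stdlib-only sketch of RowBinary encoding for the example columns a Int32 and b Nullable(String), looping over several rows. ColumnWriter, INT32, STRING, nullable and encode are hypothetical names, not the clickhouse-java API; the byte layout (little-endian Int32, String as an unsigned LEB128 length followed by the bytes, Nullable as a one-byte NULL marker before the value) follows the RowBinary format described in the ClickHouse documentation.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

class RowBinarySketch {
    // Hypothetical per-column writer, playing the role that
    // ClickHouseSerializer plays in the snippet above.
    interface ColumnWriter {
        void write(Object value, OutputStream out) throws IOException;
    }

    // Int32: 4 bytes, little-endian (OutputStream.write keeps the low 8 bits).
    static final ColumnWriter INT32 = (v, out) -> {
        int x = (Integer) v;
        out.write(x);
        out.write(x >>> 8);
        out.write(x >>> 16);
        out.write(x >>> 24);
    };

    // String: unsigned LEB128 length prefix, then the UTF-8 bytes.
    static final ColumnWriter STRING = (v, out) -> {
        byte[] b = ((String) v).getBytes(StandardCharsets.UTF_8);
        int len = b.length;
        while ((len & ~0x7F) != 0) {
            out.write((len & 0x7F) | 0x80);
            len >>>= 7;
        }
        out.write(len);
        out.write(b);
    };

    // Nullable(T): marker byte 1 means NULL (no value follows), 0 means a value follows.
    static ColumnWriter nullable(ColumnWriter inner) {
        return (v, out) -> {
            if (v == null) {
                out.write(1);
            } else {
                out.write(0);
                inner.write(v, out);
            }
        };
    }

    // Writes every row by applying each column's writer in column order.
    static byte[] encode(List<Object[]> rows, ColumnWriter[] writers) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (Object[] row : rows) {
            for (int i = 0; i < writers.length; i++) {
                writers[i].write(row[i], out);
            }
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // Columns: a Int32, b Nullable(String)
        ColumnWriter[] writers = {INT32, nullable(STRING)};
        byte[] bytes = encode(List.<Object[]>of(new Object[] {1, "ab"}, new Object[] {2, null}), writers);
        System.out.println(bytes.length);
    }
}
```

With the real client, the same outer loop over rows would sit inside the data(output -> ...) callback, calling serializers[i].serialize(values[i].update(...), output) in place of the hypothetical writers.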

joeCarf commented 1 year ago

Thanks a lot! 👍 I'll try it.