ClickHouse / clickhouse-java

ClickHouse Java Clients & JDBC Driver
https://clickhouse.com
Apache License 2.0

How can we insert an array of objects into ClickHouse using the Java client? #1576

Open ats1999 opened 7 months ago

ats1999 commented 7 months ago

I have the following table in ClickHouse:

42fccdad8610 :) describe user;

DESCRIBE TABLE user

Query id: 5839f61d-c905-47f8-ac7d-92e7f76b2584

┌─name─┬─type───┬─default_type─┬─default_expression─┬─comment─┬─codec_expression─┬─ttl_expression─┐
│ name │ String │              │                    │         │                  │                │
└──────┴────────┴──────────────┴────────────────────┴─────────┴──────────────────┴────────────────┘

1 row in set. Elapsed: 0.003 sec.

I am trying to insert data into that table from Java, but it's not working. Below is my insert code, which follows this documentation: https://clickhouse.com/docs/en/integrations/java#insert

package com.watchman.etl.dbsync;

import com.clickhouse.client.ClickHouseException;
import com.clickhouse.client.ClickHouseResponse;
import com.clickhouse.data.ClickHouseFormat;
import java.io.ByteArrayInputStream;

public class Test {
  public static void main(String[] args) throws ClickHouseException {
    String data = "{\"name\":\"Shani Kumar\"}";
    ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data.getBytes());

    ClickHouseResponse clickHouseResponse =
        Clk.getClickHouseClient()
            .read("http://localhost:8123/event")
            .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
            .write()
            .query("insert into user select name from input('name String')")
            .data(byteArrayInputStream)
            .executeAndWait();
    System.out.printf("Written %d rows\n", clickHouseResponse.getSummary().getWrittenRows());
  }
}

Here is the error details

SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" com.clickhouse.client.ClickHouseException: Code: 33. DB::Exception: Cannot read all data. Bytes read: 20. Bytes expected: 123.: (at row 1)
: While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA) (version 24.1.5.6 (official build))
, server ClickHouseNode [uri=http://localhost:8123/event]@-54754220
    at com.clickhouse.client.ClickHouseException.of(ClickHouseException.java:168)
    at com.clickhouse.client.AbstractClient.lambda$execute$0(AbstractClient.java:275)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.IOException: Code: 33. DB::Exception: Cannot read all data. Bytes read: 20. Bytes expected: 123.: (at row 1)
: While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA) (version 24.1.5.6 (official build))

    at com.clickhouse.client.http.ApacheHttpConnectionImpl.checkResponse(ApacheHttpConnectionImpl.java:209)
    at com.clickhouse.client.http.ApacheHttpConnectionImpl.post(ApacheHttpConnectionImpl.java:243)
    at com.clickhouse.client.http.ClickHouseHttpClient.send(ClickHouseHttpClient.java:118)
    at com.clickhouse.client.AbstractClient.sendAsync(AbstractClient.java:161)
    at com.clickhouse.client.AbstractClient.lambda$execute$0(AbstractClient.java:273)
    ... 4 more

Process finished with exit code 1
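A likely cause of this error: the payload is JSON text, but the request declares `ClickHouseFormat.RowBinaryWithNamesAndTypes`, so the server parses the JSON bytes as a binary stream. That format expects a header (column count, names, types) followed by rows, and RowBinary encodes a `String` as an unsigned LEB128 varint byte length followed by the UTF-8 bytes. The pure-JDK sketch below (no ClickHouse dependency; class and method names are hypothetical) decodes the first varint of the JSON payload and builds the bytes a plain `RowBinary` row for the one-column `name String` table would carry. The first byte `{` is 0x7B, i.e. 123, which is consistent with the "Bytes expected: 123" in the message, although the server's exact header handling is not reproduced here. Two likely fixes: declare a format that matches the payload (for JSON, something like `ClickHouseFormat.JSONEachRow`), or keep a binary format and write real binary data, as in the `BinaryStreamUtils` example in the next reply.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class RowBinaryDiagnosis {
    // Decodes an unsigned LEB128 varint starting at data[0], the way RowBinary
    // reads a length prefix (7 data bits per byte, high bit = "more bytes follow").
    static long readVarInt(byte[] data) {
        long result = 0;
        int shift = 0;
        for (byte b : data) {
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
            shift += 7;
        }
        throw new IllegalArgumentException("truncated varint");
    }

    // Writes an unsigned LEB128 varint.
    static void writeVarInt(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
    }

    // RowBinary encoding of one String value: varint byte length + UTF-8 bytes.
    static byte[] encodeString(String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarInt(out, utf8.length);
        out.write(utf8, 0, utf8.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] json = "{\"name\":\"Shani Kumar\"}".getBytes(StandardCharsets.UTF_8);
        // '{' is 0x7B = 123 with no continuation bit, so it decodes as the number 123.
        System.out.println("JSON payload: " + json.length + " bytes, first varint = " + readVarInt(json));

        // What a RowBinary row for ("Shani Kumar") looks like instead.
        byte[] row = encodeString("Shani Kumar");
        System.out.println("RowBinary row: " + row.length + " bytes, length prefix = " + row[0]);
    }
}
```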
olavgg commented 7 months ago

Here is a working example for the following table:

CREATE OR REPLACE TABLE events (
                       id                      UUID,
                       external_id             String CODEC(ZSTD(9)),
                       type                    String CODEC(ZSTD(9)),
                       sub_type                Nullable(String) CODEC(ZSTD(9)),
                       description             String CODEC(ZSTD(9)),
                       data_set_id             Int32 CODEC(ZSTD(9)),
                       source                  String CODEC(ZSTD(9)),
                       date_created            DateTime64(3) CODEC(DoubleDelta, ZSTD(9)),
                       last_updated            DateTime64(3) CODEC(DoubleDelta, ZSTD(9)),
                       start_time              DateTime64(3) CODEC(DoubleDelta, ZSTD(9)),
                       end_time                Nullable(DateTime64(3)) CODEC(DoubleDelta, ZSTD(9)),
                       INDEX event_type_idx (type) TYPE minmax GRANULARITY 4,
                       INDEX event_sub_type_idx (sub_type) TYPE minmax GRANULARITY 4,
                       INDEX event_start_time_idx (start_time) TYPE minmax GRANULARITY 4,
                       INDEX event_end_time_idx (end_time) TYPE minmax GRANULARITY 4
) ENGINE = MergeTree()
ORDER BY (id, data_set_id)
PRIMARY KEY id
PARTITION BY (YEAR(date_created), data_set_id);
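
// Fragment of a Spring service class: the fields (DATABASE_*, server, log), the
// TIMESTAMP_SCALE constant (presumably 3, to match DateTime64(3)) and the
// DateTimeHandler, EventCudMessage and EventModel types are defined elsewhere.
public ClickHouseService(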
            @Value("${clickhouse.database.name:intellistream}") String dbName,
            @Value("${clickhouse.database.host:192.168.1.1}") String dbHost,
            @Value("${clickhouse.database.username:default}") String dbUser,
            @Value("${clickhouse.database.password:mypassword}") String dbPw
    ){
        this.DATABASE_HOST = dbHost;
        this.DATABASE_NAME = dbName;
        this.DATABASE_USER = dbUser;
        this.DATABASE_PASSWORD = dbPw;
        this.server = ClickHouseNode.builder()
                .host(DATABASE_HOST)
                .port(ClickHouseProtocol.HTTP, 8123)
                .database(DATABASE_NAME)
                .credentials(ClickHouseCredentials.fromUserAndPassword(DATABASE_USER, DATABASE_PASSWORD))
                .build();
    }

    public ClickHouseClient getClient(){
        return ClickHouseClient.builder()
                .nodeSelector(ClickHouseNodeSelector.of(null, ClickHouseProtocol.HTTP))
                .option(ClickHouseClientOption.COMPRESS_ALGORITHM, ClickHouseCompression.ZSTD)
                .option(ClickHouseClientOption.COMPRESS_LEVEL, 1)
                .option(ClickHouseClientOption.SOCKET_TIMEOUT, 8000)
                .option(ClickHouseClientOption.MAX_THREADS_PER_CLIENT, 4)
                .build();
    }

    public void createEvents(EventCudMessage message) throws ClickHouseException {
        try (ClickHouseClient client = getClient()) {

            ClickHouseRequest.Mutation request = client.write(server).table("events")
                    .format(ClickHouseFormat.RowBinary);
            ClickHouseConfig config = request.getConfig();

            CompletableFuture<ClickHouseResponse> future = null;
            // back-pressuring is not supported, you can adjust the first two arguments
            try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
                    .createPipedOutputStream(config, (Runnable) null)) {
                // in async mode, which is default, execution happens in a worker thread
                future = request.data(stream.getInputStream()).execute();

                for(EventModel eventModel : message.getEvents()) {

                    // writing happens in main thread
                    var createdTime = ZonedDateTime.now();
                    var lastUpdatedTime = ZonedDateTime.now();
                    var eventStartTime = DateTimeHandler.fromEpochUTCTimeAsZonedDateTime(eventModel.getStartTime());
                    ZonedDateTime eventEndTime = null;
                    LocalDateTime eventEndDateTime = null;
                    if(eventModel.getEndTime() != null){
                        eventEndTime = DateTimeHandler.fromEpochUTCTimeAsZonedDateTime(eventModel.getEndTime());
                        eventEndDateTime = DateTimeHandler.toUTC(eventEndTime);
                    }
                    LocalDateTime createdDateTime = DateTimeHandler.toUTC(createdTime);
                    LocalDateTime lastUpdatedDateTime = DateTimeHandler.toUTC(lastUpdatedTime);
                    LocalDateTime eventStartDateTime = DateTimeHandler.toUTC(eventStartTime);

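                    // RowBinary has no header, so values must be written in exactly the
                    // table's column order, each encoded according to its column type
                    BinaryStreamUtils.writeUuid(stream, UUID.fromString(eventModel.getId()));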
                    BinaryStreamUtils.writeString(stream, eventModel.getExternalId());
                    BinaryStreamUtils.writeString(stream, eventModel.getType());
                    if(eventModel.getSubType() != null){
                        BinaryStreamUtils.writeNonNull(stream);
                        BinaryStreamUtils.writeString(stream, eventModel.getSubType());
                    } else {
                        BinaryStreamUtils.writeNull(stream);
                    }
                    BinaryStreamUtils.writeString(stream, eventModel.getDescription());
                    BinaryStreamUtils.writeInt32(stream, eventModel.getDataSetId());
                    BinaryStreamUtils.writeString(stream, eventModel.getSource());

                    BinaryStreamUtils.writeDateTime(stream, createdDateTime, TIMESTAMP_SCALE, TimeZone.getTimeZone(ZoneId.of("UTC")));
                    BinaryStreamUtils.writeDateTime(stream, lastUpdatedDateTime, TIMESTAMP_SCALE, TimeZone.getTimeZone(ZoneId.of("UTC")));
                    BinaryStreamUtils.writeDateTime(stream, eventStartDateTime, TIMESTAMP_SCALE, TimeZone.getTimeZone(ZoneId.of("UTC")));
                    if(eventEndDateTime != null){
                        BinaryStreamUtils.writeNonNull(stream);
                        BinaryStreamUtils.writeDateTime(stream, eventEndDateTime, TIMESTAMP_SCALE, TimeZone.getTimeZone(ZoneId.of("UTC")));
                    } else {
                        BinaryStreamUtils.writeNull(stream);
                    }
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }

            if(future != null){
                // response should be always closed
                try (ClickHouseResponse response = future.get()) {
                    ClickHouseResponseSummary summary = response.getSummary();
                    log.debug("Datapoints summary: {} rows inserted.", summary.getWrittenRows());
                } catch (ExecutionException e){
                    log.error(e.getMessage(), e);
                    throw ClickHouseException.of(e, server);
                } catch (InterruptedException e) {
                    log.error(e.getMessage(), e);
                    throw new RuntimeException(e);
                }
            }

        }

    }

This is blazingly fast and I could insert 100k records in 3 seconds.
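To see what the `BinaryStreamUtils` calls above put on the wire, here is a pure-JDK sketch (no ClickHouse dependency; the class and method names are hypothetical) that encodes three of the `events` column types in RowBinary: `Int32` as 4 little-endian bytes, `DateTime64(3)` as a little-endian Int64 count of milliseconds since the Unix epoch, and `Nullable(T)` as a marker byte (1 = NULL, 0 = a value follows) before the value. It writes only a partial row for illustration; a real row must contain every column in table order. It is meant to show the layout, not to replace `BinaryStreamUtils`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.time.Instant;

public class RowBinaryRowSketch {
    private final ByteArrayOutputStream out = new ByteArrayOutputStream();

    // Unsigned LEB128 varint, used by RowBinary for String lengths.
    void writeVarInt(long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
    }

    // String: varint byte length followed by the raw UTF-8 bytes.
    void writeString(String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeVarInt(utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // Nullable(String): marker byte 1 for NULL, or 0 followed by the value.
    void writeNullableString(String s) {
        if (s == null) {
            out.write(1);
        } else {
            out.write(0);
            writeString(s);
        }
    }

    // Int32: 4 bytes, little-endian.
    void writeInt32(int v) {
        out.write(ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(v).array(), 0, 4);
    }

    // DateTime64(3): Int64 milliseconds since 1970-01-01T00:00:00Z, little-endian.
    void writeDateTime64Millis(Instant t) {
        out.write(ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(t.toEpochMilli()).array(), 0, 8);
    }

    byte[] toByteArray() {
        return out.toByteArray();
    }

    public static void main(String[] args) {
        RowBinaryRowSketch row = new RowBinaryRowSketch();
        row.writeNullableString(null);                           // sub_type = NULL
        row.writeInt32(42);                                      // data_set_id
        row.writeDateTime64Millis(Instant.ofEpochMilli(1_000L)); // start_time
        System.out.println(row.toByteArray().length + " bytes"); // 1 + 4 + 8 = 13
    }
}
```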

ats1999 commented 6 months ago

Isn't there a simple API that lets developers insert data into a table, the way JDBC does?

Here is a simple JDBC example that is easy to understand, write, and debug. I understand JDBC can't be used directly with ClickHouse, but we could at least have a simple API that is just as easy to write and understand.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class Main {
    public static void main(String[] args) {
        // Database connection parameters
        String url = "jdbc:mysql://localhost:3306/mydatabase";
        String username = "username";
        String password = "password";

        // SQL query to insert data into a table
        String sql = "INSERT INTO mytable (column1, column2, column3) VALUES (?, ?, ?)";

        // Data to be inserted
        String value1 = "Value1";
        int value2 = 123;
        String value3 = "Value3";

        try (
            // Establish a connection to the database
            Connection conn = DriverManager.getConnection(url, username, password);
            // Create a PreparedStatement to execute the SQL query
            PreparedStatement pstmt = conn.prepareStatement(sql)
        ) {
            // Set values for PreparedStatement parameters
            pstmt.setString(1, value1);
            pstmt.setInt(2, value2);
            pstmt.setString(3, value3);

            // Execute the query to insert data
            int rowsAffected = pstmt.executeUpdate();

            // Check if any rows were affected
            if (rowsAffected > 0) {
                System.out.println("Data inserted successfully.");
            } else {
                System.out.println("No rows affected. Data may not have been inserted.");
            }
        } catch (SQLException e) {
            // Handle any SQL errors
            e.printStackTrace();
        }
    }
}
ats1999 commented 6 months ago

It looks like BinaryStreamUtils does not support the Array data type.

chernser commented 6 months ago

Good day, @ats1999! Unfortunately, BinaryStreamUtils works only with primitives. Simple arrays can still be written with it, though: write the array length as a varint, then each primitive value.

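// out: the stream passed to request.data(...); array: the String[] value of an Array(String) column
BinaryStreamUtils.writeVarInt(out, array.length);
for (String item : array) {
    BinaryStreamUtils.writeString(out, item);
}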
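Since the question is about an array of objects: in RowBinary an `Array(T)` is the element count as a varint followed by each element, and a `Tuple(...)` is simply its fields written one after another with no extra framing. So an "array of objects" column declared as, for example, `Array(Tuple(name String, age Int32))` (a hypothetical column, not from this thread) can be written with the same varint-then-elements pattern. The pure-JDK sketch below assumes that column shape; the `Person` class and method names are hypothetical. With clickhouse-java, the same byte sequence would presumably be produced by calling `BinaryStreamUtils.writeVarInt`, `writeString` and `writeInt32` on the request stream in the same order.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class RowBinaryArraySketch {
    // Hypothetical element type for an Array(Tuple(name String, age Int32)) column.
    static final class Person {
        final String name;
        final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    // Unsigned LEB128 varint, used for array sizes and string lengths.
    static void writeVarInt(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
    }

    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeVarInt(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    static void writeInt32(ByteArrayOutputStream out, int v) {
        out.write(ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(v).array(), 0, 4);
    }

    // Array(String): element count as a varint, then each string.
    static void writeStringArray(ByteArrayOutputStream out, List<String> items) {
        writeVarInt(out, items.size());
        for (String item : items) {
            writeString(out, item);
        }
    }

    // Array(Tuple(String, Int32)): element count, then each tuple's fields in order.
    static void writePersonArray(ByteArrayOutputStream out, List<Person> people) {
        writeVarInt(out, people.size());
        for (Person p : people) {
            writeString(out, p.name);
            writeInt32(out, p.age);
        }
    }

    public static void main(String[] args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeStringArray(out, List.of("a", "bc"));
        writePersonArray(out, List.of(new Person("Shani Kumar", 30)));
        System.out.println(out.size() + " bytes");
    }
}
```

The same pattern nests: an `Array(Array(String))` would be an outer count followed by one `writeStringArray` call per inner array.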

By the way, the JDBC driver can be used directly with ClickHouse. Or have you run into some issues with it?