ClickHouse / clickhouse-java

ClickHouse Java Clients & JDBC Driver
https://clickhouse.com
Apache License 2.0

How to create batch insert with columns list with clickhouse-http-client? #1446

Open · serhii-m-prime opened this issue 1 year ago

serhii-m-prime commented 1 year ago

Hello, the question is how to implement a batch insert into a table using a column list. I have this table:

CREATE TABLE test.test
(
    a String,
    b String DEFAULT 'unknown',
    c String DEFAULT 'unknown',
    d String DEFAULT 'unknown'
)
ENGINE = MergeTree
ORDER BY a
SETTINGS index_granularity = 8192;

and incoming data in a format like:

[
  [
    {
      "a": "UUID as text",
      "c": "some text"
    },
    {
      "a": "UUID as text",
      "c": "some text"
    },
    {
      "a": "UUID as text",
      "c": "some text"
    }
  ],
  [
    {
      "a": "UUID as text",
      "b": "some text"
    },
    {
      "a": "UUID as text",
      "b": "some text"
    },
    {
      "a": "UUID as text",
      "b": "some text"
    }
  ]
]

(some columns, other than those in the ordering key, might be absent from the objects). Currently, when I try to use it as I understand it is described in the documentation:

RowBinaryWithNames
Similar to [RowBinary](https://clickhouse.com/docs/en/interfaces/formats#rowbinary), but with added header:
- [LEB128](https://en.wikipedia.org/wiki/LEB128)-encoded number of columns (N)
- N Strings specifying column names
public void insert(ClickHouseNode server, List<Object> data) throws ClickHouseException {
        try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP)) {
        ClickHouseRequest.Mutation request = client.read(server).write()
                .table("test")
                .format(ClickHouseFormat.RowBinaryWithNames)
                .set("send_progress_in_http_headers", 1);
            ClickHouseConfig config = request.getConfig();
            CompletableFuture<ClickHouseResponse> future;
            try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(config, (Runnable) null)) {
                future = request.data(stream.getInputStream()).execute();
                BinaryStreamUtils.writeInt128(stream, BigInteger.valueOf(3));
                BinaryStreamUtils.writeString(stream, "a");
                BinaryStreamUtils.writeString(stream, "b");
                BinaryStreamUtils.writeString(stream, "c");
                for (int i = 0; i < 10_000; i++) {
                    BinaryStreamUtils.writeString(stream, UUID.randomUUID().toString());
                    BinaryStreamUtils.writeString(stream, String.valueOf(i));
                    BinaryStreamUtils.writeString(stream, String.valueOf(i%4));
                }
            }
            try (ClickHouseResponse response = future.get()) {
                ClickHouseResponseSummary summary = response.getSummary();
                log.debug("Inserted: {}", summary.getWrittenRows());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw ClickHouseException.forCancellation(e, server);
        } catch (ExecutionException | IOException e) {
            throw ClickHouseException.of(e, server);
        }
    }

it returns errors like:

data-sync  |    Caused by: java.io.IOException: Code: 626. DB::Exception: Cannot skip unknown field in RowBinaryWithNames format, because it's type is unknown: While executing BinaryRowInputFormat. (CANNOT_SKIP_UNKNOWN_FIELD) (version 23.1.2.9 (official build))
data-sync  | 
data-sync  |    at com.clickhouse.client.http.HttpUrlConnectionImpl.checkResponse(HttpUrlConnectionImpl.java:184) ~[clickhouse-http-client-0.4.6.jar:clickhouse-http-client 0.4.6 (revision: dd91e17)]
data-sync  |    at com.clickhouse.client.http.HttpUrlConnectionImpl.post(HttpUrlConnectionImpl.java:227) ~[clickhouse-http-client-0.4.6.jar:clickhouse-http-client 0.4.6 (revision: dd91e17)]
data-sync  |    at com.clickhouse.client.http.ClickHouseHttpClient.send(ClickHouseHttpClient.java:124) ~[clickhouse-http-client-0.4.6.jar:clickhouse-http-client 0.4.6 (revision: dd91e17)]
data-sync  |    at com.clickhouse.client.AbstractClient.sendAsync(AbstractClient.java:161) ~[clickhouse-client-0.4.6.jar:clickhouse-client 0.4.6 (revision: dd91e17)]
data-sync  |    at com.clickhouse.client.AbstractClient.lambda$execute$0(AbstractClient.java:273) ~[clickhouse-client-0.4.6.jar:clickhouse-client 0.4.6 (revision: dd91e17)]
data-sync  |    ... 4 common frames omitted

or

data-sync  | Caused by: java.io.IOException: Code: 33. DB::Exception: Cannot read all data. Bytes read: 89. Bytes expected: 101.: (at row 2669)
data-sync  | : While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA) (version 23.1.2.9 (official build))
data-sync  | 
data-sync  |    at com.clickhouse.client.http.HttpUrlConnectionImpl.checkResponse(HttpUrlConnectionImpl.java:184) ~[clickhouse-http-client-0.4.6.jar:clickhouse-http-client 0.4.6 (revision: dd91e17)]
data-sync  |    at com.clickhouse.client.http.HttpUrlConnectionImpl.post(HttpUrlConnectionImpl.java:227) ~[clickhouse-http-client-0.4.6.jar:clickhouse-http-client 0.4.6 (revision: dd91e17)]
data-sync  |    at com.clickhouse.client.http.ClickHouseHttpClient.send(ClickHouseHttpClient.java:124) ~[clickhouse-http-client-0.4.6.jar:clickhouse-http-client 0.4.6 (revision: dd91e17)]
data-sync  |    at com.clickhouse.client.AbstractClient.sendAsync(AbstractClient.java:161) ~[clickhouse-client-0.4.6.jar:clickhouse-client 0.4.6 (revision: dd91e17)]
data-sync  |    at com.clickhouse.client.AbstractClient.lambda$execute$0(AbstractClient.java:273) ~[clickhouse-client-0.4.6.jar:clickhouse-client 0.4.6 (revision: dd91e17)]
data-sync  |    ... 4 common frames omitted

Is it possible, and how can it be implemented with, for example, ClickHouseFormat.RowBinaryWithNamesAndTypes or ClickHouseFormat.RowBinaryWithNames? Or, if not, how can I use a classic SQL insert for a batch insert with clickhouse-http-client 0.4.6? (I really couldn't find it in the documentation :'( )

serhii-m-prime commented 1 year ago

With clickhouse-jdbc it can be implemented as:

public void insertJDBC() {

        String sql = String.format(
                "INSERT INTO %s (a, b, c) SELECT a, b, c FROM input('a String, b String, c String')",
                "test"
        );
        try (PreparedStatement ps = this.connection.prepareStatement(sql)) {
            for (int i = 0; i < 10_000; i++) {
                ps.setString(1, UUID.randomUUID().toString());
                ps.setString(2, "b" + i);
                ps.setString(3, "c" + i);
                ps.addBatch();
            }
            int count = 0;
            for (int i : ps.executeBatch()) {
                if (i > 0) {
                    count += i;
                }
            }
            log.debug("Inserted: " + count);
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }

    }

But I don't see a way to use types like UInt128 with this solution.

zhicwu commented 1 year ago

Hi @SergImmortal, sorry for the terrible documentation and cumbersome API.

But don't see a way to use types like UInt128 with JDBC

Please refer to type mappings at https://github.com/ClickHouse/clickhouse-java/tree/main/clickhouse-data#type-mapping
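
For example, UInt128 maps to java.math.BigInteger there, so with the input() trick from your JDBC snippet a UInt128 column can be bound through the standard PreparedStatement API. A rough, untested sketch (the table test.numbers and its columns are made up for illustration, and it assumes the driver accepts BigInteger via setObject):

String sql = "INSERT INTO test.numbers (id, big_val) SELECT id, big_val FROM input('id String, big_val UInt128')";
try (PreparedStatement ps = connection.prepareStatement(sql)) {
    for (int i = 0; i < 1_000; i++) {
        ps.setString(1, UUID.randomUUID().toString());
        // UInt128 <-> java.math.BigInteger per the type-mapping table
        ps.setObject(2, BigInteger.valueOf(i));
        ps.addBatch();
    }
    ps.executeBatch();
}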

Is it possible and how to implement it with for example: ClickHouseFormat.RowBinaryWithNamesAndTypes or ClickHouseFormat.RowBinaryWithNames???

Once you're clear on the type mapping, you may follow examples at https://github.com/ClickHouse/clickhouse-java/issues/1425#issuecomment-1691394343

serhii-m-prime commented 1 year ago

Hi @zhicwu, thank you for the answer.

I already tried using data serialization like this:

ClickHouseRequest.Mutation request = client.read(server).write()
        .table("test")
        .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
        .set("send_progress_in_http_headers", 1);
ClickHouseConfig config = request.getConfig();
CompletableFuture<ClickHouseResponse> future;

List<ClickHouseColumn> columns = ClickHouseColumn.parse("a String, b Nullable(String), c String, d String");

try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
        .createPipedOutputStream(config, (Runnable) null)) {
    future = request.data(stream.getInputStream()).execute();

    ClickHouseDataProcessor processor = ClickHouseDataStreamFactory.getInstance()
            .getProcessor(config, null, stream, null, columns);
    ClickHouseValue[] values = new ClickHouseValue[] {
            ClickHouseStringValue.ofNull(), // columns.get(0).newValue()
            ClickHouseStringValue.ofNull(), // columns.get(1).newValue()
            ClickHouseStringValue.ofNull(), // columns.get(2).newValue()
            ClickHouseStringValue.ofNull()  // columns.get(3).newValue()
    };
    ClickHouseSerializer[] serializers = processor.getSerializers(config, columns);
    // writing happens in main thread
    for (int i = 0; i < 10_000; i++) {
        serializers[0].serialize(values[0].update(String.valueOf(i % 16)), stream);
        serializers[1].serialize(values[1].update(UUID.randomUUID().toString()), stream);
        serializers[2].serialize(values[2].update(String.format("I'm %s", i)), stream);
        serializers[3].serialize(values[3].update(String.format("%s", i)), stream);
    }
}
// response should be always closed
try (ClickHouseResponse response = future.get()) {
    ClickHouseResponseSummary summary = response.getSummary();
    log.debug("Inserted: {}", summary.getWrittenRows());
}

But for me, it works only when the number of columns in the insert equals the number of columns that exist in the table.

About type mapping: when I use the JDBC driver, I can't set some of the ClickHouse wrapper classes, as java.sql.PreparedStatement can't handle them.

mtrienis commented 1 year ago

Anyone figure out a workaround for this issue? @zhicwu

But for me, it works only when the number of columns in the insert equals the number of columns that exist in the table.

I need to use an async interface, so not sure if the JDBC interface will work in that respect.

FWIW, if you use ClickHouseFormat.RowBinaryWithNamesAndTypes with BinaryStreamUtils, you can insert a subset of column values into a table, but it's pretty painful to debug if something goes wrong, especially with more complex types.
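
As a rough illustration of that approach, here is an untested sketch against the table from the original question that inserts only columns a and c; the header is the LEB128-encoded column count, then the names, then the types, and each row is just the values concatenated in the same order (b and d should get their DEFAULT values on the server side):

public void insertSubset(ClickHouseNode server) throws ClickHouseException {
    try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP)) {
        ClickHouseRequest.Mutation request = client.read(server).write()
                .table("test")
                .format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
        CompletableFuture<ClickHouseResponse> future;
        try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
                .createPipedOutputStream(request.getConfig(), (Runnable) null)) {
            future = request.data(stream.getInputStream()).execute();
            // header: LEB128-encoded column count, then column names, then column types
            BinaryStreamUtils.writeVarInt(stream, 2);
            BinaryStreamUtils.writeString(stream, "a");
            BinaryStreamUtils.writeString(stream, "c");
            BinaryStreamUtils.writeString(stream, "String");
            BinaryStreamUtils.writeString(stream, "String");
            // rows: values only, in the same column order as the header
            for (int i = 0; i < 10_000; i++) {
                BinaryStreamUtils.writeString(stream, UUID.randomUUID().toString());
                BinaryStreamUtils.writeString(stream, "c" + i);
            }
        }
        try (ClickHouseResponse response = future.get()) {
            log.debug("Inserted: {}", response.getSummary().getWrittenRows());
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw ClickHouseException.forCancellation(e, server);
    } catch (ExecutionException | IOException e) {
        throw ClickHouseException.of(e, server);
    }
}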

tsvico commented 9 months ago

You need to write the header before writing the values when using RowBinaryWithNamesAndTypes (https://clickhouse.com/docs/en/interfaces/formats#rowbinarywithnamesandtypes). Here is part of a Kotlin ORM I tried:

          val columns: List<ClickHouseColumn> = properties.map { property ->
                property.isAccessible = true
                val columnName = getColumnName(property)
                val type = clickHouseCode2dbType(getColumnType(property), property)
                ClickHouseColumn.of(columnName, type)
            }

            val processor = ClickHouseDataStreamFactory.getInstance().getProcessor(
                config, null,
                stream, null, columns
            )

            val serializers = processor.getSerializers(config, columns)

            val values: List<ClickHouseValue> = columns.map { column ->
                column.newValue(config)
            }

            // RowBinaryWithNamesAndTypes https://clickhouse.com/docs/en/interfaces/formats#rowbinarywithnamesandtypes
            BinaryStreamUtils.writeUnsignedInt8(s, properties.size)
            // Look at this.
            properties.forEach { property ->
                property.isAccessible = true
                val columnName = getColumnName(property)
                BinaryStreamUtils.writeString(s, columnName)
            }
            // Look at this.
            properties.forEach { property ->
                val type = clickHouseCode2dbType(getColumnType(property), property)
                BinaryStreamUtils.writeString(s, type)
            }

            data.forEach { entity ->
                properties.forEachIndexed { index, property ->
                    when (getColumnType(property)) {
                        ClickHouseTypes.String -> {
                            serializers[index].serialize(
                                (values[index] as ClickHouseStringValue).update(property.get(entity)), s
                            )
                        }
...

manfrid commented 2 months ago

> You need to write the header before writing the values when using RowBinaryWithNamesAndTypes (https://clickhouse.com/docs/en/interfaces/formats#rowbinarywithnamesandtypes). Here is part of a Kotlin ORM I tried: [Kotlin snippet quoted from the previous comment]

I would like to see the implementation code of this method.

ukarim commented 1 month ago

In the first snippet, BinaryStreamUtils.writeInt128(stream, BigInteger.valueOf(3)); should probably have used writeVarInt, because the number of columns in the header must be LEB128-encoded.
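
For the header in that first snippet, the corrected version would look something like this (untested sketch, same three String columns):

// LEB128-encoded column count instead of a 16-byte Int128
BinaryStreamUtils.writeVarInt(stream, 3);
BinaryStreamUtils.writeString(stream, "a");
BinaryStreamUtils.writeString(stream, "b");
BinaryStreamUtils.writeString(stream, "c");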