ClickHouse / clickhouse-kafka-connect

ClickHouse Kafka Connector
Apache License 2.0

Decimal field throws an error #448

Open justas200 opened 2 weeks ago

justas200 commented 2 weeks ago

Describe the bug

I have a Kafka topic test-decimal where I place messages in JSON format with a decimal field:

{
    "RandomNumber": 100.0
}

The Kafka schema for this topic is:

{
    "$schema": "http://json-schema.org/draft-04/schema#",
    "title": "Test",
    "type": "object",
    "additionalProperties": false,
    "properties": {
        "RandomNumber": {
            "type": "number",
            "format": "decimal"
        }
    }
}

I then created a table for this in ClickHouse:

CREATE TABLE test.test
(
    `RandomNumber` Decimal(28,18)
)
ENGINE = MergeTree
    PRIMARY KEY ()
    ORDER BY ();
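For reference, a Decimal(28,18) column should accept such a value when inserted directly (a sketch against the table above; test.test is assumed to exist as declared):

```sql
-- Direct insert, bypassing Kafka Connect, to confirm the column type itself
-- handles the value; 100.0 is stored as a fixed-point Decimal(28,18).
INSERT INTO test.test VALUES (100.0);
SELECT RandomNumber FROM test.test;
```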

and a connector:

{
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    "topic2TableMap": "test-decimal=test",
    "topics": "test-decimal",
    "tasks.max": "1",
    "keeperOnCluster": "main",
    "consumer.override.fetch.min.bytes": "50000000",
    "consumer.override.fetch.max.bytes": "200000000",
    "ssl": "false",
    "consumer.override.fetch.max.wait.ms": "25000",
    "database": "test",
    "hostname": "<hidden>",
    "password": "******",
    "consumer.override.auto.offset.reset": "earliest",
    "port": "8123",
    "consumer.override.max.poll.records": "100000",
    "value.converter.schemas.enable": "true",
    "name": "test-decimal",
    "consumer.override.max.partition.fetch.bytes": "200000000",
    "errors.tolerance": "none",
    "exactlyOnce": "false",
    "username": "general"
}

but the connector keeps throwing this error:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.RuntimeException: Number of records: 3
    at com.clickhouse.kafka.connect.util.Utils.handleException(Utils.java:126)
    at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:71)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
    ... 11 more
Caused by: java.lang.RuntimeException: Topic: [test-decimal], Partition: [0], MinOffset: [0], MaxOffset: [2], (QueryId: [4d408091-d810-4675-8c82-c96b1d652378])
    at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:68)
    at com.clickhouse.kafka.connect.sink.processing.Processing.doLogic(Processing.java:133)
    at com.clickhouse.kafka.connect.sink.ProxySinkTask.put(ProxySinkTask.java:99)
    at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:65)
    ... 12 more
Caused by: java.lang.NullPointerException: Cannot invoke "String.equals(Object)" because the return value of "org.apache.kafka.connect.data.Schema.name()" is null
    at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.validateDataSchema(ClickHouseWriter.java:247)
    at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinaryV1(ClickHouseWriter.java:742)
    at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinary(ClickHouseWriter.java:676)
    at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsert(ClickHouseWriter.java:196)
    at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:66)
    ... 15 more

The failure appears to come from this line: https://github.com/ClickHouse/clickhouse-kafka-connect/blob/main/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java#L247, where objSchema.name() is null.
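As I understand it, a plain JSON "number" is converted to a FLOAT64 Connect schema whose name() is null, while the check at that line seems to assume a named logical type such as org.apache.kafka.connect.data.Decimal. A minimal, self-contained sketch of the failure pattern and a null-safe variant (Schema is modeled here as a plain String name, a hypothetical stand-in for the Kafka Connect class, not the connector's actual code):

```java
public class SchemaNameCheck {
    static final String DECIMAL_LOGICAL_NAME = "org.apache.kafka.connect.data.Decimal";

    // Mirrors the failing pattern: throws NullPointerException when the
    // schema name is null, as in the stack trace above.
    static boolean isDecimalUnsafe(String schemaName) {
        return schemaName.equals(DECIMAL_LOGICAL_NAME);
    }

    // Null-safe comparison: a plain FLOAT64 schema (name == null) is
    // simply "not a Decimal" instead of an NPE.
    static boolean isDecimalSafe(String schemaName) {
        return DECIMAL_LOGICAL_NAME.equals(schemaName);
    }

    public static void main(String[] args) {
        System.out.println(isDecimalSafe(DECIMAL_LOGICAL_NAME)); // true
        System.out.println(isDecimalSafe(null));                 // false, no NPE
        try {
            isDecimalUnsafe(null);
        } catch (NullPointerException e) {
            System.out.println("NPE thrown"); // what the connector hits today
        }
    }
}
```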

Expected behaviour

I expect to see the decimal value arrive in ClickHouse.


Paultagoras commented 2 weeks ago

What is your value.converter and do you have a schema registry configured?

justas200 commented 2 weeks ago

I have a schema registry configured, and the schema for this topic looks like this. I am writing the decimal type from a C# application.

{
    "$schema": "http://json-schema.org/draft-04/schema#",
    "title": "Test",
    "type": "object",
    "additionalProperties": false,
    "properties": {
        "RandomNumber": {
            "type": "number",
            "format": "decimal"
        }
    }
}

value.converter: io.confluent.connect.json.JsonSchemaConverter
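If the NPE is only triggered on the Decimal column-validation path, one possible temporary workaround (untested, and it sacrifices fixed-point precision) might be to store the field as Float64 on the ClickHouse side, so the sink never needs a Decimal logical schema:

```sql
-- Hypothetical workaround table: Float64 instead of Decimal(28,18),
-- avoiding the Decimal schema-name check at the cost of precision.
CREATE TABLE test.test_float
(
    `RandomNumber` Float64
)
ENGINE = MergeTree
ORDER BY tuple();
```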