ClickHouse / clickhouse-kafka-connect

ClickHouse Kafka Connector
Apache License 2.0

Empty-string UUID value throws an exception #479

Open DmitryTuryshev opened 1 week ago

DmitryTuryshev commented 1 week ago

https://github.com/ClickHouse/clickhouse-kafka-connect/blob/fef6bb714d2cca61bead75f16dd9c09ea215b9c5/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java#L611

Example Debezium record in which trigger_id_uuid is an empty string rather than null:

{"trigger_id_uuid":"", "more_":"...", "id":"123",
"__deleted":{"string":"true"},
"__ts_ms":{"long":111111111},
"__op":{"string":"d"}}

Error trace:

WorkerSinkTask{id=clickhouse_sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Number of records: 1000 (org.apache.kafka.connect.runtime.WorkerSinkTask:633)
log_1: java.lang.RuntimeException: Number of records: 1000
log_1:         at com.clickhouse.kafka.connect.util.Utils.handleException(Utils.java:126)
log_1:         at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:71)
log_1:         at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:605)
log_1:         at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:344)
log_1:         at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:246)
log_1:         at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:215)
log_1:         at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:225)
log_1:         at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:280)
log_1:         at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
log_1:         at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
log_1:         at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
log_1:         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
log_1:         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
log_1:         at java.base/java.lang.Thread.run(Thread.java:840)
log_1: Caused by: java.lang.RuntimeException: Topic: [zoom_trigger_trigger_users], Partition: [0], MinOffset: [58743], MaxOffset: [58745], (QueryId: [uuuui-uuuu-490f-bc2c-1uuuub2])
log_1:         at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:62)
log_1:         at com.clickhouse.kafka.connect.sink.processing.Processing.doLogic(Processing.java:196)
log_1:         at com.clickhouse.kafka.connect.sink.ProxySinkTask.put(ProxySinkTask.java:99)
log_1:         at com.clickhouse.kafka.connect.sink.ClickHouseSinkTask.put(ClickHouseSinkTask.java:65)
log_1:         ... 12 more
**log_1: Caused by: java.lang.IllegalArgumentException: Invalid UUID string:
log_1:         at java.base/java.util.UUID.fromString1(UUID.java:280)
log_1:         at java.base/java.util.UUID.fromString(UUID.java:258)**
log_1:         at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doWritePrimitive(ClickHouseWriter.java:605)
log_1:         at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doWriteColValue(ClickHouseWriter.java:403)
log_1:         at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doWriteCol(ClickHouseWriter.java:661)
log_1:         at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinaryV1(ClickHouseWriter.java:774)
log_1:         at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsertRawBinary(ClickHouseWriter.java:683)
log_1:         at com.clickhouse.kafka.connect.sink.db.ClickHouseWriter.doInsert(ClickHouseWriter.java:197)
log_1:         at com.clickhouse.kafka.connect.sink.processing.Processing.doInsert(Processing.java:60)
log_1:         ... 15 more
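The root cause can be reproduced outside the connector with plain JDK code. `java.util.UUID.fromString` rejects an empty string with the same `IllegalArgumentException` that appears at the bottom of the trace. This is a minimal sketch; `EmptyUuidRepro` and `isRejectedByUuidParser` are illustrative names, not part of the connector.

```java
import java.util.UUID;

public class EmptyUuidRepro {
    // Returns true if java.util.UUID refuses to parse the given string.
    static boolean isRejectedByUuidParser(String s) {
        try {
            UUID.fromString(s);
            return false;
        } catch (IllegalArgumentException e) {
            // Same exception type as the "Invalid UUID string:" cause in the trace
            return true;
        }
    }

    public static void main(String[] args) {
        // trigger_id_uuid arrived as "" (empty, not null) in the Debezium record
        System.out.println(isRejectedByUuidParser(""));
        // A well-formed UUID string parses without error
        System.out.println(isRejectedByUuidParser("123e4567-e89b-12d3-a456-426614174000"));
    }
}
```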
DmitryTuryshev commented 6 days ago
case UUID:
  if (value instanceof String && ((String) value).isEmpty()) {
      // Write a zero UUID (all bits set to zero)
      UUID zeroUUID = new UUID(0L, 0L);
      BinaryStreamUtils.writeUuid(stream, zeroUUID);
      LOGGER.debug("Written zero UUID for empty string value.");
  } else {
      BinaryStreamUtils.writeUuid(stream, UUID.fromString((String) value));
  }
  break;

I replaced this case with the code above. After rebuilding, the connector works as expected. https://github.com/ClickHouse/clickhouse-kafka-connect/blob/fef6bb714d2cca61bead75f16dd9c09ea215b9c5/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java#L611

Please let me know if I missed something.
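A minimal, self-contained sketch of the same mapping, pulled out into a helper so it can be unit-tested without a running ClickHouse. `UuidValues` and `toUuidOrZero` are hypothetical names, not connector API. Unlike the patch above, this version rejects non-`String` values (including null) with an `IllegalArgumentException` instead of letting the cast throw `ClassCastException`.

```java
import java.util.UUID;

public class UuidValues {
    // All 128 bits zero; prints as 00000000-0000-0000-0000-000000000000.
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Hypothetical helper mirroring the patched UUID branch:
    // "" maps to the zero UUID, any other string goes through UUID.fromString.
    static UUID toUuidOrZero(Object value) {
        if (!(value instanceof String)) {
            throw new IllegalArgumentException("Expected a String for a UUID column, got: "
                    + (value == null ? "null" : value.getClass().getName()));
        }
        String s = (String) value;
        if (s.isEmpty()) {
            return ZERO_UUID;
        }
        // Still throws IllegalArgumentException for malformed, non-empty input
        return UUID.fromString(s);
    }

    public static void main(String[] args) {
        System.out.println(toUuidOrZero(""));
        System.out.println(toUuidOrZero("123e4567-e89b-12d3-a456-426614174000"));
    }
}
```

One design point worth confirming with the maintainers: mapping `""` to the zero UUID stores `00000000-0000-0000-0000-000000000000`, the default value of ClickHouse's `UUID` type. If the target column is `Nullable(UUID)`, writing NULL might reflect the source data more faithfully.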