10xfuturetechnologies / kafka-connect-iceberg

Kafka Connector for Iceberg tables
Apache License 2.0

Timestamp conversion issue #29

Open github-raphael-douyere opened 1 year ago

github-raphael-douyere commented 1 year ago

I have an Iceberg table like this:

CREATE TABLE some_table (
  `last_update_time` timestamp
)

and Avro data like this:

...
    {
      "name":"last_update_time",
      "type": {
         "type": "long",
         "logicalType": "timestamp-millis"
       }
    }
...
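For reference, a `timestamp-millis` value is a `long` that counts milliseconds since the Unix epoch. As far as I understand Iceberg's generic data model, a `timestamp` (without zone) column holds a `java.time.LocalDateTime` and a `timestamptz` column holds a `java.time.OffsetDateTime`. The stdlib-only sketch below shows both views of the same value. The class name and sample value are arbitrary, and reading the millis in UTC is an assumption.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

// Sketch: two java.time views of one Avro timestamp-millis value (assumed UTC).
final class TimestampMillisViews {

    // View used for an Iceberg `timestamp` (without zone) column: no offset attached.
    static LocalDateTime asLocal(long epochMillis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
    }

    // View used for an Iceberg `timestamptz` column: carries an explicit offset.
    static OffsetDateTime asOffset(long epochMillis) {
        return OffsetDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        long millis = 1_683_126_491_082L; // arbitrary example value
        System.out.println(asLocal(millis));  // prints 2023-05-03T15:08:11.082
        System.out.println(asOffset(millis)); // prints 2023-05-03T15:08:11.082Z
    }
}
```

Both objects describe the same instant. Only the `OffsetDateTime` carries the offset, which is why the two types are not interchangeable by a plain cast.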

When I run the Kafka Connect sink, I get an error that seems to be related to type mapping:

Caused by: java.lang.ClassCastException: class java.time.LocalDateTime cannot be cast to class java.time.OffsetDateTime (java.time.LocalDateTime and java.time.OffsetDateTime are in module java.base of loader 'bootstrap')

What is the recommended way to handle this case?

ddcprg commented 1 year ago

Can you post a full stack trace and the connector config?

github-raphael-douyere commented 1 year ago

Sure, here is the stack trace:

[2023-05-03 15:08:11,082] ERROR WorkerSinkTask{id=test-partitioned-table-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
2023-05-03 17:08:11 org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
2023-05-03 17:08:11     at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:615)
2023-05-03 17:08:11     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
2023-05-03 17:08:11     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
2023-05-03 17:08:11     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
2023-05-03 17:08:11     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
2023-05-03 17:08:11     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
2023-05-03 17:08:11     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2023-05-03 17:08:11     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2023-05-03 17:08:11     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2023-05-03 17:08:11     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2023-05-03 17:08:11     at java.base/java.lang.Thread.run(Thread.java:829)
2023-05-03 17:08:11 Caused by: org.apache.kafka.connect.errors.ConnectException: Unable to write Iceberg record
2023-05-03 17:08:11     at com.github.tenxfuturetechnologies.kafkaconnecticeberg.sink.iceberg.DefaultIcebergWriter.write(DefaultIcebergWriter.java:36)
2023-05-03 17:08:11     at com.github.tenxfuturetechnologies.kafkaconnecticeberg.sink.RecordWriter.writeRecord(RecordWriter.java:155)
2023-05-03 17:08:11     at com.github.tenxfuturetechnologies.kafkaconnecticeberg.sink.RecordWriter.flush(RecordWriter.java:115)
2023-05-03 17:08:11     at com.github.tenxfuturetechnologies.kafkaconnecticeberg.sink.IcebergSinkTask.put(IcebergSinkTask.java:72)
2023-05-03 17:08:11     at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:585)
2023-05-03 17:08:11     ... 10 more
2023-05-03 17:08:11 Caused by: java.lang.ClassCastException: class java.time.LocalDateTime cannot be cast to class java.time.OffsetDateTime (java.time.LocalDateTime and java.time.OffsetDateTime are in module java.base of loader 'bootstrap')
2023-05-03 17:08:11     at org.apache.iceberg.data.InternalRecordWrapper.lambda$converter$4(InternalRecordWrapper.java:58)
2023-05-03 17:08:11     at org.apache.iceberg.data.InternalRecordWrapper.get(InternalRecordWrapper.java:98)
2023-05-03 17:08:11     at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:71)
2023-05-03 17:08:11     at org.apache.iceberg.Accessors$PositionAccessor.get(Accessors.java:58)
2023-05-03 17:08:11     at org.apache.iceberg.PartitionKey.partition(PartitionKey.java:106)
2023-05-03 17:08:11     at com.github.tenxfuturetechnologies.kafkaconnecticeberg.sink.iceberg.DefaultPartitionedFanoutWriter.partition(DefaultPartitionedFanoutWriter.java:34)
2023-05-03 17:08:11     at com.github.tenxfuturetechnologies.kafkaconnecticeberg.sink.iceberg.DefaultPartitionedFanoutWriter.partition(DefaultPartitionedFanoutWriter.java:14)
2023-05-03 17:08:11     at org.apache.iceberg.io.PartitionedFanoutWriter.write(PartitionedFanoutWriter.java:52)
2023-05-03 17:08:11     at com.github.tenxfuturetechnologies.kafkaconnecticeberg.sink.iceberg.DefaultIcebergWriter.write(DefaultIcebergWriter.java:33)
2023-05-03 17:08:11     ... 14 more
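Reading the trace, the cast fails inside `InternalRecordWrapper` while `PartitionKey` computes the partition value. That suggests the Iceberg column is a `timestamptz` while the connector supplies a `LocalDateTime`. If the table was created through Spark SQL (the backtick quoting hints at this, but it is an assumption), Spark's `timestamp` type maps to Iceberg's `timestamptz`. The sketch below is a simplified, hypothetical model of a per-column converter. It is not Iceberg's actual source. It shows why that combination throws:

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.function.Function;

// Hypothetical, simplified model of a per-column timestamp converter.
// NOT Iceberg's InternalRecordWrapper source; it only illustrates the failure mode.
final class TimestampConverterSketch {

    private static final OffsetDateTime EPOCH =
            OffsetDateTime.of(1970, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);

    // Returns a converter from the in-memory value to microseconds since the epoch.
    static Function<Object, Long> converterFor(boolean columnHasZone) {
        if (columnHasZone) {
            // timestamptz column: the value is blindly cast to OffsetDateTime
            return value -> ChronoUnit.MICROS.between(EPOCH, (OffsetDateTime) value);
        }
        // timestamp column: the value is blindly cast to LocalDateTime
        return value -> ChronoUnit.MICROS.between(EPOCH.toLocalDateTime(), (LocalDateTime) value);
    }

    public static void main(String[] args) {
        LocalDateTime value = LocalDateTime.of(2023, 5, 3, 15, 8, 11);
        System.out.println(converterFor(false).apply(value)); // prints 1683126491000000
        try {
            converterFor(true).apply(value); // timestamptz column, LocalDateTime value
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

With the same `LocalDateTime`, the non-zoned converter returns microseconds since the epoch. The zoned one fails with the same kind of `ClassCastException` shown in the trace.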

And the connector config is:

{
  "name": "test-partitioned-table-sink",
  "config": {
    "connector.class": "com.github.tenxfuturetechnologies.kafkaconnecticeberg.sink.IcebergSinkConnector",
    "producer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor",
    "consumer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor",
    "topics.regex": "topic1",
    "errors.log.enable": "true",
    "errors.retry.delay.max.ms": "60000",
    "errors.retry.timeout": "5",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.enhanced.avro.schema.support": true,

    "table.name": "example_table",
    "timezone": "UTC",
    "flush.size": 5000,
    "iceberg.catalog-impl": "org.apache.iceberg.hadoop.HadoopCatalog",
    "iceberg.warehouse": "file:///tmp/iceberg_warehouse",
    "iceberg.fs.defaultFS": "file:///tmp/iceberg_warehouse"
  }
}
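There are two possible ways around this, and neither has been verified against this connector. The first is to declare the Iceberg column as a timestamp without time zone so that it matches the `LocalDateTime` values. The second is to coerce each value to the type the column expects before it reaches the writer. The config sets `"timezone": "UTC"`, but I don't know whether the connector applies that setting to this conversion. Below is a minimal sketch of the coercion. `TimestampNormalizer` and `normalize` are hypothetical names, and the zone is passed in explicitly as an assumption:

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;

// Hypothetical helper, NOT part of kafka-connect-iceberg: coerce a timestamp value
// to the java.time type the target Iceberg column expects, using an assumed zone.
final class TimestampNormalizer {

    static Object normalize(Object value, boolean columnHasZone, ZoneId zone) {
        if (columnHasZone && value instanceof LocalDateTime) {
            // timestamptz column: interpret the wall-clock value in the assumed zone
            return ((LocalDateTime) value).atZone(zone).toOffsetDateTime();
        }
        if (!columnHasZone && value instanceof OffsetDateTime) {
            // timestamp column: shift to the assumed zone, then drop the offset
            return ((OffsetDateTime) value).atZoneSameInstant(zone).toLocalDateTime();
        }
        return value; // already the expected type, or not a timestamp at all
    }

    public static void main(String[] args) {
        Object fixed = normalize(LocalDateTime.of(2023, 5, 3, 15, 8, 11), true, ZoneId.of("UTC"));
        System.out.println(fixed.getClass().getSimpleName() + " " + fixed);
    }
}
```

Attaching a zone to a `LocalDateTime` is only correct if the wall-clock value was produced in that zone. If `timestamp-millis` values are read as UTC, passing UTC keeps the instant unchanged.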