databricks / iceberg-kafka-connect

Apache License 2.0

Iceberg UUID type issue #314

Open julien-alpaca opened 2 days ago

julien-alpaca commented 2 days ago

The Iceberg sink fails when writing to a table that has a column of type uuid. The sink connector converts the string value into a java.util.UUID, but the write fails because Iceberg expects a fixed-length byte array. I tried using a custom converter to turn the string into a byte array, but that did not help because the schema is inferred from the table.
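For reference, a custom converter would need to produce the 16-byte form that a Parquet fixed[16] column holds. The sketch below assumes the big-endian layout (most-significant 64 bits first, then least-significant 64 bits) that matches java.util.UUID's two-long representation. `UuidBytes` and its methods are hypothetical names for illustration, not iceberg-kafka-connect or Iceberg APIs, and, as noted above, a converter alone may not help if the sink derives the value type from the table schema.

```java
import java.nio.ByteBuffer;
import java.util.UUID;

/**
 * Hypothetical helper (not part of iceberg-kafka-connect or Iceberg):
 * converts between a UUID string and an assumed 16-byte big-endian layout.
 */
public final class UuidBytes {

    private UuidBytes() {}

    // Parse the canonical 8-4-4-4-12 string form, then pack it into 16 bytes.
    public static byte[] fromString(String value) {
        return toBytes(UUID.fromString(value));
    }

    // Most-significant 64 bits first, then least-significant 64 bits.
    public static byte[] toBytes(UUID uuid) {
        ByteBuffer buffer = ByteBuffer.allocate(16); // ByteBuffer is big-endian by default
        buffer.putLong(uuid.getMostSignificantBits());
        buffer.putLong(uuid.getLeastSignificantBits());
        return buffer.array();
    }

    // Inverse of toBytes, useful for checking a round trip.
    public static UUID fromBytes(byte[] bytes) {
        if (bytes.length != 16) {
            throw new IllegalArgumentException("expected 16 bytes, got " + bytes.length);
        }
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        long msb = buffer.getLong();
        long lsb = buffer.getLong();
        return new UUID(msb, lsb);
    }

    public static void main(String[] args) {
        String id = "123e4567-e89b-12d3-a456-426614174000";
        byte[] bytes = fromString(id);
        System.out.println(bytes.length);     // 16
        System.out.println(fromBytes(bytes)); // 123e4567-e89b-12d3-a456-426614174000
    }
}
```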

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:635)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:344)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:246)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:215)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:280)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.kafka.connect.errors.DataException: An error occurred converting record, topic: gobroker.public.orders, partition, 0, offset: 0
    at io.tabular.iceberg.connect.data.IcebergWriter.write(IcebergWriter.java:79)
    at io.tabular.iceberg.connect.channel.Worker.lambda$routeRecordStatically$1(Worker.java:112)
    at java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4305)
    at io.tabular.iceberg.connect.channel.Worker.routeRecordStatically(Worker.java:110)
    at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:99)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    at io.tabular.iceberg.connect.channel.Worker.write(Worker.java:85)
    at io.tabular.iceberg.connect.channel.TaskImpl.put(TaskImpl.java:42)
    at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:76)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:605)
    ... 11 more
Caused by: java.lang.ClassCastException: class java.util.UUID cannot be cast to class [B (java.util.UUID and [B are in module java.base of loader 'bootstrap')
    at org.apache.iceberg.data.parquet.BaseParquetWriter$FixedWriter.write(BaseParquetWriter.java:293)
    at org.apache.iceberg.parquet.ParquetValueWriters$OptionWriter.write(ParquetValueWriters.java:356)
    at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:589)
    at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:135)
    at org.apache.iceberg.io.DataWriter.write(DataWriter.java:71)
    at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.write(BaseTaskWriter.java:362)
    at org.apache.iceberg.io.BaseTaskWriter$RollingFileWriter.write(BaseTaskWriter.java:345)
    at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.write(BaseTaskWriter.java:277)
    at org.apache.iceberg.io.UnpartitionedWriter.write(UnpartitionedWriter.java:42)
    at io.tabular.iceberg.connect.data.IcebergWriter.write(IcebergWriter.java:71)
    ... 20 more
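The root cause is the last "Caused by": the Parquet fixed-width writer casts each value to byte[], and a java.util.UUID is not a byte[]. The minimal reproduction below mimics only that cast. `writeFixed` is a hypothetical stand-in for a writer that behaves like BaseParquetWriter$FixedWriter; it is not Iceberg's actual code.

```java
import java.util.UUID;

/**
 * Minimal reproduction of the ClassCastException in the trace above.
 * writeFixed is a hypothetical stand-in, not Iceberg's implementation.
 */
public final class FixedCastDemo {

    private FixedCastDemo() {}

    // Cast the incoming value to byte[], as a fixed-width writer would, and return its length.
    public static int writeFixed(Object value) {
        byte[] bytes = (byte[]) value; // throws ClassCastException for a java.util.UUID
        return bytes.length;
    }

    // Returns true if writeFixed rejects the value with a ClassCastException.
    public static boolean failsWithCast(Object value) {
        try {
            writeFixed(value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        UUID uuid = UUID.fromString("123e4567-e89b-12d3-a456-426614174000");
        System.out.println(failsWithCast(uuid));         // true
        System.out.println(failsWithCast(new byte[16])); // false
    }
}
```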
julien-alpaca commented 2 days ago

@bryanck