10xfuturetechnologies / kafka-connect-iceberg

Kafka Connector for Iceberg tables
Apache License 2.0

Exception writing records : java.lang.IllegalStateException #28

Open haripriyarhp opened 1 year ago

haripriyarhp commented 1 year ago

Hi, I am trying to use this connector to read from Kafka and write to S3. I have set the properties below; the JSON messages do not carry a schema, hence value.converter.schemas.enable is false. I am getting the error below. Does this connector only support the Avro format?

          format.class: org.apache.kafka.connect.json.JsonFormat
          header.converter: org.apache.kafka.connect.converters.ByteArrayConverter
          key.converter: org.apache.kafka.connect.storage.StringConverter
          key.converter.schemas.enable: false
          value.converter: org.apache.kafka.connect.json.JsonConverter
          value.converter.schemas.enable: false
          value.converter.schema.registry.url: https://schema-registry-url
          key.converter.enhanced.avro.schema.support: true

ERROR [kafka-connector-iceberg|task-0] Exception writing records (com.github.tenxfuturetechnologies.kafkaconnecticeberg.sink.IcebergSinkTask) [task-thread-kafka-connector-iceberg-0]
java.lang.IllegalStateException: No records have been tracked, did you invoke updateSchema() or isCompatible()?
    at com.github.tenxfuturetechnologies.kafkaconnecticeberg.sink.schema.RecordSchemaCompatibilityTracker.project(RecordSchemaCompatibilityTracker.java:35)
    at com.github.tenxfuturetechnologies.kafkaconnecticeberg.sink.RecordWriter.writeRecord(RecordWriter.java:138)
    at com.github.tenxfuturetechnologies.kafkaconnecticeberg.sink.RecordWriter.flush(RecordWriter.java:115)
    at com.github.tenxfuturetechnologies.kafkaconnecticeberg.sink.IcebergSinkTask.put(IcebergSinkTask.java:72)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)

ddcprg commented 1 year ago

Hi, thanks for raising this.

The sink can write JSON, provided that the Kafka messages carry a schema; without one, the code cannot map Kafka Connect record fields to Iceberg schemas. The stack trace above is misleading, so I'll add a fix to surface the correct error message.
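For anyone hitting this: with JsonConverter, carrying a schema means setting value.converter.schemas.enable to true and producing messages in the Kafka Connect JSON envelope, where each message embeds its schema next to the payload. A minimal sketch of such a message (the field names id and name are illustrative, not required by this connector):

```json
{
  "schema": {
    "type": "struct",
    "fields": [
      { "field": "id",   "type": "int64",  "optional": false },
      { "field": "name", "type": "string", "optional": true }
    ]
  },
  "payload": { "id": 1, "name": "example" }
}
```

The converter then deserializes the "payload" object using the embedded "schema", giving the sink the typed Connect record it needs to build the Iceberg table schema.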

haripriyarhp commented 1 year ago

Hi, thanks for the response. After following your suggestion I was able to write JSON data using the connector, but I have run into some new issues, for which I have opened a separate ticket.