getindata / kafka-connect-iceberg-sink

Apache License 2.0

Issue with WorkerSinkTask in Kafka Connect: Exception with Identifier Field in Iceberg Schema #49

Open junsik-gsitm opened 10 months ago

junsik-gsitm commented 10 months ago

Hello,

I'm encountering an issue with the Kafka Connect Sink Connector, specifically when integrating with Apache Iceberg. The error is as follows:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Cannot add field _id as an identifier field: not a primitive type field
    at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:220)
    at org.apache.iceberg.Schema.validateIdentifierField(Schema.java:122)
    at org.apache.iceberg.Schema.lambda$new$0(Schema.java:106)
    at java.base/java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.iceberg.Schema.<init>(Schema.java:106)
    at org.apache.iceberg.Schema.<init>(Schema.java:91)
    at org.apache.iceberg.Schema.<init>(Schema.java:83)
    at com.getindata.kafka.connect.iceberg.sink.IcebergChangeEvent$JsonSchema.icebergSchema(IcebergChangeEvent.java:347)
    at com.getindata.kafka.connect.iceberg.sink.IcebergChangeEvent.icebergSchema(IcebergChangeEvent.java:64)
    at com.getindata.kafka.connect.iceberg.sink.IcebergChangeConsumer.lambda$loadIcebergTable$0(IcebergChangeConsumer.java:68)
    at java.base/java.util.Optional.orElseGet(Optional.java:369)
    at com.getindata.kafka.connect.iceberg.sink.IcebergChangeConsumer.loadIcebergTable(IcebergChangeConsumer.java:64)
    at com.getindata.kafka.connect.iceberg.sink.IcebergChangeConsumer.accept(IcebergChangeConsumer.java:55)
    at com.getindata.kafka.connect.iceberg.sink.IcebergSinkTask.put(IcebergSinkTask.java:38)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
    ... 11 more

The complete error log is attached below. This exception suggests a problem with adding the _id field as an identifier in the Iceberg schema because it's not a primitive type.

I am using a specific JSON configuration for the connector (which I will attach below). Could you please help me understand why this error occurs? It seems that the _id field is not recognized as a valid identifier, although there doesn't seem to be anything unusual with its definition.
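For illustration only, the rule named in the exception message (an identifier field must be a primitive value, not a nested one) can be checked against a sample record before it reaches the connector. The sketch below is not the connector's code: `non_primitive_fields` is a hypothetical helper, and the sample payload assumes that `_id` arrives as a nested object such as `{"$oid": ...}`, which is a guess about the MongoDB change event and is not confirmed by the log.

```python
# Hypothetical diagnostic helper, not part of kafka-connect-iceberg-sink.
# It reports which candidate key fields hold nested values (dict or list),
# mirroring the "not a primitive type field" rule from the exception message.

def non_primitive_fields(record, key_fields):
    """Return the names in key_fields whose values in record are nested."""
    return [
        name
        for name in key_fields
        if isinstance(record.get(name), (dict, list))
    ]

# Assumed shape of an event value; the real payload may differ.
nested_id = {"_id": {"$oid": "650000000000000000000001"}, "status": "sent"}
# Same record after flattening _id to a plain string.
flat_id = {"_id": "650000000000000000000001", "status": "sent"}

print(non_primitive_fields(nested_id, ["_id"]))
print(non_primitive_fields(flat_id, ["_id"]))
```

If the first call reports `_id` for a real message taken from the topic, the key is being seen as a struct, which would be consistent with the error; a record with a flattened `_id` would pass this check.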

Attached: [Error Log and JSON Configuration]

{
  "name": "mongodb-iceberg-sink-getindata",
  "config": {
    "connector.class": "com.getindata.kafka.connect.iceberg.sink.IcebergSink",
    "topics": "test.mongodemo.tripNotification",
    "upsert": "true",
    "upsert.keep-deletes": "true",
    "table.auto-create": "true",
    "allow-field-addition": "true",
    "table.write-format": "parquet",
    "table.namespace": "jskim",
    "table.prefix": "debumcdc_",
    "iceberg.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    "iceberg.warehouse": "s3a://jskim/sample-test",
    "iceberg.fs.defaultFS": "s3a://jskim/sample-test",
    "iceberg.com.amazonaws.services.s3.enableV4": "true",
    "iceberg.com.amazonaws.services.s3a.enableV4": "true",
    "iceberg.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
    "iceberg.fs.s3a.path.style.access": "true",
    "iceberg.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "iceberg.fs.s3a.access.key": "",
    "iceberg.fs.s3a.secret.key": "+l/YxMb2pxJQ"
  }
}

Any insights or suggestions on resolving this issue would be greatly appreciated.

Thank you.

utkanbir commented 10 months ago

Hi Junsik, I have a similar problem. What are your Iceberg settings?