tabular-io / iceberg-kafka-connect

Apache License 2.0

ConnectException with Invalid Table Identifier in Iceberg Sink Connector #171

Closed: junsik-gsitm closed this issue 7 months ago

junsik-gsitm commented 7 months ago

Description of the Issue: I encountered an unrecoverable exception while using the Iceberg Sink Connector with Apache Kafka Connect. The primary issue seems to be an IllegalArgumentException related to an invalid table identifier, specifically for the table named 'tripNotification'.

Error Message:

```
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Invalid table identifier: tripNotification
    at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:218)
    at org.apache.iceberg.BaseMetastoreCatalog$BaseMetastoreCatalogTableBuilder.<init>(BaseMetastoreCatalog.java:146)
    at org.apache.iceberg.BaseMetastoreCatalog.buildTable(BaseMetastoreCatalog.java:94)
    at org.apache.iceberg.catalog.Catalog.createTable(Catalog.java:71)
    at org.apache.iceberg.catalog.Catalog.createTable(Catalog.java:93)
    at io.tabular.iceberg.connect.data.IcebergWriterFactory.lambda$autoCreateTable$1(IcebergWriterFactory.java:106)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
    at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
    at io.tabular.iceberg.connect.data.IcebergWriterFactory.autoCreateTable(IcebergWriterFactory.java:100)
    at io.tabular.iceberg.connect.data.IcebergWriterFactory.createWriter(IcebergWriterFactory.java:57)
    at io.tabular.iceberg.connect.channel.Worker.lambda$writerForTable$8(Worker.java:242)
    at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1134)
    at io.tabular.iceberg.connect.channel.Worker.writerForTable(Worker.java:241)
    at io.tabular.iceberg.connect.channel.Worker.lambda$routeRecordStatically$5(Worker.java:197)
    at java.base/java.util.Arrays$ArrayList.forEach(Arrays.java:4390)
    at io.tabular.iceberg.connect.channel.Worker.routeRecordStatically(Worker.java:195)
    at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:184)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
    at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:171)
    at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:150)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
    ... 11 more
```

Additional Context:

I was using the Iceberg Sink Connector to write data from Kafka topics to Iceberg tables. The issue occurred when the connector attempted to create or access the table 'tripNotification'. I am not sure if this is related to configuration settings or an underlying issue in the connector itself. I appreciate any guidance or suggestions to resolve this issue. Thank you!

I am trying to automatically create Iceberg tables from multiple topics, but I'm not sure why it's not working. Is any special configuration needed in places like AWS Glue?


nastra commented 7 months ago

@junsik-gsitm if you're using Glue, then Glue expects a table identifier that includes a namespace, i.e. namespace.table (the namespace maps to a Glue database). See also https://github.com/apache/iceberg/blob/4e62b58f04aba4ccc2a2a846494b478d3b03f58f/aws/src/main/java/org/apache/iceberg/aws/glue/GlueCatalog.java#L620-L627. That being said, I don't think this is an issue with the connector, but rather with the catalog implementation requiring namespace.table.
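To make the namespace requirement concrete, here is a minimal Python sketch. It assumes that the Glue catalog needs exactly one namespace level (the Glue database) followed by a table name, and it splits identifiers on dots the way Iceberg's `TableIdentifier.parse` does. The helper `check_glue_identifier` and the database name `mydb` are hypothetical; the real checks, including Glue's character rules, live in the linked `GlueCatalog` source.

```python
def check_glue_identifier(identifier: str) -> tuple[str, str]:
    """Split 'namespace.table' and reject identifiers without one namespace level.

    Hypothetical helper: it only models the namespace requirement, not
    Glue's character rules for database and table names.
    """
    # Like Iceberg's TableIdentifier.parse: the last dot-separated part is the
    # table name and everything before it is the namespace.
    parts = identifier.split(".")
    namespace, name = parts[:-1], parts[-1]
    # Assumption: Glue maps exactly one namespace level to a Glue database.
    if len(namespace) != 1 or not namespace[0] or not name:
        raise ValueError(f"Invalid table identifier: {identifier}")
    return namespace[0], name


for ident in ["tripNotification", "mydb.tripNotification"]:
    try:
        print(ident, "->", check_glue_identifier(ident))
    except ValueError as err:
        print(ident, "->", err)
```

A bare name such as `tripNotification` has an empty namespace, so it fails the same way the stack trace above does, while `mydb.tripNotification` splits into a database and a table.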

It also looks like you can skip name validation by using glue.skip-name-validation: https://github.com/apache/iceberg/blob/c68abfc9fd3956077b43aba20441f089bb8b93d6/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java#L80-L87

junsik-gsitm commented 7 months ago

Thank you for your answer, @nastra.

I have an additional question.

I am processing the data with a sink connector.

So I added the glue.skip-name-validation option to the sink connector's JSON configuration, but the same error occurred.

It seems that the glue.skip-name-validation option is not being applied.

However, the table name tripNotification has no special characters and nothing about it seems unusual, so I don't understand why it's not working. Note that I have not configured anything in Glue. Could this be the cause of the error? Below is the JSON I am using. Please help me.

```json
{
  "name": "mongodb-iceberg-sink",
  "config": {
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "1",
    "topics": "test.mongodemo.tripNotification",
    "iceberg.tables": "tripNotification",
    "iceberg.tables.schema-force-optional": "false",
    "iceberg.tables.schema-case-insensitive": "false",
    "iceberg.tables.dynamic-enabled": "false",
    "iceberg.tables.auto-create-enabled": "true",
    "iceberg.tables.evolve-schema-enabled": "true",
    "iceberg.tables.upsert-mode-enabled": "true",
    "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    "iceberg.catalog.warehouse": "s3a://skim/sample-test",
    "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "iceberg.control.group-id": "iceberg",
    "glue.skip-name-validation": "true",
    "iceberg.control.commit.interval-ms": "300000",
    "iceberg.control.commit.timeout-ms": "30000",
    "iceberg.control.commit.threads": "1"
  }
}
```
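One thing worth checking in the configuration above: as far as I know, the connector forwards only properties that start with `iceberg.catalog.` to the Iceberg catalog (with that prefix stripped), the same way `catalog-impl` and `warehouse` are passed here. If so, a top-level `glue.skip-name-validation` would never reach `GlueCatalog`, and the key would need to be `iceberg.catalog.glue.skip-name-validation`. The sketch below mimics that prefix filtering on a plain dict; `catalog_props` is a hypothetical helper, not the connector's code.

```python
CATALOG_PREFIX = "iceberg.catalog."


def catalog_props(connector_config: dict[str, str]) -> dict[str, str]:
    """Keep only 'iceberg.catalog.*' keys and strip the prefix.

    Hypothetical helper that mimics how catalog properties are assumed
    to be forwarded to the Iceberg catalog.
    """
    return {
        key[len(CATALOG_PREFIX):]: value
        for key, value in connector_config.items()
        if key.startswith(CATALOG_PREFIX)
    }


config = {
    "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    "iceberg.catalog.warehouse": "s3a://skim/sample-test",
    "glue.skip-name-validation": "true",  # no prefix, so it is dropped
}
props = catalog_props(config)
print(props)
print("glue.skip-name-validation" in props)  # False
```

Note that the catalog-name key `iceberg.catalog` (no trailing dot) is not matched by the prefix, so it is not treated as a catalog property in this sketch.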

nastra commented 7 months ago

@junsik-gsitm have you tried providing the namespace with the table name?
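For the configuration in this thread, that would mean writing `iceberg.tables` as `<database>.<table>`, for example `mydb.tripNotification` instead of `tripNotification`. A tiny sketch that qualifies a comma-separated `iceberg.tables` value; `qualify_tables` and the database name `mydb` are hypothetical placeholders for whichever Glue database you intend to use.

```python
def qualify_tables(tables: str, database: str) -> str:
    """Prefix every unqualified name in a comma-separated 'iceberg.tables' value.

    Hypothetical helper; names that already contain a dot are left unchanged.
    """
    qualified = []
    for name in (t.strip() for t in tables.split(",")):
        if not name:
            continue  # ignore empty entries such as a trailing comma
        qualified.append(name if "." in name else f"{database}.{name}")
    return ",".join(qualified)


print(qualify_tables("tripNotification", "mydb"))  # mydb.tripNotification
print(qualify_tables("a, mydb.b", "mydb"))  # mydb.a,mydb.b
```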

junsik-gsitm commented 7 months ago

@nastra I think this package only supports Debezium's source connectors, and the connector I am using does not seem to be supported. The connector class I am using is 'com.mongodb.kafka.connect.MongoSourceConnector'.

junsik-gsitm commented 7 months ago

@nastra @bryanck

The same error occurs with the io.debezium.connector.mysql.MySqlConnector source connector, even after adding a namespace to the table name. I've tried both com.getindata.kafka.connect.iceberg.sink.IcebergSink and io.tabular.iceberg.connect.IcebergSinkConnector, but the same issue persists. Is it really okay not to do any preliminary setup in AWS Glue? It's quite stressful and challenging, and I could use some help.

```json
{
  "name": "mongodb-iceberg-sink",
  "config": {
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "tasks.max": "1",
    "topics": "20A_JEVS.20A_JEVS.tb_truster",
    "iceberg.tables": "test.tb_truster",
    "iceberg.tables.cdc-field": "_cdc.op",
    "iceberg.tables.schema-force-optional": "false",
    "iceberg.tables.schema-case-insensitive": "false",
    "iceberg.tables.auto-create-enabled": "true",
    "iceberg.tables.evolve-schema-enabled": "true",
    "iceberg.tables.upsert-mode-enabled": "true",
    "iceberg.catalog": "test",
    "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    "iceberg.catalog.warehouse": "s3://xx-xx-xx/warehouse/",
    "iceberg.catalog.s3.access-key-id": "xxxxx",
    "iceberg.catalog.s3.secret-access-key": "xxxxxxxxxxxxxxxxxx",
    "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "iceberg.catalog.client.region": "ap-northeast-2",
    "iceberg.catalog.s3.region": "ap-northeast-2",
    "iceberg.catalog.client.assume-role.region": "ap-northeast-2",
    "iceberg.catalog.s3.path-style-access": "true",
    "iceberg.catalog.glue.lakeformation-enabled": "false",
    "iceberg.catalog.client.assume-role.arn": "arn:aws:s3:::xx-xx-xxx/warehouse/",
    "iceberg.catalog.s3.sse.key": "arn:aws:s3:::xx-xx-xx/warehouse/",
    "iceberg.control.group-id": "iceberg",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://0.0.0.1:8081",
    "transforms": "debezium",
    "transforms.debezium.type": "io.tabular.iceberg.connect.transforms.DebeziumTransform"
  }
}
```
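Separately from the identifier error, two values in this configuration look suspicious: `iceberg.catalog.client.assume-role.arn` normally takes an IAM role ARN (`arn:aws:iam::<account-id>:role/<role-name>`), and `iceberg.catalog.s3.sse.key` normally takes a KMS key ID or ARN when KMS encryption is used, yet both are set to S3 bucket ARNs here. A rough sketch that flags this; the simplified regex patterns and the `check_arns` helper are assumptions for illustration, not the connector's or AWS's validation.

```python
import re

# Simplified ARN shapes (assumptions for illustration, not AWS's full grammar).
IAM_ROLE_ARN = re.compile(r"^arn:aws[a-z-]*:iam::\d{12}:role/.+$")
KMS_ARN = re.compile(r"^arn:aws[a-z-]*:kms:[a-z0-9-]+:\d{12}:(key|alias)/.+$")


def check_arns(config: dict[str, str]) -> list[str]:
    """Return warnings for ARN-valued catalog properties with an unexpected shape."""
    warnings = []
    role = config.get("iceberg.catalog.client.assume-role.arn")
    if role is not None and not IAM_ROLE_ARN.match(role):
        warnings.append(f"assume-role.arn is not an IAM role ARN: {role}")
    key = config.get("iceberg.catalog.s3.sse.key")
    # A bare KMS key ID is accepted; anything starting with "arn:" must be a KMS ARN.
    if key is not None and key.startswith("arn:") and not KMS_ARN.match(key):
        warnings.append(f"s3.sse.key is an ARN but not a KMS ARN: {key}")
    return warnings


config = {
    "iceberg.catalog.client.assume-role.arn": "arn:aws:s3:::xx-xx-xxx/warehouse/",
    "iceberg.catalog.s3.sse.key": "arn:aws:s3:::xx-xx-xx/warehouse/",
}
for warning in check_arns(config):
    print(warning)
```

This does not explain the `Invalid table identifier` error; it only points at values that may cause a different failure once the identifier issue is resolved.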
mattssll commented 6 months ago

I ended up using an SMT in Debezium to prepend the Glue database name to the source table name and store the result in a new field, so I can pass it as the table name for dynamic routing (fan-out). Here's the relevant Debezium config:

```yaml
transforms.patternMapString.type: "com.github.jcustenborder.kafka.connect.transform.common.PatternMapString$Value"
transforms.patternMapString.src.field.name: "__source_table"
transforms.patternMapString.dest.field.name: "__schema_and_source_table"
transforms.patternMapString.value.pattern: "^"
transforms.patternMapString.value.replacement: "${configmaps:kafka-connect/connectors-config:iceberg_glue_database}."
```
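The effect of that `PatternMapString` configuration can be sketched in Python: the pattern `^` matches the empty string at the start of `__source_table`, so the replacement prepends `<database>.` and the result is written to `__schema_and_source_table`. The database name `mydb` stands in for the config-provider value, and the sink settings in the trailing comment (`iceberg.tables.dynamic-enabled`, `iceberg.tables.route-field`) are an assumption about how the routing field is consumed.

```python
import re


def prefix_source_table(record: dict, database: str) -> dict:
    """Mimic the PatternMapString SMT above on a plain dict (illustrative only).

    Reads '__source_table', replaces the start-of-string anchor '^' with
    '<database>.', and stores the result in '__schema_and_source_table'.
    """
    out = dict(record)  # leave the input record untouched
    # A function replacement avoids any backslash or group-reference
    # interpretation of the database name.
    out["__schema_and_source_table"] = re.sub(
        "^", lambda _m: database + ".", record["__source_table"], count=1
    )
    return out


record = {"__source_table": "tb_truster", "id": 1}
print(prefix_source_table(record, "mydb")["__schema_and_source_table"])  # mydb.tb_truster

# Sink side (assumed): route each record by the new field, e.g.
#   "iceberg.tables.dynamic-enabled": "true"
#   "iceberg.tables.route-field": "__schema_and_source_table"
```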

This isn't ideal, because Debezium should have nothing to do with the Iceberg implementation. A better solution would be welcome.

@junsik-gsitm could you reopen the issue to see if we get an answer on this?