databricks / iceberg-kafka-connect

Apache License 2.0
220 stars 49 forks source link

Tabular sink connector not working with azure event hub (kaka head) #304

Open abhijith-motorq opened 1 month ago

abhijith-motorq commented 1 month ago

hi, I am using azure event hub(kafka head) and I am trying to create a iceberg sink connector in kafka connect using the following config

{
        "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
        "tasks.max": "2",
        "topics": "metrics2",
        "iceberg.tables": "feed.test-messages",
        "iceberg.tables.auto-create-enabled": "true",
        "iceberg.tables.schema-force-optional": "true",
        "iceberg.catalog.type": "rest",
        "iceberg.catalog.uri": "https://xxxxxxxx.snowflakecomputing.com/polaris/api/catalog",
        "iceberg.catalog.io-impl": "org.apache.iceberg.azure.adlsv2.ADLSFileIO",
        "iceberg.catalog.include-credentials": "true",
        "iceberg.catalog.warehouse": "lakehouse-test-snowflake",
        "iceberg.catalog.credential": "client_id:client_seccret",
        "icebberg.catalog.scope": "PRINCIPAL_ROLE:xxx",
        "name": "sink-feed-snowflake",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false"
    }

I am using auto-create-table in Kafka Connect . and the table get's created and I can see a metadata folder and a data folder and inside metadata folder there is one json file. and data folder has a couple of parquet files

after further debugging found the below logs

 ERROR Coordinator error during process, exiting thread (io.tabular.iceberg.connect.channel.CoordinatorThread)
java.lang.IllegalStateException: Unrecognized header bytes: 0x%02X 0x%02X [0, 0]
  at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkState(Preconditions.java:686)
  at org.apache.iceberg.avro.AvroEncoderUtil.decode(AvroEncoderUtil.java:73)
  at org.apache.iceberg.connect.events.AvroUtil.decode(AvroUtil.java:63)
  at io.tabular.iceberg.connect.channel.EventDecoder.decode(EventDecoder.java:73)
  at io.tabular.iceberg.connect.channel.Channel.lambda$consumeAvailable$2(Channel.java:131)
  at java.base/java.lang.Iterable.forEach(Iterable.java:75)
  at io.tabular.iceberg.connect.channel.Channel.consumeAvailable(Channel.java:125)
  at io.tabular.iceberg.connect.channel.Coordinator.process(Coordinator.java:108)
  at io.tabular.iceberg.connect.channel.CoordinatorThread.run(CoordinatorThread.java:40)
[2024-10-10 10:05:14,909] INFO Channel stopping (io.tabular.iceberg.connect.channel.Channel)
│ [2024-10-10 09:04:04,014] INFO Commit timeout reached. Now: 1728551044014, start: 1728551013945, timeout: 30000 (io.tabular.iceberg.connect.channel.CommitState)                                                                                                             │
│ [2024-10-10 09:04:04,014] INFO Processing commit after responses for d505acc5-1b2c-4ebf-bc30-2a91bdbd4e90, isPartialCommit true (io.tabular.iceberg.connect.channel.Coordinator)

when I read the one metadata json file that was created, it had no snapshots.

then when I tried with the same Kafka Connect setup and same sink connector configs, but with confluent kafka it worked

is there an issues in iceberg sink connector using azure event hub or am I missing something?