tabular-io / iceberg-kafka-connect

Apache License 2.0

Connector not committing to Iceberg table #146

Status: Closed (anuja-kelkar closed 8 months ago)

anuja-kelkar commented 8 months ago

Hi, we are using iceberg-kafka-connect as a custom plugin with Amazon MSK Connect. We publish messages to a single Kafka topic in our Amazon MSK cluster and use multi-table fan-out with dynamic routing to write each record to the Iceberg table named in the record.

The connector config we are using is as follows:

connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
iceberg.control.commit.threads=2
iceberg.tables.route-field=iceberg_table
iceberg.control.group.id=cg-control-sync-iceberg-kafka-connect-distro
iceberg.tables.id-columns=record_id
iceberg.catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
tasks.max=3
topics=datasets
key.converter.region=us-east-1
iceberg.catalog.client.region=us-east-1
iceberg.control.commit.interval-ms=120000
iceberg.tables.dynamic-enabled=true
iceberg.catalog=AwsDataCatalog
value.converter.region=us-east-1
iceberg.control.commit.timeout-ms=60000
value.converter.schemas.enable=false
iceberg.catalog.warehouse=s3://data-os-distro
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter

Here is a sample message published to the datasets topic.

{
  "record_id": "random-string",
  "field1": "value1",
  "field2": "value2",
  "field3": "value3",
  "field4": "value4",
  "field5": "value5",
  "field6": "value5",
  "iceberg_table": "db_name.table_name"
}

We do see that the Iceberg table metadata is being refreshed and the table is being loaded by the catalog. Refer to the log messages below:

[Worker-00ea96f94f9a0f883] [2023-11-03 16:20:27,473] INFO [iceberg-kafka-connector-distro|task-1] Refreshing table metadata from new version: s3://data-os-distro/db_37fa5317_9323_4118_9a5b_5cdef4f4d35c.db/tbl_027019fa_f1ab_44e2_808a_990d2d662527_stg/metadata/00000-1b0ad084-dd3c-42ae-b356-91f0eaa39bae.metadata.json (org.apache.iceberg.BaseMetastoreTableOperations:199)

[Worker-00ea96f94f9a0f883] [2023-11-03 16:20:28,257] INFO [iceberg-kafka-connector-distro|task-1] Table loaded by catalog: AwsDataCatalog.db_37fa5317_9323_4118_9a5b_5cdef4f4d35c.tbl_027019fa_f1ab_44e2_808a_990d2d662527_stg (org.apache.iceberg.BaseMetastoreCatalog:67)

However, we intermittently see the following message in the connector logs, and the records are not routed to the target Iceberg tables.

[Worker-0c9f5e21454c1773b] [2023-11-03 16:14:51,913] INFO [iceberg-kafka-connector-distro|task-1] Commit 93944a6d-361f-4420-af98-adfac6b9fdc7 complete, committed to 0 table(s), vtts null (io.tabular.iceberg.connect.channel.Coordinator:161)

A successful commit message usually names the target table, i.e. it includes "committed to <db_name>.<table_name>" rather than "committed to 0 table(s)". Strangely enough, we have seen the records get committed when the topic was deleted. We are unsure what we are missing and why we are seeing this behaviour. Any help is appreciated.

Thanks.

bryanck commented 8 months ago

Dynamic routing will ignore records if they can't be routed to a table, i.e. if the table doesn't exist and auto create is off. You can try enabling auto create to help debug this, and see if any tables are being created that you don't expect.
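
For reference, a minimal sketch of that debugging change. It assumes the connector's `iceberg.tables.auto-create-enabled` property, which is off by default and does not appear in the config posted above; check the README for your connector version before relying on the name. The other two lines are copied from the original config for context.

```properties
# Routing settings already present in the posted config
iceberg.tables.dynamic-enabled=true
iceberg.tables.route-field=iceberg_table

# Assumed property name: temporarily let the sink create missing tables
# so that unroutable records show up as unexpected tables instead of
# being silently skipped. Turn this back off after debugging.
iceberg.tables.auto-create-enabled=true
```

If a table then appears under a name you did not expect, the value in the route field probably does not match an existing table identifier.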

anuja-kelkar commented 8 months ago

Thanks @bryanck. We found that one of the connector properties was named incorrectly, which was causing the issue. This can be closed.