databricks / iceberg-kafka-connect

Apache License 2.0
213 stars 47 forks source link

Duplicates in the same partition with upsert mode enabled and primary Key given #291

Open ArkaSarkar19 opened 1 month ago

ArkaSarkar19 commented 1 month ago

Hi Team

We are noticing duplicates occurring within the same partition when we configure primary keys with upsert mode enabled on the table. We have tried the two setups below:

  1. Reading CDC data with the cdc-field and upsert mode enabled in the Iceberg connector.
  2. Reading data from Kafka with a primary key.

In both setups the issue occurs. Ideally we should see only one record per primary key in a partition; however, the results are inconsistent: a small percentage of records are duplicated within the same partition. We need some help understanding why this might be occurring; we suspect it could be due to a concurrency issue in the commit coordinator. I am attaching the connector config below:

{
 "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
 "iceberg.catalog.table-override.write.data.path": "<path>",
 "iceberg.catalog.table-override.write.parquet.compression-codec": "snappy",
 "errors.log.include.messages": "true",
 "iceberg.catalog.s3.region": "us-east-1",
 "iceberg.catalog.client.region": "us-east-1",
 "iceberg.catalog.table-override.write.metadata.path": "<path>",
 "errors.log.enable": "true",
 "key.converter": "org.apache.kafka.connect.storage.StringConverter",
 "consumer.override.bootstrap.servers": "<url>",
 "value.converter.schema.registry.url": "<url>",

 "name": "test_connector",
 "iceberg.table.db.table.partition-by": "date,hour",
 "iceberg.tables.evolve-schema-enabled": "true",
 "iceberg.catalog.table-override.write.metadata.previous-versions-max": "100",
 "tasks.max": "1",
 "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
 "iceberg.catalog.s3.sse.key": "AES256",
 "iceberg.tables.upsert-mode-enabled": "true",
 "iceberg.tables.auto-create-enabled": "true",
 "iceberg.tables": "db.table",
 "value.converter": "io.confluent.connect.avro.AvroConverter",
 "iceberg.catalog.s3.sse.type": "s3",
 "iceberg.catalog.table-override.write.metadata.delete-after-commit.enabled": "true",
 "iceberg.table.db.table.id-columns": "id",
 "topics": "<topic>",
 "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
 "iceberg.control.commit.interval-ms": "900000",
 "iceberg.catalog.uri": "<uri>",
 "key.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
 "iceberg.catalog": "spark_catalog",
 "consumer.override.auto.offset.reset": "latest",
 "iceberg.tables.schema-case-insensitive": "true",
 "iceberg.catalog.warehouse": "<warehouse>",
 "iceberg.control.topic": "<topic>",
 "key.converter.schema.registry.url": "<url>",
 "iceberg.catalog.type": "hive",
 "iceberg.catalog.s3.path-style-access": "true"
}