databricks / iceberg-kafka-connect

Apache License 2.0
219 stars 49 forks source link

Duplicated records for same primary key, with upsert-mode active and partition configured for CDC Data #181

Open mattssll opened 10 months ago

mattssll commented 10 months ago

Hi, I'm using iceberg to pick-up data coming from Debezium. I realised now that for a table I have where I get lots of updates for the same IDs, data is getting duplicated. I have 1 record per primary key PER PARTITION. The expected result I'd like to have is having only the latest partition record per each primary key. The primary key is "dag_id", the partition field is "__source_ts_ms"

Here are some examples of the duplicated data:

dag_id | __schema_and_source_table | __kafka_ingestion_ts_ms | __source_ts_ms | is_paused -- | -- | -- | -- | -- airflow_db_cleanup | data_lake.dag | 2024-01-12 11:44:55.675000 UTC | 2024/01/12 | FALSE airflow_db_cleanup | data_lake.dag | 2024-01-11 23:59:26.214000 UTC | 2024/01/11 | FALSE airflow_resolve_stuck_tasks | data_lake.dag | 2024-01-12 11:45:14.237000 UTC | 2024/01/12 | FALSE airflow_resolve_stuck_tasks | data_lake.dag | 2024-01-11 23:59:59.813000 UTC | 2024/01/11 | FALSE auto_dag_generation | data_lake.dag | 2024-01-12 11:45:14.237000 UTC | 2024/01/12 | FALSE auto_dag_generation | data_lake.dag | 2024-01-11 23:59:59.813000 UTC | 2024/01/11 | FALSE And here is my config - my assumption is that if I removed the partition field then the duplication wouldn't happen, but then the table wouldn't be optimized... I assume that because I have one record per PK per daily partition. ``` apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: iceberg-sink-fan-out-connector namespace: kafka-connect labels: strimzi.io/cluster: strimzi-connect-cluster spec: # https://github.com/tabular-io/iceberg-kafka-connect # https://iceberg.apache.org/docs/latest/configuration/ class: io.tabular.iceberg.connect.IcebergSinkConnector tasksMax: 1 config: iceberg.control.topic: ${configmaps:kafka-connect/connectors-config:iceberg_control_topic} iceberg.control.group-id: ${configmaps:kafka-connect/connectors-config:iceberg_control_group_id} # GlueCatalog Configuration iceberg.catalog.catalog-impl: "org.apache.iceberg.aws.glue.GlueCatalog" iceberg.catalog.warehouse: ${configmaps:kafka-connect/connectors-config:iceberg_catalog_warehouse} iceberg.catalog.io-impl: "org.apache.iceberg.aws.s3.S3FileIO" iceberg.catalog.client.region": eu-west-1" # Schema Registry Configuration key.converter: "org.apache.kafka.connect.json.JsonConverter" value.converter: "io.confluent.connect.avro.AvroConverter" value.converter.schema.registry.url: "${env:SCHEMA_REGISTRY_URL}" value.converter.schemas.enable: "true" iceberg.control.commit.interval-ms": "5000" # Iceberg Configuration iceberg.tables.default-id-columns: "id" iceberg.tables.upsert-mode-enabled: "true" iceberg.tables.auto-create-enabled: "true" iceberg.tables.evolve-schema-enabled: "true" iceberg.tables.schema-force-optional: "true" # Tables and Topics Configuration topics: ${configmaps:kafka-connect/connectors-config:iceberg_topics} # CDC Configation # ddd.airflow.awsdms_apply_exceptions, has no primary key, gives error: # java.lang.NullPointerException: Cannot invoke "org.apache.iceberg.types.Types$NestedField.fieldId()" # because the return value of "org.apache.iceberg.Schema.findField(String)" is null iceberg.tables.cdc-field: "__op" # Dynamic Table Fan-Out Configuration iceberg.tables.dynamic-enabled: "true" iceberg.tables.route-field: "__schema_and_source_table" # Partition iceberg.tables.default-partition-by: "__source_ts_ms" # Setting primary key for tables iceberg.table.data_lake.ab_permission.id-columns: "id" iceberg.table.data_lake.ab_permission_view.id-columns: "id" iceberg.table.data_lake.ab_permission_view_role.id-columns: "id" iceberg.table.data_lake.ab_register_user.id-columns: "id" iceberg.table.data_lake.ab_role.id-columns: "id" iceberg.table.data_lake.ab_user.id-columns: "id" iceberg.table.data_lake.ab_user_role.id-columns: "id" iceberg.table.data_lake.ab_view_menu.id-columns: "id" iceberg.table.data_lake.alembic_version.id-columns: "version_num" iceberg.table.data_lake.celery_taskmeta.id-columns: "id" iceberg.table.data_lake.celery_tasksetmeta.id-columns: "id" iceberg.table.data_lake.connection.id-columns: "id" iceberg.table.data_lake.dag.id-columns: "dag_id" iceberg.table.data_lake.dag_code.id-columns: "fileloc_hash" iceberg.table.data_lake.dag_pickle.id-columns: "id" iceberg.table.data_lake.dag_run.id-columns: "id" iceberg.table.data_lake.dag_tag.id-columns: "name,dag_id" iceberg.table.data_lake.import_error.id-columns: "id" iceberg.table.data_lake.job.id-columns: "id" iceberg.table.data_lake.log.id-columns: "id" iceberg.table.data_lake.rendered_task_instance_fields.id-columns: "dag_id,task_id,execution_date" iceberg.table.data_lake.sensor_instance.id-columns: "id" iceberg.table.data_lake.serialized_dag.id-columns: "dag_id" iceberg.table.data_lake.sla_miss.id-columns: "task_id,dag_id,execution_date" iceberg.table.data_lake.slot_pool.id-columns: "id" iceberg.table.data_lake.task_fail.id-columns: "id" iceberg.table.data_lake.task_instance.id-columns: "task_id,dag_id,execution_date" iceberg.table.data_lake.task_reschedule.id-columns: "id" iceberg.table.data_lake.variable.id-columns: "id" iceberg.table.data_lake.test_dml.id-columns: "id" ``` Thanks, looking to hear about it from someone.
okayhooni commented 7 months ago

Hello, @mattssll I have a plan to use this sink connector for CDC sink from mysql-debezium.

Is there any fix or improvement for this issue..?