memiiso / debezium-server-iceberg

Replicates any database (CDC events) to Apache Iceberg (To Cloud Storage)
Apache License 2.0

Upsert doesn't work when the destination iceberg table is partitioned #248

Open vietcheems opened 8 months ago

vietcheems commented 8 months ago

I'm trying to replicate data (CDC) from Postgres in upsert mode. I noticed that when I partition the Iceberg table by a column present in the source table, updated records are appended instead of upserted.

Here is the source table's data initially: image

The data is replicated normally to Iceberg: image

However, when I update the source table to: image

Instead of updating the existing record, a new record is added to the Iceberg table: image

The table definitions are as follows:

-- src table
CREATE TABLE test (
    id int4 NOT NULL,
    "name" varchar(50) NULL,
    CONSTRAINT test_pkey PRIMARY KEY (id)
);

-- dest table
CREATE TABLE iceberg_test (
   id integer NOT NULL,
   name varchar,
   __op varchar,
   __source_ts_ms timestamp(6) with time zone,
   __deleted varchar
)
WITH (
   format = 'PARQUET',
   format_version = 2,
   partitioning = array['name'],
   sorted_by = ARRAY['id ASC NULLS FIRST']
);

This is my application.properties file:

# Use iceberg sink
debezium.sink.type=iceberg

# Iceberg sink config
debezium.sink.iceberg.table-prefix=postgres121
debezium.sink.iceberg.upsert=true
debezium.sink.iceberg.upsert-keep-deletes=false
debezium.sink.iceberg.write.format.default=parquet
debezium.sink.iceberg.catalog-name=iceberg

# hive metastore catalog
debezium.sink.iceberg.type=hive
debezium.sink.iceberg.uri=thrift://SOME_URI
debezium.sink.iceberg.clients=5
debezium.sink.iceberg.warehouse=s3a://datalake
debezium.sink.iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
debezium.sink.iceberg.engine.hive.enabled=true
debezium.sink.iceberg.iceberg.engine.hive.enabled=true
debezium.sink.iceberg.hive.metastore.table.owner=admin
debezium.sink.iceberg.hive.other.configs=admin

# S3 config
debezium.sink.iceberg.fs.defaultFS=s3a://datalake
debezium.sink.iceberg.com.amazonaws.services.s3.enableV4=true
debezium.sink.iceberg.com.amazonaws.services.s3a.enableV4=true
debezium.sink.iceberg.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
debezium.sink.iceberg.fs.s3a.access.key=minioadmin
debezium.sink.iceberg.fs.s3a.secret.key=minioadmin
debezium.sink.iceberg.fs.s3a.endpoint=http://SOME_ENDPOINT
debezium.sink.iceberg.fs.s3a.path.style.access=true
debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem

# enable event schemas - mandatory
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

# postgres 
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=/tmp/offset
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=SOME_HOSTNAME
debezium.source.database.port=5431
debezium.source.database.user=postgres
debezium.source.database.password=changeme
debezium.source.database.dbname=test
debezium.source.database.server.name=postgres_test
debezium.source.schema.include.list=kafka_hudi_test
#debezium.source.database.server.name=postgres_test1
#debezium.source.table.include.list=public.test_user1
debezium.source.topic.prefix=vietpq_
debezium.source.plugin.name=pgoutput
debezium.source.slot.name=ducdn1

# do event flattening. unwrap message!
debezium.transforms=unwrap
debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.transforms.unwrap.add.fields=op,source.ts_ms
debezium.transforms.unwrap.delete.handling.mode=rewrite
debezium.transforms.unwrap.drop.tombstones=true

# ############ SET INTERVAL TIME ############
debezium.sink.batch.batch-size-wait=MaxBatchSizeWait
debezium.sink.batch.metrics.snapshot-mbean=debezium.postgres:type=connector-metrics,context=snapshot,server=testc
debezium.sink.batch.metrics.streaming-mbean=debezium.postgres:type=connector-metrics,context=streaming,server=testc
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.max.batch.size=100000
debezium.source.max.queue.size=1000000
debezium.sink.batch.batch-size-wait.max-wait-ms=6000
debezium.sink.batch.batch-size-wait.wait-interval-ms=1000

# ############ SET LOG LEVELS ############
quarkus.log.level=INFO
quarkus.log.console.json=false
# hadoop, parquet
quarkus.log.category."org.apache.hadoop".level=WARN
quarkus.log.category."org.apache.parquet".level=WARN
# Ignore messages below warning level from Jetty, because it's a bit verbose
quarkus.log.category."org.eclipse.jetty".level=WARN
quarkus.log.category."io.debezium.server.iceberg.batchsizewait".level=DEBUG

The expected behaviour is that the destination table contains only one record, holding the updated data. When the destination table is not partitioned by the "name" column, upsert works fine.

ismailsimsek commented 8 months ago

@vietcheems thank you for reporting it.

When you create the destination table, could it be that the table is created without an "identifier field"? The application falls back to append mode when the target table doesn't have an "identifier field" defined.

If that's not the case, could it be that the Debezium application still thinks the table is not partitioned? To test it:

  1. replicate the first row
  2. stop the replication
  3. convert the table to a partitioned table
  4. start the replication
  5. do an upsert
  6. check the result

ismailsimsek commented 8 months ago

Could you try adding an identifier field after creating the table?

ALTER TABLE iceberg_test SET IDENTIFIER FIELDS id

https://iceberg.apache.org/docs/latest/spark-ddl/#alter-table--set-identifier-fields

vietcheems commented 8 months ago

I'm using Trino, and as far as I know it has no option to set an identifier field. However, when I create the table in Trino without the partitioning spec (removing partitioning = array['name']), upsert works as usual. So I'm not sure the "identifier field" is the issue.

> Could you try with adding identifier field after creating the table ALTER TABLE iceberg_test SET IDENTIFIER FIELDS id
>
> https://iceberg.apache.org/docs/latest/spark-ddl/#alter-table--set-identifier-fields

I switched to Spark to try this, and the problem still persists.

ismailsimsek commented 8 months ago

@vietcheems Thank you for checking it. The problem is that the previous row (name=1) and the new row (name=deleted) fall into two different partitions. This means the partitioning field should be immutable (it should not change between the old row and the new row in an upsert).

Doc: https://iceberg.apache.org/spec/#scan-planning

An equality delete file must be applied to a data file when all of the following are true:

The data file’s partition (both spec and partition values) is equal to the delete file’s partition or the delete file’s partition spec is unpartitioned

Currently, an upsert that does not change the name field (the partition field) should work for partitioned tables.
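
To make the rule quoted above concrete, here is a minimal Python sketch of the applicability check. It is only a model of the spec text, not code from debezium-server-iceberg or the Iceberg library: `FileMeta` and `equality_delete_applies` are hypothetical names, the partition spec and partition values are collapsed into a single tuple, and the sequence-number condition comes from the same scan-planning section of the spec.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class FileMeta:
    """Hypothetical stand-in for the metadata of a data or delete file."""
    seq: int                              # data sequence number of the commit
    partition: Optional[Tuple[str, str]]  # e.g. ("name", "1"); None = unpartitioned spec


def equality_delete_applies(data: FileMeta, delete: FileMeta) -> bool:
    """Simplified model of the scan-planning rule for equality deletes."""
    # The data file must be strictly older than the delete file.
    older = data.seq < delete.seq
    # Partitions must match, unless the delete was written with an unpartitioned spec.
    same_or_global = delete.partition is None or delete.partition == data.partition
    return older and same_or_global


# The row id=1 was first written into partition name=1.
old_row_file = FileMeta(seq=1, partition=("name", "1"))

# Upsert that changes name to 'deleted': the delete lands in the new row's partition.
delete_new_partition = FileMeta(seq=2, partition=("name", "deleted"))

# Upsert that leaves name unchanged: the delete lands in the old row's partition.
delete_same_partition = FileMeta(seq=2, partition=("name", "1"))

print(equality_delete_applies(old_row_file, delete_new_partition))   # old row survives
print(equality_delete_applies(old_row_file, delete_same_partition))  # old row is removed
```

In this model the first check is false, which matches the reported behaviour: the old row in partition name=1 is never matched by the delete written for partition name=deleted, so both rows remain visible.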

We could try changing the deletion to a global delete, i.e. writing the equality delete with an unpartitioned spec.

In general, deletes are applied only to data files that are older and in the same partition, except for two special cases: Equality delete files stored with an unpartitioned spec are applied as global deletes. Otherwise, delete files do not apply to files in other partitions.
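
As a rough illustration of what the proposed change would do, the following Python sketch simulates a scan over the two data files from this issue, once with the equality delete stored in the new row's partition and once stored as a global (unpartitioned) delete. Everything here is a simplified assumption for illustration: `scan` is a hypothetical helper, partitions are plain strings, and deletes are keyed on `id` only.

```python
def scan(data_files, delete_files):
    """Return the rows still visible after applying equality deletes on id.

    data_files:   list of (sequence_number, partition, rows)
    delete_files: list of (sequence_number, partition_or_None, deleted_ids)
    A partition of None models a delete stored with an unpartitioned spec.
    """
    live = []
    for data_seq, data_part, rows in data_files:
        for row in rows:
            deleted = any(
                data_seq < del_seq                             # data file is older
                and (del_part is None or del_part == data_part)  # global or same partition
                and row["id"] in ids
                for del_seq, del_part, ids in delete_files
            )
            if not deleted:
                live.append(row)
    return live


data_files = [
    (1, "name=1", [{"id": 1, "name": "1"}]),              # initial insert
    (2, "name=deleted", [{"id": 1, "name": "deleted"}]),  # new row from the upsert
]

# Current behaviour: the delete is stored in the new row's partition.
rows_partitioned = scan(data_files, [(2, "name=deleted", {1})])

# Proposed behaviour: the delete is stored with an unpartitioned spec.
rows_global = scan(data_files, [(2, None, {1})])

print(len(rows_partitioned), rows_partitioned)
print(len(rows_global), rows_global)
```

With the partitioned delete both rows stay visible. With the global delete only the updated row remains, because the new row is written in the same commit as the delete and is therefore not older than it.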