databricks / iceberg-kafka-connect

Apache License 2.0

Not able to do upserts on GlueCatalog #77

Closed: Mayurg6832 closed this issue 1 year ago

Mayurg6832 commented 1 year ago

I am trying to do upserts on a table created in Athena with these properties:

```
"iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
"iceberg.catalog.warehouse": "s3a://bucket/warehouse",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
```

Here is my connector config:

```json
{
    "name": "iceberg-sink-connector",
    "config": {
        "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
        "tasks.max": "2",
        "topics": "postgres.public.test1",
        "iceberg.tables": "default.test1",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "iceberg.control.commitIntervalMs": 60000,
        "iceberg.tables.cdcField": "__op",
        "iceberg.tables.upsertModeEnabled": true,
        "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        "iceberg.catalog.warehouse": "s3a://test-bucket/catalog-path/",
        "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO"
    }
}
```

I have also tried without `cdcField`; it still doesn't work, and I get this error: `equality field ids shouldn't be null or empty when creating equality-delete writer`

Can you give me some recommendations to resolve this issue? Thanks

bryanck commented 1 year ago

For upserts, the sink uses the table's identifier column(s) as the primary key. You will need to set them on your table if you haven't already.
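To see why the key columns are mandatory, here is a minimal in-memory sketch, assuming the usual Iceberg upsert model: an equality delete on the key columns followed by an insert of the new row. The class and method names (`UpsertSketch`, `InMemoryTable`, `upsert`) are hypothetical and this is not the connector's writer code. With no identifier fields there is nothing to build the equality delete on, which is the situation behind the error reported above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class UpsertSketch {
    // Hypothetical in-memory stand-in for an Iceberg table, for illustration only.
    static final class InMemoryTable {
        private final List<String> identifierFields;
        private final List<Map<String, Object>> rows = new ArrayList<>();

        InMemoryTable(List<String> identifierFields) {
            this.identifierFields = List.copyOf(identifierFields);
        }

        // Upsert = equality delete on the key columns, then insert.
        void upsert(Map<String, Object> row) {
            if (identifierFields.isEmpty()) {
                // No key columns: an equality delete cannot be built.
                // The message mirrors the one reported in this issue.
                throw new IllegalStateException(
                    "equality field ids shouldn't be null or empty when creating equality-delete writer");
            }
            List<Object> key = keyOf(row);
            // Equality delete: drop every existing row whose key values match.
            rows.removeIf(existing -> keyOf(existing).equals(key));
            // Insert the new version of the row.
            rows.add(new LinkedHashMap<>(row));
        }

        // Extracts the key values of a row, in identifier-field order.
        private List<Object> keyOf(Map<String, Object> row) {
            return identifierFields.stream().map(row::get).collect(Collectors.toList());
        }

        List<Map<String, Object>> rows() {
            return rows;
        }
    }

    public static void main(String[] args) {
        InMemoryTable keyed = new InMemoryTable(List.of("id"));
        keyed.upsert(Map.of("id", 1, "name", "a"));
        keyed.upsert(Map.of("id", 1, "name", "b")); // replaces the first row
        System.out.println(keyed.rows().size() + " row, name=" + keyed.rows().get(0).get("name"));

        InMemoryTable unkeyed = new InMemoryTable(List.of());
        try {
            unkeyed.upsert(Map.of("id", 1, "name", "a"));
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

With `id` as the key, the second upsert replaces the first row instead of adding a duplicate; with an empty key list the call fails before any delete can be written.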

Mayurg6832 commented 1 year ago

But Athena tables have no concept of a primary key, and the primary key is already set on the source Postgres table.

bryanck commented 1 year ago

In Iceberg you can define identifier fields, which the sink uses to identify a row. However, it looks like you can't easily set this on the table using Athena. I'll add a config option to specify the columns to use as the key, for cases where it isn't easy to set the identifier fields.
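Outside Athena, identifier fields can be set with the Iceberg Java API (`table.updateSchema().setIdentifierFields("id").commit()`); note that Iceberg requires identifier fields to be required (non-null) columns. For the config-based fallback described here, the sketch below shows one plausible way a sink could resolve its key columns: a configured comma-separated column list, if present, overrides the table's identifier fields, and an error is raised if neither is available. The method name `resolveKeyColumns` and the precedence are assumptions, not the connector's code. This thread does not quote the option name; later README versions appear to document `iceberg.tables.default-id-columns` and a per-table `iceberg.table.<table name>.id-columns`, but treat those names as an assumption and check the docs for the version you run.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class KeyColumnsSketch {
    /**
     * Hypothetical key-column resolution for upserts.
     * configuredIdColumns: comma-separated column names from connector config (may be null).
     * tableIdentifierFields: identifier field names already set on the Iceberg table.
     */
    static List<String> resolveKeyColumns(List<String> tableIdentifierFields, String configuredIdColumns) {
        // Assumed precedence: an explicit config value overrides the table's identifier fields.
        if (configuredIdColumns != null && !configuredIdColumns.isBlank()) {
            return Arrays.stream(configuredIdColumns.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toList());
        }
        if (!tableIdentifierFields.isEmpty()) {
            return List.copyOf(tableIdentifierFields);
        }
        // Neither source available: upserts cannot identify rows.
        throw new IllegalArgumentException(
            "no key columns: set identifier fields on the table or configure id columns");
    }

    public static void main(String[] args) {
        // Athena-created table: no identifier fields, key supplied via config.
        System.out.println(resolveKeyColumns(List.of(), "id, tenant_id"));
        // Table that already has identifier fields and no config override.
        System.out.println(resolveKeyColumns(List.of("id"), null));
    }
}
```

Keeping the fallback in config means tables created by engines that cannot set identifier fields, such as Athena here, still get a usable key.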

bryanck commented 1 year ago

Here's the PR for this: https://github.com/tabular-io/iceberg-kafka-connect/pull/78

bryanck commented 1 year ago

This was merged.

Mayurg6832 commented 1 year ago

Great, thanks a lot.