tabular-io / iceberg-kafka-connect

Apache License 2.0

Not able to do upserts on GlueCatalog #77

Closed: Mayurg6832 closed this issue 11 months ago

Mayurg6832 commented 11 months ago

I am trying to do upserts on a table created in Athena with the following properties:

"iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
"iceberg.catalog.warehouse": "s3a://bucket/warehouse",
"iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",

Here is my connector config:

{
    "name": "iceberg-sink-connector",
    "config": {
        "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
        "tasks.max": "2",
        "topics": "postgres.public.test1",
        "iceberg.tables": "default.test1",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "iceberg.control.commitIntervalMs": 60000,
        "iceberg.tables.cdcField": "__op",
        "iceberg.tables.upsertModeEnabled": true,
        "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        "iceberg.catalog.warehouse": "s3a://test-bucket/catalog-path/",
        "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO"
    }
}

I have also tried without cdcField, but it still does not work. I get this error: "equality field ids shouldn't be null or empty when creating equality-delete writer"

Can you give me some recommendations to resolve this issue? Thanks

bryanck commented 11 months ago

The sink uses the table's identifier column(s) as the primary key for upserts. You will need to set that on your table if you haven't already.

Mayurg6832 commented 11 months ago

But there is no concept of a primary key on Athena tables, and the primary key is already set on the source Postgres table.

bryanck commented 11 months ago

In Iceberg you can define identifier fields, which the sink uses to identify a row. However, it looks like you can't easily set this on the table using Athena. I'll add a config option to specify the columns to use as the key, for cases where it isn't easy to set the identifier fields.
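To illustrate why an upsert needs key columns at all, here is a minimal Python sketch of upsert semantics. It is a toy model only, not the connector's code or Iceberg's API: the `upsert` function, the `id_columns` parameter, and the sample rows are all hypothetical. With no key columns there is nothing to match an incoming row against, which is roughly the situation behind the "equality field ids shouldn't be null or empty" error.

```python
from typing import Dict, Iterable, List, Tuple

Row = Dict[str, object]


def upsert(table: List[Row], incoming: Iterable[Row], id_columns: Tuple[str, ...]) -> List[Row]:
    """Toy upsert: replace rows whose key matches, append the rest.

    Hypothetical helper for illustration; it does not call Iceberg or the sink.
    """
    if not id_columns:
        # Without key columns, incoming rows cannot be matched to existing ones.
        raise ValueError("upsert needs at least one identifier column")

    def key(row: Row) -> Tuple[object, ...]:
        # Build the row's key from the identifier columns, in order.
        return tuple(row[c] for c in id_columns)

    merged = {key(r): r for r in table}
    for row in incoming:
        # Equivalent to "delete any row with this key, then insert the new row".
        merged[key(row)] = row
    return list(merged.values())


# Hypothetical sample data: "id" plays the role of the identifier column.
existing = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
changes = [{"id": 2, "name": "b2"}, {"id": 3, "name": "c"}]
result = upsert(existing, changes, id_columns=("id",))
```

Here the row with `id` 2 is replaced and the row with `id` 3 is appended. Calling `upsert` with an empty `id_columns` raises an error instead of guessing which rows to replace.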

bryanck commented 11 months ago

Here's the PR for this: https://github.com/tabular-io/iceberg-kafka-connect/pull/78

bryanck commented 11 months ago

This was merged.

Mayurg6832 commented 11 months ago

Great, thanks a lot.