tabular-io / iceberg-kafka-connect


Issue with the CDC operation/Upsert #165

Closed skushwaha-mdsol closed 7 months ago

skushwaha-mdsol commented 7 months ago

Hi @bryanck, we have been communicating with your team about our use case for the Kafka connector. We need some more assistance with the scenarios below.

We are using connector version iceberg-kafka-connect-runtime-0.5.5

  1. What we did so far
    • Created the connector with no _cdc_op field and no upsert property (upsert_enable = false)
    • Added a record_id (identifier) field with a unique value for each row
    • All data is successfully inserted into the target Iceberg table

connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
iceberg.control.group-id=cg-control-iceberg-kafka-connector-sandbox
iceberg.tables.route-field=iceberg_table
iceberg.tables.id-columns=record_id
iceberg.catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
tasks.max=1
topics=datasets
key.converter.region=us-east-1
iceberg.catalog.client.region=us-east-1
iceberg.tables.dynamic-enabled=true
iceberg.catalog=AwsDataCatalog
value.converter.region=us-east-1
iceberg.control.commit.timeout-ms=60000
value.converter.schemas.enable=false
iceberg.catalog.warehouse=s3://data-os-sandbox
value.converter=org.apache.kafka.connect.json.JsonConverter
iceberg.control.topic=control-iceberg
key.converter=org.apache.kafka.connect.storage.StringConverter

  2. What we want now:

A) Scenario 1 (_cdc_op = U, upsert_enable = false)

Sample Kafka source messages:

{"studyid":"Sync Study","siteid":"100001","subjid":"HN_8","aeterm":"AE_1","aestdtc":"2019-04-02","visit":"VISIT1","visit_date":"2016-04-01","iceberg_table":"db_f0be5952_8b85_4076_a604_3c372d9b253d.tbl_3d245275_c765_4204_b4b9_3bf33b94c617_stg","record_id":"3d245275-c765-4204-b4b9-3bf33b94c617-48d38b7b01c2e0397f2f2a5c8401bc69","record_timestamp":"2023-11-24 07:24:17","_cdc_op":"U"}
{"studyid":"Sync Study","siteid":"100001","subjid":"HN_9","aeterm":"AE_1","aestdtc":"2019-04-02","visit":"VISIT1","visit_date":"2016-04-01","iceberg_table":"db_f0be5952_8b85_4076_a604_3c372d9b253d.tbl_3d245275_c765_4204_b4b9_3bf33b94c617_stg","record_id":"3d245275-c765-4204-b4b9-3bf33b94c617-0a1c3578841219d4d69086d3f03590b1","record_timestamp":"2023-11-24 07:24:17","_cdc_op":"U"}
{"studyid":"Sync Study","siteid":"100001","subjid":"HN_10","aeterm":"AE_1","aestdtc":"2019-04-02","visit":"VISIT1","visit_date":"2023-04-01","iceberg_table":"db_f0be5952_8b85_4076_a604_3c372d9b253d.tbl_3d245275_c765_4204_b4b9_3bf33b94c617_stg","record_id":"3d245275-c765-4204-b4b9-3bf33b94c617-203b58b47be28503f247c0ace957c2b7","record_timestamp":"2023-11-24 07:24:17","_cdc_op":"U"}


connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
iceberg.control.group-id=cg-control-iceberg-kafka-connector-sandbox-cdc-op-test
iceberg.tables.route-field=iceberg_table
iceberg.tables.cdc-field=_cdc_op
iceberg.tables.id-columns=record_id
iceberg.catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
tasks.max=1
topics=datasets-test
key.converter.region=us-east-1
iceberg.catalog.client.region=us-east-1
iceberg.tables.dynamic-enabled=true
iceberg.catalog=AwsDataCatalog
value.converter.region=us-east-1
iceberg.control.commit.timeout-ms=60000
value.converter.schemas.enable=false
iceberg.catalog.warehouse=s3://data-os-sandbox
value.converter=org.apache.kafka.connect.json.JsonConverter
iceberg.control.topic=control-iceberg-test
key.converter=org.apache.kafka.connect.storage.StringConverter

B) Scenario 2 (_cdc_op = U, upsert_enable = true)

Sample Kafka source messages:

{"studyid":"Sync Study","siteid":"100001","subjid":"HN_8","aeterm":"AE_1","aestdtc":"2019-04-02","visit":"VISIT1","visit_date":"2016-04-01","iceberg_table":"db_f0be5952_8b85_4076_a604_3c372d9b253d.tbl_3d245275_c765_4204_b4b9_3bf33b94c617_stg","record_id":"3d245275-c765-4204-b4b9-3bf33b94c617-48d38b7b01c2e0397f2f2a5c8401bc69","record_timestamp":"2023-11-24 07:24:17","_cdc_op":"U"}
{"studyid":"Sync Study","siteid":"100001","subjid":"HN_9","aeterm":"AE_1","aestdtc":"2019-04-02","visit":"VISIT1","visit_date":"2016-04-01","iceberg_table":"db_f0be5952_8b85_4076_a604_3c372d9b253d.tbl_3d245275_c765_4204_b4b9_3bf33b94c617_stg","record_id":"3d245275-c765-4204-b4b9-3bf33b94c617-0a1c3578841219d4d69086d3f03590b1","record_timestamp":"2023-11-24 07:24:17","_cdc_op":"U"}
{"studyid":"Sync Study","siteid":"100001","subjid":"HN_10","aeterm":"AE_1","aestdtc":"2019-04-02","visit":"VISIT1","visit_date":"2023-04-01","iceberg_table":"db_f0be5952_8b85_4076_a604_3c372d9b253d.tbl_3d245275_c765_4204_b4b9_3bf33b94c617_stg","record_id":"3d245275-c765-4204-b4b9-3bf33b94c617-203b58b47be28503f247c0ace957c2b7","record_timestamp":"2023-11-24 07:24:17","_cdc_op":"U"}


connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
iceberg.control.group-id=cg-control-iceberg-kafka-connector-sandbox-cdc-op-test
iceberg.tables.route-field=iceberg_table
iceberg.tables.cdc-field=_cdc_op
iceberg.tables.id-columns=record_id
iceberg.catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
tasks.max=1
topics=datasets-test
key.converter.region=us-east-1
iceberg.tables.upsert-mode-enabled=true
iceberg.catalog.client.region=us-east-1
iceberg.tables.dynamic-enabled=true
iceberg.catalog=AwsDataCatalog
value.converter.region=us-east-1
iceberg.control.commit.timeout-ms=60000
value.converter.schemas.enable=false
iceberg.catalog.warehouse=s3://data-os-sandbox
value.converter=org.apache.kafka.connect.json.JsonConverter
iceberg.control.topic=control-iceberg-test
key.converter=org.apache.kafka.connect.storage.StringConverter

C) Scenario 3 (upsert_enable = true)

Sample Kafka source messages:

{"studyid":"Sync Study","siteid":"100001","subjid":"HN_8","aeterm":"AE_1","aestdtc":"2019-04-02","visit":"VISIT1","visit_date":"2016-04-01","iceberg_table":"db_f0be5952_8b85_4076_a604_3c372d9b253d.tbl_3d245275_c765_4204_b4b9_3bf33b94c617_stg","record_id":"3d245275-c765-4204-b4b9-3bf33b94c617-48d38b7b01c2e0397f2f2a5c8401bc69","record_timestamp":"2023-11-24 07:24:17"}
{"studyid":"Sync Study","siteid":"100001","subjid":"HN_9","aeterm":"AE_1","aestdtc":"2019-04-02","visit":"VISIT1","visit_date":"2016-04-01","iceberg_table":"db_f0be5952_8b85_4076_a604_3c372d9b253d.tbl_3d245275_c765_4204_b4b9_3bf33b94c617_stg","record_id":"3d245275-c765-4204-b4b9-3bf33b94c617-0a1c3578841219d4d69086d3f03590b1","record_timestamp":"2023-11-24 07:24:17"}
{"studyid":"Sync Study","siteid":"100001","subjid":"HN_10","aeterm":"AE_1","aestdtc":"2019-04-02","visit":"VISIT1","visit_date":"2023-04-01","iceberg_table":"db_f0be5952_8b85_4076_a604_3c372d9b253d.tbl_3d245275_c765_4204_b4b9_3bf33b94c617_stg","record_id":"3d245275-c765-4204-b4b9-3bf33b94c617-203b58b47be28503f247c0ace957c2b7","record_timestamp":"2023-11-24 07:24:17"}


connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
iceberg.control.group-id=cg-control-iceberg-kafka-connector-sandbox-cdc-op-test
iceberg.tables.route-field=iceberg_table
iceberg.tables.id-columns=record_id
iceberg.catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
tasks.max=1
topics=datasets-test
key.converter.region=us-east-1
iceberg.tables.upsert-mode-enabled=true
iceberg.catalog.client.region=us-east-1
iceberg.tables.dynamic-enabled=true
iceberg.catalog=AwsDataCatalog
value.converter.region=us-east-1
iceberg.control.commit.timeout-ms=60000
value.converter.schemas.enable=false
iceberg.catalog.warehouse=s3://data-os-sandbox
value.converter=org.apache.kafka.connect.json.JsonConverter
iceberg.control.topic=control-iceberg-test
key.converter=org.apache.kafka.connect.storage.StringConverter

All three of the above scenarios throw the same error in the connector logs.

[screenshot: error stack trace from the connector logs]
bryanck commented 7 months ago

This looks like it may be an issue that was addressed by https://github.com/tabular-io/iceberg-kafka-connect/pull/136; can you try the latest version of the sink?

anuja-kelkar commented 7 months ago

Thanks @bryanck. We will use the latest version. Which of the configurations we mentioned above is correct for our use case, i.e. performing an upsert for all data (insert if the row does not exist, update if it does) keyed on the record_id field? Please advise, thanks.

bryanck commented 7 months ago

With the CDC feature, the operation specified with the record will always take precedence over the iceberg.tables.upsert-mode-enabled option. To achieve what you want, you could map all insert operations to updates, using an SMT for example. One side note, you will want to compact your data regularly to merge in the deletes.
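
For illustration, a minimal sketch of such an SMT, assuming schemaless JSON values (the configs above set value.converter.schemas.enable=false, so record values arrive as maps); the package and class names here are hypothetical, not part of the sink:

```java
package com.example.smt;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Hypothetical SMT: force every record's _cdc_op to "U" so the sink
// treats all incoming operations as updates (i.e. upserts).
public class ForceUpsertOp<R extends ConnectRecord<R>> implements Transformation<R> {

  @Override
  public R apply(R record) {
    if (!(record.value() instanceof Map)) {
      return record; // pass tombstones and non-map values through unchanged
    }
    @SuppressWarnings("unchecked")
    Map<String, Object> value = new HashMap<>((Map<String, Object>) record.value());
    value.put("_cdc_op", "U"); // map inserts (and everything else) to updates
    return record.newRecord(
        record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
        record.valueSchema(), value, record.timestamp());
  }

  @Override
  public ConfigDef config() {
    return new ConfigDef();
  }

  @Override
  public void configure(Map<String, ?> configs) {}

  @Override
  public void close() {}
}
```

It would be wired into the connector config along these lines:

transforms=forceupsert
transforms.forceupsert.type=com.example.smt.ForceUpsertOp

For the compaction side note, one option is Iceberg's rewriteDataFiles action run from a Spark job. A sketch, assuming a SparkSession and an already-loaded Table handle (the delete-file-threshold option depends on your Iceberg version):

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.SparkSession;

public class CompactStagingTable {
  // Compact data files so accumulated equality deletes get merged away.
  public static void compact(SparkSession spark, Table table) {
    SparkActions.get(spark)
        .rewriteDataFiles(table)
        .option("delete-file-threshold", "1") // rewrite any file that has delete files attached
        .execute();
  }
}
```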

jwalinrami commented 7 months ago

@bryanck Thank you! With the latest version (0.6.4) we are able to perform upserts (both inserts and updates). We are publishing all messages with _cdc_op = 'U', which performs the upserts. For deletes, we are currently taking a different approach by not producing delete messages (_cdc_op = 'D'). We will continue testing and update this thread with any anomalies as we go, so please keep it open for now. Thank you, Jwalin

jwalinrami commented 7 months ago

@bryanck Continuing from the above, for CDC we have been testing with the configuration below.

[two screenshots of the connector configuration, dated 2023-12-11]

Do you see anything we are missing here, or any incorrect settings? Please advise.

cc : @anuja-kelkar @skushwaha-mdsol

bryanck commented 7 months ago

There appears to be an issue in Trino/Athena when updating a v2 Iceberg table that has equality deletes, with an open PR to fix that.

jwalinrami commented 7 months ago

@bryanck Thank you for the feedback. We will follow up with AWS on when Athena will pick up the Trino fix (the PR above) so that we can start using it. Once the Trino fix is merged and AWS Athena inherits it, we should be able to use upsert-mode-enabled = true for deletes as well, correct? And no other changes to the connector configuration will be required?

bryanck commented 7 months ago

If I understand your question right, once the fix is available in Trino/Athena, you should be able to use either upsert mode or CDC mode in the sink, and then SQL update or delete statements on the table should not throw that error. The sink should not need any changes.

jwalinrami commented 7 months ago

Hi @bryanck, my question is specifically about using the 'upsert-mode-enabled' flag only, not '_cdc_op'.
Let me ask a different question: if the Trino issue (which they are planning to fix) were not present, how could we perform a 'delete' operation using the 'upsert-mode-enabled' flag alone (without '_cdc_op')? Would the connector perform deletes based on some time window if records are no longer seen in the stream?

bryanck commented 7 months ago

Upsert mode only performs upserts, no deletes.

jwalinrami commented 7 months ago

Alright, that clears the doubt. Thank you @bryanck !

bryanck commented 7 months ago

In case you're interested, the logic to determine the operation type when writing deltas is here.
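
In case the link above goes stale, here is a rough illustration of the precedence described earlier in this thread (a record-level CDC field always wins over upsert mode). This is a sketch based on the behavior as described, not the sink's actual source:

```java
import java.util.Map;

public class DeltaOperationSketch {
  enum Operation { INSERT, UPDATE, DELETE }

  // Sketch of the precedence described above: a record's CDC field, when
  // present, always overrides iceberg.tables.upsert-mode-enabled.
  static Operation operationFor(Map<String, Object> record, String cdcField, boolean upsertMode) {
    Object op = cdcField == null ? null : record.get(cdcField);
    if (op != null) {
      switch (op.toString().toUpperCase()) {
        case "U": return Operation.UPDATE;
        case "D": return Operation.DELETE;
        default:  return Operation.INSERT; // e.g. "I"
      }
    }
    // No CDC field on the record: upsert mode writes rows as updates
    // (equality delete + insert); otherwise plain appends.
    return upsertMode ? Operation.UPDATE : Operation.INSERT;
  }
}
```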