tabular-io / iceberg-kafka-connect

Apache License 2.0

Suggest configuration for dynamic table name and CDC field coming in the payload instead of the source. #208

Closed. rj-u-developer closed this issue 6 months ago

rj-u-developer commented 6 months ago

We have data in a Kafka topic in the following format:

{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int32", "optional": false, "field": "std_id" },
      { "type": "string", "optional": true, "field": "std_name" },
      { "type": "string", "optional": true, "field": "std_gender" },
      { "type": "string", "optional": true, "field": "std_school_name" },
      { "type": "int64", "optional": true, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "insert_ts" },
      { "type": "string", "optional": true, "field": "__op" },
      { "type": "string", "optional": true, "field": "__table" },
      { "type": "int64", "optional": true, "field": "__lsn" },
      { "type": "int64", "optional": true, "field": "__source_ts_ms" },
      { "type": "string", "optional": true, "field": "__deleted" }
    ],
    "optional": false,
    "name": "topic_prefix_.public.dbz_student.Value"
  },
  "payload": {
    "std_id": 1,
    "std_name": "STD_NAME_1",
    "std_gender": "MALE",
    "std_school_name": "SCHOOL_1",
    "insert_ts": 1710024178248,
    "__op": "c",
    "__table": "dbz_student",
    "__lsn": 87845504944,
    "__source_ts_ms": 1710004390887,
    "__deleted": "false"
  }
}

Given this data in Kafka, @bryanck, could you please suggest what the values of the following properties should be:

iceberg.tables.route-field=_cdc.target
iceberg.tables.cdc-field=_cdc.__op
transforms.dbztransform.cdc.target.pattern=iceberg_db.{__table}

Note: It works fine when the table name arrives in the source field.
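
To make the request concrete, here is a minimal Python sketch of the mapping being asked for: building the target table name from the top-level `__table` field of the payload and reading the operation from `__op`. It only illustrates the intended result and does not describe how the connector or the SMT resolves these properties. The helper `route_target` is a hypothetical name, and the message is an abbreviated copy of the one above with the schema omitted.

```python
import json

# Abbreviated copy of the message from the issue (schema omitted for brevity).
message = json.loads("""
{
  "payload": {
    "std_id": 1,
    "std_name": "STD_NAME_1",
    "__op": "c",
    "__table": "dbz_student",
    "__deleted": "false"
  }
}
""")


def route_target(payload: dict, pattern: str = "iceberg_db.{__table}") -> str:
    """Hypothetical helper: fill the pattern from top-level payload fields."""
    return pattern.format_map(payload)


target = route_target(message["payload"])
cdc_op = message["payload"]["__op"]
print(target, cdc_op)  # prints "iceberg_db.dbz_student c"
```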

bryanck commented 6 months ago

It looks like you are using the Debezium SMT on a message format that is not supported by that SMT?
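
One plausible reading of this reply: the payload above is already flattened (the `__op`, `__table`, and `__deleted` fields are the kind typically produced by Debezium's ExtractNewRecordState SMT), whereas a raw Debezium change event wraps the row in an envelope with `before`, `after`, `source`, and `op`. The Python sketch below shows that difference under this assumption. The function name `looks_like_envelope` and the sample envelope values are hypothetical, and the check is not taken from the SMT's source code.

```python
# Top-level fields of a raw Debezium change-event envelope.
ENVELOPE_FIELDS = {"before", "after", "source", "op"}


def looks_like_envelope(payload: dict) -> bool:
    """Hypothetical check: does the payload carry the Debezium envelope fields?"""
    return ENVELOPE_FIELDS.issubset(payload)


# Flattened payload, abbreviated from the issue.
flattened = {"std_id": 1, "__op": "c", "__table": "dbz_student"}

# Illustrative envelope carrying the same row (values are assumptions).
envelope = {
    "before": None,
    "after": {"std_id": 1},
    "source": {"table": "dbz_student"},
    "op": "c",
}

print(looks_like_envelope(flattened))  # False
print(looks_like_envelope(envelope))   # True
```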

rj-u-developer commented 6 months ago

Ok, got it.

@bryanck, thanks for your time.