databricks / iceberg-kafka-connect


Is it possible to consume the data from multiple tables having different schema and write to respective tables in Iceberg? #188

Closed rj-u-developer closed 9 months ago

rj-u-developer commented 9 months ago

We have published the data of tables t1 and t2 to Kafka in separate topics, topic1 and topic2.

Tables t1 and t2 have different schemas, and the primary key column name also differs between the two tables.

Now we want to consume the data from both topics, topic1 and topic2, and write it into separate Iceberg tables t1 and t2 respectively, using a single sink job.

Please let us know whether this is possible.

Thanks

rj-u-developer commented 9 months ago

@bryanck @ajantha-bhat , please share your thoughts on this.

bryanck commented 9 months ago

Yes, this is possible.

rj-u-developer commented 9 months ago

@bryanck Thank you for responding.

I have the following two sample tables:

First table: employee (emp_id, emp_name, emp_gender, insert_ts)

Second table: student (std_id, std_name, std_gender, std_class, insert_ts)

The data in the Kafka topics for both tables is as follows:

For the employee table:

```json
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"emp_id"},{"type":"string","optional":true,"field":"emp_name"},{"type":"string","optional":true,"field":"emp_gender"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"insert_ts"}],"optional":false,"name":"empstd.public.employee.Value"},"payload":{"emp_id":1,"emp_name":"EMP_NAME_1","emp_gender":"MALE","insert_ts":"2024-01-01 11:12:23"}}
```

For the student table:

```json
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"std_id"},{"type":"string","optional":true,"field":"std_name"},{"type":"string","optional":true,"field":"std_gender"},{"type":"string","optional":true,"field":"std_class"},{"type":"int64","optional":true,"name":"org.apache.kafka.connect.data.Timestamp","version":1,"field":"insert_ts"}],"optional":false,"name":"empstd.public.student.Value"},"payload":{"std_id":1,"std_name":"STD_NAME_1","std_gender":"FEMALE","std_class":"CLASS_1","insert_ts":"2024-01-01 11:12:23"}}
```

So please let me know which field I can use for iceberg.tables.route-field in the sink.

Thanks in advance.

bryanck commented 9 months ago

It looks like you'll need to add a field for that. You could use the InsertField SMT to add the topic name for example. It might be nice for us to add a config option to use the topic name so you don't need that.
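
Something along these lines might work (a rough sketch only; the `addtopic` transform alias and the `_topic` field name are placeholders I'm making up here, not values from your setup):

```
# Hypothetical sink-side SMT: copy each record's topic name into a new value field
transforms=addtopic
transforms.addtopic.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.addtopic.topic.field=_topic

# The added field can then be used for routing
iceberg.tables.route-field=_topic
```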

rj-u-developer commented 9 months ago

@bryanck Thanks again.

Now I'm getting the below data in Kafka:

```json
{ "before": null, "after": { "emp_id": 1, "emp_name": "EMP_NAME_1", "emp_gender": "MALE", "insertts": 1706866842479857 }, "source": { "version": "2.2.1.Final", "connector": "postgresql", "name": "debz", "ts_ms": 1706887749034, "snapshot": "first", "db": "postgres", "sequence": "[null,\"17246981384\"]", "schema": "public", "table": "dbz_employee", "txId": 2527, "lsn": 17246981384, "xmin": null }, "op": "r", "ts_ms": 1706887749322, "transaction": null }
```

Based on the above data sample, please suggest what the value of the iceberg.tables.cdc-field property in the sink should be.

rj-u-developer commented 9 months ago

@bryanck , see the Iceberg sink properties below:

```
connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
iceberg.control.group-id=dbz_con_grp_v1
iceberg.tables.cdc-field=_cdc.op
iceberg.tables.route-field=cdc.target
iceberg.catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
iceberg.tables.schema-force-optional=true
tasks.max=1
iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
transforms=dbztransform
iceberg.catalog.client.region=ap-south-1
iceberg.tables.upsert-mode-enabled=true
transforms.dbztransform.type=io.tabular.iceberg.connect.transforms.DebeziumTransform
topics.regex=debz.*
iceberg.tables.dynamic-enabled=true
transforms.dbztransform.cdc.target.pattern=iceberg_db_name.{table}
iceberg.tables.auto-create-enabled=false
value.converter.schemas.enable=false
iceberg.tables.schema-case-insensitive=true
iceberg.catalog.warehouse=s3://s3_bucket_name/iceberg_dir
```

bryanck commented 9 months ago

That looks correct; the Debezium transform will add a field named _cdc.op that can be used.
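
Roughly speaking, the transformed record ends up with the row fields plus a `_cdc` struct that the sink's dotted paths point at. An illustrative sketch only (the exact struct layout and op codes come from the transform, and the values below are made up for this example):

```json
{
  "emp_id": 1,
  "emp_name": "EMP_NAME_1",
  "emp_gender": "MALE",
  "insert_ts": 1706866842479857,
  "_cdc": {
    "op": "I",
    "target": "iceberg_db_name.dbz_employee"
  }
}
```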

rj-u-developer commented 9 months ago

OK @bryanck. So should I set iceberg.tables.cdc-field=_cdc.op and iceberg.tables.route-field=_cdc.source.table?

rj-u-developer commented 9 months ago

Here is the data sample in the Kafka topic:

```json
{ "before": null, "after": { "emp_id": 1, "emp_name": "EMP_NAME_1", "emp_gender": "MALE", "insert_ts": 1706866842479857 }, "source": { "version": "2.2.1.Final", "connector": "postgresql", "name": "debz_", "ts_ms": 1706887749034, "snapshot": "first", "db": "postgres", "sequence": "[null,\"17246981384\"]", "schema": "public", "table": "dbz_employee", "txId": 2527, "lsn": 17246981384, "xmin": null }, "op": "r", "ts_ms": 1706887749322, "transaction": null }
```

bryanck commented 9 months ago

You should set the route field to _cdc.target.
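
Putting the pieces from this thread together, the routing-related sink properties would look roughly like this (a sketch only; the Iceberg database name is a placeholder):

```
# Route each record to the table named by the _cdc.target field added by the transform
iceberg.tables.dynamic-enabled=true
iceberg.tables.route-field=_cdc.target
iceberg.tables.cdc-field=_cdc.op

# Debezium transform that adds the _cdc struct and builds the target table name
transforms=dbztransform
transforms.dbztransform.type=io.tabular.iceberg.connect.transforms.DebeziumTransform
transforms.dbztransform.cdc.target.pattern=ICEBERG_DB_NAME.{table}
```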

rj-u-developer commented 9 months ago

@bryanck , thanks for your suggestion.

I set the route field to _cdc.target but it is still not working. I think I am missing something in the source and sink properties.

I am sharing the source and sink configurations here; let me know if you find anything wrong with the configs below:

SOURCE CONFIG:

```
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.user=USERNAME
database.dbname=DB_NAME
slot.name=SLOT_NAME
tasks.max=2
database.server.name=DB_SERVER_NAME
plugin.name=pgoutput
database.port=5432
key.converter.schemas.enable=false
topic.prefix=debzv3
database.hostname=HOSTNAME
database.password=PASSWORD
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
table.include.list=SCHEMA_NAME.dbz_employee,SCHEMA_NAME.dbz_student
value.converter=org.apache.kafka.connect.json.JsonConverter
database.whitelist=DB_NAME
key.converter=org.apache.kafka.connect.json.JsonConverter
```

DATA IN KAFKA:

```json
{ "before": null, "after": { "emp_id": 1, "emp_name": "EMP_NAME_1", "emp_gender": "MALE", "insert_ts": 1706866842479857 }, "source": { "version": "2.2.1.Final", "connector": "postgresql", "name": "debz_v3_", "ts_ms": 1706934146189, "snapshot": "first", "db": "DB_NAME", "sequence": "[null,\"18454939416\"]", "schema": "SCHEMA_NAME", "table": "dbz_employee", "txId": 2629, "lsn": 18454939416, "xmin": null }, "op": "r", "ts_ms": 1706934146451, "transaction": null }
```

SINK CONFIG:

```
connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
iceberg.control.group-id=dbz_con_grp_v2
iceberg.tables.cdc-field=_cdc.op
iceberg.tables.route-field=_cdc.target
iceberg.catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
iceberg.tables.schema-force-optional=true
tasks.max=2
iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
transforms=dbztransform
iceberg.catalog.client.region=ap-south-1
iceberg.tables.upsert-mode-enabled=true
transforms.dbztransform.type=io.tabular.iceberg.connect.transforms.DebeziumTransform
topics.regex=debzv3.*
iceberg.tables.dynamic-enabled=true
transforms.dbztransform.cdc.target.pattern=ICEBERG_DB_NAME.{table}
iceberg.tables.auto-create-enabled=false
value.converter.schemas.enable=false
iceberg.tables.schema-case-insensitive=true
iceberg.catalog.warehouse=s3://BUCKET_NAME/ICEBERG_DIR
```

bryanck commented 9 months ago

Can you be more specific on what you mean by "not working"? Are you seeing any errors? Also, what does the schema of the tables you created look like?

rj-u-developer commented 9 months ago

@bryanck , I apologize for not sharing the error details.

The Iceberg sink connector was throwing the below error:

```
Timeout expired after 60000milliseconds while awaiting InitProducerId
```

I am checking transaction.state.log.replication.factor and transaction.state.log.min.isr. Will update you once done.

rj-u-developer commented 9 months ago

@bryanck , I set the default.replication.factor=2 and min.insync.replicas=2 properties for the cluster of three brokers and the error got resolved.
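
For reference, a minimal sketch of the broker-side settings involved (server.properties for a three-broker cluster; the transaction.state.log values shown here are my assumption, not something confirmed in this thread):

```
# Settings changed to resolve the InitProducerId timeout
default.replication.factor=2
min.insync.replicas=2

# Transaction log settings that were being checked (illustrative values)
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=2
```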

Thanks for your suggestions and for helping me out. Closing the issue.