tabular-io / iceberg-kafka-connect


Recommended configuration for the combination of Debezium (Oracle) Source Connector and Iceberg Kafka Sink Connector #186

Closed. chrisfw closed this issue 8 months ago.

chrisfw commented 8 months ago

Hello, I am very interested in using this Iceberg sink connector to mirror a large number of transactional database tables, with CDC performed by Debezium's Oracle connector. By default, Debezium creates one Kafka topic per physical table, and we have roughly 1,800 Oracle tables that we would like to mirror in Iceberg. Reviewing the configuration options, the Iceberg sink connector does not appear designed to accommodate that many topics directly. Instead, it seems I would need to configure Debezium with an SMT to route messages for all tables into a single topic (or a small set of topics), include a message field containing the table name, and then use the Iceberg connector's multi-table fan-out routing to create and update a set of Iceberg tables mapped one-to-one with the source transactional tables.
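For illustration, here is a rough sketch of the source-side routing I have in mind (the field list, topic names, and regex are placeholders, not a tested config):

#Debezium source connector excerpt (sketch): flatten the CDC envelope, add the op code and
#table name as fields on the record, then route every per-table topic into one shared topic
transforms=unwrap,reroute
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.add.fields=op,table
transforms.reroute.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.reroute.regex=.*
transforms.reroute.replacement=alltables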

Can you tell me if my thoughts above would be your recommended approach?

Thanks in advance, Chris Whelan

bryanck commented 8 months ago

Are you running into a specific issue ingesting from several topics, i.e. via the topics.regex setting?

chrisfw commented 8 months ago

Hi @bryanck , thanks for following up on this. Originally it was a lack of understanding on my part regarding how to take advantage of the config options, which I think I understand better now. I am trying to get a simplified configuration example working, and I feel I am somewhat closer to getting the sink functional. However, I am now hitting an issue where the Iceberg connector raises an error indicating that the sds_tbl field is not a valid field name, even though the flattened message on the Kafka topic originating from Debezium does have the sds_tbl field, and it is populated. My Iceberg sink config and the errors are below.

#Iceberg Sink Connector
name=enquesta-sds-unisds-iceberg-connector
connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
tasks.max=1
topics.regex=sds4.*
iceberg.tables.cdc-field=sds_op
iceberg.catalog.type=rest
iceberg.catalog.uri=redacted
iceberg.catalog.token=redacted
iceberg.catalog.warehouse=s3a://warehouse
iceberg.tables.route-field=sds_tbl
iceberg.tables.upsert-mode-enabled=false
iceberg.tables.auto-create-enabled=true

iceberg.tables.dynamic-enabled=false
iceberg.tables=resticeberg.CDC_TEST,resticeberg.CDC_SAMPLE
iceberg.table.resticeberg.CDC_TEST.route-regex=CDC_TEST
iceberg.table.resticeberg.CDC_SAMPLE.route-regex=CDC_SAMPLE
Caused by: org.apache.kafka.connect.errors.DataException: sds_tbl is not a valid field name
        at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
        at org.apache.kafka.connect.data.Struct.get(Struct.java:74)
        at io.tabular.iceberg.connect.data.Utilities.getValueFromStruct(Utilities.java:139)
        at io.tabular.iceberg.connect.data.Utilities.extractFromRecordValue(Utilities.java:128)

I tried commenting out the route-field in the config, since I don't think it is used for this example, but the error simply shifted to another field that also exists in the message: Caused by: org.apache.kafka.connect.errors.DataException: sds_op is not a valid field name

"payload": {
        "ID": 26.0,
        "DESCRIPTION": "DESC 26",
        "__deleted": "false",
        "sds_op": "c",
        "sds_tbl": "CDC_TEST",
        "sds_ts": 1706231637000,
        "sds_tbl_schema": "DBZUSER",
        "sds_scn": "45879738",
        "sds_commit_scn": "45879765",
        "sds_txId": "13000900e4240000"
    }

Any guidance you can provide would be greatly appreciated.

Regards, Chris Whelan

bryanck commented 8 months ago

It looks like the field names might need to be payload.sds_tbl and payload.sds_op?

chrisfw commented 8 months ago

Hi @bryanck , thank you for your response. I had already flattened the original, deeper structure of the Debezium message using their io.debezium.transforms.ExtractNewRecordState transform (and added the sds_ field prefix while trying to solve an earlier occurrence of the invalid-field error), so I assumed payload was the implicit message content root. I got excited when I saw that might not be the case, but unfortunately, when trying payload.__op and payload.__tbl in the sink config against the full message at the bottom of this comment, I now see the Iceberg sink error below. I am wondering whether I need to flatten this message further, and hoping that is not the case. Also, do I need to change the Debezium op codes from "c" to "I" for insert messages (and is that case-sensitive on the Iceberg side)?

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: payload is not a valid field name
        at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
        at org.apache.kafka.connect.data.Struct.get(Struct.java:74)
        at io.tabular.iceberg.connect.data.Utilities.getValueFromStruct(Utilities.java:139)
        at io.tabular.iceberg.connect.data.Utilities.extractFromRecordValue(Utilities.java:128)
        at io.tabular.iceberg.connect.channel.Worker.extractRouteValue(Worker.java:235)
        at io.tabular.iceberg.connect.channel.Worker.routeRecordStatically(Worker.java:201)
        at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:184)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
        at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:171)
        at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:150)
{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "double",
                "optional": false,
                "field": "ID"
            },
            {
                "type": "string",
                "optional": true,
                "field": "DESCRIPTION"
            },
            {
                "type": "string",
                "optional": true,
                "field": "__deleted"
            },
            {
                "type": "string",
                "optional": true,
                "field": "__op"
            },
            {
                "type": "string",
                "optional": true,
                "field": "__tbl"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "__ts"
            },
            {
                "type": "string",
                "optional": true,
                "field": "__tbl_schema"
            },
            {
                "type": "string",
                "optional": true,
                "field": "__scn"
            },
            {
                "type": "string",
                "optional": true,
                "field": "__commit_scn"
            },
            {
                "type": "string",
                "optional": true,
                "field": "__txId"
            }
        ],
        "optional": false,
        "name": "sds5.DBZUSER.CDC_TEST.Value"
    },
    "payload": {
        "ID": 26.0,
        "DESCRIPTION": "DESCRIPTION 26",
        "__deleted": "false",
        "__op": "u",
        "__tbl": "CDC_TEST",
        "__ts": 1706276037000,
        "__tbl_schema": "DBZUSER",
        "__scn": "46017206",
        "__commit_scn": "46017210",
        "__txId": "3400130094010000"
    }
}
chrisfw commented 8 months ago

Hi again @bryanck ,

I continued down the road of flattening the message completely, but unfortunately the same error persists. I am applying the flattening via a transform on the sink, and I confirmed via the Kafka file sink connector that the structure has been flattened. Below are the verbatim output of the file sink connector (formatted with newlines only for readability) and the corresponding Iceberg sink error message. The __tbl field is clearly in the struct, so I am not sure how best to proceed with troubleshooting this issue. Any information or guidance you can provide would be greatly appreciated.

Regards, Chris

Struct
{
ID=2.0
,SAMPLE_NAME=Sample 2
,SAMPLE_PRICE=15.25
,__deleted=false
,__op=u
,__tbl=CDC_SAMPLE
,__ts=1706285605000
,__tbl_schema=DBZUSER
,__scn=46040053
,__commit_scn=46040059
,__txId=4100090075010000
}
Caused by: org.apache.kafka.connect.errors.DataException: __tbl is not a valid field name
        at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
        at org.apache.kafka.connect.data.Struct.get(Struct.java:74)
        at io.tabular.iceberg.connect.data.Utilities.getValueFromStruct(Utilities.java:139)
        at io.tabular.iceberg.connect.data.Utilities.extractFromRecordValue(Utilities.java:128)
        at io.tabular.iceberg.connect.channel.Worker.extractRouteValue(Worker.java:235)
        at io.tabular.iceberg.connect.channel.Worker.routeRecordStatically(Worker.java:201)
        at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:184)
bryanck commented 8 months ago

Thanks for posting a sample message. It appears you are using the Kafka Connect JSON-with-schema serializer to produce messages. Try setting the following on the sink: value.converter: org.apache.kafka.connect.json.JsonConverter and value.converter.schemas.enable: true, and don't flatten the message. The JsonConverter will handle converting the message.
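In properties-file form, that would be (sketch):

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
#and no flatten transform on the sink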

chrisfw commented 8 months ago

Hi @bryanck ,

Thanks for your suggestion. I tried it, and unfortunately the same error still occurs in the Iceberg sink connector. For additional context, below are my configs for both the Iceberg sink connector and the file sink connector, followed by the file content as written by the file sink connector (it actually looks unchanged from before). I also tried running the Iceberg sink connector in isolation from a different plugin folder, without success. I am having a difficult time understanding what could be wrong here. Any additional thoughts or suggestions you have would be welcomed.

Regards, Chris

Iceberg Sink Connector

name=enquesta-sds-unisds-iceberg-connector
connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
tasks.max=1
topics.regex=sds9.*
iceberg.tables.cdc-field=__op
iceberg.catalog=resticeberg
iceberg.catalog.type=rest
iceberg.catalog.uri=redacted
iceberg.catalog.token=redacted
iceberg.catalog.warehouse=s3a://warehouse
iceberg.tables.route-field=__tbl
iceberg.tables.upsert-mode-enabled=false
iceberg.tables.auto-create-enabled=true
iceberg.tables.schema-case-insensitive=true

iceberg.tables.dynamic-enabled=false
iceberg.tables=resticeberg.CDC_SAMPLE
iceberg.table.resticeberg.CDC_TEST.route-regex=CDC_TEST
iceberg.table.resticeberg.CDC_SAMPLE.route-regex=CDC_SAMPLE
value.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.storage.StringConverter

#Transforms
#transforms=flatten
#transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
#transforms.flatten.delimiter=.

File sink connector

name=local-file-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=sds9.DBZUSER.CDC_SAMPLE.sink.txt
topics=sds9.DBZUSER.CDC_SAMPLE
value.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=true

#Transforms
#transforms=flatten
#transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
#transforms.flatten.delimiter=.

File sink output

Struct{ID=4.0,SAMPLE_NAME=Sample four,SAMPLE_PRICE=10.0,__deleted=false,__op=c,__tbl=CDC_SAMPLE,__ts=1706295581000,__tbl_schema=DBZUSER,__scn=46065904,__commit_scn=46065917,__txId=2100010053180000}
Struct{ID=4.0,SAMPLE_NAME=Sample 4,SAMPLE_PRICE=10.0,__deleted=false,__op=u,__tbl=CDC_SAMPLE,__ts=1706295596000,__tbl_schema=DBZUSER,__scn=46065976,__commit_scn=46065994,__txId=25000e00ed140000}
chrisfw commented 8 months ago

Also, FYI: when I originally posted the message content, I used the kafka-console-consumer.sh command, which is perhaps converting the messages to JSON, inserting schema and payload nodes with the struct content inside the payload node. Running the command against the same sds9.DBZUSER.CDC_SAMPLE topic from the message above generates the output below:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "double",
                "optional": false,
                "field": "ID"
            },
            {
                "type": "string",
                "optional": true,
                "field": "SAMPLE_NAME"
            },
            {
                "type": "double",
                "optional": true,
                "field": "SAMPLE_PRICE"
            },
            {
                "type": "string",
                "optional": true,
                "field": "__deleted"
            },
            {
                "type": "string",
                "optional": true,
                "field": "__op"
            },
            {
                "type": "string",
                "optional": true,
                "field": "__tbl"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "__ts"
            },
            {
                "type": "string",
                "optional": true,
                "field": "__tbl_schema"
            },
            {
                "type": "string",
                "optional": true,
                "field": "__scn"
            },
            {
                "type": "string",
                "optional": true,
                "field": "__commit_scn"
            },
            {
                "type": "string",
                "optional": true,
                "field": "__txId"
            }
        ],
        "optional": false,
        "name": "sds9.DBZUSER.CDC_SAMPLE.Value"
    },
    "payload": {
        "ID": 4.0,
        "SAMPLE_NAME": "Sample four",
        "SAMPLE_PRICE": 10.0,
        "__deleted": "false",
        "__op": "c",
        "__tbl": "CDC_SAMPLE",
        "__ts": 1706295581000,
        "__tbl_schema": "DBZUSER",
        "__scn": "46065904",
        "__commit_scn": "46065917",
        "__txId": "2100010053180000"
    }
}
{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "double",
                "optional": false,
                "field": "ID"
            },
            {
                "type": "string",
                "optional": true,
                "field": "SAMPLE_NAME"
            },
            {
                "type": "double",
                "optional": true,
                "field": "SAMPLE_PRICE"
            },
            {
                "type": "string",
                "optional": true,
                "field": "__deleted"
            },
            {
                "type": "string",
                "optional": true,
                "field": "__op"
            },
            {
                "type": "string",
                "optional": true,
                "field": "__tbl"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "__ts"
            },
            {
                "type": "string",
                "optional": true,
                "field": "__tbl_schema"
            },
            {
                "type": "string",
                "optional": true,
                "field": "__scn"
            },
            {
                "type": "string",
                "optional": true,
                "field": "__commit_scn"
            },
            {
                "type": "string",
                "optional": true,
                "field": "__txId"
            }
        ],
        "optional": false,
        "name": "sds9.DBZUSER.CDC_SAMPLE.Value"
    },
    "payload": {
        "ID": 4.0,
        "SAMPLE_NAME": "Sample 4",
        "SAMPLE_PRICE": 10.0,
        "__deleted": "false",
        "__op": "u",
        "__tbl": "CDC_SAMPLE",
        "__ts": 1706295596000,
        "__tbl_schema": "DBZUSER",
        "__scn": "46065976",
        "__commit_scn": "46065994",
        "__txId": "25000e00ed140000"
    }
}
bryanck commented 8 months ago

Not sure if you have done this, but I would start from scratch, e.g. delete the connector, drop the table, then redeploy the connector with the converter setting. Then post here what the table schema is once it is auto-created.

bryanck commented 8 months ago

Also, take a look at the integration tests and see if those help you. There are tests that use the JsonConverter with schema.

chrisfw commented 8 months ago

Hi @bryanck , thanks for the advice. I will give that a try and report back.

Regards, Chris

chrisfw commented 8 months ago

Hi @bryanck ,

Well, it has been a journey (and I am not done yet). A few things I discovered, and some questions:

- **Is there a property setting I can use to set a static value for the database name?**

- The `__op` codes Debezium provides are lowercase, and it uses `c` for inserts rather than the `I` mentioned in your documentation, along with `d` for deletes and `u` for updates. I noticed in the code that, for the op code value mapping in the config, the operation defaults to insert if the value is null, empty, or not `U` or `D`. **There is a TODO at that location in IcebergWriter about making the op code configurable - is this something that is on the roadmap?**

**FYI, below is my current sink config**

Iceberg Sink Connector

name=enquesta-sds-unisds-iceberg-connector
connector.class=io.tabular.iceberg.connect.IcebergSinkConnector

#don't let the producer converter properties (worker.properties) override the connector's
producer.override.value.converter.schemas.enable=false

tasks.max=1
topics.regex=sds19.*

iceberg.tables.cdc-field=__op
iceberg.catalog=resticeberg

iceberg.catalog.type=rest
iceberg.catalog.uri=http://redacted:8181
iceberg.catalog.token=redacted
iceberg.catalog.warehouse=s3a://warehouse
iceberg.catalog.s3.endpoint=http://redacted:9000

iceberg.tables.dynamic-enabled=true
iceberg.tables.route-field=__tbl
iceberg.tables.upsert-mode-enabled=false
iceberg.tables.auto-create-enabled=false
iceberg.tables.schema-case-insensitive=true

iceberg.tables=resticeberg.SAMPLE

iceberg.table.resticeberg.SAMPLE.route-regex=SAMPLE

iceberg.control.commit.interval-ms=120000

value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter


Thanks in advance for any input or suggestions on the topics above (no pun intended).

Regards,
Chris
bryanck commented 8 months ago

You could use an SMT to set the table name to a name qualified with the database. Also, the SMT could transform the op code. In general we'd like to keep transform logic out of the sink. Making the table name lower case is often desirable but we may want an option to skip that.

chrisfw commented 8 months ago

Hi @bryanck , thank you for your recommendation. I had taken a look at SMTs, but I didn't find one, or a combination of them, that will do what I am attempting to accomplish. I see InsertField can get the static resticeberg value into a field (see the sketch below), but I don't see a transform that will concatenate field values into a new field. Are you suggesting that I create a custom transformation for this purpose, or are there other transforms I can utilize to achieve this objective?
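For illustration, roughly the most I could get out of the built-in SMT is something like this sketch, which only adds a static field (the __db field name is just a placeholder) rather than concatenating it with __tbl:

transforms=addns
transforms.addns.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.addns.static.field=__db
transforms.addns.static.value=resticeberg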

bryanck commented 8 months ago

Yes, you may need to create one; you can take a look at the set we have in this repo. We may want to add another one here for the purpose of prepending a database name or converting the op code.

chrisfw commented 8 months ago

Thanks @bryanck , I will definitely review the ones in this repo. I really appreciate all of your help with this issue.

Regards, Chris

chrisfw commented 8 months ago

Hi @bryanck,

Thanks for pointing me in the direction of the SMTs in this repo. I just wanted to let you know that I finally got the Debezium source and the Iceberg sink working together, and I was able to leverage the Debezium SMT in this repo to do everything I needed. FYI, the source and sink configs are below. Note that the short commit interval is temporary, for testing purposes only.

Thanks again for your assistance with this.

dbz

name=enquesta-sds-unisds-oracle-connector
connector.class=io.debezium.connector.oracle.OracleConnector
tasks.max=1

#don't let the producer converter properties (worker.properties) override the connector's
producer.override.value.converter.schemas.enable=false

topic.prefix=sds43
topic.creation.enable=true
topic.creation.default.replication.factor=1
topic.creation.default.partitions=1

database.server.name=unisds
database.hostname=redacted
database.port=1521
database.user=redacted
database.password=redacted
database.dbname=unisds
decimal.handling.mode=double

table.include.list=DBZUSER.SAMPLE

database.history.kafka.bootstrap.servers=redacted:9092
database.history.kafka.topic=uni-sds-schema-changes.inventory
schema.history.internal.kafka.topic=uni-sds-schema-history
schema.history.internal.kafka.bootstrap.servers=redacted:9092
schema.history.internal.store.only.captured.tables.ddl=true
internal.log.mining.query.logs.for.snapshot.offset=true

value.converter.schemas.enable=false
key.converter.schemas.enable=false

#specified because there are multiple archive locations configured and we are using the oci object storage
#backed mount
log.mining.archive.destination.name=LOG_ARCHIVE_DEST_2
log.mining.strategy=online_catalog
log.mining.query.filter.mode=in

iceberg

#Iceberg Sink Connector
#Note that the source table topic has to exist when the iceberg sink connector starts up in order for commits to be picked up.
name=enquesta-sds-unisds-iceberg-connector
connector.class=io.tabular.iceberg.connect.IcebergSinkConnector

#don't let the producer converter properties (worker.properties) override the connector's
producer.override.value.converter.schemas.enable=false

tasks.max=1
topics.regex=sds43.DBZUSER.*

iceberg.tables.cdc-field=_cdc.op
iceberg.catalog=resticeberg

iceberg.catalog.type=rest
iceberg.catalog.uri=http://redacted:8181
iceberg.catalog.token=redacted
iceberg.catalog.warehouse=s3a://warehouse
iceberg.catalog.s3.endpoint=http://redacted:9000

iceberg.tables.dynamic-enabled=true
iceberg.tables.route-field=_cdc.target
iceberg.tables.upsert-mode-enabled=true
iceberg.tables.auto-create-enabled=false
iceberg.tables.schema-case-insensitive=true

iceberg.control.commit.interval-ms=10000

value.converter.schemas.enable=false

transforms=dbztransform
transforms.dbztransform.type=io.tabular.iceberg.connect.transforms.DebeziumTransform
transforms.dbztransform.cdc.target.pattern=resticeberg.{table}