Are you running into a specific issue ingesting from several topics, i.e. via the topics.regex setting?
Hi @bryanck, thanks for following up on this. Originally it was a lack of understanding on my part of how to take advantage of the config options, which I think I understand better now. I am trying to get a simplified configuration example working, and I feel I am somewhat closer to getting the sink functional. However, I am now hitting an issue where the Iceberg connector reports that the sds_tbl field is not a valid field name, even though the flattened message on the Kafka topic originating from Debezium does contain the sds_tbl field and it is populated. My Iceberg sink config and the errors are below.
#Iceberg Sink Connector
name=enquesta-sds-unisds-iceberg-connector
connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
tasks.max=1
topics.regex=sds4.*
iceberg.tables.cdc-field=sds_op
iceberg.catalog.type=rest
iceberg.catalog.uri=redacted
iceberg.catalog.token=redacted
iceberg.catalog.warehouse=s3a://warehouse
iceberg.tables.route-field=sds_tbl
iceberg.tables.upsert-mode-enabled=false
iceberg.tables.auto-create-enabled=true
iceberg.tables.dynamic-enabled=false
iceberg.tables=resticeberg.CDC_TEST,resticeberg.CDC_SAMPLE
iceberg.table.resticeberg.CDC_TEST.route-regex=CDC_TEST
iceberg.table.resticeberg.CDC_SAMPLE.route-regex=CDC_SAMPLE
Caused by: org.apache.kafka.connect.errors.DataException: sds_tbl is not a valid field name
at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
at org.apache.kafka.connect.data.Struct.get(Struct.java:74)
at io.tabular.iceberg.connect.data.Utilities.getValueFromStruct(Utilities.java:139)
at io.tabular.iceberg.connect.data.Utilities.extractFromRecordValue(Utilities.java:128)
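(A note for context, not from the connector code: a minimal, self-contained sketch of the Kafka Connect behavior behind this error. Struct.get only accepts field names declared in the record value's schema, so the exception above means the Struct the sink received does not have sds_tbl in its schema, regardless of what was written to the topic.)

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;

public class StructFieldLookupDemo {
  public static void main(String[] args) {
    // A value schema that does NOT declare the route field
    Schema schema = SchemaBuilder.struct()
        .field("ID", Schema.FLOAT64_SCHEMA)
        .field("DESCRIPTION", Schema.OPTIONAL_STRING_SCHEMA)
        .build();

    Struct value = new Struct(schema)
        .put("ID", 26.0)
        .put("DESCRIPTION", "DESC 26");

    try {
      value.get("sds_tbl"); // field not declared in the schema
    } catch (DataException e) {
      System.out.println(e.getMessage()); // prints: sds_tbl is not a valid field name
    }
  }
}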
I tried commenting out the route-field in the config since I don't think it is used for this example, but the error just moved on to another field that also exists in the message: Caused by: org.apache.kafka.connect.errors.DataException: sds_op is not a valid field name
"payload": {
"ID": 26.0,
"DESCRIPTION": "DESC 26",
"__deleted": "false",
"sds_op": "c",
"sds_tbl": "CDC_TEST",
"sds_ts": 1706231637000,
"sds_tbl_schema": "DBZUSER",
"sds_scn": "45879738",
"sds_commit_scn": "45879765",
"sds_txId": "13000900e4240000"
}
Any guidance you can provide would be greatly appreciated.
Regards, Chris Whelan
It looks like the field names might need to be payload.sds_tbl and payload.sds_op?
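In the properties-file format used in the sink config above, that suggestion would amount to something like the following (a sketch only, reusing the route-field and cdc-field settings already shown):

iceberg.tables.route-field=payload.sds_tbl
iceberg.tables.cdc-field=payload.sds_op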
Hi @bryanck, thank you for your response. I had already flattened the original deeper structure of the Debezium message using their io.debezium.transforms.ExtractNewRecordState transform (and added the sds_ field prefix while trying to solve an earlier occurrence of the invalid-field error), so I thought payload was the implicit message content root. I got excited when I saw that might not be the case, but unfortunately, when trying payload.__op and payload.__tbl in the sink config to process the full message at the bottom of this comment, I now see the Iceberg sink error below. I am wondering whether I need to flatten this message further, and hoping that is not the case. Also, do I need to change the Debezium op codes from "c" to "I" for insert messages (and is that case-sensitive on the Iceberg side)?
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: payload is not a valid field name
at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
at org.apache.kafka.connect.data.Struct.get(Struct.java:74)
at io.tabular.iceberg.connect.data.Utilities.getValueFromStruct(Utilities.java:139)
at io.tabular.iceberg.connect.data.Utilities.extractFromRecordValue(Utilities.java:128)
at io.tabular.iceberg.connect.channel.Worker.extractRouteValue(Worker.java:235)
at io.tabular.iceberg.connect.channel.Worker.routeRecordStatically(Worker.java:201)
at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:184)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:171)
at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:150)
{
"schema": {
"type": "struct",
"fields": [
{
"type": "double",
"optional": false,
"field": "ID"
},
{
"type": "string",
"optional": true,
"field": "DESCRIPTION"
},
{
"type": "string",
"optional": true,
"field": "__deleted"
},
{
"type": "string",
"optional": true,
"field": "__op"
},
{
"type": "string",
"optional": true,
"field": "__tbl"
},
{
"type": "int64",
"optional": true,
"field": "__ts"
},
{
"type": "string",
"optional": true,
"field": "__tbl_schema"
},
{
"type": "string",
"optional": true,
"field": "__scn"
},
{
"type": "string",
"optional": true,
"field": "__commit_scn"
},
{
"type": "string",
"optional": true,
"field": "__txId"
}
],
"optional": false,
"name": "sds5.DBZUSER.CDC_TEST.Value"
},
"payload": {
"ID": 26.0,
"DESCRIPTION": "DESCRIPTION 26",
"__deleted": "false",
"__op": "u",
"__tbl": "CDC_TEST",
"__ts": 1706276037000,
"__tbl_schema": "DBZUSER",
"__scn": "46017206",
"__commit_scn": "46017210",
"__txId": "3400130094010000"
}
}
Hi again @bryanck ,
I continued down the road of flattening the message completely, but unfortunately the same error persists. I am applying the flattening via a transform in the sink, and I confirmed via the Kafka file sink connector that the structure has been flattened. Below are the verbatim output of the file sink connector (formatted with newlines only for readability) and the corresponding Iceberg sink error message. The __tbl field is clearly in the struct, so I am not sure how best to troubleshoot this issue. Any information or guidance you can provide would be greatly appreciated.
Regards, Chris
Struct
{
ID=2.0
,SAMPLE_NAME=Sample 2
,SAMPLE_PRICE=15.25
,__deleted=false
,__op=u
,__tbl=CDC_SAMPLE
,__ts=1706285605000
,__tbl_schema=DBZUSER
,__scn=46040053
,__commit_scn=46040059
,__txId=4100090075010000
}
Caused by: org.apache.kafka.connect.errors.DataException: __tbl is not a valid field name
at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
at org.apache.kafka.connect.data.Struct.get(Struct.java:74)
at io.tabular.iceberg.connect.data.Utilities.getValueFromStruct(Utilities.java:139)
at io.tabular.iceberg.connect.data.Utilities.extractFromRecordValue(Utilities.java:128)
at io.tabular.iceberg.connect.channel.Worker.extractRouteValue(Worker.java:235)
at io.tabular.iceberg.connect.channel.Worker.routeRecordStatically(Worker.java:201)
at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:184)
Thanks for posting a sample message. It appears you are using the Kafka Connect JSON with schema serializer to produce messages. Try setting the following on the sink: value.converter: org.apache.kafka.connect.json.JsonConverter and value.converter.schemas.enable: true, and don't flatten the message. The JsonConverter will handle converting the message.
Hi @bryanck ,
Thanks for your suggestion. I tried it and unfortunately the same error is still occurring in the iceberg sink connector. For additional context, below are my configs for both the iceberg sink connector and the file sink connector and lastly the file content as written by the file sink connector (it looks unchanged from before actually). I also tried running the iceberg sink connector in isolation from a different plugin folder without success. I am having a difficult time understanding what could be wrong here. Any additional thoughts or suggestions you have would be welcomed.
Regards, Chris
Iceberg Sink Connector
name=enquesta-sds-unisds-iceberg-connector
connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
tasks.max=1
topics.regex=sds9.*
iceberg.tables.cdc-field=__op
iceberg.catalog=resticeberg
iceberg.catalog.type=rest
iceberg.catalog.uri=redacted
iceberg.catalog.token=redacted
iceberg.catalog.warehouse=s3a://warehouse
iceberg.tables.route-field=__tbl
iceberg.tables.upsert-mode-enabled=false
iceberg.tables.auto-create-enabled=true
iceberg.tables.schema-case-insensitive=true
iceberg.tables.dynamic-enabled=false
iceberg.tables=resticeberg.CDC_SAMPLE
iceberg.table.resticeberg.CDC_TEST.route-regex=CDC_TEST
iceberg.table.resticeberg.CDC_SAMPLE.route-regex=CDC_SAMPLE
value.converter.schemas.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.storage.StringConverter
#Transforms
#transforms=flatten
#transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
#transforms.flatten.delimiter=.
File sink connector
name=local-file-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=sds9.DBZUSER.CDC_SAMPLE.sink.txt
topics=sds9.DBZUSER.CDC_SAMPLE
value.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=true
#Transforms
#transforms=flatten
#transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
#transforms.flatten.delimiter=.
File sink output
Struct{ID=4.0,SAMPLE_NAME=Sample four,SAMPLE_PRICE=10.0,__deleted=false,__op=c,__tbl=CDC_SAMPLE,__ts=1706295581000,__tbl_schema=DBZUSER,__scn=46065904,__commit_scn=46065917,__txId=2100010053180000}
Struct{ID=4.0,SAMPLE_NAME=Sample 4,SAMPLE_PRICE=10.0,__deleted=false,__op=u,__tbl=CDC_SAMPLE,__ts=1706295596000,__tbl_schema=DBZUSER,__scn=46065976,__commit_scn=46065994,__txId=25000e00ed140000}
Also, FYI, when I originally posted the message content, I used the kafka-console-consumer.sh command, which perhaps is converting the messages to JSON, inserting the schema and payload nodes with the struct content inside the payload node. Running the command against the same sds9.DBZUSER.CDC_SAMPLE topic from the message above generates the output below:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "double",
"optional": false,
"field": "ID"
},
{
"type": "string",
"optional": true,
"field": "SAMPLE_NAME"
},
{
"type": "double",
"optional": true,
"field": "SAMPLE_PRICE"
},
{
"type": "string",
"optional": true,
"field": "__deleted"
},
{
"type": "string",
"optional": true,
"field": "__op"
},
{
"type": "string",
"optional": true,
"field": "__tbl"
},
{
"type": "int64",
"optional": true,
"field": "__ts"
},
{
"type": "string",
"optional": true,
"field": "__tbl_schema"
},
{
"type": "string",
"optional": true,
"field": "__scn"
},
{
"type": "string",
"optional": true,
"field": "__commit_scn"
},
{
"type": "string",
"optional": true,
"field": "__txId"
}
],
"optional": false,
"name": "sds9.DBZUSER.CDC_SAMPLE.Value"
},
"payload": {
"ID": 4.0,
"SAMPLE_NAME": "Sample four",
"SAMPLE_PRICE": 10.0,
"__deleted": "false",
"__op": "c",
"__tbl": "CDC_SAMPLE",
"__ts": 1706295581000,
"__tbl_schema": "DBZUSER",
"__scn": "46065904",
"__commit_scn": "46065917",
"__txId": "2100010053180000"
}
}{
"schema": {
"type": "struct",
"fields": [
{
"type": "double",
"optional": false,
"field": "ID"
},
{
"type": "string",
"optional": true,
"field": "SAMPLE_NAME"
},
{
"type": "double",
"optional": true,
"field": "SAMPLE_PRICE"
},
{
"type": "string",
"optional": true,
"field": "__deleted"
},
{
"type": "string",
"optional": true,
"field": "__op"
},
{
"type": "string",
"optional": true,
"field": "__tbl"
},
{
"type": "int64",
"optional": true,
"field": "__ts"
},
{
"type": "string",
"optional": true,
"field": "__tbl_schema"
},
{
"type": "string",
"optional": true,
"field": "__scn"
},
{
"type": "string",
"optional": true,
"field": "__commit_scn"
},
{
"type": "string",
"optional": true,
"field": "__txId"
}
],
"optional": false,
"name": "sds9.DBZUSER.CDC_SAMPLE.Value"
},
"payload": {
"ID": 4.0,
"SAMPLE_NAME": "Sample 4",
"SAMPLE_PRICE": 10.0,
"__deleted": "false",
"__op": "u",
"__tbl": "CDC_SAMPLE",
"__ts": 1706295596000,
"__tbl_schema": "DBZUSER",
"__scn": "46065976",
"__commit_scn": "46065994",
"__txId": "25000e00ed140000"
}
}
Not sure if you have done this, but I would start from scratch, e.g. delete the connector, drop the table, then redeploy the connector with the converter setting. Then post here what the table schema is once it is auto-created.
Also, take a look at the integration tests and see if those help you. There are tests that use the JsonConverter with schema.
Hi @bryanck , thanks for the advice. I will give that a try and report back.
Regards, Chris
Hi @bryanck ,
Well, it has been a journey (and I am not done yet). I discovered that:
- My worker.properties file had value.converter.schemas.enable=true, and that setting was overriding the one I was setting in the Iceberg sink properties file. This was causing the flattened Debezium-generated message to be "unflattened" by the JsonConverter with our old friends, schema and payload.
- I needed to add the iceberg.catalog.s3.endpoint property and set it to the minio server endpoint.
- The __tbl field in the message used for dynamic routing contains only the table name, but when the catalog lookup is performed it appears to expect db_name.table_name. I don't have that target database name in my source message, and I am not sure I can even create a combined field prefixed with the (static) value of that db name (resticeberg in my case). I verified my suspicion by adding some logging to the connector code and having the catalog attempt to load the table with the name prefixed by resticeberg after the initial NoSuchTableException occurs, which it did successfully:
[2024-01-29 16:57:23,790] DEBUG Using routeField: '__tbl' (io.tabular.iceberg.connect.channel.Worker)
[2024-01-29 16:57:23,791] DEBUG Extracted table name: 'sample' (io.tabular.iceberg.connect.channel.Worker)
[2024-01-29 16:57:23,791] DEBUG Target catalog 'resticeberg' extracting identifier from table name 'sample' (io.tabular.iceberg.connect.data.IcebergWriterFactory)
[2024-01-29 16:57:23,792] DEBUG Target catalog 'resticeberg', Table identifier 'sample' extracted from table name 'sample' (io.tabular.iceberg.connect.data.IcebergWriterFactory)
[2024-01-29 16:57:23,792] DEBUG Using Catalog 'resticeberg' (io.tabular.iceberg.connect.data.IcebergWriterFactory)
[2024-01-29 16:57:23,792] ERROR NoSuchTableException thrown! (io.tabular.iceberg.connect.data.IcebergWriterFactory)
[2024-01-29 16:57:23,792] DEBUG Attempting to load Table name 'resticeberg.sample' ... (io.tabular.iceberg.connect.data.IcebergWriterFactory)
[2024-01-29 16:57:23,843] DEBUG Catalog 'resticeberg', Table name 'resticeberg.resticeberg.sample' loaded. (io.tabular.iceberg.connect.data.IcebergWriterFactory)
**Is there a property setting I can use to set a static value for the database name?**
- The `__op` codes Debezium provides are lowercase and it also uses `c` for inserts rather than the `I` mentioned in your documentation along with `d` for deletes and `u` for updates. I noticed in the code that for the opcode value mapping in the config, the operation is defaulted to Insert if null, empty or not `U` or `D`. **There is a TODO in that location in IcebergWriter regarding making the opcode configurable - is this something that is on the roadmap?**
**FYI, Below is my current sink config**
name=enquesta-sds-unisds-iceberg-connector
connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
producer.override.value.converter.schemas.enable=false
tasks.max=1
topics.regex=sds19.*
iceberg.tables.cdc-field=__op
iceberg.catalog=resticeberg
iceberg.catalog.type=rest
iceberg.catalog.uri=http://redacted:8181
iceberg.catalog.token=redacted
iceberg.catalog.warehouse=s3a://warehouse
iceberg.catalog.s3.endpoint=http://redacted:9000
iceberg.tables.dynamic-enabled=true
iceberg.tables.route-field=__tbl
iceberg.tables.upsert-mode-enabled=false
iceberg.tables.auto-create-enabled=false
iceberg.tables.schema-case-insensitive=true
iceberg.control.commit.interval-ms=120000
value.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
Thanks in advance for any input or suggestions on the topics above (no pun intended).
Regards,
Chris
You could use an SMT to set the table name to a name qualified with the database. Also, the SMT could transform the op code. In general we'd like to keep transform logic out of the sink. Making the table name lower case is often desirable but we may want an option to skip that.
Hi @bryanck, thank you for your recommendation. I had taken a look at SMTs, but I didn't find one or spot a combination that will do what I am attempting to accomplish. I see InsertField can get the static resticeberg value into a field, but I don't see a transform that will concatenate field values into a new field. Are you suggesting that I create a custom transformation for this purpose, or are there other transforms I can utilize to achieve this objective?
Yes, you may need to create one; take a look at the set we have in this repo. We may want to add another one here for the purpose of prepending a database name or converting the op code.
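For illustration only, here is a minimal sketch of what such a custom SMT might look like, assuming an in-place rewrite of the two string fields discussed above; the class name, package, config keys, and the c/u/d-to-I/U/D mapping are assumptions for the sketch, not code from this repo:

package com.example.connect.transforms;

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

// Hypothetical SMT: prefixes the route field with a static database name
// (e.g. CDC_SAMPLE -> resticeberg.CDC_SAMPLE) and maps Debezium op codes.
public class RouteFieldQualifier<R extends ConnectRecord<R>> implements Transformation<R> {

  public static final ConfigDef CONFIG_DEF = new ConfigDef()
      .define("db.name", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
          "Database (namespace) to prepend to the route field value")
      .define("route.field", ConfigDef.Type.STRING, "__tbl", ConfigDef.Importance.MEDIUM,
          "Field holding the table name")
      .define("op.field", ConfigDef.Type.STRING, "__op", ConfigDef.Importance.MEDIUM,
          "Field holding the Debezium op code");

  private String dbName;
  private String routeField;
  private String opField;

  @Override
  public void configure(Map<String, ?> props) {
    SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
    dbName = config.getString("db.name");
    routeField = config.getString("route.field");
    opField = config.getString("op.field");
  }

  @Override
  public R apply(R record) {
    if (!(record.value() instanceof Struct)) {
      return record; // pass tombstones and non-struct values through untouched
    }
    // For brevity this sketch mutates the Struct in place; both fields stay strings,
    // so the value schema does not change.
    Struct value = (Struct) record.value();

    if (value.schema().field(routeField) != null) {
      Object table = value.get(routeField);
      if (table != null) {
        value.put(routeField, dbName + "." + table);
      }
    }

    if (value.schema().field(opField) != null) {
      Object op = value.get(opField);
      if (op != null) {
        switch (op.toString()) {
          case "c": value.put(opField, "I"); break;
          case "u": value.put(opField, "U"); break;
          case "d": value.put(opField, "D"); break;
          default: break; // anything else falls back to the sink's default handling
        }
      }
    }
    return record;
  }

  @Override
  public ConfigDef config() {
    return CONFIG_DEF;
  }

  @Override
  public void close() {}
}

(As the later comments show, the DebeziumTransform already in this repo ended up covering what was needed here, so the above is only meant to illustrate the general shape of a custom transform.)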
Thanks @bryanck , I will definitely review the ones in this repo. I really appreciate all of your help with this issue.
Regards, Chris
Hi @bryanck,
Thanks for pointing me in the direction of the SMTs in this repo. I just wanted to let you know that I finally got the dbz source and the iceberg sink working together and I was able to leverage the Debezium SMT that is in the repo to do everything I needed. Below FYI are the src and sink configs. Note the short commit interval is just temporary for testing purposes.
Thanks again for your assistance with this.
dbz
name=enquesta-sds-unisds-oracle-connector
connector.class=io.debezium.connector.oracle.OracleConnector
tasks.max=1
#don't let the producer converter properties (worker.properties) override the connector's
producer.override.value.converter.schemas.enable=false
topic.prefix=sds43
topic.creation.enable=true
topic.creation.default.replication.factor=1
topic.creation.default.partitions=1
database.server.name=unisds
database.hostname=redacted
database.port=1521
database.user=redacted
database.password=redacted
database.dbname=unisds
decimal.handling.mode=double
table.include.list=DBZUSER.SAMPLE
database.history.kafka.bootstrap.servers=redacted:9092
database.history.kafka.topic=uni-sds-schema-changes.inventory
schema.history.internal.kafka.topic=uni-sds-schema-history
schema.history.internal.kafka.bootstrap.servers=redacted:9092
schema.history.internal.store.only.captured.tables.ddl=true
internal.log.mining.query.logs.for.snapshot.offset=true
value.converter.schemas.enable=false
key.converter.schemas.enable=false
#specified because there are multiple archive locations configured and we are using the oci object storage
#backed mount
log.mining.archive.destination.name=LOG_ARCHIVE_DEST_2
log.mining.strategy=online_catalog
log.mining.query.filter.mode=in
iceberg
#Iceberg Sink Connector
#Note that the source table topic has to exist when the iceberg sink connector starts up in order for commits to be picked up.
name=enquesta-sds-unisds-iceberg-connector
connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
#don't let the producer converter properties (worker.properties) override the connector's
producer.override.value.converter.schemas.enable=false
tasks.max=1
topics.regex=sds43.DBZUSER.*
iceberg.tables.cdc-field=_cdc.op
iceberg.catalog=resticeberg
iceberg.catalog.type=rest
iceberg.catalog.uri=http://redacted:8181
iceberg.catalog.token=redacted
iceberg.catalog.warehouse=s3a://warehouse
iceberg.catalog.s3.endpoint=http://redacted:9000
iceberg.tables.dynamic-enabled=true
iceberg.tables.route-field=_cdc.target
iceberg.tables.upsert-mode-enabled=true
iceberg.tables.auto-create-enabled=false
iceberg.tables.schema-case-insensitive=true
iceberg.control.commit.interval-ms=10000
value.converter.schemas.enable=false
transforms=dbztransform
transforms.dbztransform.type=io.tabular.iceberg.connect.transforms.DebeziumTransform
transforms.dbztransform.cdc.target.pattern=resticeberg.{table}
Hello, I am very interested in using this Iceberg sink connector to mirror a large number of transactional database tables, with CDC performed by Debezium's Oracle connector. Reviewing the configuration options for the Iceberg connector, I would greatly appreciate advice on how best to configure both the source and the sink. Debezium by default creates a Kafka topic for each physical table (and we have ~1800 Oracle tables that we would like to mirror in Iceberg). It does not seem that the Iceberg sink connector configuration is designed to accommodate large numbers of topics; rather, it looks like I would need to configure Debezium via an SMT to route messages for all tables to a single topic (or maybe a small set of topics), with a message field that contains the table name, and then utilize the Iceberg connector's multi-table fan-out routing to create/update the set of Iceberg tables that I want to map one-to-one with the source transactional DB tables.
Can you tell me if my thoughts above would be your recommended approach?
Thanks in advance, Chris Whelan
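For reference, the source-side routing idea described above could be sketched with Debezium's stock io.debezium.transforms.ByLogicalTableRouter SMT, roughly as follows; the topic prefix, regex, and replacement values are placeholders, and whether merging topics is preferable to simply pointing the sink's topics.regex at the per-table topics is exactly the question being asked here:

#Debezium source connector (sketch): route all captured tables into a single topic
transforms=route
transforms.route.type=io.debezium.transforms.ByLogicalTableRouter
transforms.route.topic.regex=sds\.DBZUSER\.(.*)
transforms.route.topic.replacement=sds.all_tables
#adds a key field so rows from different source tables stay distinguishable in the merged topic
transforms.route.key.enforce.uniqueness=true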