memiiso / debezium-server-iceberg

Replicates any database (CDC events) to Apache Iceberg (To Cloud Storage)
Apache License 2.0
171 stars 35 forks source link

RowIndentifier don't right when creating table #294

Open hoaiff opened 3 months ago

hoaiff commented 3 months ago

Hi @ismailsimsek, I have run Oracle prod and faced an error when creating the following table. I checked the constraints of my table and found no constraint data, but in the log, it shows rowIdentifier:[BRANCH, HOTLINE, VIOLATE_CODE, REPORT_DAY] and stop connector image

oracle_prod1  | 2024-03-18 03:50:33,396 WARN  [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Creating table:'oracle_test1.ekyc_IDG_VOICE_COUNT_VIOLATE_CALL_CENTER_DAY'
oracle_prod1  | schema:table {
oracle_prod1  |   1: REPORT_DAY: required long (id)
oracle_prod1  |   2: VIOLATE_CODE: required string (id)
oracle_prod1  |   3: COUNT_DATA: optional string
oracle_prod1  |   4: HOTLINE: required string (id)
oracle_prod1  |   5: BRANCH: required string (id)
oracle_prod1  |   6: __deleted: optional string
oracle_prod1  |   7: __op: optional string
oracle_prod1  |   8: __source_ts_ms: optional timestamptz
oracle_prod1  | }
oracle_prod1  | rowIdentifier:[BRANCH, HOTLINE, VIOLATE_CODE, REPORT_DAY]
oracle_prod1  | 2024-03-18 03:50:33,512 INFO  [io.deb.con.ora.log.LogMinerStreamingChangeEventSource] (debezium-oracleconnector-ekyc-change-event-source-coordinator) Redo Log Group Sizes:
oracle_prod1  | 2024-03-18 03:50:33,512 INFO  [io.deb.con.ora.log.LogMinerStreamingChangeEventSource] (debezium-oracleconnector-ekyc-change-event-source-coordinator)   Group #11: 1073741824 bytes
oracle_prod1  | 2024-03-18 03:50:33,512 INFO  [io.deb.con.ora.log.LogMinerStreamingChangeEventSource] (debezium-oracleconnector-ekyc-change-event-source-coordinator)   Group #12: 1073741824 bytes
oracle_prod1  | 2024-03-18 03:50:33,512 INFO  [io.deb.con.ora.log.LogMinerStreamingChangeEventSource] (debezium-oracleconnector-ekyc-change-event-source-coordinator)   Group #13: 1073741824 bytes
oracle_prod1  | 2024-03-18 03:50:33,512 INFO  [io.deb.con.ora.log.LogMinerStreamingChangeEventSource] (debezium-oracleconnector-ekyc-change-event-source-coordinator)   Group #14: 1073741824 bytes
oracle_prod1  | 2024-03-18 03:50:33,512 INFO  [io.deb.con.ora.log.LogMinerStreamingChangeEventSource] (debezium-oracleconnector-ekyc-change-event-source-coordinator)   Group #21: 1073741824 bytes
oracle_prod1  | 2024-03-18 03:50:33,512 INFO  [io.deb.con.ora.log.LogMinerStreamingChangeEventSource] (debezium-oracleconnector-ekyc-change-event-source-coordinator)   Group #22: 1073741824 bytes
oracle_prod1  | 2024-03-18 03:50:33,513 INFO  [io.deb.con.ora.log.LogMinerStreamingChangeEventSource] (debezium-oracleconnector-ekyc-change-event-source-coordinator)   Group #23: 1073741824 bytes
oracle_prod1  | 2024-03-18 03:50:33,513 INFO  [io.deb.con.ora.log.LogMinerStreamingChangeEventSource] (debezium-oracleconnector-ekyc-change-event-source-coordinator)   Group #24: 1073741824 bytes
oracle_prod1  | 2024-03-18 03:50:33,839 INFO  [org.apa.ice.BaseMetastoreCatalog] (pool-7-thread-1) Table properties set at catalog level through catalog properties: {}
oracle_prod1  | 2024-03-18 03:50:33,863 INFO  [org.apa.ice.BaseMetastoreCatalog] (pool-7-thread-1) Table properties enforced at catalog level through catalog properties: {}
oracle_prod1  | 2024-03-18 03:50:34,465 INFO  [org.apa.ice.hiv.HiveTableOperations] (pool-7-thread-1) Committed to table iceberg.oracle_test1.ekyc_IDG_VOICE_COUNT_VIOLATE_CALL_CENTER_DAY with the new metadata location s3a://datalake/datawarehouse/oracle_test1.db/ekyc_IDG_VOICE_COUNT_VIOLATE_CALL_CENTER_DAY/metadata/00000-23bb8dbe-8ea7-40be-b7d7-37a1eeab0d1d.metadata.json
oracle_prod1  | 2024-03-18 03:50:34,466 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-7-thread-1) Successfully committed to table iceberg.oracle_test1.ekyc_IDG_VOICE_COUNT_VIOLATE_CALL_CENTER_DAY in 576 ms
oracle_prod1  | 2024-03-18 03:50:34,476 INFO  [org.apa.ice.BaseMetastoreTableOperations] (pool-7-thread-1) Refreshing table metadata from new version: s3a://datalake/datawarehouse/oracle_test1.db/ekyc_IDG_VOICE_COUNT_VIOLATE_CALL_CENTER_DAY/metadata/00000-23bb8dbe-8ea7-40be-b7d7-37a1eeab0d1d.metadata.json
oracle_prod1  | 2024-03-18 03:50:34,774 INFO  [io.deb.emb.EmbeddedEngine] (pool-7-thread-1) Stopping the task and engine
oracle_prod1  | 2024-03-18 03:50:34,776 INFO  [io.deb.con.com.BaseSourceTask] (pool-7-thread-1) Stopping down connector
oracle_prod1  | 2024-03-18 03:50:45,257 INFO  [io.deb.con.ora.log.LogMinerStreamingChangeEventSource] (debezium-oracleconnector-ekyc-change-event-source-coordinator) startScn=21615599442, endScn=null
oracle_prod1  | 2024-03-18 03:50:45,309 INFO  [io.deb.con.ora.log.LogMinerStreamingChangeEventSource] (debezium-oracleconnector-ekyc-change-event-source-coordinator) Streaming metrics dump: LogMinerStreamingChangeEventSourceMetrics{connectorConfig=io.debezium.connector.oracle.OracleConnectorConfig@2c0c1eac, startTime=2024-03-18T03:50:22.605364Z, clock=SystemClock[Z], currentScn=null, offsetScn=null, commitScn=null, oldestScn=null, oldestScnTime=null, currentLogFileNames=[Ljava.lang.String;@7bb76ca5, redoLogStatuses=[Ljava.lang.String;@458db361, databaseZoneOffset=Z, batchSize=20000, logSwitchCount=0, logMinerQueryCount=0, sleepTime=1000, minimumLogsMined=4, maximumLogsMined=4, maxBatchProcessingThroughput=0, timeDifference=0, processedRowsCount=0, activeTransactionCount=0, rolledBackTransactionCount=0, oversizedTransactionCount=0, changesCount=0, scnFreezeCount=0, batchProcessingDuration=io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics$DurationHistogramMetric@65b039f4, fetchQueryDuration=io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics$DurationHistogramMetric@2905b290, commitDuration=io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics$DurationHistogramMetric@69972151, lagFromSourceDuration=io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics$DurationHistogramMetric@412981b9, miningSessionStartupDuration=io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics$DurationHistogramMetric@25b27ff1, parseTimeDuration=io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics$DurationHistogramMetric@2127d8e5, resultSetNextDuration=io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics$DurationHistogramMetric@716ae839, userGlobalAreaMemory=io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics$MaxLongValueMetric@7cc39be4, processGlobalAreaMemory=io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics$MaxLongValueMetric@4a382e7a, abandonedTransactionIds=io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics$LRUSet@55c72728, rolledBackTransactionIds=io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics$LRUSet@734a3689} io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics@4aba0171
oracle_prod1  | 2024-03-18 03:50:45,309 INFO  [io.deb.con.ora.log.LogMinerStreamingChangeEventSource] (debezium-oracleconnector-ekyc-change-event-source-coordinator) Offsets: OracleOffsetContext [scn=21615599442, commit_scn=[]]
oracle_prod1  | 2024-03-18 03:50:45,309 INFO  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-oracleconnector-ekyc-change-event-source-coordinator) Finished streaming
oracle_prod1  | 2024-03-18 03:50:45,309 INFO  [io.deb.pip.ChangeEventSourceCoordinator] (debezium-oracleconnector-ekyc-change-event-source-coordinator) Connected metrics set to 'false'
oracle_prod1  | 2024-03-18 03:50:45,310 INFO  [io.deb.pip.sig.SignalProcessor] (pool-7-thread-1) SignalProcessor stopped
oracle_prod1  | 2024-03-18 03:50:45,311 INFO  [io.deb.ser.DefaultServiceRegistry] (pool-7-thread-1) Debezium ServiceRegistry stopped.
oracle_prod1  | 2024-03-18 03:50:45,319 INFO  [io.deb.jdb.JdbcConnection] (pool-19-thread-1) Connection gracefully closed
oracle_prod1  | 2024-03-18 03:50:45,321 INFO  [io.deb.jdb.JdbcConnection] (pool-20-thread-1) Connection gracefully closed
oracle_prod1  | 2024-03-18 03:50:45,323 INFO  [io.deb.ser.ice.off.IcebergOffsetBackingStore] (pool-7-thread-1) Stopped IcebergOffsetBackingStore table:oracle_test1.debezium_offset_storage_custom_table
oracle_prod1  | 2024-03-18 03:50:45,324 ERROR [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: null', error = 'java.lang.NullPointerException': java.lang.NullPointerException
oracle_prod1  |     at org.apache.iceberg.parquet.ParquetValueWriters$StringWriter.write(ParquetValueWriters.java:336)
oracle_prod1  |     at org.apache.iceberg.parquet.ParquetValueWriters$StringWriter.write(ParquetValueWriters.java:324)
oracle_prod1  |     at org.apache.iceberg.parquet.ParquetValueWriters$StructWriter.write(ParquetValueWriters.java:589)
oracle_prod1  |     at org.apache.iceberg.parquet.ParquetWriter.add(ParquetWriter.java:135)
oracle_prod1  |     at org.apache.iceberg.deletes.EqualityDeleteWriter.write(EqualityDeleteWriter.java:67)
oracle_prod1  |     at org.apache.iceberg.io.BaseTaskWriter$RollingEqDeleteWriter.write(BaseTaskWriter.java:388)
oracle_prod1  |     at org.apache.iceberg.io.BaseTaskWriter$RollingEqDeleteWriter.write(BaseTaskWriter.java:371)
oracle_prod1  |     at org.apache.iceberg.io.BaseTaskWriter$BaseRollingWriter.write(BaseTaskWriter.java:277)
oracle_prod1  |     at org.apache.iceberg.io.BaseTaskWriter$BaseEqualityDeltaWriter.delete(BaseTaskWriter.java:174)
oracle_prod1  |     at io.debezium.server.iceberg.tableoperator.BaseDeltaTaskWriter$RowDataDeltaWriter.delete(BaseDeltaTaskWriter.java:66)
oracle_prod1  |     at io.debezium.server.iceberg.tableoperator.BaseDeltaTaskWriter.write(BaseDeltaTaskWriter.java:54)
oracle_prod1  |     at io.debezium.server.iceberg.tableoperator.BaseDeltaTaskWriter.write(BaseDeltaTaskWriter.java:16)
oracle_prod1  |     at io.debezium.server.iceberg.tableoperator.IcebergTableOperator.addToTablePerSchema(IcebergTableOperator.java:173)
oracle_prod1  |     at io.debezium.server.iceberg.tableoperator.IcebergTableOperator.addToTable(IcebergTableOperator.java:157)
oracle_prod1  |     at io.debezium.server.iceberg.IcebergChangeConsumer.handleBatch(IcebergChangeConsumer.java:167)
oracle_prod1  |     at io.debezium.embedded.ConvertingEngineBuilder$ConvertingChangeConsumer.handleBatch(ConvertingEngineBuilder.java:108)
oracle_prod1  |     at io.debezium.embedded.EmbeddedEngine.pollRecords(EmbeddedEngine.java:728)
oracle_prod1  |     at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:475)
oracle_prod1  |     at io.debezium.embedded.ConvertingEngineBuilder$1.run(ConvertingEngineBuilder.java:248)
oracle_prod1  |     at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:170)
oracle_prod1  |     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
oracle_prod1  |     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
oracle_prod1  |     at java.base/java.lang.Thread.run(Unknown Source)
ismailsimsek commented 3 months ago

@hoaiff Debezium might be generating the unique key combination. could you post it to debezium zulip channel?

Additionally if you enable DEBUG logging(example below) you could see event key and payload schema received by the consumer. this will be printed with the initial load when the initial table is created.

config.put("quarkus.log.category.\"io.debezium.server.iceberg.IcebergChangeConsumer\".level", "DEBUG");

example output: in the third row the key filed schema(PK) is printed, its sent by debezium the keys schema end like "testc.inventory.orders.Key"}


2024-03-18 11:07:13,867 WARN  [io.deb.ser.ice.IcebergUtil] (pool-10-thread-1) Table not found: debeziumevents.debeziumcdc_testc_inventory_orders
2024-03-18 11:07:13,871 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-10-thread-1) Converting iceberg schema to debezium:{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"int32","optional":false,"name":"io.debezium.time.Date","version":1,"field":"order_date"},{"type":"int32","optional":false,"field":"purchaser"},{"type":"int32","optional":false,"field":"quantity"},{"type":"int32","optional":false,"field":"product_id"},{"type":"string","optional":true,"field":"__deleted"},{"type":"string","optional":true,"field":"__op"},{"type":"string","optional":true,"field":"__table"},{"type":"int64","optional":true,"field":"__source_ts_ms"},{"type":"string","optional":true,"field":"__db"},{"type":"int64","optional":true,"field":"__ts_ms"}],"optional":false,"name":"testc.inventory.orders.Value"}
2024-03-18 11:07:13,871 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-10-thread-1) Converting iceberg schema to debezium:{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"testc.inventory.orders.Key"}
2024-03-18 11:07:13,872 WARN  [io.deb.ser.ice.IcebergUtil] (pool-10-thread-1) Creating table:'debeziumevents.debeziumcdc_testc_inventory_orders'
ismailsimsek commented 3 months ago

@hoaiff try following logging setting for testing. this will print the event data, there you can see what is the key sent by debezium. (Note for the config file backslashes are not needed.)

      config.put("quarkus.log.category.\"io.debezium.server.iceberg\".level", "TRACE");
      config.put("quarkus.log.category.\"io.debezium.server.iceberg\".min-level", "TRACE");
      config.put("quarkus.log.level", "WARN");

logs

2024-03-18 11:27:25,446 TRACE [io.deb.ser.ice.IcebergChangeConsumer] (pool-10-thread-1) Processed event 'EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"testc.inventory.customers.Key"},"payload":{"id":1001}}, value={"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":true,"field":"__deleted"},{"type":"string","optional":true,"field":"__op"},{"type":"string","optional":true,"field":"__table"},{"type":"int64","optional":true,"field":"__source_ts_ms"},{"type":"string","optional":true,"field":"__db"},{"type":"int64","optional":true,"field":"__ts_ms"}],"optional":false,"name":"testc.inventory.customers.Value"},"payload":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com","__deleted":"false","__op":"r","__table":"customers","__source_ts_ms":1710757638454,"__db":"postgres","__ts_ms":1710757638442}}, sourceRecord=SourceRecord{sourcePartition={server=testc}, sourceOffset={last_snapshot_record=false, lsn=34412792, txId=753, ts_usec=1710757638454155, snapshot=true}} ConnectRecord{topic='testc.inventory.customers', kafkaPartition=null, key=Struct{id=1001}, keySchema=Schema{testc.inventory.customers.Key:STRUCT}, value=Struct{id=1001,first_name=Sally,last_name=Thomas,email=sally.thomas@acme.com,__deleted=false,__op=r,__table=customers,__source_ts_ms=1710757638454,__db=postgres,__ts_ms=1710757638442}, valueSchema=Schema{testc.inventory.customers.Value:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
2024-03-18 11:27:25,447 TRACE [io.deb.ser.ice.IcebergChangeConsumer] (pool-10-thread-1) Processed event 'EmbeddedEngineChangeEvent [key={"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"}],"optional":false,"name":"testc.inventory.customers.Key"},"payload":{"id":1002}}, value={"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":true,"field":"__deleted"},{"type":"string","optional":true,"field":"__op"},{"type":"string","optional":true,"field":"__table"},{"type":"int64","optional":true,"field":"__source_ts_ms"},{"type":"string","optional":true,"field":"__db"},{"type":"int64","optional":true,"field":"__ts_ms"}],"optional":false,"name":"testc.inventory.customers.Value"},"payload":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com","__deleted":"false","__op":"r","__table":"customers","__source_ts_ms":1710757638454,"__db":"postgres","__ts_ms":1710757638445}}, sourceRecord=SourceRecord{sourcePartition={server=testc}, sourceOffset={last_snapshot_record=false, lsn=34412792, txId=753, ts_usec=1710757638454155, snapshot=true}} ConnectRecord{topic='testc.inventory.customers', kafkaPartition=null, key=Struct{id=1002}, keySchema=Schema{testc.inventory.customers.Key:STRUCT}, value=Struct{id=1002,first_name=George,last_name=Bailey,email=gbailey@foobar.com,__deleted=false,__op=r,__table=customers,__source_ts_ms=1710757638454,__db=postgres,__ts_ms=1710757638445}, valueSchema=Schema{testc.inventory.customers.Value:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}]'
ismailsimsek commented 3 months ago

one option is manually fixing it

  1. stop the consumer,
  2. alter iceberg table drop identifier filed,
  3. set fields optional.
  4. start the consumer
hoaiff commented 3 months ago

Thanks @ismailsimsek I will try manually fixing it later. Besides, I added a log level as above. This is the detailed log

oracle_prod1  | 2024-03-19 01:48:39,576 WARN  [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Created namespace:'oracle_test2'
oracle_prod1  | 2024-03-19 01:48:40,424 WARN  [org.apa.had.met.imp.MetricsConfig] (pool-7-thread-1) Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
oracle_prod1  | 2024-03-19 01:48:42,946 INFO  [io.deb.ser.ice.his.IcebergSchemaHistory] (pool-7-thread-1) Starting IcebergSchemaHistory storage table:iceberg.debezium_database_history_storage_test
oracle_prod1  | 2024-03-19 01:48:56,539 WARN  [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Table not found: oracle_test2.ekyc_IDG_VOICE_COUNT_VIOLATE_CALL_CENTER_DAY
oracle_prod1  | 2024-03-19 01:48:56,553 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Converting iceberg schema to debezium:{"type":"struct","fields":[{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"REPORT_DAY"},{"type":"string","optional":true,"field":"VIOLATE_CODE"},{"type":"string","optional":true,"field":"COUNT_DATA"},{"type":"string","optional":true,"field":"HOTLINE"},{"type":"string","optional":true,"field":"BRANCH"},{"type":"string","optional":true,"field":"__deleted"},{"type":"string","optional":true,"field":"__op"},{"type":"int64","optional":true,"field":"__source_ts_ms"}],"optional":false,"name":"ekyc.IDG_VOICE.COUNT_VIOLATE_CALL_CENTER_DAY.Value"}
oracle_prod1  | 2024-03-19 01:48:56,554 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Converting iceberg schema to debezium:{"type":"struct","fields":[{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"REPORT_DAY"},{"type":"string","optional":true,"field":"VIOLATE_CODE"},{"type":"string","optional":true,"field":"HOTLINE"},{"type":"string","optional":true,"field":"BRANCH"}],"optional":false,"name":"ekyc.IDG_VOICE.COUNT_VIOLATE_CALL_CENTER_DAY.Key"}
oracle_prod1  | 2024-03-19 01:48:56,556 WARN  [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Creating table:'oracle_test2.ekyc_IDG_VOICE_COUNT_VIOLATE_CALL_CENTER_DAY'
oracle_prod1  | schema:table {
oracle_prod1  |   1: REPORT_DAY: required long (id)
oracle_prod1  |   2: VIOLATE_CODE: required string (id)
oracle_prod1  |   3: COUNT_DATA: optional string
oracle_prod1  |   4: HOTLINE: required string (id)
oracle_prod1  |   5: BRANCH: required string (id)
oracle_prod1  |   6: __deleted: optional string
oracle_prod1  |   7: __op: optional string
oracle_prod1  |   8: __source_ts_ms: optional timestamptz
oracle_prod1  | }
oracle_prod1  | rowIdentifier:[BRANCH, HOTLINE, VIOLATE_CODE, REPORT_DAY]
oracle_prod1  | 2024-03-19 01:48:56,832 DEBUG [io.deb.ser.ice.tab.IcebergTableOperator] (pool-7-thread-1) Batch got 5027 records with 1 different schema!!
oracle_prod1  | 2024-03-19 01:48:56,833 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Converting iceberg schema to debezium:{"type":"struct","fields":[{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"REPORT_DAY"},{"type":"string","optional":true,"field":"VIOLATE_CODE"},{"type":"string","optional":true,"field":"COUNT_DATA"},{"type":"string","optional":true,"field":"HOTLINE"},{"type":"string","optional":true,"field":"BRANCH"},{"type":"string","optional":true,"field":"__deleted"},{"type":"string","optional":true,"field":"__op"},{"type":"int64","optional":true,"field":"__source_ts_ms"}],"optional":false,"name":"ekyc.IDG_VOICE.COUNT_VIOLATE_CALL_CENTER_DAY.Value"}
oracle_prod1  | 2024-03-19 01:48:56,833 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Converting iceberg schema to debezium:{"type":"struct","fields":[{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"REPORT_DAY"},{"type":"string","optional":true,"field":"VIOLATE_CODE"},{"type":"string","optional":true,"field":"HOTLINE"},{"type":"string","optional":true,"field":"BRANCH"}],"optional":false,"name":"ekyc.IDG_VOICE.COUNT_VIOLATE_CALL_CENTER_DAY.Key"}
oracle_prod1  | 2024-03-19 01:48:56,885 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing nested field:struct<1: REPORT_DAY: required long, 2: VIOLATE_CODE: required string, 3: COUNT_DATA: optional string, 4: HOTLINE: required string, 5: BRANCH: required string, 6: __deleted: optional string, 7: __op: optional string, 8: __source_ts_ms: optional timestamptz>
oracle_prod1  | 2024-03-19 01:48:56,886 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:REPORT_DAY Type:long
oracle_prod1  | 2024-03-19 01:48:56,887 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:VIOLATE_CODE Type:string
oracle_prod1  | 2024-03-19 01:48:56,888 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:COUNT_DATA Type:string
oracle_prod1  | 2024-03-19 01:48:56,888 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:HOTLINE Type:string
oracle_prod1  | 2024-03-19 01:48:56,888 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:BRANCH Type:string
oracle_prod1  | 2024-03-19 01:48:56,888 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__deleted Type:string
oracle_prod1  | 2024-03-19 01:48:56,889 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__op Type:string
oracle_prod1  | 2024-03-19 01:48:56,889 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__source_ts_ms Type:timestamptz
oracle_prod1  | 2024-03-19 01:48:57,013 ERROR [io.deb.rel.RelationalSnapshotChangeEventSource] (debezium-oracleconnector-ekyc-change-event-source-coordinator) Error during snapshot: java.util.concurrent.ExecutionException: java.lang.InterruptedException: Interrupted while snapshotting table PDBIDG_SYS.IDG_VOICE.COUNT_VIOLATE_CALL_CENTER_DAY
oracle_prod1  |     at java.base/java.util.concurrent.FutureTask.report(Unknown Source)
oracle_prod1  |     at java.base/java.util.concurrent.FutureTask.get(Unknown Source)
oracle_prod1  |     at io.debezium.relational.RelationalSnapshotChangeEventSource.createDataEvents(RelationalSnapshotChangeEventSource.java:468)
oracle_prod1  |     at io.debezium.relational.RelationalSnapshotChangeEventSource.doExecute(RelationalSnapshotChangeEventSource.java:165)
oracle_prod1  |     at io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:92)
oracle_prod1  |     at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:250)
oracle_prod1  |     at io.debezium.pipeline.ChangeEventSourceCoordinator.doSnapshot(ChangeEventSourceCoordinator.java:234)
oracle_prod1  |     at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:186)
oracle_prod1  |     at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:137)
oracle_prod1  |     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
oracle_prod1  |     at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
oracle_prod1  |     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
oracle_prod1  |     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
oracle_prod1  |     at java.base/java.lang.Thread.run(Unknown Source)
oracle_prod1  | Caused by: java.lang.InterruptedException: Interrupted while snapshotting table PDBIDG_SYS.IDG_VOICE.COUNT_VIOLATE_CALL_CENTER_DAY
oracle_prod1  |     at io.debezium.relational.RelationalSnapshotChangeEventSource.doCreateDataEventsForTable(RelationalSnapshotChangeEventSource.java:557)
oracle_prod1  |     at io.debezium.relational.RelationalSnapshotChangeEventSource.lambda$createDataEventsForTableCallable$6(RelationalSnapshotChangeEventSource.java:520)
oracle_prod1  |     at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
oracle_prod1  |     ... 5 more
oracle_prod1  | 
oracle_prod1  | 
oracle_prod1  | 2024-03-19 01:48:57,020 WARN  [io.deb.pip.sou.AbstractSnapshotChangeEventSource] (debezium-oracleconnector-ekyc-change-event-source-coordinator) Snapshot was not completed successfully, it will be re-executed upon connector restart

Additional, I see that field of keys schema is optional

{
    "type":"struct",
    "fields":[
    {
        "type":"int64",
        "optional":true,
        "name":"io.debezium.time.Timestamp",
        "version":1,
        "field":"REPORT_DAY"
    },
    {
        "type":"string",
        "optional":true,
        "field":"VIOLATE_CODE"
    },
    {
        "type":"string",
        "optional":true,
        "field":"HOTLINE"
    },
    {
        "type":"string",
        "optional":true,
        "field":"BRANCH"
    }
    ],
    "optional":false,
    "name":"ekyc.IDG_VOICE.COUNT_VIOLATE_CALL_CENTER_DAY.Key"
}
ismailsimsek commented 3 months ago

it seems like Debezium is sending this fields as key. not sure why, it might be oracle related feature.

Additional, I see that field of keys schema is optional

Correct but for iceberg, KEY fields cannot be null. that's why on iceberg side they are set as required

hoaiff commented 3 months ago

@ismailsimsek, I tried to create a new table with the same schema in dev env and I don't face same problem, RowIndentifier is right with the root table, I don't why it occured with my table in prod env

oracle_prod1  | 2024-03-20 04:08:08,167 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Converting iceberg schema to debezium:{"type":"struct","fields":[{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"REPORT_DAY"},{"type":"string","optional":true,"field":"VIOLATE_CODE"},{"type":"string","optional":true,"field":"COUNT_DATA"},{"type":"string","optional":true,"field":"HOTLINE"},{"type":"string","optional":true,"field":"BRANCH"},{"type":"string","optional":true,"field":"__deleted"},{"type":"string","optional":true,"field":"__op"},{"type":"int64","optional":true,"field":"__source_ts_ms"}],"optional":false,"name":"ekyc2.DBZUSER.COUNT_VIOLATE_CALL_CENTER_DAY1.Value"}
oracle_prod1  | 2024-03-20 04:08:08,169 WARN  [io.deb.ser.ice.IcebergUtil] (pool-7-thread-1) Creating table:'oracle_clob2.ekyc2_DBZUSER_COUNT_VIOLATE_CALL_CENTER_DAY1'
oracle_prod1  | schema:table {
oracle_prod1  |   1: REPORT_DAY: optional long
oracle_prod1  |   2: VIOLATE_CODE: optional string
oracle_prod1  |   3: COUNT_DATA: optional string
oracle_prod1  |   4: HOTLINE: optional string
oracle_prod1  |   5: BRANCH: optional string
oracle_prod1  |   6: __deleted: optional string
oracle_prod1  |   7: __op: optional string
oracle_prod1  |   8: __source_ts_ms: optional timestamptz
oracle_prod1  | }
oracle_prod1  | rowIdentifier:[]
oracle_prod1  | 2024-03-20 04:08:08,271 WARN  [io.deb.con.ora.log.LogMinerStreamingChangeEventSource] (debezium-oracleconnector-ekyc2-change-event-source-coordinator) Database table 'PDBIDG_DEV.DBZUSER.COUNT_VIOLATE_CALL_CENTER_DAY1' not configured with supplemental logging "(ALL) COLUMNS"; only explicitly changed columns will be captured. Use: ALTER TABLE DBZUSER.COUNT_VIOLATE_CALL_CENTER_DAY1 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS
oracle_prod1  | 2024-03-20 04:08:08,308 WARN  [io.deb.con.ora.log.LogMinerStreamingChangeEventSource] (debezium-oracleconnector-ekyc2-change-event-source-coordinator) Redo logs may be sized too small using the default mining strategy, consider increasing redo log sizes to a minimum of 500MB.
oracle_prod1  | 2024-03-20 04:08:08,421 DEBUG [io.deb.ser.ice.tab.IcebergTableOperator] (pool-7-thread-1) Batch got 9 records with 1 different schema!!
oracle_prod1  | 2024-03-20 04:08:08,422 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Converting iceberg schema to debezium:{"type":"struct","fields":[{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"REPORT_DAY"},{"type":"string","optional":true,"field":"VIOLATE_CODE"},{"type":"string","optional":true,"field":"COUNT_DATA"},{"type":"string","optional":true,"field":"HOTLINE"},{"type":"string","optional":true,"field":"BRANCH"},{"type":"string","optional":true,"field":"__deleted"},{"type":"string","optional":true,"field":"__op"},{"type":"int64","optional":true,"field":"__source_ts_ms"}],"optional":false,"name":"ekyc2.DBZUSER.COUNT_VIOLATE_CALL_CENTER_DAY1.Value"}
oracle_prod1  | 2024-03-20 04:08:08,433 INFO  [io.deb.ser.ice.tab.IcebergTableOperator] (pool-7-thread-1) Table don't have Pk defined upsert is not possible falling back to append!
oracle_prod1  | 2024-03-20 04:08:08,437 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing nested field:struct<1: REPORT_DAY: optional long, 2: VIOLATE_CODE: optional string, 3: COUNT_DATA: optional string, 4: HOTLINE: optional string, 5: BRANCH: optional string, 6: __deleted: optional string, 7: __op: optional string, 8: __source_ts_ms: optional timestamptz>
oracle_prod1  | 2024-03-20 04:08:08,437 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:REPORT_DAY Type:long
oracle_prod1  | 2024-03-20 04:08:08,438 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:VIOLATE_CODE Type:string
oracle_prod1  | 2024-03-20 04:08:08,439 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:COUNT_DATA Type:string
oracle_prod1  | 2024-03-20 04:08:08,439 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:HOTLINE Type:string
oracle_prod1  | 2024-03-20 04:08:08,439 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:BRANCH Type:string
oracle_prod1  | 2024-03-20 04:08:08,439 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__deleted Type:string
oracle_prod1  | 2024-03-20 04:08:08,439 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__op Type:string
oracle_prod1  | 2024-03-20 04:08:08,439 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__source_ts_ms Type:timestamptz
oracle_prod1  | 2024-03-20 04:08:08,440 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing nested field:struct<1: REPORT_DAY: optional long, 2: VIOLATE_CODE: optional string, 3: COUNT_DATA: optional string, 4: HOTLINE: optional string, 5: BRANCH: optional string, 6: __deleted: optional string, 7: __op: optional string, 8: __source_ts_ms: optional timestamptz>
oracle_prod1  | 2024-03-20 04:08:08,440 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:REPORT_DAY Type:long
oracle_prod1  | 2024-03-20 04:08:08,440 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:VIOLATE_CODE Type:string
oracle_prod1  | 2024-03-20 04:08:08,440 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:COUNT_DATA Type:string
oracle_prod1  | 2024-03-20 04:08:08,441 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:HOTLINE Type:string
oracle_prod1  | 2024-03-20 04:08:08,441 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:BRANCH Type:string
oracle_prod1  | 2024-03-20 04:08:08,441 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__deleted Type:string
oracle_prod1  | 2024-03-20 04:08:08,441 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__op Type:string
oracle_prod1  | 2024-03-20 04:08:08,441 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__source_ts_ms Type:timestamptz
oracle_prod1  | 2024-03-20 04:08:08,442 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing nested field:struct<1: REPORT_DAY: optional long, 2: VIOLATE_CODE: optional string, 3: COUNT_DATA: optional string, 4: HOTLINE: optional string, 5: BRANCH: optional string, 6: __deleted: optional string, 7: __op: optional string, 8: __source_ts_ms: optional timestamptz>
oracle_prod1  | 2024-03-20 04:08:08,442 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:REPORT_DAY Type:long
oracle_prod1  | 2024-03-20 04:08:08,442 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:VIOLATE_CODE Type:string
oracle_prod1  | 2024-03-20 04:08:08,442 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:COUNT_DATA Type:string
oracle_prod1  | 2024-03-20 04:08:08,442 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:HOTLINE Type:string
oracle_prod1  | 2024-03-20 04:08:08,442 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:BRANCH Type:string
oracle_prod1  | 2024-03-20 04:08:08,442 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__deleted Type:string
oracle_prod1  | 2024-03-20 04:08:08,442 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__op Type:string
oracle_prod1  | 2024-03-20 04:08:08,443 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__source_ts_ms Type:timestamptz
oracle_prod1  | 2024-03-20 04:08:08,443 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing nested field:struct<1: REPORT_DAY: optional long, 2: VIOLATE_CODE: optional string, 3: COUNT_DATA: optional string, 4: HOTLINE: optional string, 5: BRANCH: optional string, 6: __deleted: optional string, 7: __op: optional string, 8: __source_ts_ms: optional timestamptz>
oracle_prod1  | 2024-03-20 04:08:08,443 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:REPORT_DAY Type:long
oracle_prod1  | 2024-03-20 04:08:08,443 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:VIOLATE_CODE Type:string
oracle_prod1  | 2024-03-20 04:08:08,443 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:COUNT_DATA Type:string
oracle_prod1  | 2024-03-20 04:08:08,443 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:HOTLINE Type:string
oracle_prod1  | 2024-03-20 04:08:08,443 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:BRANCH Type:string
oracle_prod1  | 2024-03-20 04:08:08,443 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__deleted Type:string
oracle_prod1  | 2024-03-20 04:08:08,444 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__op Type:string
oracle_prod1  | 2024-03-20 04:08:08,444 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__source_ts_ms Type:timestamptz
oracle_prod1  | 2024-03-20 04:08:08,444 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing nested field:struct<1: REPORT_DAY: optional long, 2: VIOLATE_CODE: optional string, 3: COUNT_DATA: optional string, 4: HOTLINE: optional string, 5: BRANCH: optional string, 6: __deleted: optional string, 7: __op: optional string, 8: __source_ts_ms: optional timestamptz>
oracle_prod1  | 2024-03-20 04:08:08,444 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:REPORT_DAY Type:long
oracle_prod1  | 2024-03-20 04:08:08,444 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:VIOLATE_CODE Type:string
oracle_prod1  | 2024-03-20 04:08:08,444 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:COUNT_DATA Type:string
oracle_prod1  | 2024-03-20 04:08:08,444 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:HOTLINE Type:string
oracle_prod1  | 2024-03-20 04:08:08,444 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:BRANCH Type:string
oracle_prod1  | 2024-03-20 04:08:08,444 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__deleted Type:string
oracle_prod1  | 2024-03-20 04:08:08,444 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__op Type:string
oracle_prod1  | 2024-03-20 04:08:08,445 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__source_ts_ms Type:timestamptz
oracle_prod1  | 2024-03-20 04:08:08,445 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing nested field:struct<1: REPORT_DAY: optional long, 2: VIOLATE_CODE: optional string, 3: COUNT_DATA: optional string, 4: HOTLINE: optional string, 5: BRANCH: optional string, 6: __deleted: optional string, 7: __op: optional string, 8: __source_ts_ms: optional timestamptz>
oracle_prod1  | 2024-03-20 04:08:08,445 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:REPORT_DAY Type:long
oracle_prod1  | 2024-03-20 04:08:08,445 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:VIOLATE_CODE Type:string
oracle_prod1  | 2024-03-20 04:08:08,445 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:COUNT_DATA Type:string
oracle_prod1  | 2024-03-20 04:08:08,445 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:HOTLINE Type:string
oracle_prod1  | 2024-03-20 04:08:08,445 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:BRANCH Type:string
oracle_prod1  | 2024-03-20 04:08:08,445 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__deleted Type:string
oracle_prod1  | 2024-03-20 04:08:08,445 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__op Type:string
oracle_prod1  | 2024-03-20 04:08:08,445 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__source_ts_ms Type:timestamptz
oracle_prod1  | 2024-03-20 04:08:08,446 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing nested field:struct<1: REPORT_DAY: optional long, 2: VIOLATE_CODE: optional string, 3: COUNT_DATA: optional string, 4: HOTLINE: optional string, 5: BRANCH: optional string, 6: __deleted: optional string, 7: __op: optional string, 8: __source_ts_ms: optional timestamptz>
oracle_prod1  | 2024-03-20 04:08:08,446 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:REPORT_DAY Type:long
oracle_prod1  | 2024-03-20 04:08:08,446 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:VIOLATE_CODE Type:string
oracle_prod1  | 2024-03-20 04:08:08,446 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:COUNT_DATA Type:string
oracle_prod1  | 2024-03-20 04:08:08,446 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:HOTLINE Type:string
oracle_prod1  | 2024-03-20 04:08:08,446 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:BRANCH Type:string
oracle_prod1  | 2024-03-20 04:08:08,446 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__deleted Type:string
oracle_prod1  | 2024-03-20 04:08:08,446 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__op Type:string
oracle_prod1  | 2024-03-20 04:08:08,446 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__source_ts_ms Type:timestamptz
oracle_prod1  | 2024-03-20 04:08:08,447 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing nested field:struct<1: REPORT_DAY: optional long, 2: VIOLATE_CODE: optional string, 3: COUNT_DATA: optional string, 4: HOTLINE: optional string, 5: BRANCH: optional string, 6: __deleted: optional string, 7: __op: optional string, 8: __source_ts_ms: optional timestamptz>
oracle_prod1  | 2024-03-20 04:08:08,447 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:REPORT_DAY Type:long
oracle_prod1  | 2024-03-20 04:08:08,447 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:VIOLATE_CODE Type:string
oracle_prod1  | 2024-03-20 04:08:08,447 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:COUNT_DATA Type:string
oracle_prod1  | 2024-03-20 04:08:08,447 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:HOTLINE Type:string
oracle_prod1  | 2024-03-20 04:08:08,447 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:BRANCH Type:string
oracle_prod1  | 2024-03-20 04:08:08,448 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__deleted Type:string
oracle_prod1  | 2024-03-20 04:08:08,448 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__op Type:string
oracle_prod1  | 2024-03-20 04:08:08,448 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__source_ts_ms Type:timestamptz
oracle_prod1  | 2024-03-20 04:08:08,448 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing nested field:struct<1: REPORT_DAY: optional long, 2: VIOLATE_CODE: optional string, 3: COUNT_DATA: optional string, 4: HOTLINE: optional string, 5: BRANCH: optional string, 6: __deleted: optional string, 7: __op: optional string, 8: __source_ts_ms: optional timestamptz>
oracle_prod1  | 2024-03-20 04:08:08,448 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:REPORT_DAY Type:long
oracle_prod1  | 2024-03-20 04:08:08,448 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:VIOLATE_CODE Type:string
oracle_prod1  | 2024-03-20 04:08:08,448 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:COUNT_DATA Type:string
oracle_prod1  | 2024-03-20 04:08:08,449 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:HOTLINE Type:string
oracle_prod1  | 2024-03-20 04:08:08,449 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:BRANCH Type:string
oracle_prod1  | 2024-03-20 04:08:08,449 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__deleted Type:string
oracle_prod1  | 2024-03-20 04:08:08,449 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__op Type:string
oracle_prod1  | 2024-03-20 04:08:08,449 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Processing Field:__source_ts_ms Type:timestamptz
oracle_prod1  | 2024-03-20 04:08:08,798 INFO  [io.deb.ser.ice.tab.IcebergTableOperator] (pool-7-thread-1) Committed 9 events to table! s3a://datalake/datawarehouse/oracle_clob2.db/ekyc2_DBZUSER_COUNT_VIOLATE_CALL_CENTER_DAY1
oracle_prod1  | 2024-03-20 04:08:08,801 DEBUG [io.deb.ser.ice.bat.MaxBatchSizeWait] (pool-7-thread-1) Processed 9, QueueCurrentSize:0, QueueTotalCapacity:150000, SecondsBehindSource:0, SnapshotCompleted:true
oracle_prod1  | 2024-03-20 04:08:08,802 DEBUG [io.deb.ser.ice.bat.MaxBatchSizeWait] (pool-7-thread-1) Sleeping 20000 Milliseconds, QueueCurrentSize:0 < maxBatchSize:120000
oracle_prod1  | 2024-03-20 04:08:28,803 DEBUG [io.deb.ser.ice.bat.MaxBatchSizeWait] (pool-7-thread-1) Sleeping 20000 Milliseconds, QueueCurrentSize:0 < maxBatchSize:120000
oracle_prod1  | 2024-03-20 04:08:48,831 DEBUG [io.deb.ser.ice.bat.MaxBatchSizeWait] (pool-7-thread-1) Sleeping 20000 Milliseconds, QueueCurrentSize:0 < maxBatchSize:120000
oracle_prod1  | 2024-03-20 04:09:08,832 DEBUG [io.deb.ser.ice.bat.MaxBatchSizeWait] (pool-7-thread-1) Sleeping 20000 Milliseconds, QueueCurrentSize:0 < maxBatchSize:120000
oracle_prod1  | 2024-03-20 04:09:31,753 DEBUG [io.deb.ser.ice.bat.MaxBatchSizeWait] (pool-7-thread-1) Sleeping 20000 Milliseconds, QueueCurrentSize:0 < maxBatchSize:120000
oracle_prod1  | 2024-03-20 04:09:52,007 DEBUG [io.deb.ser.ice.bat.MaxBatchSizeWait] (pool-7-thread-1) Sleeping 20000 Milliseconds, QueueCurrentSize:0 < maxBatchSize:120000
oracle_prod1  | 2024-03-20 04:10:12,008 DEBUG [io.deb.ser.ice.bat.MaxBatchSizeWait] (pool-7-thread-1) Total wait 120000 Milliseconds, QueueCurrentSize:0 < maxBatchSize:120000
ismailsimsek commented 3 months ago

glad its working. only thing to keep in mind is: with upsert mode, for the tables without primary key, the consumer falls back to append mode.

 Table don't have Pk defined upsert is not possible falling back to append!
hoaiff commented 3 months ago

@ismailsimsek, I tried manual fixing in prod env, but it not working

one option is manually fixing it stop the consumer, alter iceberg table drop identifier filed, set fields optional. start the consumer

table after drop identifier filed image

Stack trace after running again connector

oracle_prod1  | 2024-03-21 08:56:36,244 DEBUG [io.deb.ser.ice.tab.IcebergTableOperator] (pool-7-thread-1) Batch got 6930 records with 1 different schema!!
oracle_prod1  | 2024-03-21 08:56:36,259 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Converting iceberg schema to debezium:{"type":"struct","fields":[{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"REPORT_DAY"},{"type":"string","optional":true,"field":"COUNT_DATA"},{"type":"string","optional":true,"field":"HOTLINE"},{"type":"string","optional":true,"field":"BRANCH"},{"type":"string","optional":true,"field":"__deleted"},{"type":"string","optional":true,"field":"__op"},{"type":"int64","optional":true,"field":"__source_ts_ms"}],"optional":false,"name":"ekyc2.IDG_VOICE.COUNT_CALL_CENTER_DAY.Value"}
oracle_prod1  | 2024-03-21 08:56:36,260 DEBUG [io.deb.ser.ice.IcebergChangeEvent] (pool-7-thread-1) Converting iceberg schema to debezium:{"type":"struct","fields":[{"type":"int64","optional":true,"name":"io.debezium.time.Timestamp","version":1,"field":"REPORT_DAY"},{"type":"string","optional":true,"field":"HOTLINE"},{"type":"string","optional":true,"field":"BRANCH"}],"optional":false,"name":"ekyc2.IDG_VOICE.COUNT_CALL_CENTER_DAY.Key"}
oracle_prod1  | 2024-03-21 08:56:52,037 INFO  [io.deb.ser.ice.off.IcebergOffsetBackingStore] (pool-7-thread-1) Stopped IcebergOffsetBackingStore table:oracle3.debezium_offset_storage_custom_table
oracle_prod1  | 2024-03-21 08:56:52,038 ERROR [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: Cannot add field REPORT_DAY as an identifier field: not a required field', error = 'java.lang.IllegalArgumentException: Cannot add field REPORT_DAY as an identifier field: not a required field': java.lang.IllegalArgumentException: Cannot add field REPORT_DAY as an identifier field: not a required field
oracle_prod1  |     at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:218)
oracle_prod1  |     at org.apache.iceberg.Schema.validateIdentifierField(Schema.java:126)
oracle_prod1  |     at org.apache.iceberg.SchemaUpdate.lambda$applyChanges$1(SchemaUpdate.java:555)
oracle_prod1  |     at java.base/java.lang.Iterable.forEach(Unknown Source)
oracle_prod1  |     at org.apache.iceberg.SchemaUpdate.applyChanges(SchemaUpdate.java:554)
oracle_prod1  |     at org.apache.iceberg.SchemaUpdate.apply(SchemaUpdate.java:440)
oracle_prod1  |     at org.apache.iceberg.SchemaUpdate.apply(SchemaUpdate.java:48)
oracle_prod1  |     at io.debezium.server.iceberg.tableoperator.IcebergTableOperator.applyFieldAddition(IcebergTableOperator.java:116)
oracle_prod1  |     at io.debezium.server.iceberg.tableoperator.IcebergTableOperator.addToTable(IcebergTableOperator.java:155)
oracle_prod1  |     at io.debezium.server.iceberg.IcebergChangeConsumer.handleBatch(IcebergChangeConsumer.java:167)
oracle_prod1  |     at io.debezium.embedded.ConvertingEngineBuilder$ConvertingChangeConsumer.handleBatch(ConvertingEngineBuilder.java:108)
oracle_prod1  |     at io.debezium.embedded.EmbeddedEngine.pollRecords(EmbeddedEngine.java:728)
oracle_prod1  |     at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:475)
oracle_prod1  |     at io.debezium.embedded.ConvertingEngineBuilder$1.run(ConvertingEngineBuilder.java:248)
oracle_prod1  |     at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:170)
oracle_prod1  |     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
oracle_prod1  |     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
oracle_prod1  |     at java.base/java.lang.Thread.run(Unknown Source)
oracle_prod1  | 
oracle_prod1  | 
oracle_prod1 exited with code 1
ismailsimsek commented 3 months ago

@hoaiff this is different table it seems, right?

according to the log this source table has primary keys. line 3 in the above log.

whats happening is: Since debezium still sending events with key schema, the 3th line above, the consumer is detecting schema difference and trying to apply the schema changes to destination-debezium table.

https://github.com/memiiso/debezium-server-iceberg/blob/48d7355804fae0bbe01ec550c9d7be4e6e7a38b8/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java#L148-L158

one option is to disable fiedl addition feature: debezium.sink.iceberg.allow-field-addition=false

with this the schema changes will not be applied, by the consumer.

See: https://github.com/memiiso/debezium-server-iceberg/blob/master/docs/DOCS.md#schema-change-behaviour

hoaiff commented 3 months ago

Hi @ismailsimsek, I found this problem. Because my table in prod env using unique index on 4 fields: BRANCH, HOTLINE, VIOLATE_CODE, REPORT_DAY therefore debezium gen it as key in key schema

ismailsimsek commented 3 months ago

@hoaiff in that case the fix is:

  1. set the primary key fields to required, iceberg table
  2. set this fields as identifier fields, iceberg table

it seems like the consumer is tying to do it automatically, but it cannot handle the fist step.