memiiso / debezium-server-iceberg

Replicates any database (CDC events) to Apache Iceberg (To Cloud Storage)
Apache License 2.0

Error while running the application #335

Open kowshikdutta opened 1 month ago

kowshikdutta commented 1 month ago

Hi, I am facing the error below while running the application.

Source: MariaDB
Sink: AWS Glue Catalog and S3
Java: V17

Attached is the "application.properties" file: application.properties.pdf

Tables are getting created in the "default" database in Glue, but then the app crashes with the error below.

ERROR:

2024-05-31 12:28:22,710 ERROR [io.deb.ser.ConnectorLifecycle] (pool-7-thread-1) Connector completed: success = 'false', message = 'java.lang.NullPointerException: Cannot invoke "com.fasterxml.jackson.databind.JsonNode.asLong(long)" because the return value of "com.fasterxml.jackson.databind.JsonNode.get(String)" is null', error = 'java.lang.NullPointerException: Cannot invoke "com.fasterxml.jackson.databind.JsonNode.asLong(long)" because the return value of "com.fasterxml.jackson.databind.JsonNode.get(String)" is null': java.lang.NullPointerException: Cannot invoke "com.fasterxml.jackson.databind.JsonNode.asLong(long)" because the return value of "com.fasterxml.jackson.databind.JsonNode.get(String)" is null
    at io.debezium.server.iceberg.tableoperator.IcebergTableOperator.compareByTsThenOp(IcebergTableOperator.java:89)
    at io.debezium.server.iceberg.tableoperator.IcebergTableOperator.lambda$deduplicateBatch$0(IcebergTableOperator.java:63)
    at java.base/java.util.concurrent.ConcurrentHashMap.merge(ConcurrentHashMap.java:2056)
    at io.debezium.server.iceberg.tableoperator.IcebergTableOperator.lambda$deduplicateBatch$1(IcebergTableOperator.java:62)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at io.debezium.server.iceberg.tableoperator.IcebergTableOperator.deduplicateBatch(IcebergTableOperator.java:60)
    at io.debezium.server.iceberg.tableoperator.IcebergTableOperator.addToTable(IcebergTableOperator.java:141)
    at io.debezium.server.iceberg.IcebergChangeConsumer.handleBatch(IcebergChangeConsumer.java:167)
    at io.debezium.embedded.async.ParallelSmtAndConvertBatchProcessor.processRecords(ParallelSmtAndConvertBatchProcessor.java:56)
    at io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords.doCall(AsyncEmbeddedEngine.java:1157)
    at io.debezium.embedded.async.AsyncEmbeddedEngine$PollRecords.doCall(AsyncEmbeddedEngine.java:1138)
    at io.debezium.embedded.async.RetryingCallable.call(RetryingCallable.java:47)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)

2024-05-31 12:28:22,711 DEBUG [io.qua.run.Application] (main) Stopping application
2024-05-31 12:28:22,711 DEBUG [io.qua.run.shu.ShutdownRecorder] (main) Attempting to gracefully shutdown.
2024-05-31 12:28:22,728 INFO [io.deb.ser.DebeziumServer] (main) Received request to stop the engine
2024-05-31 12:28:22,728 DEBUG [io.deb.emb.asy.AsyncEmbeddedEngine] (main) Engine shutdown called.
2024-05-31 12:28:22,728 ERROR [io.deb.ser.DebeziumServer] (main) Exception while shuttting down Debezium [Error Occurred After Shutdown]: java.lang.IllegalStateException: Engine has been already shut down.
    at io.debezium.embedded.async.AsyncEmbeddedEngine.close(AsyncEmbeddedEngine.java:249)
    at io.debezium.server.DebeziumServer.stop(DebeziumServer.java:249)
    at io.debezium.server.DebeziumServer_Observer_stop_yMpCZhmgvv79zBZyHwCy4l6x-EI.notify(Unknown Source)
    at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:351)
    at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:333)
    at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:80)
    at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:155)
    at io.quarkus.arc.runtime.ArcRecorder$2.run(ArcRecorder.java:111)
    at io.quarkus.runtime.StartupContext.runAllInReverseOrder(StartupContext.java:84)
    at io.quarkus.runtime.StartupContext.close(StartupContext.java:73)
    at io.quarkus.runner.ApplicationImpl.doStop(Unknown Source)
    at io.quarkus.runtime.Application.stop(Application.java:208)
    at io.quarkus.runtime.Application.stop(Application.java:155)
    at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:228)
    at io.quarkus.runtime.Quarkus.run(Quarkus.java:71)
    at io.quarkus.runtime.Quarkus.run(Quarkus.java:44)
    at io.quarkus.runtime.Quarkus.run(Quarkus.java:124)
    at io.debezium.server.Main.main(Main.java:15)

ismailsimsek commented 1 month ago

It's failing here while checking the __source_ts_ms value for de-duplication.

Do you have sample events? Somehow events are being generated with NULL __source_ts_ms values. You can see them in debug mode.

Does it work when you run it without upsert mode? debezium.sink.iceberg.upsert=false
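The failure described above can be reproduced in miniature. The following is a minimal sketch in Python, not the project's Java code: it assumes events are flat dicts carrying `__source_ts_ms` and `__op`, uses a hypothetical `id` key column, and the operation ranking in `OP_RANK` is an assumption made for illustration. It shows why a single event without a timestamp is enough to break "keep the newest event per key" de-duplication.

```python
# Illustrative only: mimics "keep the newest event per key" de-duplication.
# Assumed ranking used to break ties between events with equal timestamps.
OP_RANK = {"c": 1, "r": 2, "u": 3, "d": 4}


def sort_key(event):
    """Order events by source timestamp, then by operation rank."""
    ts = event.get("__source_ts_ms")
    if ts is None:
        # Rough analogue of the NullPointerException in the log above:
        # without a timestamp the events cannot be ordered.
        raise ValueError(f"event has no __source_ts_ms: {event}")
    return (ts, OP_RANK.get(event.get("__op"), 0))


def dedupe_batch(events, key_field="id"):
    """Return one event per key, keeping the one that sorts last."""
    latest = {}
    for event in events:
        key = event[key_field]
        current = latest.get(key)
        if current is None or sort_key(event) >= sort_key(current):
            latest[key] = event
    return list(latest.values())


# Made-up sample batch; "id" is a hypothetical key column.
batch = [
    {"id": 1, "winyear": 2007, "__op": "c", "__source_ts_ms": 1000},
    {"id": 1, "winyear": 2008, "__op": "u", "__source_ts_ms": 2000},
    {"id": 2, "winyear": 2010, "__op": "c", "__source_ts_ms": 1500},
]
deduped = dedupe_batch(batch)
```

With well-formed events the batch collapses to one row per key. If any event that shares a key with another event has a null `__source_ts_ms`, the comparison raises instead, which is consistent with the crash appearing only when upsert mode is enabled.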

kowshikdutta commented 4 weeks ago

Hi, I set the flag "debezium.sink.iceberg.upsert=false" and the application started working. However, I now see duplicate records, with both the before and after image. Please check the attachment. For "winyear" = 2008, the table shows both the before and after image, but I would like to see only the latest image. Could you suggest how to turn that flag on without causing the application to fail?

Also, I have attached a sample set of event data from a final Glue table. There are a lot of records where "__source_ts_ms" is NULL. They appear to be snapshots. Could you suggest how to avoid these?

Regards, Kowshik sampledata.xlsx

ismailsimsek commented 4 weeks ago

@kowshikdutta the first section of the records (the first 22) is not unwrapped data. That's why it was created with null __source_ts_ms.

Could it be that debezium.transforms=unwrap was not working before?

In both sets of records, you can see that source.ts_ms is populated.
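To illustrate the difference between unwrapped and non-unwrapped records, here is a minimal Python sketch. It is a simplified imitation of what a flattening transform does, not Debezium's implementation: the `flatten` and `is_unwrapped` helpers, the sample envelope, and its timestamp are all made up for illustration. The point is that `__source_ts_ms` only exists on a row after `source.ts_ms` has been copied out of the envelope, so a record that bypassed the transform has `source.ts_ms` populated but no `__source_ts_ms`.

```python
def flatten(envelope, add_fields=("op", "source.ts_ms")):
    """Simplified imitation of flattening a change-event envelope into a row."""
    # Use the "after" image when present, otherwise fall back to "before".
    row = dict(envelope.get("after") or envelope.get("before") or {})
    for field in add_fields:
        value = envelope
        for part in field.split("."):
            value = value.get(part) if isinstance(value, dict) else None
        # "source.ts_ms" becomes "__source_ts_ms", "op" becomes "__op".
        row["__" + field.replace(".", "_")] = value
    return row


def is_unwrapped(record):
    """Heuristic check: flattened rows carry a non-null __source_ts_ms."""
    return record.get("__source_ts_ms") is not None


# Made-up snapshot event; "id" is a hypothetical key column.
envelope = {
    "before": None,
    "after": {"id": 1, "winyear": 2008},
    "source": {"ts_ms": 1717158502000},
    "op": "r",
}
row = flatten(envelope)
```

Running `is_unwrapped` over a sample of rows from the target table is one quick way to count how many records were written before the transform took effect.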

kowshikdutta commented 3 weeks ago

Hi, the application fails when I set 'debezium.sink.iceberg.upsert=true'. Without it, updated records show up alongside the original records, so the table ends up with duplicates. Could you advise how to make the application run with the flag set to 'debezium.sink.iceberg.upsert=true'?

ismailsimsek commented 3 weeks ago

@kowshikdutta could you share the latest error log and the config? You can also see a working example here; just leaving it here for reference.