yugabyte / debezium-connector-yugabytedb

A Debezium CDC connector for the YugabyteDB database
https://docs.yugabyte.com/stable/explore/change-data-capture/using-logical-replication/yugabytedb-connector/
Apache License 2.0

DebeziumEngine and EXPLICIT checkpoint handling #344

Closed rbramante closed 3 months ago

rbramante commented 3 months ago

I am not able to deploy the entire Debezium stack, primarily because the project I am working on can't pull in all of Kafka, and secondarily because the data sink I need to push events to is not currently supported by Debezium.

I am using the DebeziumEngine portion of Debezium together with debezium-connector-yugabytedb to get changes from YB and push them to my sink. In general this all works well on the happy path, but I need to switch from an IMPLICIT to an EXPLICIT stream because I need to ensure my sink has persisted the events before allowing YB CDC to move forward.

This is where I am getting stuck. In EXPLICIT mode, I have yet to find the correct combination of snapshot.mode, RecordCommitter, handleBatch, markProcessed, markBatchFinished, OffsetStore, and the like to get the behavior I need. From scanning the code, it seems I need a mechanism to invoke commitOffset in YugabyteDBChangeEventSourceCoordinator, but I currently don't see a way to perform this sort of action through DebeziumEngine.

A similar question on StackOverflow went unanswered: https://stackoverflow.com/questions/66313424/debezium-how-to-commit-offset-manually

Also, is there any way to examine the state of the CDC stream to see its committed offsets? That might help with debugging what is happening under various tests.

rbramante commented 3 months ago

The issue for us ended up being that we were not waiting long enough after committer.markBatchFinished() for the YugabyteDBChangeEventSourceCoordinator to invoke commitOffset against the server. Our tests were behaving as if a checkpoint was never set at the server because, in fact, it hadn't been yet. We ran some tests with OffsetCommitPolicy.always() and observed the behavior we were expecting to see.
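
For anyone who hits the same timing confusion, here is a rough, JDK-only sketch of why markBatchFinished() alone did not move the checkpoint in our tests. It is a toy model, not Debezium or connector code: CommitPolicyDemo, CommitPolicy, Committer and run are hypothetical names made up for illustration, and the 60-second interval reflects my understanding of the default offset.flush.interval.ms, which is worth checking against the Debezium version in use. The idea it models is that finishing a batch only asks the commit policy whether to flush, so a periodic policy can go many batches without a commit while an "always" policy commits after each one.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitPolicyDemo {

    /** Hypothetical policy: decides whether offsets should be flushed right now. */
    interface CommitPolicy {
        boolean performCommit(long recordsSinceLastCommit, long millisSinceLastCommit);
    }

    /** Flush after every finished batch. */
    static CommitPolicy always() {
        return (records, millis) -> true;
    }

    /** Flush only once the given interval has elapsed since the last flush. */
    static CommitPolicy periodic(long intervalMillis) {
        return (records, millis) -> millis >= intervalMillis;
    }

    /** Hypothetical stand-in for the engine-side committer handed to a batch handler. */
    static class Committer {
        private final CommitPolicy policy;
        private final List<Long> serverCheckpoints = new ArrayList<>();
        private long lastProcessedOffset = -1;
        private long recordsSinceLastCommit = 0;
        private long lastCommitAtMillis = 0;

        Committer(CommitPolicy policy) {
            this.policy = policy;
        }

        /** Records that one event has been handed to the sink. */
        void markProcessed(long offset) {
            lastProcessedOffset = offset;
            recordsSinceLastCommit++;
        }

        /** Only consults the policy; it does not force a flush by itself. */
        void markBatchFinished(long nowMillis) {
            long elapsed = nowMillis - lastCommitAtMillis;
            if (recordsSinceLastCommit > 0 && policy.performCommit(recordsSinceLastCommit, elapsed)) {
                serverCheckpoints.add(lastProcessedOffset); // models the checkpoint reaching the server
                recordsSinceLastCommit = 0;
                lastCommitAtMillis = nowMillis;
            }
        }

        List<Long> serverCheckpoints() {
            return serverCheckpoints;
        }
    }

    /** Feeds batches of two records each, one batch per simulated second. */
    static List<Long> run(CommitPolicy policy, int batches) {
        Committer committer = new Committer(policy);
        long offset = 0;
        for (int b = 1; b <= batches; b++) {
            committer.markProcessed(offset++);
            committer.markProcessed(offset++);
            committer.markBatchFinished(b * 1000L);
        }
        return committer.serverCheckpoints();
    }

    public static void main(String[] args) {
        System.out.println("always:   " + run(always(), 3));         // prints "always:   [1, 3, 5]"
        System.out.println("periodic: " + run(periodic(60_000), 3)); // prints "periodic: []"
    }
}
```

In this model a short test that finishes three batches under the periodic policy sees no checkpoint at all, which matches what we observed. A test either has to wait out the flush interval or run with a policy that commits on every batch.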