yugabyte / debezium-connector-yugabytedb

A Debezium CDC connector for the YugabyteDB database
https://docs.yugabyte.com/stable/explore/change-data-capture/using-logical-replication/yugabytedb-connector/
Apache License 2.0

[DBZ] Read offset from Kafka for every commit callback #348

Closed vaibhav-yb closed 2 months ago

vaibhav-yb commented 2 months ago

Note: This PR breaks explicit checkpointing in the tablet splitting case; this will be fixed in a follow-up PR.

Problem

There are two issues with the current callback-based checkpointing mechanism, tracked as DBZ-6026 and DBZ-7816 and described below.

Both are potential causes of data loss, and both have been reproduced manually.

DBZ-6026 -

The current code calls BaseSourceTask#logStatistics to log some information, but that method also updates the offset map, which is then used for the commit callback.

As a result, if a commit() callback arrives after the statistics are logged but before the records are returned from BaseSourceTask#poll, the checkpoint is marked on the service for records that have not yet been returned. If the connector restarts in that window, those records are lost.

Steps to reproduce:

  1. Put a sleep of 2 minutes after logStatistics() is called and before records are returned from BaseSourceTask#poll.
     a. Add a log so that we see when the method is getting called.
  2. Create a table with a single tablet.
  3. Insert a record and wait for the log from step 1a to appear; this indicates that the records have not yet been returned.
  4. Wait for the commit callback; it generally arrives before the sleep ends.
  5. Restart the connector.
  6. Upon restart, the connector will start from the checkpoint in step 4 and the record inserted in step 3 will never be streamed.
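
The window described above can be illustrated with a toy model. This is only a sketch under stated assumptions: the class `StatsCommitRaceDemo` and all of its fields and methods are hypothetical stand-ins, not the connector's or Debezium's real code, and the offset is reduced to a single number. It shows that once the offset visible to commit() is advanced before the records are returned, a restart in between leaves a checkpoint that covers an undelivered record.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the DBZ-6026 window; every name here is hypothetical.
class StatsCommitRaceDemo {
    // Offset that a commit() callback would send to the service as the checkpoint.
    private long offsetSeenByCommit = 0;
    // Checkpoint stored on the service; after a restart, streaming resumes past it.
    private long serviceCheckpoint = 0;
    // Offsets of the records actually handed back to the Connect framework.
    private final List<Long> delivered = new ArrayList<>();

    // First half of poll(): logging statistics also advances the offset used by commit().
    void logStatistics(long lastPolledOffset) {
        offsetSeenByCommit = lastPolledOffset;
    }

    // Second half of poll(): the records are finally returned to the framework.
    void returnRecords(long offset) {
        delivered.add(offset);
    }

    // commit() callback: checkpoints whatever offset is currently visible.
    void commit() {
        serviceCheckpoint = offsetSeenByCommit;
    }

    long serviceCheckpoint() {
        return serviceCheckpoint;
    }

    List<Long> delivered() {
        return delivered;
    }

    // Mirrors steps 3 to 5: a record is read, commit() fires inside the window,
    // and the connector restarts before the record is returned.
    static StatsCommitRaceDemo reproduce() {
        StatsCommitRaceDemo task = new StatsCommitRaceDemo();
        task.logStatistics(1); // record at offset 1 was read; offset map already updated
        task.commit();         // callback arrives during the sleep
        // Restart happens here, so returnRecords(1) is never called.
        return task;
    }

    public static void main(String[] args) {
        StatsCommitRaceDemo task = reproduce();
        // Prints: checkpoint=1 delivered=[]
        System.out.println("checkpoint=" + task.serviceCheckpoint()
                + " delivered=" + task.delivered());
    }
}
```

The checkpoint ends at 1 while nothing was delivered, which corresponds to step 6: the record from step 3 is skipped after the restart.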

DBZ-7816 -

According to the Kafka docs, callbacks for the same Kafka partition are guaranteed to arrive in order, but callbacks for different Kafka partitions can arrive out of order. This opens a potential data loss window, as described in DBZ-7816.

Steps to reproduce:

  1. Create a table with 20 tablets and fill it with 100k records.
  2. Create a topic with 50 partitions.
  3. Start a snapshot on the table with snapshot.mode=initial.
  4. Once the snapshot is finished, the tablet is added to the wait list. If the callbacks arrive out of order, it is possible that we never receive a callback for the last snapshot record, in which case the connector never transitions from snapshot to streaming.

Note that in all the experiments performed for this issue, we were able to reproduce it every time. There is still a possibility that it will not reproduce if the callbacks happen to arrive in order (a rough guess: 1 out of 10 times).
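
The ordering problem can be sketched with a simplified model. The assumptions are loud here: `OutOfOrderAckDemo`, `Ack`, and the "last acknowledgement wins" rule are hypothetical simplifications for illustration, not the connector's actual bookkeeping. Records 1 to 4 of one tablet are spread over two Kafka partitions; each partition acknowledges in order, but the two partitions are not ordered relative to each other.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of the DBZ-7816 ordering problem; all names are hypothetical.
class OutOfOrderAckDemo {
    // One per-record acknowledgement: the Kafka partition it came from
    // and the sequence number of the source record.
    static final class Ack {
        final int kafkaPartition;
        final long sequence;

        Ack(int kafkaPartition, long sequence) {
            this.kafkaPartition = kafkaPartition;
            this.sequence = sequence;
        }
    }

    // "Last ack wins": each acknowledgement overwrites the offset
    // that the next commit callback would report.
    static long lastAckWins(List<Ack> acks) {
        long offset = -1;
        for (Ack ack : acks) {
            offset = ack.sequence;
        }
        return offset;
    }

    // Partition 1 holds records 2 and 4, partition 0 holds records 1 and 3.
    // Each partition is in order, but partition 1 is acknowledged first.
    static List<Ack> interleavedAcks() {
        return Arrays.asList(new Ack(1, 2), new Ack(1, 4), new Ack(0, 1), new Ack(0, 3));
    }

    public static void main(String[] args) {
        long lastSnapshotRecord = 4;
        long committed = lastAckWins(interleavedAcks());
        // Prints: committed=3 snapshotDone=false
        System.out.println("committed=" + committed
                + " snapshotDone=" + (committed == lastSnapshotRecord));
    }
}
```

In this model the final reported offset is 3 even though record 4 was acknowledged, so a component waiting to see the last snapshot record in a callback would wait indefinitely.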

Solution

This PR overrides the commit() method in YugabyteDBConnectorTask so that it reads the offsets from the Kafka partitions and sends those same offsets through the commit callbacks, where they are marked on the service for checkpointing.
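
A minimal sketch of that idea follows. It is not the PR's implementation: `CommitFromStoredOffsetsDemo`, `StoredOffsetReader`, and `CheckpointSink` are hypothetical types introduced only for illustration, and the tablet ID and offsets are made-up values. How the PR actually reads the offsets back from Kafka is not shown here. The point is only that commit() checkpoints what the offset store reports, not what the in-memory map says.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the idea only; none of these types are the connector's real classes.
class CommitFromStoredOffsetsDemo {
    // Stand-in for reading the offsets that Kafka has stored (hypothetical).
    interface StoredOffsetReader {
        Map<String, Long> readStoredOffsets();
    }

    // Stand-in for the call that marks a checkpoint on the service (hypothetical).
    interface CheckpointSink {
        void markCheckpoint(String tabletId, long offset);
    }

    private final StoredOffsetReader reader;
    private final CheckpointSink sink;
    // In-memory view that may run ahead of what was delivered; commit() ignores it.
    final Map<String, Long> inMemoryOffsets = new HashMap<>();

    CommitFromStoredOffsetsDemo(StoredOffsetReader reader, CheckpointSink sink) {
        this.reader = reader;
        this.sink = sink;
    }

    // Overridden commit(): checkpoint only the offsets reported by the store.
    void commit() {
        for (Map.Entry<String, Long> entry : reader.readStoredOffsets().entrySet()) {
            sink.markCheckpoint(entry.getKey(), entry.getValue());
        }
    }

    // The store says 41 while the in-memory map has already moved to 42.
    static Map<String, Long> demo() {
        Map<String, Long> stored = new HashMap<>();
        stored.put("tablet-1", 41L);
        Map<String, Long> checkpoints = new HashMap<>();
        CommitFromStoredOffsetsDemo task = new CommitFromStoredOffsetsDemo(
                () -> stored,
                (tabletId, offset) -> checkpoints.put(tabletId, offset));
        task.inMemoryOffsets.put("tablet-1", 42L);
        task.commit();
        return checkpoints;
    }

    public static void main(String[] args) {
        // Prints: {tablet-1=41}
        System.out.println(demo());
    }
}
```

Because the checkpoint is derived from stored offsets, it cannot move past a record that was never written, which is the property both issues above depend on.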