yugabyte / debezium-connector-yugabytedb

A Debezium CDC connector for the YugabyteDB database
https://docs.yugabyte.com/stable/explore/change-data-capture/using-logical-replication/yugabytedb-connector/
Apache License 2.0

[DBZ] Skip adding column to change event if received tuple is empty #337

Closed vaibhav-yb closed 4 months ago

vaibhav-yb commented 4 months ago

Problem

When an existing column has null as its default value and the service sends a before image, the tuple for that column is empty, so the old tuple arrives with the following structure:

old_tuple {
}

When the connector then reads the column type (OID) from this tuple, i.e. getOldTuple().getColumnType(), it gets the default value, which is 0 (zero). The connector has not cached this OID, so it tries to look it up in the database. That lookup fails because the connector is not supposed to look up any OID at runtime. The failure stack trace is similar to:

org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {}
    at io.debezium.connector.yugabytedb.YugabyteDBEventDispatcher.dispatchDataChangeEvent(YugabyteDBEventDispatcher.java:154)
    at io.debezium.connector.yugabytedb.YugabyteDBStreamingChangeEventSource.getChanges2(YugabyteDBStreamingChangeEventSource.java:740)
    at io.debezium.connector.yugabytedb.YugabyteDBStreamingChangeEventSource.execute(YugabyteDBStreamingChangeEventSource.java:152)
    at io.debezium.connector.yugabytedb.YugabyteDBStreamingChangeEventSource.execute(YugabyteDBStreamingChangeEventSource.java:48)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:174)
    at io.debezium.connector.yugabytedb.YugabyteDBChangeEventSourceCoordinator.executeChangeEventSources(YugabyteDBChangeEventSourceCoordinator.java:150)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:109)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.UnsupportedOperationException
    at io.debezium.connector.yugabytedb.YugabyteDBTaskConnection.prepareStatement(YugabyteDBTaskConnection.java:216)
    at io.debezium.connector.yugabytedb.YugabyteDBTypeRegistry.resolveUnknownType(YugabyteDBTypeRegistry.java:483)
    at io.debezium.connector.yugabytedb.YugabyteDBTypeRegistry.get(YugabyteDBTypeRegistry.java:251)
    at io.debezium.connector.yugabytedb.connection.pgproto.YbProtoReplicationMessage.lambda$transform$0(YbProtoReplicationMessage.java:122)
    at java.base/java.util.stream.IntPipeline$1$1.accept(IntPipeline.java:180)
    at java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
    at java.base/java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:699)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at io.debezium.connector.yugabytedb.connection.pgproto.YbProtoReplicationMessage.transform(YbProtoReplicationMessage.java:152)
    at io.debezium.connector.yugabytedb.connection.pgproto.YbProtoReplicationMessage.getOldTupleList(YbProtoReplicationMessage.java:96)
    at io.debezium.connector.yugabytedb.YugabyteDBChangeRecordEmitter.getOldColumnValues(YugabyteDBChangeRecordEmitter.java:124)
    at io.debezium.connector.yugabytedb.YugabyteDBChangeRecordEmitter.emitUpdateRecord(YugabyteDBChangeRecordEmitter.java:346)
    at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:53)
    at io.debezium.connector.yugabytedb.YugabyteDBChangeRecordEmitter.emitChangeRecords(YugabyteDBChangeRecordEmitter.java:99)
    at io.debezium.connector.yugabytedb.YugabyteDBEventDispatcher.dispatchDataChangeEvent(YugabyteDBEventDispatcher.java:124)
    ... 11 more
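
The "Caused by" part of the trace can be read as a cache miss followed by a database lookup that is not permitted. Below is a minimal sketch of that pattern. The class and field names (TypeRegistrySketch, cachedTypeNames) are hypothetical and are not the connector's actual code; the sketch only assumes that an empty tuple reports OID 0 and that runtime lookups throw.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a type registry; not the connector's actual class.
final class TypeRegistrySketch {
    // Types cached up front, keyed by OID (23 and 25 are the standard
    // PostgreSQL OIDs for int4 and text).
    private final Map<Integer, String> cachedTypeNames = new HashMap<>();

    TypeRegistrySketch() {
        cachedTypeNames.put(23, "int4");
        cachedTypeNames.put(25, "text");
    }

    // Returns the cached type name, or falls back to a runtime lookup on a miss.
    String get(int oid) {
        String name = cachedTypeNames.get(oid);
        return name != null ? name : resolveUnknownType(oid);
    }

    // Models a connection that refuses to run queries at runtime.
    private String resolveUnknownType(int oid) {
        throw new UnsupportedOperationException("runtime lookup not allowed for OID " + oid);
    }

    public static void main(String[] args) {
        TypeRegistrySketch registry = new TypeRegistrySketch();
        System.out.println(registry.get(23));
        try {
            registry.get(0); // OID reported by an empty tuple
        } catch (UnsupportedOperationException e) {
            System.out.println("lookup failed: " + e.getMessage());
        }
    }
}
```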

Solution

This PR fixes the connector failure by ignoring a tuple whose contents are empty and proceeding without adding that column to the change event.
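
The idea can be sketched as a filter applied before any type resolution happens. This is only an illustration under stated assumptions: Tuple, hasContent and columnsToEmit are hypothetical names, not the connector's or the protobuf message's actual API, and the real change may detect an empty tuple differently.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of "skip the column if the received tuple is empty".
final class EmptyTupleSkipSketch {
    // Stand-in for a received tuple; hasContent is false for `old_tuple { }`.
    static final class Tuple {
        final String columnName;
        final int columnTypeOid;
        final boolean hasContent;

        Tuple(String columnName, int columnTypeOid, boolean hasContent) {
            this.columnName = columnName;
            this.columnTypeOid = columnTypeOid;
            this.hasContent = hasContent;
        }
    }

    // Keeps only tuples that carry content, so OID 0 never reaches type resolution.
    static List<String> columnsToEmit(List<Tuple> tuples) {
        return tuples.stream()
                .filter(t -> t.hasContent)
                .map(t -> t.columnName)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Tuple> oldTuples = Arrays.asList(
                new Tuple("id", 23, true),
                new Tuple("", 0, false)); // empty before-image tuple
        System.out.println(columnsToEmit(oldTuples));
    }
}
```

With this shape, an empty tuple simply contributes no column to the before image instead of triggering a type lookup.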

Tests added

The added tests can be run using:

mvn clean test -Dtest=YugabyteDBBeforeImageTest#shouldWorkWhenDefaultOldValueIsNull

mvn clean test -Dtest=YugabyteDBBeforeImageTest#shouldWorkWhenDefaultHasOldValue