scylladb / scylla-cdc-source-connector

A Kafka source connector capturing Scylla CDC changes
Apache License 2.0

Implemented record simplified new state extractor #11

Closed Lorak-mmk closed 3 years ago

Lorak-mmk commented 3 years ago

This extractor transforms records in a way similar to io.debezium.transforms.ExtractNewRecordState, but it does not differentiate between NULL and no value. As a result, values don't need to be packed in a 1-field struct and can be represented directly instead.
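To illustrate the idea, here is a minimal sketch that models a record's new state with plain java.util.Map values rather than Kafka Connect structs. It is not the connector's code: the class name CellFlattenSketch, the method flatten, and the wrapper field name "value" are assumptions made for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative model only; the real transform works on Kafka Connect Struct/Schema objects.
final class CellFlattenSketch {

    // Assumed wrapper field name for the 1-field struct (hypothetical).
    static final String WRAPPER_FIELD = "value";

    // Replaces every 1-entry {"value": x} wrapper with x itself.
    // A wrapper holding null and an absent (null) cell both end up as null,
    // which is the "no difference between NULL and no value" trade-off.
    static Map<String, Object> flatten(Map<String, Object> after) {
        Map<String, Object> flat = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : after.entrySet()) {
            Object cell = entry.getValue();
            if (cell instanceof Map
                    && ((Map<?, ?>) cell).size() == 1
                    && ((Map<?, ?>) cell).containsKey(WRAPPER_FIELD)) {
                flat.put(entry.getKey(), ((Map<?, ?>) cell).get(WRAPPER_FIELD));
            } else {
                // Key columns and already-plain values pass through unchanged.
                flat.put(entry.getKey(), cell);
            }
        }
        return flat;
    }
}
```

In this model a column wrapped as {"value": 3} becomes plain 3, while a column explicitly set to NULL and a column that was not written at all both come out as null.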

avelanarius commented 3 years ago

This transform crashes in the following scenario:

cqlsh> CREATE KEYSPACE ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};
cqlsh> CREATE TABLE ks.t(pk int, ck int, v int, PRIMARY KEY(pk, ck)) WITH cdc = {'enabled': true};
cqlsh> INSERT INTO ks.t(pk, ck, v) VALUES (1, 2, 3);
cqlsh> INSERT INTO ks.t(pk, ck, v) VALUES (4, 5, 6);
cqlsh> INSERT INTO ks.t(pk, ck, v) VALUES (7, 8, 9);
cqlsh> INSERT INTO ks.t(pk, ck) VALUES (666, 666);
cqlsh> DELETE FROM ks.t WHERE pk = 666 AND ck = 666;
cqlsh> DELETE FROM ks.t WHERE pk = 1 AND ck = 2;
cqlsh> INSERT INTO ks.t(pk, ck, v) VALUES (10, 12, 13);
cqlsh> INSERT INTO ks.t(pk, ck, v) VALUES (14, 15, 18);
cqlsh> UPDATE ks.t SET v = 111 WHERE pk = 14 AND ck = 15;
cqlsh> INSERT INTO ks.t(pk, ck, v) VALUES (777, 77, 777);
cqlsh> DELETE FROM ks.t WHERE pk = 777 AND ck = 77;

Connector configuration:

{
  "name": "TransformCrashExample",
  "config": {
    "connector.class": "com.scylladb.cdc.debezium.connector.ScyllaConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms": "scyllatransform",
    "transforms.scyllatransform.type": "com.scylladb.cdc.debezium.connector.ScyllaExtractSimpleNewRecordState",
    "transforms.scyllatransform.delete.handling.mode": "rewrite",
    "transforms.scyllatransform.add.fields": "source.ts_ms:EVENT_TIMESTAMP",
    "transforms.scyllatransform.drop.tombstones": "false",
    "scylla.cluster.ip.addresses": "127.0.0.2:9042",
    "scylla.name": "MyNamespace",
    "scylla.table.names": "ks.t"
  }
}

Exception:

[2021-08-12 12:08:47,575] ERROR WorkerSourceTask{id=MyConn-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:196)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:122)
        at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:339)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at com.scylladb.cdc.debezium.connector.ScyllaExtractSimpleNewRecordState.apply(ScyllaExtractSimpleNewRecordState.java:25)
        at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:146)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:180)
        ... 11 more

Sorry for not bisecting which exact CQL statement causes the crash (tried many at once).
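The thread does not identify the cause of the NullPointerException. One possibility, stated here purely as an assumption, is that the wrapped transform hands back a null record or a record with a null value (for example a tombstone, since drop.tombstones is "false" in the configuration above). The sketch below shows a defensive guard for that case using hypothetical stand-in types (Rec, NullSafeTransformSketch) rather than the real Kafka Connect classes.

```java
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical stand-ins; not the connector's or Kafka Connect's actual types.
final class NullSafeTransformSketch {

    // Minimal record model: the value may be null (e.g. a tombstone).
    static final class Rec {
        final Object key;
        final Map<String, Object> value;

        Rec(Object key, Map<String, Object> value) {
            this.key = key;
            this.value = value;
        }
    }

    // Runs the delegate first, then rewrites the value only when there is one.
    // Null records and null values are returned untouched instead of being dereferenced.
    static Rec apply(Rec input,
                     UnaryOperator<Rec> delegate,
                     UnaryOperator<Map<String, Object>> rewrite) {
        Rec extracted = delegate.apply(input);
        if (extracted == null || extracted.value == null) {
            return extracted;
        }
        return new Rec(extracted.key, rewrite.apply(extracted.value));
    }
}
```

Whether this guard matches the actual fix is not confirmed by the discussion; it only shows the general pattern of checking for null before touching the record's value.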

pkgonan commented 3 years ago

@avelanarius Hi, when will this feature be released?

avelanarius commented 3 years ago

@pkgonan We have one more feature planned for the very near future (ability to configure connection SSL parameters) and then we will release the next version.