scylladb / kafka-connect-scylladb

Kafka Connect Scylladb Sink
Apache License 2.0
49 stars, 23 forks

Fails when topic name and scylla table names are different in config `topic.my_topic.my_ks.my_table.mapping` #26

Open pushpavanthar opened 4 years ago

pushpavanthar commented 4 years ago

The connector fails with the exception below when the mapping specifies a table name that differs from the topic name. It does not pick up the table name from the mapping config `topic.my_topic.my_ks.my_table.mapping`. The DataStax Cassandra sink connector, by contrast, seems to handle this seamlessly for Cassandra.

[2020-10-07 10:13:16,763] ERROR WorkerSinkTask{id=scylladb.sink.dp.ksql_t.poc_group_agg.test-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: Exception occurred while extracting records from Kafka Sink Records.
    at io.connect.scylladb.ScyllaDbSinkTask.handleErrors(ScyllaDbSinkTask.java:250)
    at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:153)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at io.connect.scylladb.ScyllaDbSessionImpl.createInsertPreparedStatement(ScyllaDbSessionImpl.java:147)
    at io.connect.scylladb.ScyllaDbSessionImpl.access$300(ScyllaDbSessionImpl.java:33)
    at io.connect.scylladb.ScyllaDbSessionImpl$2.apply(ScyllaDbSessionImpl.java:171)
    at io.connect.scylladb.ScyllaDbSessionImpl$2.apply(ScyllaDbSessionImpl.java:168)
    at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
    at io.connect.scylladb.ScyllaDbSessionImpl.insert(ScyllaDbSessionImpl.java:166)
    at io.connect.scylladb.ScyllaDbSinkTaskHelper.getBoundStatementForRecord(ScyllaDbSinkTaskHelper.java:86)
    at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:113)
    ... 11 more

hartmut-co-uk commented 2 years ago

This might not be what the DataStax sink connector provides, and maybe not what you're looking for, but the rename is also doable with the RegexRouter SMT:

{
  "name": "my_connector",
  "config": {
    "connector.class": "io.connect.scylladb.ScyllaDbSinkConnector",
    "tasks.max": "1",
    "topics": "my_topic",
    "scylladb.contact.points": "some-scylla",
    "scylladb.keyspace": "my_ks",
    "scylladb.keyspace.create.enabled": "false",
    "scylladb.table.manage.enabled": "false",
    "scylladb.consistency.level": "ONE",
    "scylladb.offset.storage.table.enable": "false",

    "transforms": "renameTopic",
    "transforms.renameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.renameTopic.regex": "my_topic",
    "transforms.renameTopic.replacement": "my_table"
  }
}
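
For anyone unsure what the rename does to each record, here is a rough Python sketch of the routing rule, not the connector's or Kafka's actual code. It assumes RegexRouter only rewrites a topic when the regex matches the whole topic name, and that the sink then uses the rewritten topic name as the table name. `route_topic` is a hypothetical helper, and the replacement is treated as a literal string (the real SMT also supports Java-style group references such as `$1`, which this sketch ignores).

```python
import re


def route_topic(topic: str, regex: str, replacement: str) -> str:
    """Approximate a RegexRouter-style rename (hypothetical helper).

    The topic is renamed only when the pattern matches the entire
    topic name; otherwise it is passed through unchanged.
    """
    if re.fullmatch(regex, topic) is None:
        return topic
    # Simplification: the replacement is used as a literal string,
    # with no support for capture-group references.
    return replacement


if __name__ == "__main__":
    # Values taken from the connector config above.
    print(route_topic("my_topic", "my_topic", "my_table"))
    # A topic that only partially matches is left alone.
    print(route_topic("my_topic_v2", "my_topic", "my_table"))
```

With the config above, records from `my_topic` would reach the sink as if they came from a topic called `my_table`, which is why the table lookup can succeed without a per-topic mapping.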