scylladb / kafka-connect-scylladb

Kafka Connect Scylladb Sink
Apache License 2.0

Connect fails when "scylladb.offset.storage.table.enable" is set to "false" #21

Open pushpavanthar opened 3 years ago

pushpavanthar commented 3 years ago

The ScyllaDB sink connector works fine when "scylladb.offset.storage.table.enable" is set to "true" (consumer group offsets are maintained in a ScyllaDB table). However, when it is set to "false" (consumer group offsets are maintained in a Kafka topic), my Connect task fails with the exception below. Following fixes suggested for similar exceptions in other sink connector projects [1][2], I tried setting "timezone": "UTC" and "locale": "en", but the task still throws the same exception.

Below is my Connect config for reference:

{
  "connector.class": "io.connect.scylladb.ScyllaDbSinkConnector",
  "transforms.valuefield.skip.missing.or.null": "false",
  "scylladb.port": "9042",
  "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "tasks.max": "1",
  "topics": "poc_group_agg_clutser",
  "scylladb.contact.points": "scylladb.hostname.com",
  "transforms": "createKey",
  "behavior.on.error": "FAIL",
  "scylladb.password": "**********",
  "key.converter.schemas.enable": "false",
  "scylladb.username": "username",
  "scylladb.keyspace": "test",
  "transforms.createKey.fields": "GROUP_ID",
  "scylladb.security.enabled": "true",
  "scylladb.offset.storage.table.enable": "false",
  "value.converter.schemas.enable": "false",
  "timezone": "UTC",
  "locale": "en",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
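
To reproduce the two behaviours side by side, the same config can be resubmitted with only the offset-storage flag changed. The sketch below builds (but does not send) a request for the Kafka Connect REST endpoint `PUT /connectors/{name}/config`. The worker URL `http://localhost:8083` and the helper name `build_config_update` are assumptions for illustration; the connector name is inferred from the task id in the log, and only a subset of the config above is repeated.

```python
import json
import urllib.request

OFFSET_FLAG = "scylladb.offset.storage.table.enable"


def build_config_update(connect_url, connector_name, config, offsets_in_scylla):
    """Build a PUT request that resubmits `config` with only the offset flag changed.

    Hypothetical helper: nothing is sent until the caller passes the result
    to urllib.request.urlopen().
    """
    updated = dict(config)  # copy, so the caller's dict is left untouched
    updated[OFFSET_FLAG] = "true" if offsets_in_scylla else "false"
    return urllib.request.Request(
        url=f"{connect_url}/connectors/{connector_name}/config",
        data=json.dumps(updated).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Subset of the config from this issue; the remaining keys are omitted for brevity.
base_config = {
    "connector.class": "io.connect.scylladb.ScyllaDbSinkConnector",
    "topics": "poc_group_agg_clutser",
    "scylladb.keyspace": "test",
    OFFSET_FLAG: "true",
}

request = build_config_update(
    "http://localhost:8083",  # assumed Connect worker address
    "scylladb.sink.dp.ksql_t.poc_group_agg_clutser",  # inferred from the task id in the log
    base_config,
    offsets_in_scylla=False,
)
# urllib.request.urlopen(request) would submit the failing variant.
```

Keeping every other key identical between the two submissions makes it easier to attribute the failure to the flag alone.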

[2020-07-23 12:01:33,153] ERROR WorkerSinkTask{id=scylladb.sink.dp.ksql_t.poc_group_agg_clutser-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: Exception occurred while extracting records from Kafka Sink Records.
    at io.connect.scylladb.ScyllaDbSinkTask.handleErrors(ScyllaDbSinkTask.java:249)
    at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:152)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:149)
    ... 11 more
[2020-07-23 12:01:33,153] WARN WorkerSinkTask{id=scylladb.sink.dp.ksql_t.poc_group_agg_clutser-0} Offset commit failed during close (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2020-07-23 12:01:33,153] ERROR WorkerSinkTask{id=scylladb.sink.dp.ksql_t.poc_group_agg_clutser-0} Commit of offsets threw an unexpected exception for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
java.lang.NullPointerException
    at io.connect.scylladb.ScyllaDbSinkTask.preCommit(ScyllaDbSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:378)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:590)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2020-07-23 12:01:33,153] ERROR WorkerSinkTask{id=scylladb.sink.dp.ksql_t.poc_group_agg_clutser-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Exception occurred while extracting records from Kafka Sink Records.
    at io.connect.scylladb.ScyllaDbSinkTask.handleErrors(ScyllaDbSinkTask.java:249)
    at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:152)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
    ... 10 more
Caused by: java.lang.NullPointerException
    at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:149)
    ... 11 more
[2020-07-23 12:01:33,153] ERROR WorkerSinkTask{id=scylladb.sink.dp.ksql_t.poc_group_agg_clutser-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
[2020-07-23 12:01:33,153] INFO Closing getValidSession (io.connect.scylladb.ScyllaDbSinkTask)
[2020-07-23 12:01:35,163] WARN WorkerSinkTask{id=scylladb.sink.dp.ksql_t.poc_group_agg_clutser-0} Offset commit failed during close (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2020-07-23 12:01:35,163] ERROR WorkerSinkTask{id=scylladb.sink.dp.ksql_t.poc_group_agg_clutser-0} Commit of offsets threw an unexpected exception for sequence number 2: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
java.lang.NullPointerException
    at io.connect.scylladb.ScyllaDbSinkTask.preCommit(ScyllaDbSinkTask.java:234)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:378)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:590)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1300(WorkerSinkTask.java:67)
    at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:666)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:288)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:703)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.close(AbstractCoordinator.java:849)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:822)
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2211)
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2178)
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2128)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:164)
    at org.apache.kafka.connect.runtime.WorkerTask.doClose(WorkerTask.java:156)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:183)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
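
The log above repeats the same failure several times, so it can help to reduce it to the distinct places where each exception was thrown. The following is a minimal sketch, not part of the connector: `throw_sites` is a hypothetical helper that pairs every exception header (including "Caused by:" lines) with the first stack frame that follows it. The sample is an excerpt of the trace above.

```python
import re

# An exception header: a fully qualified class name at the start of a line,
# optionally prefixed by "Caused by: " and optionally followed by ": message".
HEADER = re.compile(r"^(?:Caused by: )?([A-Za-z_$][\w$]*(?:\.[\w$]+)+)(?::|\s*$)")
# A stack frame: "    at package.Class.method(File.java:123)".
FRAME = re.compile(r"^\s+at ([\w.$]+)\(([\w$]+\.java):(\d+)\)")


def throw_sites(log_text):
    """Return distinct (exception, method, line) triples in order of appearance."""
    sites = []
    pending = None  # exception header still waiting for its first frame
    for line in log_text.splitlines():
        header = HEADER.match(line)
        if header:
            pending = header.group(1)
            continue
        frame = FRAME.match(line)
        if frame and pending is not None:
            site = (pending, frame.group(1), int(frame.group(3)))
            if site not in sites:
                sites.append(site)
            pending = None  # only the topmost frame is the throw site
    return sites


sample = """\
org.apache.kafka.connect.errors.ConnectException: Exception occurred while extracting records from Kafka Sink Records.
    at io.connect.scylladb.ScyllaDbSinkTask.handleErrors(ScyllaDbSinkTask.java:249)
    at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:152)
Caused by: java.lang.NullPointerException
    at io.connect.scylladb.ScyllaDbSinkTask.put(ScyllaDbSinkTask.java:149)
    ... 11 more
java.lang.NullPointerException
    at io.connect.scylladb.ScyllaDbSinkTask.preCommit(ScyllaDbSinkTask.java:234)
"""

npe_sites = [s for s in throw_sites(sample) if s[0].endswith("NullPointerException")]
for exc, method, line in npe_sites:
    print(f"{exc} thrown in {method} at line {line}")
```

On this excerpt, the NullPointerException originates in two places in the connector, `ScyllaDbSinkTask.put` (line 149) and `ScyllaDbSinkTask.preCommit` (line 234). The remaining entries in the log are wrappers or repeats of these two.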

hartmut-co-uk commented 2 years ago

I ran into a similar issue.

I haven't tested it (and I'm not sure which code version the stack trace applies to), but the following might also fix this issue...

Part of https://github.com/scylladb/kafka-connect-scylladb/pull/61:

Bouncheck commented 1 year ago

From the looks of it, I believe this was fixed by #66 (even though the PR itself was not about this issue). Please reopen if the issue persists.