scylladb / kafka-connect-scylladb

Kafka Connect Scylladb Sink
Apache License 2.0

Connector fails to start when TTL is set using "scylladb.ttl" #22

Open pushpavanthar opened 3 years ago

pushpavanthar commented 3 years ago

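The connector fails to start when the "scylladb.ttl" config is set. The stack trace shows a ClassCastException thrown from AbstractConfig.getInt while io.connect.scylladb.ScyllaDbSinkConnectorConfig is being constructed, so the cause appears to be the data type used for the TTL setting in that class.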

The following exception is logged when the connector is POSTed:

[2020-07-24 06:28:29,182] ERROR WorkerConnector{id=scylladb.sink.dp.ksql_t.poc_group_agg_cluster} Error while starting connector (org.apache.kafka.connect.runtime.WorkerConnector)
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    at org.apache.kafka.common.config.AbstractConfig.getInt(AbstractConfig.java:173)
    at io.connect.scylladb.ScyllaDbSinkConnectorConfig.<init>(ScyllaDbSinkConnectorConfig.java:93)
    at io.connect.scylladb.ScyllaDbSinkConnector.start(ScyllaDbSinkConnector.java:45)
    at org.apache.kafka.connect.runtime.WorkerConnector.doStart(WorkerConnector.java:111)
    at org.apache.kafka.connect.runtime.WorkerConnector.start(WorkerConnector.java:136)
    at org.apache.kafka.connect.runtime.WorkerConnector.transitionTo(WorkerConnector.java:196)
    at org.apache.kafka.connect.runtime.Worker.startConnector(Worker.java:252)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startConnector(DistributedHerder.java:1079)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1300(DistributedHerder.java:117)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1095)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$15.call(DistributedHerder.java:1091)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
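
The trace above ends in a cast inside a typed getter. Below is a minimal sketch of how that kind of failure can arise, assuming (not confirmed from the connector source) that "scylladb.ttl" is stored as a String while being read through an integer getter. It uses only the JDK: `TtlCastSketch` and its helpers are hypothetical names, and the plain map only stands in for the parsed config values.

```java
import java.util.HashMap;
import java.util.Map;

public class TtlCastSketch {

    // Hypothetical stand-in for parsed config values.
    // Assumption: the key was declared with a string type, so the value stays a String.
    public static Map<String, Object> parsedAsString(String rawTtl) {
        Map<String, Object> values = new HashMap<>();
        values.put("scylladb.ttl", rawTtl);
        return values;
    }

    // Same key, but parsed to an Integer, as an int-typed declaration would do.
    public static Map<String, Object> parsedAsInt(String rawTtl) {
        Map<String, Object> values = new HashMap<>();
        values.put("scylladb.ttl", Integer.valueOf(rawTtl));
        return values;
    }

    // A typed getter that simply casts the stored value.
    // The cast fails at runtime if the stored object is not an Integer.
    public static Integer getInt(Map<String, Object> values, String key) {
        return (Integer) values.get(key);
    }

    // Returns true when reading the TTL through getInt throws ClassCastException.
    public static boolean failsWithClassCast(Map<String, Object> values) {
        try {
            getInt(values, "scylladb.ttl");
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // String-typed value read through the integer getter: the cast fails.
        System.out.println(failsWithClassCast(parsedAsString("600")));
        // Integer-typed value read through the same getter: the cast succeeds.
        System.out.println(getInt(parsedAsInt("600"), "scylladb.ttl"));
    }
}
```

If this assumption holds, the mismatch would be between the declared type of the key and the getter used to read it, not the "600" value in the submitted config.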

The connector config I used, for reference:

{
    "connector.class": "io.connect.scylladb.ScyllaDbSinkConnector",
    "scylladb.port": "9042",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "tasks.max": "1",
    "topics": "poc_group_agg_test",
    "scylladb.contact.points": "scylla.hostname.com",
    "transforms": "createKey",
    "behavior.on.error": "FAIL",
    "scylladb.password": "*********",
    "key.converter.schemas.enable": "false",
    "scylladb.username": "username",
    "scylladb.keyspace": "test",
    "transforms.createKey.fields": "GROUP_ID,WINDOW_START,WINDOW_END",
    "scylladb.security.enabled": "true",
    "scylladb.offset.storage.table.enable": "true",
    "scylladb.ttl": "600",
    "value.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}