scylladb / kafka-connect-scylladb

Kafka Connect Scylladb Sink
Apache License 2.0

messages from kafka topic not inserted in scylla db #32

Closed: HARIHARAN1103 closed this 3 years ago

HARIHARAN1103 commented 3 years ago

After loading the connector using Kafka-connect-scylladb.json, no messages are inserted into Scylla DB.

HARIHARAN1103 commented 3 years ago

Followed the quickstart https://github.com/scylladb/kafka-connect-scylladb/blob/master/documentation/QUICKSTART.md

but Scylla DB was not started from the Docker image, since Scylla DB is already installed on the machine with the version below:

ScyllaDB: cqlsh 5.0.1 | Cassandra 3.0.8 | CQL spec 3.3.1 | Native protocol v4

Checked that the plugin is loaded using:

curl -sS localhost:8083/connector-plugins | jq .[].class | grep ScyllaDbSinkConnector

and got the output as mentioned in the quickstart:

"io.connect.scylladb.ScyllaDbSinkConnector"

The following Kafka-connect-scylladb.json connector config was used to load the connector:

{
  "name" : "scylladb-sink-connector",
  "config" : {
    "connector.class" : "io.connect.scylladb.ScyllaDbSinkConnector",
    "tasks.max" : "1",
    "topics" : "topic1,topic2,mqtt-malaria",
    "scylladb.contact.points" : "localhost:9042",
    "scylladb.keyspace" : "mqtest"
  }
}
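
(For reference, a config file like this is loaded by POSTing it to the Kafka Connect REST API, roughly like so:)

# load the connector config; Connect is assumed to listen on the default port 8083
curl -X POST -H "Content-Type: application/json" \
  --data @Kafka-connect-scylladb.json \
  http://localhost:8083/connectors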

Tried the same message as in the quickstart to check:

kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --topic topic1 \
  --property parse.key=true \
  --property key.schema='{"type":"record","name":"key_schema","fields":[{"name":"id","type":"int"}]}' \
  --property "key.separator=$" \
  --property value.schema='{"type":"record","name":"value_schema","fields":[{"name":"id","type":"int"},{"name":"firstName","type":"string"},{"name":"lastName","type":"string"}]}'

Even after manually creating the keyspace and a table, the messages are not inserted into the DB.
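
For reference, the keyspace and table were created along these lines (a sketch; the column names are assumed to mirror the Avro value schema, and CQL lowercases unquoted identifiers):

-- keyspace matching scylladb.keyspace in the connector config
CREATE KEYSPACE IF NOT EXISTS mqtest
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

-- table named after the topic; the key field is the PRIMARY KEY
CREATE TABLE IF NOT EXISTS mqtest.topic1 (
  id int PRIMARY KEY,
  firstname text,
  lastname text
);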

Is any config or setup step missing from the quickstart? Does any config need to be added in Kafka? How can I confirm that Scylla is connected to Kafka?

avelanarius commented 3 years ago

Could you also post the messages on the topic? Command to do it:

kafka-avro-console-consumer --broker-list localhost:9092 --topic topic1 --from-beginning --property print.key=true

(note that on my system --broker-list is called --bootstrap-server)

Could you also print the logs of the connector? Command to do it:

confluent local services connect log

(command as of Confluent 6.0.0; if it doesn't work, look at /tmp/confluent.XXXXXX/connect/logs/connect.log, where XXXXXX are some numbers)
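
You can also check whether the connector and its task are running via the Connect REST API (a sketch, assuming the connector name from your config):

curl -sS localhost:8083/connectors/scylladb-sink-connector/status | jq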

HARIHARAN1103 commented 3 years ago

> Could you also post the messages on the topic? Command to do it:
>
> kafka-avro-console-consumer --broker-list localhost:9092 --topic topic1 --from-beginning --property print.key=true
>
> (note that on my system --broker-list is called --bootstrap-server)
>
> Could you also print the logs of the connector? Command to do it:
>
> confluent local services connect log
>
> (command as of Confluent 6.0.0; if it doesn't work, look at /tmp/confluent.XXXXXX/connect/logs/connect.log, where XXXXXX are some numbers)

The posted messages are showing up in the consumer, but the Scylla DB issue still exists.

After checking confluent local services connect log, I changed the config from

{
  "name" : "scylladb-sink-connector",
  "config" : {
    "connector.class" : "io.connect.scylladb.ScyllaDbSinkConnector",
    "tasks.max" : "1",
    "topics" : "topic1,topic2,mqtt-malaria",
    "scylladb.contact.points" : "localhost:9042",
    "scylladb.keyspace" : "mqtest"
  }
}

to

{
  "name" : "scylladb-sink-connector",
  "config" : {
    "connector.class" : "io.connect.scylladb.ScyllaDbSinkConnector",
    "tasks.max" : "1",
    "topics" : "topic1",
    "scylladb.contact.points" : "localhost",
    "scylladb.port":"9042",
    "scylladb.keyspace" : "mqtest",
    "key.converter" : "org.apache.kafka.connect.storage.StringConverter",
    "value.converter" : "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable" : "true",
    "value.converter.schemas.enable" : "true",
    "transforms" : "createKey",
    "transforms.createKey.fields" : "id",
    "transforms.createKey.type" : "org.apache.kafka.connect.transforms.ValueToKey"
  }
}

and passed the message in topic1 as: {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"emailid"},{"type":"string","optional":false,"field":"ip"},{"type":"string","optional":false,"field":"browser"},{"type":"string","optional":false,"field":"pagetype"}],"payload":{"id":1,"emailid":"uaer@gmail.com","ip":"172.123.145.000","browser":"chrome6.0","pagetype":"home"}}}

message in consumer:

./bin/kafka-console-consumer --topic topic1 --from-beginning --bootstrap-server localhost:9092 --property print.key=true

null    {"schema":{"type":"map","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"emailid"},{"type":"string","optional":false,"field":"ip"},{"type":"string","optional":false,"field":"browser"},{"type":"string","optional":false,"field":"pagetype"}],"payload":{"id":1,"emailid":"uaer@gmail.com","ip":"172.123.145.000","browser":"chrome6.0","pagetype":"home"}}}

and got this exception in the connect log:

[2021-03-18 09:17:21,716] ERROR WorkerSinkTask{id=scylladb-sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:187)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:525)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [copying fields from value to key], found: java.lang.String
    at org.apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)
    at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:81)
    at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:67)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    ... 14 more
avelanarius commented 3 years ago

> and passed the message in topic1 as: {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"emailid"},{"type":"string","optional":false,"field":"ip"},{"type":"string","optional":false,"field":"browser"},{"type":"string","optional":false,"field":"pagetype"}],"payload":{"id":1,"emailid":"uaer@gmail.com","ip":"172.123.145.000","browser":"chrome6.0","pagetype":"home"}}}
>
> message in consumer:
>
> ./bin/kafka-console-consumer --topic topic1 --from-beginning --bootstrap-server localhost:9092 --property print.key=true
>
> null  {"schema":{"type":"map","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"emailid"},{"type":"string","optional":false,"field":"ip"},{"type":"string","optional":false,"field":"browser"},{"type":"string","optional":false,"field":"pagetype"}],"payload":{"id":1,"emailid":"uaer@gmail.com","ip":"172.123.145.000","browser":"chrome6.0","pagetype":"home"}}}

So the problem is that your messages in Kafka don't have a key properly set (the key is null, as printed by kafka-console-consumer). The key should be filled in and should contain only the fields that will be part of the PRIMARY KEY in the destination Scylla cluster.
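
For illustration, instead of null, a correctly set key would print roughly like this with JsonConverter and schemas enabled (a sketch):

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false},"payload":{"id":1}}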

I see that you tried to use ValueToKey, but it fails (note that this transformation is not part of the Scylla Sink Connector and was not developed by us). My guess is that you are using org.apache.kafka.connect.storage.StringConverter instead of org.apache.kafka.connect.json.JsonConverter, and because of that ValueToKey can't extract the id field.

avelanarius commented 3 years ago

I was able to successfully configure the connector with the following configuration:

{
  "name": "scylladb-sink-connector",
  "config": {
    "connector.class": "io.connect.scylladb.ScyllaDbSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms": "createKey",
    "topics": "topic1",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "id",
    "scylladb.contact.points": "127.0.0.1",
    "scylladb.port": "9042",
    "scylladb.keyspace": "ks",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true"
  }
}

And by sending the following message to the topic:

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"emailid"},{"type":"string","optional":false,"field":"ip"},{"type":"string","optional":false,"field":"browser"},{"type":"string","optional":false,"field":"pagetype"}]},"payload":{"id":1,"emailid":"uaer@gmail.com","ip":"172.123.145.000","browser":"chrome6.0","pagetype":"home"}}

Before, your posted JSON was invalid: the payload field was erroneously nested inside schema instead of being at the top level.
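
The difference, in short (fields elided for brevity):

Invalid (payload nested inside schema):

{"schema":{"type":"struct","fields":[...],"payload":{...}}}

Valid (payload at the top level, as a sibling of schema):

{"schema":{"type":"struct","fields":[...]},"payload":{...}}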

HARIHARAN1103 commented 3 years ago

> I was able to successfully configure the connector with the following configuration:
>
> {
>   "name": "scylladb-sink-connector",
>   "config": {
>     "connector.class": "io.connect.scylladb.ScyllaDbSinkConnector",
>     "tasks.max": "1",
>     "key.converter": "org.apache.kafka.connect.json.JsonConverter",
>     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
>     "transforms": "createKey",
>     "topics": "topic1",
>     "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
>     "transforms.createKey.fields": "id",
>     "scylladb.contact.points": "127.0.0.1",
>     "scylladb.port": "9042",
>     "scylladb.keyspace": "ks",
>     "key.converter.schemas.enable": "true",
>     "value.converter.schemas.enable": "true"
>   }
> }
>
> And by sending the following message to the topic:
>
> {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"emailid"},{"type":"string","optional":false,"field":"ip"},{"type":"string","optional":false,"field":"browser"},{"type":"string","optional":false,"field":"pagetype"}]},"payload":{"id":1,"emailid":"uaer@gmail.com","ip":"172.123.145.000","browser":"chrome6.0","pagetype":"home"}}
>
> Before, your posted JSON was invalid: the payload field was erroneously nested inside schema instead of being at the top level.

It was a documentation issue; I noticed that one. The messages are now being inserted. Thanks for the reply!