SAP / kafka-connect-sap

Kafka Connect SAP is a set of connectors, based on the Apache Kafka Connect framework, for reliably connecting Kafka with SAP systems.
Apache License 2.0
122 stars 56 forks

Sink connector is not working #69

Closed pavangatta1718 closed 3 years ago

pavangatta1718 commented 3 years ago

Hello,

I am currently working on a sink connector to push data from my Kafka topic into a HANA DB. I am using Confluent Kafka Connect 5.5.0, and when I create the connector I get the error below. Can you please help me fix this?

[screenshot: error message from the Connect worker]

pavangatta1718 commented 3 years ago

Can anyone please help me solve the issue mentioned above?

" "trace": "java.lang.NoClassDefFoundError: scala/Product$class\n\tat com.sap.kafka.connect.config.hana.HANAConfig.(HANAConfig.scala:6)\n\tat com.sap.kafka.connect.config.hana.HANAParameters$.getConfig(HANAParameters.scala:30)\n\tat com.sap.kafka.connect.sink.hana.HANASinkTask.start(HANASinkTask.scala:20)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:305)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\n"

elakito commented 3 years ago

@pavangatta1718 An earlier version of this connector supported Kafka with Scala 2.11, but the current version supports only Kafka with Scala 2.12 or 2.13. The above error indicates that your Kafka (i.e., the Kafka Connect instance where the connector is installed) is built against Scala 2.11.
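One quick way to check which Scala binary version a Kafka installation uses is to look at the `scala-library-*.jar` filename in the Kafka libs directory. A minimal sketch (the directory path and helper name are illustrative, not part of the connector):

```python
import re
from pathlib import Path

def scala_binary_version(libs_dir):
    """Infer the Scala binary version (e.g. '2.12') from the
    scala-library jar shipped with a Kafka installation."""
    for jar in Path(libs_dir).glob("scala-library-*.jar"):
        m = re.match(r"scala-library-(\d+\.\d+)\.\d+\.jar", jar.name)
        if m:
            return m.group(1)
    return None  # no scala-library jar found

# With the jar names from this thread, e.g. scala-library-2.12.10.jar,
# this returns '2.12', so the 2.12 build of the connector is needed.
```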

pavangatta1718 commented 3 years ago

Hello Elakito,

Many thanks for your quick response. Do you have the steps/link to access the earlier version of this connector? I plan to upgrade the Scala version in the upcoming weeks and will then use the latest version of this connector.

Thanks Again in advance.

pavangatta1718 commented 3 years ago

And I forgot to mention that we are using Scala 2.12 (see the jar files below). In that case, the connector should work, as you mentioned earlier. Or does the connector only work with Scala 2.13?

```
/usr/share/java/kafka/kafka_2.12-5.5.0-ce-scaladoc.jar
/usr/share/java/kafka/scala-collection-compat_2.12-2.1.3.jar
/usr/share/java/kafka/scala-logging_2.12-3.9.2.jar
/usr/share/java/kafka/jackson-module-scala_2.12-2.10.2.jar
/usr/share/java/kafka/scala-reflect-2.12.10.jar
/usr/share/java/kafka/scala-java8-compat_2.12-0.9.0.jar
/usr/share/java/kafka/kafka-streams-scala_2.12-5.5.0-ce.jar
/usr/share/java/kafka/scala-library-2.12.10.jar
```

Thanks again for your help.

elakito commented 3 years ago

Hi @pavangatta1718, it is good to hear that you are using Scala 2.12. In that case, you can just grab the 2.12 version of the jar file from https://github.com/SAP/kafka-connect-sap/releases/tag/0.9.0, or you can build your own snapshot jar from the master branch with `mvn clean install`. Either version should work with your Kafka Connect instance.

regards, aki

pavangatta1718 commented 3 years ago

Hello Elakito,

Thanks for your response. The 2.12 build of the jar worked for me. But now I am getting the error below, stating that the table name is invalid and must follow the naming convention.

"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: com.sap.kafka.client.hana.HANAConfigInvalidInputException: The table name mentioned in {topic}.table.name is invalid. 
Does not follow naming conventions\n\tat com.sap.kafka.connect.sink.hana.HANASinkRecordsCollector.getTableName(HANASinkRecordsCollector.scala:180)\n\tat com.sap.kafka.connect.sink.hana.HANASinkRecordsCollector.add(HANASinkRecordsCollector.scala:51)\n\tat com.sap.kafka.connect.sink.hana.HANAWriter.$anonfun$write$2(HANAWriter.scala:56)\n\tat com.sap.kafka.connect.sink.hana.HANAWriter.$anonfun$write$2$adapted(HANAWriter.scala:44)\n\tat scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)\n\tat scala.collection.Iterator.foreach(Iterator.scala:941)\n\tat scala.collection.Iterator.foreach$(Iterator.scala:941)\n\tat scala.collection.AbstractIterator.foreach(Iterator.scala:1429)\n\tat scala.collection.IterableLike.foreach(IterableLike.scala:74)\n\tat scala.collection.IterableLike.foreach$(IterableLike.scala:73)\n\tat scala.collection.AbstractIterable.foreach(Iterable.scala:56)\n\tat scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)\n\tat com.sap.kafka.connect.sink.hana.HANAWriter.write(HANAWriter.scala:44)\n\tat com.sap.kafka.connect.sink.GenericSinkTask.put(GenericSinkTask.scala:36)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)\n\t... 10 more\n"

In the Kafka topic, the data is in JSON format. Does this connector work with JSON data, or am I missing something?

Thanks & Regards, Pavan Gatta

pavangatta1718 commented 3 years ago

Below is the connector configuration.

{ "name": "saphana-sink-test", "config" : { "connector.class":"com.sap.kafka.connect.sink.hana.HANASinkConnector", "tasks.max" : "1", "topics":"testingtopic", "connection.url":"jdbc:sap://:30015/", "connection.user":"", "connection.password":"", "auto.create":"true", "testingtopic.table.name":"***"."TABLE" } }

elakito commented 3 years ago

@pavangatta1718 The current code requires the DB schema name (e.g., when using the user's own DB schema, that corresponds to the user name). I don't know how exactly you configured `connection.user` and the schema-name part of the table name. The error comes from this syntax check: the table name has to look like `"xxx"."xxx"`.
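The check lives in `HANASinkRecordsCollector.getTableName`. A rough sketch of the shape of such a validation (the exact rule in the connector is not shown in this thread, so treat the regex as an assumption):

```python
import re

# Hypothetical approximation of the connector's table-name check:
# the value of {topic}.table.name must look like "SCHEMA"."TABLE",
# with both parts present and double-quoted.
TABLE_NAME_PATTERN = re.compile(r'^"[^"]+"\."[^"]+"$')

def is_valid_table_name(name: str) -> bool:
    return TABLE_NAME_PATTERN.match(name) is not None

print(is_valid_table_name('"MYSCHEMA"."MYTABLE"'))  # True
print(is_valid_table_name('MYSCHEMA.MYTABLE'))      # False: quotes required
print(is_valid_table_name('"MYTABLE"'))             # False: schema part missing
```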

elakito commented 3 years ago

Regarding your question on message formats: the connector only sees records through Kafka Connect's API. Format conversion is handled by Kafka Connect's converters, and JSON is the default format there.
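For reference, the JSON defaults would look roughly like this in the Connect worker properties (a sketch of standard Kafka Connect settings; with `schemas.enable=true` the converter expects each message to carry a `{"schema": ..., "payload": ...}` envelope):

```properties
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
```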

pavangatta1718 commented 3 years ago

@elakito Thanks for your responses so far; they have been very helpful. Maybe just one last question from my side. To overcome the above issue, I manually created a table in the HANA database with all the fields and data types. But when I run the connector (with simple JSON data in the topic), it still fails with the error "table has a different schema from the record schema and Auto evolution of schema is not supported".

Does this connector support auto-evolution of schemas? If not, how do we solve this issue? Please help. Also, if I use Avro format, do I need to specify any converters in the connector configuration? You mentioned that the connector supports JSON conversion by default.

"trace": "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: com.sap.kafka.utils.SchemaNotMatchedException: Table \"Schemaname\".\"Table1\" has a different schema from the Record Schema.\nAuto Evolution of schema is not supported\n \n\tat com.sap.kafka.connect.sink.hana.HANASinkRecordsCollector.add(HANASinkRecordsCollector.scala:83)\n\tat com.sap.kafka.connect.sink.hana.HANAWriter.$anonfun$write$2(HANAWriter.scala:56)\n\tat com.sap.kafka.connect.sink.hana.HANAWriter.$anonfun$write$2$adapted(HANAWriter.scala:44)\n\tat scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)\n\tat scala.collection.Iterator.foreach(Iterator.scala:941)\n\tat scala.collection.Iterator.foreach$(Iterator.scala:941)\n\tat scala.collection.AbstractIterator.foreach(Iterator.scala:1429)\n\tat scala.collection.IterableLike.foreach(IterableLike.scala:74)\n\tat scala.collection.IterableLike.foreach$(IterableLike.scala:73)\n\tat scala.collection.AbstractIterable.foreach(Iterable.scala:56)\n\tat 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)\n\tat com.sap.kafka.connect.sink.hana.HANAWriter.write(HANAWriter.scala:44)\n\tat com.sap.kafka.connect.sink.GenericSinkTask.put(GenericSinkTask.scala:36)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)\n\t... 10 more\n"

elakito commented 3 years ago

@pavangatta1718 I don't know what you mean by overcoming the above problem, or why manually creating the table would solve it. The original problem was that you didn't specify the table schema name. So, unless you replace that empty string, I suppose it won't work anyway. And if you replace it with a non-empty string, the table will be created automatically and it should work.

Regarding schema evolution: certain types of changes (i.e., those that can be accommodated by the ALTER TABLE command) could be supported, but currently no schema change is supported by this sink connector. However, you don't need schema evolution unless your table schema is changing (adding nullable or default-valued columns, etc.). In your case, I think the table that you manually created didn't match the expected table schema. Did you compare its schema with the table created automatically in the user's DB schema?
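One way to spot the problem is to diff the column names and types of the manually created table against the fields of the record schema; they need to line up for the sink to accept the table. A hypothetical helper (the function and the sample names/types are illustrative, not connector code):

```python
def schema_mismatches(table_columns, record_fields):
    """Compare two schemas given as lists of (name, type) tuples.
    Returns a list of (name, table_type, record_type) for every column
    that is missing on one side or typed differently."""
    table = dict(table_columns)
    record = dict(record_fields)
    mismatches = []
    for name in set(table) | set(record):
        if table.get(name) != record.get(name):
            mismatches.append((name, table.get(name), record.get(name)))
    return mismatches

# e.g. the table declares AGE as VARCHAR while the record schema says INTEGER:
print(schema_mismatches([("ID", "INTEGER"), ("AGE", "VARCHAR")],
                        [("ID", "INTEGER"), ("AGE", "INTEGER")]))
# -> [('AGE', 'VARCHAR', 'INTEGER')]
```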

pavangatta1718 commented 3 years ago

@elakito Thanks for your response. I did actually specify the schema and table name (like "schema"."TABLE") as well. I replaced it with an empty string while copying the content here (similarly for the connection URL). Sorry if this confused you. I have attached a screenshot below.

As for comparing the schemas: the connector has not created any table automatically in the target (HANA DB) so far. I only have the topic data (in JSON format), and from that I tried creating a table in HANA DB; the connector fails to insert the data into it, saying the schema is different. I would really like to see auto table creation work, but it does not in this case. Please advise.

[screenshot: connector configuration]

elakito commented 3 years ago

If your configuration used a non-empty table schema name as shown above and no table exists yet, the connector shouldn't be throwing either the naming-convention error ("The table name mentioned in {topic}.table.name is invalid. Does not follow naming conventions") or the schema-mismatch exception.

The schema-mismatch exception indicates the table already exists and does not match the input messages. So, just delete the table and run your scenario again, and also post the output of `kafka-console-consumer` consuming from this topic, including the key value (using `--property print.key=true`). You mentioned that you are using JSON, so we can look at the schema definition.