SAP / kafka-connect-sap

Kafka Connect SAP is a set of connectors that use the Apache Kafka Connect framework to reliably connect Kafka with SAP systems.
Apache License 2.0

Sink Connector Setup for Kafka Cloud Topic to Hana DB Failing #130

Closed: magno101 closed this issue 2 years ago

magno101 commented 2 years ago

Hello,

I've been attempting to set up the connector to sync a topic in our Confluent Cloud Kafka cluster to one of our HANA DBs, yet I cannot get past the following error:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

Caused by: org.apache.kafka.connect.errors.DataException: Cannot list fields on non-struct type
        at org.apache.kafka.connect.data.ConnectSchema.fields(ConnectSchema.java:179)
        at com.sap.kafka.connect.sink.hana.HANASinkRecordsCollector.add(HANASinkRecordsCollector.scala:64)
        at com.sap.kafka.connect.sink.hana.HANAWriter.$anonfun$write$2(HANAWriter.scala:56)
        at com.sap.kafka.connect.sink.hana.HANAWriter.$anonfun$write$2$adapted(HANAWriter.scala:44)
        at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
        at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
        at scala.collection.IterableOps$WithFilter.foreach(Iterable.scala:889)
        at com.sap.kafka.connect.sink.hana.HANAWriter.write(HANAWriter.scala:44)
        at com.sap.kafka.connect.sink.GenericSinkTask.put(GenericSinkTask.scala:36)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
        ... 10 more

I've attempted to follow the example laid out in persons1ks (our connector is deployed to a GKE cluster), and the image is deployed and working just fine. I can POST the connector JSON config without issue, but when I check the status of the connector a few moments later:

    curl -s "https://<connector_dns_name>/connectors?expand=info&expand=status" | jq '. | to_entries[]'

I get the error above in the trace field.

The messages in the topic are nothing grand, just a flat JSON structure. Is this error pointing at an issue when trying to write the record to HANA?
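For reference, the records look roughly along these lines (field names and values invented for illustration, not the actual payload):

    {"id": 123, "name": "some value", "updated_at": "2022-01-01T00:00:00Z"}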

Using kafka-connector-hana_2.13-0.9.3 and ngdbc-2.12.9

Here is my connector config:

{
    "name": "hana-test-sink",
    "config": {
        "connector.value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "connector.schemas.enable": "false",
        "connector.class": "com.sap.kafka.connect.sink.hana.HANASinkConnector",
        "tasks.max": "1",
        "topics": "my.topic.name",
        "connection.url": "jdbc:sap://<internal_dns_name>:30015/",
        "connection.user": "<SAPUSER>",
        "connection.password": "<SAPPW>",
        "my.topic.name.table.name": "\"SCHEMA\".\"TABLENAME\""
    }
}

Any help would be greatly appreciated.

Thanks!

elakito commented 2 years ago

This error happens during the inspection of the records, before they are stored to HANA. That means there is something unexpected in the schema part of the records. How did you generate those input records? Did you use one of the available Kafka source connectors?
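To illustrate what the sink expects: when the JsonConverter runs with schemas enabled, each message value carries a schema/payload envelope, roughly like this (field names illustrative):

    {
      "schema": {
        "type": "struct",
        "fields": [
          {"field": "id", "type": "int32"},
          {"field": "name", "type": "string", "optional": true}
        ]
      },
      "payload": {"id": 123, "name": "some value"}
    }

If the record's value schema is not a Struct schema (for example, the value arrives as a plain string), ConnectSchema.fields() fails with exactly this "Cannot list fields on non-struct type" message.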

magno101 commented 2 years ago

Thanks @elakito for the quick response.

I thought the setting "connector.schemas.enable": "false" was intended to make the connector ignore, or not expect, a schema attached to the messages?

The messages are making their way into the topic via a custom application developed within the company (owned by another team so I'm not intimately familiar with it).

Do I need to change to using an AVRO schema and the schema registry provided by Confluent Cloud in order to make this work?
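(For anyone reading along: in stock Kafka Connect the per-connector converter overrides are spelled value.converter and value.converter.schemas.enable; properties named connector.value.converter / connector.schemas.enable are not standard ones, so the worker's default converter may be in effect instead. A minimal sketch of the standard spelling:

    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true"
)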

magno101 commented 2 years ago

Just to report back: I went ahead and defined the schemas on the topics (AVRO) and rebuilt the container with all of the jars required to support AVRO and the schema registry, and I am still at the same point. No change as far as the error is concerned.
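For reference, a typical Confluent Cloud Avro converter setup looks roughly like this (endpoint and credential placeholders are illustrative):

    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "https://<sr_endpoint>",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.schema.registry.basic.auth.user.info": "<sr_api_key>:<sr_api_secret>"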

magno101 commented 2 years ago

To close out this one:

I created a JSON Schema on top of the Kafka topic in Confluent Cloud (the messages in the topic didn't include a schema). After that was registered in the schema registry, I moved back to using the following for my connector setup:

{
    "name": "hana-test-sink",
    "config": {
        "connector.value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "connector.schemas.enable": "false",
        "connector.class": "com.sap.kafka.connect.sink.hana.HANASinkConnector",
        "tasks.max": "1",
        "topics": "topic1,topic2",
        "connection.url": "jdbc:sap://<hana_db_host>:<hana_port>/",
        "connection.user": "<sap_un>",
        "connection.password": "<sap_pw>",
        "topic1.dev.table.name": "\"SCHEMA\".\"TABLE1\"",
        "topic2.table.name": "\"SCHEMA\".\"TABLE2\""
    }
}
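
For anyone repeating this: the JSON Schema can also be registered directly through the Schema Registry REST API, something like the following (subject name, credentials, and schema body are illustrative):

    curl -s -u <sr_api_key>:<sr_api_secret> \
      -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      -X POST --data '{"schemaType": "JSON", "schema": "{\"type\": \"object\", \"properties\": {\"id\": {\"type\": \"integer\"}}}"}' \
      "https://<sr_endpoint>/subjects/topic1-value/versions"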

The only issue now is that we need to open up our firewall to allow the container to contact the HANA DB, but that's on us. Thanks!

elakito commented 2 years ago

@magno101 Thanks for the update. Both the source and sink connectors assume that schemas are used to carry the type information of the payload. The schemas may be included in the message or referenced when using a schema registry.

Maybe there are some use cases where messages come in from somewhere without a schema and someone needs to guess the schema. In that case, maybe we could use a custom Single Message Transformation (SMT) to add a custom schema, whether specifically configured or defaulted to string fields.
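A minimal sketch of such a transformation, assuming the value arrives as a schemaless java.util.Map and defaulting every field to an optional string (the class name and package layout are hypothetical, not part of this repository):

    import java.util.{Map => JMap}
    import scala.jdk.CollectionConverters._
    import org.apache.kafka.common.config.ConfigDef
    import org.apache.kafka.connect.connector.ConnectRecord
    import org.apache.kafka.connect.data.{Schema, SchemaBuilder, Struct}
    import org.apache.kafka.connect.transforms.Transformation

    /** Wraps a schemaless Map value in a Struct whose fields are all
      * optional strings, so schema-requiring sinks can list its fields. */
    class AttachStringSchema[R <: ConnectRecord[R]] extends Transformation[R] {

      override def apply(record: R): R = record.value() match {
        case m: JMap[_, _] =>
          val entries = m.asInstanceOf[JMap[String, AnyRef]].asScala
          // Build a flat all-string schema from the map's keys.
          val builder = SchemaBuilder.struct().name("inferred.value")
          entries.keys.foreach(builder.field(_, Schema.OPTIONAL_STRING_SCHEMA))
          val schema = builder.build()
          // Copy each value over as its string representation.
          val struct = new Struct(schema)
          entries.foreach { case (k, v) =>
            struct.put(k, if (v == null) null else v.toString)
          }
          record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(),
            record.key(), schema, struct, record.timestamp())
        case _ =>
          record // value already carries a schema or is not a map; pass it through
      }

      override def config(): ConfigDef = new ConfigDef()
      override def configure(configs: JMap[String, _]): Unit = ()
      override def close(): Unit = ()
    }

It could then be enabled on the sink with something like "transforms": "attachSchema" and "transforms.attachSchema.type": "<package>.AttachStringSchema".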