SAP / kafka-connect-sap

Kafka Connect SAP is a set of connectors, using the Apache Kafka Connect framework for reliably connecting Kafka with SAP systems
Apache License 2.0
119 stars 54 forks source link

Sink connector doesn't work after upgrading to 0.9.2 (com.sap.kafka.utils.SchemaNotMatchedException) #116

Closed thibthibus closed 2 years ago

thibthibus commented 2 years ago

Hi, After upgrading to the latest 0.9.2 release, one of our Sink connectors stopped working with the following error:

Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-ch-edwh-jdbc-sink-ch-tempus-0-0-3-0]
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:609)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.sap.kafka.utils.SchemaNotMatchedException: Table "HELVETIA_CH9970"."YCHEDW_TEMPUS_CUSTOMFIELD_TEXT" has an incompatible schema to the record Schema.
Auto Evolution of schema is not supported
    at com.sap.kafka.connect.sink.hana.HANASinkRecordsCollector.add(HANASinkRecordsCollector.scala:106)
    at com.sap.kafka.connect.sink.hana.HANAWriter.$anonfun$write$2(HANAWriter.scala:56)
    at com.sap.kafka.connect.sink.hana.HANAWriter.$anonfun$write$2$adapted(HANAWriter.scala:44)
    at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984)
    at com.sap.kafka.connect.sink.hana.HANAWriter.write(HANAWriter.scala:44)
    at com.sap.kafka.connect.sink.GenericSinkTask.put(GenericSinkTask.scala:36)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    ... 10 more

In the topic there is a string field that can be up to 13'000 characters long. It has been defined as "type": "string" in its AVRO schema. For this field the corresponding column in the HANA table is of type TEXT (to prevent the 5000 limit of VARCHAR type).

It used to work fine with version 0.9.1 of the HANA connector but the changes (additionals controls ?) made in HANASinkRecordsCollector in 0.9.2 are now preventing this setup (type string in AVRO schema and tpye TEXT in HANA DB column) to work.

What should we do ? Change the field type in Schema Registry ?

Thanks for your help

Regards,

Thibaut

elakito commented 2 years ago

@thibthibus Maybe before 0.9.2, the type was not checked and it worked. We should accept certain type mismatches so that data can be transferred between those columns. I'll have a look...

elakito commented 2 years ago

The fix has been merged to master. If you could test it and give us your feedback, that would be great. regards, aki

thibthibus commented 2 years ago

Hi @elakito,

I tested and got the same error INFO Target type -7 is incompatile with source type 16 What are these numbers referring to ?

I thought this was a different case (because here I try to write in a table that has not been created by auto.create=true due to the VARCHAR(5000) limitation) but the error in the same..

Thanks

thibthibus commented 2 years ago

Thanks @elakito. Now with the latest change on boolean/bit it worked.