SAP / kafka-connect-sap

Kafka Connect SAP is a set of connectors that use the Apache Kafka Connect framework to reliably connect Kafka with SAP systems.
Apache License 2.0

enum type not supported in AVRO schema #115

Closed thibthibus closed 2 years ago

thibthibus commented 2 years ago

Hi,

We are not able to set up a Sink connector on a topic that has fields defined as "type": "enum" in its AVRO schema in Schema Registry. We are using the latest release, 0.9.2, and the connector is configured with auto-create: true. We get the following exception:

    com.sap.kafka.utils.ConnectorException: Field Schema type name <Avro Schema field name> is invalid

See the full stack trace below:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:609)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.sap.kafka.client.hana.HANAJdbcException: Creation of table YCHEDW_TEMPUS_CUSTOMFIELD_v92_ENUM failed
    at com.sap.kafka.client.hana.HANAJdbcClient.createTable(HANAJdbcClient.scala:159)
    at com.sap.kafka.connect.sink.hana.HANASinkRecordsCollector.add(HANASinkRecordsCollector.scala:158)
    at com.sap.kafka.connect.sink.hana.HANAWriter.$anonfun$write$2(HANAWriter.scala:56)
    at com.sap.kafka.connect.sink.hana.HANAWriter.$anonfun$write$2$adapted(HANAWriter.scala:44)
    at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:985)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:984)
    at com.sap.kafka.connect.sink.hana.HANAWriter.write(HANAWriter.scala:44)
    at com.sap.kafka.connect.sink.GenericSinkTask.put(GenericSinkTask.scala:36)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
    ... 10 more
Caused by: com.sap.kafka.utils.ConnectorException: Field Schema type name io.helvetia.tempus.customfield.EntityType is invalid
    at com.sap.kafka.utils.GenericSchemaBuilder.getLogicalTypeFromFieldSchema(GenericSchemaBuilder.scala:65)
    at com.sap.kafka.utils.GenericSchemaBuilder.$anonfun$avroToJdbcSchema$1(GenericSchemaBuilder.scala:23)
    at scala.collection.immutable.List.map(List.scala:297)
    at com.sap.kafka.utils.GenericSchemaBuilder.avroToJdbcSchema(GenericSchemaBuilder.scala:19)
    at com.sap.kafka.utils.GenericSchemaBuilder.avroToJdbcSchema$(GenericSchemaBuilder.scala:18)
    at com.sap.kafka.utils.hana.HANASchemaBuilder$.avroToHANASchema(HANASchemaBuilder.scala:15)
    at com.sap.kafka.client.hana.HANAJdbcClient.$anonfun$createTable$1(HANAJdbcClient.scala:160)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.sap.kafka.utils.ExecuteWithExceptions$.apply(ExecuteWithExceptions.scala:40)
    ... 25 more

In Schema Registry, this field is defined this way:

    {
      "name": "entityType",
      "type": [
        "null",
        {
          "type": "enum",
          "name": "EntityType",
          "symbols": [
            "Project",
            "Resource",
            "Assignment"
          ]
        }
      ],
      "doc": "Type of entity that can have associated values of custom field."
    },

Are only simple types supported?

Thanks for your help on this.

Regards,

Thibaut

elakito commented 2 years ago

I think the enum type is currently not supported, but it is probably easy to fix. We need to have a look ...

thibthibus commented 2 years ago

Hi @elakito, thanks a lot! I see that you have already worked on a PR to fix this. Let me know if you want me to test it.

Regards,

Thibaut

elakito commented 2 years ago

@thibthibus Yes. The change is in master. If you could test it and give us your feedback, that would be great.

With this change, the sink connector will no longer reject an unknown custom type but will use its base type's mapping instead. For example, if your record generation code (either the source connector or some transformer) is using io.helvetia.tempus.customfield.EntityType for the AVRO enum type, this value will be inserted as a string.

regards, aki
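The difference between rejecting an unknown type name and falling back to the base type's mapping can be pictured with a small sketch. It is written in Python for brevity and is not the connector's actual code: `FieldSchema`, both lookup tables, and both function names are hypothetical, and it assumes the enum field reaches the sink as a string-based schema whose name is the enum's full name, as the error message suggests.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class FieldSchema:
    """Hypothetical stand-in for a field schema: a base type plus an optional name."""
    base_type: str
    name: Optional[str] = None


# Illustrative tables only; they are NOT the connector's real mappings.
LOGICAL_TYPE_COLUMNS = {
    "org.apache.kafka.connect.data.Decimal": "DECIMAL",
    "org.apache.kafka.connect.data.Date": "DATE",
    "org.apache.kafka.connect.data.Time": "TIME",
    "org.apache.kafka.connect.data.Timestamp": "TIMESTAMP",
}
BASE_TYPE_COLUMNS = {
    "STRING": "VARCHAR",
    "INT32": "INTEGER",
    "INT64": "BIGINT",
    "BOOLEAN": "BOOLEAN",
}


def column_type_strict(schema: FieldSchema) -> str:
    """Behaviour as reported in this issue: a named schema must be a known logical type."""
    if schema.name is None:
        return BASE_TYPE_COLUMNS[schema.base_type]
    if schema.name not in LOGICAL_TYPE_COLUMNS:
        raise ValueError(f"Field Schema type name {schema.name} is invalid")
    return LOGICAL_TYPE_COLUMNS[schema.name]


def column_type_with_fallback(schema: FieldSchema) -> str:
    """Behaviour as described for the fix: unknown names use the base type's mapping."""
    if schema.name in LOGICAL_TYPE_COLUMNS:
        return LOGICAL_TYPE_COLUMNS[schema.name]
    return BASE_TYPE_COLUMNS[schema.base_type]


# The enum field from this issue, assumed to arrive as a named STRING schema.
entity_type = FieldSchema("STRING", "io.helvetia.tempus.customfield.EntityType")
```

With these definitions, `column_type_strict(entity_type)` raises an error of the same shape as the one in the stack trace, while `column_type_with_fallback(entity_type)` resolves to the string column type.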

thibthibus commented 2 years ago

Hi @elakito,

I tested this change, and I guess it was successful, since the table was created. However, it fails afterwards without being able to process any record, with the same error:

    INFO Target type -7 is incompatile with source type 16

See full logs: 1644441555_75031_EE57B850-A1DA-4978-8A09-F01E066A8475.txt

Thanks

Thibaut
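One way to read the numbers in that log line, assuming they are `java.sql.Types` codes (the thread does not confirm this), is to look them up in the JDBC constant table. The sketch below hard-codes a subset of those constants in Python; the helper names are hypothetical.

```python
# Subset of the java.sql.Types constants, with values as defined by the JDBC API.
JDBC_TYPE_NAMES = {
    -7: "BIT",
    -6: "TINYINT",
    -5: "BIGINT",
    4: "INTEGER",
    5: "SMALLINT",
    8: "DOUBLE",
    12: "VARCHAR",
    16: "BOOLEAN",
    91: "DATE",
    93: "TIMESTAMP",
}


def describe_jdbc_type(code: int) -> str:
    """Return a readable label for a JDBC type code, or mark it as unknown."""
    name = JDBC_TYPE_NAMES.get(code)
    return f"{name} ({code})" if name else f"unknown ({code})"


def explain_mismatch(target: int, source: int) -> str:
    """Spell out a 'Target type X ... source type Y' pair in terms of type names."""
    return f"target {describe_jdbc_type(target)} vs source {describe_jdbc_type(source)}"


print(explain_mismatch(-7, 16))  # target BIT (-7) vs source BOOLEAN (16)
```

Under that assumption, the message would concern a BIT column receiving a BOOLEAN value rather than the enum field itself, but only the attached full log can confirm which field is involved.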