an0r0c / kafka-connect-transform-tojsonstring

transform-to-json-string is a Single Message Transformation (SMT) for Apache Kafka® Connect to convert a given Connect Record to a single JSON String. It's an UNOFFICIAL community project.
Apache License 2.0

Support null encodings in Arrays #19

Open citygent opened 5 months ago

citygent commented 5 months ago

Hey,

When using an Avro schema such as array<union{SomeType, null}> (AVDL notation), which in AVSC looks like this:

{
  "name" : "arrayOfRecordOptional",
  "type" : {
    "type" : "array",
    "items" : [ "string", "null" ]
  }
}

And sending a message like: ['hi', 'this', 'is', null, 'legit', 'right?']

The Schema Registry rightfully lets this through as a valid message; however, I get errors in my worker:

    tasks:
    - id: 0
      state: FAILED
      trace: "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded
        in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:223)\n\tat
        org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:149)\n\tat
        org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)\n\tat
        org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:540)\n\tat
        org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:493)\n\tat
        org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:332)\n\tat
        org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)\n\tat
        org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)\n\tat
        org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)\n\tat
        org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)\n\tat
        java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat
        java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat
        java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat
        java.base/java.lang.Thread.run(Thread.java:833)\nCaused by: org.apache.kafka.connect.errors.DataException:
        error while processing field recordOfArrayCases\n\tat com.github.cedelsb.kafka.connect.smt.converter.AvroJsonSchemafulRecordConverter.processField(AvroJsonSchemafulRecordConverter.java:135)\n\tat
        com.github.cedelsb.kafka.connect.smt.converter.AvroJsonSchemafulRecordConverter.lambda$toBsonDoc$0(AvroJsonSchemafulRecordConverter.java:90)\n\tat
        java.base/java.util.ArrayList.forEach(ArrayList.java:1511)\n\tat java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1092)\n\tat
        com.github.cedelsb.kafka.connect.smt.converter.AvroJsonSchemafulRecordConverter.toBsonDoc(AvroJsonSchemafulRecordConverter.java:90)\n\tat
        com.github.cedelsb.kafka.connect.smt.converter.AvroJsonSchemafulRecordConverter.convert(AvroJsonSchemafulRecordConverter.java:76)\n\tat
        com.github.cedelsb.kafka.connect.smt.Record2JsonStringConverter.applyWithSchema(Record2JsonStringConverter.java:110)\n\tat
        com.github.cedelsb.kafka.connect.smt.Record2JsonStringConverter.apply(Record2JsonStringConverter.java:99)\n\tat
        org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)\n\tat
        org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:173)\n\tat
        org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:207)\n\t...
        14 more\nCaused by: org.apache.kafka.connect.errors.DataException: error while
        processing field arrayOfRecordOptional\n\tat 

... redacted lines ....

com.github.cedelsb.kafka.connect.smt.converter.AvroJsonSchemafulRecordConverter.processField(AvroJsonSchemafulRecordConverter.java:120)\n\t...
        30 more\nCaused by: java.lang.NullPointerException: Cannot invoke \"org.apache.kafka.connect.data.Struct.get(org.apache.kafka.connect.data.Field)\"
        because \"struct\" is null\n\tat com.github.cedelsb.kafka.connect.smt.converter.AvroJsonSchemafulRecordConverter.processField(AvroJsonSchemafulRecordConverter.java:114)\n\t...
        36 more\n"

Is this an easy fix? We're using this in Confluent Cloud to materialise views, and this limitation hurts in what is otherwise a great workaround for the limitations on arrays.

Thanks

an0r0c commented 5 months ago

Hi,

I was able to reproduce it, but it was not easy. When I manually translated the given Avro schema into a Kafka Connect schema and tried this pattern in a unit test, it worked as it should. When I implemented an integration test that worked with real Avro, I was able to reproduce it. Something seems to be mapped differently from the Avro schema to the Kafka Connect schema in this pattern. I need to debug through it to understand the difference and to see how it can be fixed.

I cannot promise when I will have time for that.
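
For anyone following along: the innermost cause in the trace is a NullPointerException raised when a null value is dereferenced while processing an array field. Below is a minimal plain-Java sketch of the expected behaviour, where a null array element is emitted as a JSON null instead of being dereferenced. This is not the SMT's code and does not use the Kafka Connect API; the class NullSafeArrayToJson and its methods are hypothetical names chosen for illustration, and the string escaping is deliberately simplified.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical illustration only; not part of kafka-connect-transform-tojsonstring.
public class NullSafeArrayToJson {

    // Renders a single array element as a JSON token.
    // The null check comes first so a null element never gets dereferenced.
    public static String elementToJson(Object element) {
        if (element == null) {
            return "null";
        }
        if (element instanceof String) {
            return "\"" + escape((String) element) + "\"";
        }
        if (element instanceof Number || element instanceof Boolean) {
            return element.toString();
        }
        throw new IllegalArgumentException(
                "unsupported element type: " + element.getClass().getName());
    }

    // Simplified escaping: handles backslashes and double quotes only.
    // A real JSON writer would also escape control characters.
    public static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Renders a list (which may contain nulls) as a JSON array string.
    public static String toJsonArray(List<?> elements) {
        StringJoiner joiner = new StringJoiner(",", "[", "]");
        for (Object element : elements) {
            joiner.add(elementToJson(element));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // The example message from the report above.
        List<String> message = Arrays.asList("hi", "this", "is", null, "legit", "right?");
        System.out.println(toJsonArray(message));
        // prints ["hi","this","is",null,"legit","right?"]
    }
}
```

Whether the actual fix belongs at the element level like this, or earlier in the Avro to Kafka Connect schema mapping, is exactly what still needs to be debugged.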