Aiven-Open / cloud-storage-connectors-for-apache-kafka

Repository containing Cloud Storage Connectors for Apache Kafka®
Apache License 2.0

There is an issue handling avro schema with default value defined as String #273

Open rhatlapa opened 1 year ago

rhatlapa commented 1 year ago

An Avro schema with a field defined like this:

    {
        "name": "osType",
        "type": [
            "string"
        ],
        "default": "UNKNOWN"
    },

results in a processing failure with the following stack trace:

org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
    at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
    at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1564)
    at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1445)
    at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1323)
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1047)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:92)
    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:540)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:496)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:496)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type STRUCT: class java.lang.String for field: "null"
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:241)
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
    at org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
    ... 23 more

The issue seems to be caused by outdated versions of org.apache.kafka:connect-api:1.1.0 and io.confluent:kafka-avro-serializer:4.1.4. I suggest updating these dependencies to resolve it.

gharris1727 commented 1 year ago

> Caused by: org.apache.kafka.connect.errors.DataException: Invalid Java object for schema type STRUCT: class java.lang.String for field: "null"

@rhatlapa Can you share the complete schema you're using to reproduce this? The STRUCT type and field: "null" in that message make it look like the default value is applied at the struct level, not the field level. That would mean the default causing the problem is not the one you've shown, but one elsewhere in the containing schema.

Also, providing non-trivial default values for structs is currently not supported; see https://issues.apache.org/jira/browse/KAFKA-12694 for details.
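
To narrow down which default ends up on a struct, a small script can list every field that combines a union type with a default. This is a minimal sketch, assuming (not confirmed in this thread) that the converter maps any union other than the two-branch nullable form to a STRUCT. `union_defaults`, `is_nullable_pair` and `SAMPLE` are hypothetical names used only for illustration.

```python
import json


def is_nullable_pair(avro_type):
    """True for a two-branch union in which one branch is "null"."""
    return (isinstance(avro_type, list)
            and len(avro_type) == 2
            and "null" in avro_type)


def union_defaults(schema, path=""):
    """Yield (field path, union type, default) for every defaulted union
    that is not a nullable pair. Walks records, arrays, maps and unions."""
    if isinstance(schema, list):
        for branch in schema:
            yield from union_defaults(branch, path)
    elif isinstance(schema, dict):
        kind = schema.get("type")
        if kind == "record":
            for field in schema.get("fields", []):
                name = field["name"]
                field_path = f"{path}.{name}" if path else name
                field_type = field["type"]
                if (isinstance(field_type, list)
                        and not is_nullable_pair(field_type)
                        and "default" in field):
                    yield field_path, field_type, field["default"]
                yield from union_defaults(field_type, field_path)
        elif kind == "array":
            yield from union_defaults(schema.get("items"), path)
        elif kind == "map":
            yield from union_defaults(schema.get("values"), path)


# Illustrative schema built from the two field shapes quoted in this issue.
SAMPLE = json.loads("""
{
  "type": "record",
  "name": "AppInstallationEventAvro",
  "fields": [
    {"name": "packageName", "type": ["null", "string"], "default": null},
    {"name": "osType", "type": ["string"], "default": "UNKNOWN"}
  ]
}
""")

for field_path, union, default in union_defaults(SAMPLE):
    print(f"{field_path}: union {union} has default {default!r}")
```

Any field it reports is a candidate for the default the error message complains about; fields with a plain type or a nullable pair are skipped.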

> The issue seems to be caused by outdated versions of org.apache.kafka:connect-api:1.1.0 and io.confluent:kafka-avro-serializer:4.1.4. I suggest updating these dependencies to resolve it.

Do you know what patch is missing from those dependencies?

We can upgrade connect-api, but that is almost certainly not causing the issue, as the Connect worker substitutes its own connect-api at runtime. If there were a bug in that artifact, running the connector on a modern Kafka Connect version would fix it.

Unfortunately, we cannot upgrade kafka-avro-serializer to a more modern version due to licensing changes. From a quick inspection, I do not know which change to the serializer would address this bug, but please let me know if you've already looked into it.

rhatlapa commented 1 year ago

According to the remote debugger, osType is the problematic field, and I can confirm that removing "default": "UNKNOWN" helped. Other tools we use (e.g. Kafka UI) had no issues handling the schema shown below, including the "default": "UNKNOWN".
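
That other tools accept the schema is consistent with Avro's own rule for union defaults. The following is a minimal sketch, assuming the rule that a union field's default is checked against the first branch of the union, and covering primitive types only. `default_matches_first_branch` is a hypothetical helper, not part of any Avro library.

```python
def _is_number(value, allow_float):
    """JSON numbers only; bool is excluded because it subclasses int."""
    if isinstance(value, bool):
        return False
    return isinstance(value, (int, float) if allow_float else int)


# JSON-level checks for Avro primitive types (complex types are not covered).
PRIMITIVE_CHECKS = {
    "null": lambda v: v is None,
    "boolean": lambda v: isinstance(v, bool),
    "int": lambda v: _is_number(v, allow_float=False),
    "long": lambda v: _is_number(v, allow_float=False),
    "float": lambda v: _is_number(v, allow_float=True),
    "double": lambda v: _is_number(v, allow_float=True),
    "string": lambda v: isinstance(v, str),
    "bytes": lambda v: isinstance(v, str),
}


def default_matches_first_branch(avro_type, default):
    """Check a field default against its type, or against the first
    branch when the type is a union (assumed rule, see the text above)."""
    branch = avro_type
    if isinstance(avro_type, list):
        if not avro_type:
            raise ValueError("empty union")
        branch = avro_type[0]
    check = PRIMITIVE_CHECKS.get(branch) if isinstance(branch, str) else None
    if check is None:
        raise ValueError(f"only primitive types are handled, got {branch!r}")
    return check(default)


# The two field shapes from this issue.
print(default_matches_first_branch(["string"], "UNKNOWN"))
print(default_matches_first_branch(["null", "string"], None))
```

Under that assumption both defaults in the schema are well-formed, which would point at how the union is converted rather than at the schema itself.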

As far as I can tell, the S3 connector plugin downloaded from https://github.com/aiven/s3-connector-for-apache-kafka/releases/download/v2.12.0/aiven-kafka-connect-s3-2.12.0.zip already contains connect-api-1.1.0.jar, which is why I assumed that version is being used. I need to set up a local run to verify the actual behavior (unfortunately I haven't had time to do so yet).
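
For reference, the bundled jar versions can be read straight from the plugin archive. This is a minimal sketch, assuming the zip has been downloaded locally and that jars follow the usual `artifact-version.jar` naming; `bundled_versions` is a hypothetical helper. It only shows what ships in the archive, not what the Connect worker actually loads at runtime.

```python
import re
import zipfile

# Matches "<artifact>-<version>.jar" at the end of an archive entry name.
JAR_PATTERN = re.compile(
    r"(?:^|/)(?P<artifact>[A-Za-z0-9_.\-]+?)-(?P<version>\d[\w.\-]*)\.jar$"
)


def bundled_versions(archive_source,
                     artifacts=("connect-api", "kafka-avro-serializer")):
    """Return {artifact: version} for the named jars found in a zip.

    archive_source may be a file path or a binary file-like object.
    """
    found = {}
    with zipfile.ZipFile(archive_source) as archive:
        for entry in archive.namelist():
            match = JAR_PATTERN.search(entry)
            if match and match.group("artifact") in artifacts:
                found[match.group("artifact")] = match.group("version")
    return found


if __name__ == "__main__":
    import sys

    # Usage: python bundled_versions.py aiven-kafka-connect-s3-2.12.0.zip
    if len(sys.argv) > 1:
        for artifact, version in bundled_versions(sys.argv[1]).items():
            print(f"{artifact}: {version}")
```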

Here is the schema (trimmed slightly while keeping every kind of field definition it contains; no other field uses the "UNKNOWN" string).

    "type": "record",
    "namespace": "com.company.kafka.avro",
    "name": "AppInstallationEventAvro",
    "fields": [
        {
            "name": "packageName",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "osType",
            "type": [
                "string"
            ],
            "default": "UNKNOWN"
        }
   ]