awslabs / kinesis-kafka-connector

kinesis-kafka-connector is a connector based on Kafka Connect for publishing messages to Amazon Kinesis streams or Amazon Kinesis Firehose.
Apache License 2.0

Invalid schema type when sending Avro message containing array of struct to Kinesis #71

Open filpano opened 2 years ago

filpano commented 2 years ago

We have Avro records that we want to send from Kafka to Kinesis. We are using Kafka 2.5, and the connector config looks like this:

{
    "aws.access.key.id": "...",
    "aws.secret.access.key": "...",
    "connector.class": "com.amazon.kinesis.kafka.AmazonKinesisSinkConnector",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "header.converter": "org.apache.kafka.connect.storage.StringConverter",
    "header.converter.converter.type": "header",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "name": "Kinesis Connector",
    "region": "eu-central-1",
    "schema.registry.url": "http://schema-registry-dp:8081",
    "streamName": "my-stream",
    "tasks.max": "2",
    "topics": "my-topic",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry-dp:8081",
    "value.converter.schemas.enable": "true"
}

The schema looks something like the following:

{
  "type": "record",
  "name": "MyEvent",
  "namespace": "com.avro.event",
  "fields": [
    {
      "name": "cancelReasons",
      "type": [
        "null",
        {
          "type": "array",
          "items": {
            "type": "record",
            "name": "CancelReason",
            "namespace": "com.avro.pojo",
            "fields": [
              {
                "name": "cancelReasonId",
                "type": [
                  "null",
                  {
                    "type": "string",
                    "logicalType": "uuid"
                  }
                ]
              },
              {
                "name": "name",
                "type": [
                  "null",
                  "string"
                ]
              },
              {
                "name": "number",
                "type": [
                  "null",
                  "string"
                ]
              }
            ]
          },
          "java-class": "java.util.List"
        }
      ]
    },
    { ... } // other fields
  ]
}

When we set up this connector to transfer these messages to Kinesis, we immediately see the following error:

org.apache.kafka.connect.errors.DataException: Invalid schema type.
	at com.amazon.kinesis.kafka.DataUtility.parseValue(DataUtility.java:77)
	at com.amazon.kinesis.kafka.DataUtility.lambda$parseValue$1(DataUtility.java:102)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
	at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085)
	at com.amazon.kinesis.kafka.DataUtility.parseValue(DataUtility.java:103)
	at com.amazon.kinesis.kafka.AmazonKinesisSinkTask.addUserRecord(AmazonKinesisSinkTask.java:252)
	at com.amazon.kinesis.kafka.AmazonKinesisSinkTask.put(AmazonKinesisSinkTask.java:154)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:584)

The offending code appears to be this case in DataUtility.parseValue:

        case ARRAY:
            Schema sch = schema.valueSchema();
            if (sch.type() == Type.MAP || sch.type() == Type.STRUCT) {
                throw new DataException("Invalid schema type.");
            }

Is this schema type (a struct containing a field that is itself an array of structs) not supported? That would be quite a showstopper for us, since we want to set up a Kinesis pipeline sourced from our existing Kafka topics.
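For reference, the ARRAY guard quoted above rejects every array whose element schema is a MAP or STRUCT, which is exactly the shape of cancelReasons (an optional array of CancelReason records). Nothing structural seems to prevent handling this case by recursion. Below is a minimal Java sketch of that idea, assuming a hypothetical, stripped-down schema model (SimpleSchema, with STRUCT values represented as maps) rather than the connector's real DataUtility code or the Kafka Connect Schema/Struct classes. It recurses into each array element with the element schema instead of throwing:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch only: SimpleSchema is a hypothetical stand-in for Kafka Connect's Schema,
// and STRUCT values are modelled as Map<String, Object> instead of Connect Structs.
class ParseValueSketch {

    enum Type { STRING, STRUCT, ARRAY }

    static final class SimpleSchema {
        final Type type;
        final Map<String, SimpleSchema> fields; // field schemas when type == STRUCT
        final SimpleSchema valueSchema;         // element schema when type == ARRAY

        private SimpleSchema(Type type, Map<String, SimpleSchema> fields, SimpleSchema valueSchema) {
            this.type = type;
            this.fields = fields;
            this.valueSchema = valueSchema;
        }

        static SimpleSchema string() { return new SimpleSchema(Type.STRING, Map.of(), null); }
        static SimpleSchema struct(Map<String, SimpleSchema> fields) { return new SimpleSchema(Type.STRUCT, fields, null); }
        static SimpleSchema array(SimpleSchema elements) { return new SimpleSchema(Type.ARRAY, Map.of(), elements); }
    }

    // Converts a value into plain Java objects. Unlike the quoted guard, an ARRAY
    // whose elements are STRUCTs is handled by recursing into each element.
    static Object parseValue(SimpleSchema schema, Object value) {
        if (value == null) {
            return null; // the "null" branch of an optional Avro union
        }
        switch (schema.type) {
            case STRING:
                return value.toString();
            case STRUCT: {
                Map<?, ?> struct = (Map<?, ?>) value;
                Map<String, Object> out = new LinkedHashMap<>();
                for (Map.Entry<String, SimpleSchema> field : schema.fields.entrySet()) {
                    out.put(field.getKey(), parseValue(field.getValue(), struct.get(field.getKey())));
                }
                return out;
            }
            case ARRAY: {
                List<Object> out = new ArrayList<>();
                for (Object element : (List<?>) value) {
                    // Recurse with the element schema, whatever its type (including STRUCT).
                    out.add(parseValue(schema.valueSchema, element));
                }
                return out;
            }
            default:
                throw new IllegalArgumentException("Unsupported schema type: " + schema.type);
        }
    }

    public static void main(String[] args) {
        // Mirrors the cancelReasons field: an array of CancelReason-like structs.
        Map<String, SimpleSchema> reasonFields = new LinkedHashMap<>();
        reasonFields.put("cancelReasonId", SimpleSchema.string());
        reasonFields.put("name", SimpleSchema.string());
        reasonFields.put("number", SimpleSchema.string());
        SimpleSchema cancelReasons = SimpleSchema.array(SimpleSchema.struct(reasonFields));

        // Hypothetical sample value; cancelReasonId is absent (null), as the union allows.
        Map<String, Object> reason = new LinkedHashMap<>();
        reason.put("name", "example");
        reason.put("number", "1");
        System.out.println(parseValue(cancelReasons, List.of(reason)));
    }
}
```

Running main prints [{cancelReasonId=null, name=example, number=1}]. A real fix would still have to decide how the connector serializes nested structs into the bytes it sends to Kinesis; the sketch only shows that the recursion itself is straightforward.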

bdesert commented 2 years ago

Hi @filpano, thanks for submitting your record; this will help a lot. I'll post an update on the status and ETA soon.

bdesert commented 2 years ago

@filpano, sorry for the wishful typo above. :) Thanks for the schema. Could you also provide a sample record based on this schema?