datastax / pulsar-sink

An Apache Pulsar® sink for transferring events/messages from Pulsar topics to Apache Cassandra®, DataStax Astra or DataStax Enterprise (DSE) tables.
Apache License 2.0

Add support for Collections types of Avro GenericData$Record #41

Closed aymkhalil closed 2 years ago

aymkhalil commented 2 years ago

The Pulsar sink fails to write data from an upstream topic with the following schema:

 {
   ...
    "fields": [
      {
        "name": "f1",
        "type": [
          "null",
          {
            "type": "array",
            "items": {
              "type": "record",
              "name": "f2",
              "fields": [
                {
                  "name": "code",
                  "type": [
                    "null",
                    "string"
                  ]
                }
              ]
            },
            "java-class": "java.util.List"
          }
        ]
      },
      ...
    ]
  },
  "type": "AVRO",
  "properties": {
  ...
  }
}

Such a schema can be generated by an upstream C* CDC source with a list<frozen<udt_type>> column, a Java POJO with a list attribute whose elements are POJOs, etc.
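For illustration, a POJO pair of roughly the following shape would be expected to yield a schema like the one above when the topic schema is derived by reflection. The class names `Event` and `F2` are hypothetical and only mirror the field names in the schema; the assumption that reflection-based schema generation maps a `List` field to a nullable Avro array is mine and should be checked against the producer in use.

```java
import java.util.List;

// Hypothetical element type; it mirrors the nested "f2" record in the schema.
class F2 {
    String code; // nullable, mirroring the ["null", "string"] union

    F2() {}

    F2(String code) {
        this.code = code;
    }
}

// Hypothetical top-level type published to the upstream topic.
class Event {
    List<F2> f1; // nullable list of records, mirroring the union of "null" and an array of "f2"

    Event() {}

    Event(List<F2> f1) {
        this.f1 = f1;
    }
}
```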

It turns out that the JacksonUtils.toJson utility method used in the sink doesn't know how to convert a map or array whose elements are of type "record".

We should either extend the method's functionality or just remove the dependency on this code in favor of our own conversion.
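A rough sketch of what "our own conversion" could look like is below. It is not the sink's code: `SimpleRecord` and `RecordToJson` are hypothetical stand-ins so the example runs without Avro or Jackson on the classpath. A real implementation would presumably walk an Avro GenericRecord via its schema and build Jackson nodes instead of strings. The point is only the recursion: arrays and maps convert each element with the same method, so records nested inside collections are handled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for an Avro record: an ordered set of named fields.
final class SimpleRecord {
    final Map<String, Object> fields = new LinkedHashMap<>();

    SimpleRecord put(String name, Object value) {
        fields.put(name, value);
        return this;
    }
}

// Hypothetical converter, not part of the sink.
final class RecordToJson {
    // Recursively renders records, maps, collections and scalars as JSON text.
    static String toJson(Object value) {
        if (value == null) {
            return "null";
        }
        if (value instanceof SimpleRecord) {
            return objectToJson(((SimpleRecord) value).fields);
        }
        if (value instanceof Map) {
            return objectToJson((Map<?, ?>) value);
        }
        if (value instanceof Iterable) {
            List<String> items = new ArrayList<>();
            for (Object item : (Iterable<?>) value) {
                items.add(toJson(item)); // elements may themselves be records
            }
            return "[" + String.join(",", items) + "]";
        }
        if (value instanceof Number || value instanceof Boolean) {
            // Java string form; NaN and infinities are not handled in this sketch.
            return value.toString();
        }
        // Everything else (String, other CharSequence types, ...) becomes a JSON string.
        return quote(value.toString());
    }

    private static String objectToJson(Map<?, ?> map) {
        List<String> members = new ArrayList<>();
        for (Map.Entry<?, ?> e : map.entrySet()) {
            members.add(quote(String.valueOf(e.getKey())) + ":" + toJson(e.getValue()));
        }
        return "{" + String.join(",", members) + "}";
    }

    // Escapes quotes, backslashes and control characters.
    private static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }
}
```

With a record whose `f1` field holds a list containing one record with `code` set to `A1`, `RecordToJson.toJson` returns `{"f1":[{"code":"A1"}]}`.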