datastax / pulsar-sink

An Apache Pulsar® sink for transferring events/messages from Pulsar topics to Apache Cassandra®, DataStax Astra or DataStax Enterprise (DSE) tables.
Apache License 2.0

Add support for logical AVRO types. #40

Open aymkhalil opened 2 years ago

aymkhalil commented 2 years ago

There are three different ways a Pulsar sink can receive a logical type, depending on how the message was produced. Below, I'll use the C* decimal type (BigDecimal in Java) as an example:

  1. Using the Pulsar API Schema.AVRO(org.apache.pulsar.client.api.MyPojo.class), where MyPojo has a BigDecimal field. The generated output schema is:
    {
        "name": "bigDecimal",
        "type": [
            "null",
            {
                "type": "string",
                "java-class": "java.math.BigDecimal"
            }
        ]
    }

The Pulsar sink will fail with:

Caused by: java.lang.UnsupportedOperationException: No recommended schema for decimal (scale is required)
  2. Using native Avro APIs (LogicalTypes.decimal(precision, scale).addToSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES))) to attach the logical type to a bytes schema. This matches the decimal logical type defined in the Avro specification. Example schema:
    {
        "name": "v2",
        "type": {
            "type": "bytes",
            "logicalType": "decimal",
            "precision": 17,
            "scale": 4
        }
    }
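
For this form, the Avro specification stores the unscaled value as big-endian two's-complement bytes and keeps the scale in the schema, so the sink would have to read the scale from the field schema before it can rebuild the BigDecimal. A minimal sketch of that conversion, using only the JDK (the class and method names are hypothetical and are not part of the sink):

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

// Hypothetical helper, not part of pulsar-sink: shows how an Avro `decimal`
// payload (bytes + scale taken from the schema) maps to a BigDecimal.
public class AvroDecimalDecoder {

    // Rebuilds the BigDecimal from the unscaled two's-complement bytes.
    // `scale` is assumed to come from the field's schema ("scale": 4 above).
    public static BigDecimal decode(ByteBuffer payload, int scale) {
        ByteBuffer copy = payload.duplicate(); // leave the caller's position untouched
        byte[] unscaled = new byte[copy.remaining()];
        copy.get(unscaled);
        return new BigDecimal(new BigInteger(unscaled), scale);
    }

    // Inverse direction, for illustration only. setScale without a rounding
    // mode throws ArithmeticException if the value does not fit the scale.
    public static ByteBuffer encode(BigDecimal value, int scale) {
        return ByteBuffer.wrap(value.setScale(scale).unscaledValue().toByteArray());
    }

    public static void main(String[] args) {
        ByteBuffer wire = encode(new BigDecimal("1234.5678"), 4);
        System.out.println(decode(wire, 4)); // prints 1234.5678
    }
}
```
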
  3. Using a logical CQL type that comes from a C* CDC source, the schema will be:

    {
        "name": "fieldName",
        "type": [
            "null",
            {
                "type": "record",
                "name": "cql_decimal",
                "namespace": "",
                "fields": [
                    {
                        "name": "bigint",
                        "type": "bytes"
                    },
                    {
                        "name": "scale",
                        "type": "int"
                    }
                ],
                "logicalType": "cql_decimal"
            }
        ]
    }

and this will cause the sink to fail with:

com.datastax.oss.sink.pulsar.CassandraSinkTask - Error decoding/mapping Pulsar record PulsarSinkRecord{SinkRecord(sourceRecord=PulsarRecord(topicName=Optional[persist    topicname], partition=0, message=Optional[org.apache.pulsar.client.impl.TopicMessageImpl@20958f19], schema=KeyValueSchema(SEPARATED,org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema@4d53f4e4,org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema@a79d566), failFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$586/0x00000008407be840@2d24432d, ackFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$585/0x00000008407bf440@21dc87f6), value=(key = "org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord@e180683", value = "org.apache.pulsar.client.impl.schema.generic.Generic
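
For the cql_decimal record, the scale travels with each value instead of living in the schema. A rough sketch of how such a record could be mapped to a BigDecimal, assuming the "bigint" field holds the unscaled value as two's-complement bytes (I have not confirmed this against the CDC source). A plain Map stands in for the decoded Avro record, and the class and method names are hypothetical:

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of pulsar-sink. The Map is a stand-in for a
// decoded record with the two fields shown in the cql_decimal schema above.
public class CqlDecimalRecord {

    public static BigDecimal toBigDecimal(Map<String, Object> record) {
        Object unscaledField = record.get("bigint");
        Object scaleField = record.get("scale");
        if (!(unscaledField instanceof ByteBuffer) || !(scaleField instanceof Integer)) {
            throw new IllegalArgumentException("not a cql_decimal record: " + record.keySet());
        }
        // Assumption: "bigint" is BigInteger.toByteArray()-style two's complement.
        ByteBuffer buf = ((ByteBuffer) unscaledField).duplicate();
        byte[] unscaled = new byte[buf.remaining()];
        buf.get(unscaled);
        return new BigDecimal(new BigInteger(unscaled), (Integer) scaleField);
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("bigint", ByteBuffer.wrap(new BigInteger("-12345").toByteArray()));
        record.put("scale", 2);
        System.out.println(toBigDecimal(record)); // prints -123.45
    }
}
```
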
aymkhalil commented 2 years ago

A similar issue was reported: https://github.com/datastax/pulsar-sink/issues/29