datastax / pulsar-sink

An Apache Pulsar® sink for transferring events/messages from Pulsar topics to Apache Cassandra®, DataStax Astra or DataStax Enterprise (DSE) tables.
Apache License 2.0

Cannot Map a cql_decimal field coming from CDC for Cassandra #29

Closed by eolivelli 2 years ago

eolivelli commented 2 years ago

When you use CDC for Cassandra, we map the CQL "decimal" type to an Avro record with the logical type "cql_decimal".

Here is an example schema:

{
  "name": "price",
  "type": [
    "null",
    {
      "type": "record",
      "name": "cql_decimal",
      "namespace": "",
      "fields": [
        {
          "name": "bigint",
          "type": "bytes"
        },
        {
          "name": "scale",
          "type": "int"
        }
      ],
      "logicalType": "cql_decimal"
    }
  ],
  "default": null
}
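The schema splits a decimal into an unscaled integer ("bigint", as Avro bytes) and a "scale". A minimal sketch of that representation follows, assuming the "bigint" bytes are the two's-complement, big-endian encoding returned by BigInteger.toByteArray(); the class and method names are hypothetical, not part of CDC for Cassandra or the sink.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

// Hypothetical helper illustrating the cql_decimal layout (bigint + scale).
public class CqlDecimalCodec {

    // Assumption: "bigint" holds BigInteger.toByteArray() of the unscaled value.
    public static ByteBuffer bigintField(BigDecimal value) {
        return ByteBuffer.wrap(value.unscaledValue().toByteArray());
    }

    // The "scale" field is the BigDecimal scale (digits after the point).
    public static int scaleField(BigDecimal value) {
        return value.scale();
    }

    // Rebuilds the BigDecimal from the two record fields.
    public static BigDecimal decode(ByteBuffer bigint, int scale) {
        // duplicate() so reading does not move the caller's buffer position
        ByteBuffer copy = bigint.duplicate();
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return new BigDecimal(new BigInteger(bytes), scale);
    }

    public static void main(String[] args) {
        BigDecimal price = new BigDecimal("-1234.5678");
        ByteBuffer bigint = bigintField(price);
        int scale = scaleField(price);
        BigDecimal decoded = decode(bigint, scale);
        System.out.println(decoded);               // -1234.5678
        System.out.println(decoded.equals(price)); // true
    }
}
```

Keeping the unscaled value and the scale separate preserves the exact decimal value, which a float or double field could not guarantee.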

The sink cannot process this field. Here is a sample error:

2022-06-07T07:12:14.314204675Z 07:12:14.313 [mapping-0] WARN com.datastax.oss.sink.pulsar.CassandraSinkTask - Error decoding/mapping Pulsar record PulsarSinkRecord{SinkRecord(sourceRecord=PulsarRecord(topicName=Optional[persistent://streamsample/astracdc/data-581af63c-8af6-4f05-8b57-a6241b5e4d47-cdcdb.products-partition-1], partition=0, message=Optional[org.apache.pulsar.client.impl.TopicMessageImpl@7536aaf0], schema=AUTO_CONSUME({schemaVersion=,schemaType=BYTES}{schemaVersion=\x00\x00\x00\x00\x00\x00\x00\x00,schemaType=KEY_VALUE}{schemaVersion=org.apache.pulsar.common.protocol.schema.LatestVersion@5f4eb991,schemaType=KEY_VALUE}), failFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$555/0x0000000840742040@3e210975, ackFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$554/0x0000000840742c40@e75dd94), value=(key = "org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord@7583c4ed", value = "org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord@35229187"))}: type class java.nio.HeapByteBuffer
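The error ends with "type class java.nio.HeapByteBuffer", which suggests that the mapping code received the "bigint" field as a java.nio.ByteBuffer and had no conversion for it. The sketch below shows one way a converter could accept that type. It is hypothetical: a plain Map stands in for the decoded Avro record (it is not the Pulsar GenericRecord API), and it assumes the same BigInteger.toByteArray() encoding for "bigint".

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.Map;

// Hypothetical converter from a decoded cql_decimal record to BigDecimal.
public class CqlDecimalFieldConverter {

    // Accepts the Avro "bytes" value as either a ByteBuffer
    // (e.g. java.nio.HeapByteBuffer) or a plain byte[].
    static byte[] toBytes(Object raw) {
        if (raw instanceof ByteBuffer) {
            // duplicate() leaves the original buffer's position untouched
            ByteBuffer copy = ((ByteBuffer) raw).duplicate();
            byte[] bytes = new byte[copy.remaining()];
            copy.get(bytes);
            return bytes;
        }
        if (raw instanceof byte[]) {
            return (byte[]) raw;
        }
        throw new IllegalArgumentException(
            "Unsupported bigint type: " + (raw == null ? "null" : raw.getClass()));
    }

    // fields: stand-in for the record's "bigint" and "scale" values.
    public static BigDecimal convert(Map<String, Object> fields) {
        if (fields == null) {
            return null; // the union type allows "null"
        }
        byte[] unscaled = toBytes(fields.get("bigint"));
        int scale = ((Number) fields.get("scale")).intValue();
        return new BigDecimal(new BigInteger(unscaled), scale);
    }

    public static void main(String[] args) {
        Map<String, Object> fromAvro = Map.of(
            "bigint", ByteBuffer.wrap(BigInteger.valueOf(1999).toByteArray()),
            "scale", 2);
        System.out.println(convert(fromAvro)); // 19.99
    }
}
```

Copying through duplicate() means the converter behaves the same for heap and direct buffers and does not consume a buffer that other code may still read.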