datastax / cdc-apache-cassandra

Datastax CDC for Apache Cassandra
Apache License 2.0

Allow optional UDT attributes #93

Closed aymkhalil closed 2 years ago

aymkhalil commented 2 years ago

Today, CDC creates the UDT schema on the data topic with non-optional fields. This blocks legitimate customer use cases where not all UDT attributes are set. Example:

CREATE TYPE ks1.dt1(zboolean boolean, ztext text);
CREATE TABLE ks1.dt1(xtext text, xudt dt1, primary key(xtext)) with cdc=true;
INSERT INTO ks1.dt1(xtext, xudt) values ('a', {ztext:'hi'});

SELECT * FROM ks1.dt1;
xtext | xudt
-------+-------------------------------
     a | {zboolean: null, ztext: 'hi'}

The CDC connector then fails on the null zboolean value with the following exception:

[com.datastax.oss-pulsar-functions-instance-2.10.1.3.jar:?]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322]
Caused by: java.lang.NullPointerException: null of boolean in field zboolean of xudt of array of union in field zboolean of xudt
        at org.apache.avro.generic.GenericDatumWriter.npe(GenericDatumWriter.java:184) ~[java-instance.jar:?]
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:178) ~[java-instance.jar:?]
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:83) ~[java-instance.jar:?]
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73) ~[java-instance.jar:?]
        at com.datastax.oss.pulsar.source.converters.NativeAvroConverter.serializeAvroGenericRecord(NativeAvroConverter.java:309) ~[?:?]
        at com.datastax.oss.pulsar.source.converters.NativeAvroConverter.toConnectData(NativeAvroConverter.java:301) ~[?:?]
        at com.datastax.oss.pulsar.source.converters.NativeAvroConverter.toConnectData(NativeAvroConverter.java:74) ~[?:?]
        at 

This patch allows UDT attributes to be null in the schema. Here is what the schema looks like before the change:

    {
        "name": "...",
        "schema": {
          "type": "record",
          "name": "record",
          "fields": [
            {
              "name": "ztext",
              "type": "string"
            },
            {
              "name": "zboolean",
              "type": "boolean"
            }
          ]
        },
        "type": "AVRO",
        "properties": {}
      }

and here is what the schema will look like after:

    {
        "name": "...",
        "schema": {
          "type": "record",
          "name": "record",
          "fields": [
            {
              "name": "ztext",
              "type": "string"
            },
            {
              "name": "zboolean",
              "type": [
                "null",
                "boolean"
              ]
            }
          ]
        },
        "type": "AVRO",
        "properties": {}
      }

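The before/after change amounts to wrapping a field's type in a union with "null". The sketch below illustrates that transformation on a plain JSON schema using only the Python standard library. The helper name `make_optional` is hypothetical and is not taken from the connector's code; the actual patch may build the schema differently.

```python
import copy
import json


def make_optional(record_schema, field_names):
    """Return a copy of an Avro record schema (as a dict) in which the
    named fields accept null. Hypothetical helper, for illustration only."""
    result = copy.deepcopy(record_schema)
    for field in result["fields"]:
        if field["name"] not in field_names:
            continue
        field_type = field["type"]
        if isinstance(field_type, list):
            # Already a union: add "null" only if it is missing.
            if "null" not in field_type:
                field["type"] = ["null"] + field_type
        else:
            # Plain type: wrap it in a ["null", type] union.
            field["type"] = ["null", field_type]
    return result


# The UDT record schema from the "before" example.
before = {
    "type": "record",
    "name": "record",
    "fields": [
        {"name": "ztext", "type": "string"},
        {"name": "zboolean", "type": "boolean"},
    ],
}

after = make_optional(before, {"zboolean"})
print(json.dumps(after, indent=2))
```

With this union in place, a null zboolean is a valid value for the field, which is the case the exception above rejects.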
# In conf/standalone.conf:
systemTopicEnabled=true
topicLevelPoliciesEnabled=true

# Pulsar admin
bin/pulsar-admin topicPolicies set-schema-compatibility-strategy persistent://public/default/data-topic --strategy BACKWARD
