
kafka avro producer does not permit new fields to be added #5168

Open devinrsmith opened 7 months ago

devinrsmith commented 7 months ago

Our Kafka Avro producer code generates a schema such that new fields can't be added later by default. For example, a table with a long column and an int column produces the following schema:

{
    "type": "record",
    "name": "yellow",
    "fields": [
        {
            "name": "VendorID",
            "type": [
                "long",
                "null"
            ]
        },
        {
            "name": "passenger_count",
            "type": [
                "int",
                "null"
            ]
        }
    ]
}

The order of the union types is important: in the case above, we have the option to set an int default (but don't). If instead we want null as the default, "null" needs to come first in the union.
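For illustration, a field declaration that could be added later without breaking compatibility might look like the following (a sketch based on Avro's union and default rules; "new_field" is a hypothetical name, not output from our producer):

{
    "name": "new_field",
    "type": [
        "null",
        "int"
    ],
    "default": null
}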

If we create a new table that has an additional field and try to publish it to the same topic / schema registry, we get:

RuntimeError: io.deephaven.UncheckedDeephavenException: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema for subject "{yellow}"; error code: 409
        at io.deephaven.kafka.AvroImpl$AvroProduce.ensureSchema(AvroImpl.java:253)
        at io.deephaven.kafka.AvroImpl$AvroProduce.getColumnNames(AvroImpl.java:194)
        at io.deephaven.kafka.KafkaTools.produceFromTable(KafkaTools.java:1432)
...
caused by io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema for subject "{yellow}"; error code: 409
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:314)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:384)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:561)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:549)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:297)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:404)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:383)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:371)
        at io.deephaven.kafka.AvroImpl$AvroProduce.ensureSchema(AvroImpl.java:251)
        ... 24 more

This is because the schema registry is configured for backward compatibility by default: consumers using the new schema must be able to read data written with the previous schema, but since no default is provided for the new field, they can't.

We may want either to be more lenient in our Avro schema definitions by default, or to give the user the ability to configure more finely how data types are translated into an Avro schema.
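One way to surface this earlier would be to test a candidate schema against the registry before registering it. A minimal sketch, assuming the Confluent schema registry client is on the classpath (the registry URL and subject name here are hypothetical):

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class CompatibilityCheck {
    public static void main(String[] args) throws Exception {
        // Hypothetical registry URL; cache up to 100 schemas per subject.
        SchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 100);

        // Candidate schema with a new optional field: ["null", type] union
        // with a null default, so old records can still be read.
        Schema candidate = SchemaBuilder.record("yellow").fields()
                .name("VendorID").type().optional().longType()
                .name("passenger_count").type().optional().intType()
                .name("new_field").type().optional().stringType()
                .endRecord();

        // Ask the registry whether the candidate is compatible with the
        // latest registered schema for the subject, instead of failing at
        // register() time with a 409.
        boolean ok = client.testCompatibility("yellow", new AvroSchema(candidate));
        System.out.println("compatible: " + ok);
    }
}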

devinrsmith commented 7 months ago

This may be as simple as using org.apache.avro.SchemaBuilder.FieldTypeBuilder#optional instead of org.apache.avro.SchemaBuilder.FieldTypeBuilder#nullable:

commit 026545f81ec47d2d6a6a2c3720715a1613b5b6af (HEAD -> avro-schema-change-fix)
Author: Devin Smith <devinsmith@deephaven.io>
Date:   Mon Feb 19 11:55:11 2024 -0800

    better produce

diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/AvroImpl.java b/extensions/kafka/src/main/java/io/deephaven/kafka/AvroImpl.java
index 3b39261bca..ce47a88656 100644
--- a/extensions/kafka/src/main/java/io/deephaven/kafka/AvroImpl.java
+++ b/extensions/kafka/src/main/java/io/deephaven/kafka/AvroImpl.java
@@ -32,6 +32,7 @@ import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.SchemaBuilder;
+import org.apache.avro.SchemaBuilder.FieldAssembler;
 import org.apache.avro.generic.GenericContainer;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.util.Utf8;
@@ -330,21 +331,21 @@ class AvroImpl {
         SchemaBuilder.FieldAssembler<Schema> fass = fassIn;
         final Class<?> type = colDef.getDataType();
         final String colName = colDef.getName();
-        final SchemaBuilder.BaseFieldTypeBuilder<Schema> base = fass.name(colName).type().nullable();
+        final SchemaBuilder.BaseTypeBuilder<FieldAssembler<Schema>> base = fass.name(colName).type().optional();
         if (type == byte.class || type == char.class || type == short.class) {
-            fass = base.intBuilder().prop(dhTypeAttribute, type.getName()).endInt().noDefault();
+            fass = base.intBuilder().prop(dhTypeAttribute, type.getName()).endInt();
         } else if (type == int.class) {
-            fass = base.intType().noDefault();
+            fass = base.intType();
         } else if (type == long.class) {
-            fass = base.longType().noDefault();
+            fass = base.longType();
         } else if (type == float.class) {
-            fass = base.floatType().noDefault();
+            fass = base.floatType();
         } else if (type == double.class) {
-            fass = base.doubleType().noDefault();
+            fass = base.doubleType();
         } else if (type == String.class) {
-            fass = base.stringType().noDefault();
+            fass = base.stringType();
         } else if (type == Instant.class) {
-            fass = base.longBuilder().prop(logicalTypeName, "timestamp-micros").endLong().noDefault();
+            fass = base.longBuilder().prop(logicalTypeName, "timestamp-micros").endLong();
         } else if (type == BigDecimal.class) {
             final BigDecimalUtils.PropertyNames propertyNames =
                     new BigDecimalUtils.PropertyNames(colName);
@@ -360,10 +361,9 @@ class AvroImpl {
                     .prop(logicalTypeName, "decimal")
                     .prop("precision", values.precision)
                     .prop("scale", values.scale)
-                    .endBytes()
-                    .noDefault();
+                    .endBytes();
         } else {
-            fass = base.bytesBuilder().prop(dhTypeAttribute, type.getName()).endBytes().noDefault();
+            fass = base.bytesBuilder().prop(dhTypeAttribute, type.getName()).endBytes();
         }
         return fass;
     }
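To see what the change does to the generated schema, here is a small standalone sketch (assuming only Avro on the classpath; the record and field names are made up) contrasting the two builders:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class NullableVsOptional {
    public static void main(String[] args) {
        // nullable(): union is [type, "null"], and noDefault() leaves the
        // field without a default -- adding it later is backward-incompatible.
        Schema before = SchemaBuilder.record("demo").fields()
                .name("passenger_count").type().nullable().intType().noDefault()
                .endRecord();

        // optional(): union is ["null", type] with a null default, so a
        // consumer on the new schema can read records written without it.
        Schema after = SchemaBuilder.record("demo").fields()
                .name("passenger_count").type().optional().intType()
                .endRecord();

        System.out.println(before.toString(true));
        System.out.println(after.toString(true));
    }
}

With optional(), the field serializes with "null" first in the union and "default": null, which should satisfy the registry's backward-compatibility check when the field is added to an existing subject.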