deephaven / deephaven-core

Deephaven Community Core

Kafka, support for producing an avro enum #5640

Open devinrsmith opened 1 week ago

devinrsmith commented 1 week ago

There is currently an asymmetry between Avro Kafka consume and produce: we can read in an enum, but we can't write one out. When we read in an enum, it is read in as a String; when we try to write it back out as a String, we get an error that looks something like:

RuntimeError: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:163)
        at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:67)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1015)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:962)
        at io.deephaven.kafka.publish.PublishToKafka.publishMessages(PublishToKafka.java:281)
        at io.deephaven.kafka.publish.PublishToKafka.<init>(PublishToKafka.java:170)
        at io.deephaven.kafka.KafkaTools.produceFromTable(KafkaTools.java:1458)
caused by org.apache.avro.AvroTypeException: value EMAIL (a java.lang.String) is not a ChannelId at SendTimeEvent.channel_id
        at org.apache.avro.path.TracingAvroTypeException.summarize(TracingAvroTypeException.java:62)
        at org.apache.avro.path.TracingAvroTypeException.summarize(TracingAvroTypeException.java:32)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:84)
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.writeDatum(AbstractKafkaAvroSerializer.java:181)
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:151)
        ... 6 more
caused by org.apache.avro.AvroTypeException: value EMAIL (a java.lang.String) is not a ChannelId
        at org.apache.avro.generic.GenericDatumWriter.writeEnum(GenericDatumWriter.java:269)
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:148)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95)
        at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:245)
        at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:234)
        at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:145)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:95)
        at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
        ... 8 more

The relevant field in the Avro schema is:

{
  "name": "channel_id",
  "type": {
    "type": "enum",
    "name": "ChannelId",
    "symbols": ["EMAIL", "TEXT"]
  }
}

It seems like we would need to use org.apache.avro.generic.GenericData.EnumSymbol to write this value out properly. As a workaround until DHC has proper backend support, the end user may be able to construct an EnumSymbol from their query formula, but that may be tricky because the user needs to have the enum's Schema.
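To illustrate the difference outside of Deephaven and Kafka, here is a minimal sketch that writes the same field with plain Avro's GenericDatumWriter, once as a String and once as an EnumSymbol. The class name EnumWriteSketch and the single-field SendTimeEvent record are assumptions for illustration; the real record presumably has more fields.

```java
import org.apache.avro.AvroTypeException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

// Hypothetical demo class; not part of Deephaven or Avro.
class EnumWriteSketch {
    // Assumption: a record holding only the channel_id field from the issue.
    static final Schema RECORD_SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"SendTimeEvent\",\"fields\":["
            + "{\"name\":\"channel_id\",\"type\":{\"type\":\"enum\","
            + "\"name\":\"ChannelId\",\"symbols\":[\"EMAIL\",\"TEXT\"]}}]}");

    // Serializes a one-field record whose channel_id is the given value.
    static byte[] write(Object channelId) throws IOException {
        GenericRecord record = new GenericData.Record(RECORD_SCHEMA);
        record.put("channel_id", channelId);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(RECORD_SCHEMA).write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        Schema enumSchema = RECORD_SCHEMA.getField("channel_id").schema();

        // Wrapping the symbol in an EnumSymbol is accepted by the generic writer.
        byte[] bytes = write(new GenericData.EnumSymbol(enumSchema, "TEXT"));
        System.out.println("EnumSymbol: wrote " + bytes.length + " byte(s)");

        // A bare String is rejected, as in the stack trace above.
        try {
            write("EMAIL");
            System.out.println("String: unexpectedly succeeded");
        } catch (AvroTypeException e) {
            System.out.println("String: rejected with " + e.getMessage());
        }
    }
}
```

The generic writer only treats a value as an enum when it is a GenericEnumSymbol, which is why the String that the consume side hands back cannot be round-tripped as-is.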

devinrsmith commented 1 week ago

In the meantime, a workaround like

import org.apache.avro.Schema

static final Schema CHANNEL_ID_SCHEMA = Schema.createEnum("ChannelId", null, "com.example", List.of("EMAIL", "TEXT"))
...
table = table.updateView("channel_id = new org.apache.avro.generic.GenericData.EnumSymbol(CHANNEL_ID_SCHEMA, channel_id)")

has been verified to work.
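One caveat with constructing EnumSymbol directly is that, as far as I can tell, its constructor does not check the symbol against the schema, so a bad value would only surface at serialization time. A small wrapper could check up front. This is only a sketch: AvroEnumSketch and toEnumSymbol are hypothetical names, and passing null through unchanged is an assumption about how null cells should be treated.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

import java.util.List;

// Hypothetical helper; not part of Deephaven or Avro.
final class AvroEnumSketch {
    // Same enum as in the workaround, including its "com.example" namespace.
    static final Schema CHANNEL_ID_SCHEMA =
            Schema.createEnum("ChannelId", null, "com.example", List.of("EMAIL", "TEXT"));

    // Converts a String to an EnumSymbol, rejecting symbols the schema does not define.
    static GenericData.EnumSymbol toEnumSymbol(Schema enumSchema, String symbol) {
        if (symbol == null) {
            return null; // assumption: null cells pass through unchanged
        }
        if (!enumSchema.hasEnumSymbol(symbol)) {
            throw new IllegalArgumentException(
                    "'" + symbol + "' is not one of " + enumSchema.getEnumSymbols());
        }
        return new GenericData.EnumSymbol(enumSchema, symbol);
    }

    public static void main(String[] args) {
        System.out.println(toEnumSymbol(CHANNEL_ID_SCHEMA, "EMAIL"));
        try {
            toEnumSymbol(CHANNEL_ID_SCHEMA, "FAX");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```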