confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Schema Inference With ID in CSAS does not work for enums #10038

Open SamiShaikh opened 1 year ago

SamiShaikh commented 1 year ago

Describe the bug Schema Inference With ID in CSAS does not work for enums.

To Reproduce Register the schema below for topic avroTest4:

{
  "doc": "Sample schema to help you get started.",
  "fields": [
    {
      "default": null,
      "name": "myenum",
      "type": [
        "null",
        {
          "default": "MountainView",
          "name": "Myenum",
          "symbols": [
            "MountainView",
            "Bangalore"
          ],
          "type": "enum"
        }
      ]
    }
  ],
  "name": "sampleRecord",
  "namespace": "com.mycorp.mynamespace",
  "type": "record"
}

Produce a record, then create the streams below:

CREATE STREAM avroTest4 WITH (
KAFKA_TOPIC = 'avroTest4',
VALUE_FORMAT = 'AVRO',
VALUE_SCHEMA_ID = 3
);
CREATE STREAM avroTest5 WITH (
KAFKA_TOPIC = 'avroTest5',
PARTITIONS = 1,
VALUE_FORMAT = 'AVRO',
VALUE_SCHEMA_ID = 3
) AS
SELECT
*
FROM
avroTest4
EMIT CHANGES;

SELECT * FROM avroTest5 EMIT CHANGES;

Expected behavior The produced record should be emitted.

Actual behaviour The query throws an error during serialisation:

[2023-08-02 18:09:06,596] ERROR Unhandled exception caught in streams thread _confluent-ksql-default_query_CSAS_AVROTEST5_11-4fcd28dc-f7fb-402d-b1a0-51424d9c4baf-StreamThread-2. (USER) (io.confluent.ksql.util.QueryMetadataImpl:209)
org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic avroTest5 for task 0_0 due to:
io.confluent.ksql.serde.KsqlSerializationException: Error serializing message to topic: avroTest5. Failed to serialize Avro data from topic avroTest5 :
Hint: You probably forgot to add VALUE_SCHEMA_ID when creating the source.
  at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:218)
  at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:176)
  at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
  at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
  at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
  at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
  at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:66)
  at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
  at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
  at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
  at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
  at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:66)
  at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
  at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
  at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
  at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
  at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
  at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:792)
  at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
  at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:792)
  at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:723)
  at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
  at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
  at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1708)
  at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:796)
  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:613)
  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:575)
Caused by: io.confluent.ksql.serde.KsqlSerializationException: Error serializing message to topic: avroTest5. Failed to serialize Avro data from topic avroTest5 :
Hint: You probably forgot to add VALUE_SCHEMA_ID when creating the source.
  at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:56)
  at io.confluent.ksql.serde.tls.ThreadLocalSerializer.serialize(ThreadLocalSerializer.java:37)
  at io.confluent.ksql.serde.connect.ConnectFormat$ListToStructSerializer.serialize(ConnectFormat.java:188)
  at io.confluent.ksql.serde.connect.ConnectFormat$ListToStructSerializer.serialize(ConnectFormat.java:151)
  at io.confluent.ksql.serde.GenericSerializer.serialize(GenericSerializer.java:62)
  at io.confluent.ksql.logging.processing.LoggingSerializer.serialize(LoggingSerializer.java:47)
  at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
  at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:198)
  ... 26 more
Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic avroTest5 :
  at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:107)
  at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:87)
  at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:53)
  ... 33 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
  at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:163)
  at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:177)
  at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:94)
  ... 35 more
Caused by: java.io.IOException: Incompatible schema {"type":"record","name":"sampleRecord","namespace":"com.mycorp.mynamespace","doc":"Sample schema to help you get started.","fields":[{"name":"myenum","type":["null",{"type":"enum","name":"Myenum","symbols":["MountainView","Bangalore"],"default":"MountainView"}],"default":null}]} with refs [] of type AVRO for schema {"type":"record","name":"sampleRecord","namespace":"com.mycorp.mynamespace","doc":"Sample schema to help you get started.","fields":[{"name":"myenum","type":["null",{"type":"string","connect.parameters":{"io.confluent.connect.avro.enum.default.Myenum":"MountainView","io.confluent.connect.avro.Enum":"com.mycorp.mynamespace.Myenum","io.confluent.connect.avro.Enum.MountainView":"MountainView","io.confluent.connect.avro.Enum.Bangalore":"Bangalore"},"connect.name":"com.mycorp.mynamespace.Myenum"}],"default":null}],"connect.parameters":{"io.confluent.connect.avro.record.doc":"Sample schema to help you get started."},"connect.name":"com.mycorp.mynamespace.sampleRecord"}. Set id.compatibility.strict=false to disable this check
  at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupSchemaBySubjectAndId(AbstractKafkaSchemaSerDe.java:484)
  at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:119)
  ... 37 more

Additional context The issue also occurs if the CSAS is replaced with CREATE and INSERT statements.

suhas-satish commented 1 year ago

This is a known documented limitation - https://github.com/confluentinc/ksql/pull/10036/files

SamiShaikh commented 1 year ago

The docs seem unrelated. The documented limitation is with JSON Schema. This ticket is for Avro.

havasd commented 6 months ago

Do we have any updates on this topic? ksqlDB is unable to convert the stringified enum field back to the Avro-serialized index.
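To make the "serialized index" point concrete, here is a minimal sketch of how the Avro binary encoding treats the two types, using the `["null", Myenum]` union from the schema in the original report. It is hand-rolled for illustration only and is not the code path ksqlDB or the Confluent serializer uses; the helper names are hypothetical.

```python
# Illustration only: hand-rolled Avro binary encoding for the union
# ["null", <branch>] from the report. Helper names are hypothetical.

SYMBOLS = ["MountainView", "Bangalore"]  # symbols of the Myenum type


def zigzag_varint(n: int) -> bytes:
    """Encode an integer as Avro does for int/long: zigzag, then varint."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        byte = z & 0x7F
        z >>= 7
        if z:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_as_enum(symbol: str) -> bytes:
    """Union branch 1 (the enum), then the zero-based symbol position."""
    return zigzag_varint(1) + zigzag_varint(SYMBOLS.index(symbol))


def encode_as_string(text: str) -> bytes:
    """Union branch 1 (a string), then byte length, then UTF-8 bytes."""
    raw = text.encode("utf-8")
    return zigzag_varint(1) + zigzag_varint(len(raw)) + raw


if __name__ == "__main__":
    print(encode_as_enum("Bangalore"))    # b'\x02\x02'
    print(encode_as_string("Bangalore"))
```

The same logical value produces different bytes depending on whether the writer schema says enum or string, so the two schemas are not interchangeable on the write path.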

ezequielantunez commented 5 months ago

I see the same problem using a CSAS where the source and target streams both use schema inference with the same Avro schema containing enums. The original Avro enum is converted to a string in ksqlDB. ksqlDB can read Avro enums, but cannot write them back to a topic.

Original enum (part of a namespace):

{
  "name": "eventName",
  "type": {
    "type": "enum",
    "name": "EventName",
    "doc": "Event name",
    "symbols": [
      "MY_DEFAULT_VALUE"
    ],
    "default": "MY_DEFAULT_VALUE"
  },
  "doc": "Event name"
},

Resulting enum in ksqlDB:

{
  "name": "eventName",
  "type": {
    "type": "string",
    "connect.parameters": {
      "io.confluent.connect.avro.enum.doc.EventName": "Event name",
      "io.confluent.connect.avro.enum.default.EventName": "MY_DEFAULT_VALUE",
      "io.confluent.connect.avro.Enum": "my.namespace.EventName",
      "io.confluent.connect.avro.Enum.MY_DEFAULT_VALUE": "MY_DEFAULT_VALUE"
    },
    "connect.name": "my.namespace.EventName"
  },
  "doc": "Event name"
}

Error:

Caused by: java.io.IOException: Incompatible schema {....} with refs [] of type AVRO for schema {...} Set id.compatibility.strict=false to disable this check
  at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.lookupSchemaBySubjectAndId(AbstractKafkaSchemaSerDe.java:544)
  at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:130)
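The mismatch between the two field definitions above can be shown with a small sketch. It copies both definitions as Python dicts and compares their declared types. This only illustrates why the schemas differ; it is not how the serializer performs its compatibility check, and the helper names are hypothetical.

```python
# Illustration only: compare the registered enum field with the field
# ksqlDB derives from it. Helper names are hypothetical.

REGISTERED_FIELD = {
    "name": "eventName",
    "type": {
        "type": "enum",
        "name": "EventName",
        "doc": "Event name",
        "symbols": ["MY_DEFAULT_VALUE"],
        "default": "MY_DEFAULT_VALUE",
    },
    "doc": "Event name",
}

DERIVED_FIELD = {
    "name": "eventName",
    "type": {
        "type": "string",
        "connect.parameters": {
            "io.confluent.connect.avro.enum.doc.EventName": "Event name",
            "io.confluent.connect.avro.enum.default.EventName": "MY_DEFAULT_VALUE",
            "io.confluent.connect.avro.Enum": "my.namespace.EventName",
            "io.confluent.connect.avro.Enum.MY_DEFAULT_VALUE": "MY_DEFAULT_VALUE",
        },
        "connect.name": "my.namespace.EventName",
    },
    "doc": "Event name",
}


def avro_type_name(field: dict) -> str:
    """Return the declared Avro type of a record field."""
    declared = field["type"]
    return declared["type"] if isinstance(declared, dict) else declared


def symbols_from_parameters(field_type: dict) -> list:
    """Collect enum symbols kept under 'io.confluent.connect.avro.Enum.<symbol>'."""
    prefix = "io.confluent.connect.avro.Enum."
    params = field_type.get("connect.parameters", {})
    return [value for key, value in params.items() if key.startswith(prefix)]


if __name__ == "__main__":
    print(avro_type_name(REGISTERED_FIELD))                # enum
    print(avro_type_name(DERIVED_FIELD))                   # string
    print(symbols_from_parameters(DERIVED_FIELD["type"]))  # ['MY_DEFAULT_VALUE']
```

The symbol list survives in `connect.parameters`, but the declared type changes from `enum` to `string`, which matches the "Incompatible schema" message in the error.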