confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Nullable Avro fields incorrectly typed in Confluent Control Center #1850

Open andybryant opened 6 years ago

andybryant commented 6 years ago

This is an issue using KSQL from Control Center, so it probably belongs under the CCC project, but I'm not aware of a CCC issue tracker, so I'm adding it here.

We have some CDC data in Kafka in Avro format. Many of the fields are optional, so they are represented in the Avro schema as a union of null and the underlying data type. For instance:

{
    "name": "imageflag",
    "type": [
        "null",
        "string"
    ],
    "default": null
},
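
For anyone wanting to reproduce this, a minimal self-contained schema exhibiting the pattern would look something like the following (the record name here is made up for illustration; only the imageflag field comes from our actual schema):

{
    "type": "record",
    "name": "sor_bpt031_value",
    "fields": [
        {
            "name": "imageflag",
            "type": [
                "null",
                "string"
            ],
            "default": null
        }
    ]
}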

When creating a stream for this topic in Control Center, it defaults to treating these fields as maps:


 IMAGEFLAG          | MAP<STRING, VARCHAR(STRING)> 

If the user leaves this as-is, KSQL is unable to deserialize the records when the stream is queried. It fails silently in Control Center, but shows the following error in the KSQL server logs:

[2018-09-10 00:36:58,613] WARN task [0_0] Skipping record due to deserialization error. topic=[play-huon-bulk-sor_bpt031] partition=[0] offset=[65217] (org.apache.kafka.streams.processor.internals.RecordDeserializer:86)
org.apache.kafka.connect.errors.DataException: Cannot deserialize type string as type map for field ->IMAGEFLAG
    at io.confluent.ksql.serde.connect.ConnectDataTranslator.createTypeMismatchException(ConnectDataTranslator.java:65)
    at io.confluent.ksql.serde.connect.ConnectDataTranslator.lambda$validateType$1(ConnectDataTranslator.java:80)
    at java.util.Optional.orElseThrow(Optional.java:290)
    at io.confluent.ksql.serde.connect.ConnectDataTranslator.validateType(ConnectDataTranslator.java:79)
    at io.confluent.ksql.serde.connect.ConnectDataTranslator.validateSchema(ConnectDataTranslator.java:92)
    at io.confluent.ksql.serde.connect.ConnectDataTranslator.toKsqlValue(ConnectDataTranslator.java:140)
    at io.confluent.ksql.serde.connect.ConnectDataTranslator.toKsqlStruct(ConnectDataTranslator.java:212)
    at io.confluent.ksql.serde.connect.ConnectDataTranslator.toKsqlValue(ConnectDataTranslator.java:160)
    at io.confluent.ksql.serde.connect.ConnectDataTranslator.toKsqlRow(ConnectDataTranslator.java:49)
    at io.confluent.ksql.serde.avro.AvroDataTranslator.toKsqlRow(AvroDataTranslator.java:51)
    at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:45)
    at io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:26)
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
    at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:97)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:638)
    at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:936)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:831)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)

Unfortunately, in this case there doesn't seem to be any way to surface the error in Control Center. Even describe extended on the stream doesn't show any failures.

In Control Center, describe extended bad_map; gives:

{
    "@type": "sourceDescription",
    "statementText": "describe extended BAD_MAP;",
    "sourceDescription": {
        "name": "BAD_MAP",
        "readQueries": [],
        "writeQueries": [],
        "fields" [
// MANY FIELDS HERE
        ],
        "type": "STREAM",
        "key": "",
        "timestamp": "",
        "statistics": "",
        "errorStats": "",
        "extended": true,
        "format": "AVRO",
        "topic": "play-huon-bulk-sor_bpt031",
        "partitions": 1,
        "replication": 3
    }
}

If I change the type of the column to the underlying type, I can query the stream successfully.
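
In KSQL statement form, the workaround amounts to roughly the following (the stream name is made up, and the rest of the column list is omitted; only imageflag and the topic name come from our setup):

CREATE STREAM sor_bpt031_fixed (
    -- declared as the union's underlying type, not the inferred MAP<STRING, VARCHAR(STRING)>
    IMAGEFLAG VARCHAR
) WITH (
    KAFKA_TOPIC = 'play-huon-bulk-sor_bpt031',
    VALUE_FORMAT = 'AVRO'
);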

In summary, there are a couple of issues here:

- Control Center infers nullable Avro fields (unions of null and another type) as MAP<STRING, VARCHAR(STRING)> instead of the underlying type.
- The resulting deserialization failures are silent: no error is surfaced in Control Center, and describe extended reports no failures.

Note that querying this stream using the KSQL CLI also fails silently, with no errors shown when using describe extended.

apurvam commented 6 years ago

Thanks for the detailed report, @andybryant. I filed an internal issue for C3 and the team will look at it.