confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Support for UNION types in Avro #1910

Closed sknop closed 1 year ago

sknop commented 6 years ago

Some customers create topics with complex schemas using Avro that include UNION (choice) fields. It would be useful to be able to query these UNION fields, for example to only show messages whose field is of a particular type.
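As a rough illustration of the kind of query described above, the following Python sketch filters already-decoded messages by the runtime type of a union field. The `messages` list, the `payload` field name, and the `with_payload_type` helper are all hypothetical and are not part of KSQL or any Avro library.

```python
# Hypothetical decoded messages; "payload" stands in for an Avro union of int and string.
messages = [
    {"id": 1, "payload": 42},
    {"id": 2, "payload": "forty-two"},
    {"id": 3, "payload": 7},
]


def with_payload_type(records, wanted_type):
    """Keep only the records whose union field holds a value of exactly wanted_type."""
    return [r for r in records if type(r["payload"]) is wanted_type]


# Only messages whose union field currently carries an int.
int_messages = with_payload_type(messages, int)
```

The exact-type check (`type(...) is`) is used instead of `isinstance` so that a boolean value would not be counted as an integer.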

donalpmccarthy commented 6 years ago

Not supporting Avro UNION types is very limiting for KSQL. We have a scenario where a message has some fixed fields and a map of additional fields whose values are constrained by an Avro UNION type. Specifically, the schema looks like this:

"fields":[
    {"name":"Key","type":"string"},
    {"name":"Timestamp","type":"long"},
    {"name":"Attributes","type":{"type":"map","values":
       ["string","float","double","int",
         {"type":"long",
          "connect.version":1,
          "connect.name":"org.apache.kafka.connect.data.Timestamp",
          "logicalType":"timestamp-millis"
         }
       ]
     }}
]

We can create a stream in KSQL as long as it doesn't contain the Attributes map. This is because the map type can only be specified as MAP<VARCHAR, ValueType>, where ValueType needs to be a primitive type; value types that include an Avro UNION, as above, are not supported. However, the Attributes map is the main body of the data, so we can't use KSQL to interrogate the data at all. This rules KSQL out for us until this is supported.

The only alternative we would have is to publish our data in a MAP<VARCHAR, VARCHAR> and convert all values to strings. We don't want to lose type information, so doing that would be too much of a compromise.
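To make the compromise concrete, here is a minimal Python sketch of what converting every map value to a string does to type information. The attribute names and values are made up for illustration, and `stringify_values` and `value_types` are hypothetical helpers, not part of any KSQL or Avro API.

```python
# Hypothetical attribute map; names and values are illustrative only.
attributes = {"temperature": 21.5, "retries": 3, "status": "ok"}


def stringify_values(attrs):
    """Convert every value to a string, as a map with string values would require."""
    return {key: str(value) for key, value in attrs.items()}


def value_types(attrs):
    """Report the Python type name of each value in the map."""
    return {key: type(value).__name__ for key, value in attrs.items()}


flattened = stringify_values(attributes)

# Before: float, int, and str are distinguishable. After: everything is a str,
# so a consumer can no longer tell the number 3 from the text "3".
types_before = value_types(attributes)
types_after = value_types(flattened)
```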

Could you please support Avro UNION types in general, and specifically as values in a map?

vcrfxia commented 4 years ago

cc @MichaelDrogalis @derekjn @apurvam in case we want to prioritize this on our roadmap.

rayokota commented 4 years ago

Protobuf and JSON Schema both have an equivalent "oneof" construct.

Unions/oneofs will be more important now that Schema Registry supports references. Using unions with references is to be preferred over using RecordNamingStrategy when storing multiple schema types in the same topic (see https://github.com/confluentinc/ksql/issues/1267).

big-andy-coates commented 4 years ago

Should totally support this by just adding the superset of columns from all types in the union.

https://martinfowler.com/eaaCatalog/singleTableInheritance.html
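The "superset of columns" idea could be sketched roughly as follows. This is only an illustration in Python under simple assumptions (every union branch is a flat record, and a column shared by several branches must have the same type in each); it is not how ksqlDB implements anything. The record names, field names, and both helper functions are hypothetical.

```python
# Hypothetical union branches: two flat record schemas sharing one topic.
union_branches = [
    {"name": "OrderCreated",
     "fields": [{"name": "order_id", "type": "string"},
                {"name": "amount", "type": "double"}]},
    {"name": "OrderCancelled",
     "fields": [{"name": "order_id", "type": "string"},
                {"name": "reason", "type": "string"}]},
]


def superset_columns(branches):
    """Merge the fields of every record in the union into one ordered column map."""
    columns = {}
    for record in branches:
        for field in record["fields"]:
            name, field_type = field["name"], field["type"]
            if name in columns and columns[name] != field_type:
                raise ValueError(f"conflicting types for column {name!r}")
            columns.setdefault(name, field_type)
    return columns


def to_row(columns, event):
    """Project one event onto the superset; columns it lacks become None (NULL)."""
    return {name: event.get(name) for name in columns}


columns = superset_columns(union_branches)
cancelled_row = to_row(columns, {"order_id": "o-1", "reason": "out of stock"})
```

As in single table inheritance, each row fills only the columns of its own event type and leaves the rest null, which is why every column in the superset has to be treated as nullable.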

With Schema Registry's new support for schema references more and more users will be using Unions to allow topics to receive different event types, so ksqlDB not supporting Unions/OneOfs is going to become a bigger issue.

rayokota commented 4 years ago

Here's a blog post describing how to store multiple event types in the same topic using unions/oneofs. Having union support in ksqlDB would allow such topics to be queried.

https://www.confluent.io/blog/multiple-event-types-in-the-same-kafka-topic/

rayokota commented 2 years ago

I believe to support unions properly, we would need to have Union as a first-class construct in Kafka Connect (since ksqlDB relies on Kafka Connect). I will probably prepare a KIP for this in the future.

gphilipp commented 2 years ago

Hi @rayokota. Is there any news on this feature? The lack of support for union types is blocking us from using ksqlDB for several use cases.

JSeb225 commented 1 year ago

Hi, multi-schema support with AVRO has been added to a recent release. More info here: https://www.confluent.io/blog/announcing-ksqldb-0-27-1/

gphilipp commented 1 year ago

Glorious 🤠

jessfdm-codes commented 1 year ago

How would we go about creating this kind of struct in a CREATE STREAM statement when the Avro schema is generated for us? For example, we have multiple streams we want to merge, but one value in those streams is of a generic type. Currently we have to store that value as JSON. Is there a way to define the stream so that I can do something like this?

CREATE STREAM somestream (
  myId VARCHAR,
  myConcreteValue INTEGER,
  myDynamicType *,
  ....
)