confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Add support for generic message keys (e.g. message keys in Avro or JSON format, STRUCT) #824

Closed: apurvam closed this issue 3 years ago

apurvam commented 6 years ago

Right now, KSQL only uses a string Serde for keys in Kafka messages. This works well for doing joins and aggregates on the entire key. However, in some cases a key (in JSON, Avro, or any other format) may have nested fields, and we may want to do joins and aggregates on a per-field basis.

Further, by supporting structured keys, we can also filter on fields of the key.

This is related to #638 and #804.

We should also support non-string keys, such as Integer.
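
For illustration, a minimal sketch of the kind of per-field key operations being requested, using much later ksqlDB syntax (KEY_FORMAT and multi-column keys did not exist when this issue was filed; the topic and column names are hypothetical):

-- The message key on the topic is JSON such as {"region": "EU", "user_id": 42}.
CREATE STREAM CLICKS (region STRING KEY, user_id INT KEY, url STRING)
  WITH (KAFKA_TOPIC='clicks', KEY_FORMAT='JSON', VALUE_FORMAT='JSON');

-- Filter and aggregate on individual key fields:
SELECT region, COUNT(*) FROM CLICKS GROUP BY region EMIT CHANGES;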

mquraishi commented 6 years ago

Additionally: 1) If the value (not the key) is an Avro container-format object with the schema visible in clear text but the payload itself is a byte array, KSQL cannot deserialize it correctly. In code it is easy, once the message is retrieved from the topic. 2) If the value is JSON and one of the elements is another nested JSON object, creating a stream turns the nested JSON into some dictionary key=value object, resulting in deserialization errors.

miguno commented 6 years ago

For the record: Another +1 from a user (offline conversation).

big-andy-coates commented 5 years ago

Hi @mquraishi,

> Additionally: 1) If the value (not the key) is an Avro container-format object with the schema visible in clear text but the payload itself is a byte array, KSQL cannot deserialize it correctly. In code it is easy, once the message is retrieved from the topic. 2) If the value is JSON and one of the elements is another nested JSON object, creating a stream turns the nested JSON into some dictionary key=value object, resulting in deserialization errors.

I think you're talking about an Avro-serialized value where the schema is serialized as part of the message, right? At the moment I don't believe KSQL supports such a serialization format. KSQL does not expect the schema to be provided in the record; it expects the schema to come from the Schema Registry or to be provided by the user when importing the data into KSQL.

Supporting embedded schemas would be a separate issue from this one. If you're still interested in seeing this worked on, would you mind detailing it in a new GitHub issue, please?
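
For reference, a sketch of the two schema sources described above (stream and topic names are illustrative):

-- 1) Let KSQL fetch the schema from the Schema Registry (no column list needed):
CREATE STREAM ORDERS WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='AVRO');

-- 2) Provide the schema explicitly when importing the data:
CREATE STREAM ORDERS_JSON (id INT, amount DOUBLE)
  WITH (KAFKA_TOPIC='orders-json', VALUE_FORMAT='JSON');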

big-andy-coates commented 5 years ago

Please find attached a document comparing two potential approaches we're looking into for structured key support in KSQL. We are looking for feedback from the community on the two approaches. We want to move on this fairly quickly: a decision on direction is likely to be taken by the end of this week, i.e. Friday 8th Feb. Please leave any comments below. In addition, you can use the two additional comments below to vote for one of the approaches.

Structured Keys Comparison.pdf

We look forward to hearing from you!

Additional examples that answer questions asked below but are not covered by the main document:

Keys with 2+ levels of nesting

Let's say the key looks like:

{
    "x" : {
        "y": 1
    },
    "z": 2
}

(And the value again has two fields v0 and v1).

Logical

The logical approach only flattens the first level, so the above could be imported with:

CREATE STREAM FOO (x STRUCT<y INT> KEY, z INT KEY, v0 INT, v1 INT) ...

And used in a query like:

SELECT x->y, z FROM FOO;

Physical

The physical approach defines the actual structure:

CREATE STREAM FOO (ROWKEY STRUCT<x STRUCT<y INT>, z INT>, ROWVALUE STRUCT<v0 INT, v1 INT>) ...

And used in a query exactly the same as above, e.g.:

SELECT x->y, z FROM FOO;

big-andy-coates commented 5 years ago

Click here if you prefer the more Logical approach.

Please leave a thumbs up emoji on this comment if you prefer the more logical approach covered in the above document.

big-andy-coates commented 5 years ago

Click here if you prefer the more Physical approach.

Please leave a thumbs up emoji on this comment if you prefer the more physical approach covered in the above document.

miguno commented 5 years ago

@big-andy-coates : Thanks for sharing the comparison PDF.

Two questions for the logical approach: how are keys with more than one level of nesting imported, and how are the deeper levels then queried?

The reason I am asking is that the logical approach flattens the first level of a structured key, but only the first level. And the PDF only shows examples with 1-level nested keys.

big-andy-coates commented 5 years ago

@miguno : The second level of nesting would continue to work as it currently does for both approaches. Hence I'd not included it as I was trying to keep the document succinct. This is the same reason I've not covered join semantics or windowing.

But as you've asked, it probably means I should have included them ;) . I've added an example above.

mquraishi commented 5 years ago

> Hi @mquraishi,
>
> > Additionally: 1) If the value (not the key) is an Avro container-format object with the schema visible in clear text but the payload itself is a byte array, KSQL cannot deserialize it correctly. In code it is easy, once the message is retrieved from the topic. 2) If the value is JSON and one of the elements is another nested JSON object, creating a stream turns the nested JSON into some dictionary key=value object, resulting in deserialization errors.
>
> I think you're talking about an Avro-serialized value where the schema is serialized as part of the message, right? At the moment I don't believe KSQL supports such a serialization format. KSQL does not expect the schema to be provided in the record; it expects the schema to come from the Schema Registry or to be provided by the user when importing the data into KSQL.
>
> Supporting embedded schemas would be a separate issue from this one. If you're still interested in seeing this worked on, would you mind detailing it in a new GitHub issue, please?

@big-andy-coates I will do that soon. It would essentially mean breaking up the concerns I reported here, which I think is what you are asking for. In general, I was hoping the flexibility embedded schemas offer, albeit at the cost of performance, could be useful. Inferring the schema at runtime can be costly.

big-andy-coates commented 5 years ago

Thanks for the feedback everyone. We'll be looking to implement the more logical model.

gphilipp commented 5 years ago

@big-andy-coates any news on this one?

bellemare commented 5 years ago

Also curious as to where this stands, since it's been silent since February.

We have extensive Avro-keyed data throughout our streams (since it's self-documenting with the Schema Registry) and would find it very helpful to be able to use them as such.

apurvam commented 5 years ago

Hello folks, the work is in progress. The current goal is to ship support for primitive keys (long, int) serialized with Avro and JSON in the 5.4 release.
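
As a sketch of the primitive-key syntax as it eventually shipped (exact version aside; stream and topic names are illustrative), the key remained the single ROWKEY column but was no longer restricted to STRING:

CREATE STREAM USERS (ROWKEY BIGINT KEY, name STRING)
  WITH (KAFKA_TOPIC='users', VALUE_FORMAT='JSON');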

Deninc commented 4 years ago

> Hello folks, the work is in progress. The current goal is to ship support for primitive keys (long, int) serialized with Avro and JSON in the 5.4 release.

Sorry for bothering, but I'm still not sure: can we do this yet?

{"name": "hello!",
 "inner": {"id", "12345"}}

CREATE TABLE <X> WITH (..., KEY='inner->id')

From https://github.com/confluentinc/ksql/issues/1975
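
For context, once the structured-key work landed, re-keying a stream on a nested value field can be expressed along these lines in later ksqlDB versions (a sketch; SRC and REKEYED are illustrative names, and `inner` is backtick-quoted because INNER is a reserved word):

-- Re-key the stream on the nested id field; the PARTITION BY expression
-- becomes the new message key.
CREATE STREAM REKEYED AS
  SELECT * FROM SRC PARTITION BY `inner`->id;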

agavra commented 3 years ago

We're finally ready to close this one out!

Huge shout out to everyone involved (cc @big-andy-coates @vcrfxia especially) - it's been a long haul. There's some future feature work related to this (like #6371 and multi-column pull query support) but you can now declare your streams with any key type (so long as the underlying format supports it) and any format supported by ksql.
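
As a sketch of what this enables, using the post-change ksqlDB syntax (stream, topic, and column names are illustrative):

-- Declare a stream whose key is a nested STRUCT, serialized as JSON:
CREATE STREAM BY_STRUCT_KEY (k STRUCT<x STRUCT<y INT>, z INT> KEY, v0 INT, v1 INT)
  WITH (KAFKA_TOPIC='foo', KEY_FORMAT='JSON', VALUE_FORMAT='JSON');

-- Key fields are addressable per field, as originally requested above:
SELECT k->x->y, k->z, v0 FROM BY_STRUCT_KEY EMIT CHANGES;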

mquraishi commented 3 years ago

> We're finally ready to close this one out!
>
> Huge shout out to everyone involved (cc @big-andy-coates @vcrfxia especially) - it's been a long haul. There's some future feature work related to this (like #6371 and multi-column pull query support) but you can now declare your streams with any key type (so long as the underlying format supports it) and any format supported by ksql.

Thank you!