confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

[SR Support]: Figure out how to join on keys where the key contains the schema id #6332

Closed big-andy-coates closed 3 years ago

big-andy-coates commented 3 years ago

Any format that supports schema inference will be adding the magic byte and the schema id as the first few bytes of the key, i.e. the serialized key is <magic_byte><schema-id><serialized-key>.

Without doing anything special, for a join to work we'd need the serialized-key and the schema-id to match!!!!
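To make the framing concrete, here is a minimal Python sketch of the key layout described above. It assumes the Confluent wire format: one zero magic byte followed by a 4-byte big-endian schema id. The helper names `frame_key` and `parse_key` are made up for illustration and are not ksqlDB or Schema Registry APIs.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0   # assumption: Confluent wire format marker byte
PREFIX_LEN = 5   # 1 magic byte + 4-byte schema id


def frame_key(schema_id: int, payload: bytes) -> bytes:
    """Prefix the payload with the magic byte and a big-endian schema id."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + payload


def parse_key(data: bytes) -> Tuple[int, bytes]:
    """Split a framed key back into (schema_id, payload)."""
    magic, schema_id = struct.unpack(">bI", data[:PREFIX_LEN])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, data[PREFIX_LEN:]


# Same payload, different schema ids: the framed keys are not byte-equal.
left = frame_key(1, b"k")
right = frame_key(2, b"k")
keys_match = left == right                      # False
payloads_match = parse_key(left)[1] == parse_key(right)[1]  # True
```

Since a join compares (and partitions on) the full serialized key, `keys_match` being false is what breaks the join even though the payloads agree.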

ksqlDB only supports topic-level subjects. The two topics being joined, let's call them left and right, will have their schemas registered under the subjects left-key and right-key. If two keys have the same schema, do they share the same id? If they don't, then we've got a problem!

big-andy-coates commented 3 years ago

Knocked up some quick test code to register the same key schema against multiple topics... they get the same id. This is good!

However, there's still scope for joins not to work where the schema differs only slightly, e.g. the name of the AVRO record, which doesn't affect the serialized bytes of the data, but would change the schema id, and hence the serialized bytes of the key.

We should probably have some kind of check when executing a join to ensure there is at least some overlap between the schema ids of the key schemas of each side, but only where we're not already repartitioning.

I say some overlap, as it's possible for each subject to have multiple schema versions associated with it, each with its own id. It could be that some ids match and some don't. This should probably be enough to allow the join, as the mismatched ids may not be in use, e.g. they could belong to old data, or be left over from when a mismatch in schemas was corrected, etc. Or they could be in use, but the user is fine with these not joining.
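The "some overlap" check could look roughly like the Python sketch below. It assumes the caller has already collected the schema ids registered under each side's key subject; how those ids are fetched is left out, and both function names are hypothetical.

```python
from typing import Iterable, Set


def shared_key_schema_ids(left_ids: Iterable[int],
                          right_ids: Iterable[int]) -> Set[int]:
    """Return the schema ids registered under both key subjects."""
    return set(left_ids) & set(right_ids)


def join_keys_may_match(left_ids: Iterable[int],
                        right_ids: Iterable[int]) -> bool:
    """Allow the join if at least one key schema id is common to both sides."""
    return bool(shared_key_schema_ids(left_ids, right_ids))


# left-key has versions with ids 1 and 2; right-key has ids 2 and 3.
overlap = shared_key_schema_ids([1, 2], [2, 3])   # {2}
allowed = join_keys_may_match([1, 2], [2, 3])     # True
```

This only says a match is possible: rows written with ids 1 or 3 in this example would still silently fail to join.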

Ideally, we could return:

vcrfxia commented 3 years ago

Spent some time looking into this. Turns out it's rather challenging to check whether the two sides of the join have the same schema ID or not. It's not just a matter of looking up the topics on both sides of the join in Schema Registry. In the case of pre-join repartition topics (or other internal topics prior to the join), there will be no schema in SR, since ksqlDB doesn't create these subjects until after the join is validated (and started), which makes sense. Inferring what the key schema will be before it's actually registered is also non-trivial, since it depends on details of the physical plan, such as serde options, which the logical plan does not (and should not) have. As a result, we've decided to go the route of forcing repartitions prior to the join in order to ensure matching schemas, as this is preferable to either disallowing joins on SR-enabled key formats or allowing them with potential silent join misses. This approach is implemented in https://github.com/confluentinc/ksql/pull/6635.

Another alternative we considered was to implement a custom partitioner to ensure that logically-equivalent records will be sent to the same partition, but we determined that there are no efficient implementations available for this option at this time. One idea could be to remove the first five bytes (the magic byte plus the schema ID) of the serialized key and delegate the remaining bytes to the default partitioner, but this doesn't work as the remaining bytes may still differ for records considered logically equivalent by ksqlDB. For example, the serialization of an integer serialized with JSON schema "int" differs from that serialized with JSON schema "oneOf": ["null", "int"] even though these are logically equivalent to ksqlDB (since all ksqlDB types are nullable). Another idea for the custom partitioner is to partition based on the hash of the deserialized data instead of the serialized bytes, but this doesn't work either because of our use of the Connect Struct type: structs include schemas, which may differ (e.g., different record names) even though the data within the structs is logically equivalent.
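For reference, the "strip the first five bytes" idea can be sketched in Python as below. This is an illustration only, not ksqlDB code. `zlib.crc32` stands in for the hash (Kafka's default partitioner uses murmur2), and the function name is hypothetical. The two payloads follow Avro's binary encoding of the integer 5 under `int` and under `["null", "int"]`; they are used purely as an analogy to the JSON schema example in the text.

```python
import zlib

PREFIX_LEN = 5  # 1 magic byte + 4-byte schema id


def partition_ignoring_schema_id(serialized_key: bytes,
                                 num_partitions: int) -> int:
    """Hash only the bytes after the magic byte and schema id."""
    payload = serialized_key[PREFIX_LEN:]
    # Stand-in hash: Kafka's default partitioner uses murmur2, not crc32.
    return zlib.crc32(payload) % num_partitions


def framed(schema_id: int, payload: bytes) -> bytes:
    """Build <magic_byte><schema-id><payload> for the demo below."""
    return b"\x00" + schema_id.to_bytes(4, "big") + payload


# Same payload under two schema ids: the prefix is ignored, so the
# partitions agree.
a = partition_ignoring_schema_id(framed(1, b"\x0a"), 6)
b = partition_ignoring_schema_id(framed(2, b"\x0a"), 6)
same_partition = a == b  # True

# Logically equal value, different encoding: the payload bytes differ,
# so nothing guarantees the two keys land on the same partition.
plain_int = b"\x0a"          # Avro int 5
nullable_int = b"\x02\x0a"   # Avro union branch 1, then int 5
payloads_differ = plain_int != nullable_int  # True
```

The first case shows what stripping the prefix buys; the second shows why it isn't enough, since the hash input still differs for values ksqlDB treats as equal.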