confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Cannot execute a JOIN when one of the sources uses a schema referenced by ID #9163

Closed: rodesai closed this issue 1 year ago

rodesai commented 2 years ago

Describe the bug

ksqlDB cannot execute a JOIN when one of the sources uses a schema referenced by ID. This is caused by the interaction of two behaviors:

  1. When a join requires a repartition and the source being repartitioned has a schema ID, ksqlDB tries to write to the repartition topic using that source's physical schema (the schema registered under the ID).

  2. For joins, ksqlDB first projects each source into a new schema that prefixes every column with the source name. For example, given:

    CREATE STREAM SOURCE (A STRING, B STRING) WITH ...
    CREATE STREAM SOURCE2 (A STRING, B STRING) WITH ...
    CREATE STREAM OUT AS SELECT SOURCE.*, SOURCE2.* FROM SOURCE JOIN SOURCE2 ...

    Then, the repartition schema has columns like SOURCE_A STRING, SOURCE_B STRING, SOURCE2_A STRING, SOURCE2_B STRING
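The prefixing in behavior 2 can be sketched as a toy Python model. This is a hypothetical illustration, not ksqlDB's Java implementation: join_projection is an invented name, and a schema is reduced to a plain list of column names.

```python
from typing import Dict, List


def join_projection(sources: Dict[str, List[str]]) -> List[str]:
    """Toy model of the pre-join projection (hypothetical helper).

    Each column is renamed to <source name>_<column>, so same-named
    columns from different sources (A and B here) no longer clash.
    """
    return [f"{name}_{col}" for name, cols in sources.items() for col in cols]


columns = join_projection({"SOURCE": ["A", "B"], "SOURCE2": ["A", "B"]})
print(columns)  # ['SOURCE_A', 'SOURCE_B', 'SOURCE2_A', 'SOURCE2_B']
```

None of these prefixed names exists in a registered schema whose fields are just A and B, which is why behavior 1 becomes a problem.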

The problem is that the schema referenced by ID has no fields prefixed with the source name. So when ksqlDB serializes records for the repartition topic, the serializer fails because the schema has no field matching a field name in the record. (In the reproduction below, the stream is aliased as S, so the first missing field is S_A.)

Caused by: org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic _confluent-ksql-some.ksql.service.idquery_CSAS_OUT_0-Join-repartition for task 0_0 due to:
org.apache.kafka.common.errors.SerializationException: Error serializing message to topic: _confluent-ksql-some.ksql.service.idquery_CSAS_OUT_0-Join-repartition. Schema from Schema Registry misses field with name: S_A
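The failure can be reproduced in miniature with a toy serializer that, like the one in the error above, requires every field in the record to exist in the schema it was given. This is a hypothetical Python model, not the Confluent Protobuf serializer: serialize and SerializationError are invented names, and the record is simplified to the two value columns.

```python
from typing import Dict, List


class SerializationError(Exception):
    """Hypothetical stand-in for Kafka's SerializationException."""


def serialize(record: Dict[str, object], schema_fields: List[str]) -> List[object]:
    """Toy serializer: every field in the record must exist in the schema."""
    for name in record:
        if name not in schema_fields:
            raise SerializationError(
                f"Schema from Schema Registry misses field with name: {name}"
            )
    # Emit values in schema order; fields absent from the record become None.
    return [record.get(name) for name in schema_fields]


# Fields of ProtobufValue2, the schema registered under the source's value schema ID.
registered_fields = ["A", "B"]
# The same row after the join projection with alias S.
prefixed_record = {"S_A": 123, "S_B": "falcon"}

try:
    serialize(prefixed_record, registered_fields)
except SerializationError as err:
    print(err)  # Schema from Schema Registry misses field with name: S_A
```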

To Reproduce

Here's a QTT (query translation test). The outputs don't matter for reproducing the failure, so they are left empty:

{
  "statements": [
    "CREATE STREAM SOURCE WITH (kafka_topic='source', format='PROTOBUF', KEY_SCHEMA_ID=1, KEY_SCHEMA_FULL_NAME='ProtobufKey2', VALUE_SCHEMA_ID=2, VALUE_SCHEMA_FULL_NAME='ProtobufValue2');",
    "CREATE TABLE SINK WITH (kafka_topic='source2', format='PROTOBUF', KEY_SCHEMA_FULL_NAME='ProtobufKey2', VALUE_SCHEMA_FULL_NAME='ProtobufValue2');",
    "CREATE STREAM OUT AS SELECT S.*, SS.* FROM SOURCE S JOIN SINK SS ON S.B=SS.K;"
  ],
  "topics": [
    {
      "name": "source",
      "keyFormat": "PROTOBUF",
      "keySchema": "syntax = \"proto3\"; message ProtobufKey1 {uint32 k1 = 1;} message ProtobufKey2 {string K = 1;}",
      "valueFormat": "PROTOBUF",
      "valueSchema": "syntax = \"proto3\"; message ProtobufValue1 {float c1 = 1; uint32 c2 = 2;} message ProtobufValue2 {uint64 A = 1; string B = 2;}"
    },
    {
      "name": "source2",
      "keyFormat": "PROTOBUF",
      "keySchema": "syntax = \"proto3\"; message ProtobufKey1 {uint32 k1 = 1;} message ProtobufKey2 {string K = 1;}",
      "valueFormat": "PROTOBUF",
      "valueSchema": "syntax = \"proto3\"; message ProtobufValue1 {float c1 = 1; uint32 c2 = 2;} message ProtobufValue2 {uint64 A = 1; string B = 2;}"
    }
  ],
  "inputs": [
    {"topic": "source", "key": {"K": "0"}, "value": {"A": 123, "B": "falcon"}, "timestamp": 0},
    {"topic": "source", "key": {"K": "0"}, "value": {"A": 456, "B": "giraffe"}, "timestamp": 0},
    {"topic": "source", "key": {"K": "0"}, "value": {"A": 789, "B": "turtle"}, "timestamp": 0},
    {"topic": "source2", "key": {"K": "0"}, "value": {"A": 123, "B": "falcon"}, "timestamp": 0},
    {"topic": "source2", "key": {"K": "0"}, "value": {"A": 456, "B": "giraffe"}, "timestamp": 0},
    {"topic": "source2", "key": {"K": "0"}, "value": {"A": 789, "B": "turtle"}, "timestamp": 0}
  ],
  "outputs": []
}

Expected behavior

The join should execute correctly.

Additional context

I think the right approach here is not to forward the schema ID to the repartition serdes. Instead, they should generate their own schema for the repartition topic.
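The proposed fix can be compared with the current behavior in a toy Python model. This is a hypothetical sketch, not ksqlDB code: schema_for_repartition, missing_fields and the forward_schema_id flag are invented for illustration, and a schema is reduced to a list of field names. With forward_schema_id=True the repartition serde reuses the source's registered schema (the behavior described in this issue); with False it derives a schema from the projected, prefixed columns.

```python
from typing import List, Optional


def schema_for_repartition(
    projected_columns: List[str],
    registered_fields: Optional[List[str]],
    forward_schema_id: bool,
) -> List[str]:
    """Pick the schema (as field names) used by the repartition serde."""
    if forward_schema_id and registered_fields is not None:
        # Current behavior: reuse the schema registered under the source's ID.
        return list(registered_fields)
    # Proposed behavior: generate a schema from the records actually written.
    return list(projected_columns)


def missing_fields(record_fields: List[str], schema_fields: List[str]) -> List[str]:
    """Record fields a strict serializer would fail to find in the schema."""
    return [name for name in record_fields if name not in schema_fields]


projected = ["S_A", "S_B"]  # columns after the join projection with alias S
registered = ["A", "B"]     # fields of ProtobufValue2 in Schema Registry

current = schema_for_repartition(projected, registered, forward_schema_id=True)
proposed = schema_for_repartition(projected, registered, forward_schema_id=False)
print(missing_fields(projected, current))   # ['S_A', 'S_B']
print(missing_fields(projected, proposed))  # []
```

In this model the generated schema matches the repartition records by construction. A plausible reason the change is safe is that repartition topics such as _confluent-ksql-...-Join-repartition are internal to the query rather than consumed by outside clients.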

rodesai commented 2 years ago

Same as https://github.com/confluentinc/ksql/issues/9162

suhas-satish commented 2 years ago

@bvarghese1, @spena mentioned this is similar to a fix he's working on for https://github.com/confluentinc/ksql/issues/9169. Maybe you want to wait before starting this work?

bvarghese1 commented 2 years ago

Will sync with @spena regarding this. Thank you for the update @suhas-satish