confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Multiple-field key schema inference should be handled consistently between Protobuf and other formats #8222

Open mikebin opened 3 years ago

mikebin commented 3 years ago

Is your feature request related to a problem? Please describe.

Key schema inference between Protobuf and other formats like Avro is not currently consistent. For example, with a simple Protobuf message containing two fields:

ksql> create stream test (val string) with (kafka_topic='test', key_format='protobuf', value_format='json');
The key schema for topic test contains multiple columns, which is not supported by ksqlDB at this time.
Schema:syntax = "proto3";
package com.mycorp.mynamespace;

message Key {
  string k1 = 1;
  string k2 = 2;
}

With essentially the same schema in Avro:

{
  "fields": [
    {
      "name": "k1",
      "type": "string"
    },
    {
      "name": "k2",
      "type": "string"
    }
  ],
  "name": "key",
  "namespace": "com.mycorp.mynamespace",
  "type": "record"
}

it works:

ksql> create stream test (val string) with (kafka_topic='test2', key_format='avro', value_format='json');

 Message
----------------
 Stream created
----------------

ksql> describe test;

Name                 : TEST
 Field  | Type
---------------------------------------------------------------
 ROWKEY | STRUCT<K1 VARCHAR(STRING), K2 VARCHAR(STRING)> (key)
 VAL    | VARCHAR(STRING)
---------------------------------------------------------------

The difference appears to be that with Avro, the key schema is wrapped in a single struct field (ROWKEY), while with Protobuf, the top-level message is unwrapped into separate fields, which then triggers the multiple-column error.

Describe the solution you'd like

Make behavior for multi-field key schema inference consistent.

Describe alternatives you've considered

Defining the key schema explicitly is a workaround for Protobuf:

create stream test (k1 string key, k2 string key, val string) with (kafka_topic='test', key_format='protobuf', value_format='json');

lihaosky commented 3 years ago

Hi @mikebin, I was able to verify this. The problem is that UNWRAP_SINGLES is supported by the Avro format, and when we translate the schema from Schema Registry to the Avro format, we wrap it if the format supports UNWRAP_SINGLES. So the multiple fields in the Avro schema became a single field and bypassed the multiple-column check. I think this is a bug that we should fix: if the type is already a struct, I think we shouldn't wrap it.
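
To illustrate the behavior described in this thread, here is a minimal Python sketch. It is not ksqlDB's actual code: the function names, the `Column` type, and the boolean flag are hypothetical, and treating Protobuf as a format without UNWRAP_SINGLES support is an assumption drawn from the comment above. It only models how wrapping the top-level record into one struct column lets a two-field Avro key pass the column-count check, while the same two fields in Protobuf fail it.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Column:
    name: str
    type: str


def infer_key_columns(fields, format_supports_unwrap_singles):
    """Hypothetical model of key schema inference.

    fields: list of (name, type) pairs from the top-level key record/message.
    format_supports_unwrap_singles: assumed stand-in for the UNWRAP_SINGLES
    feature check mentioned in the comment above.
    """
    if format_supports_unwrap_singles:
        # The whole record is wrapped into a single STRUCT column.
        inner = ", ".join(f"{n.upper()} {t.upper()}" for n, t in fields)
        return [Column("ROWKEY", f"STRUCT<{inner}>")]
    # Otherwise each top-level field becomes its own key column.
    return [Column(n.upper(), t.upper()) for n, t in fields]


def validate_key_columns(columns):
    """Reject inferred key schemas with more than one column."""
    if len(columns) > 1:
        raise ValueError(
            "The key schema contains multiple columns, "
            "which is not supported by ksqlDB at this time."
        )
    return columns


KEY_FIELDS = [("k1", "string"), ("k2", "string")]

# Avro-like path: wrapped into one struct column, so the check passes.
avro_columns = validate_key_columns(infer_key_columns(KEY_FIELDS, True))

# Protobuf-like path (assumed not to wrap): two columns, so the check fails.
try:
    validate_key_columns(infer_key_columns(KEY_FIELDS, False))
    protobuf_error = None
except ValueError as exc:
    protobuf_error = str(exc)
```

In this sketch both inputs describe the same two-field key, yet only one path is rejected, which is the inconsistency the issue reports.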