confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Logical and Physical schemas do not match when converting from JSON without key to AVRO with key #8744

Open marcus-007 opened 2 years ago

marcus-007 commented 2 years ago

Describe the bug

When the source stream and topic have a JSON value format and no key, creating a stream on top of them with an AVRO value format and a defined key fails with an error.

To Reproduce

Steps to reproduce the behavior:

1. The version of KSQL is 0.23.1.
2. Sample source data:
    {
      "payload": {
        "name": "marcus",
        "telephone": [
          {
            "telephone_type": "mobile",
            "tel_nr": "1111"
          },
          {
            "telephone_type": "fax",
            "tel_nr": "2222"
          },
          {
            "telephone_type": "landline",
            "tel_nr": "3333"
          }
        ]
      }
    }

3. Any SQL statements you ran

The topic data looks like this:

    Key format: ¯\_(ツ)_/¯ - no data processed
    Value format: JSON or KAFKA_STRING
    rowtime: 2022/02/09 16:13:47.231 Z, key: <null>, value: {"payload": {"name": "marcus","telephone": [{"telephone_type": "mobile","tel_nr": "1111"},{"telephone_type": "fax","tel_nr": "2222"},{"telephone_type": "landline","tel_nr": "3333"}]}}, partition: 1

Step 1: Create a stream on top of the topic data

create or replace stream source_multiactive_basic_data
(payload struct<
    name varchar,
    telephone array<
        struct<
            telephone_type varchar,
            tel_nr varchar
        >
    >
>
) with (kafka_topic='source_multiactive', value_format='json');
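Before trying either option, a push query over the new stream can confirm that the JSON payload is deserialized into the declared struct. This is a sketch that is not part of the original report; it assumes the ksqlDB CLI and reads from the earliest offset so that the record shown above is returned.

```sql
-- Sketch only: not part of the original reproduction steps.
-- Read from the start of the topic so the existing record is returned.
SET 'auto.offset.reset' = 'earliest';

-- Check that payload->NAME and payload->TELEPHONE deserialize as declared.
SELECT payload->NAME AS NAME,
       payload->TELEPHONE AS TELEPHONE
FROM source_multiactive_basic_data
EMIT CHANGES
LIMIT 1;
```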

Option 1: Create the stream with CSAS (CREATE STREAM AS SELECT)

CREATE STREAM explode_key_1
WITH (kafka_topic='ma_explode_key_1', format='avro')
AS SELECT
    payload->NAME as NAME,
    explode(payload->TELEPHONE)->TEL_NR as TEL_NR,
    explode(payload->TELEPHONE)->TELEPHONE_TYPE as TELEPHONE_TYPE
FROM source_multiactive_basic_data
PARTITION BY payload->NAME;

Option 2: Create an empty stream and insert into it afterwards

Create stream explode_key_1
(
    NAME VARCHAR KEY,
    TEL_NR VARCHAR,
    TELEPHONE_TYPE varchar
)
with (kafka_topic='ma_explode_key_1', value_format='avro', key_format='AVRO');

INSERT INTO explode_key_1
select
    payload->NAME as NAME,
    explode(payload->TELEPHONE)->TEL_NR as TEL_NR,
    explode(payload->TELEPHONE)->TELEPHONE_TYPE as TELEPHONE_TYPE
from source_multiactive_basic_data
partition by payload->NAME
emit changes;

Expected behavior

Because of the PARTITION BY clause, I would expect the NAME column to become the key and no schema mismatch to occur.

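Actual behaviour

The schema mismatch prevents creating the stream (Option 1) or running the insert (Option 2), because the key column NAME appears twice in the physical schema: once as the key and once as a value column.

CLI output for Option 1: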

Could not determine output schema for query due to error: Logical and Physical schemas do not match!
Logical : `NAME` STRING, `TEL_NR` STRING, `TELEPHONE_TYPE` STRING
Physical: `NAME` STRING KEY, `NAME` STRING, `TEL_NR` STRING, `TELEPHONE_TYPE` STRING
Statement: CREATE STREAM EXPLODE_KEY_3 WITH (FORMAT='avro', KAFKA_TOPIC='ma_explode_key_3', PARTITIONS=10, REPLICAS=1) AS SELECT
  SOURCE_MULTIACTIVE_BASIC_DATA.PAYLOAD->NAME NAME,
  EXPLODE(SOURCE_MULTIACTIVE_BASIC_DATA.PAYLOAD->TELEPHONE)->TEL_NR TEL_NR,
  EXPLODE(SOURCE_MULTIACTIVE_BASIC_DATA.PAYLOAD->TELEPHONE)->TELEPHONE_TYPE TELEPHONE_TYPE
FROM SOURCE_MULTIACTIVE_BASIC_DATA SOURCE_MULTIACTIVE_BASIC_DATA
PARTITION BY SOURCE_MULTIACTIVE_BASIC_DATA.PAYLOAD->NAME
EMIT CHANGES;

CLI Output for Option 2

Logical and Physical schemas do not match!
Logical : `NAME` STRING, `TEL_NR` STRING, `TELEPHONE_TYPE` STRING
Physical: `NAME` STRING KEY, `NAME` STRING, `TEL_NR` STRING, `TELEPHONE_TYPE` STRING


mjsax commented 2 years ago

Thanks for reporting this issue. I agree that PARTITION BY payload->NAME should make NAME the key, with no duplicated NAME value column.

I'm not sure whether it's related to the JSON-to-AVRO conversion, to the struct dereference payload->NAME in the PARTITION BY clause, or to something else.

Maybe you can try to split it into multiple queries to work around the bug?
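One way to narrow down which of the possible causes above is responsible is to change one factor at a time. The sketch below is not from the thread. The stream and topic names (explode_key_json, rekey_no_explode and their topics) are hypothetical, and how these variants behave on 0.23.1 is not known. If variant A succeeds, the JSON-to-AVRO conversion would be the likely trigger. If variant B succeeds while Option 1 fails, the combination of EXPLODE with PARTITION BY would be a stronger suspect. If both fail, the struct dereference in PARTITION BY becomes more likely.

```sql
-- Variant A (hypothetical names): same query as Option 1, but the output stays JSON.
CREATE STREAM explode_key_json
WITH (kafka_topic='ma_explode_key_json', format='json')
AS SELECT
    payload->NAME AS NAME,
    explode(payload->TELEPHONE)->TEL_NR AS TEL_NR,
    explode(payload->TELEPHONE)->TELEPHONE_TYPE AS TELEPHONE_TYPE
FROM source_multiactive_basic_data
PARTITION BY payload->NAME;

-- Variant B (hypothetical names): AVRO output and the same PARTITION BY,
-- but without the EXPLODE table function.
CREATE STREAM rekey_no_explode
WITH (kafka_topic='ma_rekey_no_explode', format='avro')
AS SELECT
    payload->NAME AS NAME,
    payload->TELEPHONE AS TELEPHONE
FROM source_multiactive_basic_data
PARTITION BY payload->NAME;
```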

marcus-007 commented 2 years ago

Hi @mjsax: yep, I worked around this issue as you suggested, with a two-step approach. First I convert the data to AVRO without a key, and then I reassign the key in a second query.
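The exact statements for this workaround were not posted, so the following is a reconstruction under stated assumptions. The intermediate stream explode_nokey and its topic ma_explode_nokey are hypothetical names. The first query sets only the value format, so no key column is involved. The second query then partitions by the top-level column NAME instead of the struct dereference payload->NAME.

```sql
-- Step 1 (hypothetical names): flatten and convert the value to AVRO, no key yet.
CREATE STREAM explode_nokey
WITH (kafka_topic='ma_explode_nokey', value_format='avro')
AS SELECT
    payload->NAME AS NAME,
    explode(payload->TELEPHONE)->TEL_NR AS TEL_NR,
    explode(payload->TELEPHONE)->TELEPHONE_TYPE AS TELEPHONE_TYPE
FROM source_multiactive_basic_data;

-- Step 2: re-key the already-flat AVRO stream by a top-level column.
CREATE STREAM explode_key_1
WITH (kafka_topic='ma_explode_key_1', format='avro')
AS SELECT
    NAME,
    TEL_NR,
    TELEPHONE_TYPE
FROM explode_nokey
PARTITION BY NAME;
```

Running DESCRIBE explode_key_1; afterwards is a simple way to check that NAME appears only once, as the KEY column.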