confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

INSERT INTO statement fails on stream with defined VALUE_SCHEMA_FULL_NAME and VALUE_SCHEMA_ID for Avro schema w/ struct #9183

Closed randyx123 closed 2 years ago

randyx123 commented 2 years ago

Describe the bug When attempting an INSERT INTO <stream> VALUES (...); statement against a stream with a defined VALUE_SCHEMA_FULL_NAME and VALUE_SCHEMA_ID referencing an Avro schema that contains a struct data type, the statement fails with the error: Could not serialize value. Error serializing message to topic: <topic-name>. Struct schemas do not match.

To Reproduce
KSQL Version: 0.26.0
Example:

  1. Created topic "tmp-simple" with associated Avro schema:
    {
      "fields": [
        {
          "name": "field1",
          "type": "string"
        },
        {
          "default": null,
          "name": "field2",
          "type": [
            "null",
            {
              "fields": [
                {
                  "default": null,
                  "name": "foo",
                  "type": [
                    "null",
                    "string"
                  ]
                },
                {
                  "default": null,
                  "name": "bar",
                  "type": [
                    "null",
                    "int"
                  ]
                }
              ],
              "name": "tmpField2",
              "type": "record"
            }
          ]
        }
      ],
      "name": "tmpSimple",
      "namespace": "io.confluent.test",
      "type": "record"
    }
  2. Created a stream over the topic, specifying the VALUE_SCHEMA_ID: create stream tmp_simple with(kafka_topic='tmp-simple', key_format='kafka', value_format='avro', value_schema_id=100120, partitions=1);
  3. Attempted to INSERT a value into the stream, which returns the error:
    INSERT INTO tmp_simple (`field1`,`field2`) VALUES('s1', struct(`foo`:='bar',`bar`:=2));
    Failed to insert values into 'TMP_SIMPLE'. Could not serialize value: [ 's1' | Struct{foo=bar,bar=2} ]. Error serializing message to topic: tmp-simple. Struct schemas do not match.

    Additional context If a stream is created without specifying VALUE_SCHEMA_ID, so that ksql creates the topic and schema itself, INSERT statements execute without issue:

    CREATE STREAM TMP_SIMPLE_AUTO (`field1` STRING, `field2` STRUCT<`foo` STRING, `bar` INTEGER>) WITH (KAFKA_TOPIC='tmp-simple-auto', KEY_FORMAT='kafka', PARTITIONS=1, VALUE_FORMAT='avro');
    INSERT INTO tmp_simple_auto(`field1`,`field2`) VALUES('s1', struct(`foo`:='bar',`bar`:=2));

    Value returned in query:

    {
      "field1": "s1",
      "field2": {
        "foo": "bar",
        "bar": 2
      }
    }

    Schema created by ksql:

    {
      "fields": [
        {
          "default": null,
          "name": "field1",
          "type": [
            "null",
            "string"
          ]
        },
        {
          "default": null,
          "name": "field2",
          "type": [
            "null",
            {
              "fields": [
                {
                  "default": null,
                  "name": "foo",
                  "type": [
                    "null",
                    "string"
                  ]
                },
                {
                  "default": null,
                  "name": "bar",
                  "type": [
                    "null",
                    "int"
                  ]
                }
              ],
              "name": "KsqlDataSourceSchema_field2",
              "type": "record"
            }
          ]
        }
      ],
      "name": "KsqlDataSourceSchema",
      "namespace": "io.confluent.ksql.avro_schemas",
      "type": "record"
    }
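A side-by-side look at the two schemas above shows where they diverge: the fully qualified name of the nested record type. This is a hedged illustration only (the schemas are abridged to the names that matter, and this is not ksqlDB's serializer logic), but it surfaces the mismatch that the "Struct schemas do not match" error reports:

```python
import json

# Nested-record portion of the schema registered for "tmp-simple" (step 1 above),
# abridged for illustration.
sr_schema = json.loads("""
{"name": "tmpSimple", "namespace": "io.confluent.test", "type": "record",
 "fields": [{"name": "field2", "type": ["null",
   {"name": "tmpField2", "type": "record", "fields": []}]}]}
""")

# Nested-record portion of the schema ksqlDB generates itself when no
# VALUE_SCHEMA_ID is given, also abridged.
ksql_schema = json.loads("""
{"name": "KsqlDataSourceSchema", "namespace": "io.confluent.ksql.avro_schemas",
 "type": "record",
 "fields": [{"name": "field2", "type": ["null",
   {"name": "KsqlDataSourceSchema_field2", "type": "record", "fields": []}]}]}
""")

def nested_record_fullname(schema):
    """Return the namespace-qualified name of the record inside field2's union."""
    ns = schema.get("namespace", "")
    for field in schema["fields"]:
        if field["name"] == "field2":
            record = field["type"][1]  # union is ["null", {...record...}]
            # In Avro, an unqualified nested record name inherits the
            # enclosing namespace.
            return f"{ns}.{record['name']}"

print(nested_record_fullname(sr_schema))   # io.confluent.test.tmpField2
print(nested_record_fullname(ksql_schema)) # io.confluent.ksql.avro_schemas.KsqlDataSourceSchema_field2
```

The two struct types carry different fully qualified names even though their fields are identical, which is consistent with the serializer rejecting the value when the registered schema (selected via VALUE_SCHEMA_ID) is the one that must be matched exactly.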
suhas-satish commented 2 years ago

@spena , can you test this out ?

kurtis-ley commented 2 years ago

Hi,

I'm facing the exact same problem and I believe I know why. In my Avro schema, I have fields that are optional (a field can be a struct, or it can be null, etc.). I found the serialization error in KSQL_PROCESSING_LOG with the following query:

SELECT MESSAGE->SERIALIZATIONERROR
FROM KSQL_PROCESSING_LOG 
WHERE LEVEL = 'ERROR'
    AND MESSAGE->TYPE = 3; 

However, when I use EXPLAIN, the logical ksqlDB schemas defined for the SOURCE and SINK are identical. The processing log was the only place where I could see that the struct schemas did not match.

So, is ksqlDB capable of optional values or accepting null?

spena commented 2 years ago

There's a bug when using VALUE_SCHEMA_ID with INSERT and Avro schemas. ksqlDB takes a different code path when serializing the INSERT record if a SCHEMA_ID is found in the stream.

During a normal CREATE (no schema id), the INSERT creates a schema whose Connect names match the SR schema. See https://github.com/confluentinc/ksql/blob/master/ksqldb-serde/src/main/java/io/confluent/ksql/serde/avro/AvroDataTranslator.java#L77

But with SCHEMA_ID, the INSERT won't make the schema compatible. It creates an INSERT schema with empty Connect names, which makes it incompatible when serializing the value. See https://github.com/confluentinc/ksql/blob/master/ksqldb-serde/src/main/java/io/confluent/ksql/serde/avro/AvroSRSchemaDataTranslator.java#L34

Sadly there's no workaround. I tried different alternatives to bypass this buggy code path, but I couldn't find one. We'll address this in a bugfix.
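The two code paths linked above can be sketched as a minimal model. This is not ksqlDB's actual code, just a hypothetical simplification of the comparison that fails: one translator rewrites the INSERT-side struct schema so its Connect name matches the SR schema, the other leaves the name empty.

```python
# Hypothetical model of the failing check: assume the serializer accepts a
# struct only if its schema's Connect name and fields match the target schema.

class StructSchema:
    def __init__(self, name, fields):
        self.name = name      # Connect name, e.g. "io.confluent.test.tmpField2"
        self.fields = fields  # field names, simplified to a list of strings

def schemas_match(source, target):
    """Simplified stand-in for the struct-schema compatibility check."""
    return source.name == target.name and source.fields == target.fields

# Target: the struct type from the schema registered in Schema Registry.
target = StructSchema("io.confluent.test.tmpField2", ["foo", "bar"])

# No-schema-id path (AvroDataTranslator in the link above): the INSERT-side
# schema is rebuilt with Connect names matching the SR schema.
with_names = StructSchema("io.confluent.test.tmpField2", ["foo", "bar"])
print(schemas_match(with_names, target))    # serialization succeeds

# Schema-id path (AvroSRSchemaDataTranslator): the INSERT-side schema is built
# with an empty Connect name, so the comparison fails.
without_name = StructSchema(None, ["foo", "bar"])
print(schemas_match(without_name, target))  # "Struct schemas do not match"
```

Under this model, the fields are identical in both cases; only the missing name on the schema-id path makes the struct incompatible, matching the behavior described above.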

lihaosky commented 2 years ago

Yeah. If we insert without a schema id, a new schema is created and registered, so the insertion always succeeds. But if a schema id is provided, no new schema is registered, and we need to match the exact schema from Schema Registry.

This is also related to https://github.com/confluentinc/ksql/issues/6091 and https://github.com/confluentinc/ksql/issues/7211. Basically, connect.name and possibly other properties in the physical schema might be mishandled during logical-to-physical schema translation.

nicollette commented 1 year ago

Hi, is the fix for this issue included in any release?

AndreasFischbach3003 commented 1 year ago

Can you please confirm whether this fix is available in any release?

aliehsaeedii commented 1 year ago

@AndreasFischbach3003 @nicollette Yes, it will be available in our next release, which comes in January.

sgudavalli commented 1 year ago

Hi, is this fix already available in Confluent Cloud? Can you let me know which version of ksql got this fix? Thank you.