confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

JsonSchemaConverter deserializes STRUCT fields as NULL when using 'additionalProperties' #9584

Closed spena closed 1 year ago

spena commented 2 years ago

Describe the bug
When additionalProperties is used in a JSON_SR schema and the new JSON_SR converter is used for deserialization, the columns or fields of that schema are not deserialized correctly; they come back as null values.

To Reproduce
Use the following SR schema:

{
  "$id": "http://example.com/myURI.schema.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "additionalProperties": false,
  "properties": {
    "metadata": {
      "additionalProperties": {
        "type": "string"
      },
      "type": "object"
    }
  },
  "title": "SampleRecord",
  "type": "object"
}

^ Notice the additionalProperties in the metadata column. It indicates that any number of fields may appear in the metadata object field, as long as they are strings.

Now, produce a record with a couple of fields:

{
  "metadata": {
    "a": "A",
    "b": "B"
  }
}

^ You can use the Kafka console producer to produce data from an SR schema ID:

$ ./kafka-json-schema-console-producer --bootstrap-server localhost:9092 --topic t1 --property value.schema.id=8
{"metadata": {"a": "A", "b": "B"}}

Now, create a stream in KSQL that contains the two fields above:

ksql> create stream t1(`metadata` struct<`a` string, `b` string>) with (kafka_topic='t1', value_format='json_sr');

If you query the stream with the new JSON_SR converter disabled, it works fine:

ksql> set 'ksql.json_sr.converter.deserializer.enabled'='false';
ksql> select * from t1;
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|metadata                                                                                                                                                                             |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{a=A, b=B}

But if you query with the new JSON_SR converter enabled, the fields show up as null:

ksql> set 'ksql.json_sr.converter.deserializer.enabled'='true';
ksql> select * from t1;
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|metadata                                                                                                                                                                             |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{a=null, b=null}

Expected behavior
The new JSON_SR converter deserializer should behave like the previous JSON_SR converter.

Additional context
The issue happens in this line, which returns a JsonSchemaAndValue with null values: https://github.com/confluentinc/ksql/blob/master/ksqldb-serde/src/main/java/io/confluent/ksql/serde/connect/KsqlConnectDeserializer.java#L49
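
For context, a minimal sketch of the call path involved, assuming the standard Connect Converter API; the registry URL is a placeholder and this is not ksql's exact wiring:

import io.confluent.connect.json.JsonSchemaConverter;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;

import java.util.Map;

public class ConverterRepro {

  // Deserializes one JSON_SR wire-format record (magic byte + schema id + JSON).
  public static SchemaAndValue deserialize(byte[] wireFormatPayload) {
    Converter converter = new JsonSchemaConverter();
    // Configure as a value converter (isKey = false) against a local registry.
    converter.configure(Map.of("schema.registry.url", "http://localhost:8081"), false);

    // With the SampleRecord schema above, the returned value carries no usable
    // metadata entries, which ksql then surfaces as {a=null, b=null}.
    return converter.toConnectData("t1", wireFormatPayload);
  }
}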

spena commented 2 years ago

@rayokota is this supported in SR? The JsonSchemaConverter is returning null field values when the schema is defined using additionalProperties. I looked in the SR code and it seems this case is unsupported, but could you confirm?

xavsan commented 2 years ago

@spena I was able to reproduce the null field values. However, I'm unsure of where to put the SR schema in order to include the additionalProperties. Hence, when I run ksql> set 'ksql.json_sr.converter.deserializer.enabled'='true'; I get the following error message: Not recognizable as ksql, streams, consumer, or producer property: 'ksql.json_sr.converter.deserializer.enabled'

spena commented 2 years ago

@sanjay-awatramani do you know if SR supports the additionalProperties? Here's the question https://github.com/confluentinc/ksql/issues/9584#issuecomment-1252969778

rayokota commented 2 years ago

This use case is not supported. We do not preserve fields that do not appear explicitly in the schema.
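
One way to see why, assuming the standard Connect data API: a Connect Struct can only carry fields declared in its Schema, so a JSON object typed solely through additionalProperties has nowhere to put its entries. A minimal sketch:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class StructFieldsSketch {
  public static void main(String[] args) {
    // The Connect schema derived from "metadata" declares no fields, because
    // the JSON schema types it only through additionalProperties.
    Schema metadataSchema = SchemaBuilder.struct().name("metadata").build();
    Struct metadata = new Struct(metadataSchema);

    // Undeclared properties such as "a" and "b" cannot be stored:
    // metadata.put("a", "A"); // would throw DataException: a is not a valid field name
    System.out.println(metadata); // prints Struct{}
  }
}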

drinehim commented 2 years ago

So are dictionaries completely unsupported with JSON_SR? We used this approach because KSQL wasn't mapping the field to the MAP type when using:

CREATE STREAM CONNECT_CORE_APPLICATION_EVALUATED_0_STRUCT
WITH ( KAFKA_TOPIC='connect.core.application.evaluated.0', VALUE_FORMAT='JSON_SR' );

We need to join based on one of the values in the dictionary.

xavsan commented 2 years ago

@drinehim if I were trying to recreate your issue, could you share the path where you registered the SR schema?

drinehim commented 2 years ago

Our schema looks like

{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "additionalProperties": false,
  "definitions": {
    "ErrorSeverity": {
      "description": "",
      "enum": [
        "Error",
        "Warning"
      ],
      "type": "string",
      "x-enumNames": [
        "Error",
        "Warning"
      ]
    },
    "EventContext": {
      "additionalProperties": false,
      "properties": {
        "correlationId": {
          "format": "guid",
          "type": "string"
        },
        "metadata": {
          "items": {
            "$ref": "#/definitions/KeyValuePairOfStringAndString"
          },
          "type": "array"
        },
        "triggeringUser": {
          "$ref": "#/definitions/User"
        }
      },
      "type": "object"
    },
    "KeyValuePairOfStringAndString": {
      "additionalProperties": false,
      "properties": {
        "key": {
          "type": "string"
        },
        "value": {
          "type": "string"
        }
      },
      "type": "object"
    },
    "ResourceLocator": {
      "additionalProperties": false,
      "properties": {
        "parentId": {
          "format": "guid",
          "type": "string"
        },
        "parentType": {
          "type": "string"
        }
      },
      "type": "object"
    },
    "ResponseMessageOfValidationResponseMessage": {
      "additionalProperties": false,
      "properties": {
        "additionalResults": {
          "items": {
            "$ref": "#/definitions/ResponseResult"
          },
          "type": "array"
        },
        "payload": {
          "oneOf": [
            {
              "type": "null"
            },
            {
              "$ref": "#/definitions/ValidationResponseMessage"
            }
          ]
        },
        "result": {
          "$ref": "#/definitions/ResponseResult"
        }
      },
      "type": "object"
    },
    "ResponseResult": {
      "additionalProperties": false,
      "properties": {
        "key": {
          "type": "string"
        },
        "publicReason": {
          "type": "string"
        },
        "reason": {
          "type": "string"
        },
        "severity": {
          "oneOf": [
            {
              "type": "null"
            },
            {
              "$ref": "#/definitions/ErrorSeverity"
            }
          ]
        },
        "status": {
          "type": "string"
        }
      },
      "type": "object"
    },
    "User": {
      "additionalProperties": false,
      "properties": {
        "claims": {
          "items": {
            "$ref": "#/definitions/UserClaim"
          },
          "type": "array"
        },
        "userName": {
          "type": "string"
        }
      },
      "type": "object"
    },
    "UserClaim": {
      "additionalProperties": false,
      "properties": {
        "issuer": {
          "type": "string"
        },
        "originalIssuer": {
          "type": "string"
        },
        "type": {
          "type": "string"
        },
        "value": {
          "type": "string"
        },
        "valueType": {
          "type": "string"
        }
      },
      "type": "object"
    },
    "ValidationResponseMessage": {
      "additionalProperties": false,
      "properties": {
        "ingestionDate": {
          "format": "date-time",
          "type": "string"
        },
        "metadata": {
          "additionalProperties": {
            "type": "string"
          },
          "type": "object"
        },
        "parent": {
          "$ref": "#/definitions/ResourceLocator"
        },
        "topic": {
          "type": "string"
        }
      },
      "type": "object"
    }
  },
  "properties": {
    "context": {
      "$ref": "#/definitions/EventContext"
    },
    "exception": {
      "type": [
        "null",
        "string"
      ]
    },
    "message": {
      "$ref": "#/definitions/ResponseMessageOfValidationResponseMessage"
    },
    "retryGroup": {
      "type": [
        "null",
        "string"
      ]
    },
    "waitUntil": {
      "format": "date-time",
      "type": [
        "null",
        "string"
      ]
    }
  },
  "title": "InFlightEventOfResponseMessageOfValidationResponseMessage",
  "type": "object"
}

It is generated by the dotnet kafka client.

The primary place to look at is the metadata property under the ValidationResponseMessage definition.

xavsan commented 2 years ago

@drinehim so I poked around inside the Kafka client and couldn't find the generated schema, and I'm still getting the Not recognizable as ksql, streams, consumer, or producer property: 'ksql.json_sr.converter.deserializer.enabled' error in the ksqldb-cli. Is there something I'm missing? Is your schema stored at ./bin in the Kafka client?

spena commented 2 years ago

@xavsan That issue is more likely related to the version of ksql than to the SR schema. Which version are you using? The above config was added in 0.28.

drinehim commented 2 years ago

@xavsan yes. I recreated this locally. I had to use 0.28.1, which sets ksql.json_sr.converter.deserializer.enabled to true by default. (Setting ksql.json_sr.converter.deserializer.enabled to false through environment variables does fix this, but it seems like a short-term solution.)

xavsan commented 2 years ago

Oh, that makes a lot of sense. I'm running 0.27.2. I'll switch over to 0.28; I think that should do it.

spena commented 2 years ago

@drinehim One option to make it work with the new JSON_SR converter is to specify the metadata fields explicitly in the SR schema (a sketch follows below). That way SR will be able to return the record's fields to KSQL.

Btw, if SR does not support the additionalProperties case, does that mean you are not using any other client besides KSQL to deserialize SR records? Prior to KSQL 0.28, JSON_SR records were deserialized using plain JSON converters, which completely ignore whether additionalProperties is used (they only take the fields from the record). With the new converter, SR uses the SR schema's fields and extracts those from the record. So the problem is that the SR schema does not have any fields declared.
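
As a sketch of that option (assuming the Schema Registry Java client; the subject name t1-value and the URL are placeholders), the SampleRecord schema from the report could declare metadata's fields explicitly and be registered ahead of time so the converter has real fields to extract:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.json.JsonSchema;

public class RegisterExplicitSchema {
  public static void main(String[] args) throws Exception {
    // Same SampleRecord schema as above, but with metadata's fields declared
    // explicitly instead of relying on additionalProperties.
    String explicitSchema = """
        {
          "$id": "http://example.com/myURI.schema.json",
          "$schema": "http://json-schema.org/draft-07/schema#",
          "additionalProperties": false,
          "properties": {
            "metadata": {
              "additionalProperties": false,
              "properties": {
                "a": { "type": "string" },
                "b": { "type": "string" }
              },
              "type": "object"
            }
          },
          "title": "SampleRecord",
          "type": "object"
        }
        """;

    SchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 10);
    client.register("t1-value", new JsonSchema(explicitSchema));
  }
}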

drinehim commented 2 years ago

@spena We primarily use the dotnet client which has no problem deserializing.

The plain JSON converter vs. SR converter distinction is the conclusion I came to as well. So my biggest question, long term, is: is there any way with JSON schema to tell KSQL that the field should be a MAP? If we had that, we could let KSQL define the object model instead of doing it ourselves, and the serializer should work.

spena commented 2 years ago

@drinehim I created a JSON_SR stream with a MAP column (metadata map<string, string>), and this is what was registered in SR:

{
  "properties": {
    "METADATA": {
      "connect.index": 0,
      "oneOf": [
        {
          "type": "null"
        },
        {
          "connect.type": "map",
          "items": {
            "properties": {
              "key": {
                "connect.index": 0,
                "oneOf": [
                  {
                    "type": "null"
                  },
                  {
                    "type": "string"
                  }
                ]
              },
              "value": {
                "connect.index": 1,
                "oneOf": [
                  {
                    "type": "null"
                  },
                  {
                    "type": "string"
                  }
                ]
              }
            },
            "type": "object"
          },
          "type": "array"
        }
      ]
    }
  },
  "type": "object"
}

I then created another stream to infer the SR schema and it inferred the metadata field as a MAP.

ksql> create stream test2 with (kafka_topic='test', value_format='json_sr');

 Message        
----------------
 Stream created 
----------------
ksql> describe test2;

Name                 : TEST2
 Field | Type                         
--------------------------------------
 METADATA   | MAP<STRING, VARCHAR(STRING)> 
--------------------------------------

That works fine.

drinehim commented 2 years ago

@spena I fiddled around with things and found that simply adding the "connect.type": "map" metadata to a normal JSON dictionary field worked. The schema essentially looks like:

{
  "$id": "http://example.com/myURI.schema.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "additionalProperties": false,
  "properties": {
    "metadata": {
      "additionalProperties": {
        "type": "string"
      },
      "type": "object",
      "connect.type": "map"
    }
  },
  "title": "SampleRecord",
  "type": "object"
}

It seems to me that this is what clues KSQL in that the field is a map. Without it, using the schema you sent previously, KSQL generates an array field instead of a map.

Is this another "it just happens to work" case, or would this be supported? I would strongly prefer to keep a dictionary, since KSQL is tapping into existing messages from systems already in place. I don't want to have to eliminate dictionaries across the board just in case we want to start using KSQL against them.

spena commented 2 years ago

@drinehim Nice! It is actually supported. The SR API builds a MAP when connect.type is found, using STRING as the key type and the additionalProperties schema as the value type. See the code below, which I found in SR where it infers the schema:

String type = (String) objectSchema.getUnprocessedProperties().get(CONNECT_TYPE_PROP);
if (CONNECT_TYPE_MAP.equals(type)) {
  builder = SchemaBuilder.map(
      Schema.STRING_SCHEMA,
      toConnectSchema(ctx, objectSchema.getSchemaOfAdditionalProperties())
  );
}

I also tried an insert/select on KSQL and it worked.

ksql> insert into test(metadata) values(map('k1' := 'v1', 'k2' := 'v2'));
ksql> select metadata['k1'] as k1, metadata['k2'] as k2 from test;
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+
|K1                                                                                                 |K2                                                                                                 |
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+
|v1                                                                                                 |v2                                                                                                 |
Query Completed

drinehim commented 2 years ago

@spena Obviously this broke us when it was released, but looking long term I think we should be good, since we can use the "connect.type" property. I am OK with closing this. Note: not sure it was ever pointed out, but this issue was opened on behalf of my company.