KSQL 6.1.0, creating JSON stream with key resulting in serialization exception. #7344

manpreet1992 commented 3 years ago

Describe the bug: We tried creating a JSON stream with one column sourced from the Kafka message's key and the others from the value, and the stream creation was successful. But when we executed "SELECT *" on the stream, the response was empty, and we found a serialization exception in the ksql logs.

To Reproduce: Steps to reproduce the behavior:

  1. Install KSQL 6.1.0.
  2. Create a JSON stream: CREATE STREAM s99 (c4 INTEGER KEY, c1 VARCHAR, c2 INTEGER) WITH (kafka_topic='new', format='json', PARTITIONS=4, REPLICAS=3);
  3. Produce JSON data to the Kafka topic 'new':
    confluent-6.1.1/bin/kafka-console-producer --broker-list kf-manpreeta-0.kf-manpreeta-headless.manpreet.svc.cluster.local:9092 --topic new --property key.separator=#  --property parse.key=true --property value.serializer=custom.class.serialization.JsonSerializer --property  key.serializer=custom.class.serialization.JsonSerializer
    {"c4":1}#{"c1":"Hi","c2":12}
    {"c4":2}#{"c1":"Hello","c2":24}
    {"c4":3}#{"c1":"World","c2":36}
    {"c4":4}#{"c1":"test","c2":48}
  4. Execute the select: SET 'auto.offset.reset'='earliest'; SELECT * FROM s99 EMIT CHANGES;

Expected behavior

+----+-------+----+
|C4  |C1     |C2  |
+----+-------+----+
1    Hi      12
2    Hello   24
3    World   36
4    test    48

Actual behavior


  1. CLI output
    +----+-------+----+
    |C4  |C1     |C2  |
    +----+-------+----+
    (no rows returned)
  2. Error messages: none.
  3. KSQL logs
    {"type":"log", "host":"manpreetksql-ckaf-ksql-5d7c4665b5-947zl", "level":"ERROR", "neid":"ksql-cfd5ccf2af7f47868e83471a5b603408", "system":"ksql", "time":"2021-04-05T10:51:46.154Z", "timezone":"UTC", "log":{"message":"_confluent-ksql-manpreetksqltransient_7706520502061733660_1617619905744-43716e38-aa03-42c5-91cd-c6dc0c0241a1-StreamThread-2 - processing.7706520502061733660.KsqlTopic.Source.deserializer - {"type":0,"deserializationError":{"target":"key","errorMessage":"Failed to deserialize key from topic: new. Can't convert type. sourceType: ObjectNode, requiredType: INTEGER, path: $","recordB64":null,"cause":["Can't convert type. sourceType: ObjectNode, requiredType: INTEGER, path: $","Can't convert type. sourceType: ObjectNode, requiredType: INTEGER"],"topic":"new"},"recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null}"}}
    {"type":"log", "host":"manpreetksql-ckaf-ksql-5d7c4665b5-947zl", "level":"WARN", "neid":"ksql-cfd5ccf2af7f47868e83471a5b603408", "system":"ksql", "time":"2021-04-05T10:51:46.158Z", "timezone":"UTC", "log":{"message":"_confluent-ksql-manpreetksqltransient_7706520502061733660_1617619905744-43716e38-aa03-42c5-91cd-c6dc0c0241a1-StreamThread-2 - org.apache.kafka.streams.processor.internals.StreamThread - Exception caught during Deserialization, taskId: 0_1, topic: new, partition: 1, offset: 0"}}
    org.apache.kafka.common.errors.SerializationException: Failed to deserialize key from topic: new. Can't convert type. sourceType: ObjectNode, requiredType: INTEGER, path: $
    Caused by: io.confluent.ksql.serde.json.KsqlJsonDeserializer$CoercionException: Can't convert type. sourceType: ObjectNode, requiredType: INTEGER, path: $
        at io.confluent.ksql.serde.json.KsqlJsonDeserializer.enforceFieldType(KsqlJsonDeserializer.java:155)
        at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:117)
        at io.confluent.ksql.serde.unwrapped.UnwrappedDeserializer.deserialize(UnwrappedDeserializer.java:47)
        at io.confluent.ksql.serde.unwrapped.UnwrappedDeserializer.deserialize(UnwrappedDeserializer.java:26)
        at io.confluent.ksql.serde.GenericKeySerDe$GenericKeyDeserializer.deserialize(GenericKeySerDe.java:236)
        at io.confluent.ksql.serde.GenericKeySerDe$GenericKeyDeserializer.deserialize(GenericKeySerDe.java:213)
        at io.confluent.ksql.logging.processing.LoggingDeserializer.tryDeserialize(LoggingDeserializer.java:60)
        at io.confluent.ksql.logging.processing.LoggingDeserializer.deserialize(LoggingDeserializer.java:47)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.streams.processor.internals.SourceNode.deserializeKey(SourceNode.java:54)
        at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:65)
        at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
        at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185)
        at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:888)
        at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1008)
        at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:783)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:614)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
    Caused by: java.lang.IllegalArgumentException: Can't convert type. sourceType: ObjectNode, requiredType: INTEGER
        at io.confluent.ksql.serde.json.JsonSerdeUtils.invalidConversionException(JsonSerdeUtils.java:137)
        at io.confluent.ksql.serde.json.JsonSerdeUtils.invalidConversionException(JsonSerdeUtils.java:145)
        at io.confluent.ksql.serde.json.JsonSerdeUtils.toInteger(JsonSerdeUtils.java:100)
        at io.confluent.ksql.serde.json.KsqlJsonDeserializer.lambda$static$2(KsqlJsonDeserializer.java:71)
        at io.confluent.ksql.serde.json.KsqlJsonDeserializer.enforceFieldType(KsqlJsonDeserializer.java:151)
        ... 19 more
    {"type":"log", "host":"manpreetksql-ckaf-ksql-5d7c4665b5-947zl", "level":"WARN", "neid":"ksql-cfd5ccf2af7f47868e83471a5b603408", "system":"ksql", "time":"2021-04-05T10:51:46.159Z", "timezone":"UTC", "log":{"message":"_confluent-ksql-manpreetksqltransient_7706520502061733660_1617619905744-43716e38-aa03-42c5-91cd-c6dc0c0241a1-StreamThread-1 - org.apache.kafka.streams.processor.internals.RecordDeserializer - stream-thread [_confluent-ksql-manpreetksqltransient_7706520502061733660_1617619905744-43716e38-aa03-42c5-91cd-c6dc0c0241a1-StreamThread-1] task [0_0] Skipping record due to deserialization error. topic=[new] partition=[0] offset=[0]"}}
    org.apache.kafka.common.errors.SerializationException: Failed to deserialize key from topic: new. Can't convert type. sourceType: ObjectNode, requiredType: INTEGER, path: $
    (remainder of the stack trace is identical to the one above)
    {"type":"log", "host":"manpreetksql-ckaf-ksql-5d7c4665b5-947zl", "level":"ERROR", "neid":"ksql-cfd5ccf2af7f47868e83471a5b603408", "system":"ksql", "time":"2021-04-05T10:51:46.160Z", "timezone":"UTC", "log":{"message":"_confluent-ksql-manpreetksqltransient_7706520502061733660_1617619905744-43716e38-aa03-42c5-91cd-c6dc0c0241a1-StreamThread-3 - processing.7706520502061733660.KsqlTopic.Source.deserializer - {"type":0,"deserializationError":{"target":"key","errorMessage":"Failed to deserialize key from topic: new. Can't convert type. sourceType: ObjectNode, requiredType: INTEGER, path: $","recordB64":null,"cause":["Can't convert type. sourceType: ObjectNode, requiredType: INTEGER, path: $","Can't convert type. sourceType: ObjectNode, requiredType: INTEGER"],"topic":"new"},"recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null}"}}
    {"type":"log", "host":"manpreetksql-ckaf-ksql-5d7c4665b5-947zl", "level":"WARN", "neid":"ksql-cfd5ccf2af7f47868e83471a5b603408", "system":"ksql", "time":"2021-04-05T10:51:46.160Z", "timezone":"UTC", "log":{"message":"_confluent-ksql-manpreetksqltransient_7706520502061733660_1617619905744-43716e38-aa03-42c5-91cd-c6dc0c0241a1-StreamThread-3 - org.apache.kafka.streams.processor.internals.StreamThread - Exception caught during Deserialization, taskId: 0_2, topic: new, partition: 2, offset: 0"}}
    org.apache.kafka.common.errors.SerializationException: Failed to deserialize key from topic: new. Can't convert type. sourceType: ObjectNode, requiredType: INTEGER, path: $
    (remainder of the stack trace is identical to the one above)

Additional context: cc @apurvam

vcrfxia commented 3 years ago

Hi @manpreet1992, from the example data you shared, your keys are not JSON integers but rather JSON objects containing an integer field. This means your stream must be declared as

CREATE STREAM s99 (my_key STRUCT<c4 INTEGER> KEY, c1 VARCHAR, c2 INTEGER) WITH (kafka_topic='new', format='json', PARTITIONS=4, REPLICAS=3);

instead. This should resolve the deserialization issues with your keys.
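
With the struct key in place, the wrapped field can then be read with ksqlDB's -> struct-dereference operator, for example (the column alias is illustrative):

SELECT my_key->c4 AS c4, c1, c2 FROM s99 EMIT CHANGES;

Alternatively, if the producer can be changed to emit bare JSON integers as keys (e.g. 1#{"c1":"Hi","c2":12} in the console-producer input above), the original c4 INTEGER KEY declaration should work unchanged.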

I'm going to mark this as closed for now. Feel free to reopen if your issue is still not resolved.

neuromantik33 commented 2 years ago

I would like to re-open this issue, as it is something I've often encountered with the ksql-test-runner. My typical use case is to generate test data from an Avro topic as described here. Consider the following example:

ksql> show streams;

 Stream Name                | Kafka Topic                       | Key Format | Value Format | Windowed 
-------------------------------------------------------------------------------------------------------
 ADMISSIONS_ID_CHECKS_USERS | admissions.public.id_checks_users | AVRO       | AVRO         | false    
-------------------------------------------------------------------------------------------------------
ksql> describe ADMISSIONS_ID_CHECKS_USERS;

Name                 : ADMISSIONS_ID_CHECKS_USERS
 Field                  | Type                   
-------------------------------------------------
 ROWKEY                 | BIGINT           (key) 
 ID                     | BIGINT                 
 ID_CHECK_ID            | BIGINT                 
 USER_ID                | BIGINT                       
-------------------------------------------------

This is my schema, so I dump the topic as follows to build the input.json:

kcat -C -e -J -E \
  -b "$CC_BOOTSTRAP_SERVERS" \
  -t admissions.public.id_checks_users \
  -X security.protocol=SASL_SSL \
  -X sasl.mechanisms=PLAIN \
  -X sasl.username="$CC_API_KEY" \
  -X sasl.password="$CC_API_SECRET" \
  -s avro \
  -r "$SCHEMA_REGISTRY" | \
  jq --slurp \
    "{inputs:[.[] | select(.key == $ID_CHECKS_USERS_ID) | {topic: .topic, timestamp: .ts, key: .key, value: .payload}]}" \
  > $PWD/input.json

And the corresponding input.json

{
  "inputs": [
    {
      "topic": "admissions.public.id_checks_users",
      "timestamp": 1646643836312,
      "key": 82549,
      "value": {
        "id": 82549,
        "id_check_id": {
          "long": 381
        },
        "user_id": {
          "long": 369290
        }
      }
    }
  ]
}
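
For reference, the wrapped values such as {"long": 381} are the standard Avro JSON encoding of a nullable union field; a hypothetical declaration in the source schema would look like

{"name": "id_check_id", "type": ["null", "long"], "default": null}

which kcat renders as {"long": 381} when non-null and as null otherwise.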

Since the underlying Avro schema allows null values (all ksqlDB fields are nullable, to my knowledge), this is the normal JSON representation. Anyhow, when creating the statements.sql and preparing the output.json, all events get ignored due to the exception below.

CREATE STREAM admissions_id_checks_users (
    id BIGINT KEY,
    id_check_id BIGINT,
    user_id BIGINT
) WITH (KAFKA_TOPIC = 'admissions.public.id_checks_users', KEY_FORMAT = 'KAFKA', VALUE_FORMAT = 'JSON');

CREATE STREAM admissions_v2_id_checks_users
    WITH (KAFKA_TOPIC = 'admissions-v2.id_checks_users', KEY_FORMAT = 'KAFKA', VALUE_FORMAT = 'JSON') AS
SELECT *
FROM admissions_id_checks_users
EMIT CHANGES;

Above is the statements.sql I'm trying to test, and below is the exception.

org.apache.kafka.common.errors.SerializationException: Failed to deserialize value from topic: admissions.public.id_checks_users. Can't convert type. sourceType: ObjectNode, requiredType: BIGINT, path: $.ID_CHECK_ID
    at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:145)
    at io.confluent.ksql.serde.connect.ConnectFormat$StructToListDeserializer.deserialize(ConnectFormat.java:234)
    at io.confluent.ksql.serde.connect.ConnectFormat$StructToListDeserializer.deserialize(ConnectFormat.java:213)
    at io.confluent.ksql.serde.GenericDeserializer.deserialize(GenericDeserializer.java:59)
    at io.confluent.ksql.logging.processing.LoggingDeserializer.tryDeserialize(LoggingDeserializer.java:61)
    at io.confluent.ksql.logging.processing.LoggingDeserializer.deserialize(LoggingDeserializer.java:48)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
    at io.confluent.ksql.serde.tracked.TrackedDeserializer.deserialize(TrackedDeserializer.java:53)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:304)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:960)
    at org.apache.kafka.streams.TopologyTestDriver.enqueueTaskRecord(TopologyTestDriver.java:568)
    at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:552)
    at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:842)
    at org.apache.kafka.streams.TestInputTopic.pipeInput(TestInputTopic.java:115)
    at org.apache.kafka.streams.TestInputTopic.pipeInput(TestInputTopic.java:163)
    at io.confluent.ksql.test.tools.TestExecutor.processSingleRecord(TestExecutor.java:502)
    at io.confluent.ksql.test.tools.TestExecutor.pipeRecordsFromProvidedInput(TestExecutor.java:475)
    at io.confluent.ksql.test.tools.TestExecutor.buildAndExecuteQuery(TestExecutor.java:194)
    at io.confluent.ksql.test.tools.KsqlTestingTool.executeTestCase(KsqlTestingTool.java:141)
    at io.confluent.ksql.test.tools.KsqlTestingTool.runWithTripleFiles(KsqlTestingTool.java:131)
    at io.confluent.ksql.test.tools.KsqlTestingTool.main(KsqlTestingTool.java:56)
Caused by: io.confluent.ksql.serde.json.KsqlJsonDeserializer$CoercionException: Can't convert type. sourceType: ObjectNode, requiredType: BIGINT, path: $.ID_CHECK_ID
    at io.confluent.ksql.serde.json.KsqlJsonDeserializer.enforceFieldType(KsqlJsonDeserializer.java:169)
    at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:128)
    ... 24 more

I don't really believe the proper course of action is to change all my test schemas to use STRUCT<long BIGINT>, which would make the test runner pretty unusable from a conformity standpoint. Of course, I can mangle my data until it passes (see the sketch below) or just perform the inserts manually, but I believe the type system should be more lenient when testing JSON (since Avro, to my knowledge, can't be used with the test runner). Any suggestions would be welcome.
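
For completeness, the mangling can be done in the jq stage of the dump itself; a minimal sketch, assuming jq 1.6+ (where walk is built in), piped from the same kcat command as above, with the key-selection filter omitted for brevity:

jq --slurp \
  '{inputs: [.[] | {topic: .topic, timestamp: .ts, key: .key,
                    # unwrap single-key union objects such as {"long": 381} to their bare value
                    value: (.payload | walk(
                      if type == "object" and length == 1 and (has("long") or has("int") or has("string"))
                      then .[] else . end))}]}' \
  > $PWD/input.json

This flattens {"long": 381} to 381 and leaves nulls untouched, so the values line up with the BIGINT columns declared in statements.sql.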

jnh5y commented 2 years ago

Sure, we can reopen this issue.