Describe the bug
When a stream has a nested map attribute (i.e. a map of maps) and its value format is JSON_SR, a select query fails with a SerializationException raised while deserializing the value. Note that inserts still work.
To Reproduce
Steps to reproduce the behavior, include:
The version of KSQL: 0.24.0
Create the following stream with a nested map (attribute m):
create stream if not exists nestedmap_json_sr
(
id int,
m Map<String, Map<String, int>>
)
with (
kafka_topic = 'nestedmap_json_sr',
partitions = 1,
value_format = 'JSON_SR'
);
insert some data
insert into nestedmap_json_sr (id, m) values (1, MAP('name' := MAP('nested1' := 1)));
insert into nestedmap_json_sr (id, m) values (2, MAP('name' := MAP('nested1' := 2)));
insert into nestedmap_json_sr (id, m) values (3, MAP('name' := MAP('nested1' := 3)));
Now run the select query; no data is emitted:
select * from nestedmap_json_sr emit changes;
Expected behavior
The previously inserted data should be emitted.
Actual behavior
No results are emitted from the ksqlDB CLI.
Error messages from ksqldb server:
ksqldb-server | [2022-03-08 04:00:13,258] ERROR {"type":0,"deserializationError":{"target":"value","errorMessage":"Failed to deserialize value from topic: nestedmap_json_sr. Can't convert type. sourceType: ArrayNode, requiredType: MAP<VARCHAR, MAP<VARCHAR, INT>>, path: $.M","recordB64":null,"cause":["Can't convert type. sourceType: ArrayNode, requiredType: MAP<VARCHAR, MAP<VARCHAR, INT>>, path: $.M","Can't convert type. sourceType: ArrayNode, requiredType: MAP<VARCHAR, MAP<VARCHAR, INT>>, path: .M","Can't convert type. sourceType: ArrayNode, requiredType: MAP<VARCHAR, MAP<VARCHAR, INT>>"],"topic":"nestedmap_json_sr"},"recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null} (processing.transient_NESTEDMAP_JSON_SR_8793810493621047377.KsqlTopic.Source.deserializer:44)
ksqldb-server | [2022-03-08 04:00:13,259] WARN stream-thread [_confluent-ksql-default_transient_transient_NESTEDMAP_JSON_SR_8793810493621047377_1646712013000-709af2c2-c644-49e5-ac3b-802cff431063-StreamThread-1] task [0_0] Skipping record due to deserialization error. topic=[nestedmap_json_sr] partition=[0] offset=[2] (org.apache.kafka.streams.processor.internals.RecordDeserializer:89)
ksqldb-server | org.apache.kafka.common.errors.SerializationException: Failed to deserialize value from topic: nestedmap_json_sr. Can't convert type. sourceType: ArrayNode, requiredType: MAP<VARCHAR, MAP<VARCHAR, INT>>, path: $.M
ksqldb-server | at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:145)
ksqldb-server | at io.confluent.ksql.serde.connect.ConnectFormat$StructToListDeserializer.deserialize(ConnectFormat.java:237)
ksqldb-server | at io.confluent.ksql.serde.connect.ConnectFormat$StructToListDeserializer.deserialize(ConnectFormat.java:216)
ksqldb-server | at io.confluent.ksql.serde.GenericDeserializer.deserialize(GenericDeserializer.java:59)
ksqldb-server | at io.confluent.ksql.logging.processing.LoggingDeserializer.tryDeserialize(LoggingDeserializer.java:61)
ksqldb-server | at io.confluent.ksql.logging.processing.LoggingDeserializer.deserialize(LoggingDeserializer.java:48)
ksqldb-server | at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
ksqldb-server | at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58)
ksqldb-server | at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
ksqldb-server | at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:178)
ksqldb-server | at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
ksqldb-server | at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:304)
ksqldb-server | at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:966)
ksqldb-server | at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1068)
ksqldb-server | at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:979)
ksqldb-server | at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:767)
ksqldb-server | at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:620)
ksqldb-server | at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:584)
ksqldb-server | Caused by: io.confluent.ksql.serde.json.KsqlJsonDeserializer$CoercionException: Can't convert type. sourceType: ArrayNode, requiredType: MAP<VARCHAR, MAP<VARCHAR, INT>>, path: $.M
ksqldb-server | at io.confluent.ksql.serde.json.KsqlJsonDeserializer.enforceFieldType(KsqlJsonDeserializer.java:169)
ksqldb-server | at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:128)
ksqldb-server | ... 17 more
ksqldb-server | Caused by: io.confluent.ksql.serde.json.KsqlJsonDeserializer$CoercionException: Can't convert type. sourceType: ArrayNode, requiredType: MAP<VARCHAR, MAP<VARCHAR, INT>>, path: .M
ksqldb-server | at io.confluent.ksql.serde.json.KsqlJsonDeserializer.enforceFieldType(KsqlJsonDeserializer.java:171)
ksqldb-server | at io.confluent.ksql.serde.json.KsqlJsonDeserializer.enforceFieldTypesForStruct(KsqlJsonDeserializer.java:293)
ksqldb-server | at io.confluent.ksql.serde.json.KsqlJsonDeserializer.enforceFieldType(KsqlJsonDeserializer.java:167)
ksqldb-server | ... 18 more
ksqldb-server | Caused by: java.lang.IllegalArgumentException: Can't convert type. sourceType: ArrayNode, requiredType: MAP<VARCHAR, MAP<VARCHAR, INT>>
ksqldb-server | at io.confluent.ksql.serde.json.JsonSerdeUtils.invalidConversionException(JsonSerdeUtils.java:185)
ksqldb-server | at io.confluent.ksql.serde.json.KsqlJsonDeserializer.invalidConversionException(KsqlJsonDeserializer.java:322)
ksqldb-server | at io.confluent.ksql.serde.json.KsqlJsonDeserializer.enforceKeyAndValueTypeForMap(KsqlJsonDeserializer.java:250)
ksqldb-server | at io.confluent.ksql.serde.json.KsqlJsonDeserializer.enforceFieldType(KsqlJsonDeserializer.java:167)
ksqldb-server | ... 20 more
Describe the bug When a stream has a nested map attribute (i.e. a map of maps) and its value format is JSON_SR, a select query fails with a SerializationException raised while deserializing the value. Note that inserts still work.
To Reproduce Steps to reproduce the behavior, include:
Create the following stream with a nested map (attribute m):
create stream if not exists nestedmap_json_sr ( id int, m Map<String, Map<String, int>> ) with ( kafka_topic = 'nestedmap_json_sr', partitions = 1, value_format = 'JSON_SR' );
insert some data
insert into nestedmap_json_sr (id, m) values (1, MAP('name' := MAP('nested1' := 1))); insert into nestedmap_json_sr (id, m) values (2, MAP('name' := MAP('nested1' := 2))); insert into nestedmap_json_sr (id, m) values (3, MAP('name' := MAP('nested1' := 3)));
Now run the select query; no data is emitted:
select * from nestedmap_json_sr emit changes;
Expected behavior The previously inserted data should be emitted.
Actual behavior
No results are emitted from the ksqlDB CLI.
Error messages from ksqldb server:
ksqldb-server | [2022-03-08 04:00:13,258] ERROR {"type":0,"deserializationError":{"target":"value","errorMessage":"Failed to deserialize value from topic: nestedmap_json_sr. Can't convert type. sourceType: ArrayNode, requiredType: MAP<VARCHAR, MAP<VARCHAR, INT>>, path: $.M","recordB64":null,"cause":["Can't convert type. sourceType: ArrayNode, requiredType: MAP<VARCHAR, MAP<VARCHAR, INT>>, path: $.M","Can't convert type. sourceType: ArrayNode, requiredType: MAP<VARCHAR, MAP<VARCHAR, INT>>, path: .M","Can't convert type. sourceType: ArrayNode, requiredType: MAP<VARCHAR, MAP<VARCHAR, INT>>"],"topic":"nestedmap_json_sr"},"recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null} (processing.transient_NESTEDMAP_JSON_SR_8793810493621047377.KsqlTopic.Source.deserializer:44) ksqldb-server | [2022-03-08 04:00:13,259] WARN stream-thread [_confluent-ksql-default_transient_transient_NESTEDMAP_JSON_SR_8793810493621047377_1646712013000-709af2c2-c644-49e5-ac3b-802cff431063-StreamThread-1] task [0_0] Skipping record due to deserialization error. topic=[nestedmap_json_sr] partition=[0] offset=[2] (org.apache.kafka.streams.processor.internals.RecordDeserializer:89) ksqldb-server | org.apache.kafka.common.errors.SerializationException: Failed to deserialize value from topic: nestedmap_json_sr. Can't convert type. 
sourceType: ArrayNode, requiredType: MAP<VARCHAR, MAP<VARCHAR, INT>>, path: $.M ksqldb-server | at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:145) ksqldb-server | at io.confluent.ksql.serde.connect.ConnectFormat$StructToListDeserializer.deserialize(ConnectFormat.java:237) ksqldb-server | at io.confluent.ksql.serde.connect.ConnectFormat$StructToListDeserializer.deserialize(ConnectFormat.java:216) ksqldb-server | at io.confluent.ksql.serde.GenericDeserializer.deserialize(GenericDeserializer.java:59) ksqldb-server | at io.confluent.ksql.logging.processing.LoggingDeserializer.tryDeserialize(LoggingDeserializer.java:61) ksqldb-server | at io.confluent.ksql.logging.processing.LoggingDeserializer.deserialize(LoggingDeserializer.java:48) ksqldb-server | at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60) ksqldb-server | at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58) ksqldb-server | at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66) ksqldb-server | at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:178) ksqldb-server | at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112) ksqldb-server | at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:304) ksqldb-server | at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:966) ksqldb-server | at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1068) ksqldb-server | at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:979) ksqldb-server | at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:767) ksqldb-server | at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:620) ksqldb-server | at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:584) ksqldb-server | Caused by: io.confluent.ksql.serde.json.KsqlJsonDeserializer$CoercionException: Can't convert type. sourceType: ArrayNode, requiredType: MAP<VARCHAR, MAP<VARCHAR, INT>>, path: $.M ksqldb-server | at io.confluent.ksql.serde.json.KsqlJsonDeserializer.enforceFieldType(KsqlJsonDeserializer.java:169) ksqldb-server | at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:128) ksqldb-server | ... 17 more ksqldb-server | Caused by: io.confluent.ksql.serde.json.KsqlJsonDeserializer$CoercionException: Can't convert type. sourceType: ArrayNode, requiredType: MAP<VARCHAR, MAP<VARCHAR, INT>>, path: .M ksqldb-server | at io.confluent.ksql.serde.json.KsqlJsonDeserializer.enforceFieldType(KsqlJsonDeserializer.java:171) ksqldb-server | at io.confluent.ksql.serde.json.KsqlJsonDeserializer.enforceFieldTypesForStruct(KsqlJsonDeserializer.java:293) ksqldb-server | at io.confluent.ksql.serde.json.KsqlJsonDeserializer.enforceFieldType(KsqlJsonDeserializer.java:167) ksqldb-server | ... 18 more ksqldb-server | Caused by: java.lang.IllegalArgumentException: Can't convert type. sourceType: ArrayNode, requiredType: MAP<VARCHAR, MAP<VARCHAR, INT>> ksqldb-server | at io.confluent.ksql.serde.json.JsonSerdeUtils.invalidConversionException(JsonSerdeUtils.java:185) ksqldb-server | at io.confluent.ksql.serde.json.KsqlJsonDeserializer.invalidConversionException(KsqlJsonDeserializer.java:322) ksqldb-server | at io.confluent.ksql.serde.json.KsqlJsonDeserializer.enforceKeyAndValueTypeForMap(KsqlJsonDeserializer.java:250) ksqldb-server | at io.confluent.ksql.serde.json.KsqlJsonDeserializer.enforceFieldType(KsqlJsonDeserializer.java:167) ksqldb-server | ... 20 more
Additional context
Changing the value format to AVRO works.