Suppose I create a stream with KEY_FORMAT='JSON' but no fields declared as KEY. When data is read, if the data is not a JSON object, ksqlDB throws deserialization errors. We should either reject the KEY_FORMAT or just be smarter about handling json primitives when deserializing.
$ cat statements.sql
CREATE STREAM CLICKS (IP_ADDRESS STRING, URL STRING, TIMESTAMP STRING)
WITH (KAFKA_TOPIC = 'CLICKS',
KEY_FORMAT = 'JSON',
VALUE_FORMAT = 'JSON',
TIMESTAMP = 'TIMESTAMP',
TIMESTAMP_FORMAT = 'yyyy-MM-dd''T''HH:mm:ssXXX',
PARTITIONS = 1);
CREATE TABLE DETECTED_CLICKS AS
SELECT
IP_ADDRESS AS KEY1,
URL AS KEY2,
TIMESTAMP AS KEY3,
AS_VALUE(IP_ADDRESS) AS IP_ADDRESS,
AS_VALUE(URL) AS URL,
AS_VALUE(TIMESTAMP) AS TIMESTAMP
FROM CLICKS WINDOW TUMBLING (SIZE 2 MINUTES, RETENTION 3650 DAYS)
GROUP BY IP_ADDRESS, URL, TIMESTAMP
HAVING COUNT(IP_ADDRESS) = 1;
CREATE STREAM RAW_DISTINCT_CLICKS (IP_ADDRESS STRING, URL STRING, TIMESTAMP STRING)
WITH (KAFKA_TOPIC = 'DETECTED_CLICKS',
KEY_FORMAT='JSON',
VALUE_FORMAT = 'JSON');
CREATE STREAM DISTINCT_CLICKS AS
SELECT
IP_ADDRESS,
URL,
TIMESTAMP
FROM RAW_DISTINCT_CLICKS
WHERE IP_ADDRESS IS NOT NULL
PARTITION BY IP_ADDRESS;
$ cat inputs.json
{
"inputs": [
{
"topic": "CLICKS",
"key": "\"10.0.0.1\"",
"value": {
"IP_ADDRESS": "10.0.0.1",
"URL": "https://docs.confluent.io/current/tutorials/examples/kubernetes/gke-base/docs/index.html",
"TIMESTAMP": "2021-01-17T14:50:43+00:00"
}
},
{
"topic": "CLICKS",
"key": "\"10.0.0.2\"",
"value": {
"IP_ADDRESS": "10.0.0.12",
"URL": "https://www.confluent.io/hub/confluentinc/kafka-connect-datagen",
"TIMESTAMP": "2021-01-17T14:53:44+00:01"
}
},
{
"topic": "CLICKS",
"key": "\"10.0.0.3\"",
"value": {
"IP_ADDRESS": "10.0.0.13",
"URL": "https://www.confluent.io/hub/confluentinc/kafka-connect-datagen",
"TIMESTAMP": "2021-01-17T14:56:45+00:03"
}
},
{
"topic": "CLICKS",
"key": "\"10.0.0.1\"",
"value": {
"IP_ADDRESS": "10.0.0.1",
"URL": "https://docs.confluent.io/current/tutorials/examples/kubernetes/gke-base/docs/index.html",
"TIMESTAMP": "2021-01-17T14:50:43+00:00"
}
},
{
"topic": "CLICKS",
"key": "\"10.0.0.2\"",
"value": {
"IP_ADDRESS": "10.0.0.12",
"URL": "https://www.confluent.io/hub/confluentinc/kafka-connect-datagen",
"TIMESTAMP": "2021-01-17T14:53:44+00:01"
}
},
{
"topic": "CLICKS",
"key": "\"10.0.0.3\"",
"value": {
"IP_ADDRESS": "10.0.0.13",
"URL": "https://www.confluent.io/hub/confluentinc/kafka-connect-datagen",
"TIMESTAMP": "2021-01-17T14:56:45+00:03"
}
}
]
}
$ cat outputs.json
{
"outputs": [
{
"topic": "DISTINCT_CLICKS",
"key": "10.0.0.1",
"value": {
"URL": "https://docs.confluent.io/current/tutorials/examples/kubernetes/gke-base/docs/index.html",
"TIMESTAMP": "2021-01-17T14:50:43+00:00"
},
"timestamp": 1610895043000
},
{
"topic": "DISTINCT_CLICKS",
"key": "10.0.0.12",
"value": {
"URL": "https://www.confluent.io/hub/confluentinc/kafka-connect-datagen",
"TIMESTAMP": "2021-01-17T14:53:44+00:01"
},
"timestamp": 1610895164000
},
{
"topic": "DISTINCT_CLICKS",
"key": "10.0.0.13",
"value": {
"URL": "https://www.confluent.io/hub/confluentinc/kafka-connect-datagen",
"TIMESTAMP": "2021-01-17T14:56:45+00:03"
},
"timestamp": 1610895225000
}
]
}
$ ksql-testing-tool -i inputs.json -s statements.sql -o outputs.json
...
[2021-02-02 00:00:07,546] ERROR {"type":0,"deserializationError":{"target":"key","errorMessage":"Failed to deserialize key from topic: CLICKS. Can't convert type. sourceType: TextNode, requiredType: STRUCT< >, path: $","recordB64":null,"cause":["Can't convert type. sourceType: TextNode, requiredType: STRUCT< >, path: $","Can't convert type. sourceType: TextNode, requiredType: STRUCT< >"],"topic":"CLICKS"},"recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null} (processing.CTAS_DETECTED_CLICKS_0.KsqlTopic.Source.deserializer:44)
[2021-02-02 00:00:07,546] WARN stream-thread [main] task [0_0] Skipping record due to deserialization error. topic=[CLICKS] partition=[0] offset=[5] (org.apache.kafka.streams.processor.internals.RecordDeserializer:88)
org.apache.kafka.common.errors.SerializationException: Failed to deserialize key from topic: CLICKS. Can't convert type. sourceType: TextNode, requiredType: STRUCT< >, path: $
Caused by: io.confluent.ksql.serde.json.KsqlJsonDeserializer$CoercionException: Can't convert type. sourceType: TextNode, requiredType: STRUCT< >, path: $
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.enforceFieldType(KsqlJsonDeserializer.java:155)
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.deserialize(KsqlJsonDeserializer.java:117)
at io.confluent.ksql.serde.connect.ConnectFormat$StructToListDeserializer.deserialize(ConnectFormat.java:224)
at io.confluent.ksql.serde.connect.ConnectFormat$StructToListDeserializer.deserialize(ConnectFormat.java:203)
at io.confluent.ksql.serde.GenericDeserializer.deserialize(GenericDeserializer.java:59)
at io.confluent.ksql.logging.processing.LoggingDeserializer.tryDeserialize(LoggingDeserializer.java:60)
at io.confluent.ksql.logging.processing.LoggingDeserializer.deserialize(LoggingDeserializer.java:47)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at io.confluent.ksql.serde.tracked.TrackedDeserializer.deserialize(TrackedDeserializer.java:53)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeKey(SourceNode.java:54)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:65)
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:176)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:112)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:185)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:891)
at org.apache.kafka.streams.TopologyTestDriver.enqueueTaskRecord(TopologyTestDriver.java:570)
at org.apache.kafka.streams.TopologyTestDriver.pipeRecord(TopologyTestDriver.java:555)
at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:533)
at io.confluent.ksql.test.tools.TestExecutor.processSingleRecord(TestExecutor.java:514)
at io.confluent.ksql.test.tools.TestExecutor.pipeRecordsFromProvidedInput(TestExecutor.java:475)
at io.confluent.ksql.test.tools.TestExecutor.buildAndExecuteQuery(TestExecutor.java:196)
at io.confluent.ksql.test.tools.KsqlTestingTool.executeTestCase(KsqlTestingTool.java:141)
at io.confluent.ksql.test.tools.KsqlTestingTool.runWithTripleFiles(KsqlTestingTool.java:131)
at io.confluent.ksql.test.tools.KsqlTestingTool.main(KsqlTestingTool.java:56)
Caused by: java.lang.IllegalArgumentException: Can't convert type. sourceType: TextNode, requiredType: STRUCT< >
at io.confluent.ksql.serde.json.JsonSerdeUtils.invalidConversionException(JsonSerdeUtils.java:137)
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.invalidConversionException(KsqlJsonDeserializer.java:283)
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.enforceFieldTypesForStruct(KsqlJsonDeserializer.java:236)
at io.confluent.ksql.serde.json.KsqlJsonDeserializer.enforceFieldType(KsqlJsonDeserializer.java:151)
... 23 more
>>>>> Test failed: Topic DISTINCT_CLICKS. Expected <3> records but it was <0>
Actual records:
Part of this work is prioritizing support for the JSON key format on sources with no declared key columns. The other part is improving the error message, in the meantime, to explain that this combination is not yet supported.
To restate the problem: when a stream is created with KEY_FORMAT='JSON' but no fields are declared as KEY, and the key bytes on the topic are a JSON primitive rather than a JSON object, ksqlDB throws deserialization errors. We should either reject this KEY_FORMAT configuration at CREATE time or be smarter about handling JSON primitives when deserializing.