confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "SAMPLETABLE.betIds", schema type: ARRAY #2673

Open ludekdolejsky opened 5 years ago

ludekdolejsky commented 5 years ago

Using KSQL 5.2.0-1, define a stream:

CREATE STREAM samplestream ( \
    betID VARCHAR, \
    selection STRUCT<selectionID BIGINT>, \
    customer STRUCT<customerCountryCode VARCHAR> \
) \
WITH ( \
    KAFKA_TOPIC='sample-stream', \
    VALUE_FORMAT='JSON', \
    KEY='betID' \
);

Define a table with a simple condition and aggregation:

CREATE TABLE sampletable \
AS \
SELECT \
    WindowStart() as "windowStart", \
    WindowEnd() as "windowEnd", \
    selection->selectionID as "selectionId", \
    COLLECT_SET(betID) as "betIds" \
FROM samplestream \
WINDOW HOPPING (size 10 minutes, advance by 1 minute) \
WHERE \
    selection->selectionID IS NOT NULL \
    AND customer->customerCountryCode = 'INTERNATIONAL' \
GROUP BY \
    selection->selectionID, \
    customer->customerCountryCode \
HAVING COUNT(*) > 5;
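For context, the hopping window above means each record belongs to `size / advance = 10` overlapping windows. A minimal sketch of the window-start arithmetic, assuming standard Kafka Streams hopping-window semantics (the example timestamp is the `windowStart` value from the log output below):

```python
# Sketch of hopping-window membership: a record at timestamp t (epoch ms)
# belongs to every window [start, start + size) whose start is a multiple
# of the advance interval.
SIZE_MS = 10 * 60 * 1000      # WINDOW HOPPING size 10 minutes
ADVANCE_MS = 1 * 60 * 1000    # advance by 1 minute

def hopping_window_starts(t, size=SIZE_MS, advance=ADVANCE_MS):
    starts = []
    start = (t // advance) * advance  # latest window start containing t
    while start >= 0 and start + size > t:
        starts.append(start)
        start -= advance
    return sorted(starts)

# windowStart seen in the serialized record at partition=1, offset=800
starts = hopping_window_starts(1554971580000)
print(len(starts))  # each record lands in 10 overlapping windows
```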

Execute a simple interactive query:

ksql> select * from sampletable;

Send some JSON data into the sample-stream Kafka topic.
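As an illustration only (the exact payloads are hypothetical, not the reporter's data), records shaped like the `samplestream` schema can be generated like this:

```python
# Sketch only: hypothetical example records matching the samplestream
# schema (betID, selection->selectionID, customer->customerCountryCode).
import json

def make_record(bet_id, selection_id, country):
    return {
        "betID": bet_id,
        "selection": {"selectionID": selection_id},
        "customer": {"customerCountryCode": country},
    }

# Six records for one selectionID so that HAVING COUNT(*) > 5 is satisfied.
records = [make_record(str(298 + i), 123456, "INTERNATIONAL") for i in range(6)]
for r in records:
    print(json.dumps(r))
```

The printed lines can then be piped into `kafka-console-producer` targeting the `sample-stream` topic (flag names vary across Kafka versions).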

After a while, observe the following crash:

Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000, topic=SAMPLETABLE, partition=1, offset=800, stacktrace=org.apache.kafka.common.errors.SerializationException: Error serializing JSON message
Caused by: org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "SAMPLETABLE.betIds", schema type: ARRAY
    at org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:220)
    at org.apache.kafka.connect.data.Struct.put(Struct.java:216)
    at io.confluent.ksql.serde.json.KsqlJsonSerializer.serialize(KsqlJsonSerializer.java:62)
    at io.confluent.ksql.serde.json.KsqlJsonSerializer.serialize(KsqlJsonSerializer.java:29)
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:191)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:153)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:446)
    at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:94)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
    at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:56)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:183)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:162)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:122)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:364)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:420)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:889)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:804)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:773)

Caused by: Error serializing JSON message
Caused by: Invalid value: null used for required field: "SAMPLETABLE.betIds",
    schema type: ARRAY
Query terminated

Inspecting the underlying Kafka topic at partition=1, offset=800 does not reveal anything suspicious:

key: 123456|+|INTERNATIONALj �V | message: {"windowStart":1554971580000,"windowEnd":1554972180000,"selectionId": 123456,"betIds":["298","309","312","397","347","424"]}

big-andy-coates commented 4 years ago

Hi @ludekdolejsky

Can you add details of the minimal set of messages that can be pumped into the samplestream to recreate this please?

This will allow us to (hopefully) recreate this locally.