Axual / ksml

Kafka Streams for Low Code Environments
Apache License 2.0
25 stars 9 forks source link

KSML crashes when providing Null data #107

Closed richard-axual closed 5 months ago

richard-axual commented 6 months ago

When a Null key or value is used in a message which is read and processed then the PythonFunction fails with a wrong parameter message. This can be reproduced by running the Peek operation example and produce a record with a Null key or value. # The error was located at PythonFunction class, method call in the validation loop (line 88-92)

Update: When testing joins with Null data there are also exception, null handling needs to be checked in general. This join exception can be reproduced by running the join example without a data generator and provide a string/null value on the ksml_sensoralert_settings topic

Caused by: io.axual.ksml.data.exception.ExecutionException: KSML Execution error: Incorrect type passed in: expected=SensorAlertSettings, got Null
    at io.axual.ksml.data.serde.DataObjectSerializer.serialize(DataObjectSerializer.java:60)
    at io.axual.ksml.serde.StoreSerde.lambda$new$0(StoreSerde.java:57)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:83)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:74)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:31)
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:195)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$6(MeteredKeyValueStore.java:331)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:331)
    at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:131)
    at org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.put(KeyValueStoreWrapper.java:102)
    at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:151)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
    at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741)
    ... 6 common frames omitted
richard-axual commented 5 months ago

Fixed in release 0.2.4 and 0.9.0