confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

The AVG function throws NPE if the value is null #8363

Closed: spena closed this issue 2 years ago.

spena commented 2 years ago

Describe the bug

When a null value is passed to the AVG function, the following exception is logged in the KSQL log:

Uncaught exception in query org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_4, processor=KSTREAM-SOURCE-0000000009, topic=_confluent-ksql-pksqlc-zg3m0query_CTAS_OBEM_DEV_COV_FLOOR_OCCUPANCY_FINAL_2193-Aggregate-GroupBy-repartition, partition=4, offset=5, stacktrace=java.lang.NullPointerException
    at io.confluent.ksql.function.udaf.average.AverageUdaf.lambda$averageDouble$11(AverageUdaf.java:89)
    at io.confluent.ksql.function.udaf.average.AverageUdaf$1.undo(AverageUdaf.java:144)
    at io.confluent.ksql.function.udaf.average.AverageUdaf$1.undo(AverageUdaf.java:101)
    at io.confluent.ksql.function.UdafTableAggregateFunction.undo(UdafTableAggregateFunction.java:45)
    at io.confluent.ksql.execution.function.udaf.KudafUndoAggregator.apply(KudafUndoAggregator.java:68)
    at io.confluent.ksql.execution.function.udaf.KudafUndoAggregator.apply(KudafUndoAggregator.java:27)
    at org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:92)
    at org.apache.kafka.streams.kstream.internals.KTableAggregate$KTableAggregateProcessor.process(KTableAggregate.java:59)
    at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:253)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:232)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:191)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1282)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:772)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:602)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:574)
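The trace shows the exception is thrown on the undo path (`KudafUndoAggregator.apply` → `UdafTableAggregateFunction.undo` → `AverageUdaf$1.undo`), which a table aggregation (`KTableAggregate`) invokes when it retracts a previously aggregated row. The sketch below illustrates the general failure mode only. It is not the actual `AverageUdaf` source, and it assumes the undo step subtracts a boxed `Double` from a running sum: Java auto-unboxes `null` into a primitive and throws `NullPointerException`. The class and field names are hypothetical.

```java
import java.util.function.BiFunction;

// Hypothetical illustration of the failure mode; NOT the real AverageUdaf code.
public class UnboxingNpeDemo {

    // A naive "undo" step that subtracts a retracted value from a running sum.
    // Both operands are boxed Doubles, so 'sum - removed' unboxes them first;
    // unboxing a null 'removed' throws NullPointerException.
    static final BiFunction<Double, Double, Double> NAIVE_UNDO =
        (sum, removed) -> sum - removed;

    public static void main(String[] args) {
        System.out.println(NAIVE_UNDO.apply(10.0, 4.0)); // prints 6.0
        try {
            NAIVE_UNDO.apply(10.0, null);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException while undoing a null value");
        }
    }
}
```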

Expected behavior

AVG should just return null instead of throwing an error in the logs. A null value cannot be aggregated, so there is no reason to attempt to process it. This error also triggers alerts in Cloud logs.
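A minimal sketch of one reading of this expected behavior, assuming an average that keeps a running sum and count: null inputs are skipped in both the aggregate and the undo step (so a retraction never tries to subtract a value that was never added), and the result is null when no non-null values have been seen. `NullSafeAverage` and its methods are hypothetical names that only mirror the initialize/aggregate/undo/map shape of a table aggregate; this is not ksqlDB's `Udaf` interface or the fix that was merged.

```java
// Hypothetical null-safe average; not the ksqlDB Udaf/TableUdaf API.
public class NullSafeAverage {

    // Running state: sum and count of the non-null inputs currently aggregated.
    public static final class State {
        final double sum;
        final long count;

        State(double sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    public static State initialize() {
        return new State(0.0, 0L);
    }

    // Adds a value to the state; a null input leaves the state unchanged.
    public static State aggregate(Double value, State state) {
        if (value == null) {
            return state;
        }
        return new State(state.sum + value, state.count + 1);
    }

    // Retracts a value previously passed to aggregate. Nulls were never
    // counted, so they are skipped here as well instead of being unboxed.
    public static State undo(Double value, State state) {
        if (value == null) {
            return state;
        }
        return new State(state.sum - value, state.count - 1);
    }

    // Returns null when nothing non-null is aggregated, avoiding 0 / 0.
    public static Double map(State state) {
        return state.count == 0 ? null : state.sum / state.count;
    }
}
```

Treating null the same way in `aggregate` and `undo` is the important design choice: if only one side skipped nulls, the count would drift every time a null row was retracted.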

An alternative is to record the null value in the processing log instead of throwing.
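A small sketch of that alternative, assuming the aggregate is handed some sink for diagnostic records. Here a plain `Consumer<String>` stands in for the processing log; it is a hypothetical hook, not a real ksqlDB processing-log API, and the `context` string (for example a partition/offset pair like the one in the trace above) is illustrative. For brevity only the aggregate path is shown.

```java
import java.util.function.Consumer;

// Hypothetical: 'nullSink' stands in for the processing log; not a ksqlDB API.
public class NullReportingAverage {

    private final Consumer<String> nullSink;
    private double sum = 0.0;
    private long count = 0;

    public NullReportingAverage(Consumer<String> nullSink) {
        this.nullSink = nullSink;
    }

    // Skips a null input and reports it to the sink instead of throwing.
    public void add(Double value, String context) {
        if (value == null) {
            nullSink.accept("Skipped null input to AVG: " + context);
            return;
        }
        sum += value;
        count++;
    }

    // Returns null when no non-null values have been added.
    public Double average() {
        return count == 0 ? null : sum / count;
    }
}
```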

jnh5y commented 2 years ago

This issue has been addressed by https://github.com/confluentinc/ksql/pull/8400.