Closed jzaralim closed 2 years ago
I saw this issue happening as well but with a slightly different error message on changelogs:
org.apache.kafka.common.errors.SerializationException: Error serializing message to topic: XYZ-changelog. Numeric field overflow: A field with precision 11 and scale 2 must round to an absolute value less than 10^9. Got 1000004556.0
@jnh5y , do you think this is a good fix-it-week item?
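The bound in the error message above follows from the declared type: a DECIMAL with precision p and scale s has p - s integer digits, so its absolute value has to stay below 10^(p - s). Here is a minimal Java sketch of that check. It is not ksqlDB's DecimalUtil code; the class and method names are hypothetical, and it ignores rounding to the target scale.

```java
import java.math.BigDecimal;

// Hypothetical helper (not ksqlDB's DecimalUtil): illustrates the bound quoted in the error message.
final class DecimalBound {

  // Exclusive upper bound on |value| for DECIMAL(precision, scale): 10^(precision - scale).
  static BigDecimal bound(int precision, int scale) {
    return BigDecimal.TEN.pow(precision - scale);
  }

  // True when |value| is strictly below the bound. Rounding to the scale is ignored here.
  static boolean fits(BigDecimal value, int precision, int scale) {
    return value.abs().compareTo(bound(precision, scale)) < 0;
  }

  public static void main(String[] args) {
    // Values taken from the two error messages in this thread.
    System.out.println(fits(new BigDecimal("1000004556.0"), 11, 2)); // false: bound is 10^9
    System.out.println(fits(new BigDecimal("1302"), 4, 1));          // false: bound is 10^3
    System.out.println(fits(new BigDecimal("999.9"), 4, 1));         // true
  }
}
```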
@suhas-satish yes, I think it'd be quick to do either as a first issue or a fix-it week item.
Given Zara was able to identify readily three options, I think we need to decide on what we want the behavior to be.
I'd propose another option: Summing over a DECIMAL column could return a Double instead. That would prevent the overflow. That would break compatibility, so I don't know if that option can be considered at this point.
cc: @colinhicks
As a separate note, for users, casting DECIMAL fields to Doubles ought to reduce the chance of overflows. (It'd be great if it were easy to hint / suggest changes like that as users are writing queries!)
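One trade-off of the cast is worth spelling out: a double has no fixed precision to overflow, but it gives up exact decimal arithmetic. Here is a small Java sketch of that difference, using plain java.math.BigDecimal and double rather than anything from ksqlDB; the class and method names are made up for illustration.

```java
import java.math.BigDecimal;

// Illustration only: compares exact decimal summation with floating-point summation.
final class DoubleVersusDecimalSum {

  // Adds the same double value repeatedly, as a SUM over a DOUBLE column would.
  static double sumAsDouble(double value, int times) {
    double total = 0.0;
    for (int i = 0; i < times; i++) {
      total += value;
    }
    return total;
  }

  // Adds the same decimal value repeatedly with exact arithmetic.
  static BigDecimal sumAsDecimal(BigDecimal value, int times) {
    BigDecimal total = BigDecimal.ZERO;
    for (int i = 0; i < times; i++) {
      total = total.add(value);
    }
    return total;
  }

  public static void main(String[] args) {
    System.out.println(sumAsDouble(0.1, 10));                    // 0.9999999999999999
    System.out.println(sumAsDecimal(new BigDecimal("0.1"), 10)); // 1.0
  }
}
```

So the cast avoids the overflow at the cost of small rounding errors, which may or may not be acceptable depending on what the column represents.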
I think throwing a KsqlFunctionException is appropriate behavior. Users should be able to detect and handle the issue via the processing log. I don't think we want to automatically return a different datatype when decimals are summed.
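To make the proposal concrete, here is a rough Java sketch of an aggregation step that fails as soon as the running total no longer fits, instead of failing later at serialization. It is not the actual ksqlDB implementation: FunctionException is a local stand-in for io.confluent.ksql.function.KsqlFunctionException, the add method is hypothetical, and the check assumes every input already uses the column's scale.

```java
import java.math.BigDecimal;

// Sketch only: a SUM step that rejects results wider than the column's precision.
final class CheckedDecimalSum {

  // Local stand-in for KsqlFunctionException so the example compiles on its own.
  static final class FunctionException extends RuntimeException {
    FunctionException(String message) {
      super(message);
    }
  }

  // Adds value to aggregate and throws if the result needs more digits than allowed.
  // Assumes both operands already have the column's scale.
  static BigDecimal add(BigDecimal aggregate, BigDecimal value, int precision) {
    BigDecimal result = aggregate.add(value);
    if (result.precision() > precision) {
      throw new FunctionException(
          "Numeric field overflow: " + result + " needs precision "
              + result.precision() + " but the column allows " + precision);
    }
    return result;
  }

  public static void main(String[] args) {
    BigDecimal total = add(new BigDecimal("30.33"), new BigDecimal("22.34"), 4);
    System.out.println(total); // 52.67
    try {
      add(new BigDecimal("93.66"), new BigDecimal("20.00"), 4);
    } catch (FunctionException e) {
      System.out.println(e.getMessage());
    }
  }
}
```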
@colinhicks that makes sense. Do we have examples of how to help users handle errors like that via the processing log? That seems pretty complicated.
In general, I think we may be in a situation where SUM for a Decimal column could be error-prone. (I'm not sure what's the best way to make it easier for folks.)
@colinhicks I changed the exception type from ArithmeticException to KsqlFunctionException, and this is what I see in the CLI and server logs. Let me know your thoughts.
[2022-04-21 12:26:43,538] ERROR Uncaught exception in query org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000, topic=test_topic, partition=1, offset=7, stacktrace=org.apache.kafka.common.errors.SerializationException: Error serializing message to topic: _confluent-ksql-default_query_CTAS_S2_3-Aggregate-Aggregate-Materialize-changelog. Numeric field overflow: A field with precision 4 and scale 1 must round to an absolute value less than 10^3. Got 1302
at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:56)
at io.confluent.ksql.serde.tls.ThreadLocalSerializer.serialize(ThreadLocalSerializer.java:37)
at io.confluent.ksql.serde.connect.ConnectFormat$ListToStructSerializer.serialize(ConnectFormat.java:188)
at io.confluent.ksql.serde.connect.ConnectFormat$ListToStructSerializer.serialize(ConnectFormat.java:151)
at io.confluent.ksql.serde.GenericSerializer.serialize(GenericSerializer.java:62)
at io.confluent.ksql.logging.processing.LoggingSerializer.serialize(LoggingSerializer.java:47)
at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:83)
at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:74)
at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:31)
at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:195)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$6(MeteredKeyValueStore.java:350)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:350)
at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:127)
at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:121)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213)
at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:66)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213)
at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:66)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:788)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:788)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:718)
at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1196)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:783)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:594)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
Caused by: io.confluent.ksql.function.KsqlFunctionException: Numeric field overflow: A field with precision 4 and scale 1 must round to an absolute value less than 10^3. Got 1302
at io.confluent.ksql.util.DecimalUtil.ensureMax(DecimalUtil.java:263)
at io.confluent.ksql.util.DecimalUtil.ensureFit(DecimalUtil.java:143)
at io.confluent.ksql.util.DecimalUtil.ensureFit(DecimalUtil.java:122)
at io.confluent.ksql.serde.avro.AvroDataTranslator.replaceSchema(AvroDataTranslator.java:124)
at io.confluent.ksql.serde.avro.AvroDataTranslator.convertStruct(AvroDataTranslator.java:92)
at io.confluent.ksql.serde.avro.AvroDataTranslator.replaceSchema(AvroDataTranslator.java:121)
at io.confluent.ksql.serde.avro.AvroDataTranslator.toConnectRow(AvroDataTranslator.java:77)
at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:52)
... 39 more
(9f7ca995-bef1-3c1c-9368-83d2a28a0509): CREATE TABLE table1WITH (KAFKA_TOPIC='[string]', PARTITIONS='0', REPLICAS='0') AS SELECT column1, udf1 FROM source1 GROUP BY column1 EMIT CHANGES; (io.confluent.ksql.logging.query.QueryLogger:68)
[2022-04-21 12:26:43,538] ERROR Unhandled exception caught in streams thread _confluent-ksql-default_query_CTAS_S2_3-567b0c75-8a52-47bc-94a7-470c07b33f28-StreamThread-3. (UNKNOWN) (io.confluent.ksql.util.QueryMetadataImpl:200)
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-0000000000, topic=test_topic, partition=1, offset=7, stacktrace=org.apache.kafka.common.errors.SerializationException: Error serializing message to topic: _confluent-ksql-default_query_CTAS_S2_3-Aggregate-Aggregate-Materialize-changelog. Numeric field overflow: A field with precision 4 and scale 1 must round to an absolute value less than 10^3. Got 1302
at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:56)
at io.confluent.ksql.serde.tls.ThreadLocalSerializer.serialize(ThreadLocalSerializer.java:37)
at io.confluent.ksql.serde.connect.ConnectFormat$ListToStructSerializer.serialize(ConnectFormat.java:188)
at io.confluent.ksql.serde.connect.ConnectFormat$ListToStructSerializer.serialize(ConnectFormat.java:151)
at io.confluent.ksql.serde.GenericSerializer.serialize(GenericSerializer.java:62)
at io.confluent.ksql.logging.processing.LoggingSerializer.serialize(LoggingSerializer.java:47)
at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:83)
at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:74)
at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:31)
at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:195)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$6(MeteredKeyValueStore.java:350)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:350)
at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:127)
at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:121)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213)
at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:66)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213)
at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:66)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:788)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:788)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:718)
at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1196)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:783)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:594)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
Caused by: io.confluent.ksql.function.KsqlFunctionException: Numeric field overflow: A field with precision 4 and scale 1 must round to an absolute value less than 10^3. Got 1302
at io.confluent.ksql.util.DecimalUtil.ensureMax(DecimalUtil.java:263)
at io.confluent.ksql.util.DecimalUtil.ensureFit(DecimalUtil.java:143)
at io.confluent.ksql.util.DecimalUtil.ensureFit(DecimalUtil.java:122)
at io.confluent.ksql.serde.avro.AvroDataTranslator.replaceSchema(AvroDataTranslator.java:124)
at io.confluent.ksql.serde.avro.AvroDataTranslator.convertStruct(AvroDataTranslator.java:92)
at io.confluent.ksql.serde.avro.AvroDataTranslator.replaceSchema(AvroDataTranslator.java:121)
at io.confluent.ksql.serde.avro.AvroDataTranslator.toConnectRow(AvroDataTranslator.java:77)
at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:52)
... 39 more
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:745)
at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1196)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:783)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:594)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing message to topic: _confluent-ksql-default_query_CTAS_S2_3-Aggregate-Aggregate-Materialize-changelog. Numeric field overflow: A field with precision 4 and scale 1 must round to an absolute value less than 10^3. Got 1302
at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:56)
at io.confluent.ksql.serde.tls.ThreadLocalSerializer.serialize(ThreadLocalSerializer.java:37)
at io.confluent.ksql.serde.connect.ConnectFormat$ListToStructSerializer.serialize(ConnectFormat.java:188)
at io.confluent.ksql.serde.connect.ConnectFormat$ListToStructSerializer.serialize(ConnectFormat.java:151)
at io.confluent.ksql.serde.GenericSerializer.serialize(GenericSerializer.java:62)
at io.confluent.ksql.logging.processing.LoggingSerializer.serialize(LoggingSerializer.java:47)
at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:83)
at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:74)
at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:31)
at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:195)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$6(MeteredKeyValueStore.java:350)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:350)
at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:127)
at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:121)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213)
at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:66)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213)
at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:66)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:146)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:275)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:254)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:213)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:788)
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:809)
at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:788)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:718)
... 6 more
Caused by: io.confluent.ksql.function.KsqlFunctionException: Numeric field overflow: A field with precision 4 and scale 1 must round to an absolute value less than 10^3. Got 1302
at io.confluent.ksql.util.DecimalUtil.ensureMax(DecimalUtil.java:263)
at io.confluent.ksql.util.DecimalUtil.ensureFit(DecimalUtil.java:143)
at io.confluent.ksql.util.DecimalUtil.ensureFit(DecimalUtil.java:122)
at io.confluent.ksql.serde.avro.AvroDataTranslator.replaceSchema(AvroDataTranslator.java:124)
at io.confluent.ksql.serde.avro.AvroDataTranslator.convertStruct(AvroDataTranslator.java:92)
at io.confluent.ksql.serde.avro.AvroDataTranslator.replaceSchema(AvroDataTranslator.java:121)
at io.confluent.ksql.serde.avro.AvroDataTranslator.toConnectRow(AvroDataTranslator.java:77)
at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:52)
... 39 more
Hm, it's still being classified as an UNKNOWN error, so we would still get paged for it:
[2022-04-21 12:26:43,538] ERROR Unhandled exception caught in streams thread _confluent-ksql-default_query_CTAS_S2_3-567b0c75-8a52-47bc-94a7-470c07b33f28-StreamThread-3. (UNKNOWN) (io.confluent.ksql.util.QueryMetadataImpl:200)
@jzaralim Do you know how we can classify the error? Bonnie and I were chatting about this earlier, and I haven't learned how we can achieve that in ksql.
Not an authoritative answer, but check out the implementations of io.confluent.ksql.query.QueryErrorClassifier, which are applied here: https://github.com/confluentinc/ksql/blob/423bb3d218dd3cf646d76f66a55ca55201758957/ksqldb-engine/src/main/java/io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.java#L99-L147
@colinhicks yes, I have added a new classifier to classify KsqlFunctionExceptions as USER errors. Will create a PR shortly for review.
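The classifier from the PR is not shown in this thread, so the following is only a sketch of the general idea in Java. In the logs above the function exception is nested as a cause under StreamsException and SerializationException, so a classifier has to walk the cause chain. ErrorType and FunctionException are local stand-ins defined here for illustration; they are not ksqlDB's real types, and the method shape is an assumption.

```java
// Sketch only: marks an error as USER when a function exception appears anywhere in its cause chain.
final class FunctionErrorClassifier {

  // Local stand-in for the error categories seen in the log lines (USER, UNKNOWN).
  enum ErrorType { USER, SYSTEM, UNKNOWN }

  // Local stand-in for KsqlFunctionException.
  static final class FunctionException extends RuntimeException {
    FunctionException(String message) {
      super(message);
    }
  }

  // Walks from the outermost exception down through getCause() links.
  static ErrorType classify(Throwable error) {
    for (Throwable t = error; t != null; t = t.getCause()) {
      if (t instanceof FunctionException) {
        return ErrorType.USER;
      }
    }
    return ErrorType.UNKNOWN;
  }

  public static void main(String[] args) {
    Throwable nested = new RuntimeException("Exception caught in process",
        new IllegalStateException("Error serializing message",
            new FunctionException("Numeric field overflow")));
    System.out.println(classify(nested));                             // USER
    System.out.println(classify(new RuntimeException("unrelated")));  // UNKNOWN
  }
}
```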
After classifying it as a USER error, the ERROR log line looks as follows:
[2022-04-22 00:46:54,564] ERROR Unhandled exception caught in streams thread _confluent-ksql-default_query_CTAS_S2_11-f67c0476-ee5e-417e-bc21-9e363840eae7-StreamThread-5. (USER) (io.confluent.ksql.util.QueryMetadataImpl:200)
Describe the bug
The SUM function does not check that the result of the aggregation will fit into the data type. For example, if you call SUM(col) on a DECIMAL(4,2) column, then you eventually run into this if you're adding 30.33 + 22.34 + 40.99 + 20. This applies for all the data types.
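The arithmetic behind the DECIMAL(4,2) example can be worked through in a few lines of Java. This is an illustration with java.math.BigDecimal only, not ksqlDB code, and the helper names are hypothetical: the largest value the column can hold is 10^(4-2) - 10^(-2) = 99.99, while the four values add up to 113.66.

```java
import java.math.BigDecimal;

// Worked example: the four values from the bug report exceed what DECIMAL(4,2) can hold.
final class DecimalSumExample {

  // Largest value DECIMAL(precision, scale) can hold: 10^(precision - scale) - 10^(-scale).
  static BigDecimal maxValue(int precision, int scale) {
    return BigDecimal.TEN.pow(precision - scale)
        .subtract(BigDecimal.ONE.movePointLeft(scale));
  }

  // Exact sum of decimal literals given as strings.
  static BigDecimal sum(String... values) {
    BigDecimal total = BigDecimal.ZERO;
    for (String value : values) {
      total = total.add(new BigDecimal(value));
    }
    return total;
  }

  public static void main(String[] args) {
    BigDecimal max = maxValue(4, 2);
    BigDecimal total = sum("30.33", "22.34", "40.99", "20");
    System.out.println("max = " + max);                           // max = 99.99
    System.out.println("sum = " + total);                         // sum = 113.66
    System.out.println("fits = " + (total.compareTo(max) <= 0));  // fits = false
  }
}
```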
To Reproduce
SUM on the decimal column
SUM column in the table is too big to fit into the decimal
Expected behavior
Any of the following:
KsqlFunctionException.
Actual behaviour
We get a serialization error with