confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

KSQL 5.4.0-beta1 - SUM - BigDecimal has mismatching scale value for given Decimal schema #3875

Closed: robinroos closed this issue 4 years ago

robinroos commented 4 years ago

Using 5.4.0-beta1, KSQL fails when SUM is run over a DECIMAL type.

I happen to know that the Avro record which I am publishing to Kafka is defined, in Avro IDL, as decimal(9,4), so it has Scale=4.

1 COUNT works - the row appears when I stream 1 record

ksql> select book, pair, settledate, COUNT(baseamount), COUNT(quotedamount) from pair_position_change group by book, pair, settledate;
+------------+------------+------------+------------+------------+
|BOOK        |PAIR        |SETTLEDATE  |KSQL_COL_3  |KSQL_COL_4  |
+------------+------------+------------+------------+------------+
|UKFXSALES   |EURUSD      |18213       |1           |1           |
^CQuery terminated

2 SUM fails - the exception appears when I stream 1 record

ksql> select book, pair, settledate, SUM(baseamount), SUM(quotedamount) from pair_position_change group by book, pair, settledate;
+------------+------------+------------+------------+------------+
|BOOK        |PAIR        |SETTLEDATE  |KSQL_COL_3  |KSQL_COL_4  |
+------------+------------+------------+------------+------------+
Exception caught in process. taskId=1_0, processor=KSTREAM-SOURCE-0000000009, topic=_confluent-ksql-default_transient_2784932728943465989_1573916925762-Aggregate-groupby-repartition, partition=0, offset=0, stacktrace=org.apache.kafka.common.errors.SerializationException: Error serializing message to topic: _confluent-ksql-default_transient_2784932728943465989_1573916925762-Aggregate-aggregate-changelog
Caused by: org.apache.kafka.connect.errors.DataException: BigDecimal has mismatching scale value for given Decimal schema
    at org.apache.kafka.connect.data.Decimal.fromLogical(Decimal.java:68)
    at io.confluent.connect.avro.AvroData$5.convert(AvroData.java:265)
    at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:420)
    at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:607)
    at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:366)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:80)
    at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:49)
    at io.confluent.ksql.serde.tls.ThreadLocalSerializer.serialize(ThreadLocalSerializer.java:37)
    at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:248)
    at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:215)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:59)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:50)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:27)
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:166)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:486)
    at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:107)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:116)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:345)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:199)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:420)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:886)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:792)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:761)

Caused by: Error serializing message to topic: _confluent-ksql-default_transient_2784932728943465989_1573916925762-Aggregate-aggregate-changelog
Caused by: BigDecimal has mismatching scale value for given Decimal schema
^CQuery terminated

3 Describe the underlying Stream

ksql> describe pair_position_change;

Name                 : PAIR_POSITION_CHANGE
 Field        | Type                                                                   
---------------------------------------------------------------------------------------
 ROWTIME      | BIGINT           (system)                                              
 ROWKEY       | VARCHAR(STRING)  (system)                                              
 LEG          | VARCHAR(STRING)                                                        
 TRADETYPE    | VARCHAR(STRING)                                                        
 TRADEREF     | VARCHAR(STRING)                                                        
 BOOK         | VARCHAR(STRING)                                                        
 PAIR         | VARCHAR(STRING)                                                        
 SETTLEDATE   | INTEGER                                                                
 BASEAMOUNT   | DECIMAL                                                                
 QUOTEDAMOUNT | DECIMAL                                                                
 POSITIONKEY  | STRUCT<BOOK VARCHAR(STRING), PAIR VARCHAR(STRING), SETTLEDATE INTEGER> 
---------------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql> 

4 This query is actually quite far removed from the original Topic. Here is the full KSQL "topology". My final statement, CREATE TABLE, was not working, so I tested it with SELECT and saw the error reported here.

create stream trades with (kafka_topic='trade', value_format = 'AVRO');

create stream near_pair_position_change as select tradetype, traderef, book, pair, settledate, baseamount, quotedamount, positionkey from trades where TradeType = 'FXSPOT' or TradeType = 'FXFWD' or TradeType='FXSWAP' partition by PositionKey;

create stream far_pair_position_change as select tradetype, traderef, book, pair, farsettledate as settledate, farbaseamount as baseamount, farquotedamount as quotedamount, farpositionkey as positionkey from trades where TradeType='FXSWAP' partition by PositionKey;

create stream pair_position_change as select 'NEAR' as Leg, * from near_pair_position_change;

insert into pair_position_change select 'FAR' as Leg, * from far_pair_position_change;

create table fx_position_by_book_pair_settledate as select book, pair, settledate, SUM(baseamount) as baseamount, SUM(quotedamount) as quotedamount from pair_position_change group by book, pair, settledate;

5 Describe the top-level stream trades, which streams the topic trade

ksql> describe trades;

Name                 : TRADES
 Field            | Type                                                                   
-------------------------------------------------------------------------------------------
 ROWTIME          | BIGINT           (system)                                              
 ROWKEY           | VARCHAR(STRING)  (system)                                              
 TRADEREF         | VARCHAR(STRING)                                                        
 BOOK             | VARCHAR(STRING)                                                        
 TRADETYPE        | VARCHAR(STRING)                                                        
 BUYSELL          | VARCHAR(STRING)                                                        
 COUNTERPARTY     | VARCHAR(STRING)                                                        
 PAIR             | VARCHAR(STRING)                                                        
 TRADEDATETIME    | BIGINT                                                                 
 TRADEDATE        | INTEGER                                                                
 SETTLEDATE       | INTEGER                                                                
 BASEAMOUNT       | DECIMAL                                                                
 QUOTEDAMOUNT     | DECIMAL                                                                
 SPOTPRICE        | DECIMAL                                                                
 TRADERPOINTS     | DECIMAL                                                                
 SALESPOINTS      | DECIMAL                                                                
 CUSTOMERPRICE    | DECIMAL                                                                
 POSITIONKEY      | STRUCT<BOOK VARCHAR(STRING), PAIR VARCHAR(STRING), SETTLEDATE INTEGER> 
 FARSETTLEDATE    | INTEGER                                                                
 FARBASEAMOUNT    | DECIMAL                                                                
 FARQUOTEDAMOUNT  | DECIMAL                                                                
 FARSPOTPRICE     | DECIMAL                                                                
 FARTRADERPOINTS  | DECIMAL                                                                
 FARSALESPOINTS   | DECIMAL                                                                
 FARCUSTOMERPRICE | DECIMAL                                                                
 FARPOSITIONKEY   | STRUCT<BOOK VARCHAR(STRING), PAIR VARCHAR(STRING), SETTLEDATE INTEGER> 
-------------------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql> 

6 Version info, to confirm I am running 5.4.0-beta1 CLI and Server

CLI v5.4.0-beta1, Server v5.4.0-beta1 located at http://0.0.0.0:8088

robinroos commented 4 years ago

Attaching Key and Value Schemas for Topic trade from Control Center.

schema-trade-key-v1-_T5E1pJ0TsKYKxknN1yuFA_json.txt schema-trade-value-v1-_T5E1pJ0TsKYKxknN1yuFA_json.txt

robinroos commented 4 years ago

7 This works if I CAST AS INTEGER within SUM. Of course the logic of the application is broken by that approach....

ksql> select book, pair, settledate, SUM(CAST(baseamount as integer)), SUM(CAST(quotedamount as integer)) from pair_position_change group by book, pair, settledate;
+------------+------------+------------+------------+------------+
|BOOK        |PAIR        |SETTLEDATE  |KSQL_COL_3  |KSQL_COL_4  |
+------------+------------+------------+------------+------------+
|UKFXSALES   |EURUSD      |18213       |1000000     |-1234500    |
|UKFXSALES   |EURUSD      |18213       |2000000     |-2469000    |
|UKFXSALES   |EURUSD      |18213       |3000000     |-3703500    |
robinroos commented 4 years ago

8 Explicit CAST AS DOUBLE also works, so I guess I have my work-around:

ksql> select book, pair, settledate, SUM(CAST(baseamount as double)), SUM(CAST(quotedamount as double)) from pair_position_change group by book, pair, settledate;
+------------+------------+------------+------------+------------+
|BOOK        |PAIR        |SETTLEDATE  |KSQL_COL_3  |KSQL_COL_4  |
+------------+------------+------------+------------+------------+
|UKFXSALES   |EURUSD      |18213       |1000000.0   |-1234500.0  |
|UKFXSALES   |EURUSD      |18213       |2000000.0   |-2469000.0  |
|UKFXSALES   |EURUSD      |18213       |3000000.0   |-3703500.0  |

The inability to aggregate decimal types without an explicit cast is likely to catch out other users, particularly those new to KSQL.

agavra commented 4 years ago

Thanks for reporting this @robinroos! This is definitely a problem that we need to address, but I'm still trying to figure out what the "right" behavior for such a sum mechanism would be. As it stands, it returns the same type as the input (e.g. if you sum DECIMAL(2,1) your return type is DECIMAL(2,1)). This is obviously pretty limited because, for example, 9.9+1.0 cannot fit in DECIMAL(2,1) and you'd get the serialization error that you saw. We could automatically cast to double, but that would risk losing precision (something that is likely unacceptable). We could choose some really large scale/precision and just hope that it fits. None of these are really satisfying solutions, but I'm open to ideas!
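
As an illustration of the behaviour described above, here is a minimal KSQL sketch (the stream, topic, and column names are hypothetical; the syntax is modelled on the reproduction later in this thread):

CREATE STREAM prices (item VARCHAR, price DECIMAL(2, 1))
  WITH (kafka_topic='prices', value_format='AVRO', partitions=1);

-- SUM(price) is typed DECIMAL(2, 1), the same as its input, so once the running
-- total exceeds 9.9 (for example 9.9 + 1.0 = 10.9) the aggregate no longer fits
-- the declared schema and serialization fails with the error reported above.
CREATE TABLE total_price AS
  SELECT item, SUM(price) AS total FROM prices GROUP BY item;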

robinroos commented 4 years ago

The work-around, CAST (... AS DOUBLE), has been shown to fail downstream when attempting to use Avro IDL-generated message classes to interact with topics.

See: #3999 .

robinroos commented 4 years ago

@agavra, your 16-Nov comment implied that the issue was related to numeric overflow.

I do not believe this is the case. Rather, I suspect that the scale of the input is not being maintained through both the value of the aggregate and the defined schema of the resulting topic. I notice that there is a narrowing of DECIMAL to (2,1) within io.confluent.ksql.function.AggregateFunctionFactory. Could that be contributing to the problem described here?

agavra commented 4 years ago

@robinroos - the narrowing in AggregateFunctionFactory is only used to describe functions; it is never used in the path of function application, so that should not be the cause. I'll take a look at the scale mismatch when I have some time.

robinroos commented 4 years ago

As input to the triage of this issue:

A stream of Avro messages, containing a Decimal(9,4) field, aggregated through SELECT SUM(...), results in a message format which cannot be represented by any Avro schema and which cannot be decoded in Java.

agavra commented 4 years ago

I was able to reproduce this locally - I am pretty confident in my original assessment that it has to do with overflow, as I mentioned before. In the example below, the SUM behaves fine until I add a value that causes it to no longer fit within DECIMAL(9,4).

ksql> CREATE STREAM books (name VARCHAR, cost DECIMAL(9, 4)) WITH (kafka_topic='books', value_format='AVRO', partitions=1);

 Message
----------------
 Stream created
----------------

ksql> INSERT INTO books (name, cost) VALUES ('lotr', '10000.5555');
ksql> SELECT * FROM books EMIT CHANGES LIMIT 1;
+----------------------+----------------------+----------------------+----------------------+
|ROWTIME               |ROWKEY                |NAME                  |COST                  |
+----------------------+----------------------+----------------------+----------------------+
|1576601141304         |null                  |lotr                  |10000.5555            |

ksql> INSERT INTO books (name, cost) VALUES ('lotr', '20000.5555');
ksql> SELECT SUM(cost) FROM books GROUP BY name EMIT CHANGES LIMIT 1;
+----------------------------------------------------------------------------------------------+
|KSQL_COL_0                                                                                    |
+----------------------------------------------------------------------------------------------+
|30001.1110                                                                                    |
Limit Reached
Query terminated

ksql> INSERT INTO books (name, cost) VALUES ('lotr', '99999.5555');
ksql> SELECT SUM(cost) FROM books GROUP BY name EMIT CHANGES;
+----------------------------------------------------------------------------------------------+
|KSQL_COL_0                                                                                    |
+----------------------------------------------------------------------------------------------+
|30001.1110                                                                                    |
Exception caught in process. taskId=1_0, processor=Aggregate-GroupBy-repartition-source, topic=_confluent-ksql-default_transient_4060992337144310523_1576601273150-Aggregate-GroupBy-repartition, partition=0, offset=2, stacktrace=org.apache.kafka.common.errors.SerializationException: Error serializing message to topic: _confluent-ksql-default_transient_4060992337144310523_1576601273150-Aggregate-Aggregate-Materialize-changelog
Caused by: org.apache.kafka.connect.errors.DataException: BigDecimal has mismatching scale value for given Decimal schema
    at org.apache.kafka.connect.data.Decimal.fromLogical(Decimal.java:68)
    at io.confluent.connect.avro.AvroData$5.convert(AvroData.java:264)
    at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:419)
    at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:606)
    at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:365)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:80)
    at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:49)
    at io.confluent.ksql.serde.tls.ThreadLocalSerializer.serialize(ThreadLocalSerializer.java:37)
    at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:248)
    at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:215)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:59)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:50)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:27)
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:782)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)
    at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:107)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:782)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:385)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:782)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:385)
    at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:537)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:795)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:701)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:674)

Caused by: Error serializing message to topic: _confluent-ksql-default_transient_4060992337144310523_1576601273150-Aggregate-Aggregate-Materialize-changelog
Caused by: BigDecimal has mismatching scale value for given Decimal schema
Query terminated
robinroos commented 4 years ago

Then perhaps I have misunderstood Decimal(9,4). I presumed it meant 9 digits before the decimal point and 4 after, but your test case implies 9 digits in total, of which 4 come after the decimal point.

Given that semantic, I could easily have been overflowing the maximum bound of the type.

agavra commented 4 years ago

Ah yes! That's the meaning of precision and scale - I found that pretty confusing as well. The best explanation I found is in the Microsoft SQL Server docs: https://docs.microsoft.com/en-us/sql/t-sql/data-types/decimal-and-numeric-transact-sql?view=sql-server-ver15

[scale] The number of decimal digits that are stored to the right of the decimal point. This number is subtracted from p[recision] to determine the maximum number of digits to the left of the decimal point. Scale must be a value from 0 through p[recision], and can only be specified if precision is specified. The default scale is 0 and so 0 <= s <= p. Maximum storage sizes vary, based on the precision.
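
Applying that definition to the reproduction above: DECIMAL(9, 4) leaves 9 - 4 = 5 digits to the left of the decimal point, so the largest value it can hold is 99999.9999. The running sum 10000.5555 + 20000.5555 + 99999.5555 = 130000.6665 needs six digits to the left of the point, so it no longer fits the declared type and the changelog record fails to serialize.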

robinroos commented 4 years ago

Ok, thanks for bearing with me here. I will retest with Decimal(14,4).
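
As an illustration of that retest, here is a sketch modelled on the BOOKS reproduction above (the books_wide stream and topic are hypothetical, not output from this thread):

CREATE STREAM books_wide (name VARCHAR, cost DECIMAL(14, 4))
  WITH (kafka_topic='books_wide', value_format='AVRO', partitions=1);

INSERT INTO books_wide (name, cost) VALUES ('lotr', '99999.5555');
INSERT INTO books_wide (name, cost) VALUES ('lotr', '99999.5555');

-- SUM(cost) is typed DECIMAL(14, 4); the total 199999.1110 still fits within
-- 14 digits of precision, so the aggregate serializes without the scale error.
SELECT SUM(cost) FROM books_wide GROUP BY name EMIT CHANGES;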

robinroos commented 4 years ago

This problem arose due to a misunderstanding, on my part, of the relationship between precision and scale, which resulted in numeric overflow.

Furthermore, the issue of maintaining decimal scale and precision through arithmetic can be solved by CAST(... AS DECIMAL(precision, scale)).
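
For example, applied to the original aggregation (a sketch reusing the stream and column names from earlier in this thread; the target type DECIMAL(14, 4) is an assumption taken from the retest mentioned above, not a value confirmed here):

-- Widening each DECIMAL(9, 4) input before SUM gives the running total ten
-- integer digits instead of five, which avoids the overflow seen above.
CREATE TABLE fx_position_by_book_pair_settledate AS
  SELECT book, pair, settledate,
         SUM(CAST(baseamount AS DECIMAL(14, 4))) AS baseamount,
         SUM(CAST(quotedamount AS DECIMAL(14, 4))) AS quotedamount
  FROM pair_position_change
  GROUP BY book, pair, settledate;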