confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

KSQL 5.4.0-beta1 - select of date type, result is missing logical type definition #4163

Open robinroos opened 4 years ago

robinroos commented 4 years ago

Topic TRADE is created with an explicit Avro IDL-generated schema, by publishing a message to the topic.

Stream TRADES is created from TRADE.

Topic NEAR_PAIR_POSITION_CHANGE is created via CREATE STREAM AS SELECT FROM TRADES.

The field SettleDate in topic TRADE is an "int" with logical type "date".

The field SettleDate in the result of the SELECT statement is an "int" without the logical type definition. As a result, it is not possible to decode the messages from topic NEAR_PAIR_POSITION_CHANGE into an Avro IDL-generated message which defines SettleDate as a date type.

1 - Create the stream TRADES from topic TRADE

CREATE STREAM trades WITH (kafka_topic='trade', value_format = 'AVRO');

2 - Create stream NEAR_PAIR_POSITION_CHANGE from stream TRADES

CREATE STREAM near_pair_position_change
AS SELECT TradeSource, TradeType, TradeRef, Book, Pair, SettleDate, BaseAmount, QuotedAmount,
    (Book + ':' + Pair + ':' + CAST(SettleDate AS STRING)) AS PositionKey
FROM trades
WHERE TradeType = 'FXSPOT' or TradeType = 'FXFWD' or TradeType = 'FXSWAP'
PARTITION BY PositionKey;

3 - Java exception when deserializing

2019-12-17 22:34:58.989 ERROR 78958 --- [ntainer#3-1-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition NEAR_PAIR_POSITION_CHANGE-2 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
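
For context, the failure reproduces with any consumer that reads the topic through the Confluent KafkaAvroDeserializer in specific-record mode for both key and value. The following is a minimal plain-consumer sketch, not my actual Spring listener; the broker and Schema Registry URLs are placeholders:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class NearPairPositionChangeReader {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "near-pair-position-change-reader");
            // Both key and value are read as Avro here; the key setting is what later in
            // this thread turns out to be wrong, since KSQL writes plain String keys.
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "io.confluent.kafka.serializers.KafkaAvroDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "io.confluent.kafka.serializers.KafkaAvroDeserializer");
            props.put("schema.registry.url", "http://localhost:8081");
            props.put("specific.avro.reader", "true"); // deserialise into the IDL-generated classes

            try (KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("NEAR_PAIR_POSITION_CHANGE"));
                // poll() throws the SerializationException ("Unknown magic byte!") shown above
                // when a key or value was not written by the Confluent Avro serializer.
                consumer.poll(Duration.ofSeconds(5))
                        .forEach(rec -> System.out.println(rec.value()));
            }
        }
    }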
robinroos commented 4 years ago

Attaching schemas downloaded from Control Centre, for the source topic TRADE and the select result NEAR_PAIR_POSITION_CHANGE.

schema-trade-value-v1-json.txt schema-NEAR_PAIR_POSITION_CHANGE-value-v1-json.txt

robinroos commented 4 years ago

Here is the Avro IDL definition of the message which I am attempting to deserialise:

    enum TradeType { FXSPOT, FXFWD, FXSWAP, FXNDF, FXPLSWEEP }

    record NearPairPositionChange {
        string TradeSource;
        TradeType TradeType;
        string TradeRef;
        string Book;
        string Pair;
        date SettleDate;
        decimal(14,4) BaseAmount;
        decimal(14,4) QuotedAmount;
        string PositionKey;
    }
robinroos commented 4 years ago

In the TRADE schema, SettleDate appears as:

    {
      "name": "SettleDate",
      "type": {
        "type": "int",
        "logicalType": "date"
      }
    },

In the NEAR_PAIR_POSITION_CHANGE schema, SettleDate appears as follows (note the logical type definition is missing):

    {
      "name": "SETTLEDATE",
      "type": [
        "null",
        "int"
      ],
      "default": null
    },
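
For anyone wanting to compare the two registered schemas directly rather than via Control Centre, they can be pulled from Schema Registry. A minimal sketch, assuming the registry at http://localhost:8081 and the default <topic>-value subject naming:

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

    public class CompareSubjectSchemas {
        public static void main(String[] args) throws Exception {
            SchemaRegistryClient client =
                    new CachedSchemaRegistryClient("http://localhost:8081", 10);

            // Source topic: SettleDate is {"type": "int", "logicalType": "date"}.
            System.out.println(client.getLatestSchemaMetadata("trade-value").getSchema());

            // KSQL-generated topic: SETTLEDATE is ["null", "int"]; the logicalType is gone.
            System.out.println(
                    client.getLatestSchemaMetadata("NEAR_PAIR_POSITION_CHANGE-value").getSchema());
        }
    }
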
robinroos commented 4 years ago

Ok, I now know what is happening, and it is not pretty.

1 - Avro IDL by which messages are published to topic TRADE

Note that only a handful of the fields defined below are actually SELECTED.

enum BuySell { BUY, SELL }

enum TradeType { FXSPOT, FXFWD, FXSWAP, FXNDF, FXPLSWEEP }

record Trade {
    string TradeRef;
    string TradeSource;
    string Book;
    TradeType TradeType;
    BuySell BuySell;
    string Counterparty;
    string Pair;
    timestamp_ms TradeDateTime;
    date TradeDate;

    date SettleDate;
    decimal(14,4) BaseAmount;
    decimal(14,4) QuotedAmount;
    decimal(14,9) SpotPrice;
    decimal(14,9) Points;
    decimal(14,9) TraderPoints;
    decimal(14,9) SalesPoints;
    decimal(14,9) CustomerPrice;

    union { null, date } FarSettleDate;
    decimal(14,4) FarBaseAmount;
    decimal(14,4) FarQuotedAmount;
    decimal(14,9) FarSpotPrice;
    decimal(14,9) FarPoints;
    decimal(14,9) FarTraderPoints;
    decimal(14,9) FarSalesPoints;
    decimal(14,9) FarCustomerPrice;
}

2 - KSQL to SELECT from TRADE, writing to NEAR_PAIR_POSITION_CHANGE

CREATE STREAM trades WITH (kafka_topic='trade', value_format = 'AVRO');

CREATE STREAM near_pair_position_change 
WITH (VALUE_AVRO_SCHEMA_FULL_NAME = 'uk.co.suboctave.fxriskrouting.message.NearPairPositionChange') 
AS SELECT TradeSource, TradeType, TradeRef, Book, Pair, SettleDate, BaseAmount, QuotedAmount, 
    (Book + ':' + Pair + ':' + CAST(SettleDate AS STRING)) AS PositionKey 
FROM trades 
WHERE TradeType = 'FXSPOT' or TradeType = 'FXFWD' or TradeType='FXSWAP' 
PARTITION BY PositionKey;

3 - Expected Avro IDL, with which I had hoped to deserialize

record NearPairPositionChange {
    string TradeSource;
    TradeType TradeType;
    string TradeRef;
    string Book;
    string Pair;
    date SettleDate;
    decimal(14,4) BaseAmount;
    decimal(14,4) QuotedAmount;
    string PositionKey;
}

4 - Actual Avro IDL with which I am able to deserialize

Note that:

  1. All field names must be capitalised
  2. Field TRADETYPE has to change type from TradeType to string

record NearPairPositionChange {
    string TRADESOURCE;
    string TRADETYPE;
    string TRADEREF;
    string BOOK;
    string PAIR;
    date SETTLEDATE;
    decimal(14,4) BASEAMOUNT;
    decimal(14,4) QUOTEDAMOUNT;
    string POSITIONKEY;
}

5 - Additional Notes

I finally had to give up on using Avro for any keys. Working with Spring Boot, it is easier to manage a singleton instance of KafkaStreamsConfiguration and inject that wherever required. Since KSQL does not support Avro keys as first-class citizens, I chose instead to configure:

default.key.serde = Serdes.StringSerde.class

and leave it at that, without trying to maintain a separate KafkaStreamsConfiguration dedicated to Avro keys.
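
A minimal sketch of that single KafkaStreamsConfiguration, assuming Spring Kafka's @EnableKafkaStreams wiring and the Confluent SpecificAvroSerde for values; the application id and URLs are placeholders:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsConfig;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafkaStreams;
    import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
    import org.springframework.kafka.config.KafkaStreamsConfiguration;

    @Configuration
    @EnableKafkaStreams
    public class StreamsConfigBean {

        @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
        public KafkaStreamsConfiguration kStreamsConfig() {
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "fx-risk-routing");   // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put("schema.registry.url", "http://localhost:8081");           // placeholder

            // String keys everywhere: KSQL does not produce Avro keys, so do not read them as Avro.
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

            // Values are Avro specific records generated from the IDL above.
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                    io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde.class);

            return new KafkaStreamsConfiguration(props);
        }
    }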

The VALUE_AVRO_SCHEMA_FULL_NAME property must be set when creating, through KSQL, any stream which will be deserialised as Avro, because the deserialisation mechanism relies on that property to instantiate the correct message class. This property is not documented in the syntax reference ( https://docs.confluent.io/current/ksql/docs/developer-guide/syntax-reference.html ), but it is documented deep in the comments on Pull Request https://github.com/confluentinc/ksql/pull/1863 .

The title of my issue mentions the inability to deserialise date types. In fact, the date type (SettleDate) deserialises correctly despite the missing logical type definition in the KSQL-generated schema. It is the enum type that fails to deserialise as such and must be re-typed as string.

The actual error I originally reported, "Unknown magic byte!", was caused by deserialising the keys with default.key.serde = SpecificAvroSerde.class when the messages of the stream were actually String-keyed (KSQL cannot generate or maintain Avro keys).