confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Tombstone message in Table when filtering duplicate events #8145

Open bluedog13 opened 2 years ago

bluedog13 commented 2 years ago

Provide details of the setup you're running

ksql> version
Version: 0.21.0-rc6

Outline your question

I am trying to filter out duplicate events, but I end up seeing TOMBSTONE messages in the output, which I was not expecting.

Below is my setup:

-- create a stream to store vehicle locations
CREATE STREAM VEHICLE_LOCATION (
    VIN VARCHAR,
    LOCATION_NAME VARCHAR
) WITH (
    KAFKA_TOPIC = 'vehicle-location',
    FORMAT = 'JSON',
    PARTITIONS = 3
);

-- insert a few values into the stream
INSERT INTO VEHICLE_LOCATION(VIN, LOCATION_NAME) VALUES ('2G1WL54T4R9165225', 'DALLAS');
INSERT INTO VEHICLE_LOCATION(VIN, LOCATION_NAME) VALUES ('2G1WL54T4R9165225', 'DALLAS');
INSERT INTO VEHICLE_LOCATION(VIN, LOCATION_NAME) VALUES ('2G1WL54T4R9165225', 'DALLAS');
INSERT INTO VEHICLE_LOCATION(VIN, LOCATION_NAME) VALUES ('2G1WL54T4R9165225', 'HOUSTON');
INSERT INTO VEHICLE_LOCATION(VIN, LOCATION_NAME) VALUES ('2G1WL54T4R9165225', 'HOUSTON');
INSERT INTO VEHICLE_LOCATION(VIN, LOCATION_NAME) VALUES ('2G1WL54T4R9165225', 'HOUSTON');

-- create a table to filter out the duplicates
CREATE TABLE DETECTED_VEHICLE_LOCATION AS
    SELECT 
        VIN AS KEY1,
        LOCATION_NAME AS KEY2,
        AS_VALUE(VIN) AS VIN,
        AS_VALUE(LOCATION_NAME) AS LOCATION_NAME
    FROM VEHICLE_LOCATION
    GROUP BY VIN, LOCATION_NAME
    HAVING COUNT(VIN) = 1;

-- query the table for distinct values
SELECT * FROM DETECTED_VEHICLE_LOCATION 
WHERE VIN IS NOT NULL
EMIT CHANGES;

But the output contains TOMBSTONE messages. I am confused as to why we see "one" TOMBSTONE, which part of the query is deleting the messages, and why. Also shouldn't there be

+------------------------------+------------------------------+------------------------------+------------------------------+
|KEY1                          |KEY2                          |VIN                           |LOCATION_NAME                 |
+------------------------------+------------------------------+------------------------------+------------------------------+
|2G1WL54T4R9165225             |DALLAS                        |2G1WL54T4R9165225             |DALLAS                        |
|2G1WL54T4R9165225             |DALLAS                        |<TOMBSTONE>                   |<TOMBSTONE>                   |
|2G1WL54T4R9165225             |HOUSTON                       |2G1WL54T4R9165225             |HOUSTON                       |
|2G1WL54T4R9165225             |HOUSTON                       |<TOMBSTONE>                   |<TOMBSTONE>                   |

And below is the sink topic, which contains the messages including the TOMBSTONE messages:

PRINT 'DETECTED_VEHICLE_LOCATION' FROM BEGINNING LIMIT 6;
Key format: JSON or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/09/17 18:09:16.087 Z, key: {"KEY1":"2G1WL54T4R9165225","KEY2":"HOUSTON"}, value: {"VIN":"2G1WL54T4R9165225","LOCATION_NAME":"HOUSTON"}, partition: 0
rowtime: 2021/09/17 18:09:16.276 Z, key: {"KEY1":"2G1WL54T4R9165225","KEY2":"HOUSTON"}, value: <null>, partition: 0
rowtime: 2021/09/17 18:09:15.420 Z, key: {"KEY1":"2G1WL54T4R9165225","KEY2":"DALLAS"}, value: {"VIN":"2G1WL54T4R9165225","LOCATION_NAME":"DALLAS"}, partition: 1
rowtime: 2021/09/17 18:09:15.687 Z, key: {"KEY1":"2G1WL54T4R9165225","KEY2":"DALLAS"}, value: <null>, partition: 1

Much appreciated.

mjsax commented 2 years ago

This question was cross-posted in the Confluent Community forum (https://forum.confluent.io/t/tombstone-message-in-table-when-filtering-duplicate-events/2808). Copy-pasting my answer from there:

=====

Given SQL semantics, the output is correct. However, the query does not express de-duplication. Expressed in English, the query means: “Give me all records that occur exactly once.” In addition, the underlying TABLE semantics, which are based on updates, work differently than you expect.

When you start your query, the result table is empty. When you process the first input record, its count is one and the record is added to the result table. When the second record is processed, the count goes up to two. The HAVING COUNT(VIN) = 1 clause implies that this group of records no longer qualifies for the result, and thus the result row must be retracted by sending a tombstone. When the third record is processed, the count goes up to three. In this case, we don’t need to send another tombstone because the row is already absent from the result table: there is nothing to be retracted/deleted.
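To make the count-then-retract sequence concrete, here is a simplified Python model of a non-windowed GROUP BY with HAVING COUNT(...) = 1. It is only a sketch under stated assumptions: it is not ksqlDB's implementation, it ignores partitioning, timestamps and any buffering of table updates, and the helper name `having_count_equals_one` is hypothetical. A value of `None` stands in for a tombstone.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Set, Tuple

Key = Tuple[str, str]                 # (VIN, LOCATION_NAME): the GROUP BY columns
Change = Tuple[Key, Optional[Key]]    # (key, value); value None models a tombstone


def having_count_equals_one(events: List[Key]) -> List[Change]:
    """Hypothetical model: emit the result table's changelog for HAVING COUNT = 1."""
    counts: Dict[Key, int] = defaultdict(int)
    in_result: Set[Key] = set()
    changelog: List[Change] = []
    for key in events:
        counts[key] += 1
        if counts[key] == 1:
            # first record of the group: the row qualifies and is upserted
            in_result.add(key)
            changelog.append((key, key))
        elif key in in_result:
            # count went 1 -> 2: the row no longer qualifies, so retract it
            in_result.discard(key)
            changelog.append((key, None))
        # count >= 3: the row is already absent, nothing to retract
    return changelog


vin = "2G1WL54T4R9165225"
events = [(vin, "DALLAS")] * 3 + [(vin, "HOUSTON")] * 3
for key, value in having_count_equals_one(events):
    print(key[1], "<TOMBSTONE>" if value is None else "row")
# DALLAS row
# DALLAS <TOMBSTONE>
# HOUSTON row
# HOUSTON <TOMBSTONE>
```

Six input records produce only four changelog entries, one row and one tombstone per key, which matches the shape of the output in the question.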

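The same applies to your second query: it is executed on a TABLE and thus uses “upsert” semantics. Hence, when an update for a key does not pass the IS NOT NULL filter (here, the incoming tombstone) after a previous row with the same key did pass it, the result is again a tombstone, which retracts the previously emitted row.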
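A filter over a TABLE can be modelled the same way. The sketch below is a simplified illustration, not ksqlDB's code, and `filter_table` is a hypothetical name. It assumes the filter remembers which keys it has emitted: an update that fails the predicate turns into a tombstone only if that key previously passed, and is silently dropped otherwise.

```python
from typing import Callable, List, Optional, Set, Tuple

Key = Tuple[str, str]
Row = Tuple[Optional[str], str]        # (VIN, LOCATION_NAME) value columns
Change = Tuple[Key, Optional[Row]]     # value None models a tombstone


def filter_table(changelog: List[Change],
                 predicate: Callable[[Row], bool]) -> List[Change]:
    """Hypothetical model of WHERE on a TABLE with upsert semantics."""
    passed: Set[Key] = set()
    out: List[Change] = []
    for key, value in changelog:
        if value is not None and predicate(value):
            passed.add(key)            # upsert: the row is (still) in the result
            out.append((key, value))
        elif key in passed:
            passed.discard(key)        # the row was in the result: retract it
            out.append((key, None))
        # otherwise the key was never in the result, so nothing is emitted
    return out


vin = "2G1WL54T4R9165225"
dallas = (vin, "DALLAS")
upstream = [(dallas, dallas), (dallas, None)]
# the tombstone is forwarded, because the DALLAS row had passed the filter before
print(filter_table(upstream, lambda row: row[0] is not None))
```

This is why adding WHERE VIN IS NOT NULL does not remove the tombstones: on a TABLE, the filter must retract rows it previously let through.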

Bottom line: you need to distinguish between STREAM and TABLE semantics.

In the end, de-duplication is a “conditional filter”: drop a row if you have seen it before, i.e., the filter is context-sensitive, and that is not easy to express with SQL. ksqlDB does not support sub-queries yet, but I think the correct SQL for STREAM de-duplication would be:

SELECT * FROM myStream AS main
  WHERE NOT EXISTS ( -- ksqlDB does not support NOT EXISTS
    SELECT * FROM myStream AS aux -- ksqlDB does not support sub-queries
      WHERE aux.key = main.key
      -- syntax borrowed from "system versioned temporal tables"
      -- ksqlDB does not support this syntax
      -- similar to AS OF <timestamp> for tables
      -- we express a time range the query operates on
      -- i.e., consider all records from the beginning of the STREAM,
      --     up to the currently processed record from the outer query
      --     (exclusive upper bound)
      FROM 0 TO main.ROWTIME 
    );
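The intended semantics of that hypothetical NOT EXISTS query (emit a record only if no earlier record with the same key exists) can be sketched in Python as a stateful stream filter. This is an illustration only; `dedup_stream` and `key_of` are hypothetical names, and the key is assumed to be the whole (VIN, LOCATION_NAME) pair.

```python
from typing import Callable, Hashable, Iterable, Iterator, Set, TypeVar

T = TypeVar("T")


def dedup_stream(records: Iterable[T],
                 key_of: Callable[[T], Hashable]) -> Iterator[T]:
    """Hypothetical model: keep only the first record per key."""
    seen: Set[Hashable] = set()
    for record in records:
        key = key_of(record)
        if key not in seen:        # NOT EXISTS an earlier record with this key
            seen.add(key)
            yield record           # first occurrence: emitted as a plain insert
        # later duplicates are dropped; a STREAM never needs a tombstone


vin = "2G1WL54T4R9165225"
events = [(vin, "DALLAS")] * 3 + [(vin, "HOUSTON")] * 3
print(list(dedup_stream(events, key_of=lambda r: r)))
# [('2G1WL54T4R9165225', 'DALLAS'), ('2G1WL54T4R9165225', 'HOUSTON')]
```

Note that `seen` grows with the number of distinct keys, which is the same state-size concern that applies to a non-windowed aggregation.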

A working alternative would be to use a STREAM-based filter instead of the TABLE-based HAVING filter. In this case, you would first define the stream aggregation that gives you a TABLE (similar to what you do at the moment, but without the HAVING clause). Second, you define a STREAM over the table's result topic, and then define a second query over this STREAM that filters for count = 1. This way, the first record with count = 1 will be in the result, while subsequent updates with a larger count won't be; furthermore, because the filter is defined on a STREAM, there won't be any tombstones.

One more thing: at the moment you define a non-windowed aggregation, which implies that (if you have an unbounded number of unique GROUP BY attributes) the result table might grow unbounded. Thus, it would most likely make sense to switch to a windowed aggregation to allow ksqlDB to eventually drop old rows from the result table.
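The two-step alternative can be sketched as a small Python pipeline. This is a simplified model under stated assumptions, not ksqlDB code: `count_changelog` stands in for the aggregation without HAVING (it emits `(group, count)` for every input), and `first_occurrences` stands in for the stateless STREAM filter on count = 1 over that changelog. Both names, the `window_ms` parameter and the integer timestamps are hypothetical; the optional tumbling window simply adds the window start to the group key.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, Iterator, Optional, Tuple


def count_changelog(events: Iterable[Tuple[int, Hashable]],
                    window_ms: Optional[int] = None) -> Iterator[Tuple[Hashable, int]]:
    """Step 1 (hypothetical): GROUP BY + COUNT without HAVING."""
    counts: Dict[Hashable, int] = defaultdict(int)
    for ts, key in events:
        # with a tumbling window, the group also includes the window start
        group = key if window_ms is None else (key, ts - ts % window_ms)
        counts[group] += 1
        yield group, counts[group]
    # NOTE: this sketch never expires old windows; bounding the state
    # requires dropping windows once they are no longer needed


def first_occurrences(changelog: Iterable[Tuple[Hashable, int]]) -> Iterator[Hashable]:
    """Step 2 (hypothetical): STREAM filter over the changelog, count = 1."""
    for group, count in changelog:
        if count == 1:
            yield group            # plain insert, never a tombstone


vin = "2G1WL54T4R9165225"
events = [(0, (vin, "DALLAS")), (10, (vin, "DALLAS")),
          (20, (vin, "HOUSTON")), (30, (vin, "HOUSTON"))]
print(list(first_occurrences(count_changelog(events))))
# [('2G1WL54T4R9165225', 'DALLAS'), ('2G1WL54T4R9165225', 'HOUSTON')]
```

With `window_ms=25`, the HOUSTON record at timestamp 30 falls into a new window and is emitted again. This is the trade-off of a windowed aggregation: duplicates are only suppressed within a window, but old per-window state can eventually be discarded.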

bluedog13 commented 2 years ago

Thank you very much for the explanation and your time. This makes total sense and helps me understand the underlying behavior.

For me, it also highlights how a table in ksqlDB behaves. This will be beneficial for me as I navigate my way in the world of ksqlDB.