confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Multiple Entries for same primary key in CTAS query #9437

Open emrekuecuek opened 2 years ago

emrekuecuek commented 2 years ago

Hello. We have a question regarding windowed aggregations on tables, specifically with CREATE TABLE AS SELECT queries. We have the following pipeline for processing our data.

graph TD;
    Raw_Stream_From_Kafka_1-->Joined_Stream_with_Tumbling_Window;
    Raw_Stream_From_Kafka_2-->Joined_Stream_with_Tumbling_Window;
    Joined_Stream_with_Tumbling_Window-->Statistics_Table_with_Non-Windowed_Aggregation;
    Joined_Stream_with_Tumbling_Window-->Enrichment_Table_with_Windowed_Aggregation;
    Statistics_Table_with_Non-Windowed_Aggregation-->Enrichment_Table_with_Windowed_Aggregation;

The enrichment table will be used to check several conditions using CASE clauses. We would like to combine non-windowed aggregations with windowed aggregations, but apparently that is not possible in the same SQL statement. Hence, we create a separate table for the non-windowed aggregations as an extra step.

We would like to use these persistent, non-windowed statistics together with windowed statistics (for example, comparing the last 5-minute average with the standard deviation of the sensor's entire value history), check some conditions, and apply enrichment to our data. If there is a status change, our Kafka consumers (or some service that checks ksqlDB with push queries) should be notified.

As a solution, we thought that creating an enrichment table with windowed aggregations would solve our issue, since values in a table are updated in place rather than appended to the end of a stream. In the end, whenever we check Enrichment_Table_with_Windowed_Aggregation, we would have the current, updated status for our conditions.

But when we applied it to our systems, we came across an interesting behavior of ksqlDB. I should also mention that we saw this behavior in "Mastering Kafka Streams and ksqlDB" as well. In the documentation, we did not find such examples. Our question is given below.

Multiple Entries for same primary key in CTAS query

When we create a table with windowed aggregations, there are multiple entries for the same PRIMARY KEY. They differ from each other in their window start and end times, but if we need only the last value, we cannot access it with a simple SELECT query. The main structure of our table is given below.

ksql> describe test_table;

Name                 : TEST_TABLE
 Field  | Type                                                   
-----------------------------------------------------------------
 field_1 | VARCHAR(STRING)  (primary key) (Window type: TUMBLING) 
 field_2 | VARCHAR(STRING)                                        
-----------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;

When we run a simple SELECT query, multiple results are returned for the same PRIMARY KEY.

SELECT * FROM TEST_TABLE;
+---------+---------------+---------------+------------+
|field_1  |WINDOWSTART    |WINDOWEND      |field_2     |
+---------+---------------+---------------+------------+
|1        |1660135860000  |1660135870000  |SOME_VALUE  |
|5        |1660135860000  |1660135870000  |SOME_VALUE  |
|3        |1660135870000  |1660135880000  |SOME_VALUE  |
|8        |1660135870000  |1660135880000  |SOME_VALUE  |
|1        |1660135870000  |1660135880000  |SOME_VALUE  |

In the example, you can see that there are multiple entries with the same primary key. In this case, key 1 has more than one row, but this duplication happens for all primary keys. How can we prevent that and obtain only the last value?

We also know that the WINDOWSTART and WINDOWEND columns indicate that these rows belong to windows from different time frames. But instead of appending to the table, shouldn't all of the values be updated, including WINDOWSTART and WINDOWEND, so that only the latest value is displayed?
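To make the observed behavior concrete, here is a minimal Python sketch of how a tumbling-window aggregation can be modeled. It is not ksqlDB code. It assumes 10-second, epoch-aligned windows (consistent with the WINDOWSTART and WINDOWEND values in the output above), and the helper names `window_start` and `aggregate` are hypothetical. In this model each row is identified by the pair (key, window start), so a new window adds a new row instead of overwriting the previous one.

```python
from collections import defaultdict

# Assumption: 10-second tumbling windows, as the sample output suggests
# (WINDOWEND - WINDOWSTART = 10000 ms).
WINDOW_SIZE_MS = 10_000


def window_start(ts_ms: int, size_ms: int = WINDOW_SIZE_MS) -> int:
    """Return the start of the epoch-aligned tumbling window containing ts_ms."""
    return ts_ms - (ts_ms % size_ms)


def aggregate(events):
    """Count events per (key, window_start).

    Each distinct (key, window_start) pair becomes its own row, which is
    why the same key shows up once per window.
    """
    table = defaultdict(int)
    for key, ts_ms in events:
        table[(key, window_start(ts_ms))] += 1
    return dict(table)


# Three events for key "1": two fall into the first window, one into the next.
events = [("1", 1660135861000), ("1", 1660135869000), ("1", 1660135872000)]
table = aggregate(events)

for (key, start), count in sorted(table.items()):
    print(key, start, start + WINDOW_SIZE_MS, count)
```

Under this model, the two rows for key 1 are two different rows of the windowed table rather than duplicates of one row.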

Thank you for your patience, any help would be great!

emrekuecuek commented 2 years ago

I've found something in #8643 but is this the correct answer? I could not be certain.

suhas-satish commented 2 years ago

@emrekuecuek, #8643 is one way to go about it.

vpapavas commented 2 years ago

Hi @emrekuecuek !

The behavior you are experiencing is correct. Pull queries will return the rows in all windows. There is currently no way to get the rows in the latest window only. Not sure what exactly you are trying to do, but EMIT FINAL would be one way to explore and LATEST_BY_OFFSET another.
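Since the pull query returns rows for all windows, one possible workaround is to reduce the result on the client side. The sketch below is only an illustration, not a ksqlDB feature: it assumes the rows have already been fetched and are available as dictionaries with the column names from the output above, and `latest_window_per_key` is a hypothetical helper name.

```python
def latest_window_per_key(rows):
    """Keep, for each field_1 value, only the row with the largest WINDOWSTART."""
    latest = {}
    for row in rows:
        key = row["field_1"]
        if key not in latest or row["WINDOWSTART"] > latest[key]["WINDOWSTART"]:
            latest[key] = row
    return latest


# Rows copied from the SELECT * FROM TEST_TABLE output in the question.
rows = [
    {"field_1": "1", "WINDOWSTART": 1660135860000, "WINDOWEND": 1660135870000, "field_2": "SOME_VALUE"},
    {"field_1": "5", "WINDOWSTART": 1660135860000, "WINDOWEND": 1660135870000, "field_2": "SOME_VALUE"},
    {"field_1": "3", "WINDOWSTART": 1660135870000, "WINDOWEND": 1660135880000, "field_2": "SOME_VALUE"},
    {"field_1": "8", "WINDOWSTART": 1660135870000, "WINDOWEND": 1660135880000, "field_2": "SOME_VALUE"},
    {"field_1": "1", "WINDOWSTART": 1660135870000, "WINDOWEND": 1660135880000, "field_2": "SOME_VALUE"},
]

latest = latest_window_per_key(rows)
for key in sorted(latest):
    print(key, latest[key]["WINDOWSTART"], latest[key]["field_2"])
```

Note that "latest" here means the most recent window that has data for a given key. A key with no recent events (such as 5 in the sample) keeps its older window, so the caller may still want to compare WINDOWSTART against the current time.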

emrekuecuek commented 2 years ago

Thank you for your insights. My problem requires a windowed aggregation over a sensor, and I would like to enrich the data based on some conditions. For that, I am using CASE clauses. In the application, I need to base this enrichment on the aggregation over the last 5 minutes. Hence, previous aggregations are not required in my case.

If some condition is met, I would like to update the table's status to ALERT and let the system know about it. Moreover, our application requires the most up-to-date aggregation (to report the current status). That's why I need the latest aggregated results.

I tried EMIT FINAL and LATEST_BY_OFFSET. Since EMIT FINAL is an undocumented feature that should be used cautiously, I decided not to use it (there were also other issues, such as not being able to arrange the time windows correctly, and the lack of documentation unfortunately did not help). LATEST_BY_OFFSET did not work for CTAS because I could not apply a nested aggregation over sensors. Maybe there is some workaround for that?

Thank you so much for your input, I really appreciate it. Regards,