confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

How to do Top-N query with KSQL? #403

Open ChenShuai1981 opened 7 years ago

ChenShuai1981 commented 7 years ago

Does KSQL currently support Top-N query? If true, how to achieve it?

hjafarpour commented 7 years ago

@ChenShuai1981 currently, KSQL only supports Top-1 queries (MIN/MAX). However, you can extend the MIN/MAX aggregate functions to support Top-N (if N is relatively small). We will be adding this in the near future.

miguno commented 6 years ago

@ChenShuai1981: This functionality has been available in KSQL since the v0.4 release (January 2018).

There are two new functions: TOPK and TOPKDISTINCT. See https://github.com/confluentinc/ksql/blob/master/docs/syntax-reference.md#aggregate-functions for more information.

ChenShuai1981 commented 6 years ago
  1. I hit ServerError:java.lang.ArrayStoreException when I tried KSQL v0.4 with Confluent v4.0.0. Any suggestions?

    ksql> DESCRIBE END_TRIP_COUNT_BY_STATIONS;
    
    Field            | Type                      
    ----------------------------------------------
    ROWTIME          | BIGINT           (system) 
    ROWKEY           | VARCHAR(STRING)  (system) 
    DATE             | VARCHAR(STRING)  (key)    
    END_STATION_ID   | BIGINT           (key)    
    END_STATION_NAME | VARCHAR(STRING)  (key)    
    TRIP_COUNT       | BIGINT                    
    ----------------------------------------------
    For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
    ksql> SELECT DATE, TOPK(TRIP_COUNT, 2) FROM END_TRIP_COUNT_BY_STATIONS GROUP BY DATE;
    2017-03-04 | [7.0, 3.0]
    2017-03-06 | [7.0, 1.0]
    **ServerError:java.lang.ArrayStoreException**
    Query terminated
  2. How can I get the info for the station that has the top-1 trip_count?

    ksql> SELECT t2.DATE, t2.MAX_TRIP_COUNT, t1.END_STATION_NAME, t1.END_STATION_ID FROM (SELECT DATE, MAX(TRIP_COUNT) as MAX_TRIP_COUNT FROM END_TRIP_COUNT_BY_STATIONS GROUP BY DATE) t2 LEFT JOIN END_TRIP_COUNT_BY_STATIONS t1 ON t1.DATE = t2.DATE AND t1.TRIP_COUNT = t2.MAX_TRIP_COUNT;
    relation is null
    Caused by: relation is null

ybyzek commented 6 years ago

I am re-opening the issue because I'm trying to accomplish the same thing as @ChenShuai1981, but as far as I understand, this is not supported at this time. It would be good to enhance the documentation to clarify what TOPK can and cannot do.

apurvam commented 6 years ago

The specific problem here is that the current TOPK implementation does not allow ranking one column by the value of another column.

So we want to do something like:

    CREATE TABLE ksqltop5 AS SELECT song_name, TOPKDISTINCT(count,5) FROM ksqlsongplaycounts GROUP BY constant_key;

The intent is for the above to take an input of 'song_name, count', route all messages to a single partition (by grouping on constant_key), and then print the top five song names ordered by their play counts.

Right now, KSQL has no ability to do a global top-K (the workaround is the 'constant_key' hack), and it also has no ability to sort by one column and select the value of another column. In the example above, there is no way to sort the top five song_names by their play count in KSQL today. The query would just print the top five song play counts, without a name attached.
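
To make the gap concrete, here is a minimal Python sketch (not KSQL, and not how TOPK is implemented) that contrasts a value-only top-K with the result this issue is asking for. The function names `top_k_values` and `top_k_by` and the sample rows are hypothetical and exist only for illustration.

```python
import heapq

def top_k_values(rows, k):
    """Value-only top-K: returns the k largest counts; the names are lost."""
    return heapq.nlargest(k, (count for _name, count in rows))

def top_k_by(rows, k):
    """Top-K rows ranked by count, keeping the song name attached."""
    return heapq.nlargest(k, rows, key=lambda row: row[1])

# Hypothetical (song_name, count) rows, shaped like the example input above.
rows = [("song_a", 12), ("song_b", 40), ("song_c", 7), ("song_d", 25)]

print(top_k_values(rows, 2))  # counts only
print(top_k_by(rows, 2))      # (song_name, count) pairs
```

The first function mirrors what the query above would return; the second is the "rank one column by another" behavior that is missing.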

dtheodor commented 6 years ago

I think this TOPK implementation didn't quite hit the mark. The whole usefulness of such queries is to be able to get the top K column values sorted by their frequency, not by their actual values. Or, even better, sorted by an arbitrarily computed metric that may use values from different columns, e.g. give me the top 5 songs by count, the top 5 URLs by pageviews, etc.

See examples of topk queries done right: https://clickhouse.yandex/docs/en/agg_functions/reference/#topkncolumn http://druid.io/docs/latest/querying/topnquery.html

trtg commented 6 years ago

:+1: A true Top-N would be very useful for alerting, e.g. select the top 20 user IDs by count of browser sessions over the current 60-minute tumbling window. It seems like the closest approximation currently would be to write your own consumer that updates a fixed-size set with each record coming out of the query stream/table, i.e. maintain a sorted set of size 20. Is there any other way to approximate the kind of query I'm referring to? @miguno the documentation link you pasted for TOPK is broken. I assume this is the current link? https://docs.confluent.io/current/ksql/docs/developer-guide/syntax-reference.html#aggregate-functions
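
A minimal sketch of that consumer-side workaround, in Python and with the Kafka consumer itself left out: it assumes records arrive as (key, count) pairs and that a key's count only grows within a window, so an evicted key is reconsidered the next time it is re-emitted. The class name `TopNTracker` and the sample records are hypothetical, and window resets are not handled.

```python
class TopNTracker:
    """Keeps at most n keys, those with the largest counts seen so far."""

    def __init__(self, n):
        self.n = n
        self.top = {}  # key -> latest count, never more than n entries

    def update(self, key, count):
        # Insert or overwrite the key, then evict the smallest entry
        # if the set has grown past n.
        self.top[key] = count
        if len(self.top) > self.n:
            smallest = min(self.top, key=self.top.get)
            del self.top[smallest]

    def ranked(self):
        # Current contents, largest count first.
        return sorted(self.top.items(), key=lambda kv: kv[1], reverse=True)

# Hypothetical (user_id, session_count) records from the query output.
tracker = TopNTracker(2)
for user_id, sessions in [("u1", 3), ("u2", 5), ("u1", 9), ("u3", 4)]:
    tracker.update(user_id, sessions)
print(tracker.ranked())
```

If counts could also decrease, a fixed-size set is no longer enough, because a key evicted earlier might belong in the top N again without being re-emitted.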

apurvam commented 5 years ago

This should be part of revamping our UDFs, linking with the UDF epic #3556

apurvam commented 5 years ago

cc @big-andy-coates

big-andy-coates commented 4 years ago

Note, we try to use this UDAF in our own music example, but it's not currently very useful to know the counts of the top 10 songs without also knowing which songs the counts relate to!

https://github.com/confluentinc/examples/blob/5.5.0-post/music/statements.sql

big-andy-coates commented 4 years ago

Implementing this will require enhancing the UDAF framework, which is much needed. Whoever picks this up, please reach out and we can discuss.

philipschm1tt commented 4 years ago

I am also interested in a more general Top N function and want to provide the following use case as an example.

We want to show the customer in an online shop their N last viewed products – or "top N" by message time.

I believe that use case could be implemented with the current UDAF framework. However, the Confluent Cloud ksqlDB does not support custom UDFs.
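
As a rough illustration of the aggregation I have in mind, here is a plain Python sketch (not a ksqlDB UDAF). It assumes the events for one customer arrive as (product_id, timestamp) pairs and that repeated views of the same product should be collapsed; both the function name `last_n_viewed` and the sample events are made up for this example.

```python
def last_n_viewed(events, n):
    """Return the n most recently viewed distinct products, newest first."""
    latest = {}  # product_id -> newest timestamp seen for that product
    for product_id, ts in events:
        if ts > latest.get(product_id, float("-inf")):
            latest[product_id] = ts
    ranked = sorted(latest.items(), key=lambda kv: kv[1], reverse=True)
    return [product_id for product_id, _ts in ranked[:n]]

# Hypothetical view events for a single customer: (product_id, message time).
events = [("p1", 100), ("p2", 105), ("p1", 110), ("p3", 102)]
print(last_n_viewed(events, 2))
```

This is "top N by message time": the ranking metric is the timestamp, but the value returned is the product ID.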