confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Regarding Transforming data multiple times in KSQL #6757

Open surajshettyshell opened 3 years ago

surajshettyshell commented 3 years ago

I am referring to the clickstream example mentioned in the examples.

Scenario: I want to transform the clickstream data into 30-second count aggregations, and then find the maximum number of data points received in any 30-second window over a period of 1 hour.

For the specified clickstream:

  1. I created a table with a hopping-window aggregation:

CREATE TABLE hopping_stream_size_30s_advance_30s AS
  SELECT TIME,
         COUNT(*) AS count_of_datapoints,
         USERID AS user
  FROM clickstream
  WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 30 SECONDS)
  GROUP BY USERID, TIME;
  2. I created a stream over the table's output topic, since the second aggregation has to read from a stream with the table's schema:

CREATE STREAM STREAM_hopping_stream_size_30s_advance_30s (
    count_of_datapoints BIGINT,
    time BIGINT,
    user VARCHAR KEY
) WITH (kafka_topic='HOPPING_STREAM_SIZE_30S_ADVANCE_30S', value_format='json', TIMESTAMP='TIME');
  3. Then I found the max from the stream created with the table's schema:

CREATE TABLE HOPPING_TABLE_LEVEL2_1 AS
  SELECT user,
         MAX(count_of_datapoints) AS max_per_minute
  FROM STREAM_hopping_stream_size_30s_advance_30s
  WINDOW HOPPING (SIZE 60 MINUTES, ADVANCE BY 60 MINUTES)
  GROUP BY user;
  4. Then I ran:

select * from HOPPING_TABLE_LEVEL2_1 emit changes;

I am not getting any output.

agavra commented 3 years ago

I believe the issue (at least initially) is in your second step. The key is not user; it is a windowed key that includes both user and time, which means that at a minimum you will need to declare WINDOW_TYPE='Hopping', WINDOW_SIZE='30 SECONDS' in your WITH clause. See this example from our code base:

CREATE STREAM S1 (ID INT KEY, V bigint) WITH (kafka_topic='left_topic', key_format='JSON_SR', value_format='JSON', WINDOW_TYPE='Hopping', WINDOW_SIZE='5 SECONDS');

With the following contents:

        {"topic": "left_topic", "key": 1, "value": {"V": 1}, "timestamp": 0, "window": {"start": 0, "end": 5000, "type": "time"}},

The other issue is that when you group by two fields (GROUP BY USERID, TIME), ksqlDB under the hood actually concatenates them together into a single key. This is something we're looking to fix (#6371), but at the moment the user VARCHAR KEY that you declare won't be what you think it is.
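Putting those two points together, a redeclaration of your stream would look something like the sketch below. The stream name and key column name here are illustrative, and I believe older versions join the grouped values with a '|+|' separator, but you should confirm the actual key contents against the topic:

-- Sketch only: redeclare the stream over the table's changelog topic,
-- accounting for the hopping window and the single concatenated key.
CREATE STREAM STREAM_hopping_fixed (
    userid_time VARCHAR KEY,   -- assumed: USERID and TIME joined by ksqlDB, e.g. 'User_1|+|1607156630000'
    count_of_datapoints BIGINT,
    time BIGINT
) WITH (
    kafka_topic='HOPPING_STREAM_SIZE_30S_ADVANCE_30S',
    value_format='JSON',
    WINDOW_TYPE='Hopping',
    WINDOW_SIZE='30 SECONDS'
);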

I recommend inspecting the contents of STREAM_hopping_stream_size_30s_advance_30s to see what it looks like and then you can report back here with that information.
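For example (the topic name comes from your first CREATE TABLE):

-- Show the raw records from the table's backing topic so the
-- actual key format is visible.
PRINT 'HOPPING_STREAM_SIZE_30S_ADVANCE_30S' FROM BEGINNING;

PRINT shows the raw key and value for each record, including the window bounds on the key, which should make the key format obvious.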

Also, please include the ksqlDB version that you're using.

surajshettyshell commented 3 years ago

@agavra Thanks for the reply. I am using ksqlDB version 6.0.0.

Following is the stream information after step 1: (screenshot attached)

I tried changing step 2, but could not see any data streamed in. (screenshots attached)

surajshettyshell commented 3 years ago

Can you please point me to documentation that explains this in detail? The page at https://docs.ksqldb.io/en/latest/concepts/time-and-windows-in-ksqldb-queries/ does not seem to cover it.

surajshettyshell commented 3 years ago

I have corrected the step 2 query now, adding the window properties suggested above (see the sketch below). Following is the output: (screenshot attached)
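Roughly, the corrected declaration looks like this (a sketch; the exact column names follow my original step 2 and may differ from the screenshot):

-- Sketch: original step 2 declaration plus the window properties
-- suggested by @agavra.
CREATE STREAM STREAM_hopping_stream_size_30s_advance_30s (
    user VARCHAR KEY,
    count_of_datapoints BIGINT,
    time BIGINT
) WITH (
    kafka_topic='HOPPING_STREAM_SIZE_30S_ADVANCE_30S',
    value_format='json',
    TIMESTAMP='TIME',
    WINDOW_TYPE='Hopping',
    WINDOW_SIZE='30 SECONDS'
);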

surajshettyshell commented 3 years ago

Now when I execute the windowed aggregation again, I get this error: (screenshot attached)