confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

KSQL Query returns inconsistent results #1095

Closed utkarshsaraf19 closed 6 years ago

utkarshsaraf19 commented 6 years ago

Hi,

Our JavaScript program writes 20 messages per second to Kafka, asynchronously, via the Kafka REST proxy. We try to aggregate the incoming messages, but the aggregation returns inconsistent results.

Please find the topic, stream, and aggregated result table definitions below.

Topic (messages are produced in Avro):

    ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic order_flow \
      --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"OrderID","type":"int"},{"name":"OrderDate","type":"long"},{"name":"Status","type":"string"},{"name":"ProductID","type":"int"}]}'

Stream over the source topic:

    CREATE STREAM ORDERS_SRC WITH (KAFKA_TOPIC='order_flow', VALUE_FORMAT='AVRO');

New stream (this one uses the actual event date as the record timestamp, rather than the time the message was written to Kafka):

    CREATE STREAM ORDERS WITH (TIMESTAMP='ORDERDATE') AS
      SELECT ORDERDATE, ORDERID, STATUS, PRODUCTID FROM ORDERS_SRC;

Now we aggregate the data by status:

    CREATE TABLE ORDERS_AGG_SEC AS
      SELECT STATUS, COUNT(*)
      FROM ORDERS WINDOW TUMBLING (SIZE 1 SECONDS)
      GROUP BY STATUS;

Now, when we run the query SELECT * FROM ORDERS_AGG_SEC; it returns the result below:

    1522328177000 | Processing : Window{start=1522328177000 end=-} | Processing | 20
    1522328178000 | Processing : Window{start=1522328178000 end=-} | Processing | 20
    1522328179000 | Processing : Window{start=1522328179000 end=-} | Processing | 5
    1522328179000 | Processing : Window{start=1522328179000 end=-} | Processing | 20
    1522328180000 | Processing : Window{start=1522328180000 end=-} | Processing | 20
    1522328181000 | Processing : Window{start=1522328181000 end=-} | Processing | 15
    1522328181000 | Processing : Window{start=1522328181000 end=-} | Processing | 20
    1522328182000 | Processing : Window{start=1522328182000 end=-} | Processing | 20
    1522328183000 | Processing : Window{start=1522328183000 end=-} | Processing | 15
    1522328183000 | Processing : Window{start=1522328183000 end=-} | Processing | 20
    1522328184000 | Processing : Window{start=1522328184000 end=-} | Processing | 20
    1522328185000 | Processing : Window{start=1522328185000 end=-} | Processing | 15
    1522328185000 | Processing : Window{start=1522328185000 end=-} | Processing | 20
    1522328186000 | Processing : Window{start=1522328186000 end=-} | Processing | 20
    1522328187000 | Processing : Window{start=1522328187000 end=-} | Processing | 15
    1522328187000 | Processing : Window{start=1522328187000 end=-} | Processing | 20
    1522328188000 | Processing : Window{start=1522328188000 end=-} | Processing | 20
    1522328189000 | Processing : Window{start=1522328189000 end=-} | Processing | 15
    1522328189000 | Processing : Window{start=1522328189000 end=-} | Processing | 20
    1522328190000 | Processing : Window{start=1522328190000 end=-} | Processing | 20
    1522328191000 | Processing : Window{start=1522328191000 end=-} | Processing | 15

Expected result: a single row with count 20 every second for the Processing status.

Actual result: more than one record per 1-second interval for the same status, for example:

    1522328179000 | Processing : Window{start=1522328179000 end=-} | Processing | 5
    1522328179000 | Processing : Window{start=1522328179000 end=-} | Processing | 20

Please find my JavaScript code below.

    function getRandomInt(min, max) {
      min = Math.ceil(min);
      max = Math.floor(max);
      // The maximum is exclusive and the minimum is inclusive.
      return Math.floor(Math.random() * (max - min)) + min;
    }

    var orderdate = Date.now();
    for (var i = 0; i < 20; i++) {
      var data = {
        "OrderID": getRandomInt(1, 20000),
        "OrderDate": orderdate,
        "Status": "Processing",
        "ProductID": getRandomInt(1, 10)
      };
      node.send({payload: data}); // asynchronously calls the kafka-rest producer API
    }

Note: the Kafka REST proxy is running with default properties.

rodesai commented 6 years ago

This is the way that streaming aggregations work. KSQL has no way of knowing whether it has received all the data contained within a window. So it may emit the results of an aggregation before all keys within a window have been processed. KSQL may later receive another record with the same key for the same window and then emit an update to the previously emitted aggregation result.

To reduce the number of records emitted for a given key, KSQL caches aggregation results internally. This cache is flushed periodically. You can tune the size of the cache and the flush interval to control when aggregation results are emitted: use the ksql.streams.commit.interval.ms property to tune the flush period, and the ksql.streams.cache.max.bytes.buffering property to tune the size of the cache.
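For example, here is a minimal sketch of setting both properties from the KSQL CLI; the values are illustrative, not recommendations:

    -- Flush the aggregation cache every 2 seconds.
    SET 'ksql.streams.commit.interval.ms' = '2000';

    -- Let the cache buffer up to ~10 MB so more updates are
    -- coalesced before being emitted downstream.
    SET 'ksql.streams.cache.max.bytes.buffering' = '10000000';

The same properties can also be set in the KSQL server's properties file, in which case they apply to every query started on that server.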

utkarshsaraf19 commented 6 years ago

Thanks for the explanation. It clarifies the strategy for handling out-of-order data that arrives after a tumbling window has passed.

Now, is it possible to create a separate delayed stream (delayed by 2 seconds or more) on top of ORDERS_AGG_SEC, from which I can get the row with the maximum value (which is actually 20)? That would let us build graphs directly from it.
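Something like the following is what I have in mind. This is only a rough sketch: the ORDER_COUNT alias is a rename I would have to add, and I am assuming the table's sink topic is also named ORDERS_AGG_SEC (DESCRIBE should confirm the real names):

    -- Recreate the aggregate with a named count column (hypothetical rename).
    CREATE TABLE ORDERS_AGG_SEC AS
      SELECT STATUS, COUNT(*) AS ORDER_COUNT
      FROM ORDERS WINDOW TUMBLING (SIZE 1 SECONDS)
      GROUP BY STATUS;

    -- Read the table's output topic back as a stream of updates...
    CREATE STREAM ORDERS_AGG_UPDATES WITH (KAFKA_TOPIC='ORDERS_AGG_SEC', VALUE_FORMAT='AVRO');

    -- ...and keep only the rows that have reached the expected final count.
    CREATE STREAM ORDERS_AGG_FINAL AS
      SELECT * FROM ORDERS_AGG_UPDATES WHERE ORDER_COUNT = 20;

This relies on knowing the final count (20 per second) up front, which does hold in our case.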

hinguabhishek commented 6 years ago

@rodesai Even with an increased commit interval and cache size, we are facing the same issue. In this case the table key is the timestamp plus the order status. Also, if that is how KSQL works, how can I do accurate real-time aggregation and use the results for data visualization? What we are trying to achieve is a simple aggregation use case. Can you please point us in the right direction to resolve this issue?

hinguabhishek commented 6 years ago

@rodesai Are there settings that can help us control this behaviour? What we are trying to do is a common use case in real-time ETL.