confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Materializing the latest values by time #6443

Open alibaghernejad opened 4 years ago

alibaghernejad commented 4 years ago

I'm working on an IoT project. On the edge, each device, identified by a unique IMEI, sends GPS packets at a specified interval. On the server side, I want to keep the latest GPS location of each device (based on the OccurredAt field of the GPS packet) as a materialized view. However, as Michael Drogalis mentioned in a blog post, ksqlDB doesn't yet support aggregation by time (Latest_By_Timestamp). What I want to do is compare each newly arrived message with the cached state and update the cache only if the new message's OccurredAtUtc value is greater than the cached one. How can I achieve that until latest_by_timestamp is available? Maybe it requires a custom aggregate function; I'm not sure.

Here is my GpsDataStream:

CREATE STREAM GpsDataStream
  (imei VARCHAR, latitude Decimal(9,6), longitude Decimal(9,6), OccurredAtUtc BIGINT, Speed Double, GpsStatusId int)
  WITH (kafka_topic='GpsData', key='Imei', value_format='Avro', partitions=1, timestamp='OccurredAtUtc');
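
The update rule from the question can be sketched in plain Java as an in-memory map that stores, for each IMEI, only the packet with the greatest OccurredAtUtc. This is not ksqlDB code. `GpsPacket` and `LatestByTimestampView` are hypothetical names, and latitude/longitude are simplified to `double` even though the stream declares them as Decimal(9,6).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of one GPS packet (DECIMAL columns simplified to double).
final class GpsPacket {
    final String imei;
    final double latitude;
    final double longitude;
    final long occurredAtUtc;

    GpsPacket(String imei, double latitude, double longitude, long occurredAtUtc) {
        this.imei = imei;
        this.latitude = latitude;
        this.longitude = longitude;
        this.occurredAtUtc = occurredAtUtc;
    }
}

// Hypothetical "materialized view": the newest packet per IMEI, judged by OccurredAtUtc.
final class LatestByTimestampView {
    private final Map<String, GpsPacket> latestByImei = new HashMap<>();

    /** Applies an incoming packet; returns true if it created or replaced the cached entry. */
    boolean upsert(GpsPacket incoming) {
        GpsPacket cached = latestByImei.get(incoming.imei);
        // Strictly greater: a late or duplicate packet never overwrites newer state.
        if (cached == null || incoming.occurredAtUtc > cached.occurredAtUtc) {
            latestByImei.put(incoming.imei, incoming);
            return true;
        }
        return false;
    }

    /** Pull-style lookup by key (IMEI); null if the device has not reported yet. */
    GpsPacket get(String imei) {
        return latestByImei.get(imei);
    }
}
```

Because the whole packet is stored, latitude and longitude always come from the same GPS reading. The strict `>` also means that on a timestamp tie the first packet seen is kept; use `>=` if the most recently arrived packet should win instead.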

vpapavas commented 4 years ago

Hi @AliBaghernejad!

Yes, your intuition is right. You would need to implement a custom UDAF that keeps track of OccurredAtUtc and replaces any older value in its state. You can look at the `LATEST_BY_OFFSET` UDAF in the code base for an example of how to implement such a function. In the case of `LATEST_BY_OFFSET`, the UDAF maintains the latest value per key, where "latest" is determined by the offset in the topic.
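
A rough, dependency-free sketch of such a UDAF, assuming each value and its OccurredAtUtc arrive together as a single input. It follows the initialize/aggregate/merge/map lifecycle of a ksqlDB UDAF, but `Timestamped` and `LatestByTimestamp` are hypothetical classes. A real UDAF would implement ksqlDB's `io.confluent.ksql.function.udaf.Udaf` interface and carry the `@UdafDescription`/`@UdafFactory` annotations, so check the UDAF documentation for your ksqlDB version before adapting it.

```java
// Hypothetical pair type: a value together with the OccurredAtUtc it was observed at.
final class Timestamped<V> {
    final long occurredAtUtc;
    final V value;

    Timestamped(long occurredAtUtc, V value) {
        this.occurredAtUtc = occurredAtUtc;
        this.value = value;
    }
}

// Sketch of a "latest by timestamp" aggregate using the same
// initialize / aggregate / merge / map lifecycle as a ksqlDB UDAF.
// Plain Java, no ksqlDB dependency; all names here are hypothetical.
final class LatestByTimestamp<V> {

    // Empty state: a sentinel timestamp that any realistic packet timestamp beats.
    Timestamped<V> initialize() {
        return new Timestamped<>(Long.MIN_VALUE, null);
    }

    // Called once per incoming row: replace the state only if the row is strictly newer.
    Timestamped<V> aggregate(Timestamped<V> current, Timestamped<V> aggregate) {
        return current.occurredAtUtc > aggregate.occurredAtUtc ? current : aggregate;
    }

    // Combines two partial states (e.g. when session windows are merged); the newer one wins.
    Timestamped<V> merge(Timestamped<V> aggOne, Timestamped<V> aggTwo) {
        return aggTwo.occurredAtUtc > aggOne.occurredAtUtc ? aggTwo : aggOne;
    }

    // Produces the output column: the value of the newest row seen (null if none).
    V map(Timestamped<V> aggregate) {
        return aggregate.value;
    }
}
```

The only difference from an offset-based "latest" is the comparison key: ordering by OccurredAtUtc instead of arrival order is what lets out-of-order packets be ignored.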

alibaghernejad commented 4 years ago

Thanks, I have successfully created a new aggregate function. But keep in mind that in the resulting materialized cache (table), I also need the correlated Latitude and Longitude of the most recent GPS packet. In other words, I want to keep the most recent GPS point info (IMEI, OccurredAtUtc, Latitude, Longitude) for each device as a materialized cache and retrieve it by its key (IMEI). How can I do that? I looked at struct columns but found that 'LATEST_BY_OFFSET' (and my custom function as well) has no overload that supports struct columns.

alibaghernejad commented 4 years ago

When I create a new aggregate function (LATEST_By_Timestampt_Custom) and apply it to a field like Latitude, I need access to the value of the OccurredAtUtc field inside the function, but I don't know whether that is possible. Let me know if I'm misunderstanding something.

PeterLindner commented 4 years ago

@AliBaghernejad I think ARGMAX is what you are looking for (#5300), but if I understand correctly, the current UDF framework can't handle multiple columns as input: https://github.com/confluentinc/ksql/issues/3985#issuecomment-588482904

vpapavas commented 4 years ago

Is this the kind of query you want to issue?

SELECT IMEI, latest_by_timestamp(OccurredAtUtc, Latitude), latest_by_timestamp(OccurredAtUtc, Longitude)
FROM GpsDataStream
GROUP BY IMEI

If so, maybe you could have the UDAF take a struct as its input parameter, so that you can pass both OccurredAtUtc and Latitude as arguments. The query would then look something like this:

SELECT IMEI, latest_by_timestamp(STRUCT(o := OccurredAtUtc, l := Latitude)), latest_by_timestamp(STRUCT(o := OccurredAtUtc, l := Longitude))
FROM GpsDataStream
GROUP BY IMEI
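
To see why the struct approach keeps Latitude and Longitude correlated, here is a plain-Java sketch of what each latest_by_timestamp(STRUCT(o := ..., l := ...)) call would compute for one IMEI group. `OL` and `StructArgMax` are hypothetical stand-ins for the struct and the UDAF, not ksqlDB types. Each call is effectively an ARGMAX over `o` that returns `l`.

```java
import java.util.List;

// Hypothetical stand-in for STRUCT(o := OccurredAtUtc, l := <Latitude or Longitude>).
final class OL {
    final long o;
    final double l;

    OL(long o, double l) {
        this.o = o;
        this.l = l;
    }
}

final class StructArgMax {
    // Returns l from the struct with the greatest o (the first maximum wins on ties),
    // i.e. the result of one latest_by_timestamp(STRUCT(...)) call for one IMEI group.
    static double latestByTimestamp(List<OL> group) {
        OL best = null;
        for (OL row : group) {
            if (best == null || row.o > best.o) {
                best = row;
            }
        }
        if (best == null) {
            throw new IllegalArgumentException("empty group");
        }
        return best.l;
    }
}
```

Because both calls see the same packets in the same order and break ties the same way, the latitude and longitude they return come from the same GPS packet, even when the newest packet is not the last one to arrive.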