Open the4thamigo-uk opened 5 years ago
Hi @the4thamigo-uk,
Have you considered treating the right stream as a table? Tables store the latest value for a specific key, based on the event timestamp. A null value would remove a key from the table.
Consider the stream of messages:
key, timestamp, value
x, 1, 100
x, 5, 500
x, 10, null
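To make the table semantics concrete, here is a minimal Python sketch (an illustration only, not KSQL and not how KSQL is implemented) that folds the messages above into a dictionary. Each non-null value overwrites the entry for its key, and a null value deletes it. The name `apply_changelog` is made up for this example.

```python
from typing import Dict, Iterable, Optional, Tuple

Message = Tuple[str, int, Optional[int]]  # (key, timestamp, value)


def apply_changelog(messages: Iterable[Message]) -> Dict[str, int]:
    """Fold a sequence of messages into a latest-value-per-key table."""
    table: Dict[str, int] = {}
    for key, _timestamp, value in messages:
        if value is None:
            # A null value (tombstone) removes the key from the table.
            table.pop(key, None)
        else:
            # Any other value replaces whatever was stored for the key.
            table[key] = value
    return table


messages = [("x", 1, 100), ("x", 5, 500), ("x", 10, None)]

print(apply_changelog(messages[:1]))  # {'x': 100}
print(apply_changelog(messages[:2]))  # {'x': 500}
print(apply_changelog(messages))      # {}
```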
You could import the above topic as a table in KSQL via something like:
CREATE TABLE right (mykey STRING, val INT, event_time INT) WITH (key='mykey', timestamp='event_time', ...);
Note: KSQL currently requires the message key to be duplicated in a field within the message value for tables. (This requirement will soon be dropped.)
Note: by default KSQL uses the timestamp of the message in Kafka. If you want KSQL to use a timestamp from a different field within the message's value, e.g. 'event_time', you should use the WITH(TIMESTAMP='event_time', ...) option, and potentially the TIMESTAMP_FORMAT option.
With your table now created, and assuming you've already imported the left stream, you can join the stream to the table and KSQL will join the stream messages to the latest value within the table.
Note: KSQL will attempt to process the records of the two underlying topics in time order. However, if there are large discrepancies in when messages are posted to the two topics, things might not work as you expect. For example, if messages on the left stream are produced an hour after those on the right, KSQL will already have processed the right messages and hence will hold a more up-to-date value. To put this another way, KSQL will always process input from either topic if it is available and will always join the left record to the latest value it has for the right, which may be at the same, a lower, or a higher timestamp.
select
l.event_time, l.mykey, l.val, r.val
from left l join right r
on l.mykey = r.mykey;
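The ordering caveat in the note above can be illustrated with a small Python simulation. This is a sketch under simplifying assumptions, not KSQL's actual scheduler: events are handled strictly in the order given, and the function name `stream_table_join` and the sample values are invented for the example.

```python
from typing import Dict, List, Optional, Tuple

# (source, key, event_time, value), listed in PROCESSING order.
Event = Tuple[str, str, int, Optional[int]]


def stream_table_join(events: List[Event]) -> List[tuple]:
    """Join each left record to whatever the right table holds when it is processed."""
    table: Dict[str, int] = {}
    joined = []
    for source, key, event_time, value in events:
        if source == "right":
            if value is None:
                table.pop(key, None)  # null removes the key
            else:
                table[key] = value    # latest processed value wins
        elif key in table:
            # Inner join: emit only when the table has a row for this key.
            joined.append((event_time, key, value, table[key]))
    return joined


# Left record processed between the two right updates: joins to 100.
in_order = [("right", "x", 1, 100), ("left", "x", 3, 7), ("right", "x", 5, 500)]
# Same records, but the left record is processed late: joins to 500,
# a right value whose timestamp (5) is later than the left record's (3).
late_left = [("right", "x", 1, 100), ("right", "x", 5, 500), ("left", "x", 3, 7)]

print(stream_table_join(in_order))   # [(3, 'x', 7, 100)]
print(stream_table_join(late_left))  # [(3, 'x', 7, 500)]
```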
Please close this issue if this answers your question.
Unfortunately, we do have the situation where data arrives late, so the KTable option is not reliable for us. If data arrives at the KTable earlier, then the KStream records join to KTable records from the future.
We have written a Java streams app to do this instead. To do the equivalent of what our streams app does in ksql, a UDAF would need to be able to read any field in the input record, rather than only a single field, and to change any field in the aggregated record. In the Streams DSL you can do anything with the whole record within the aggregation function. https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/Aggregator.html
Another thing we would need is to be able to ensure that ksql could set the rowtime for each record in a query.
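For readers unfamiliar with the Streams DSL `Aggregator` linked above: its `apply(key, value, aggregate)` method receives the whole incoming value and the whole current aggregate. The Python sketch below mirrors that shape to show why whole-record access matters for a "latest" aggregation. It is not the Java app described here; `keep_latest`, `aggregate_by_key` and the record fields are assumptions made for illustration.

```python
from typing import Dict, Iterable, Optional, Tuple

Record = Dict[str, int]  # e.g. {"event_time": 5, "val": 500}


def keep_latest(key: str, value: Record, aggregate: Optional[Record]) -> Record:
    """Aggregator-shaped function: (key, value, aggregate) -> new aggregate.

    It compares event_time but returns the whole record, which a
    single-field aggregate such as max(val) cannot do.
    """
    if aggregate is None or value["event_time"] >= aggregate["event_time"]:
        return dict(value)
    return aggregate


def aggregate_by_key(records: Iterable[Tuple[str, Record]]) -> Dict[str, Record]:
    """Apply keep_latest per key, in arrival order."""
    state: Dict[str, Record] = {}
    for key, value in records:
        state[key] = keep_latest(key, value, state.get(key))
    return state


records = [
    ("x", {"event_time": 5, "val": 50}),
    ("x", {"event_time": 1, "val": 100}),  # arrives late, older event_time
    ("y", {"event_time": 2, "val": 20}),
]

print(aggregate_by_key(records))
# {'x': {'event_time': 5, 'val': 50}, 'y': {'event_time': 2, 'val': 20}}
```

Note that `max(val)` would report 100 for key `x` here, although the record with the latest event time carries 50.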
I am trying to find some way in ksql to efficiently join streams as follows:
Left Stream
Right Stream
I have managed to get something 'close' using an inner join with a group aggregation, but this results in a KTable, not a KStream, and this is not ideal as the table will grow indefinitely, because I am grouping by l.event_time. e.g.
P.S. I realise using max is not strictly correct here, but I am planning to write a latest UDAF aggregation function to use instead.
Is there a canonical way to do this in ksql? If not, is this something that can be provided in the future? I believe it is a common use case to join to the most recent record.
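For reference, the behaviour being asked for (join each left record to the most recent right record at or before its event time) can be sketched in plain Python as a batch "as-of" join. This only pins down the intended semantics. It is not a streaming implementation and not something ksql provides, it ignores null values, and the names `build_index` and `as_of_join` and the sample data are made up for illustration.

```python
from bisect import bisect_right
from typing import Dict, List, Tuple

Row = Tuple[str, int, int]  # (key, event_time, val)


def build_index(right: List[Row]) -> Dict[str, List[Tuple[int, int]]]:
    """Group right rows by key, sorted by event_time."""
    index: Dict[str, List[Tuple[int, int]]] = {}
    for key, event_time, val in right:
        index.setdefault(key, []).append((event_time, val))
    for rows in index.values():
        rows.sort()
    return index


def as_of_join(left: List[Row], right: List[Row]) -> List[tuple]:
    """Inner join each left row to the latest right row with event_time <= its own."""
    index = build_index(right)
    out = []
    for key, event_time, val in left:
        rows = index.get(key, [])
        times = [t for t, _ in rows]
        pos = bisect_right(times, event_time)  # count of right rows at or before event_time
        if pos > 0:
            out.append((event_time, key, val, rows[pos - 1][1]))
    return out


right = [("x", 1, 100), ("x", 5, 500)]
left = [("x", 3, 7), ("x", 6, 8), ("x", 0, 9)]

print(as_of_join(left, right))  # [(3, 'x', 7, 100), (6, 'x', 8, 500)]
```

The left row at event time 0 is dropped because no right row exists at or before it. The result does not depend on the order in which the rows arrived.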