confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io
Other
118 stars 1.04k forks source link

Joining two streams, with "time shift" #410

Closed tvedtorama closed 6 years ago

tvedtorama commented 7 years ago

Hi,

I want to join two streams - on of messages between users - and one of their "reads". Reads are the event that some user has seen the message. I do this using a dummy data generator, by generating messages topic "messages", {key, {my_key, text}} (where my_key = key), and reads topic "reads", {key, {messageKey, userid}}. The reads are generated some n seconds after the messages, where n is random [0, 120] seconds. The point with this experiment is to figure out what messages remains unread after n minutes.

I have tried lots of things, but I can get the two data sources to join. I realize that stream->stream joins are not supported directly, but I don't understand why I can't treat the "reads"-stream as a table and then in combination with (session) windowing get the join to work?

Do you have a recipe on how this could be done?

I also thought that the SESSION window would allow me to create a "delayed stream", which would be very useful in checking for time-related-events (much the same way the brain checks for changes in sensory stimulation over time by using slow and fast neuron pathways, or something).


This is an except of what I have tried so far: I created a stream:

CREATE STREAM message_stream (text varchar, rnd int, my_key varchar) WITH (kafka_topic='messages', value_format='JSON'); 

and a table:

CREATE TABLE read_table (messageKey varchar, userId int) WITH (kafka_topic='reads', value_format='JSON');

When I select from the stream and table, they generate exactly the data I expect - with seemingly matching m.my_key and r.messagekey.

Then I tried to join the two:

SELECT * from message_stream m LEFT OUTER JOIN read_table r ON m.rowkey = r.messageKey;

This always produces null-values for the read_table part. I assumed that this was because read_table will never exist/match when message_stream "fires". So I added a WINDOW TUMBLING (SIZE 30 SECONDS) to the query, but still only nulls.

Then I tried to create a delayed stream, where the events of the message_stream would be offset by two minutes:

  CREATE STREAM message_delayed AS SELECT * FROM message_stream WINDOW SESSION (120 seconds);

This succeeds, but when I SELECT from it I seemingly get the same real-time stream as the source. I have also tried to create a table where I use GROUP BY, with the same result.

I have tried to use the "delayed" (as if...) stream in the join, but with the same result. I have also tried to CREATE STREAM from the join query, but it contains the exact same result.

BTW: I don't get an error when I put invalid key names in the ON-clause in the JOIN, but I do get them for the output fields. Not sure if this could be an indication of something?

hjafarpour commented 6 years ago

@tvedtorama thanks for your question. This seems a very interesting problem. Your first approach in using Stream-Stream join is the right approach in my opinion since the message and read are both streams and read is set of independent records and therefore is a stream. As you correctly noticed stream-stream join is not supported in KSQL yet but it is in our roadmap and will be supported soon. The reason you receive null values when you use 'read' topic as table and join it with 'message' topic is because of the join semantic we have in Kafka Streams. When you join a stream with a table, the record in the stream will be joined to the records in the table that exist before the stream record arrives and since the 'read' records come after the corresponding 'message' record this means that there is no 'read' record when every 'message' record arrives and therefore, the left side of the join will be null. One way of this is to change the timestamp on the 'message' stream and delay it as you did and in this case assuming that you receive the 'read' records before the changed 'message' record time you will be able to do the join. To see the results from the beginning you can set 'auto.offset.reset' to 'earliest'.

tvedtorama commented 6 years ago

@hjafarpour Thank you, this explains a lot.

Lets say I have a message that should generate a "read-timeout" after 10 minutes without a (one or more) read. Could I simply set the timestamp on the message to t + 10min? Would that affect the join logic, or would I actually have to post the message to the topic at t + 10min?

Currently, as a workaround, I have coded this logic in my node.js application, using two topic consumers. I could of course use this application to do a delayed insert of the message, but it would be so much better to be able to declare it all in KSQL. In particular, I'd like the timeout value to be part of the KSQL.

hjafarpour commented 6 years ago

@tvedtorama No need to delay the message, setting the time to t+10m should work. Make sure that you set the timestamp column in your stream definition correctly.

tvedtorama commented 6 years ago

@hjafarpour I turned the query around, and made the reads a stream and the messages a table. I have also added explicit timestamps to the tables, linking to a timestamp generated on the producer. I confirmed that the messages (the table) have timestamps that are earlier than the reads (stream). I then did the join, but still no hits on the table side of the join (only nulls).

The only strange thing I noticed is that I don't get the full table when doing a select, even as auto.offset.reset = earliest, which I do get on the stream. In fact it only returns one record.

I don't understand why this is not working, the demo case (from this github project) with the generators are working in the same docker container.

My scripts:

CREATE STREAM msgRead (messageId varchar, receipientId varchar, processTime BIGINT) WITH (kafka_topic='newRead', value_format='JSON', timestamp='processTime');
SET 'auto.offset.reset'='earliest';
SELECT * FROM msgREad LIMIT 3;
CREATE TABLE msgAdd (messageId varchar, receipientId varchar, processTime BIGINT) WITH (kafka_topic='newMessage', value_format='JSON', timestamp='processTime');
SELECT * FROM msgAdd LIMIT 3; # Only returns 1 row!!
# This one returns blanks for the table
SELECT r.messageId, r.processTime, a.messageId, a.processTime  FROM msgRead r LEFT OUTER JOIN msgAdd a ON a.messageId = r.messageId LIMIT 3;
# This one returns nothing, not even when adding new data
SELECT r.messageId, r.processTime, a.messageId, a.processTime  FROM msgRead r LEFT OUTER JOIN msgAdd a ON a.messageId = r.messageId WHERE a.messageId IS NOT NULL LIMIT 3 ;

I also tried to add the field messageId as key to the table, like on the clickstream_codes.

apurvam commented 6 years ago

We are tracking the work to support stream-stream joins in #731 . I think that will address the issues here, so I am going to close this out. Please reopen if I am mistaken.

vahidhashemian commented 6 years ago

@tvedtorama @hjafarpour, great discussion! I also tried the time-shift workaround that was suggested here without any luck. It seems to me that the table row must exist for the join to work (irrespective of the message timestamps). I believe delaying the message write on the stream could work, but cannot be done in KSQL afaik.

tvedtorama commented 6 years ago

@vahidhashemian I gave up on this and made my own custom logic in code, using producers / consumers. Being able to query across time in a descriptive way would definitely be powerful. If the required support arrives, I'll try it again.