confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Please keep Stream-Stream JOINs without GRACE a feature in future versions #9668

Open filidorwiese opened 1 year ago

filidorwiese commented 1 year ago

Hi there,

I understand that in most cases a GRACE period is necessary when JOIN-ing streams to avoid "spurious" result records, as is described here and here. However, I'd argue that there are situations where intermittent updates to the result table are exactly what the developer intends.

Let me explain with an example. Here we have two streams joined to a table:

CREATE OR REPLACE STREAM `stream-a` (
    id STRING,
    a INT
) WITH (kafka_topic='stream-a', partitions=1, value_format='json');

CREATE OR REPLACE STREAM `stream-b` (
    id STRING,
    b INT
) WITH (kafka_topic='stream-b', partitions=1, value_format='json');

CREATE TABLE `table` WITH (KEY_FORMAT='JSON') AS
SELECT
    `stream-a`.id as `id`,
    LATEST_BY_OFFSET(`stream-a`.a) as `a`,
    LATEST_BY_OFFSET(`stream-b`.b) as `b`
FROM `stream-a`
LEFT OUTER JOIN `stream-b` WITHIN 1 HOUR GRACE PERIOD 10 SECONDS ON `stream-a`.id = `stream-b`.id
GROUP BY `stream-a`.id
    EMIT CHANGES;

In our use-case we want a table with all rows from stream-a left joined with stream-b, regardless of whether there is a matching stream-b row. So, for instance, when inserting this data:

INSERT INTO `stream-a` VALUES ('key1', 1);
INSERT INTO `stream-b` VALUES ('key1', 2);
INSERT INTO `stream-a` VALUES ('key2', 1);

The resulting table should now contain two rows:

ksql> SELECT * FROM `table`;
+------------------------+------------------------+------------------------+
|id                      |a                       |b                       |
+------------------------+------------------------+------------------------+
|key1                    |1                       |2                       |
|key2                    |1                       |null                    |

However, the actual output contains only the one row for which both streams have emitted an event:

ksql> SELECT * FROM `table`;
+------------------------+------------------------+------------------------+
|id                      |a                       |b                       |
+------------------------+------------------------+------------------------+
|key1                    |1                       |2                       |

This was unexpected and might even be a bug? Shouldn't key2 be yielded after the GRACE period of 10 SECONDS?

If we leave out the GRACE PERIOD 10 SECONDS part of the JOIN, the resulting table is as expected, however KSQL outputs the following warning:

WARNING: DEPRECATION NOTICE: Stream-stream joins statements without a GRACE PERIOD will not be accepted in a future ksqlDB version.

For our use-case we're looking for a LEFT JOIN that is conceptually common to relational databases where the left row is always part of the output even if there is no right-row match.

So please do not deprecate this feature or instruct us how to achieve the same effect with a GRACE period specified.

Thanks!

ppilev commented 1 year ago

hello. my upvote here.

I'm not in a position to argue with the reasons behind the current behavior, but having an explicit GRACE PERIOD changes the semantics of LEFT joins: during the grace period the LEFT joins will literally behave as INNER joins.

Regarding the documentation, the most detailed explanation I found was on the ksqldb-0-23-1-features-updates announcement page. Based on what is shown there you actually might expect messages appearing out of order, correct?

IMO the behavior of an explicit GRACE PERIOD is poorly covered in the ksqlDB documentation. The only place mentioning how the grace period affects stream-stream left joins is a note in the within-and-grace-period section. All other examples with a grace period that I found here and here are for table windows.

Thanks!

mjsax commented 1 year ago

This was unexpected and might even be a bug? Shouldn't key2 be yielded after the GRACE period of 10 SECONDS?

Well, key2 would still be emitted, but note that the grace period is based on stream-time (not wall-clock time), thus the window only closes if new data with higher timestamps arrives to advance stream-time further and close the window. If data flow stops, key2 is just "stuck" (but not dropped). In a real deployment, where data flows continuously, this is not an issue. For testing, it can be annoying and the trick would be to send some additional "dummy" record to flush out stuck records.
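To make the stream-time point concrete, here is a minimal Python sketch. It is a toy model, not Kafka Streams or ksqlDB code: the class name `UnmatchedLeftBuffer`, its methods, and the exact close condition (`timestamp + window + grace < stream time`) are assumptions made for illustration. Stream time is modeled as the highest record timestamp seen so far, so nothing is flushed unless another record arrives.

```python
class UnmatchedLeftBuffer:
    """Toy model: holds unmatched left records until stream time passes their window."""

    def __init__(self, window_ms, grace_ms):
        self.window_ms = window_ms
        self.grace_ms = grace_ms
        self.stream_time = 0   # highest timestamp seen so far, not the wall clock
        self.pending = {}      # key -> timestamp of the unmatched left record

    def add(self, key, ts):
        """Register an unmatched left record; its own timestamp advances stream time."""
        self.pending[key] = ts
        return self.advance(ts)

    def advance(self, ts):
        """Observe any record with timestamp ts and return the keys that get flushed."""
        self.stream_time = max(self.stream_time, ts)
        closed = [k for k, t in self.pending.items()
                  if t + self.window_ms + self.grace_ms < self.stream_time]
        for k in closed:
            del self.pending[k]
        return closed


# WITHIN 1 HOUR GRACE PERIOD 10 SECONDS, as in the statement from the issue.
buf = UnmatchedLeftBuffer(window_ms=3_600_000, grace_ms=10_000)
print(buf.add("key2", ts=1_000))     # nothing flushed: the window is still open
print(buf.advance(ts=61_000))        # a record one minute later: still open
print(buf.advance(ts=3_611_001))     # a "dummy" record past window + grace flushes key2
```

In this model there is no clock at all, which is the point: however long you wait, `key2` stays in `pending` until some record with a sufficiently high timestamp is observed.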

during the grace period the LEFT joins will literally behave as INNER joins

It has to. Before the window closes, it's unknown if a not-yet-joined record might get a join partner in the future or not, and thus, emitting left-join result must be delayed.

Based on what is shown there you actually might expect messages appearing out of order, correct?

If you refer to the output, yes. As stated above, emitting outer-join results must be delayed until the window closes, and thus, those input events would be emitted out-of-order.

IMO the behavior of having explicit GRACE PERIOD is poorly covered in the ksqlDB documentation.

I guess that's fair... :( -- @JimGalasyn @bbejeck; could you help with this?

Thus, overall I am not convinced that removing grace-period and keeping the old (potentially incorrect) behavior would really bring benefits? -- The only thing I could imagine would be to allow for an additional wall-clock time window-close mechanism in case data flow stops for too long and stream-time does not advance -- but it would open the door for non-determinism that we actually try to avoid if possible.

JimGalasyn commented 1 year ago

@mjsax Sure! @bbejeck, can you please open a DOCS ticket for me with the details? Thanks!

ppilev commented 1 year ago

Please don't remove the grace period, but do consider adding an option to keep the normal LEFT JOIN behavior, because sometimes that (potentially incorrect) behavior is the intended one.

mjsax commented 1 year ago

There is no timeline to make GRACE mandatory, and we will for sure keep in mind that there is demand for the old semantics. We really appreciate all your feedback!

From a SQL point of view, the old semantics seem to be strictly incorrect (the old semantics don't really compute a join as SQL would define it sensibly), while the new semantics seem correct. -- Not sure if there is a better way to express the old behavior... We will for sure think about it. Maybe we can come up with a good solution, that would replace the old semantics in a clean way.

Of course: suggestions are very welcome!

ppilev commented 1 year ago

by the way I'm reading the description of KAFKA-10847 and wasn't all this actually about the inner-joins?

a record what will be an inner-join result, might produce a eager (and spurious) left/outer join result.

mjsax commented 1 year ago

The ticket was about left- and full-outer joins. The sentence you quote is based on the fact that an inner-join result is a sub-set of the left-join result, which is itself a sub-set of the full-outer join result.

Assume the following input data as <value, timestamp> pairs, where all records have the same key and thus join (with a join window of 100):

left:   <a, 100>           <c,300>
right:           <b, 110>           <d, 500>

The inner join result should contain only <a-b>, and the left-join result would contain <a-b> as well as <c-null> as result.

However, before the fix, the computed left-join result was three records: <a-null>, <a-b>, <c-null>. It included a spurious <a-null> left-join result for a, which is actually part of the inner join result <a-b>, and thus <a-null> should not be emitted (even for a left-join). With the fix, <a-null> is no longer in the result of a left-join, as it should be.

The reason for the spurious <a-null> result was that when a was processed, we did a lookup into the other side's window, did not find b yet, and thus emitted <a-null> eagerly. However, the join window was not closed when a was processed, and thus it's not really correct to emit <a-null> eagerly, because it's not clear at this point if a will have a join partner or not (the window is still open, and in fact, a joins with b in our example). With K10847, we basically don't emit <a-null> right away, but only "remember" a as a left-join candidate. When b comes in, we emit <a-b> and can now remove a as a left-join candidate, because there was a join partner and we should never emit <a-null> any longer. A left-join should only ensure that if a left input record has no join partner within the join window at all, it still goes into the output instead of being dropped (which is what would happen for an inner join) -- but we should not emit any left input record as both an "inner-join" and a "left-join" result.

To complete the example, the same thing happens for c. With the fix, we first only remember c as a candidate, and only when d comes in can we close the join window for c and emit <c-null>. In our example, emitting <c-null> eagerly would not have resulted in an incorrect output, but we cannot know that when c itself is processed, only when we get d.
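The worked example above can be replayed with a small Python simulation. This is only a sketch of the two behaviors as described in this comment, not the actual Kafka Streams implementation: the names `Rec` and `left_join`, the `eager` flag, and the close condition (`ts + window + grace < stream time`) are assumptions for illustration, and all records are assumed to share one key.

```python
from dataclasses import dataclass


@dataclass
class Rec:
    side: str    # "left" or "right"
    value: str
    ts: int


def left_join(records, window, grace=0, eager=False):
    """Simulate a single-key stream-stream left join over records in arrival order.

    eager=True  models the old behavior: emit <x-null> as soon as no partner is found.
    eager=False models the new behavior: remember x and emit <x-null> only once
                stream time has passed x.ts + window + grace.
    """
    out, lefts, rights, candidates = [], [], [], []
    stream_time = 0
    for r in records:
        stream_time = max(stream_time, r.ts)
        if r.side == "left":
            partners = [p for p in rights if abs(p.ts - r.ts) <= window]
            out.extend(f"{r.value}-{p.value}" for p in partners)
            if not partners:
                if eager:
                    out.append(f"{r.value}-null")
                else:
                    candidates.append(r)       # left-join candidate, not emitted yet
            lefts.append(r)
        else:
            for l in lefts:
                if abs(l.ts - r.ts) <= window:
                    out.append(f"{l.value}-{r.value}")
                    if l in candidates:
                        candidates.remove(l)   # found a partner: never emit <l-null>
            rights.append(r)
        # Candidates whose window has closed in stream time become left-join results.
        for c in [c for c in candidates if c.ts + window + grace < stream_time]:
            candidates.remove(c)
            out.append(f"{c.value}-null")
    return out


records = [Rec("left", "a", 100), Rec("right", "b", 110),
           Rec("left", "c", 300), Rec("right", "d", 500)]
print(left_join(records, window=100, eager=True))    # ['a-null', 'a-b', 'c-null']
print(left_join(records, window=100, eager=False))   # ['a-b', 'c-null']
```

Dropping the last record (`d`) from the input leaves `c` as a pending candidate in the deferred mode, so the output is only `['a-b']`. That is the "stuck" record effect discussed earlier in the thread.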

Hope this helps.

ppilev commented 1 year ago

hi. I really appreciate your time spent answering all these questions.

I clearly understand the driver behind the new semantics. I just disagree with the fact that <a-b> is delayed until b shows up and, worse, that <c-null> is delayed until the window closes (window end + grace period). Such behavior just breaks the real-time processing of the records the main stream delivers.

Regarding suggestions, I can think of an additional keyword, applicable only to LEFT/OUTER joins, that allows opting in to the eager behavior:

FROM MAIN_STREAM M
LEFT EAGER JOIN  SECONDARY_STREAM S

or maybe it's better to make it part of the GRACE PERIOD clause because currently that clause actually triggers the uneager semantics

WITHIN 50 MILLISECONDS GRACE PERIOD 2 SECONDS EAGER 

Thanks!

mjsax commented 1 year ago

just disagree with the fact that <a-b> is delayed until b showed up

This was also the case before the change -- how could we emit <a-b> without having seen b?

worst <c-null> is delayed until the window closes

Well, if we don't delay it, and we look at the a/b example, you would get two results, <a-null> and <a-b>, which is not a "join result" any longer (following SQL semantics).

Btw, I want to give a different example for left-join:

left:            <a, 110>
right: <b, 100>

For this case, the right input arrives before the left input, and the result would only contain <a-b> in both the new and the old behavior. Any comments? Why is it OK not to have an <a-null> for this case?
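One way to read this question is that the old, eager result depends on which side happens to arrive first, not on the data itself. A small Python sketch of that reading follows. The function `eager_left_join` and its tuple format are hypothetical, and it models only the old behavior as described in this thread, for records that share a single key.

```python
def eager_left_join(arrivals, window):
    """Old-style left join: arrivals is a list of (side, value, ts) in processing order."""
    out, lefts, rights = [], [], []
    for side, value, ts in arrivals:
        if side == "left":
            matches = [v for v, t in rights if abs(t - ts) <= window]
            if matches:
                out.extend(f"{value}-{m}" for m in matches)
            else:
                out.append(f"{value}-null")   # emitted eagerly, window still open
            lefts.append((value, ts))
        else:
            out.extend(f"{v}-{value}" for v, t in lefts if abs(t - ts) <= window)
            rights.append((value, ts))
    return out


# Same two records, two processing orders.
right_first = [("right", "b", 100), ("left", "a", 110)]
left_first = [("left", "a", 110), ("right", "b", 100)]
print(eager_left_join(right_first, window=100))   # ['a-b']
print(eager_left_join(left_first, window=100))    # ['a-null', 'a-b']
```

Under this model, whether `<a-null>` shows up is decided by arrival order alone, which is why it is hard to describe the eager output as a well-defined join result.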

In the end, I guess I am missing context on the use case: why do you want the spurious result record? Would be great to understand this better.

As pointed out: right now there is no timeline for making GRACE mandatory, and we will for sure revisit this topic before we make it mandatory, so the old behavior won't go away.

Really appreciate the input. It's interesting that there is actual demand for "non-SQL semantics".

There is actually a related ticket for Kafka Streams: https://issues.apache.org/jira/browse/KAFKA-13813

lhind-linus commented 1 month ago

First of all, thanks for your detailed explanations. This helped me to better understand the problem I am facing.

I'll try to answer but beware I am just starting out with KSQL.

In the end, I guess I am missing context on the use case: why do you want the spurious result record? Would be great to understand this better.

My use case: I have 3 input streams: A, B and C with schema (SOMEKEY STRING, VALUE STRING, TS TIMESTAMP). It is very possible that any one of these input streams does not receive a record for more than 7 days.

Stream A is full-outer-joined with B into stream X within 7 days. Stream X is full-outer-joined with stream C into stream Y. Stream Y is then aggregated into a windowed table. The VALUEs of stream C are collected into a list. The VALUEs of streams A and B are the LATEST_BY_OFFSET of each.

Now I can query this table to receive the combined latest data of the last 7 days. If I provide the GRACE PERIOD, the data might only be visible after the 7 days. But for me the data is relevant as soon as it arrives. And also worthless after the 7 days.

So a spurious result record is better than none. Please provide an alternative way to enable this EAGER mode when continuing with the deprecation.