Open SebAlbert opened 6 years ago
Some quick feedback @SebAlbert. What you are describing is very much about the semantics of handling late-arriving or out-of-order data (like corrections) in tables. In other words, it's about the semantics of tables / of stateful operations (tables are a bit like a stateful stream).
Example: In a stream-table join, which is a stateful operation, the question is about whether a late-arriving record in the table would trigger a correction/update output record for the join to be sent downstream. Here's a simple example:
Time flows downwards.
STREAM | TABLE   | STREAM-TABLE JOIN OUTPUT
-------+---------+-------------------------
-      | Asia    | -
A      | -       | (A, Asia)
-      | America | - [= ignore late table update] or (A, America) [= honor late table update]?   <<< this is the question
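The two possible outcomes in the last row can be replayed with a small toy model. This is only a sketch, not how KSQL or Kafka Streams implement joins: the class name `CorrectingStreamTableJoin`, the `retention` parameter, and the "retract"/"join" output tuples are all hypothetical, and it assumes that every table update is meant as a correction of the past rather than a genuine change as of now.

```python
from collections import defaultdict


class CorrectingStreamTableJoin:
    """Toy stream-table join that remembers recent stream records.

    retention: how far back (in event-time units) stream records are kept so
    that a later table update can re-emit corrected join results.
    retention=0 gives the "ignore late table update" behaviour.
    """

    def __init__(self, retention):
        self.retention = retention
        self.table = {}                  # key -> current table value
        self.buffer = defaultdict(list)  # key -> event times of stream records

    def on_stream(self, key, event_time):
        """Join a stream record against the current table state."""
        self.buffer[key].append(event_time)
        return [("join", key, self.table.get(key), event_time)]

    def on_table(self, key, value, now):
        """Apply a table update and correct joins for buffered stream records."""
        old = self.table.get(key)
        self.table[key] = value
        # Forget stream records that fell out of the retention window.
        kept = [t for t in self.buffer[key] if now - t < self.retention]
        self.buffer[key] = kept
        out = []
        for t in kept:
            out.append(("retract", key, old, t))   # withdraw the old result
            out.append(("join", key, value, t))    # emit the corrected result
        return out


honor = CorrectingStreamTableJoin(retention=10)
honor.on_table("A", "Asia", now=1)
honor.on_stream("A", event_time=2)
corrections = honor.on_table("A", "America", now=3)

ignore = CorrectingStreamTableJoin(retention=0)
ignore.on_table("A", "Asia", now=1)
ignore.on_stream("A", event_time=2)
no_corrections = ignore.on_table("A", "America", now=3)
```

The `retention` value is exactly the "how long" knob: the larger it is, the more stream-side state has to be kept around.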
How long should a system keep around information from past records in the stream and table, so that late-arriving data can be handled correctly in the stream-table join? And, as you point out, this question is not limited in scope to the join step -- it might impact downstream processing steps, too, such as downstream aggregates:
However, I need the aggregates of "how many clicks from Asia" and "how many clicks from America" of the past corrected.
What information (think: state), and for how long, should a system keep around to correct the prior results of stateful operations? The stream-table join example above is one such stateful operation, and there might be further downstream stateful operations after the join step.
This also has significant impacts on the design and efficiency/performance of a system. Some types of aggregations/computations are commutative + associative, for example, such as addition (think: SUM), which means you can optimize and get away with having to retain only some limited information to correct a prior aggregation result. For the general case, however, you'd have to retain the actual input records for correct re-computing, which requires significantly more resources.
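To make the difference in retained state concrete, here is a minimal sketch in Python. The function names are hypothetical, and the median merely stands in for "the general case"; nothing here reflects an actual KSQL implementation.

```python
import statistics


def correct_sum(total, old_value, new_value):
    """Correct a SUM using only the running total and the changed record."""
    return total - old_value + new_value


def correct_median(retained_values, index, new_value):
    """Correct a MEDIAN: all input records must have been retained."""
    updated = list(retained_values)
    updated[index] = new_value
    return updated, statistics.median(updated)


# SUM over 10, 20, 30 where the record 20 should have been 25.
corrected_total = correct_sum(60, old_value=20, new_value=25)

# MEDIAN over the same inputs where the record 20 should have been 35.
updated_values, corrected_median = correct_median([10, 20, 30], 1, 35)
```

For the sum, the state is one number regardless of how many records were aggregated. For the median, the state grows with the input.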
Thanks for this view on the topic. Thinking of this as "stateful" (and "state") helps a lot. I am coming from the relational database side, where this appears a bit more straightforward to me, but it is very useful to think of all tables and their entire content being "state". A Kafka topic with long enough retention may or may not also be an option here, since you could choose to rewind the consumption index pointer. However, with a real relational table and proper indexes, this may well be much faster.
How is this handled in KSQL? Do you plan to make available some sort of parameter so that we could specify for each JOIN how much back-correction we want? As I understand it by now, no such correction would be made at all in the current implementation of KSQL's stream-table join, right?
Some types of aggregations/computations are commutative + associative, for example
Absolutely, and I think the last condition to mention here is the existence of inverse elements with regard to the commutative/associative operation, so you'd basically need an abelian group, unless you want to recalculate the entire aggregate from scratch (requiring having everything that ever went into it still in the state).
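The abelian-group requirement can be sketched as follows. `GroupAggregate` is a hypothetical helper, not part of any Kafka or KSQL API. The example uses (sum, count) pairs under component-wise addition, which have inverses, so a past input can be revoked without replaying the rest.

```python
from dataclasses import dataclass
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


@dataclass
class GroupAggregate(Generic[T]):
    """An aggregate whose combine operation has an identity and inverses."""
    identity: T
    combine: Callable[[T, T], T]
    inverse: Callable[[T], T]

    def add(self, state: T, x: T) -> T:
        return self.combine(state, x)

    def retract(self, state: T, x: T) -> T:
        # Revoking x is the same as combining with its inverse element.
        return self.combine(state, self.inverse(x))


# (sum, count) pairs: enough state to report a mean, and every pair has an inverse.
mean_parts = GroupAggregate(
    identity=(0, 0),
    combine=lambda a, b: (a[0] + b[0], a[1] + b[1]),
    inverse=lambda a: (-a[0], -a[1]),
)

state = mean_parts.identity
for value in (4, 8, 6):
    state = mean_parts.add(state, (value, 1))

state = mean_parts.retract(state, (8, 1))  # the 8 turned out to be wrong
state = mean_parts.add(state, (2, 1))      # its corrected value is 2
mean = state[0] / state[1]
```

By contrast, MAX is commutative and associative but has no inverse elements: once the maximum itself is revoked, the next-largest value can only be found from retained inputs.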
A Kafka topic with long enough retention may or may not also be an option here, since you could choose to rewind the consumption index pointer. However, with a real relational table and proper indexes, this may well be much faster.
A stream or "log", which Kafka is, is IMHO solving this problem in a general manner as it fully captures the history of what happened in the past (cf. event sourcing). In comparison, tables (and state) are derived from streams, hence I'd argue that streams are the first-order construct whereas tables are a second-order construct because they are derivations from streams. (FWIW, there's IMHO also an interesting relation to how LSTM networks work, cf. cell state).
The point you raise about table indexes is IMHO about optimizations, and yes, such optimizations can be applied to a table for certain scenarios. Normal indexing doesn't allow you to go back in time however. (One idea that we have been floating around is "multi-version tables".)
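The thread does not spell out what a "multi-version table" would look like. One possible reading, offered purely as an assumption, is a table that keeps every version of a key together with the time from which it is valid, so that lookups "as of" a past timestamp become possible. The class and method names below are hypothetical.

```python
import bisect
from collections import defaultdict


class MultiVersionTable:
    """Toy table that keeps all versions of each key, ordered by valid-from time."""

    def __init__(self):
        self._times = defaultdict(list)   # key -> sorted valid-from timestamps
        self._values = defaultdict(list)  # key -> values aligned with _times

    def put(self, key, valid_from, value):
        """Insert a version; versions may arrive out of order."""
        i = bisect.bisect_right(self._times[key], valid_from)
        self._times[key].insert(i, valid_from)
        self._values[key].insert(i, value)

    def get_as_of(self, key, timestamp):
        """Return the value that was valid at the given timestamp, if any."""
        times = self._times.get(key, [])
        i = bisect.bisect_right(times, timestamp)
        return self._values[key][i - 1] if i else None


users = MultiVersionTable()
users.put("A", valid_from=10, value="Asia")
users.put("A", valid_from=30, value="America")
users.put("A", valid_from=20, value="Europe")  # late-arriving version
```

A join could then look up the table version matching each stream record's event time, at the cost of storing the full version history per key.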
[commutative + associative] Absolutely, and I think the last condition to mention here is the existence of inverse elements [...]
Yep, exactly. 👍
How is this handled in KSQL? Do you plan to make available some sort of parameter so that we could specify for each JOIN how much back-correction we want? As I understand it by now, no such correction would be made at all in the current implementation of KSQL's stream-table join, right?
Unlike most other stream processing technologies, KSQL (and Kafka's Streams API) already handles late-arriving / out-of-order data. But there are some situations such as discussed above where the question is whether the current semantics (example: right now, a stream-table join does not send an update downstream if the table side of the join is seeing late-arriving data) are sufficient or whether more / something different is needed. And if the semantics should be changed -- or users given a choice -- in which form (think: SQL grammar in KSQL, API in Kafka Streams) this should happen. Feedback welcome. :-)
I totally agree on streams as first-order and tables as second-order (thus questioning the claimed "duality" upon a closer look), and I like the concept of event sourcing, with immutable history and the like. This may also be part of the recent hype around "blockchain" (as in bitcoin, not as in cryptography), as a side note.
The only question is whether you can or want to afford storing everything forever, and to re-read "everything" whenever you would only have to correct an arbitrarily small subset of past events (of which you just happen to not know the position in the log of the past, without an index). Bottom line: I advise considering (relational or general) databases as "state stores" for complex use cases. (And the issue of having to read "everything" in spite of being interested in only a very small subset (partitions by message key won't help if you have multiple criteria) also currently has me considering not to choose either Kafka or, say, RabbitMQ, but maybe both.)
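The cost difference between re-reading everything and using an index can be illustrated with a toy in-memory log. This is a sketch under assumptions: `IndexedLog` and its fields are hypothetical, and a real deployment would keep such an index in a database or state store rather than in a Python dict.

```python
from collections import defaultdict


class IndexedLog:
    """Append-only log with a secondary index from user to record offsets."""

    def __init__(self):
        self.records = []
        self.by_user = defaultdict(list)  # user -> offsets into self.records

    def append(self, record):
        offset = len(self.records)
        self.records.append(record)
        self.by_user[record["user"]].append(offset)
        return offset

    def scan(self, user):
        """Without an index: read the whole log to find one user's records."""
        return [r for r in self.records if r["user"] == user]

    def lookup(self, user):
        """With the index: touch only the offsets that belong to the user."""
        return [self.records[o] for o in self.by_user.get(user, [])]


log = IndexedLog()
log.append({"user": "A", "page": "/start"})
log.append({"user": "B", "page": "/start"})
log.append({"user": "A", "page": "/help"})
```

Both methods return the same records; `scan` does work proportional to the whole log, `lookup` only to the number of matching records. Each additional search criterion needs its own index, which is where a general-purpose database becomes attractive.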
As to wishes regarding KSQL: While I like it being close to the popular SQL syntax, the most compelling argument for me is it being declarative and high-level. So I'd appreciate a syntax, even if thereby moving away from strict SQL a bit, that eases parameterisation of things like "memorising the stream-side of a stream-table join in order to correct results upon late-arriving data on the table side for timespan 'x days'". (As another side note, I also find it a bit inconvenient to reconfigure a runtime property before every CREATE statement in order to say whether I want it to begin from "earliest" or "latest" (or maybe somewhere in between?).)
Hello everybody
I have a very general, conceptual question regarding streaming data in scenarios in which not only real-time, but also history matters. The question is not confined to KSQL (nor even to Kafka), but I think it is of general interest for KSQL, and I don't know of a better place to ask (pointers welcome).
The basic question is: What are the (best) options to consistently handle corrections of data affecting the past and, thus, the past events that this data was joined to?
For example, say I have a clickstream and a users table that indicates for every user where they live (take America, Europe, and Asia for the sake of the example). The clickstream stream is enriched with facts from the users table by means of a stream-table join into a new stream, say clickstream_enriched, which holds the click_time and the click_user from the clickstream, and the user_homelocation from the users table. Furthermore, I have an aggregate table aggregating over some time windows (say tumbling by 1 day for the sake of simplicity) regarding click_time, grouping by user_homelocation, just counting the number of click events. I realise that "late arriving data" is no problem for the aggregates, as past windows can be updated late.
Now I get the information that some user (call her "A"), who clicked many pages during the last month, has always been living in America, while I was thinking (and thus, the users table reflected) that she was from Asia. What do I do in order to correct my aggregates of the past? I will put an update to the users table onto the stream, but without further consideration, this would probably only affect all events to come. However, I need the aggregates of "how many clicks from Asia" and "how many clicks from America" of the past corrected.
My first thoughts revolve around having an additional "revocations" stream for every stream, in this case clickstream_enriched_revocations, into which all the past messages from clickstream_enriched regarding user A (with the now wrong user_homelocation of Asia) have to be copied, while clickstream_enriched gets the events from the past with the corrected user_homelocation as new events (with the old, past click_time, of course). Then, for all aggregates, I would have to either maintain a second "revocations" aggregate, too, or (which in case of some more complex aggregates may not be possible) aggregate from both streams, subtracting the counts. A slight variation of subtracting might be summing instead of counting, with "1" for the original events and "-1" for the revoked events (but again, this may not be feasible for all types of aggregates). With this variation, we could also, instead of the second events stream (clickstream_enriched_revocations), just add an indicator field to all events of the original stream, indicating whether it means a new event or a revocation.
I also see a possibility of treating everything like a (streamed) table, with the entire payload being the "key", which could then map to either the usual data set for the positive information, or to "null" for the revocation, which would support log compaction. However, that seems a bit radical and unwieldy to me.
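The variation with an indicator field and summing of "1" and "-1" can be sketched in plain Python. This is only an illustration under assumptions: the `sign` field and the `click_day` window label are hypothetical additions to the fields named above, and the sketch assumes each event is corrected at most once.

```python
from collections import Counter


def correct_home_location(enriched_events, user, new_location):
    """Build revocation (-1) and replacement (+1) events for one user."""
    corrections = []
    for e in enriched_events:
        if (e["click_user"] == user and e["sign"] == 1
                and e["user_homelocation"] != new_location):
            corrections.append({**e, "sign": -1})
            corrections.append({**e, "user_homelocation": new_location, "sign": 1})
    return corrections


def aggregate(events):
    """Clicks per (1-day window, home location), summing signs instead of counting."""
    counts = Counter()
    for e in events:
        counts[(e["click_day"], e["user_homelocation"])] += e["sign"]
    return counts


events = [
    {"click_user": "A", "click_day": "day-1", "user_homelocation": "Asia", "sign": 1},
    {"click_user": "A", "click_day": "day-1", "user_homelocation": "Asia", "sign": 1},
    {"click_user": "B", "click_day": "day-1", "user_homelocation": "Asia", "sign": 1},
    {"click_user": "A", "click_day": "day-2", "user_homelocation": "Asia", "sign": 1},
]

before = aggregate(events)
events.extend(correct_home_location(events, "A", "America"))
after = aggregate(events)
```

Because the corrections are appended as ordinary events with their original window label, the past windows are updated in the same way as any other late-arriving data. This only works for aggregates where a "-1" event cancels a "+1" event, such as counts and sums.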
Are there any concepts on the theme of "correcting the past", be it in KSQL or in general, that I may be missing? Of course, this would also apply to table-table joins; I think it is a very fundamental conceptual thing that needs to be embraced for a whole lot of use cases.