python-streamz / streamz

Real-time stream processing for python
https://streamz.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

Proposal: Watermarking #354

Open jsmaupin opened 4 years ago

jsmaupin commented 4 years ago

This is a feature that exists in other streaming data systems with windowing and aggregation functions. Its purpose is to support late-arriving data in a window or aggregation. It will require that Streamz become semi-aware of the data structure, because we will need to specify a column that represents the event time. So, for clarity, there are two timestamps here:

  1. The time at which the event enters the data pipeline
  2. The time at which the event is created in the source. We'll call this the "event time".

Due to various latencies in a distributed system, an event that should be included in an aggregation can arrive in the pipeline too late to be counted. For example, if you have a window of 5 minutes and an event whose event time falls within those 5 minutes arrives 3 minutes after the window closes, it will not be included in any aggregation.

Watermarking keeps a window open for a specified amount of time after it ends so that late data can still be included. So, if we have a 5 minute window and a watermark threshold of 5 minutes, the window will include all events that arrive during its own 5 minutes, plus any events that arrive during the following 5 minutes whose event time belongs to that window. If the event time is outside of the window, the event will be dropped. This may be the key to implementing it: we may be able to accept all data and then simply drop whatever falls outside the watermark threshold.

Here is an example with a window of 5 seconds and a watermark threshold of 5 seconds:

Arrives   Event time   Included
00:01:01  00:01:01     Yes - inside the window time
00:01:02  00:01:01     Yes - inside the window time
00:01:02  00:01:02     Yes - inside the window time
00:01:03  00:01:02     Yes - inside the window time
00:01:04  00:01:04     Yes - inside the window time
00:01:06  00:01:04     Yes - within the watermark threshold
00:01:09  00:01:04     Yes - within the watermark threshold
00:01:11  00:01:04     No - arrived too late
00:01:12  00:01:12     No - outside of the window
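The rule behind the table can be checked with a short Python sketch. It is only an illustration under stated assumptions: the window is the half-open event-time interval [00:01:00, 00:01:05), and it accepts late events whose arrival time is strictly before window end + threshold, so an event arriving at exactly 00:01:10 would already count as too late. `classify` and `parse` are hypothetical helpers, not part of Streamz.

```python
from datetime import datetime, timedelta

def parse(hms: str) -> datetime:
    # Parse "HH:MM:SS"; strptime fills in an arbitrary fixed date (1900-01-01).
    return datetime.strptime(hms, "%H:%M:%S")

def classify(arrival: datetime, event_time: datetime, window_start: datetime,
             window_size: timedelta, threshold: timedelta) -> str:
    """Decide whether one event is counted in the window beginning at window_start.

    Assumption: the window covers [start, start + size) in event time and keeps
    accepting late events until arrival time start + size + threshold (exclusive).
    """
    window_end = window_start + window_size
    if not (window_start <= event_time < window_end):
        return "No - outside of the window"
    if arrival < window_end:
        return "Yes - inside the window time"
    if arrival < window_end + threshold:
        return "Yes - within the watermark threshold"
    return "No - arrived too late"

# (arrives, event time) pairs taken from the table above
ROWS = [
    ("00:01:01", "00:01:01"), ("00:01:02", "00:01:01"), ("00:01:02", "00:01:02"),
    ("00:01:03", "00:01:02"), ("00:01:04", "00:01:04"), ("00:01:06", "00:01:04"),
    ("00:01:09", "00:01:04"), ("00:01:11", "00:01:04"), ("00:01:12", "00:01:12"),
]

start, size, threshold = parse("00:01:00"), timedelta(seconds=5), timedelta(seconds=5)
results = [classify(parse(a), parse(e), start, size, threshold) for a, e in ROWS]
for (a, e), verdict in zip(ROWS, results):
    print(a, e, verdict)
```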

I've spent the last few days trying to figure out where this would fit into Streamz, because it seems that Streamz doesn't determine what gets included in a batch. So I think this could be implemented in a few places.

My current thinking is that the Streamz DataFrame would need new parameters for the watermark threshold and the event time column. Then, when operations like windowing or aggregation are performed, they would take the watermark threshold into account.

As always, feedback is greatly appreciated here. I'd like to include something like this so that it works for everyone.

Also, let me know if this explanation isn't clear.

martindurant commented 4 years ago

I believe the first part, adding time-of-arrival to a row, can be easily achieved with a node between the source and any batch/windowing operations.
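A minimal sketch of such a node, assuming rows are plain dicts and that the arrival time is stored under a hypothetical `arrival_time` key. `stamp_arrival` is an illustrative helper, not an existing Streamz function; the clock is passed in so the function can be tested deterministically.

```python
import time
from typing import Any, Callable, Dict

def stamp_arrival(row: Dict[str, Any],
                  clock: Callable[[], float] = time.time,
                  key: str = "arrival_time") -> Dict[str, Any]:
    """Return a copy of `row` with the time it entered the pipeline under `key`.

    The row's own event-time field is left untouched, so both timestamps
    travel downstream together. `key` is a hypothetical column name.
    """
    stamped = dict(row)      # copy so the upstream object is not mutated
    stamped[key] = clock()   # arrival (processing) time; seconds since the epoch by default
    return stamped

row = {"event_time": 61.0, "value": 3}
print(stamp_arrival(row, clock=lambda: 66.0))
# {'event_time': 61.0, 'value': 3, 'arrival_time': 66.0}
```

In a pipeline it could presumably be attached with Streamz's `map`, for example `source.map(stamp_arrival)`, ahead of any batching or windowing node.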

The current time-based windowing implementation is not too far from what you have in mind (it maintains a time index and organises events), but you have multiple buckets that are "live" at a given time. I imagine this should be implemented as a new windowing class.
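To make the "multiple live buckets" point concrete, here is a rough sketch of what such a class might look like. Everything in it is hypothetical: `WatermarkedWindow`, `update`, `flush` and the `event_time_key` parameter are illustrative names, not Streamz API. It assumes tumbling windows in event time, numeric timestamps in seconds, and the closing rule from the table in the proposal (a window closes once the arrival time reaches window end + threshold). It emits `(window_start, rows)` when a window closes, which is just one possible answer to the question of when output gets emitted.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]

class WatermarkedWindow:
    """Hypothetical tumbling event-time window that tolerates late data."""

    def __init__(self, size: float, threshold: float, event_time_key: str = "event_time"):
        self.size = size                  # window length, in seconds of event time
        self.threshold = threshold        # how long a window stays open after it ends
        self.event_time_key = event_time_key
        self.buckets: Dict[float, List[Row]] = defaultdict(list)  # window start -> rows
        self.dropped: List[Row] = []      # rows that arrived after their window closed

    def _closes_at(self, start: float) -> float:
        return start + self.size + self.threshold

    def _close_expired(self, now: float) -> List[Tuple[float, List[Row]]]:
        # Several buckets can be live at once; emit every one whose deadline has passed.
        expired = sorted(s for s in self.buckets if now >= self._closes_at(s))
        return [(s, self.buckets.pop(s)) for s in expired]

    def update(self, row: Row, arrival: float) -> List[Tuple[float, List[Row]]]:
        """Add one row seen at `arrival`; return the windows that closed as a result."""
        emitted = self._close_expired(arrival)
        start = (row[self.event_time_key] // self.size) * self.size
        if arrival >= self._closes_at(start):
            self.dropped.append(row)      # too late: its window is already closed
        else:
            self.buckets[start].append(row)
        return emitted

    def flush(self) -> List[Tuple[float, List[Row]]]:
        """Emit all still-open windows, e.g. at the end of the stream."""
        return self._close_expired(float("inf"))

# The table from the proposal, in seconds: (arrives, event time)
events = [(61, 61), (62, 61), (62, 62), (63, 62), (64, 64),
          (66, 64), (69, 64), (71, 64), (72, 72)]
window = WatermarkedWindow(size=5, threshold=5)
closed = []
for arrival, event_time in events:
    closed += window.update({"event_time": event_time}, arrival)
print([(s, len(rows)) for s, rows in closed], len(window.dropped))  # [(60, 7)] 1
```

The window starting at 60 stays live until the arrival at 71, after which the late row for it is dropped, while the row at 72 opens a new bucket that `flush` would emit.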

One question that will need careful thought: what are the outputs of the window/agg, and when do they get emitted? I assume we would do something similar to the other non-python frameworks.

By the way, why is this called "watermarking"?

jsmaupin commented 4 years ago

> I believe the first part, adding time-of-arrival to a row, can be easily achieved with a node between the source and any batch/windowing operations.

Are you suggesting this needs to be managed by the user of Streamz rather than the library?

> The current time-based windowing implementation is not too far from what you have in mind (it maintains a time index and organises events), but you have multiple buckets that are "live" at a given time.

Yes, rolling windows do pose a unique problem, but I think it's one that has already been solved in other systems.
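One reason rolling windows are harder is that each event belongs to several overlapping windows, so several buckets are live at once and a late event may need to be added to all of them. A minimal sketch of that assignment step, assuming windows of length `size` that start at every multiple of `slide`; `sliding_windows` is a hypothetical helper, not Streamz code.

```python
import math
from typing import List

def sliding_windows(event_time: float, size: float, slide: float) -> List[float]:
    """Start times of every window [start, start + size) that contains event_time.

    Assumes windows start at every multiple of `slide` (hypothetical layout).
    """
    start = math.floor(event_time / slide) * slide  # latest window starting at or before the event
    starts = []
    while start > event_time - size:                # this window still reaches past event_time
        starts.append(start)
        start -= slide
    return sorted(starts)

# A 10 s window sliding every 5 s: an event at t=64 is live in two buckets.
print(sliding_windows(64, size=10, slide=5))  # [55, 60]
```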

> I imagine this should be implemented as a new windowing class.

Like a subclass, or possibly a sibling, of the Window class?

> By the way, why is this called "watermarking"?

Heh, that's a good question! I don't know. I even asked others here who have used that feature in other systems, and they aren't sure either :)

jsmaupin commented 4 years ago

> One question that will need careful thought: what are the outputs of the window/agg, and when do they get emitted? I assume we would do something similar to the other non-python frameworks.

Yes, I totally agree. From what I gather about what the non-Python frameworks are doing, watermarking is a general mechanism that, while it only makes sense in a batch context, applies to more than just windowing operations. The watermark metadata is propagated through the stages of the data pipeline, and each stage has an input timestamp and an output timestamp. The input timestamp is simply the output timestamp of the previous stage, and the output timestamp is the watermark state of the current stage. A stage that receives input from multiple previous stages uses the minimum of those stages' output timestamps as its watermark.
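The propagation rule described above can be sketched as follows. This is only an interpretation: `Stage`, `advance`, and `merged_input_watermark` are hypothetical names, and the assumption that a stage holds its output watermark back to the earliest event time it is still buffering (for example, an open window) is borrowed from how such frameworks are commonly described, not from Streamz. Watermarks are kept monotonic, so they never move backwards.

```python
from typing import Iterable, Optional

class Stage:
    """Hypothetical pipeline stage carrying an input and an output watermark."""

    def __init__(self, name: str):
        self.name = name
        self.input_watermark = float("-inf")
        self.output_watermark = float("-inf")

    def advance(self, input_watermark: float,
                earliest_pending: Optional[float] = None) -> float:
        # Watermarks only move forward, so a smaller incoming value is ignored.
        self.input_watermark = max(self.input_watermark, input_watermark)
        held = self.input_watermark
        if earliest_pending is not None:
            # Assumption: hold the output back while older events are still buffered.
            held = min(held, earliest_pending)
        self.output_watermark = max(self.output_watermark, held)
        return self.output_watermark

def merged_input_watermark(upstream: Iterable[Stage]) -> float:
    """A stage fed by several upstream stages takes the minimum of their outputs."""
    return min(stage.output_watermark for stage in upstream)

a, b, join = Stage("source_a"), Stage("source_b"), Stage("join")
a.advance(100.0)
b.advance(90.0)
print(join.advance(merged_input_watermark([a, b])))  # 90.0

window = Stage("window")
print(window.advance(100.0, earliest_pending=95.0))  # 95.0
print(window.advance(110.0))                          # 110.0
```

Taking the minimum means the slowest upstream stage decides how far the downstream watermark can move, so no stage declares a time "complete" while one of its inputs may still deliver earlier events.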

jsmaupin commented 4 years ago

Reading that back, it sounds very confusing. Honestly, I'm still trying to digest everything.