faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

QUESTION: Any possibility of getting a sliding window example #185

Open fonty422 opened 3 years ago

fonty422 commented 3 years ago

I note that the docs state you support hopping, tumbling, and sliding windows. There's even a faust.SlidingWindow class, but I can't seem to figure out how to implement it. The requirement is a live count per event for the preceding 10, 30, 60, and 300 s, updated every second. In the future we also want a count and some time-based calculations for the last 24 hours, 1 week, 1 month, and 3 months.

I have a workaround at the moment: use a 1 s tumbling window with an expiration time of 300 s, then sum all the results from 300 s ago to now using the delta() method on the windowed table and write that to a topic. That's OK for now, with a small number of messages from a small number of sources, but production will possibly see thousands of messages per second from tens of thousands of sources, so I'm not sure how scalable my solution is.

import faust

# Assumes app, events_topic, event_counts_topic, and event_counts_table
# (a 1 s tumbling table, see below) are defined elsewhere in the app.

class AlarmCount(faust.Record, serializer='json'):
    event_id: int
    source_id: int
    counts_10: int
    counts_30: int
    counts_60: int
    counts_300: int

@app.agent(events_topic)
async def new_event(stream):
    async for value in stream:
        # Increment this source's count in the current 1 s window.
        event_counts_table[value.source_id] += 1

        # Walk back through the per-second windows and aggregate
        # the count for each range.
        counts_10 = 0
        counts_30 = 0
        counts_60 = 0
        counts_300 = 0
        for i in range(300):
            delta = event_counts_table[value.source_id].delta(i)
            if i < 10:
                counts_10 += delta
            if i < 30:
                counts_30 += delta
            if i < 60:
                counts_60 += delta
            counts_300 += delta

        await event_counts_topic.send(
            value=AlarmCount(
                event_id=value.event_id,
                source_id=value.source_id,
                counts_10=counts_10,
                counts_30=counts_30,
                counts_60=counts_60,
                counts_300=counts_300,
            )
        )
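
The windowed table the snippet assumes (per the description above: a 1 s tumbling window kept for 300 s) would be defined roughly like this; the table name here is illustrative:

# 1 s tumbling window with a 300 s expiry, as described above.
event_counts_table = app.Table('event-counts', default=int).tumbling(
    1.0, expires=300.0,
).relative_to_stream()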
fonty422 commented 3 years ago

I also had a method that was faster for large windows: rather than a windowed table, I used a standard table where the value was a list of timestamps. Each time something new comes in, it performs a list comprehension to remove entries past the expiry time, appends the new value, then calls len() on the result. This is much quicker, but it still results in delays - the consumer is about 2000 records behind the producer after 50k records, along with a lot of "Timer Monitor.sampler woke up too late, with a drift of ..." warnings.
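
A minimal sketch of that timestamp-list approach (the app, topic, record, and table names here are illustrative, not from the original code):

import time

import faust

app = faust.App('event-times', broker='kafka://localhost:9092')

class Event(faust.Record, serializer='json'):
    source_id: int

events_topic = app.topic('events', value_type=Event)

# Plain (non-windowed) table mapping source_id -> list of event timestamps.
event_times = app.Table('event-times', default=list)

WINDOW = 300.0  # seconds of history to keep

@app.agent(events_topic)
async def count_events(stream):
    async for event in stream:
        now = time.time()
        # Drop timestamps past the expiry, append the new one, and
        # reassign so the table's changelog picks up the mutation.
        times = [t for t in event_times[event.source_id] if now - t <= WINDOW]
        times.append(now)
        event_times[event.source_id] = times
        count_300 = len(times)  # events in the last 300 s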

Other than using Kafka directly (meaning Java... which I'd dearly love to avoid), there has to be a better way.

fonty422 commented 3 years ago

I also found this class that seems to show a sliding window, and this test example from the old robinhood version, but I have no idea how to implement it with respect to a topic stream or table. Assistance or guidance greatly appreciated.

fonty422 commented 3 years ago

One possible solution that should be fast to aggregate across a large window would be to always increment the value in the current window, but if the current window is 0, grab the previous window's value and add one to that. Then the count for the last n seconds is simply the difference between window.delta(n) and now. But one issue: what if the previous window is also 0, and a few hundred before it too? What if there is no non-zero value anywhere in the history, which might be tens of thousands of periods old? I'm still looking for a good solution here, as every other method I've tried hits a limit where you start to get a lot of "Timer Monitor.sampler woke up too late, with a drift of ..." errors printed.
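
A rough sketch of that carry-forward idea (hypothetical helper names; assumes a 1 s tumbling table whose per-window value is a running total):

# Hypothetical sketch: each 1 s window stores a running total, seeded from
# the previous window, so a range count is just a difference of two totals.

def record_event(totals, source_id):
    current = totals[source_id].now()
    if current == 0:
        # New window: carry the previous window's running total forward.
        # Open problem noted above: the previous window may be empty too,
        # possibly for thousands of windows back.
        current = totals[source_id].delta(1)
    totals[source_id] = current + 1

def count_last(totals, source_id, seconds):
    # Events in the last `seconds` = total now minus total `seconds` ago.
    return totals[source_id].now() - totals[source_id].delta(seconds)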

fonty422 commented 3 years ago

So in looking through the test for sliding windows I also happened to find the table.py file.

Then by looking at how hopping and tumbling windows were implemented and cross-referencing the respective test files, I added the following to table.py:

def sliding(
    self,
    before: Seconds,
    after: Seconds,
    expires: Seconds = None,
    key_index: bool = False,
) -> WindowWrapperT:
    """Wrap table in a sliding window."""
    return self.using_window(
        windows.SlidingWindow(before, after, expires),
        key_index=key_index,
    )

But that appears to only ever give me a count of 1 for any window range when performing something like the word count example.
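
For reference, a sliding() table defined that way would presumably be used like the word-count example, along these lines (a sketch; assumes the patch above is applied, and the names are illustrative):

import faust

app = faust.App('word-counts', broker='kafka://localhost:9092')
words_topic = app.topic('words', value_type=str)

# Assumes the sliding() patch above has been applied to Table.
word_counts = app.Table('word-counts', default=int).sliding(
    before=10.0,  # window extends 10 s before each event
    after=5.0,    # and 5 s after it
    expires=300.0,
)

@app.agent(words_topic)
async def count_words(words):
    async for word in words:
        word_counts[word] += 1
        print(word, word_counts[word].value())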

fonty422 commented 3 years ago

A new solution that works very well, but still needs more smarts to work perfectly:

It's a really nice solution for aggregating very large windows. But the problem is that it doesn't handle a rebalance. If a worker shuts down abruptly, or a change is made that requires a rebalance, then any windows that expire during that time are gone and never send the update that would remove them.

Is there a way to ensure that expired windows are always accounted for - perhaps by Kafka itself?

Or is this only an issue because I'm running everything in memory, not using RocksDB yet?

doncat99 commented 3 years ago

[quoting fonty422's comment above about adding a sliding() method to table.py]

I am facing the same issue - it seems the sliding() function doesn't actually work?

doncat99 commented 3 years ago

I tried to make hopping windows work - check out https://github.com/robinhood/faust/issues/514 and my fork https://github.com/doncat99/faust for more detail.

fonty422 commented 3 years ago

Thanks @doncat99. I did try using the on_window_close callback from the example, but found that the closed/expired windows were 'lost' during a rebalance or after a worker crash, never to be recovered. If we're using this to count over a range, we end up with a count that never returns to zero, even when no events occur for longer than the window range. Again, unless using RocksDB fixes this?
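
For context, the on_window_close pattern referenced above looks roughly like this (a sketch with illustrative names); the problem described is that windows which expire while a worker is down never trigger the callback:

import faust

app = faust.App('window-close-demo', broker='kafka://localhost:9092')
events_topic = app.topic('events', value_type=str)

def on_window_close(key, value):
    # Called when a window expires; key is (table_key, window_range).
    # This is where the expired count would be subtracted or published.
    print(f'window closed: key={key} value={value}')

counts = (
    app.Table('counts', default=int, on_window_close=on_window_close)
    .tumbling(1.0, expires=300.0)
    .relative_to_stream()
)

@app.agent(events_topic)
async def track(stream):
    async for source_id in stream:
        counts[source_id] += 1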

fonty422 commented 3 years ago

Just in addition - @doncat99, correct me if I'm wrong, but a hopping window won't be serviceable if we're looking for 3-month aggregates at 1-minute intervals. Firstly, there will be an enormous amount of duplicate data across windows: with a 3-month window hopping every minute, each event belongs to roughly 90 × 24 × 60 ≈ 130,000 overlapping windows. Secondly, until the data fills the whole 3 months we can't get a result, and if we reset the system we have to wait 3 months for a single value.

fonty422 commented 3 years ago

So after a little more looking, I managed to understand what you were asking me to do, @doncat99. I took a look at the base.py file and put a few print statements in there to see what happens when a window closes and how it decides which keys to remove. I found that when a worker restarts, last_closed_window is set to 0 and the timestamps are empty. When a repartition occurs, last_closed_window stays as it was, and the timestamps for the partitions the worker was previously responsible for stay in place, but the timestamps for the newly assigned partitions are empty, indicating this information is lost in the transfer. I'm not really competent enough to follow the rabbit hole all the way to the bottom to find the root cause of this issue, so hopefully someone else can help here.