kaskada-ai / kaskada

Modern, open-source event-processing
https://kaskada.io/
Apache License 2.0

feat: windowing syntax and inclusivity behavior #297

Open jordanrfrazier opened 1 year ago

jordanrfrazier commented 1 year ago

Summary

The inclusivity bounds of windows are generally confusing, and in some cases can be considered incorrect. The current behavior is that we "update, emit, reset" windows. This means:

  1. If a new input occurs at the window time, update that value.
  2. Emit that value at the window time. (For hourly(), this is at "01:00:00, 02:00:00, 03:00:00", etc)
  3. Reset the window to null.

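This behavior is (exclusive, inclusive]: a value arriving exactly at a window's start time belongs to the previous window, and a value arriving exactly at its end time belongs to the window that is ending.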
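
A minimal Python sketch of the "update, emit, reset" order described above, assuming integer timestamps and a sum aggregate. The names `windowed_sum`, `events`, and `ticks` are hypothetical and only model the behavior; they are not Kaskada APIs.

```python
from typing import List, Optional, Tuple


def windowed_sum(
    events: List[Tuple[int, int]], ticks: List[int]
) -> List[Tuple[int, Optional[int]]]:
    """Model of "update, emit, reset" for a sum over tick-delimited windows.

    events: (time, amount) pairs sorted by time (illustrative input shape).
    ticks:  sorted window-end times.
    Returns one (tick_time, sum_or_None) pair per tick.
    """
    emitted = []
    acc: Optional[int] = None
    i = 0
    for tick in ticks:
        # 1. Update: inputs at or before the tick time are folded in first,
        #    so an input exactly at the tick lands in the window that is ending.
        while i < len(events) and events[i][0] <= tick:
            amount = events[i][1]
            acc = amount if acc is None else acc + amount
            i += 1
        # 2. Emit the value at the window time.
        emitted.append((tick, acc))
        # 3. Reset the window to null.
        acc = None
    return emitted


events = [(5, 3), (10, 12), (12, 4)]
ticks = [10, 20]
print(windowed_sum(events, ticks))  # [(10, 15), (20, 4)]
```

The input at time 10 is counted in the window that ends at 10, which is the (exclusive, inclusive] behavior in question.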

Is your feature request related to a problem? Please describe.

The first problem with the above behavior is that values that occur in the next window are included in the previous window. Looking at the attached dataset, we see that at time 2023-01-01 there are two rows for Ben, with amounts 12 and NaN. The first row is an input row; the second row is the tick. The weirdness is that the sum_amount value for the tick includes the 12, even though the tick was for the window from 2022-12-01 to 2023-01-01.

Use Case

We want to make windowing syntax and behavior intuitive. A user should be able to predict what values are produced.

Describe the solution you'd like

Design/propose solutions.

Ideas:

Describe alternatives you've considered ...

bjchambers commented 1 year ago

Elaborating a bit -- there seem to be two use cases:

  1. I want the complete result of a windowed aggregate, once per window
  2. I want the cumulative result so far within a window, resetting when a new window starts

And these seem to have contradictory requirements.
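
To make the tension concrete, here is a rough Python sketch of the two output shapes, assuming integer timestamps, a sum aggregate, and [start, end) window bounds. The bounds are only one possible choice, and the function names are hypothetical rather than Kaskada APIs.

```python
import bisect
from typing import List, Tuple


def complete_per_window(
    events: List[Tuple[int, int]], boundaries: List[int]
) -> List[Tuple[int, int]]:
    """Use case 1: one complete total per window, reported at the window end.

    Windows are [boundaries[k], boundaries[k + 1]) -- an assumed convention.
    """
    totals = [0] * (len(boundaries) - 1)
    for time, amount in events:
        k = bisect.bisect_right(boundaries, time) - 1
        if 0 <= k < len(totals):  # ignore events outside all windows
            totals[k] += amount
    return [(boundaries[k + 1], totals[k]) for k in range(len(totals))]


def cumulative_within_window(
    events: List[Tuple[int, int]], boundaries: List[int]
) -> List[Tuple[int, int]]:
    """Use case 2: running total at every event, reset when a new window starts."""
    out = []
    current, acc = None, 0
    for time, amount in events:
        k = bisect.bisect_right(boundaries, time) - 1
        if k != current:  # first event of a new window: start over
            current, acc = k, 0
        acc += amount
        out.append((time, acc))
    return out


events = [(5, 3), (10, 12), (12, 4)]
boundaries = [0, 10, 20]
print(complete_per_window(events, boundaries))       # [(10, 3), (20, 16)]
print(cumulative_within_window(events, boundaries))  # [(5, 3), (10, 12), (12, 16)]
```

At time 10 the first function reports the finished window (3) while the second reports the freshly reset window (12), so a single output column cannot satisfy both at that instant.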

kerinin commented 1 year ago

Adding bug, because we don't emit the current value when it changes on window reset (to null)

bjchambers commented 1 year ago

I think we had a reason why we said it was the correct value...

Wanted to add that this can be "hacked" for now:

sum(e, window=since(p))
count(e, window=since(p))

Can become:

sum(e, window=since(p)) | if(p)
count(e, window=since(p)) | if(p) | else(0)
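
A rough Python model of what this rewrite does row by row. It assumes `if(p)` keeps the piped value where `p` is true and yields null otherwise, that `else(0)` replaces null with 0, and that `since(p)` follows the update, emit, reset order from the issue description. The helper names are hypothetical.

```python
from typing import List, Optional, Tuple


def since_sum(rows: List[Tuple[Optional[int], bool]]) -> List[Optional[int]]:
    """Model of sum(e, window=since(p)): update, emit, then reset when p is true."""
    out = []
    acc: Optional[int] = None
    for amount, p in rows:
        if amount is not None:
            acc = amount if acc is None else acc + amount
        out.append(acc)  # value emitted for this row
        if p:
            acc = None   # window resets after the row where p holds
    return out


def if_(values: List[Optional[int]], preds: List[bool]) -> List[Optional[int]]:
    """Model of `| if(p)`: keep the value where p is true, null elsewhere."""
    return [v if p else None for v, p in zip(values, preds)]


def else_(values: List[Optional[int]], default: int) -> List[int]:
    """Model of `| else(default)`: replace nulls with the default."""
    return [default if v is None else v for v in values]


rows = [(1, False), (2, False), (3, True), (4, False)]
preds = [p for _, p in rows]

raw = since_sum(rows)
print(raw)                          # [1, 3, 6, 4]
print(if_(raw, preds))              # [None, None, 6, None]
print(else_(if_(raw, preds), 0))    # [0, 0, 6, 0]
```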

kerinin commented 1 year ago

I think we chose this behavior because there was no other way to produce "daily emitting at end of window" style aggregations. I guess it's true that we can hack this behavior, but IMO the current behavior still violates the expectation that the "historical" output describes the state of the computation at all points in time.

bjchambers commented 1 year ago

In notebooks, when wanting to emulate the behavior I want, I have previously used a filter to null out rows after the end:

For example, for count:

let page_views_since_purchase_raw = count(PageViews, window=since(is_valid(Purchases)))
let page_views_since_purchase = page_views_since_purchase_raw 
      # Hack to work-around https://github.com/kaskada-ai/kaskada/issues/297
      | if({ predicate: is_valid(Purchases), input: is_valid($input)} | not($input.predicate | else(false)))
      | else(0)