Kristina-Pianykh / iron-maven

Add sliding window #8

Open Kristina-Pianykh opened 5 months ago

Kristina-Pianykh commented 4 months ago

Considerations

Option 1

Keyed stream: no
Window assigner: event time + sliding window (processing time would result in inconsistent processing in the system due to network latencies)
Window functions: ReduceFunction, AggregateFunction, or ProcessWindowFunction. The latter buffers ALL elements within a window before starting to process --> not suitable for us.

Source: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/
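To make the Option 1 trade-off concrete, here is a rough, library-free sketch (plain Java, not Flink code) of how a sliding window maps one event to several overlapping windows, and how an incremental aggregate keeps only one running value per window instead of buffering every element. The class name, method names and the `long[]{timestamp, value}` event layout are made up for illustration; Flink's own assigner also supports an offset, which is left out here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Flink: illustrates sliding windows
// combined with incremental (reduce-style) aggregation.
class SlidingWindowSketch {

    // Start timestamps of every window [start, start + size) that contains
    // the given event timestamp. Windows start at multiples of `slide`.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    // Incremental aggregation: one running sum per window start, so no
    // per-window element buffer is needed.
    // Each event is {event timestamp, value}.
    static Map<Long, Long> sumPerWindow(long[][] events, long size, long slide) {
        Map<Long, Long> sums = new TreeMap<>();
        for (long[] event : events) {
            for (long start : windowStarts(event[0], size, slide)) {
                sums.merge(start, event[1], Long::sum);
            }
        }
        return sums;
    }
}
```

With size 10 and slide 5, an event at timestamp 7 lands in the windows starting at 5 and 0, so each event is aggregated `size / slide` times. That is the cost of sliding windows even when nothing is buffered.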

Option 2

!! .within() is recommended for state clearing

public Pattern<T,F> within(Time windowTime)
Defines the maximum time interval in which a matching pattern has to be completed in order to be considered valid. This interval corresponds to the maximum time gap between first and the last event.

Parameters:
windowTime - Time of the matching window
Returns:
The same pattern operator with the new window length

Source: https://nightlies.apache.org/flink/flink-docs-release-1.11/api/java/org/apache/flink/cep/pattern/Pattern.html

Seems like the preferable solution: it's easy to use, it clears the state, and it simulates the windowing behavior.
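For intuition only, this is a toy model of what a time bound like .within() gives us: partial matches that can no longer complete in time are dropped, so the state stays bounded. It is plain Java, not Flink's NFA. The "A followed by B" pattern, the class name, the assumption that events arrive in timestamp order, and the exact boundary handling (a partial match is dropped once the gap reaches the window length) are all assumptions of this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy matcher for the pattern "A followed by B" with a
// maximum gap between the first and the last event.
class WithinSketch {
    private final long windowMillis;
    // Timestamps of "A" events still waiting for a "B" (the partial matches).
    private final Deque<Long> pendingStarts = new ArrayDeque<>();

    WithinSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Feeds one event (assumed to arrive in timestamp order) and returns the
    // {aTimestamp, bTimestamp} pairs completed by it.
    List<long[]> onEvent(String type, long timestamp) {
        // State clearing: drop partial matches that are too old to complete.
        while (!pendingStarts.isEmpty()
                && timestamp - pendingStarts.peekFirst() >= windowMillis) {
            pendingStarts.pollFirst();
        }
        List<long[]> matches = new ArrayList<>();
        if (type.equals("A")) {
            pendingStarts.addLast(timestamp);
        } else if (type.equals("B")) {
            for (long start : pendingStarts) {
                matches.add(new long[] {start, timestamp});
            }
            pendingStarts.clear();
        }
        return matches;
    }

    // Number of partial matches currently held in state.
    int pendingCount() {
        return pendingStarts.size();
    }
}
```

Without the pruning loop, every unmatched "A" would stay in `pendingStarts` forever, which is the kind of state growth the time bound is meant to prevent.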

Option 3

Compute manually (by comparing event timestamps in the filtering condition)
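A minimal sketch of the manual check, assuming each event carries a millisecond timestamp. In Flink CEP this comparison would presumably live in a condition that can see the previously matched event; here it is a plain Java predicate so it runs without Flink. The `Event` class, field names and method names are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, library-free illustration of Option 3.
class ManualWindowSketch {

    // Minimal stand-in for our event type.
    static final class Event {
        final String type;
        final long timestampMillis;

        Event(String type, long timestampMillis) {
            this.type = type;
            this.timestampMillis = timestampMillis;
        }
    }

    // True if `candidate` is not earlier than `first` and at most
    // `windowMillis` later than it.
    static boolean insideWindow(Event first, Event candidate, long windowMillis) {
        long gap = candidate.timestampMillis - first.timestampMillis;
        return gap >= 0 && gap <= windowMillis;
    }

    // Keeps only the events that could complete a match started by `first`.
    static List<Event> candidatesFor(Event first, List<Event> others, long windowMillis) {
        List<Event> result = new ArrayList<>();
        for (Event other : others) {
            if (insideWindow(first, other, windowMillis)) {
                result.add(other);
            }
        }
        return result;
    }
}
```

Note that a check like this only filters matches. It does not remove anything from state by itself, so old partial matches still have to be cleared some other way.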

Kristina-Pianykh commented 4 months ago

Option 2: https://github.com/Kristina-Pianykh/iron-maven/commit/57675c50aba7f0684cdaecdd9d380dbfdba51ffb

Kristina-Pianykh commented 4 months ago

Option 2 + Option 3

Finally figured out how it works: https://github.com/Kristina-Pianykh/iron-maven/commit/6232148d29b05efc7823d97f8a98ce57ab493aa1

The TimestampAssigner now returns the system clock time instead of the timestamp extracted from the event. The time window in which events have to occur to produce a match is handled manually within the pattern construction itself. The .within() method sets the maximum time to look back for potential matches. It's essential to clear the state periodically, because otherwise the state kept by Flink would keep growing and overload it.