apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0
5.67k stars 1.06k forks source link

Add StreamingWindowExec to DataFusion physical plan to support aggregations over unbounded data #11366

Open ameyc opened 3 weeks ago

ameyc commented 3 weeks ago

Is your feature request related to a problem or challenge?

Currently DataFusion somewhat supports computations over Unbounded data, with SymmetricHashJoinExec being able to join unbounded streams of data. However ability to aggregate over unbounded data seems to be missing in project as yet. In stream processing, aggregating over windows of streaming data is a common concept. A Streaming Window however behaves more like an AggregationExec than a WindowExec, we have a POC StreamingWindowExec built off a fork of DataFusion 39.0.

We would like to collaborate with the community to upstream this operator as well improve the design.

Describe the solution you'd like

No response

Describe alternatives you've considered

No response

Additional context

No response

mustafasrepo commented 2 weeks ago

Actually existing AggregateExec supports streaming. It has a member input_order_mode which is following enum:

pub enum InputOrderMode {
    /// There is no partial permutation of the expressions satisfying the
    /// existing ordering.
    Linear,
    /// There is a partial permutation of the expressions satisfying the
    /// existing ordering. Indices describing the longest partial permutation
    /// are stored in the vector.
    PartiallySorted(Vec<usize>),
    /// There is a (full) permutation of the expressions satisfying the
    /// existing ordering.
    Sorted,
}

When mode is either Sorted or PartiallySorted (decided by planner according to ordering at the input) operation is streamable. However, to trigger this modes at least one of the group by expression should be ordered.

ozankabak commented 2 weeks ago

To add to the comment above -- BoundedWindowAggExec also supports streaming (when possible).

ameyc commented 2 weeks ago

@mustafasrepo & @ozankabak thanks for the feedback. the target usecases we were going for are flink style workloads, with data read from kafka that is generally not be ordered and thus needs to be watermarked. we tried the vanilla aggregates and ran into PipelineBreaking panics.

An example workload we're trying to compute is of the nature, lmk if this can already be expressed with current operators as is --

    let windowed_df = df
        .clone()
        .streaming_window(
            vec![],
            vec![
                max(col("imu_measurement").field("gps").field("speed")),
                min(col("imu_measurement").field("gps").field("altitude")),
                count(col("imu_measurement")).alias("count"),
            ],
            Duration::from_millis(5_000), // 5 second window
            Some(Duration::from_millis(1_000)), // 1 second slide
        )
        .unwrap();
ozankabak commented 2 weeks ago

To help as best as I can, let me first reiterate my understanding of your use case: You have a streaming source, which has some columns like speed and altitude, but your ts column may be out-of-order and is not monotonic. Or, maybe you don't have such a column at all. Still, you want to do streaming computations based on processing time on such data.

In such a case, what you can do is to use a projection to add an order defining column based on processing time, and use BoundedWindowAggExec to do streaming/incremental windowing based on that. So basically two built-in operators compose together to give you what you want (or I should say, what I think you want 🙂 ). Let me know if this helps.