ArroyoSystems / arroyo

Distributed stream processing engine in Rust
https://arroyo.dev
Apache License 2.0

Non-windowed updating aggregates using datafusion. #588

Closed jacksonrnewhouse closed 2 months ago

jacksonrnewhouse commented 2 months ago

The main functionality this provides is the ability to run aggregates without windows, emitting update and retract messages that can be written to a Debezium sink.

The logic for calculating the aggregate lives in UpdatingAggregatingFunc. This operator holds three versions of the aggregate exec, one per mode:

  1. Partial: takes input and emits partial aggregate representations.
  2. CombinePartial: merges multiple partials into a single partial. This mode was added in https://github.com/ArroyoSystems/arrow-datafusion/pull/1/files.
  3. Final: the final aggregation, which finishes any aggregates and expects partials as input.
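
To make the three modes concrete, here is a minimal sketch for an AVG-style aggregate. It does not use DataFusion's API; `AvgPartial`, `partial`, `combine_partial`, and `finalize` are hypothetical names chosen for illustration, and the real execs operate on Arrow record batches rather than slices.

```rust
/// Partial representation of an AVG aggregate: a running sum and a count.
/// (Hypothetical type; not part of Arroyo or DataFusion.)
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct AvgPartial {
    pub sum: f64,
    pub count: u64,
}

/// Partial mode: fold raw input values into a partial representation.
pub fn partial(values: &[f64]) -> AvgPartial {
    AvgPartial {
        sum: values.iter().sum(),
        count: values.len() as u64,
    }
}

/// CombinePartial mode: merge several partials into a single partial.
pub fn combine_partial(partials: &[AvgPartial]) -> AvgPartial {
    partials
        .iter()
        .fold(AvgPartial { sum: 0.0, count: 0 }, |acc, p| AvgPartial {
            sum: acc.sum + p.sum,
            count: acc.count + p.count,
        })
}

/// Final mode: finish the aggregate, taking a partial as input.
/// Returns None when no rows contributed to the partial.
pub fn finalize(p: AvgPartial) -> Option<f64> {
    if p.count == 0 {
        None
    } else {
        Some(p.sum / p.count as f64)
    }
}
```

The point of keeping the partial form around is that new input can be merged into an existing partial without replaying the rows that produced it.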

These are combined with the new LastKeyValueView. This is a simple key-value map that uses the _timestamp field as the expiration time. For any group-by tuple there will be at most one live entry in the map. Writes to state include a _generation field in Parquet, which is used to ensure we restore the newest value.
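
The behavior described above can be sketched roughly as follows. This is not the actual LastKeyValueView; `LastValueMap` and `Entry` are hypothetical, keys and values are simplified to `String` and `i64`, and the exact expiration rule (here, dropping entries whose timestamp is older than a cutoff) is an assumption.

```rust
use std::collections::HashMap;

/// One stored value plus the metadata fields mentioned above.
#[derive(Debug, Clone, PartialEq)]
pub struct Entry {
    pub value: i64,
    /// Stands in for the `_timestamp` field.
    pub timestamp: u64,
    /// Stands in for the `_generation` field.
    pub generation: u64,
}

/// Hypothetical map holding at most one live entry per key.
#[derive(Default)]
pub struct LastValueMap {
    entries: HashMap<String, Entry>,
}

impl LastValueMap {
    /// Insert or restore an entry. If an entry with a higher generation is
    /// already present, the incoming one is stale and is ignored.
    pub fn insert(&mut self, key: &str, entry: Entry) {
        let is_stale = self
            .entries
            .get(key)
            .map_or(false, |existing| existing.generation > entry.generation);
        if !is_stale {
            self.entries.insert(key.to_string(), entry);
        }
    }

    /// Look up the live entry for a key, if any.
    pub fn get(&self, key: &str) -> Option<&Entry> {
        self.entries.get(key)
    }

    /// Drop every entry whose timestamp is older than `cutoff` (assumed rule).
    pub fn expire(&mut self, cutoff: u64) {
        self.entries.retain(|_, e| e.timestamp >= cutoff);
    }
}
```

Comparing generations on insert means the order in which files are read back during restore does not matter: the newest write for a key always wins.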

In the operator, data is fed into the partial exec until it is time to flush, which happens under the following conditions:

Flushing proceeds in the following steps:

  1. Close the active sender for computing partials. If there isn't one, there's no work to be done, so just exit.
  2. Compute the new partial data that has been received since the last flush.
  3. Look in the store of partial data for entries that have the same key-set as the new partials. If there aren't any, skip to step 5.
  4. Feed the data from steps 2 and 3 to the combine exec, then spool out its output, writing it to state and storing it as the input to the final step.
  5. For the final result, first check if the final table ("f") has any matches. These will become retracts. Then, write the new data to that final table. The retracts will be emitted before the appends.
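
The flush steps above can be sketched with a simplified SUM aggregate. This is an illustration only, not the operator's code: `SumOperator`, `Update`, `push`, and `flush` are hypothetical names, plain maps stand in for the state tables and the DataFusion execs, and the sketch stores the new partial even when no stored partial matched, which is an assumption.

```rust
use std::collections::{BTreeMap, HashMap};

/// Messages emitted downstream (hypothetical shape).
#[derive(Debug, Clone, PartialEq)]
pub enum Update {
    Retract { key: String, value: i64 },
    Append { key: String, value: i64 },
}

#[derive(Default)]
pub struct SumOperator {
    /// Input received since the last flush; stands in for the active sender.
    pending: Option<Vec<(String, i64)>>,
    /// Store of partial data, one partial per key.
    partials: HashMap<String, i64>,
    /// Final table ("f"): the last emitted value per key.
    finals: HashMap<String, i64>,
}

impl SumOperator {
    pub fn push(&mut self, key: &str, value: i64) {
        self.pending
            .get_or_insert_with(Vec::new)
            .push((key.to_string(), value));
    }

    pub fn flush(&mut self) -> Vec<Update> {
        // Step 1: close the active input; with none, there is no work to do.
        let input = match self.pending.take() {
            Some(input) => input,
            None => return Vec::new(),
        };
        // Step 2: compute the new partials (BTreeMap for a stable key order).
        let mut new_partials: BTreeMap<String, i64> = BTreeMap::new();
        for (key, value) in input {
            *new_partials.entry(key).or_insert(0) += value;
        }
        let mut retracts = Vec::new();
        let mut appends = Vec::new();
        for (key, new_partial) in new_partials {
            // Steps 3 and 4: combine with a stored partial for the same key,
            // if one exists, and write the result back to the partial store.
            let combined = match self.partials.get(&key) {
                Some(stored) => stored + new_partial,
                None => new_partial,
            };
            self.partials.insert(key.clone(), combined);
            // Step 5: a match in the final table becomes a retract; the new
            // value is written to the table. For SUM, final equals partial.
            if let Some(previous) = self.finals.insert(key.clone(), combined) {
                retracts.push(Update::Retract { key: key.clone(), value: previous });
            }
            appends.push(Update::Append { key, value: combined });
        }
        // Retracts are emitted before appends.
        retracts.extend(appends);
        retracts
    }
}
```

For example, pushing ("a", 1) and ("a", 2) and flushing yields a single append of 3 for "a"; pushing ("a", 4) and flushing again yields a retract of 3 followed by an append of 7.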

To make progress between flushes, the partial exec is advanced. We panic in handle_future_result() because the input will never have been closed on that exec.

Some other things that were changed in this PR: