apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

[DISCUSSION] Support for Streaming in DataFusion #11404

Open ameyc opened 2 weeks ago

ameyc commented 2 weeks ago

Is your feature request related to a problem or challenge?

Hi DataFusion community,

Last month, on June 24th, 2024, we demonstrated our POC stream processing system built on top of DataFusion at the inaugural DataFusion meetup, which generated some interest from the attendees as well as the data infra community at large. Stream processing is an ask that has come up time and again, and, @alamb, we would like to re-open the discussions that were first initiated by metesynnada in #4285. Folks at Synnada put together a detailed proposal here. Remarkably, our POC ended up following the same general principles outlined in that doc. The POC currently supports streaming windowed aggregations with watermark tracking and checkpointing.

Per our understanding, the current state of play for streaming in DataFusion is as follows:

  1. You can write sources that operate in ExecutionMode::Unbounded
  2. Some complex operators such as `SymmetricHashJoinExec` and `AggregateExec` already operate in this mode (unwatermarked)
  3. You can await results of the continuous computations in batches using df.execute_stream().await.unwrap()

I know the Synnada folks have done a lot of work already, so feel free to chime in. We wanted to gauge interest in the community for reviving #4285. While stream processing may appear to be a somewhat niche use case, much of the work within DataFusion is highly relevant to it, and it can be enabled with minimal changes to DataFusion upstream, which may benefit developers of other use cases as well.

Stream processing workloads are continuous in nature and as such require some operators to be stateful, so that they can keep track of things such as watermarks and checkpoint state for failure recovery. A pluggable state backend support would be ideal, this also may be useful for operators that spill to disk.
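To make the idea of a stateful operator with a pluggable state backend concrete, here is a minimal std-only sketch. Everything in it is hypothetical: `StateBackend`, `InMemoryBackend`, and `WatermarkTracker` are illustrative names and are not existing DataFusion APIs. It assumes the watermark is defined as the maximum event time seen minus a fixed allowed lateness, and that each operator has a stable ID to use as its checkpoint key.

```rust
use std::collections::HashMap;

/// Hypothetical key-value state backend (not an existing DataFusion trait).
trait StateBackend {
    fn put(&mut self, key: &str, value: Vec<u8>);
    fn get(&self, key: &str) -> Option<Vec<u8>>;
}

/// In-memory backend for illustration only; a durable backend would
/// implement the same trait on top of disk or remote storage.
#[derive(Default)]
struct InMemoryBackend {
    entries: HashMap<String, Vec<u8>>,
}

impl StateBackend for InMemoryBackend {
    fn put(&mut self, key: &str, value: Vec<u8>) {
        self.entries.insert(key.to_string(), value);
    }

    fn get(&self, key: &str) -> Option<Vec<u8>> {
        self.entries.get(key).cloned()
    }
}

/// Hypothetical per-operator watermark state.
/// Assumption: watermark = max event time seen - allowed lateness.
struct WatermarkTracker {
    operator_id: String,
    max_event_time_ms: Option<i64>,
    allowed_lateness_ms: i64,
}

impl WatermarkTracker {
    fn new(operator_id: &str, allowed_lateness_ms: i64) -> Self {
        Self {
            operator_id: operator_id.to_string(),
            max_event_time_ms: None,
            allowed_lateness_ms,
        }
    }

    /// Record the event time of an incoming row.
    fn observe(&mut self, event_time_ms: i64) {
        let max = self.max_event_time_ms.map_or(event_time_ms, |m| m.max(event_time_ms));
        self.max_event_time_ms = Some(max);
    }

    /// Current watermark, or None if no event has been seen yet.
    fn watermark(&self) -> Option<i64> {
        self.max_event_time_ms.map(|t| t - self.allowed_lateness_ms)
    }

    /// Persist the tracker state under the operator's ID.
    /// Nothing is written if no event has been observed.
    fn checkpoint(&self, backend: &mut dyn StateBackend) {
        if let Some(t) = self.max_event_time_ms {
            backend.put(&self.operator_id, t.to_le_bytes().to_vec());
        }
    }

    /// Rebuild the tracker from a previous checkpoint, if one exists.
    fn restore(
        operator_id: &str,
        allowed_lateness_ms: i64,
        backend: &dyn StateBackend,
    ) -> Option<Self> {
        let bytes = backend.get(operator_id)?;
        if bytes.len() != 8 {
            return None;
        }
        let mut raw = [0u8; 8];
        raw.copy_from_slice(&bytes);
        Some(Self {
            operator_id: operator_id.to_string(),
            max_event_time_ms: Some(i64::from_le_bytes(raw)),
            allowed_lateness_ms,
        })
    }
}
```

After a failure, a restarted operator would call `restore` with the same operator ID and continue from the checkpointed watermark. This only works if the ID is stable across restarts.
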

To that end, we opened some tickets:

  1. StateBackend in DataFusion's RuntimeEnv
  2. Make Accumulators and ScalarValue serializable
  3. Deterministic IDs for ExecutionPlan
  4. Add StreamingWindowExec to DataFusion physical plan to support aggregations over unbounded data
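As a rough illustration of what tickets 2 and 4 are asking for, the sketch below shows a tumbling-window aggregation over unbounded input that emits a window once the watermark passes its end, using an accumulator whose state round-trips through bytes. It uses only the standard library. `SumCount` and `TumblingWindowAgg` are hypothetical names, not the proposed `StreamingWindowExec` or DataFusion's `Accumulator` trait, and the policy of dropping late events is an assumption of this sketch.

```rust
use std::collections::BTreeMap;

/// Hypothetical accumulator whose state can be serialized for checkpoints.
#[derive(Debug, Clone, PartialEq)]
struct SumCount {
    sum: i64,
    count: u64,
}

impl SumCount {
    fn update(&mut self, value: i64) {
        self.sum += value;
        self.count += 1;
    }

    /// Encode as 16 little-endian bytes: sum, then count.
    fn to_bytes(&self) -> Vec<u8> {
        let mut out = Vec::with_capacity(16);
        out.extend_from_slice(&self.sum.to_le_bytes());
        out.extend_from_slice(&self.count.to_le_bytes());
        out
    }

    fn from_bytes(bytes: &[u8]) -> Option<Self> {
        if bytes.len() != 16 {
            return None;
        }
        let mut sum = [0u8; 8];
        let mut count = [0u8; 8];
        sum.copy_from_slice(&bytes[..8]);
        count.copy_from_slice(&bytes[8..]);
        Some(Self {
            sum: i64::from_le_bytes(sum),
            count: u64::from_le_bytes(count),
        })
    }
}

/// Hypothetical tumbling-window aggregator driven by a watermark.
struct TumblingWindowAgg {
    window_ms: i64,
    allowed_lateness_ms: i64,
    max_event_time_ms: Option<i64>,
    /// Open windows keyed by window start time.
    open: BTreeMap<i64, SumCount>,
}

impl TumblingWindowAgg {
    fn new(window_ms: i64, allowed_lateness_ms: i64) -> Self {
        Self {
            window_ms,
            allowed_lateness_ms,
            max_event_time_ms: None,
            open: BTreeMap::new(),
        }
    }

    fn watermark(&self) -> Option<i64> {
        self.max_event_time_ms.map(|t| t - self.allowed_lateness_ms)
    }

    /// Feed one event and return the windows closed by it as
    /// (window start, accumulator) pairs in ascending start order.
    fn push(&mut self, event_time_ms: i64, value: i64) -> Vec<(i64, SumCount)> {
        let start = event_time_ms.div_euclid(self.window_ms) * self.window_ms;

        // Assumption: events for an already-closed window are dropped.
        if let Some(wm) = self.watermark() {
            if start + self.window_ms <= wm {
                return Vec::new();
            }
        }

        self.open
            .entry(start)
            .or_insert(SumCount { sum: 0, count: 0 })
            .update(value);

        let max = self.max_event_time_ms.map_or(event_time_ms, |m| m.max(event_time_ms));
        self.max_event_time_ms = Some(max);
        let wm = max - self.allowed_lateness_ms;

        // A window is closed when start + window_ms <= wm, so windows
        // with start >= wm - window_ms + 1 remain open.
        let still_open = self.open.split_off(&(wm - self.window_ms + 1));
        let closed = std::mem::replace(&mut self.open, still_open);
        closed.into_iter().collect()
    }
}
```

Because `SumCount` can be written to and read back from bytes, the contents of `open` could be checkpointed through a state backend and restored after a failure, which is the motivation for making accumulators and scalar values serializable.
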

Additionally, for places where changes are too streaming-specific, we would love to get input on how best to overlay our streaming project on top of DataFusion. @alamb had some tips from InfluxDB, but a how-to would be very nice. Happy to take a stab at putting together a gentle introduction for future developers from our learnings.

Thanks,

@ameyc & @emgeee

ozankabak commented 2 weeks ago

The consensus at the time when we proposed #4285 was to add general-purpose functionality upstream and keep stream processing focused features downstream. To that end, we added things like

(and many other things I forget now) upstream. Per this consensus, checkpointing and watermarking (especially when it throws away data depending on processing time) did not get the same treatment as they are quite specific to stream processing.

Having added many features to upstream DF for a long time now, and going through the experience of implementing specific functionality like checkpointing, watermarking and others on top of DataFusion, I think the consensus reached at the time of #4285 proved to be a quite reasonable one.

> A pluggable state backend support would be ideal, this also may be useful for operators that spill to disk.

This is an interesting idea. If there is sufficient interest in generalizing spill-to-disk code to go through a backend, then we should add this upstream. In that case, it would be a win-win: we would be helping general-purpose cases and also simplifying downstream code for people like you and us.

> Happy to take a stab at putting together a gentle introduction for future developers from our learnings.

This would be very nice and will certainly be helpful to others who want to build streaming systems on top of DataFusion.

alamb commented 2 weeks ago

> was to add general-purpose functionality upstream and keep stream processing focused features downstream

> I think the consensus reached at the time of https://github.com/apache/datafusion/issues/4285 proved to be a quite reasonable one.

Yes, I entirely agree. The features that were added, substantially by @ozankabak @metesynnada @berkaysynnada and @mustafasrepo have proven to be applicable to many different systems (not just stream processing).

I think this follows the basic philosophy we take at InfluxData as well (general-purpose things in DataFusion, timeseries-specific stuff downstream), though of course exactly where to draw the line always takes some judgement.

In general, I think the basic rule of thumb should be "if more than one downstream system will use a feature, then consider putting it in DataFusion". If it is a feature that realistically only one system will use, I think it is best left downstream.

Given the interest in streaming systems, I think extending / adding more support in DataFusion makes a lot of sense.

Here are some other potential related things I think would help:

alamb commented 2 weeks ago

I also tried to document some of how the Extension API process works here: https://github.com/apache/datafusion/pull/11425

alamb commented 2 weeks ago

Also related blog post: https://www.linkedin.com/pulse/future-datafusion-streaming-matt-green-ril7c from @emgeee