apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Document streaming use case (like `UNBOUNDED` tables) #9016

Open alamb opened 8 months ago

alamb commented 8 months ago

Is your feature request related to a problem or challenge?

Someone asked in discord:

> I'm looking at Datafusion and Polars as potential solutions for calculating averages over a sliding window of events, where the window is bound by event time. I've just come across Datafusion, would anyone be able to clarify if it's suitable for this use case? In essence, I have events streaming in via RPC that I want to feed into a system that gives the above outcome.

I am pretty sure this is exactly the case for using UNBOUNDED tables with an explicitly defined ORDER BY, from Synnada, Arroyo, and others. However, when I went to look for the documentation, I couldn't find any mention of this use case or any documentation of unbounded tables.

Describe the solution you'd like

I would like to help make it easier for people to use DataFusion for streaming use cases by:

Describe alternatives you've considered

No response

Additional context

No response

alamb commented 8 months ago

@metesynnada @mustafasrepo @ozankabak or @edmondop @mwylde do you know of any existing documentation / examples we could adapt?

alamb commented 8 months ago

I think the major initial push in this area came in https://github.com/apache/arrow-datafusion/pull/4694

Tangruilin commented 8 months ago

I can help with it.

Assign to me. @alamb

edmondop commented 8 months ago

Would one need to write custom unbounded sources?

alamb commented 8 months ago

> Would one need to write custom unbounded sources?

I don't think so @edmondop -- I was thinking anything that gives others examples / help getting started would be great. Maybe we can start with some SQL reference / API docs and high level commentary in one PR and then add an example as another PR.

Right now there is nothing documented, so it will be very easy to improve the status quo!

trungda commented 8 months ago

I've realized that StreamEncoding is not supported for Parquet. Is it intentional?

leoyvens commented 7 months ago

The example on #9070 is great for understanding how to track event time with GROUP BY. I have a question that this doc effort could help answer: Is there any trick out there for watermarking, that is, a way for a table to emit a signal that all events at a given event time have been sent?

alamb commented 7 months ago

> Is there any trick out there for watermarking, that is, a way for a table to emit a signal that all events at a given event time have been sent?

The way most of the code in DataFusion works is that it uses the next distinct value in the data to trigger emission (that is, you have to see an event with the next time value before results for the previous one are emitted).

I don't know of any way to send a synthetic signal that says "will never see any more values in this time interval"
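To make the emission rule above concrete, here is a minimal sketch in plain Rust. It is not DataFusion code: `StreamingAvg`, `push`, and `finish` are hypothetical names, and the sketch assumes the input is already ordered by event time. It only illustrates the idea that a group is emitted when the next distinct time value shows up.

```rust
/// Running average per event-time bucket over input already sorted by time.
/// Hypothetical illustration only; this is not a DataFusion API.
struct StreamingAvg {
    current: Option<(u64, f64, u64)>, // (time bucket, sum, count)
}

impl StreamingAvg {
    fn new() -> Self {
        Self { current: None }
    }

    /// Feed one event. Returns the finished (bucket, average) only when the
    /// event belongs to a different bucket than the one being accumulated.
    fn push(&mut self, time: u64, value: f64) -> Option<(u64, f64)> {
        match self.current.take() {
            // Same bucket: keep accumulating, nothing can be emitted yet.
            Some((t, sum, n)) if t == time => {
                self.current = Some((t, sum + value, n + 1));
                None
            }
            // New bucket seen: the previous bucket is complete, emit it.
            Some((t, sum, n)) => {
                self.current = Some((time, value, 1));
                Some((t, sum / n as f64))
            }
            // Very first event.
            None => {
                self.current = Some((time, value, 1));
                None
            }
        }
    }

    /// End of input: flush the last bucket. An unbounded source never gets here.
    fn finish(self) -> Option<(u64, f64)> {
        self.current.map(|(t, sum, n)| (t, sum / n as f64))
    }
}

fn main() {
    let mut agg = StreamingAvg::new();
    for (time, value) in [(1, 10.0), (1, 20.0), (2, 5.0)] {
        if let Some((bucket, avg)) = agg.push(time, value) {
            println!("bucket {}: avg {}", bucket, avg);
        }
    }
    // Bucket 2 is still pending here: it is only emitted once an event with a
    // later time arrives, which is why a synthetic watermark would be needed
    // to close it early.
}
```

In this sketch the last bucket stays buffered until either a later event arrives or the input ends, which is the gap a watermark signal would fill.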

ozankabak commented 7 months ago

> I don't know of any way to send a synthetic signal that says "will never see any more values in this time interval"

Yes, IMO this seems to fall outside the scope of "upstream" DataFusion (we also have these challenges, but choose to solve them downstream for this reason).

milenkovicm commented 5 months ago

A bit late to this party :), a few questions regarding this topic:

  1. I would say that UNBOUNDED is intended to be used with queries like INSERT INTO kafka_table_1 SELECT * FROM kafka_table_2, do I understand this correctly?
  2. If 1 is correct, I believe we would have to detach ctx.sql(...) from the main thread so as not to block other statements, so the question is: who will be in charge of query cancellation? Would it be (or is it) part of DataFusion, or does it have to be implemented as part of the source, or somewhere else?

thanks a lot

alamb commented 5 months ago

> 2. If 1 is correct, I believe we would have to detach ctx.sql(...) from the main thread so as not to block other statements, so the question is: who will be in charge of query cancellation? Would it be (or is it) part of DataFusion, or does it have to be implemented as part of the source, or somewhere else?

I think the execution is run as a separate tokio task -- so whenever someone wants to see if the next result is available they would do `stream.next().await` or something equivalent.

milenkovicm commented 5 months ago

wrt point 2: if we have a running unbounded pipeline, when a shutdown/cancel event comes, sources should stop producing, probably commit offsets and so on, and give the rest of the pipeline a chance to finish everything in flight before shutdown.

so if I can reformulate my question 2, "who will inform the source that it should wrap up for this execution and shut down? :)"

or am I missing something obvious here?

alamb commented 5 months ago

> so if I can reformulate my question 2, "who will inform the source that it should wrap up for this execution and shut down? :)"

I think this needs to be handled by the containing system (nothing in DataFusion will do it directly). It would have to hold on to some reference to the source that can be signaled to stop.

In terms of shutdown, when the stream is dropped, it will shut down all resources attached to the executing plan. This is described a bit here https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#cancellation--aborting-execution. However, this doesn't do anything to save state, as would be required in a streaming system.
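As a rough illustration of the two shutdown paths described above, here is a sketch in plain Rust using only the standard library (threads and channels rather than DataFusion or tokio). Everything in it is an assumption made for illustration: `spawn_source` is a hypothetical helper, the `AtomicBool` stands in for "some reference to the source that can be signaled to stop", and the returned count stands in for state such as an offset to commit.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{sync_channel, Receiver};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Hypothetical unbounded source: emits 0, 1, 2, ... until told to stop or
/// until the consumer goes away. Returns how many items it handed over,
/// standing in for "state to commit" (for example an offset).
fn spawn_source(stop: Arc<AtomicBool>) -> (Receiver<u64>, JoinHandle<u64>) {
    let (tx, rx) = sync_channel(4); // small buffer gives backpressure
    let handle = thread::spawn(move || {
        let mut next = 0u64;
        while !stop.load(Ordering::Relaxed) {
            // send() fails once the receiver has been dropped.
            if tx.send(next).is_err() {
                break;
            }
            next += 1;
        }
        next
    });
    (rx, handle)
}

fn main() {
    // Path 1: the containing system owns the stop flag and signals the source.
    let stop = Arc::new(AtomicBool::new(false));
    let (rx, source) = spawn_source(stop.clone());
    let first: Vec<u64> = rx.iter().take(3).collect();
    stop.store(true, Ordering::Relaxed);
    // Drain items still in flight so nothing the source handed over is lost.
    let in_flight = rx.iter().count();
    let sent = source.join().unwrap();
    println!("graceful: got {:?}, drained {}, source sent {}", first, in_flight, sent);

    // Path 2: just drop the consumer side. The source stops, but buffered
    // items are discarded and nothing coordinates saving state.
    let (rx, source) = spawn_source(Arc::new(AtomicBool::new(false)));
    let got: Vec<u64> = rx.iter().take(2).collect();
    drop(rx);
    let sent = source.join().unwrap();
    println!("dropped: got {:?}, source had sent {}", got, sent);
}
```

The first path lets the source finish cleanly and report what it produced; the second mirrors dropping the stream, where resources are released but any buffered work is simply lost.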