apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0
5.86k stars 1.11k forks source link

Memory Limited Joins (Externalized / Spill) #1599

Open alamb opened 2 years ago

alamb commented 2 years ago

Is your feature request related to a problem or challenge? Please describe what you are trying to do. Support Joining "arbitrarily" large inputs (e.g. when one or both of the inputs don't fit in the available RAM)

This ticket concerns the memory used the JoinExec operator -- it doesn't cover other potential targets (e.g. externalized sort or grouping). That will be covered by other tasks tracked by #587

Describe the solution you'd like

  1. Allow DataFusion users to specify a RAM budget (aka via the config introduced in #1526) and have their queries complete running without the join by exceeding the budget allocated to it via the memory manager.

There are many potential ways to Limit the memory used while joining. The classic way is "sort-merge-join" where the input data on both sides is sorted according to the equality predicates (using externalized sort, such as described in #1568 ) and then the two join inputs are streamed through and the output computed, depending on the type of Join required (INNER, LEFT, RIGHT, SEMI, etc)

I personally think the following would be the ideal behavior for DataFusion Joins:

  1. A single Join operator that behaved like like the following:
  2. Hashed one input or the other, if the memory limit was not exhausted, behave like the existing JoinExec
  3. If memory was exhausted, switch to a merge join strategy: sort the one or both sides that didn't fit in memory using externalized sort on the equality predicates, then stream them back through the Join

The rationale for a runtime switch is that then the optimizer (which always has limited information) can't make the "wrong" choice related to join order

In case anyone wants some "light reading" this stuff is nicely described by Goetz Graffe in "Query evaluation techniques for large databases": https://scholar.google.com/citations?view_op=view_citation&hl=en&user=pdDeRScAAAAJ&citation_for_view=pdDeRScAAAAJ:u5HHmVD_uO8C

Online link: http://infolab.stanford.edu/~hyunjung/cs346/graefe.pdf

Describe alternatives you've considered Have the Optimizer (aka the HashBuildProbeOrder ) pick both the order and the algorithm to use based on statistics or heuristics

Context This is follow on work from the great PR from @yjshen in https://github.com/apache/arrow-datafusion/pull/1526 and part of the story of limiting memory used by DataFusion https://github.com/apache/arrow-datafusion/issues/587

Task Tracking

alamb commented 2 years ago

I would love to implement this algorithm in DataFusion:

https://arxiv.org/abs/2010.00152 Sort-based grouping and aggregation Thanh Do, Goetz Graefe

xudong963 commented 2 years ago

I would love to implement this algorithm in DataFusion:

https://arxiv.org/abs/2010.00152 Sort-based grouping and aggregation Thanh Do, Goetz Graefe

Maybe you can share it at the next meeting ๐Ÿ˜„

Ted-Jiang commented 2 years ago

I would love to implement this algorithm in DataFusion: https://arxiv.org/abs/2010.00152 Sort-based grouping and aggregation Thanh Do, Goetz Graefe

Maybe you can share it at the next meeting ๐Ÿ˜„

+1 well look forward ๐Ÿ˜Š

yjshen commented 2 years ago

I would love to implement this algorithm in DataFusion:

https://arxiv.org/abs/2010.00152 Sort-based grouping and aggregation Thanh Do, Goetz Graefe

Haha, that's great! I have talked with @houqp about this paper before.

alamb commented 2 years ago

Related https://github.com/apache/arrow-datafusion/issues/141

yjshen commented 2 years ago

I would propose we do this in several steps:

- [ ] Provide a classic SortMergeJoin implementation that is less memory bound itself (but move the need of memory management to the sort operator, which we already have memory controlled). - [ ] Follow-up choice 1: Consolidate HashJoin and SortMergeJoin, providing a unified JoinExec, and do adaptive execution as @alamb suggested above. - [ ] Follow-up choice 2: incorporate a cost-based join optimizer to choose the most suitable physical plan: sort-based or hash-based.

Moved to issue descriptions above. ๐Ÿ‘†

alamb commented 2 years ago

@yjshen sounds like a great plan to me -- and I see that @richox has an implementation for SortMergeJoin already :)

korowa commented 1 year ago

Looks like now that we are able to fail query in case of breaching memory limit, it's the right time to start working on spills.

Taking into account what has been written above, I guess, next step could be to implement spilling for MergeJoin -- if our final intention to have runtime HJ -> MJ conversion it would be nice to have some guarantees that MJ won't fail for the same reason. I believe MJ spilling logic could be pretty straightforward without any pitfalls -- the naive approach would be to spill buffered-side data in .ipc batch by batch, more complex, and, probably, more effective way to think about would be spilling concatenation of all batches that fit in memory.

After that we could follow-up with what is mentioned in issue description -- HJ -> MJ conversion (I believe #2628 worth to be mentioned here, to unlock ability for more hash joins to be converted), and spilling mechanisms for other join implementations.

If this plan is fine, I'd like to take a stab at MJ spilling.

alamb commented 1 year ago

Looks like now that we are able to fail query in case of breaching memory limit, it's the right time to start working on spills.

I agree

I believe MJ spilling logic could be pretty straightforward without any pitfalls -- the naive approach would be to spill buffered-side data in .ipc batch by batch, more complex, and, probably, more effective way to think about would be spilling concatenation of all batches that fit in memory.

Unless there is a very compelling reason to have a separate implementation, I think we should leverage (reuse) the existing ExteraSorter used in spilling sort:

https://github.com/apache/arrow-datafusion/blob/30dba587f4749327605a2eecb7ae9c0c41769c58/datafusion/core/src/physical_plan/sorts/sort.rs#L73-L85

After that we could follow-up with what is mentioned in issue description ๐Ÿ‘

korowa commented 1 year ago

I think we should leverage (reuse) the existing ExteraSorter used in spilling sort

After some reading, it looks like that for MergeJoin case it makes sense to split spilling part of ExternalSorter (responsible for tracking spillable batch buffer) and sorting one -- we already have sorted streams in MJ, so spillable buffer is all that needed -- I believe this implementation could be acceptable and first I'll try to stick to this way.

And, further, ExternalSorter also seems to be a perfect fit for HashJoin -- this is the case where it can be reused as it is for resorting both build and probe sides of HJ.

alamb commented 1 year ago

After some reading, it looks like that for MergeJoin case it makes sense to split spilling part of ExternalSorter (responsible for tracking spillable batch buffer) and sorting one -- we already have sorted streams in MJ, so spillable buffer is all that needed -- I believe this implementation could be acceptable and first I'll try to stick to this way.

Thank you -- reusing the ExternalSorter will allow whatever spilling logic we develop to benefit from additional improvements in sort. For example, @jaylmiller has https://github.com/apache/arrow-datafusion/pull/5292 which makes substantial changes to ExternalSorter and hopefully it will be faster, but has some performance regressions that we haven't worked out yet,

It will be great if that work can directly benefit the spilling operators as well

And, further, ExternalSorter also seems to be a perfect fit for HashJoin -- this is the case where it can be reused as it is for resorting both build and probe sides of HJ.