apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Memory account not adding up in SortExec #10073

Open westonpace opened 4 months ago

westonpace commented 4 months ago

Describe the bug

This is related / tangential to #9359

My primary problem is that I am trying to sort 100 million strings and always getting errors like:

Resources exhausted: Failed to allocate additional 39698712 bytes for ExternalSorterMerge[0] with 4053021824 bytes already allocated - maximum available is 26915520

After reading through the sort impl a bit I have noticed a few concerns (recorded in additional context)

To Reproduce

I don't have a df reproduction but this reproduces it for me in lance:

import pyarrow as pa
import lance

# Once the dataset has been generated you can comment out this generation code and reproduce the issue very quickly
print("Generating data")
my_strings = [f"string-{i}" * 3 for i in range(100 * 1024 * 1024)]
my_table = pa.table({"my_strings": my_strings})

print("Writing dataset")
ds = lance.write_dataset(
    my_table, "/tmp/big_strings.lance", mode="overwrite", schema=my_table.schema
)
del my_table
# End of generation code

ds = lance.dataset("/tmp/big_strings.lance")
print("Training scalar index")
# To create a scalar index we must sort the column, this is where the error occurs
ds.create_scalar_index("my_strings", "BTREE")

Expected behavior

I can sort any number of strings, as long as I don't overflow the disk

Additional context

Here is how I understand memory accounting in the sort today:

The first problem (and the one causing my error) is that a sorted batch of strings (the output of sort_batch) occupies about 25% more memory than the unsorted batch of strings. I'm not sure if this is due to buffer alignment, padding, or some kind of 2x allocation strategy used by the sort, but it seems reasonable that something like this could happen. Unfortunately, this is a problem. We are spilling because we have used up the entire memory pool. We now take X bytes from the memory pool, convert them into 1.25 * X bytes, and try to put them back in the memory pool. This fails with the error listed above.
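The numbers in the error message above make the failed accounting concrete (plain-Python arithmetic, using the exact figures reported):

```python
# Numbers taken from the error message reported above.
already_allocated = 4_053_021_824   # bytes held by ExternalSorterMerge[0]
additional_request = 39_698_712     # extra bytes needed for the sorted copy
maximum_available = 26_915_520      # what is left in the memory pool

# The sorted batch is larger than the unsorted batch it replaces, so the
# extra allocation exceeds what remains in the pool and the query fails.
assert additional_request > maximum_available
```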

The second problem is that we are not accounting for the output of the sort-preserving merge stream. Each output batch from the sort-preserving merge stream is made up of rows from the various input batches. In the degenerate case, where the input data is fully random, this means we will probably require 2 * X bytes: each output batch draws rows from one batch of each input stream, and we can't release any of those input batches until we emit the final output batch.
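The shape of this can be sketched with a k-way merge in plain Python (heapq.merge stands in for the sort-preserving merge here): the merge keeps a cursor into the current batch of every input stream, so none of those batches can be freed until the merge has consumed them.

```python
import heapq

# Three sorted "streams"; a k-way merge holds one in-flight batch per
# stream while producing output, so peak input memory is roughly the sum
# of all current input batches plus the output batch under construction.
streams = [[0, 3, 6], [1, 4, 7], [2, 5, 8]]
merged = list(heapq.merge(*streams))
assert merged == list(range(9))
```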

The solution to this second problem is that we should stream into the spill file rather than collect from the sort-preserving merge stream and then write the collected batches into the spill file. This problem is a bit less concerning for me at the moment because it is "datafusion uses more memory than it should" and not "datafusion is failing the plan with an error". We don't do a lot of sorting in lance, so we can work around it reasonably well by halving the size of the spill pool.
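A minimal sketch of the difference, with hypothetical names (not DataFusion's actual API): write each merged batch to the spill file as it is produced, instead of collecting the whole stream first.

```python
# Hypothetical sketch, not DataFusion's actual spill API.
def spill_collect_then_write(merge_stream, writer):
    batches = list(merge_stream)      # holds ALL output batches in memory
    for batch in batches:
        writer.append(batch)

def spill_streaming(merge_stream, writer):
    for batch in merge_stream:        # holds ONE output batch at a time
        writer.append(batch)          # batch can be dropped immediately after
```

The streaming variant bounds the extra memory needed during spill to a single output batch.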

alamb commented 4 months ago

It sounds like the issue, at a (really) high level, is "additional buffer space is required to actually implement the spill"

And since during spill the plan is under memory pressure, getting this additional memory can and does fail

Some strategies I can think of are:

  1. Simply turn off the memory accounting of intermediate results (String batches in your example) above during the spilling process (pro: simpler to implement I think, con: overshoots limits)
  2. Reserve additional buffer space up front to be used during spill (e.g. set aside 50MB) (pro: won't overshoot; cons: not clear how much is "enough", and it reduces the amount of memory that can otherwise be reserved)
  3. Reduce the memory required for intermediate spilling (e.g. maybe use a batch size 1/2 the size)

Maybe we can do 1 in the short term while figuring out a more sophisticated strategy for 2 or 3
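Strategy 2 could look roughly like this (a toy sketch with hypothetical names, not DataFusion's MemoryPool API): carve out a fixed spill reserve when the pool is created, and let only spill-time allocations draw from it.

```python
class PoolWithSpillReserve:
    """Toy memory pool that sets aside a fixed spill reserve up front."""

    def __init__(self, limit, spill_reserve):
        self.limit = limit
        self.spill_reserve = spill_reserve
        self.used = 0

    def try_grow(self, nbytes, spilling=False):
        # Normal allocations may not touch the reserved slice; spill-time
        # allocations may use the whole pool.
        budget = self.limit if spilling else self.limit - self.spill_reserve
        if self.used + nbytes > budget:
            raise MemoryError("Resources exhausted")
        self.used += nbytes

pool = PoolWithSpillReserve(limit=100, spill_reserve=20)
pool.try_grow(80)                  # fills the non-reserved budget
pool.try_grow(15, spilling=True)   # succeeds only because of the reserve
```

The open question from the list above remains how large the reserve must be to cover the worst-case spill.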

comphead commented 4 months ago

I'm investigating this as part of #9359

yjshen commented 1 week ago

Examining the current implementation of the multi-column sort's spill-to-disk strategy, I find we are asking for more memory during the spill itself, which I think is worth discussing: during the spill, Rows are created for comparison efficiency for each in-memory RecordBatch. Given that we spill precisely because memory is short, does this Rows optimization increase the possibility of execution failure due to memory shortage?
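A pure-Python illustration of the concern (not DataFusion code; sizes and structures are illustrative): converting a columnar batch to a row-format copy for comparison keeps both representations alive until the sort finishes, roughly doubling that batch's resident memory at exactly the moment memory is scarce.

```python
# Toy illustration: a columnar batch plus a row-format copy of it.
columns = {"a": [3, 1, 2], "b": ["x", "y", "z"]}   # columnar batch
rows = list(zip(columns["a"], columns["b"]))        # row-format copy
rows.sort()                                         # compare whole rows cheaply
# Peak memory here ~ columnar copy + row copy, held simultaneously.
assert rows == [(1, "y"), (2, "z"), (3, "x")]
```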

I find this related as well: https://github.com/apache/datafusion/discussions/9528#discussion-6349150

yjshen commented 1 week ago

Another point of code worth noticing is inside the current sort_batch implementation: https://github.com/apache/datafusion/blob/79fa6f9098be9a6e5b269cd3642694765b230ff1/datafusion/physical-plan/src/sorts/sort.rs#L601-L607

Performance-wise, I think it's beneficial to apply the row-format comparison to all multi-column cases. However, since sort_batch is used in multiple places where spill is invoked, creating Rows before comparing would introduce more memory pressure.

BTW, I think we should report memory usage inside lexsort_to_indices_multi_columns: https://github.com/apache/datafusion/blob/79fa6f9098be9a6e5b269cd3642694765b230ff1/datafusion/physical-plan/src/sorts/sort.rs#L650-L652
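One way that accounting could work, sketched in plain Python with hypothetical names (DataFusion's actual reservation API may differ): grow the reservation to cover the auxiliary index buffer before allocating it, and shrink it once the indices are dropped.

```python
class Reservation:
    """Toy reservation tracker, standing in for a memory-pool reservation."""

    def __init__(self, pool_limit):
        self.pool_limit = pool_limit
        self.used = 0

    def try_grow(self, nbytes):
        if self.used + nbytes > self.pool_limit:
            raise MemoryError("Resources exhausted")
        self.used += nbytes

    def shrink(self, nbytes):
        self.used -= nbytes

res = Reservation(pool_limit=1024)
indices_size = 4 * 100         # e.g. one u32 sort index per row, 100 rows
res.try_grow(indices_size)     # account BEFORE allocating the indices
# ... allocate indices, sort, take ...
res.shrink(indices_size)       # release once the indices are dropped
assert res.used == 0
```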