apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Fix record batch memory size double counting #13377

Closed 2010YOUY01 closed 1 week ago

2010YOUY01 commented 1 week ago

Which issue does this PR close?

First step to fix https://github.com/apache/datafusion/issues/13089

Rationale for this change

Currently, record_batch.get_array_memory_size() overestimates memory usage: if multiple arrays point to the same underlying buffer, that buffer is counted repeatedly. A more detailed explanation can be found in this PR's doc comment:

/// Calculate total used memory of this batch.
///
/// This function is used to estimate the physical memory usage of the
/// `RecordBatch`. The implementation adds up the memory size of all unique
/// `Buffer`s, because:
/// - The data pointers inside `Buffer`s are memory regions returned by the
/// global memory allocator; those regions cannot overlap.
/// - The ranges actually used by the `ArrayRef`s inside a `RecordBatch` can
/// overlap or reuse the same `Buffer`, for example when taking a slice of an
/// `Array`.
///
/// Example:
/// For a `RecordBatch` with two columns, `col1` and `col2`, both columns point
/// to sub-regions of the same buffer.
///
/// [xxxxxxxxxxxxxxxxxxx] <--- buffer
///       ^    ^  ^    ^
///       |    |  |    |
/// col1->[    ]  |    |    
/// col2--------->[    ]
///
/// In the above case, `get_record_batch_memory_size` will return the size of
/// the buffer, instead of the sum of `col1` and `col2`'s actual memory size.
///
/// Note: the current `RecordBatch::get_array_memory_size()` will double count
/// the buffer memory size if multiple arrays within the batch share the same
/// `Buffer`. This method provides a temporary fix until the issue is resolved:
/// https://github.com/apache/arrow-rs/issues/6439
pub fn get_record_batch_memory_size(batch: &RecordBatch) -> usize {
...
}

This function is used in spilling execution to estimate physical memory usage, and the overestimation has caused many bugs in memory-limited sort/aggregation/join. For example, if a RecordBatch has 10 columns that all share the same Buffer, record_batch.get_array_memory_size() returns a 10X estimate, which makes memory-limited queries fail quite easily.
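The 10X figure can be reproduced with a small toy model. The sketch below does not use arrow-rs: `Column` and `naive_memory_size` are hypothetical stand-ins, and an `Rc<Vec<u8>>` plays the role of a shared `Buffer`. It only illustrates why summing per-column buffer sizes overcounts when columns share one allocation.

```rust
use std::rc::Rc;

/// Hypothetical stand-in for an Arrow column: it only holds a handle to its
/// backing allocation (an assumption for illustration, not the arrow-rs type).
struct Column {
    buffer: Rc<Vec<u8>>,
}

/// Naive estimate: every column is charged for the full buffer it points to,
/// even if another column already accounted for the same allocation.
fn naive_memory_size(columns: &[Column]) -> usize {
    columns.iter().map(|c| c.buffer.len()).sum()
}

/// Build a batch of `n` columns that all share one buffer of `bytes` bytes.
fn shared_buffer_batch(n: usize, bytes: usize) -> Vec<Column> {
    let shared = Rc::new(vec![0u8; bytes]);
    (0..n)
        .map(|_| Column { buffer: Rc::clone(&shared) })
        .collect()
}
```

With `shared_buffer_batch(10, 1024)`, only 1024 bytes are physically allocated, yet `naive_memory_size` reports 10240 bytes.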

I believe https://github.com/apache/datafusion/issues/13089 is caused by this issue, and likely also https://github.com/apache/datafusion/issues/9417, https://github.com/apache/datafusion/issues/10511, https://github.com/apache/datafusion/issues/12136, and https://github.com/apache/datafusion/issues/11390.

What changes are included in this PR?

Introduced a new get_record_batch_memory_size() that avoids double counting by using an internal HashSet to recognize reused buffers. While @waynexia is working on a comprehensive solution in arrow-rs https://github.com/apache/arrow-rs/issues/6439, I think it's useful to introduce this temporary fix in DataFusion due to:
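A minimal sketch of the HashSet idea, assuming a toy model rather than the PR's actual code: `Column` and `deduplicated_memory_size` are hypothetical names, `Rc<Vec<u8>>` stands in for a `Buffer`, and buffers are identified by the address of their allocation.

```rust
use std::collections::HashSet;
use std::rc::Rc;

/// Hypothetical stand-in for an Arrow column that may own several buffers
/// (for example values and offsets). Not the arrow-rs type.
struct Column {
    buffers: Vec<Rc<Vec<u8>>>,
}

/// Sum the size of each distinct allocation exactly once.
fn deduplicated_memory_size(columns: &[Column]) -> usize {
    // Allocations are identified by pointer; two handles to the same
    // allocation yield the same address.
    let mut seen: HashSet<*const Vec<u8>> = HashSet::new();
    let mut total = 0;
    for column in columns {
        for buffer in &column.buffers {
            // `insert` returns true only the first time an address is seen.
            if seen.insert(Rc::as_ptr(buffer)) {
                total += buffer.len();
            }
        }
    }
    total
}
```

In this model, ten columns sharing one 1024-byte buffer are counted as 1024 bytes, not 10240. The cost is one hash lookup per buffer on every call.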

Are these changes tested?

Yes

Are there any user-facing changes?

No

blaginin commented 1 week ago

This PR does indeed fix https://github.com/apache/datafusion/issues/10511 😀. I just tested the branch, and the code that crashes in main works perfectly here

alamb commented 1 week ago

Thanks @2010YOUY01 -- will look at this later today or tomorrow

2010YOUY01 commented 1 week ago

One thing to mention is how fast this method is, as I believe the method will be called frequently.

This is a very good point. I think that when the same fix is done on the arrow side, we should cache the result inside RecordBatch (if batches are immutable). I will run some benchmarks.
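The caching idea could look roughly like the following. This is a sketch under stated assumptions, not a proposal for the arrow-rs API: `CachedBatch` is a hypothetical immutable type that models only the sizes of its unique buffers, and the `scans` counter exists purely to show that the full scan runs once.

```rust
use std::cell::{Cell, OnceCell};

/// Hypothetical immutable batch. Only the sizes of its unique buffers are
/// modeled; this is not the arrow-rs `RecordBatch`.
struct CachedBatch {
    unique_buffer_sizes: Vec<usize>,
    /// Filled on the first call to `memory_size` and never invalidated,
    /// which is only sound because the batch cannot change afterwards.
    cached_size: OnceCell<usize>,
    /// Counts how often the full scan ran (for illustration only).
    scans: Cell<usize>,
}

impl CachedBatch {
    fn new(unique_buffer_sizes: Vec<usize>) -> Self {
        Self {
            unique_buffer_sizes,
            cached_size: OnceCell::new(),
            scans: Cell::new(0),
        }
    }

    /// Compute the size on first use, then return the stored value.
    fn memory_size(&self) -> usize {
        *self.cached_size.get_or_init(|| {
            self.scans.set(self.scans.get() + 1);
            self.unique_buffer_sizes.iter().sum::<usize>()
        })
    }
}
```

Repeated calls to `memory_size` then cost a single load after the first one. A thread-safe variant would need `std::sync::OnceLock` instead of `OnceCell`.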

2010YOUY01 commented 1 week ago

Thank you all for the feedback! I've updated the following: