jorgecarleitao / arrow2

Transmute-free Rust library to work with the Arrow format
Apache License 2.0
1.07k stars 220 forks source link

helper api to return memory usage of an array #421

Open houqp opened 2 years ago

houqp commented 2 years ago

arrow-rs has a helper api called get_array_memory_size to help calculate memory consumption of an array. This comes very handy for memory control in ballista. Filing this for tracking purpose so we can get back to it when we start to focus on ballista arrow2 migration.

jorgecarleitao commented 2 years ago

It has been moved to compute::aggregate::memory::estimated_bytes_size. It is called estimate because these things are always estimations (usually over-estimations) due to the Arcs involved in sharing memory

houqp commented 2 years ago

oh, nice, that's less work for us :D cc @yjshen

yjshen commented 2 years ago

I find the original arrow-rs has two methods for array size estimation, one for buffer size and the other for total physical memory consumption:

/// Returns the total number of bytes of memory occupied by the buffers owned by this [ArrayData].
pub fn get_buffer_memory_size(&self) -> usize {

/// Returns the total number of bytes of memory occupied physically by this [ArrayData].
pub fn get_array_memory_size(&self) -> usize {

The current compute::aggregate::memory::estimated_bytes_size resembles get_buffer_memory_size in arrow-rs.

I suggest we also add a method for physical memory consumption, which is more likely to represent total_size_in_bytes for an array. And we could use this new method if we want to control total memory consumption for physical operators

yjshen commented 2 years ago

More background on this: For this specific test in DataFusion: https://github.com/apache/arrow-datafusion/blob/master/datafusion/src/physical_plan/common.rs#L292-L310

If we use get_buffer_memory_size, it results in 128 in total size, 64 bytes for each float array. And the original one uses get_array_memory_size, it results in 416 in total bytes, i.e. 288 (416 - 128) bytes used for vals in struct.

houqp commented 2 years ago

I agree, it would be better to track memory usage in downstream compute engines by calculating physical memory consumption including all fields in the array structs.

jorgecarleitao commented 2 years ago

I did not add get_array_memory_size because:

For these reasons, I opted for exposing only the value that:

Note that there is a major difference in array sizes between arrow2 and arrow because we strip everything from the arrays in arrow2 (including two Vec and ArrayData). Taking the test you shared, the two arrays (f32 and f64) amount to 36 (arrow heap) + 380 (rest), while in arrow2 they amount to 36 (arrow heap) + 256 (stack).

The important aspect here is that because memory regions can be shared, none of these estimates are reliable: every time we clone an array, the total arrow size remains the same, but the reported size doubles because we do not take ref-counted regions into account.

To bring a more realistic example: a RecordBatch of two cloned non-null Float64Array with the datafusion-default size of 8192 has a reported arrow size of (8 + 8) * 8192 = 131.072. However, it has in-fact half its size, at 8 * 8192 = 65.536 (i.e. a 100% error) because memory is shared. The "rest" for such configuration in arrow is 380, or 380 / 131.072 ~ 0.25%. In arrow2 that is 256 and 256 / 131.072 ~ 0.2%.

I.e. get_array_memory_size is improving our estimation (mixing stack + heap) by less than 0.5%, while a single clone has an error of 100%. Arrays are very often either cloned or their bitmaps cloned between datafusion nodes, making this improvement an order of magnitude smaller.

For the above reasons, I decided to just stick to what matters: O(N).

Finally, note (just noticed it) that arrow's get_array_memory_size seems incorrect: there are String, Vec and BTreeMap on the DataType that are untracked.

yjshen commented 2 years ago

Thanks for the clarification.

I agree that for large record batches, the implementation variance is neglectable. But there are chances that users could potentially create tiny record batches with likely several or several tens of records inside. In this situation, we should count the on-stack size for each batch or each Array.

We ran into a similar tiny batch situation before during a vectorized shuffle implementation. We then have a 10240 batch size setting for each batch, but partition into 5000 shuffle output partitions; therefore for each batch, its content should be spread over 5000 tiny batches, resulting in a bad performance. Therefore I think the actual size will be good to know to lib users.

I know this would reveal some of the implementation detail of the library itself. But it is still beneficial to understand for some time. Since the size estimation functions are already for the advanced use cases, I think.

And for the Array clone issue while counting size. Is it possible to estimate a total size share based on Arc::strong_count()? I brought it out here not implying this should be count in arrow2. But just as an open question. Can we get an approximate value by size / ref_count to estimate and further limit each physical operator's total memory usage for a memory restraint running situation? Although I didn't quite get where our situations need to own several clones of a buffer simultaneously.

jorgecarleitao commented 2 years ago

Thanks a lot for the explanation. I am convinced that it is a valid use-case.

Did some digging, and it seems that servo does something like that here.

If we are to do this, let's offer first class support: I suggest a new module, heap_size or something, dedicated to present the size of every struct and enum on this crate. It contains a trait HeapSize or something (see here for an early version of servos' approach).

I would start with the simple stuff, DataType, Field, Schema, Buffer, Bitmap, and then build up from there. For buffers shared via FFI I suggest we fall back to what we observe (since we can't see the capacity).

Although I didn't quite get where our situations need to own several clones of a buffer simultaneously.

It is not that one array has several clones of the same buffer, rather that different arrays may share buffers, and different record batches may share arrays.

Bitmap is basically a (Arc<Vec<u8>, usize, usize). Some examples:

IMO we should make an effort to count Arcs correctly, e.g. at least to not double count within both an Array and RecordBatch (i.e. it is ok that record1.heap_size() + record2.heap_size() != (record1, record2).heap_size(), but record1.heap_size should not double-count buffers shared between its fields).