metaspace2020 / Lithops-METASPACE

Lithops-based Serverless implementation of the METASPACE spatial metabolomics annotation pipeline
12 stars 4 forks source link

Fix first level segmentation and memory optimization #59

Closed omerb01 closed 4 years ago

omerb01 commented 4 years ago

main changes:

omerb01 commented 4 years ago

@LachlanStuart I have a memory consumption issue with merge_spectra_chunk_segments(segm_i, ibm_cos) that I want to discuss about before merge.

this PR ensures that no more than first_level_segm_size_mb = 256 size is loaded to merge_spectra_chunk_segments function - but the memory peak reported in the activation log of the merge function on big dataset is 917.2MiB which is much higher than what I expect. I suspect that we have another issue similar to #55 . I also suspect on lines 151-152 in segment.py.

Ideally, I would like to set first_level_segm_size_mb = 1500 and the total memory capacity of the merge function will be 2048MB - this will use COS efficiently and won't stress it by generating too much temp objects ("first-level-segments" should be large as much as they can)

LachlanStuart commented 4 years ago

@omerb01 Breaking down those two lines of code:

segm = np.concatenate(list(pool.map(_merge, keys)))

np.concatenate, just like pd.concat, unfortunately needs memory for both the input and output buffers, so its maximum memory usage is 2x the size of segm. The only way to avoid this is if we know in advance how big segm will be, preallocate the whole buffer, and deserialize chunks directly into the buffer. This is a viable option, but requires extra data to be passed around so that the size of segm can be known before reading any chunks.

segm = segm[segm[:, 1].argsort()]

This should require a maximum of ~2.33x the size of segm:

This line sorts the rows of segm by its 1 column (representing mz). In-place sorting algorithms exist, so in theory it's possible to sort segm with almost no additional memory. However, numpy's implementation of in-place sorting is pretty awkward - you need to use a custom dtype for the entire row if you want to be able to sort by just a single column.

The following should work, but doesn't. I'm not sure what's going wrong:

segm.view(np.dtype([('sp_ind', np.float32), ('mz', np.float32), ('int', np.float32)])).sort(order='mz')
omerb01 commented 4 years ago

@LachlanStuart there is another extra memory allocated, I suspect on the bug that we discovered in #55

what do you think?

omerb01 commented 4 years ago

@LachlanStuart I investigated this a bit with a line-by-line memory profiler.

previous version: this example shows memory consumption of concatenation (x2) and sorting (actually 0.33):

import numpy as np
from memory_profiler import profile

@profile
def my_func():
    segm_len = 10 * 1024 ** 2
    example_segm = np.array([[1, 2, 3]] * segm_len, dtype=np.float32)  # 125mb
    example_segm = list(np.array_split(example_segm, 100))

    example_segm = np.concatenate(example_segm)
    example_segm = example_segm[example_segm[:, 1].argsort()]

if __name__ == '__main__':
    my_func()

output:

Line #    Mem usage    Increment   Line Contents
================================================
     4     51.0 MiB     51.0 MiB   @profile
     5                             def my_func():
     6     51.0 MiB      0.0 MiB       segm_len = 10 * 1024 ** 2
     7    251.0 MiB    200.0 MiB       example_segm = np.array([[1, 2, 3]] * segm_len, dtype=np.float32)  # 125mb
     8    251.0 MiB      0.0 MiB       example_segm = list(np.array_split(example_segm, 100))
     9                             
    10    371.0 MiB    120.0 MiB       example_segm = np.concatenate(example_segm)
    11    411.1 MiB     40.1 MiB       example_segm = example_segm[example_segm[:, 1].argsort()]

last commit: this example shows the changes of my last commit - we can see that there is no memory increasment.

import numpy as np
from memory_profiler import profile

@profile
def my_func():
    segm_len = 10 * 1024 ** 2
    example_segm = np.array([[1, 2, 3]] * segm_len, dtype=np.float32)  # 125mb
    example_segm = list(np.array_split(example_segm, 100))

    omer = np.zeros((segm_len, 3), dtype=np.float32)
    row_i = 0
    for l in example_segm:
        for row in l:
            omer[row_i] = row
            row_i += 1

    omer.view('float32,float32,float32').sort(order=['f1'], axis=0)

if __name__ == '__main__':
    my_func()

output:

Line #    Mem usage    Increment   Line Contents
================================================
     8     51.1 MiB     51.1 MiB   @profile
     9                             def my_func():
    10     51.1 MiB      0.0 MiB       segm_len = 10 * 1024 ** 2
    11    251.1 MiB    200.0 MiB       example_segm = np.array([[1, 2, 3]] * segm_len, dtype=np.float32)  # 125mb
    12    251.1 MiB      0.0 MiB       example_segm = list(np.array_split(example_segm, 100))
    13                             
    14    251.1 MiB      0.0 MiB       omer = np.zeros((segm_len, 3), dtype=np.float32)
    15    251.1 MiB      0.0 MiB       row_i = 0
    16    371.1 MiB      0.0 MiB       for l in example_segm:
    17    371.1 MiB      0.0 MiB           for row in l:
    18    371.1 MiB      0.0 MiB               omer[row_i] = row
    19    371.1 MiB      0.0 MiB               row_i += 1
    20                             
    21    371.1 MiB      0.0 MiB       omer.view('float32,float32,float32').sort(order=['f1'], axis=0)

but these changes make the merge part slower, I cant think of a parallel solution which also bounds the memory usage. have any suggestions or insights?

LachlanStuart commented 4 years ago

@omerb01 Unfortunately Numpy doesn't seem to have any way to read files directly into a pre-allocated buffer. If it did, that would mean no temporary buffers would be needed.

msgpack might not be the best option here - I haven't checked its performance. It may be faster to use np.save/np.load (which includes dtype and shape in the saved file) or np.tofile/np.fromfile (which just store the array content, so they require you to manually provide dtype and call .reshape to restore the original array)

This code definitely doesn't need 128-thread parallelism though. I don't have the ability to test at the moment, but I would expect 2 threads to be optimal, because COS latency is so low and bandwidth is so high that Python is probably spending most of its time decoding the data, the only possible time saving is from hiding the small amount of network latency to COS.

It's possible that the slowness comes from this row-by-row copy:

            for row in segm_spectra_chunk:
                segm[row_i] = row
                row_i += 1

This can be written more efficiently, to copy the whole chunk in one operation:

chunk_len = len(segm_spectra_chunk)
segm[row_i:row_i + chunk_len] = segm_spectra_chunk
row_i += chunk_len

One note on the implementation - the segments' dtype can actually be different between datasets, and it should be preserved. It can be either float32 or float64 (float64 is more common). You can get the string dtype (f or d) from imzml_parser.mzPrecision, or just take .dtype from any of the arrays.

omerb01 commented 4 years ago

@LachlanStuart with this patch, "huge4" dataset should run successfully. (please test it as well with 4GB memory for annotating function) about the concatenation in the merge part - I'll think of a farewell alternative.

gilv commented 4 years ago

@omerb01 @LachlanStuart lets' wait a bit before we merge it...i want to run some experiments as well.

gilv commented 4 years ago

@omerb01 will you handle this we discussed?

omerb01 commented 4 years ago

@gilv not yet, I need to do some tests before merge

gilv commented 4 years ago

@omerb01 please also add the "sort" code that we talked about and document it in remark

omerb01 commented 4 years ago

@LachlanStuart FYI - we found that ndarray.view(...).sort(...) is around x3 slower, it's better to stay with the old approach although it consumes x2.33 memory.