TGSAI / mdio-python

Cloud native, scalable storage engine for various types of energy data.
https://mdio.dev/
Apache License 2.0

Loading speed #259

Open SergeyTsimfer opened 1 year ago

SergeyTsimfer commented 1 year ago

Hey guys

Nice work with the SEG-Y loader! Our team uses our own library to interact with SEG-Y data, so I decided to give MDIO a try and compare the results of multiple approaches and libraries.

Setup

For my tests, I used a ~21 GB SEG-Y with IEEE float32 values (no IBM float shenanigans here). The cube is post-stack, i.e. a cube for seismic interpretation, so it has a meaningful, regular 3D structure.

Using get_size, the same function you provide in the tutorials, I got the following results:

SEG-Y:              21504.17 MB
MDIO:                3943.37 MB
MDIO LOSSY+:         1723.78 MB
SEG-Y QUANTIZED:     5995.96 MB

The LOSSY+ MDIO file was created with compression_tolerance=(std of amplitude values). The SEG-Y QUANTIZED file was created by quantizing the data and writing a standard-conforming SEG-Y file with int8 values.

The system uses an Intel(R) Xeon(R) Gold 6242R CPU, in case that is of interest.

The tests

Multiple formats are tested on the task of loading slices (2D arrays) along each of the three dimensions: INLINE, CROSSLINE, and SAMPLES. The ability to load sub-volumes (3D arrays) is also tested. For more advanced usage, I have tests for loading batches of data: more on that later.

For the tests, I use the following engines:

To this slew of engines, I've added the MDIO loader, which looks very simple:

slide_i = mdio_file[idx, :, :][2]  # [2] picks the trace data out of the returned tuple
slide_x = mdio_file[:, idx, :][2]  # (the reader also returns the live mask and headers)
slide_d = mdio_file[:, :, idx][2]

I also tried mdio_file._traces[idx, :, :] but did not notice significant differences.

The results

An image is better than a thousand words, so here is a bar plot of timings for loading INLINE slices: [image]

The situation does not get better on the CROSSLINE / SAMPLES axes either: [image] [image]

Note that even naive segyio, which does a full sweep across the file to get a depth slice, achieves the same speed.

The why

Some of the reasons for this slowness are apparent: during conversion, the default Zarr chunk size is 64x64x64, so loading 2D slices is not this chunking's forte.
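
As a rough back-of-the-envelope illustration of that mismatch (a sketch using the cube shape given further down in this thread, not part of the benchmark code):

import math

# One INLINE slice of a (2600, 1400, 1500) float32 cube vs. the 64x64x64 chunks it touches.
n_il, n_xl, n_s = 2600, 1400, 1500
chunk = 64
slice_mb = n_xl * n_s * 4 / 2**20                              # ~8 MB of useful data
chunks_hit = math.ceil(n_xl / chunk) * math.ceil(n_s / chunk)  # 22 * 24 = 528 chunks
read_mb = chunks_hit * chunk**3 * 4 / 2**20                    # ~528 MB must be read and decompressed
print(f"{read_mb / slice_mb:.0f}x read amplification")         # ~66x, i.e. roughly the chunk size along INLINE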

Unfortunately, even for 3D sub-volumes the situation is not much better: [image]

Even though this is the best (and only) favorable scenario for chunked storage, it is still not as fast as plain SEG-Y storage, even without quantization.

Questions

This leaves a few questions:

I hope you can help me with those questions!

tasansal commented 1 year ago

Hi @SergeyTsimfer!

Thanks a lot for your excellent write-up and benchmarks; they're very useful. We would love to work on this aspect of MDIO, and if you're open to it, we can standardize some benchmarks!

Can you provide more information about the shape of your dataset (#inlines, #crosslines, #samples, etc.) and the type of storage you are running your benchmarks on (cloud, SSD, NVMe, HDD, Lustre, etc.)?

Is it possible to somehow speed up the loading times?

To improve decompression efficiency, please try adding backend="dask" when initializing MDIOReader. By default, decompression is single-threaded and there may be some blocking code; the Dask backend allows efficient multi-threaded unpacking of chunks and handles concurrent chunk reads a lot better.

Consider not returning metadata (headers and live mask) by default when initializing MDIOReader. Cache the necessary headers and live mask in memory once to avoid slow reads every iteration.
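
A minimal sketch of both suggestions combined (the path and index are placeholders, and the live-mask attribute name is an assumption, not confirmed API):

from mdio import MDIOReader
import numpy as np

# Open with the Dask backend for multi-threaded chunk decompression.
mdio_file = MDIOReader("path/to/file.mdio", backend="dask")

# Pull metadata into memory once instead of re-reading it for every slice.
headers = np.asarray(mdio_file._headers[:])        # private accessor, see below in the thread
# live_mask = np.asarray(mdio_file.live_mask[:])   # assumed attribute name, for illustration only

inline = mdio_file[100, :, :].compute()            # Dask arrays are lazy, so call .compute()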

Or maybe this is not the area you focus your format on, and the current loading times are fine for the use cases you plan on developing?

We are currently working on improving performance for upcoming releases. Although single-read latency was not previously a priority, we are now focusing on it. Our team was able to saturate multiple high-end enterprise GPUs using local NVMe SSDs for training with 2D image data (using 3D chunks), although it used more resources than we would like.

Is there a way to make a multipurpose file? The way I see it now, I can make a file for somewhat fast 2D INLINE slices (by setting chunk_size to 1x64x64 or something like that), but that would be a mess of lots of files.

We have plans to release that as well. The pattern is to keep the "lossless" data in 3D chunks and lossy compressed copies in the orthogonal (il/xl/z) directions. It can be done manually, added to an existing MDIO file, and accessed via the access_pattern argument in the initializer; however, we are still refining it. This gives you approximately the same size as the original SEG-Y but with very fast access in every direction.

Is there a way to preallocate memory for the data to load into? That would be a huge speedup for all ML applications.

Not at the moment, but a good idea. Can you elaborate on how it can be done?

Is there a way to get the values of particular trace headers for the entire cube?

Please try using the reader._headers attribute to query the data. While it is not officially part of the public API, it can still be accessed via this attribute. It lets you extract fields from the headers, e.g. reader._headers["185"] if you want a single attribute. If you want multiple, with the Zarr backend you have to use reader._headers.get_basic_selection(fields=["189", "193"]) etc., while with Dask it is reader._headers[["189", "193"]].compute().
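
Laid out as a snippet, using the same field numbers as above (this relies on a private attribute, so treat it as a sketch; reader and reader_dask stand for readers opened with the Zarr and Dask backends respectively):

# Zarr (default) backend:
offset = reader._headers["185"]                                        # single header field
coords = reader._headers.get_basic_selection(fields=["189", "193"])    # multiple fields

# Dask backend:
coords = reader_dask._headers[["189", "193"]].compute()                # multiple fields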

tasansal commented 1 year ago

P.S. I think most of your latency is coming from decompression. All the formats you compared against appear to be uncompressed, correct?

tasansal commented 1 year ago

P.S. P.S. :) As you know, the default ingestion uses lossless compression. Currently there are no hooks to turn it off, but they can be added fairly quickly if you want to test that as well.

Also we have a recent article you may like: https://doi.org/10.1190/tle42070465.1

SergeyTsimfer commented 1 year ago

Hey

Thanks for the quick response!

Parallelization

Regarding parallelization (with either threads or processes), it affects I/O in a number of ways:

Compression

Regarding the compression:

The reason for focusing on SEG-Y is that it is really convenient in production. All of the data is already in SEG-Y format, and having additional files makes a mess. We've also optimized our ML algorithms so that most of them take less time than the cube conversion, which can take 10+ minutes for large fields. Moreover, we've been able to speed up SEG-Y enough to compete with other ndarray storages: the only setting where it is slower is loading along the SAMPLES dimension. At the same time, it does not have to deal with the weird chunking / axis-ordering problems pointed out in the previous posts. Lastly, conversion is pretty much impossible before stacking, where the speed of data crunching matters most.

Preallocation

If you have a number of slices to load, for ML pipelines you have to stack them into one array. Something like this:

batch_images = np.stack([loader[slc] for slc in slices])  # (N, n_I, n_X, n_S) shape

That requires an unnecessary copy of the entire batch! To really optimize things, it would be nice to pre-allocate an array of the desired shape and then have the loader write data directly into that memory:

buffer = np.empty(shape, dtype=...)               # pre-allocated once, outside the loop
for i, slc in enumerate(slices):
    loader.load_data_into_buffer(slc, buffer[i])  # hypothetical API: fill buffer[i] in place

If all of the other parts are already highly optimized, this speeds things up by another ~2x, though it is quite tricky to implement (especially when relying on external storage). In most cases, it means getting very familiar with the underlying low-level functions :)

Multiple projections in one file

Yep, having the same data in multiple orientations/chunks is the default approach to solve this.

Though, IMO, they would need to hold the same (lossy or lossless) data, because getting different values depending on which projection the loader happens to pick would be strange.

Benchmark

The cube has shape (2600, 1400, 1500). I'm running the benchmarks on an SSD. I forgot to include a link to the benchmark notebook in the previous post, so here it is :)

I've tried backend='dask' and it only slowed things down further; given that Dask is not really about raw speed but about computing functions on data in parallel, that is not so surprising.

I would like to add more engines to the benchmark, so if you have major updates / features to test, be sure to hit me up!

tasansal commented 1 year ago

Hi again, great conversation, thanks for the details!

Parallelization

I understand your concern. However, by design, the 3D-bricked MDIO data (as you noticed) will be slower than naturally ordered SEG-Y in one direction, and compression will also increase the latency. The 3D-bricked case is best suited to archival storage and big concurrent writes, not to latency-critical applications. One benefit is that it significantly reduces cloud storage cost thanks to compression, while the data remains accessible with very reasonable performance. We have off-core serving methods (proprietary) that offload the overhead to a scalable cloud cluster, so a single tenant can read multiple GB/s from the 3D bricks without much CPU overhead; however, you pay the cost in compute. Server-side caching greatly helps with this (i.e., once a group (chunk) of inlines is cached, the next ones are instantaneous). Another important benefit is that writes are more efficient on cloud object stores: object stores are copy-on-write, so modifying even one bit of a SEG-Y is terribly inefficient. Chunked storage fixes that.

Currently, the optimal way to open the 3D-bricked data for inline/xline reading, etc., is to use the Dask backend (threads, no distributed Dask at this point) with new_chunks optimized for the read direction, and let the threads handle decompression. It creates "virtual" chunks to group read operations; this way, in the benchmark below, reads are 2x faster. I forgot to mention the new_chunks option for the Dask backend earlier; it groups chunk reads to reduce Dask's task overhead.

When reading from the cloud, using a Dask distributed client (distributed.Client) with multiple processes as workers, we can fetch a lot of data very quickly to one machine (given sufficient network I/O).
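
A minimal sketch of that setup (the bucket path, worker counts, and credential handling are placeholders, not a tested recipe):

from dask.distributed import Client
from mdio import MDIOReader

# Creating the Client makes it the default scheduler for subsequent .compute() calls.
client = Client(n_workers=8, threads_per_worker=2, processes=True)

reader = MDIOReader("s3://bucket/cube.mdio", backend="dask")
inline = reader[100, :, :].compute()   # chunk fetches are spread across the worker processes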

Compression

Yes, we optimized it mainly for maximum compression and reasonable (but not the fastest) performance. We found Zstandard performs better than Blosc's default LZ4 at higher compression ratios. However, I recently saw Btune by the Blosc team, and it may not hurt to run the suite to find the optimal parameters.
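
For reference, this is the kind of Blosc configuration being compared (illustrative numcodecs objects, not necessarily how MDIO wires its compressor internally):

import numcodecs

zstd_codec = numcodecs.Blosc(cname="zstd", clevel=5, shuffle=numcodecs.Blosc.SHUFFLE)  # higher ratio
lz4_codec = numcodecs.Blosc(cname="lz4", clevel=5, shuffle=numcodecs.Blosc.SHUFFLE)    # faster default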

I agree about the convenience of SEG-Y data (already available); however, sometimes that is also a problem due to variations in SEG-Y (IBM vs. IEEE, weird byte locations, etc.), which makes downstream tasks painful. We try to standardize the data (more coming in later releases). Earlier, you mentioned IBM not being an issue; unfortunately, most SEG-Y data we have dealt with is big-endian IBM, so there is also the overhead of parsing it. Big-endian IBM-encoded data has to die :)

There is also near-lossless ZFP compression, which is very good. The only caveat is that it needs the data standardized to a standard deviation of approximately 1. This causes extremely minor quantization errors but reduces the size by 75% in the FP32 case, and it is better than any lossy compressor when it comes to quality. We still need to implement this.
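
A sketch of what that standardization means in practice (names and numbers are illustrative, not MDIO API):

import numpy as np

data = np.random.default_rng(1).normal(scale=300.0, size=(64, 64, 64)).astype("float32")

scale = float(data.std())
standardized = data / scale          # std ~= 1, so a fixed ZFP accuracy tolerance behaves uniformly
# ... compress and store `standardized`, keep `scale` in the metadata ...
restored = standardized * scale      # rescale on read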

Preallocation

Ok, I see what you are saying; that's pretty trivial to implement with Zarr because it supports the out= keyword argument, so data can be written into a view of a pre-allocated NumPy array. However, it has yet to be exposed in the MDIO API.
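
Roughly what that Zarr pattern looks like, reading into views of a pre-allocated batch buffer (uses the private _traces attribute mentioned earlier in the thread, so a sketch rather than public API; the inline indices are placeholders):

import numpy as np

traces_zarr = mdio_file._traces                 # underlying Zarr array (private attribute)
inlines = [100, 200, 300]                       # placeholder inline indices
n_xl, n_s = traces_zarr.shape[1:]

batch = np.empty((len(inlines), n_xl, n_s), dtype="float32")
for i, il in enumerate(inlines):
    # Zarr writes the selection directly into the view, with no intermediate copy.
    traces_zarr.get_basic_selection((il, slice(None), slice(None)), out=batch[i])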

Multiple Projections

If disk space or storage cost is not a concern, full precision can of course be saved :) However, that's not always the case. That's why we usually keep the original data in 3D bricks and "fast views" of the data for ML or viz purposes. I have an example below, which shows the benefits. Anything that needs full 32-bit float precision should be built to process data in chunks for maximum efficiency.

Some Basic Benchmarks

The dataset is the Poseidon data (open source, CC-BY 4.0), which I ingested as 64x64x64 bricks. Reads are from SSD on a Mac M1 Max (10 cores) laptop. The original SEG-Y is ~100GB: psdn11_TbsdmF_full_w_AGC_Nov11.segy. This is a larger dataset than the one you are using.

>>> mdio = MDIOReader(path, access_pattern="012")
>>> mdio_dask = MDIOReader(path, access_pattern="012", backend="dask", new_chunks=(64, "auto", "auto"))
>>> 
>>> print(mdio.shape)
(3437, 5053, 1501)
>>> %timeit mdio[1750]
1.47 s ± 77.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
>>> %timeit mdio_dask[1750].compute()
718 ms ± 19.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

The above uses ~50% CPU when running.

ZFP and Optimized Chunks Example

I made IL/XL/Z optimized and lossy compressed versions within the same MDIO file.

$ du -hs psdn11_TbsdmF_full_w_AGC_Nov11.mdio/data/chunked_* psdn11_TbsdmF_full_w_AGC_Nov11.mdio/metadata/chunked_*
 13G    psdn11_TbsdmF_full_w_AGC_Nov11.mdio/data/chunked_01
 34G    psdn11_TbsdmF_full_w_AGC_Nov11.mdio/data/chunked_012
 18G    psdn11_TbsdmF_full_w_AGC_Nov11.mdio/data/chunked_02
 17G    psdn11_TbsdmF_full_w_AGC_Nov11.mdio/data/chunked_12
 50M    psdn11_TbsdmF_full_w_AGC_Nov11.mdio/metadata/chunked_012_trace_headers
102M    psdn11_TbsdmF_full_w_AGC_Nov11.mdio/metadata/chunked_01_trace_headers
138M    psdn11_TbsdmF_full_w_AGC_Nov11.mdio/metadata/chunked_02_trace_headers
134M    psdn11_TbsdmF_full_w_AGC_Nov11.mdio/metadata/chunked_12_trace_headers

The total file size is ~85GB, while the original SEG-Y was ~100GB. So we get very fast and flexible access in every possible direction, and the result is still smaller.

In this case, we don't usually need new_chunks because the tiles are quite large (i.e., not many tasks). The access pattern tells the reader which optimized copy to use.

>>> fast_il = MDIOReader(path, access_pattern="12", backend="dask")  # il pattern
>>> fast_xl = MDIOReader(path, access_pattern="02", backend="dask")  # xl pattern
>>> fast_z = MDIOReader(path, access_pattern="01", backend="dask")  # z pattern

Benchmarking the line read (with Dask):

>>> %timeit il = fast_il[1750].compute()
41 ms ± 808 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

Without Dask:

>>> fast_il = MDIOReader(path, access_pattern="12")
>>> %timeit il = fast_il[1750]
87 ms ± 1.75 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

Much more reasonable :) It would be faster without compression, of course. Note that the inlines have ~5000 crosslines, which is ~30MB in FP32.

SergeyTsimfer commented 1 year ago

Hi again

Took a pause to think some ideas through:)

Benchmark (thinking out loud)

A more challenging thing is eliminating the system cache. For example, in my benchmarks I used a system with a ~infinite (don't want to be specific) amount of RAM, so the entire cube could easily fit into it. For my tests, I either ignore this fact or forcefully evict the cube from the OS cache, but this should also be part of the benchmark itself. This is where random read patterns also come in handy.
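
One blunt way to force cold reads between runs on Linux (requires root and is not portable; shown only to make the point concrete):

import subprocess

# Flush dirty pages and drop the Linux page cache so the next read hits the disk, not RAM.
subprocess.run("sync && echo 3 | sudo tee /proc/sys/vm/drop_caches", shell=True, check=True)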

Remote storage

A question I've been thinking about is as follows:

Given two loading methods A and B, with A being faster on local storage than B, is it true that A is faster for remote loads if both are equipped with a reasonable proxy to interact with the remote API?

On the surface, the answer is yes, but the devil is in the details. A "reasonable proxy to interact" is quite an abstract thing: for example, if I wanted to implement remote storage loading for segfast, I would need to load bytes from the file at specific locations. As S3 / AWS abstracts those concepts away from the user, it would not be that easy.

So a better answer is "yes, if you are willing to invest a lot of time in writing such a proxy", which is not terribly helpful.
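
That said, byte-range reads over object stores are available off the shelf, e.g. via fsspec, so such a proxy is at least feasible (bucket, key, and offsets below are placeholders; 3600 and 240 bytes are the standard SEG-Y textual+binary header and trace header sizes):

import fsspec

fs = fsspec.filesystem("s3")                                                  # needs s3fs and credentials
header_bytes = fs.cat_file("my-bucket/cube.sgy", start=3600, end=3600 + 240)  # first trace header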

As a result of all of this, I should definitely think about adding some benchmarks for working with non-local data.

Benchmark (MDIO)

These are quite nice speedups! Can you provide the code to set up such a file, so that I can test it too?

As a side note: what do you think about automating the selection of the best (fastest) projection?

Side note 2: for ZFP compression, you could use a fraction of the std instead of absolute values; that should solve the problem of it requiring a standardized data range.
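
In other words, something along these lines (the 1e-2 fraction and the sampled traces are placeholders):

import numpy as np

rng = np.random.default_rng(0)
traces = rng.normal(scale=250.0, size=(64, 1500)).astype("float32")  # stand-in for sampled amplitudes

compression_tolerance = 1e-2 * float(traces.std())  # absolute tolerance derived from the data's std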

PS

I feel that eliminating IBM floats (making IEEE the default) in seismic processing software could be the biggest speed-up for all subsequent stages of exploration, as the IBM format slows down all operations in software like Petrel by an order of magnitude :)