European-XFEL / EXtra-foam

Fast Online Analysis Monitor at EuXFEL
https://extra-foam.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License
9 stars 10 forks source link

Stream-from-file performance #420

Open zhujun98 opened 2 years ago

zhujun98 commented 2 years ago

I wonder is there a benchmark for streaming from the files? Have you considered making a virtual data source with a throughput of higher than 10 GB/s by streaming data from files? I am about to make one but have not thought about the technical challenges in detail.

JamesWrigley commented 2 years ago

Coincidentally I've been thinking about this recently because I'm working on something where it would be very useful :upside_down_face:

My tentative plan is:

@philsmt, did you implement something similar for extra-metropc-run's --readahead?

tmichela commented 2 years ago

I'm not certain there's a benefit to read large chunk of data in different process, you will likely hit the file system limit anyway from a single process. Where it is useful is when you read many small chunk of data in different datasets. (just asumptions, would be best to benchmark it). I think @D4vidH4mm3r did something similar to what you want for testing calng pipeline.

D4vidH4mm3r commented 2 years ago

I'm not certain there's a benefit to read large chunk of data in different process, you will likely hit the file system limit anyway from a single process. Where it is useful is when you read many small chunk of data in different datasets. (just asumptions, would be best to benchmark it). I think @D4vidH4mm3r did something similar to what you want for testing calng pipeline.

To sidestep file system and HDF5 details for online pipeline benchmarking, I actually just read a subset of trains to memory (leading to the imaginative device name MemToPipe) and stream them into the pipeline from there. For the purposes of measuring calng performance, seeing the same n trains in a loop is fine.

JamesWrigley commented 2 years ago

I'm not certain there's a benefit to read large chunk of data in different process, you will likely hit the file system limit anyway from a single process. Where it is useful is when you read many small chunk of data in different datasets. (just asumptions, would be best to benchmark it).

Interesting, indeed I will need to do some benchmarks. Reading many small chunks is not uncommon though, e.g. the thing I'm currently working on is analysis of JF500K data, which is pretty small.

zhujun98 commented 2 years ago

Coincidentally I've been thinking about this recently because I'm working on something where it would be very useful 🙃

Cool! Please keep me posted when you start to write your code. I can also contribute if you like.

@D4vidH4mm3r 's approach is very smart and should be adequate for a lot of benchmarking cases. But for some use cases, we will still need a large amount of real data, if not all data from a run. Imaging performing a stressful test of the data services (both live processing and data reduction) for an experiment which needs 3D tomography reconstruction, significant amount of data will need to evaluation the accuracy of the result, although currently I have no idea how much does it need.

Regarding the technical choice, the bottleneck is reading data from files. I'd like to see the benchmark numbers.

By the way, you guys are lucky. I will also have to deal will decompression, which seems to be very expensive!

philsmt commented 2 years ago

Yes, extra-metropc-run uses an IO thread for reading in the data. Since this tool is primarily geared at development though, this was not strictly done for performance but rather to play nice with the asyncio loop running the pipeline. It's using a queue to move data between this thread and the event loop with said --readahead specifying its size. If you try to run the pipeline at high rates, you might get away by increasing this value, but ultimately it may drain anyway if IO cannot keep up in the long run. I added this option when I encountered highly variable read times per train, and having a large queue allowed to smoothen them out.

Parallelizing reads can improve performance quite significantly, but how depends heavily on the filesystem you're on. GPFS scales very nicely with parallel readers on the same file, but dCache does not. If accessing a single file in parallel, you're distributing the same bandwidth on each worker. Where dCache scales better is accessing distinct files from each worker, and it really shines when accessing it from several nodes in parallel (not in the scope of this problem of course).

@zhujun98 Are you referring to FLASH files? Most of the extremely poor decompression performance here used to come from the unfortunate choice of chunking entire datasets rather than a smaller boundary like trains or pulses. I typically rewrote files during a beamtime with saner chunking to scratch for analysis...

zhujun98 commented 2 years ago

Hi @philsmt, Thanks a lot for the input! I am working for SLS2 at PSI :) But I think the same principle applies. I should pay attention to the chunking strategy and benchmark the performance by rewriting files.

takluyver commented 2 years ago

If you're dealing with compressed data, be aware that HDF5 caches decompressed chunks, but only up to a set size - 1 MB by default. If your chunks are bigger than that, they're not cached, so you can easily be reading & decompressing the same chunk multiple times, which obviously makes it extremely slow.

You can control the cache size either when opening the file or the dataset (also: docs for h5py). Or, of course, read a chunk into your own code and then take smaller pieces from it.

zhujun98 commented 2 years ago

Thanks @takluyver! I just started to work on the Eiger 16M (18M pixels to be precise, 32 bit). The chunk size is a single image size and it is compressed by BSLZ4. It takes 9.6 s to load 100 compressed images and 2.6 s to load 100 uncompressed ones. I am using h5py with hdf5plugin. If you have experience for improvement, please do let me know :-)

takluyver commented 2 years ago

If you're reading the obvious way, the decompression is happening one chunk at a time, so probably just using one CPU core (but check that, there might be some low-level parallelism I don't know about). There are a few options for decompressing chunks in parallel:

We do the equivalent of the last option for writing compressed data in the offline correction, and it was a considerable speedup over letting HDF5 do the compression. That was with simple deflate compression rather than bitshuffle-lz4, but I think the same idea should apply.

zhujun98 commented 2 years ago

Read raw chunks with the low-level method dset.id.read_direct_chunk() and then decompress them separately (can be done in threads if your decompression function releases the GIL).

This is exactly what I am looking for!!! I really appreciate it :)

zhujun98 commented 2 years ago

I finally make it faster than reading uncompressed data from a file. Thanks again for the recipe @takluyver!