eqcorrscan / EQcorrscan

Earthquake detection and analysis in Python.
https://eqcorrscan.readthedocs.io/en/latest/

Concurrent execution of workflow steps #544

Closed calum-chamberlain closed 11 months ago

calum-chamberlain commented 1 year ago

What does this PR do?

This PR implements concurrent processing of the main steps in the .detect workflow. Alongside this there are several other major changes, including moving all the matched-filter components out of core.matched_filter.matched_filter to focus on the tribe.detect method. The matched_filter function has been retained, but it simply creates a Tribe internally and runs the tribe.detect method.

Fundamentally this PR breaks the workflow down into the following steps:

  0. Download data (if running .client_detect, which I recommend)
  1. Process data (which is now possible in parallel from a Process thanks to #540)
  2. Prepare data for correlation
  3. Correlate and detect peaks
  4. Convert peaks to Detections
  5. Convert detections to Party
  6. Collect all parties into a final Party

Each step listed above runs in its own process, except steps 0 and 1, which run together when using .client_detect; step 0 is not run when using .detect. Communication between steps is via multiprocessing Queues. Steps wait until data are available in their incoming queue, then work on those data and put the result into their output queue, which forms the input queue for the following step. Steps loop until they get None from the input queue, which signifies the end of processing.
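
For illustration, each step follows roughly this worker pattern (a minimal sketch; the function and argument names are placeholders, not the actual internals):

from multiprocessing import Queue

def _step_worker(input_queue: Queue, output_queue: Queue, work):
    # Loop until the upstream step sends None, working on each item
    # and passing the result downstream.
    while True:
        item = input_queue.get()  # blocks until data are available
        if item is None:
            break
        output_queue.put(work(item))
    # Pass the sentinel on so the next step also terminates
    output_queue.put(None)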

.detect supports passing a queue as input, or a Stream. This means that if users do not want to use a client to get their data, they can provide a simple queue that might look something like:

import glob

from typing import Iterable
from multiprocessing import Queue, Process

from obspy import read

from eqcorrscan import Tribe

def reader(files: Iterable, output_queue: Queue):
    for file in files:
        output_queue.put(read(file))
    # None signals the end of the data to the consumer
    output_queue.put(None)
    return

def main():
    tribe = Tribe().read("Some_tribe.tgz")
    files = glob.glob("somewhere-with-some-waveforms")
    files.sort()
    stream_queue = Queue(maxsize=1)
    stream_getter = Process(
        target=reader, kwargs={"files": files, "output_queue": stream_queue})

    stream_getter.start()
    # Pass the queue (not the Process) to .detect
    party = tribe.detect(st=stream_queue, ...)
    stream_getter.join()

if __name__ == "__main__":
    main()

As currently implemented, only step 3, correlate and detect peaks, runs in the MainProcess. This enables that step to make use of multiprocessing if needed, and provides more safety for openmp threading in underlying C-functions. I attempted to move peak finding into its own Process, but found that putting the large arrays of cross-correlation sums into the Queue was prohibitively slow, and there is a strong risk of exceeding the queue size. I also tried saving these to disk and reading them back in another process, but again, writing them to disk was too slow.
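
For context, the cost of moving cross-correlation sums between processes can be seen with a toy benchmark along these lines (a hedged sketch, not code from this PR; the array size is illustrative only):

import time

import numpy as np
from multiprocessing import Process, Queue

def consumer(q: Queue):
    # A real consumer would run peak-finding here
    while q.get() is not None:
        pass

if __name__ == "__main__":
    # e.g. cross-correlation sums for 100 templates over one day at 20 Hz
    ccc_sums = np.random.random((100, 86400 * 20)).astype(np.float32)
    q = Queue()
    p = Process(target=consumer, args=(q,))
    p.start()
    tic = time.perf_counter()
    q.put(ccc_sums)  # the whole ~0.7 GB array is pickled and piped
    q.put(None)
    p.join()
    print(f"Transfer and consumption took {time.perf_counter() - tic:.1f} s")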

Minor changes:

Why was it initiated? Any relevant Issues?

Concurrent processing should enable us to more heavily load systems, which should please HPC admins... This PR speeds up EQcorrscan for large datasets, and should enable much better GPU utilisation when using the FMF backend.

PR Checklist

TODO:

calum-chamberlain commented 1 year ago

Benchmarks (part 1: 1000 templates over 3 days)

Benchmarks were computed using memory-profiler, running the script here with a tribe of 5000 templates spanning the entirety of North Island, New Zealand. Note that only the 20 most-picked stations are run, which somewhat helps the grouping.

All benchmarks below have been run using the same thresholds, processing, and the FMF backend. All benchmarks were run on my desktop at VUW with an NVIDIA Quadro RTX 4000 (8GB VRAM), an Intel i7-11700 (8 core/16 thread) and 64GB RAM (unknown generation). GPU load was never 100%, but the GPU does not appear to have been overheating (the fan was never running at 100%), so I suspect this was a limitation of FMF rather than the machine.

For this first set of benchmarks I ran a subset of 1000 templates through 3 days of data. Larger sets (the full 5000 through 10 days) are coming soon. I also neglected to include child processes in the memory calculations, so the actual memory usage will be higher. I will post some more benchmarks soon with child memory included.

Current master

Runtime: 549 s Peak memory: 11,682.656 MiB (Note actual memory use will be higher when including children)

mprofile_master_fmf_250

Current develop

Runtime: 522.36 s Peak memory: 11,610.844 MiB (Note actual memory use will be higher when including children)

mprofile_develop_fmf_250

This branch, serial

Runtime: 331 s Peak memory: 9,761.625 MiB (Note actual memory use will be higher when including children)

mprofile_serial_fmf_250

This branch, concurrent

Runtime: 329 s Peak memory: 8018.465 MiB (Note actual memory use will be higher when including children)

mprofile_concurrent_fmf_250

Summary:

This branch is faster (runtimes ~60% of current master and develop) and consumes less memory (note actual memory use will be higher when including children). The higher memory in serial is somewhat surprising and probably points to me having done something lazy there, since all memory is within the main process. One possible source of the memory reduction is that, now everything is in a single flow, I have reduced the copying of streams (let's see if this holds up when including child memory). The concurrent and serial operations also write temporary party files, which may be speeding things up.

One obvious feature of the graphs comparing develop to either the serial or concurrent runs in this branch is the reduced time spent waiting for data. The result of this is that more time is spent in the multi-threaded parts of the workflow (either on the CPU or GPU). Note that the serial workflow still downloads the next day of data in advance in this benchmark.

Comparing the concurrent to serial versions we see that in the concurrent version less time is spent in the processing and preparation step (between the four sawtooths of the template groups). This is more pronounced for larger runs with more channels.

In all cases there is an unavoidable single-threaded start-up period when the tribe is read in (I'm using a pickled tribe for speed and would recommend this to users in the future if they are not transferring the tribe between machines), and when getting the initial dataset.

calum-chamberlain commented 1 year ago

And now with children...

As above (1000 templates, 3 days), but this time getting mprof to include memory from child processes.
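
For anyone reproducing this, child memory can be captured either from the mprof command line or programmatically; a rough sketch of the programmatic route (the wrapper name is a placeholder and this is not necessarily the exact invocation used here):

from memory_profiler import memory_usage

def run_benchmark():
    ...  # the tribe.detect / client_detect benchmark would go here

if __name__ == "__main__":
    # include_children sums the memory of spawned/forked child processes
    usage = memory_usage((run_benchmark, (), {}), include_children=True,
                         interval=1.0)
    print(f"Peak memory: {max(usage):.3f} MiB")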

Current master

Runtime: 527 s Peak memory: 110,102.359 MiB (Note that this highlights that the reported memory isn't accurate even when including children! I only have 64GB of RAM and was nowhere near that limit...)

mprofile_master_fmf_250

Current develop

Runtime: 524 s Peak memory: 11494.309 MiB

mprofile_develop_fmf_250

This branch, serial

Runtime: 329 s Peak memory: 18177.848 MiB

mprofile_serial_250_fmf

This branch, concurrent

Runtime: 321 s Peak memory: 30369.926 MiB

mprofile_concurrent_250_fmf

Summary

Note that the serial implementation here still downloads streams eagerly. This doesn't have to be the case (e.g. you could loop over streams using .detect), which would reduce memory consumption as well. For these tests the concurrent version is only marginally faster than the serial implementation. Compared to current develop, memory use for this branch is significantly increased, but runtimes are also significantly reduced. It would be good to make the serial implementation of client_detect comparable to develop's serial behaviour (e.g. download data in serial) to make sure that the memory increase is not excessive for this speed-up.

Compared to the current master (reported) memory usage is significantly reduced - this is mostly due to moving pre-processing from multiprocessing to multithreading.

I will work on some benchmarks to highlight the speedup afforded by the concurrent branch. I think that the main benefit of this is when the correlation preparation stages take a long time (when using many channels).

calum-chamberlain commented 1 year ago

Updated with serial downloads

I made the serial workflow actually download data in serial, and made some tweaks to memory passing for the concurrent process. The results below are for 5000 templates over 3 days using 50 stations (so a heavier load than the previous benchmarks), again running with the FMF backend and on the same computer.

This branch serial

Runtime: 2356 s Peak Memory: 22187.023 MiB

mprofile_serial_fmf_250

This branch concurrent

Runtime: 2252 s Peak Memory: 58750.164 MiB

mprofile_concurrent_fmf_250

So we get a minor speed-up for this template case (4% faster), but much greater memory use (>2.5x). Some of that comes from handling and processing an additional day of data in memory, but not all - the final day of processing, with only one stream in memory, still sits at ~2x the peak memory of the serial version. I suspect that using all these processes and queues is resulting in more objects being pickled and shared to new processes than I would like.

I think I should try and reduce the number of processes to reduce the sharing. Ideally we might just need three processes:

  1. Get and prepare data for correlation
  2. Correlate and find peaks
  3. Convert peaks to detections and party.

Currently step 1 has three processes (downloader, grouper and prepper), and step 3 has a further three processes (detector, detect builder and party builder). I think I got carried away!

calum-chamberlain commented 1 year ago

Updated concurrent timings and memory use

I updated to have fewer queues and processes, and use a temporary template and stream cache on disk to reduce in-memory sharing of data. This also reduces the need to copy streams in memory.
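
For context, the stream side of the disk cache amounts to something like the following sketch (paths, file format and function names are placeholders, not the actual internals):

import os

from obspy import read

def cache_stream(st, cache_dir):
    # Write the processed stream to disk once; downstream processes read
    # it back rather than receiving a pickled copy through a Queue.
    path = os.path.join(cache_dir, "stream_chunk.ms")
    st.write(path, format="MSEED")
    return path

def load_cached_stream(path):
    return read(path)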

Timings for concurrent workflow with 5000 templates through three days are below.

This branch concurrent

Runtime: 2136 s Peak memory: 42861.961 MiB

mprofile_concurrent_fmf_250

This is now faster, and uses ~16GB less memory than before, but still ~20GB more than in serial. Timings remain similar between concurrent and serial for this template set. The main benefits remain when templates are very dissimilar.

calum-chamberlain commented 1 year ago

I think that the main stumbling block is moving the large numpy arrays around between processes through the pipe. I could probably explore using shared memory arrays, but I think that I would end up using even more memory for a very small time-saving. If these shared memory arrays could be passed directly to the correlation functions that would be fine, but I think doing that would require a lot of re-writing of the correlation code and wrappers. It was fun to explore though!
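
For reference, the shared-memory route would look roughly like this (a sketch only, assuming the correlation or peak-finding functions could be pointed at an array built on a SharedMemory buffer):

import numpy as np
from multiprocessing import shared_memory

def share_array(arr: np.ndarray):
    # Copy the array into a shared block once; keep the SharedMemory
    # object alive for as long as the array is needed.
    shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
    shared = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
    shared[:] = arr
    return shm, shared

def attach_array(name: str, shape, dtype):
    # In another process: attach to the existing block by name, avoiding
    # a pickle-and-copy through a Queue. Call shm.close() when done and
    # shm.unlink() in the owning process to free the memory.
    shm = shared_memory.SharedMemory(name=name)
    return shm, np.ndarray(shape, dtype=dtype, buffer=shm.buf)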

I'm impressed by how much faster the serial approach now is than the old approach, and I think I should concentrate on that, and maybe allow concurrent downloading for client_detect, but have everything else in the serial process. The time savings seem minimal, even for the more complex cases that I have tested.

calum-chamberlain commented 1 year ago

A simpler, and lighter re-write would be to make the stream-dict in the main process, and just get the template dict/arrays in from the prepper. The stream could be passed once via disk and then channels selected from that stream for each template group.

I also still think that it is worth having the grouping run ahead of time, because this can be very slow if the stream seed ids change (e.g. if a station goes down or becomes available).

calum-chamberlain commented 1 year ago

Update: This has worked well for the 11k-template, 15-year test that I ran. I'm now running a bigger set of ~38k templates with the same code. One obvious limiter remains the template grouping: this can take a long time initially, but is then cached for later runs. When splitting a long-duration run into chunks (as would be common for large-scale matched-filter runs), each chunk currently has to run the grouping. It might make sense to have the option to specify which templates are grouped together a priori, so that a user can run the grouping once, cache it to a file, then load that file for each chunk. This might not give the optimal grouping every time, but it would reduce the start-up (serial) cost of grouping.
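
Such a cached grouping could be as simple as a file mapping groups to template names, along these lines (purely hypothetical; no such option exists yet):

import json

def save_groups(groups, filename="template_groups.json"):
    # groups: a list of lists of template names from one grouping run
    with open(filename, "w") as f:
        json.dump(groups, f)

def load_groups(filename="template_groups.json"):
    with open(filename) as f:
        return json.load(f)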

TODO:

calum-chamberlain commented 1 year ago

This seems pretty robust to me, and faster and more memory efficient for large datasets. I'm going to merge this once the tests pass and I have finished the todo list at the top (tutorial and code cleaning/docs).

I'm probably going to base a new release on these changes as well. This new release will support at least python 3.11 now that we don't depend on pyfftw (from #540).

calum-chamberlain commented 12 months ago

Benchmarks

I wrote a small script for a simple benchmark test here that takes data from GeoNet's AWS bucket. This uses around 1000 templates and can be run on any of the parallel_steps, master and develop branches. The plots below are the output from running the script with mprofile. I ran two instances, one on my home computer using the fftw backend and local data (pre-downloaded), and one on my work computer using the fmf backend and downloading data on the fly. These highlight two things:

  1. Using concurrent_processing=True increases memory use (as expected)
  2. Using concurrent_processing=True is only faster when getting the data is a reasonably large time-cost; otherwise the cost of serialising and de-serialising data to and from disk between processes is greater than the cost of reading in and processing the data.

Based on this, I recommend that we set the default to concurrent_processing=False and document the option of concurrent_processing=True. Possibly we could add a message when using .client_detect(concurrent_processing=False, ...) to suggest that concurrent_processing=True might be beneficial.
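
The suggested message could be as simple as a log hint, e.g. (hypothetical helper, not in the codebase):

import logging

Logger = logging.getLogger(__name__)

def _maybe_suggest_concurrency(concurrent_processing: bool):
    # Hypothetical: nudge client_detect users towards the concurrent
    # option when downloading data is likely to dominate the runtime.
    if not concurrent_processing:
        Logger.info(
            "Data download may dominate runtime; consider "
            "concurrent_processing=True for client_detect.")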

Local-data, FFTW backend, 10 stations (AMD Ryzen 5 2600X, 6 core, 12 thread, M.2 SSD, 3600 MT/s RAM)

Master

mprofile_master

Develop

mprofile_develop

Parallel_steps [concurrent_processing=False]

mprofile_parallel_steps_serial

Parallel_steps [concurrent_processing=True]

mprofile_parallel_steps

Remote data, FMF backend, 30 stations (Nvidia Quadro RTX 4000, Intel i7 (?), NFS (should be slower for on disk data sharing), unknown RAM) - ran in groups of 100 templates

Note, I did not profile master because I ran out of RAM.

Develop

mprofile_develop_grp100

Parallel_steps [concurrent_processing=False]

mprofile_parallel_steps_serial_grp100_aws

Parallel_steps [concurrent_processing=True]

mprofile_parallel_steps_grp100_aws

calum-chamberlain commented 11 months ago

Interim docs builds are here