yetinam opened this issue 10 months ago
Kia ora @yetinam - we have discussed this a little before in obspy conversations offline, but there wasn't a lot of desire at the time (years ago, so this may have changed).
Since those discussions I have worked on multithreaded versions of filtering and resampling to overcome the same issues that you are facing. Because filtering and resampling are built on numpy/scipy routines that release the GIL, it is possible to use multithreading rather than multiprocessing, which avoids the memory and time cost of copying data between processes. It does require a little more code than just using joblib when looping over traces, but it is usually worth it to reduce the memory usage.
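For illustration, the basic pattern is just a thread pool mapped over the traces of a Stream; this is a minimal sketch of the idea rather than the EQcorrscan code, and any speed-up relies on the underlying numpy/scipy calls releasing the GIL:

```python
# Minimal sketch: map a per-trace operation over a Stream in threads.
from concurrent.futures import ThreadPoolExecutor

from obspy import read, Stream


def _filter(tr):
    # Most of the work inside Trace.filter happens in scipy routines that
    # release the GIL, so threads can make progress concurrently.
    tr.filter("bandpass", freqmin=1.0, freqmax=10.0)
    return tr


st = read()
with ThreadPoolExecutor() as executor:
    st = Stream(traces=list(executor.map(_filter, st)))
```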
I have implemented some of this in EQcorrscan (see below). Obviously you are welcome to install EQcorrscan and use those routines immediately, but I remain keen to see parallel processing of streams more readily available than just my tinkering in EQcorrscan, particularly with more use of large-N data.
Specific processing routines relevant to EQcorrscan's workflow are now implemented here for filtering, here for resampling and here for detrending. These could form the basis for more general routines for obspy (they were based on obspy routines and produce the same results as obspy, within numerical precision, for a specific set of parameters) if there was desire to port them across.
The pull request that added these enhancements to EQcorrscan is here and has some simple benchmarks.
Alongside this, we might at some point want to consider using cupy's GPU-accelerated drop-in replacements for some numpy functions to further speed up these routines when GPUs are available. I have explored ways that this could be implemented in a separate repo. I haven't had time to work more on this recently, but the bones are there and hopefully you can see the idea.
Part of the appeal to me of not integrating these accelerations with obspy immediately is that it would be quite a lot of work, and it would be good for the people who develop the code to write a paper on the work and get credit for it (along with the obspy developers). In the repo above I was developing a package that would work as a simple one-line import statement (e.g. `import obspy_accelerated`) which would overload all the obspy methods that it could with accelerated versions.
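To make the idea concrete, such a package could monkey-patch methods on import roughly like this; the names and layout below are illustrative assumptions, not the contents of the linked repo:

```python
# obspy_accelerated/__init__.py -- illustrative sketch only.
from concurrent.futures import ThreadPoolExecutor

from obspy import Stream

# Keep a reference to the original in case serial behaviour is needed.
_original_detrend = Stream.detrend


def _parallel_detrend(self, *args, **kwargs):
    # Run the per-trace detrend calls in a thread pool instead of a loop.
    with ThreadPoolExecutor() as executor:
        list(executor.map(lambda tr: tr.detrend(*args, **kwargs), self))
    return self


# Simply importing the package swaps the method in place.
Stream.detrend = _parallel_detrend
```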
For what it is worth, I think accelerating all the obviously parallel-able routines on Streams is something we should be doing. Possibly this is something we could have an obspy get-together/hackathon to get done?
Hey all,
I think this is something that ObsPy devs have been thinking about for more than a decade now. @krischer mentioned years ago that there was a branch of ObsPy that implemented all the trace processing loops in a threadpool, but they found at that time it made ObsPy slower overall. Of course a lot has changed in python since then as @calum-chamberlain's benchmarks show.
Also, with GIL-less python builds coming soon as per PEP 703, there is probably a lot of merit in revisiting the idea.
My worry, though, is that running all of these types of trace functions in a threadpool by default might cause some subtle changes in behavior, and maybe bugs, in existing codes. IMHO, it should be something users opt into. Maybe a new namespace would be appropriate? Something like this:
```python
from concurrent.futures import ThreadPoolExecutor

import obspy

tpool = ThreadPoolExecutor()
# This way you can use any executor-like object (e.g. dask client, process pool, etc.)
obspy.parallel.set_executor(tpool)

st = obspy.read()
st = st.parallel.detrend()
```
Or each of the embarrassingly parallel stream methods could have a new argument to indicate it should be run in parallel?
```python
from concurrent.futures import ThreadPoolExecutor

import obspy

tpool = ThreadPoolExecutor()

st = obspy.read()
st = st.detrend(parallel=tpool)  # or parallel=True might just create a threadpool.
```
Caching the default `ThreadPoolExecutor` would probably be a reasonable optimization, no? I think they are fairly cheap to keep around.
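Something along these lines, for example (a sketch, not existing obspy code):

```python
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache


@lru_cache(maxsize=None)
def _default_executor():
    # Created lazily on first use, then reused for the lifetime of the process.
    return ThreadPoolExecutor()
```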
I also thought about re-implementing the stream entirely using awkward array, but haven't gotten around to exploring this.
If I were in your shoes, @yetinam, I would look into implementing a generic function for this, then optimizing after running some benchmarks. Something like (haven't tested):
```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial

import obspy

DEFAULT_EXECUTOR = ThreadPoolExecutor()


def get_executor(executor=None):
    if executor is None:
        return DEFAULT_EXECUTOR
    return executor


def apply_trace_func(tr, name, args, kwargs):
    return getattr(tr, name)(*args, **kwargs)


def stream_apply(st, func_name, *args, executor=None, **kwargs):
    executor = get_executor(executor)
    func = partial(apply_trace_func, name=func_name, args=args, kwargs=kwargs)
    # I can't remember if map returns a future or just a trace here...
    results = [x for x in executor.map(func, st)]
    return obspy.Stream(traces=[res for res in results])
```
I did try to pursue something similar to your suggestion @d-chambers, but I ended up having to split some of the trace functions into parts that released the GIL and parts that did not to get useful speed-ups, hence the functions as they are in EQcorrscan (linked above). Nevertheless, I think being able to provide different executors is a great idea.
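To illustrate the kind of split I mean, here is a rough sketch (not the EQcorrscan code; it assumes every trace shares the same sampling rate): design the filter once while holding the GIL, then apply it to each trace in threads, where the heavy lifting happens in compiled scipy code that releases the GIL.

```python
from concurrent.futures import ThreadPoolExecutor

from obspy import read, Stream
from scipy.signal import iirfilter, sosfiltfilt

st = read()

# Serial part (holds the GIL): design the filter once for the whole stream.
sos = iirfilter(4, 10.0, btype="lowpass",
                fs=st[0].stats.sampling_rate, output="sos")


def _apply(tr):
    # Threaded part: the filtering itself runs in compiled code.
    tr.data = sosfiltfilt(sos, tr.data)
    return tr


with ThreadPoolExecutor() as executor:
    st = Stream(traces=list(executor.map(_apply, st)))
```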
I also agree that this could cause subtle changes in behavior, and should be opt-in, which might be another reason to develop a separate package that monkey-patches `Stream` methods?
The "benchmarks" in the PR I linked weren't very helpful. I thought I would give a brief demonstration of the kind of speed-ups and memory changes we might expect to see for multithreading, using my implementation in EQcorrscan. The code below if designed to be run in an ipython session and requires obspy, eqcorrscan, numpy and memory_profiler to be installed.
```python
%load_ext memory_profiler

import numpy as np
from obspy import read, Stream
from eqcorrscan.utils.pre_processing import _multi_detrend, _multi_filter, _multi_resample

n_traces, n_samples = 24, 8640000

st = read()
original_st = Stream()
for i in range(n_traces):
    tr = st[0].copy()
    tr.stats.channel = str(i)
    tr.data = np.random.randn(n_samples)
    original_st += tr

# We have to copy the data to avoid resampling the resampled data when using
# timeit, so let's time the copy first
%timeit original_st.copy()
%memit original_st.copy()

%timeit original_st.copy().resample(20)
%memit original_st.copy().resample(20)

%timeit _multi_resample(original_st.copy(), 20)
%memit _multi_resample(original_st.copy(), 20)

# Check that the results are the same
eqc_resamp = _multi_resample(original_st.copy(), 20)
original_resamp = original_st.copy().resample(20)
for original_tr, eqc_tr in zip(original_resamp, eqc_resamp):
    assert np.allclose(original_tr.data, eqc_tr.data)
```
On my Ryzen 5 (12 threads) I get:
- `.copy`: time: 312 ms, peak memory: 3241.58 MiB
- `.resample`: time: 12.6 s, peak memory: 3754.72 MiB
- `_multi_resample`: time: 4.3 s, peak memory: 8082.91 MiB

So over double the memory use (this could probably be reduced by a better programmer), but ~3x faster using the multi-threaded variant. Obviously not the 12x we would all hope for given 12 threads were operating, but a significant improvement.
Hi Calum, hi Derrick,
thanks for your detailed comments and the pointers to existing implementations of parallelised codes! I agree that the idea of allowing users to set an executor sounds reasonable. It's also a good point that the scipy and numpy functions in the background release the GIL anyhow. I'll see how far I can get with thread-based parallelisation. Threads are also preferable because they'll cause a lot less headache with other libraries; ProcessPoolExecutors don't work particularly well together with, for example, ipython or dask.
Regarding the GPU implementation, I agree with Calum that this is out of scope here. Not only because of the amount of work (and the potential to sum this up in a publication) but also because of the added dependencies. CUDA can be quite a pain both to install (the binaries are pretty big) and to load (init times and memory footprint). For the ML use cases with SeisBench, it turns out to be substantially more economical to just use more CPUs than to use GPUs (Krauss et al. (2023)).
Moving forward, I'll write a little benchmark that runs through all relevant functions for different workloads (few short streams, many short streams, few long streams, many long streams, heterogeneous workloads, ...). Once this is done I'll see which options offer relevant speed-ups. I'm a little concerned this might be pretty dependent on the hardware and OS (because overheads for threading/processes are very different). Probably nothing to worry about now but before merging anything we might want to reproduce benchmark results on different machines.
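Roughly, I'm imagining a harness along these lines (just a sketch; names and workload sizes are placeholders):

```python
# Sketch of a benchmark harness; workload sizes are placeholders.
import time

import numpy as np
from obspy import read, Stream


def make_workload(n_traces, n_samples):
    template = read()[0]
    st = Stream()
    for i in range(n_traces):
        tr = template.copy()
        tr.stats.channel = str(i)
        tr.data = np.random.randn(n_samples)
        st += tr
    return st


def bench(func, workloads, repeats=3):
    results = {}
    for n_traces, n_samples in workloads:
        st = make_workload(n_traces, n_samples)
        times = []
        for _ in range(repeats):
            work = st.copy()
            t0 = time.perf_counter()
            func(work)
            times.append(time.perf_counter() - t0)
        results[(n_traces, n_samples)] = min(times)
    return results


# e.g. few long streams vs. many short streams
workloads = [(3, 8_640_000), (300, 86_400)]
print(bench(lambda st: st.detrend("demean"), workloads))
```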
Last point, even though this is nothing that has to be decided now: I'd suggest making parallel execution the default (if the benchmarks show conclusive speed-ups). Yes, this change might cause hiccups, which should be minimised as much as possible by testing, but I think the benefits outweigh the costs. An optional feature will only be found and used by a fraction of the users who could benefit from it. This is even more of a problem for users who mostly interact with obspy through other libraries, such as EQcorrscan and SeisBench. I think it wouldn't be ideal if users of these libraries had to patch obspy in their runtimes, or if these libraries quietly applied monkey patches to the obspy environment. Clearly, as I'm not an obspy maintainer, my concerns about stability and compatibility are probably a little lower than yours.
I kinda agree with most of the above; I think it should probably be opt-in.
> concerns about stability and compatibility

one of the most important points when it comes to obspy development, I would say =)
> I think it wouldn't be ideal if users of these libraries had to patch obspy in their runtimes, or if these libraries quietly applied monkey patches to the obspy environment

I think there should be ways to stay in full control, even if it is some other library calling obspy, no? Not the expert on stuff like that, but maybe some flag on obspy, e.g. `obspy.__parallel__ = False`, that the user could set before running third-party libraries?
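Purely as a sketch (neither the flag nor the helper below exists in obspy today), that could look like:

```python
import obspy

# A user could opt out up front, before calling any third-party code:
obspy.__parallel__ = False  # hypothetical flag


def _maybe_executor():
    # Inside obspy, parallel code paths would check the flag before
    # spinning up any threads.
    if getattr(obspy, "__parallel__", False):
        from concurrent.futures import ThreadPoolExecutor
        return ThreadPoolExecutor()
    return None
```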
Problem
I'm using obspy extensively as the backend of the SeisBench library for machine learning. While profiling the performance of picking waveform streams with deep learning, I noticed that substantial amounts of time go into obspy stream processing functions such as filtering or resampling. The problem is that while the deep learning parts are parallelised, the obspy operations are not. On the other hand, many of these functions (filtering, resampling, response removal, ...) are embarrassingly parallel, i.e., in principle each CPU core could process an individual trace at a time. This could lead to a major speed-up.
Proposed solution
All embarrassingly parallel stream operations in obspy should be parallelised. I admit that Python and the GIL make this a non-trivial task. I've played around with a few options but none of my solutions so far has been satisfying. For example, with joblib I managed to halve the time for some operations, but at the cost of using 8 CPU cores, so the overhead is substantial.
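A minimal sketch of that kind of joblib approach (with illustrative parameters, not my exact code):

```python
# Each worker process receives a pickled copy of its trace, which is where
# much of the overhead comes from.
from joblib import Parallel, delayed

from obspy import read, Stream


def _process(tr):
    tr.detrend("demean")
    tr.filter("bandpass", freqmin=1.0, freqmax=10.0)
    return tr


st = read()
st = Stream(traces=Parallel(n_jobs=8)(delayed(_process)(tr) for tr in st))
```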
With this issue, I mostly want to start a discussion on whether this parallelisation should be integrated into obspy and how it could be achieved. I have some experience with parallel programming in Python, but I'm by far no expert on the topic. Maybe someone else has suggestions on how best to implement this? As I think this change could substantially improve the runtime of many codes in seismology (at no additional coding effort for the users), I'd also be willing to invest some coding time into this myself.