Open HGeoAle opened 1 month ago
Hi Hugo,
Since you write that you have about 100 events, the limitation is, as you say, the long continuous streams of data. To avoid holding the long streams in memory when you use extract_stream, you can use EQcorrscan's functionality to create a tribe in which all your events (detection templates and picked detections) are handled as templates. You can then use tribe = Tribe().construct(method="from_client", ...) to read the waveforms with the client functionality, write the tribe to a file that you read in from your script, and then run write_correlations with only the shortest possible traces in memory.
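A minimal sketch of the construction step, assuming a hypothetical picked catalog and FDSN client ID (the filter band, trace length, and other processing parameters here are placeholders, not values from this thread):

from obspy import read_events
from eqcorrscan import Tribe

cat = read_events("picked_events.xml")  # hypothetical catalog containing your picks
tribe = Tribe().construct(
    method="from_client", catalog=cat, client_id="IRIS",  # any client-like object works too
    lowcut=2.0, highcut=9.0, samp_rate=25.0, filt_order=4,
    length=4.0, prepick=0.5, swin="all", process_len=3600,
    parallel=True)
tribe.write("tribe.tgz")  # save to disk so the correlation script can read it back

The correlation script can then look something like: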
from obspy import Catalog
from eqcorrscan import Tribe
from eqcorrscan.utils.catalog_to_dd import (
    write_event, write_phase, write_correlations)

tribe = Tribe().read('tribe.tgz')

# Prepare dicts for write_correlations
stream_dict = dict(zip([template.event.resource_id.id for template in tribe],
                       [template.st for template in tribe]))
cat = Catalog([template.event for template in tribe])
event_id_mapper = dict(zip(
    [event.resource_id.id for event in cat],
    range(1, len(stream_dict) + 1)))  # consecutive integer IDs
write_event(cat, event_id_mapper=event_id_mapper)
write_phase(cat, event_id_mapper=event_id_mapper)
event_id_mapper_out = write_correlations(
    cat, stream_dict, extract_len=extract_len, pre_pick=pre_pick,
    shift_len=shift_len,
    event_id_mapper=event_id_mapper,  # keep IDs consistent with event.dat and phase.dat
    include_master=False,
    lowcut=None, highcut=None, min_cc=min_cc, min_link=min_link)
If you can decide on lowcut and highcut at template creation, then you can also skip all waveform processing in write_correlations (hence lowcut=None and highcut=None above).
I worked some on creating differential arrival time (dt.cc) files for large problems (up to ~300k events in one run) with write_correlations last year. If you need to scale up beyond thousands of events, running write_correlations in parallel will cost a lot of memory the more the events are clustered. I have some extra functionality implemented in write_correlations, e.g., the parameters max_sep, max_neighbors, min_max_sep, max_max_sep, use_shared_memory, prepare_sub_stream_dicts, and write_dt_from_workers, to limit the number of events that are correlated against the master event by each worker. I have not gotten that merged into EQcorrscan's main branch yet, but I could do that when I get the time, especially if it's relevant for someone.
Hi Felix,
Thank you very much for the information; it’s been really helpful! I just implemented your suggestions and can now access the waveform snippets more efficiently when needed.
I still have one question. Previously, I was using the multiprocess() function to process my stream of waveforms, aiming to replicate the processing done when constructing the tribe and detecting events. However, since I can now use the client option and let the tribe construction handle the processing, as you mentioned, I realize I can streamline this.
That said, I still need the picked detections to construct the tribe. To get these picks, I have to run party.lag_calc() with a stream. Currently, I load the full waveform stream to pass it to lag_calc(). While this isn't an issue at the moment, I foresee it becoming a bottleneck as I scale up.
The documentation mentions that this stream can be gappy. I assume the solution would be to pass a processed stream that includes a few seconds of data around the templates and detections, depending on their size and shift lengths. So my question is: is using multiprocess() the right way to process the raw waveforms in a manner that mirrors what is done in tribe.construct() and party.client_detect()? And is passing this "gappy" processed stream to lag_calc() the best approach?
Thanks again for your help!
Best, Hugo
Generally you should let party.lag_calc handle the processing of your streams for you, which ensures the same processing is done as at the detection stage. If you are giving lag_calc a processed stream, you should probably also be giving that processed stream to tribe.detect.
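For example, a minimal sketch, assuming party is the Party from the detection run and that the raw (possibly gappy) data for the relevant span can be read in (the shift_len and min_cc values are placeholders):

from obspy import read

st = read("raw_continuous_chunk.mseed")  # hypothetical raw, unprocessed data covering the detections
repicked_catalog = party.lag_calc(
    stream=st, pre_processed=False,  # lag_calc applies the same processing as at detection
    shift_len=0.5, min_cc=0.4)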
There are lots of ways to do this, and without knowing more about the limitations of your system and the scale of your problem I can't recommend the "best" way to do this.
Update:
I've been working on processing earthquake swarms, which, before template matching, typically consist of 200-2,000 events, with durations ranging from three weeks to two months. After applying template matching, the number of events increases significantly, ranging from 1,700 to 8,000.
I’m running my pipeline on our university's computing cluster, where I primarily use GPU nodes with 198 GB of virtual memory to handle the large data volumes efficiently. The pipeline workflow is: tribe construction -> detection -> declustering -> lag_calc -> relative magnitudes -> write correlations. Wherever I need to retrieve segments of the events, I leverage the tribe construction functionality to process and trim the "templates" I need.
To optimize memory usage, I implemented a version of lag_calc that uses the waveform bank client. The code is modeled after the chunk-based loading and processing used in template construction and client detection. This setup has allowed me to successfully process many of my largest datasets without running into memory issues, as it avoids loading the entire waveform dataset at once.
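In outline, the chunked approach looks something like the sketch below (an obsplus WaveBank, day-long chunks, the padding, and the lag_calc parameters here are illustrative, not the exact values in my module):

from obspy import Catalog, UTCDateTime
from obsplus import WaveBank

bank = WaveBank("waveform_bank")  # wavebank directory (illustrative path)
chunk_len = 86400.0               # process one day at a time
t0, t1 = UTCDateTime(2023, 1, 1), UTCDateTime(2023, 2, 1)  # placeholder span of the swarm

picked_cat = Catalog()
chunk_start = t0
while chunk_start < t1:
    chunk_end = min(chunk_start + chunk_len, t1)
    # Keep only the detections that fall inside this chunk
    chunk_party = party.copy()
    for family in chunk_party:
        family.detections = [
            d for d in family.detections
            if chunk_start <= d.detect_time < chunk_end]
    chunk_party.families = [f for f in chunk_party if len(f) > 0]
    if len(chunk_party) > 0:
        # Read only this chunk from the wavebank, padded a little for processing and shifts
        st = bank.get_waveforms(starttime=chunk_start - 120, endtime=chunk_end + 120)
        picked_cat += chunk_party.lag_calc(
            stream=st, pre_processed=False, shift_len=0.5, min_cc=0.4)
    chunk_start += chunk_len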
However, I am currently facing a challenge when writing correlations for the largest datasets. The process fails when the node runs out of memory and crashes without producing an error message. The issue persists even when I bypass waveform processing, as the tribe construction functionality has already handled that.
Given the tectonic nature of the swarm, the inter-event distances are quite small, making it difficult to filter the correlations effectively using only epicentral distance. I believe a solution similar to what Felix suggested—a more memory-efficient write_correlations function that can handle a high number of events—would be essential.
Any guidance or suggestions on how to approach this issue would be greatly appreciated!
Kia ora Hugo,
Great, thank you for the update! Nice work on updating the lag-calc. I have similarly been implementing a wavebank/client backed lag-calc, but haven't gone the whole way that you have. It would be great if you have time or the inclination to make a pull request to add that code to EQcorrscan.
For write-correlations - yes, this is an issue. I have been working on a more memory efficient (but slower) implementation here for some near-real-time detection and analysis work. You would be welcome to try to use that and see if it meets your needs.
The Correlator object is designed to be a consumer of events and will correlate any "new" event added to it with all old events - this effectively generates a triangular matrix of the correlations. All correlations are written as it goes. There are some bugs with not removing event-pairs when there are not enough links that I need to fix, but generally this is working.
Usage is something like:
import logging

from obspy import read_events
from obsplus import WaveBank  # client backing the waveform reads

from correlator import Correlator  # the Correlator implementation referenced above

logging.basicConfig(level="INFO")

cat = read_events("...")

correlator = Correlator(
    minlink=8,
    min_cc=0.2,
    maxsep=10.0,
    shift_len=0.2,
    pre_pick=0.4,
    length=2.0,
    lowcut=1.0,
    highcut=10.0,
    interpolate=False,
    client=WaveBank("waveforms"),  # Or any client-like object - if using a wavebank, the wavebank needs to exist
    max_event_links=500,  # Limit to correlate to only the n nearest events, can be set to None to run everything
    outfile="dt.cc",
    weight_by_square=False)
correlator.add_events(cat)
This will write correlations to the dt.cc file as it goes. Event waveforms are processed once per event and stored on disk then read in on-demand when needed for new event correlations.
Any comments greatly appreciated!
Thank you Calum, I will implement this soon and let you know how it goes.
About the pull request: I don't think my programming skills suit that, but here is the module that I wrote. If you have any observations, they are most welcome. https://www.dropbox.com/scl/fi/gzdcc5u0njnbnq1p0kg2t/client_lag_calc.py?rlkey=ri4yp1eeg89zu9hg015az9hso&st=3a3fom4z&dl=0
I suggest testing the above code with a small subset of events first by the way - make sure it is actually doing what you want it to.
Thanks for sharing that link to your code. I think that is somewhat similar to the client_lag_calc implementation here (added to develop very recently in #589). I think our approaches were slightly different, particularly in that I am getting data for all channels in a party, rather than on a per-family basis. If you have time to look and/or test, it would be great to know if this new client_lag_calc method meets your needs, or if you have suggestions for issues you ran into or improvements that could/should be made.
Hi, following this,
I tried to run the Correlator but there is a problem. It needs to import the following:
from eqcorrscan.utils.clustering import dist_array_km
which leads to an import error:
ImportError: cannot import name 'dist_array_km' from 'eqcorrscan.utils.clustering'
I checked and I do not have this function; I also could not find it in the master branch of the GitHub release. I thought about skipping the function altogether to check whether it works, but it is relevant for the add_event() method. Any guidance?
Ah, sorry @HGeoAle - that is in the develop branch of EQcorrscan - there are some things in that branch that should help speed up calculation of cross-correlations as well - you should just be able to pull the develop branch and pip install that, e.g.:
cd EQcorrscan  # Wherever you have the EQcorrscan git repository - clone it from GitHub if you need to.
git checkout develop
git pull origin develop
pip install -e .  # Install in editable mode - this should mean that you can make changes to the source without needing to reinstall.
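A quick, illustrative way to confirm that the editable develop install is the one being picked up (the exact version string will differ):

python -c "import eqcorrscan; print(eqcorrscan.__version__)"
python -c "from eqcorrscan.utils.clustering import dist_array_km"  # should now import without error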
EQcorrscan Version: 0.5.0
ObsPy Version: 1.4.1
NumPy Version: 1.26.4
I'm working on applying the FMF matched filter to a swarm of about 100 events over a month of continuous data. My goal is to create a .dt.cc file with correlations and an events catalog with magnitudes. To do this, I need to process the data and pass the whole processed stream through several steps: lag calculation, relative magnitudes estimation, and writing correlations.
Right now, I load the entire stream into memory to get snippets for each detection using detection.extract_stream(). This approach is faster but requires a lot of RAM. I know I could load only the necessary data for each detection, which would save memory, but that would involve a lot of iterations and potentially redundant data loads, making it time-consuming. Unfortunately, I can't use that strategy for lag_calc, since it needs continuous data rather than snippets.
I'm looking for a more efficient way to handle this processing, ideally one that mirrors the 'client_detect' processing. Is there a way to pass the waveform in smaller segments to ease the memory pressure? Alternatively, I'm also thinking a solution could be something like detection.extract_stream() that works with a WaveBank or similar.
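For reference, the per-detection idea I have in mind looks roughly like this sketch, assuming party is the Party of detections and an obsplus WaveBank (the bank path, window length, pre-pick, and padding are placeholders):

from obsplus import WaveBank

bank = WaveBank("waveform_bank")       # illustrative wavebank directory
length, prepick, pad = 10.0, 2.0, 5.0  # illustrative window parameters

snippets = {}
for family in party:
    for detection in family:
        # Read only a small window around this detection from disk
        st = bank.get_waveforms(
            starttime=detection.detect_time - (prepick + pad),
            endtime=detection.detect_time + length + pad)
        snippets[detection.id] = detection.extract_stream(
            stream=st, length=length, prepick=prepick)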
I’m also hitting memory issues when running write_correlations. This function is pretty memory-intensive, and I’ve crashed the process more often than I’d like while trying to run it.
The call looks something like this:
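(Sketch only: catalog and stream_dict stand for my detected catalog and per-event stream dictionary, and the parameter values here are placeholders rather than the ones I actually use.)

event_id_mapper = write_correlations(
    catalog, stream_dict, extract_len=30.0, pre_pick=5.0, shift_len=0.25,
    lowcut=2.0, highcut=9.0,
    max_sep=2.0, min_link=8, min_cc=0.5, interpolate=False)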
Any tips on how to make this function run more smoothly or alternative processing strategies would be super helpful!
What is your setup?
Operating System: Rocky Linux release 8.10 (Green Obsidian)
Python Version: 3.11.8
EQcorrscan Version: 0.5.0