mspass-team / mspass

Massive Parallel Analysis System for Seismologists
https://mspass.org
BSD 3-Clause "New" or "Revised" License

asyncio support #554

Open miili opened 1 month ago

miili commented 1 month ago

Dear mspass team,

To leverage true HPC computing and balance I/O, CPU, and GPU work, asyncio support is required.

What are your plans to incorporate native Python coroutines?

Best, M

pavlis commented 1 month ago

Your question is, unfortunately, a bit vague, so I have to guess somewhat at what the concern is. As I understand it, the Python coroutines you refer to are a mechanism for doing asynchronous I/O in Python. There is no question that balancing I/O and CPU time is critical to optimizing the performance of a workflow handling large volumes of data. I first recommend you read this section of the MsPASS User Manual.

A second point I'd make is that because MsPASS is (currently at least) focused on the map-reduce model of parallel computing, I/O is mostly treated as a scheduling problem for the parallel scheduler and the operating system. It is our experience that most seismic processing workflows reduce to parallel pipelines with reads at the head and writes at the tail of the pipeline. If the compute tasks are lightweight, such jobs become I/O bound quickly; however, the more work you put in the pipeline, the more negligible I/O becomes. As the manual section above notes, another key point is to avoid database transactions within any workflow, as a job that blocks on a database transaction can easily turn even a lightweight task into a completely I/O bound job.
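
For illustration, here is a minimal sketch of what such a pipeline looks like with dask bag. The read_one, detrend_one, and save_one functions are hypothetical stand-ins, not MsPASS APIs; in a real job they would be the database reader, one or more processing algorithms, and the database writer:

import dask.bag as dbg

# Hypothetical stand-ins for a reader, a processing algorithm, and a writer.
def read_one(doc):
    return {"id": doc, "data": [float(doc)] * 10}    # pretend waveform

def detrend_one(d):
    mean = sum(d["data"]) / len(d["data"])
    d["data"] = [x - mean for x in d["data"]]
    return d

def save_one(d):
    return d["id"]                                   # pretend write, return an id

docs = list(range(100))                              # stand-in for wf documents
pipeline = (
    dbg.from_sequence(docs, npartitions=8)           # reads at the head ...
    .map(read_one)
    .map(detrend_one)                                # ... compute in the middle ...
    .map(save_one)                                   # ... writes at the tail
)
ids = pipeline.compute()                             # scheduler overlaps I/O and compute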

Finally, you might want to read the new documentation for what dask calls "Futures" in dask distributed, found here. Futures are an abstraction of compute tasks submitted asynchronously to a cluster. They are, in fact, used under the hood in the map-reduce approach we have focused on in MsPASS.
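
To make the futures point concrete, here is a minimal sketch of asynchronous task submission with dask.distributed; process_one is a placeholder for a real compute task:

from dask.distributed import Client, as_completed

def process_one(i):
    # placeholder for a real compute task
    return i * i

if __name__ == "__main__":
    client = Client()                                # local cluster by default
    # submit returns immediately with a Future; the work runs asynchronously
    futures = [client.submit(process_one, i) for i in range(16)]
    # collect results as they finish rather than in submission order
    for future in as_completed(futures):
        print(future.result())
    client.close()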

Long answer, but I hope it was helpful. Happy to continue this dialogue.

miili commented 3 weeks ago

Looking at the code, I see that parallelism is handled through high-level processing pipelines.

The modern async/await syntax offers fine-grained threading control of jobs through an event loop, where I/O, CPU and GPU work can be scheduled transparently and with ease. This leads to efficient utilization of the hardware in single node and HPC environments. These are Python-native futures in a modern syntax.
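
For illustration, a minimal sketch of the pattern I mean, not tied to MsPASS; fake_read and cpu_work are placeholders for real I/O and compute tasks:

import asyncio
import hashlib

async def fake_read(path):
    # stand-in for an awaitable read (async file or network I/O)
    await asyncio.sleep(0.1)
    return path.encode() * 100

def cpu_work(payload):
    # stand-in for CPU-bound work; runs in a worker thread so the
    # event loop stays free to schedule more I/O
    return hashlib.sha256(payload).hexdigest()

async def handle(path):
    payload = await fake_read(path)
    return await asyncio.to_thread(cpu_work, payload)

async def main():
    # many reads and compute tasks in flight at once, driven by one event loop
    results = await asyncio.gather(*(handle(f"file_{i}") for i in range(8)))
    print(results[:2])

asyncio.run(main())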

I would also like to make you aware that libmseed is a bottleneck for large datasets. The library performs very poorly and limits reading to ~100 MB/s due to bad memory management and single-threaded decompression. Fortunately, loading scales linearly when individual files are loaded in parallel. I suppose writing is also very slow, but we didn't test that.
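
A minimal sketch of the kind of per-file parallel read that scales linearly for us; obspy.read is used here as the miniSEED reader and the file path pattern is a placeholder:

import glob
from concurrent.futures import ProcessPoolExecutor

from obspy import read

def read_file(path):
    # each worker process calls the (single-threaded) miniSEED decoder
    # independently, so decompression no longer serializes the whole job
    st = read(path, format="MSEED")
    return path, len(st)

if __name__ == "__main__":
    files = glob.glob("data/*.mseed")                # placeholder path pattern
    with ProcessPoolExecutor(max_workers=8) as pool:
        for path, ntraces in pool.map(read_file, files):
            print(path, ntraces)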

pavlis commented 2 weeks ago

Thanks for letting us know about this likely future standard in Python. That could help a lot of pure Python codes. Two points I'd like to make as a follow-up, though:

  1. You should realize that the map-reduce paradigm used by MsPASS with spark/dask is an abstraction of parallelization. The dask/spark schedulers handle threaded and node-parallel processing more or less the same way. I strongly suspect that if the new standards you note are adopted, they could enhance dask and/or spark by allowing them to improve the handling of mixed thread and node parallelism. Today that is a nasty, fundamental computer science problem that is far from solved. It is particularly nasty in Python because the GIL makes effective multithreading impossible without some nasty, nonstandard constructs that are fragile to maintain. The point is that I think this is worth watching, but addressing it is way outside the scope of MsPASS development and lands squarely in the laps of the developers of dask/spark. (A short sketch of how the choice between threads and processes is exposed in dask appears after the code block below.)
  2. You are right that the older libmseed library was single threaded. The newer release, however, does have a multithreaded C function. We used it in MsPASS but realized it was pointless because of how dask/spark handle threading. Instead, we recommend a usage like that given in the recent Earthscope short course found in this notebook. The following block of code will run the libmseed indexing, which is the nasty step that touches every byte of every file, in parallel in a generic way:
import os
import dask.bag as dbg

# db is the MsPASS Database handle created earlier in the notebook.
# Remove the comment below if you need to restart this workflow at this point.
# db.drop_collection('wf_miniseed')

# Note: this dir value assumes the wf directory was created by
# the previous command that also downloads the data from AWS
current_directory = os.getcwd()
dir = os.path.join(current_directory, 'wf')

# Build a list of the miniseed file names to be indexed
dfilelist = []
with os.scandir(dir) as entries:
    for entry in entries:
        if entry.is_file():
            dfilelist.append(entry.name)
print(dfilelist)

# Run the indexer on each file as an independent dask task
mydata = dbg.from_sequence(dfilelist)
mydata = mydata.map(db.index_mseed_file, dir=dir)
index_return = mydata.compute()
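
On point 1, the practical knob today is how you configure the dask (or spark) workers rather than anything at the Python thread level. A minimal, illustrative dask.distributed configuration; the worker counts are arbitrary examples:

from dask.distributed import Client

if __name__ == "__main__":
    # Process-based workers sidestep the GIL for CPU-bound algorithms; raise
    # threads_per_worker instead when the work is dominated by I/O or by C
    # extensions that release the GIL.
    client = Client(n_workers=4, threads_per_worker=1, processes=True)
    print(client)
    client.close()
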
pavlis commented 2 weeks ago

Postscript to previous: In MsPASS you should realize we treat indexing of miniseed files as a fundamentally different operation than reading them. In fact, we recommend that normal use with large data sets should minimize the number of files and organize data into files grouped in a way appropriate for the data being analyzed; e.g., event-based processing should have all the data for an event in one file. One of the worst things you can do on an HPC system is create the equivalent of a set of SAC files with one signal per file. You can crash an entire HPC cluster if you try to read millions of files in parallel when the files are stored in a Lustre file system. Read the tutorials from the Earthscope short course I pointed to above, where we discuss some of these issues further. The final point is that our reader, called read_distributed_data, will run in parallel once the indexing is done. There the issue of multithreading is irrelevant, as each reader will appear to the operating system as just another program issuing a read request. Modern operating systems are extremely good at sorting out how to optimally respond to multiple pending read requests from multiple programs; it happens many times per second on any running computer.
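
For completeness, a rough sketch of how the parallel read follows the indexing step. The exact import path and argument list of read_distributed_data have changed across MsPASS releases, so treat the call below as illustrative and check the User Manual for the version you have installed:

# Illustrative only: verify the read_distributed_data import path and
# signature against the documentation for your installed MsPASS release.
from mspasspy.io.distributed import read_distributed_data

# db is the same MsPASS Database handle used for the indexing step above
cursor = db.wf_miniseed.find({})
dataset = read_distributed_data(db, cursor, collection="wf_miniseed")
# each partition issues its own independent read request; the operating system
# and file system sort out how to service the concurrent reads
results = dataset.compute()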