mspass-team / mspass

Massive Parallel Analysis System for Seismologists
https://mspass.org
BSD 3-Clause "New" or "Revised" License
29 stars 12 forks source link

FoldBy warning and related mysterious behavior #397

Open pavlis opened 1 year ago

pavlis commented 1 year ago

I thought about writing this as an extension to issue 377, but decided there are some different problems I want to preserve for the record. This issue started with this code fragment we've been struggling to get to work for several weeks (Emphasize this is a fragment showing only the relevant section for this issue):

# Both matcher constructors could use defaults for some parameters but it is better to be explicit
source_matcher = ObjectIdMatcher(db,
                                 collection="source",
                                 attributes_to_load=["lat","lon","depth","time"],
                                 prepend_collection_name=True)
channel_matcher = MiniseedMatcher(db,
                                  collection="channel",
                                  attributes_to_load=["lat","lon","elev","starttime","endtime","hang","vang"],
                                  load_if_defined=["loc"],
                                  prepend_collection_name=True)

model = TauPyModel(model="iasp91")
t0=time.time()
# error with undefined source_id without this query
query={"source_id" : {"$exists" : True}}
cursor = db.wf_miniseed.find(query).sort("source_id").limit(10000)

fname="performance-atomictest.html"
with performance_report(filename=fname):
   seisbag = read_distributed_data(db,cursor,npartitions=500)
   seisbag = seisbag.map(set_tref)
   seisbag = seisbag.map(normalize,source_matcher)
   seisbag = seisbag.map(normalize,channel_matcher)
   seisbag = seisbag.map(set_Ptime_atomic,model)
   seisbag = seisbag.map(detrend,type='constant')
   seisbag = seisbag.map(cautious_ator)
   seisbag = seisbag.map(WindowData,-150.0,250.0)
   seisbag = seisbag.map(rtoa)
   seisbag = mspass_dask_foldby(seisbag,key="source_id")
   seisbag = seisbag.map(bundle_seed_data)
   seisbag = seisbag.map(db.save_ensemble_data,data_tag="parallel_save")
   seisbag.compute()
   t=time.time()
print("Time to process data set=",t-t0)
ntrue=0
nfalse=0
for x in result_list:
    if x:
        ntrue += 1
    else:
        nfalse += 1
print("Number still live=",ntrue)
print("Number killed=",nfalse)

I watched this run in real time with dask diagnostics and a system monitor and the results were illuminating. The use of the foldby operator in this workflow is big trouble with a large data set. In fact, until I limited the size of the find output with the limit method and tweeked the npartitions parameter it would just crash the container before the dask diagnostics showed anything. What I learned with the right set of parameters (I won't guarantee the above are the same ones I used in the discovery) was that it was very clear from the "Graph" display in the real-time diagnostics that foldby was not releasing any memory upstream from that line until it had completed. The practical consequences of that is foldby is ill advised for any workflow designed to handle a huge dataset. If the entire dataset cannot fit in distributed memory of the job it won't ever work.

If that were the whole story I could complete this and mark it resolved. The only followup would be a need to add strong warning about the use of foldby in user manual and dostrings for foldby. That is needed, but I also ran the following variant that removes the foldby and doesn't even save anything. I aimed to do a similar real-time monitoring of a workflow that was just a string of map operators. The following is the variant of above I used:

# Both matcher constructors could use defaults for some parameters but it is better to be explicit
source_matcher = ObjectIdMatcher(db,
                                 collection="source",
                                 attributes_to_load=["lat","lon","depth","time"],
                                 prepend_collection_name=True)
channel_matcher = MiniseedMatcher(db,
                                  collection="channel",
                                  attributes_to_load=["lat","lon","elev","starttime","endtime","hang","vang"],
                                  load_if_defined=["loc"],
                                  prepend_collection_name=True)

model = TauPyModel(model="iasp91")
t0=time.time()
# error with undefined source_id without this query
query={"source_id" : {"$exists" : True}}
cursor = db.wf_miniseed.find(query).sort("source_id").limit(30000)

fname="performance-atomictest.html"
with performance_report(filename=fname):
    seisbag = read_distributed_data(db,cursor,npartitions=300)
    seisbag = seisbag.map(set_tref)
    seisbag = seisbag.map(normalize,source_matcher)
    seisbag = seisbag.map(normalize,channel_matcher)
    #seisbag = seisbag.map(set_Ptime_atomic,model)
    seisbag = seisbag.map(detrend,type='constant')
    #seisbag = seisbag.map(cautious_ator)
    #seisbag = seisbag.map(WindowData,-150.0,250.0)
    #seisbag = seisbag.map(rtoa)
    seisbag = seisbag.map(terminator)
    result_list=seisbag.compute()
    #seisbag = mspass_dask_foldby(seisbag,key="source_id")
    #seisbag = seisbag.map(bundle_seed_data)
    #seisbag = seisbag.map(db.save_ensemble_data,data_tag="parallel_save")
    #seisbag.compute()
    t=time.time()
print("Time to process data set=",t-t0)
ntrue=0
nfalse=0
for x in result_list:
    if x:
        ntrue += 1
    else:
        nfalse += 1
print("Number still live=",ntrue)
print("Number killed=",nfalse)

Noting the terminator function is what I used to have the output of compute be a list of booleans:

def terminator(d):
    return d.live

Problem is the above does not run. At partition 253 it hung and just sat doing nothing for a couple hours before I killed it. Now what I didn't say is that I had been working down the set of map calls sequentially adding one at a time. The time immediately before running the above I had had the detrend function call commented out. When the job was rerun with detrend running is when it hung. That is, if I ran the above with the detrend line commented out it ran to completion.

There are several things that have left me scratching my head about this:

  1. I created a version of the notebook that ran the code fragment above replacing the dask map calls to a serial loop over the cursor with the same logic as above. That ran to completion with this output:
    Time to process data set= 210.45681762695312
    Number set live= 26804
  2. The dask logs for both the worker and scheduler give absolutely no hint why the job hung other than the common message: distributed.core - INFO - Event loop was unresponsive in Scheduler for 2659.39s. This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability. There definitely is an instability here but unclear it is a GIL problem. There is a similar message in the worker log.
  3. It is important to note there is NOT a memory problem here. All this ran with minimal memory usage. It is very very clear, by the way, that memory use in a job defined by a sequence of map calls scales inversely with the number of partitions. i.e. using lots of partitions reduces memory use at the cost of slower performance.

I have no idea how to sort out the problem with this workflow? Any ideas to help move this forward would be appreciated. We really need to solve this problem as it is a serious gotcha for anyone wanting to process a very large data set.

pavlis commented 1 year ago

I did some reading to gain a more fundamental understanding of how dask bags work and how they interact with the cluster scheduler. This page is required reading for all MsPASS developers and any user who needs to tune a workflow to work on a cluster with a very large data set.

That page has a lot in it, but I think there is ONE BIG LESSON: for any MsPASS workflow running on a cluster NEVER use the compute method of a bag but instead call the persist method of the distributed client. Perhaps the most fundamental reason relevant to most mspass applications is that the bag compute approach disables bag partitioning and is prone to creating memory faults. The documentation page above complicates that issue by reference to the jargon concept of a dask "futures". My interpretation of the jargon is that when you use compute it always demands the result be returned in memory of the node running the parent python script (always the scheduler node in our standard cluster configuration scripts, which also share db and frontend roles to make this even worse). Persist effectively means the result is returned in a structure that doesn't have to fit in memory but is broken into the number of partitions in the result.

This isn't definitive yet, but use of persist solved the mystery behavior I reported above. To explain what I had to do it is necessary to add a section I omitted earlier. That is, for this test I wanted to use the distributed diagnostics so I created the scheduler client like this:

from dask.distributed import Client
scheduler_client = Client()

then I replaced

result_list = seisbag.compute()

with

scheduler_client.persist(seisbag)

The mystery error went away.

I'm going to continue this test by adding in more functions commented out above. I suspect it will work fine until I get to the foldby line. I don't think I can fix that line with out running this on a cluster with enough memory to hold the result, but it might work because of the partitioning. I'll report that when and if I get there.

pavlis commented 1 year ago

I ran additional tests on this workflow during the day today. Three different runs shed some additional light on this problem to the point I think we might be able to close this issue after others have had a chance to comment.

  1. I first enabled all the algorithms above the foldby operator. It again failed mysteriously leaving no hint why. I disabled the function doing travel time calculations, get_Ptime_atomic, changed the ator call to shift to make t0 = 0, and changed the WindowData range to 100-300 s. It then ran fine. Conclusion: there has long been a troubling message from the taup function about threading we probably cannot ignore. It may not be possible to do travel time calculations in parallel with obspy's calculator as we are using it.
  2. With the above change I then enabled everything else. That is, foldby and bundle_seed_data. It did about 280 partitions and aborted. Digging in the log files I think something threw an exception. I was watching and it did not fail on a memory fault. An important change with using the persist method of the dask.distributed client is the entire DAG was not kept in memory. I'll show plots of this below.
  3. I reran the full workflow with 20000 inputs and 200 partitions. It then ran to completion.

Below are a series of snapshots of the the "Graph" window of dask diagnostics I took during this run. Note this was my ubuntu desktop with limited memory compared to any hpc system. These are a progression. The third one shows what the graph display looks like when the workflow is killed by the scheduler. The last one is for the 20000 case as the 30000 case never got the point illustrated. The color scale for the nodes is visible only on the first screenshot:

dask_diagnostic_graph1_foldby dask_diagnostic_graph2_foldby dask_diagnostic_graph3_foldby

dask_diagnostic_graph5_foldby

The summary point is this really clarifies that on a cluster NEVER use compute but use the distributed.client persist method.

wangyinz commented 1 year ago

Oh, that is very interesting. I thought we ran out of memory on the workers, but here it is showing that it is really when the client collecting results from the workers, it ran out of memory. Or, maybe we have both issues? @Yangzhengtang could you please try this out and comment? Thank you!

pavlis commented 1 year ago

For the record, there has been a lot of activity on this issue behind the scenes since the last post. @Yangzhengtang has been running tests on how dask handles the foldby issue raised above. That isn't completely resolved, but I do want to preserve a solution for many workflows.

Many forms of seismic data processing can be naturally bundled into what we've called "ensembles" or what seismic reflection people would call a "gather". The foldby issue above, in fact, was created by the need to bundle a set of atomic TimeSeries objects into a bundle we call a TimeSeriesEnsemble. The fundamental problem that seems to arise in the use of a FoldBy operation as above is that FoldBy cannot know if the input bag has any structure. Hence, it has little choice but to accumulate the entire bag before it tries to rearrange it into the bundles that form a type of reduce operation. I suspect strongly that is why this is causing memory algorithm as this particular operation is not a memory reduce. For most applications of reduce the output of reduce, as the name suggests, is smaller. In the application above all we are doing is bundling the same data into groups and the output is as big or slightly bigger than the input.

The solution I want to post here is that we should advise our users that if you need to do ensemble processing for large data sets (this isn't an issue for a data set that fits in memory) they should always use the database to define the groupings and form a bag of ensembles as the working data set.

Here is a (mostly) working example that illustrates the idea.

from mspasspy.algorithms.window import WindowData
from mspasspy.algorithms.basic import ator,rtoa
from mspasspy.algorithms.snr import broadband_snr_QC
from mspasspy.ccore.seismic import Seismogram
from mspasspy.ccore.algorithms.deconvolution import MTPowerSpectrumEngine

def load_ensemble_by_source_id(srcid,db):
    """
    Wrapper function to load and ensemble driven by source id defined by input srcid.  
    Queries wf_miniseed using srcid, loads the data using the ensemble reader in db, and 
    returns the result.

    ALWAYS loads data normalizing source and channel collections during the read.
    """
    normalize_list=["channel","source"]
    query={"source_id" : srcid}
    cursor = db.wf_miniseed.find(query)
    d = db.read_ensemble_data_group(cursor,normalize=normalize_list,collection="wf_miniseed")
    print("After reader ensembled state is ",d.live())
    return d

def require_Ptime(e):
    """
    Scans ensemble e and kills any datum for which the required attribute "Ptime" is not defined.
    """
    for d in e.member:
        if not d.is_defined("Ptime"):
            d.kill()
    return e

def window_ensemble(e):
    """
    Cuts down the ensemble to window frozen inside this function.  Silently ignores dead members.
    """
    tstart=-150.0
    tend=250.0
    if e.live():
        for i in range(len(e.member)):
            d=e.member[i]
            if d.live:
                # Assuming we call require_Ptime before calling this so we don't have to test for Ptime
                d=ator(d,d["Ptime"])
                d=WindowData(d,tstart,tend)
                d=rtoa(d)
                e.member[i]=d
    return e

def save_data(e,db,dir="wf3c"):
    """
    Saves final enemble to wf3c in file name constructed from source id. 
    Returns pair length of ensemble and dfile where results were stored.
    """
    # should really have the logic here to assure dir exists but not handled for now
    srcid="UNDEFINED"
    dfile="UNDEFINED.dat"
    print("In save_data ensemble state=",e.live())
    if e.live():
        for d in e.member:
            if d.live:
                if d.is_defined("source_id"):
                    srcid = str(d["source_id"])
                    break
        dfile=srcid+".dat"
        db.save_ensemble_data_binary_file(e,dir=dir,dfile="dfile")
        nmem=len(e.member)
    else:
        nmem=0

    return [nmem,dfile]
from dask import bag
from mspasspy.algorithms.bundle import bundle_seed_data
from mspasspy.algorithms.signals import detrend
from mspasspy.algorithms.resample import ScipyDecimator
from mspasspy.ccore.algorithms.basic import TimeWindow
import time
nwin = TimeWindow(-150,-10)
swin = TimeWindow(-5.0,15.0)
t0=time.time()
# test to query for data with source_id defined
query={"source_id" : {"$exists" : True}}
srcids = db.wf_miniseed.distinct("source_id",query)
mybag = bag.from_sequence(srcids,npartitions=len(srcids))
mybag = mybag.map(load_ensemble_by_source_id,db)
mybag = mybag.map(require_Ptime)
mybag = mybag.map(detrend,type='constant')
mybag = mybag.map(window_ensemble)
mybag = mybag.map(bundle_seed_data)
mybag = mybag.map(save_data,db)
scheduler_client.persist(mybag)
mybag.compute()
t=time.time()
print("Elapsed time for main processing loop=",t-t0)

Two key things about this workflow:

  1. The working data objects are all ensembles, which in this case are pretty large.
  2. An important issue is to make the number of partitions in the bag == number of ensembles in the data set. That is done in the line that calls the from_sequence method of bag.

Here is a view of dag from dask diagnostics while this workflow was running on 19 ensembles of a test data set subsetted from usarray2012: dask_screenshot

This runs with no memory problems whatsoever and has the advantage of being naturally bundled.