mspass-team / mspass

Massive Parallel Analysis System for Seismologists
https://mspass.org
BSD 3-Clause "New" or "Revised" License
30 stars 12 forks source link

parallel reading of ensembles #320

Open pavlis opened 2 years ago

pavlis commented 2 years ago

I think we have two problems we need to deal with for reading large ensembles in parallel. This should perhaps be put into two issues pages, but they are closely enough related I elected to put them together. The two issues are:

  1. There may be an issue with serialization of MongoDB Cursor objects.
  2. Any handling of large ensembles is prone to a memory error.

For item 1, the evidence is this. Running the test we discussed on the huge lower 48 broadband array from 2012 I used this parallel construct:

import dask.bag as daskbag
dataset = daskbag.from_sequence(srcidlist)
dataset = dataset.map(srcid_query)
dataset = dataset.map(db.wf_miniseed.find)
#dataset.map(db.read_ensemble_data,collection='wf_miniseed',normalize=normalize_list)
#dataset = dataset.map(window_ensemble)
#dataset = dataset.map(bundle_seed_data)
#dataset = dataset.map(lambda ens : len(ens.member))
x=dataset.compute()
print(x)

where I commented out the lines that do real processing to limit the workflow to only the call to MongoDB find. (A number of things here are defined earlier in the notebook.) Note I know the syntax is right and the data are intact as a serial test on few ensembles runs. When I run the above I get this error:

---------------------------------------------------------------------------
KilledWorker                              Traceback (most recent call last)
<ipython-input-6-77a914a1235f> in <module>
     12 #dataset = dataset.map(bundle_seed_data)
     13 #dataset = dataset.map(lambda ens : len(ens.member))
---> 14 x=dataset.compute()
     15 print(x)

/usr/local/lib/python3.6/dist-packages/dask/base.py in compute(self, **kwargs)
    281         dask.base.compute
    282         """
--> 283         (result,) = compute(self, traverse=False, **kwargs)
    284         return result
    285 

/usr/local/lib/python3.6/dist-packages/dask/base.py in compute(*args, **kwargs)
    563         postcomputes.append(x.__dask_postcompute__())
    564 
--> 565     results = schedule(dsk, keys, **kwargs)
    566     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    567 

/usr/local/lib/python3.6/dist-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2652                     should_rejoin = False
   2653             try:
-> 2654                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2655             finally:
   2656                 for f in futures.values():

/usr/local/lib/python3.6/dist-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1967                 direct=direct,
   1968                 local_worker=local_worker,
-> 1969                 asynchronous=asynchronous,
   1970             )
   1971 

/usr/local/lib/python3.6/dist-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    836         else:
    837             return sync(
--> 838                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    839             )
    840 

/usr/local/lib/python3.6/dist-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    349     if error[0]:
    350         typ, exc, tb = error[0]
--> 351         raise exc.with_traceback(tb)
    352     else:
    353         return result[0]

/usr/local/lib/python3.6/dist-packages/distributed/utils.py in f()
    332             if callback_timeout is not None:
    333                 future = asyncio.wait_for(future, callback_timeout)
--> 334             result[0] = yield future
    335         except Exception as exc:
    336             error[0] = sys.exc_info()

/usr/local/lib/python3.6/dist-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

/usr/local/lib/python3.6/dist-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1826                             exc = CancelledError(key)
   1827                         else:
-> 1828                             raise exception.with_traceback(traceback)
   1829                         raise exc
   1830                     if errors == "skip":

KilledWorker: ("('srcid_query-find-8fb1622b3e8749ce9583e74830492b15', 22)", <Worker 'tcp://172.17.0.2:40135', name: tcp://172.17.0.2:40135, memory: 0, processing: 107>)

Noting this leaves this message in the dask log:

distributed.core - INFO - Starting established connection
distributed.protocol.pickle - INFO - Failed to deserialize <memory at 0x7f4eb502a588>
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
  File "/usr/local/lib/python3.6/dist-packages/pymongo/collection.py", line 278, in __getattr__
    full_name = _UJOIN % (self.__name, name)
  File "/usr/local/lib/python3.6/dist-packages/pymongo/collection.py", line 278, in __getattr__
    full_name = _UJOIN % (self.__name, name)
  File "/usr/local/lib/python3.6/dist-packages/pymongo/collection.py", line 278, in __getattr__
    full_name = _UJOIN % (self.__name, name)
  [Previous line repeated 321 more times]
RecursionError: maximum recursion depth exceeded while calling a Python object
distributed.protocol.core - CRITICAL - Failed to deserialize

The memory problem surfaces when I use this variant that came from our user's manual parallel processing section examples:

# dask test on 5 ensembles
daskclient = mp_client.get_scheduler()
import dask.bag as daskbag

def read_common_source_gather(db,collection,srcid):
  dbcol = db[collection]
  query = {"source_id" : srcid }
  # note with logic of this use we don't need to test for
  # no matches because distinct returns only not null source_id values dbcol
  cursor = dbcol.find(query)
  ensemble = db.read_ensemble_data(cursor, collection=collection)
  return ensemble

#dataset = daskbag.from_sequence(srcidlist[0:3])
dataset = daskbag.from_sequence(srcidlist)
dataset = dataset.map(lambda srcid : read_common_source_gather(db, "wf_miniseed", srcid))
#dataset = dataset.map(window_ensemble)
#dataset = dataset.map(bundle_seed_data)
dataset = dataset.map(lambda ens : len(ens.member))
x=dataset.compute()
print(x)

That job with the processing steps commented out seems to run ok. It has been running for about 30 minutes anyway without problems so think it is ok. I am going to kill the job and restore the 0:3 limits to see if it will actually finish. I'll do an update to this page when that finishes.

The memory problem surfaced when I ran this workflow with the map calls to window_ensemble and bundle_seed_data enabled (note they are commented out above). Note window_ensemble is a custom function that is the following:

def window_ensemble(ensemble,winstart = -200.0,winend = 200.0):
    i = 0
    for d in ensemble.member:
        if d.is_defined("source_id") and d.is_defined("channel_id"):
            detrend(d,type="demean")
            stalat=d['channel_lat']
            stalon=d['channel_lon']
            srclat=d['source_lat']
            srclon=d['source_lon']
            depth=d['source_depth']
            otime=d['source_time']
            georesult=gps2dist_azimuth(srclat,srclon,stalat,stalon)
            # obspy's function we just called returns distance in m in element 0 of a tuple
            # their travel time calculator it is degrees so we need this conversion
            dist=kilometers2degrees(georesult[0]/1000.0)
            arrivals=model.get_travel_times(source_depth_in_km=depth,distance_in_degree=dist,phase_list=['P'])
            # Arrivals are returned in time order 0 is always the first arrival
            # This computes arrival time as an epoch time and shifts the data to put 0 at that time
            d.ator(otime+arrivals[0].time)
            #d=WindowData(d,cutwin)
            ensemble.member[i]=WindowData(d,winstart,winend)
            #print(d.t0,":",d.endtime())
        else:
            ensemble.member[i].kill()

        i += 1

    return ensemble

So that function reduces the ensemble size by about a factor of 10 as the original data files I'm working with are about 4000 s long. I don't know at this point which of these functions is causing the memory fault, but that is actually beside the point. This experience shows we have a serious issue here with how MsPASS handles memory. Our users will experience this kind of mysterious error. As a minimum I think we need to develop some simple command line tools that can be used to compute and report ensemble sizes that would help user's know if they needed to worry about memory usage. To do this right they would need a way to know how many workers the scheduler would spawn. Is there a way to ask dask and spark how many workers the scheduler would use? If so, such a tool would not be hard to write. The rest I could do easily, but wouldn't know how to get that number (number of workers).

pavlis commented 2 years ago

Update. When I ran the second version of this workflow (the one calling only the read_common_source_gather) on a small subset of the data it ran for a long time and died with a similar "killed worker" exception in the notebook. However, the dask log file now shows a ton of errors like the following:

distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 7.33 GB -- Worker memory limit: 10.45 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 7.33 GB -- Worker memory limit: 10.45 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 7.34 GB -- Worker memory limit: 10.45 GB
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk.  Perhaps some other process is leaking memory?  Process memory: 7.35 GB -- Worker memory limit: 10.45 GB

This shows the serialization issue with cursors is an issue I certainly don't understand. It also shows the memory problem in dealing with large ensembles. I can calculate now what the issue is. There are about 3500 channels in the ensembles being read. Each is roughly an hour long (3600 s) at a nominal sample rate of 40 sps. We store all sample data as doubles so the size per ensemble is 35003600408=about 4 GB. The error log suggests we have a memory copy problem as the 24 GB exceed process memory posted of 7.34 GB.

There is a simple workaround for this particular workflow. The simplest is to just run an atomic demean, filter, window, and save workflow on this dataset. Then I can run bundle on the wf_TimeSeries data and I shouldn't have a memory problem.

So, I think it is clear we have some things we need to deal with here:

  1. We should design and implement a command line tool to advise on the memory limits needed to run ensemble processing or any process for that matter. i.e. the tool should report the minumum memory required to process of TimeSeries, Seismogram, TimeSeriesEnsemble, and/or SeismogramEnsemble for a specified query type on a specified collection.
  2. We need to consider some inline tools to simplify ensemble processing. In particular, @wangyinz worked out the incantation for creating ensembles with a grouping operator in dask (I think it was called FoldBy?). I think we need to move that up the development stack. The documentation warns that can be slow, but slow beats abort from a memory fault.
  3. If we cannot solve this cursor problem more directly, we must make it very very clear in the documentation that one needs to use a variant of the read_common_source_gather function above. If that is the only solution, in fact, we need to generalize that function in a way that avoids some fundamental problem here. Note I found some conversations on the web on this topic, but I was not able to fully digest all the dialogue and apply it to our context.
wangyinz commented 2 years ago
  1. Yes, that kind of tool will be helpful as a reference, but the core problem is that we should advise against using Ensembles as the atomic data type whenever possible. To my understanding, Dask and Spark are not good at handling large objects. The overhead is too big.
  2. I agree. We should use foldby for such an operation. However, I think the issue in this particular case is slightly different. The core problem here is that the pymongo's cursor objects are unpicklable. The read_common_source_gather example is simply combining two steps to avoid using cursor as the intermediate object. Foldby will not solve this issue. It is only the last workaround above can be simplified by the foldby. So, I think if we want to support a workflow that starts with ensembles, we should support an optional query argument in read_ensemble_data.
  3. I vaguely remember we ran into the cursor problem before. It is obvious that the cursor is unpickable, and pickle will not be supported in the future neither as can be seen from this discussion. As I mentioned above, the new argument in read_ensemble_data is pretty much the "variant of the read_common_source_gather function".