mspass-team / mspass

Massive Parallel Analysis System for Seismologists
https://mspass.org
BSD 3-Clause "New" or "Revised" License

raw data handling #74

Open pavlis opened 3 years ago

pavlis commented 3 years ago

I want to start a new thread on what I'm calling raw data handling. The topic here is how to efficiently organize data assembled from a range of sources and prepare it for handling as a coherent data set. At this time I know of the following common ways people get data for assembly into a coherent data set:

  1. They generate data themselves without any outside restriction. This can be one of two very different things: (a) data collected in house with local equipment, and (b) simulation data. These data can be a variety of formats.
  2. Data downloaded from a national/international data center. This usually means SEED or miniseed data, but could be something else like segy data from Australia which has open seismic reflection data.
  3. Data collected with IRIS-PASSCAL equipment that is available only locally and subject to "embargo" for a time period.

Because of this complexity I think it is essential to have a place to assemble raw data in mixed formats in a separate collection. In a previous thread (I don't know how to do an internal link in github - sorry) I proposed a rawdata or raw_data collection that is a variant of wf for building an index to a large data set. I think that model is still correct. For css3.0 people raw_data will act much like wfdisc, but with greater flexibility because MongoDB's schema is infinitely extendible.
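
To make that concrete, a raw_data index document might look something like the sketch below (all key names and values here are placeholders for discussion, not a committed schema):

from pymongo import MongoClient

client = MongoClient()
db = client['mspass']

# hypothetical wfdisc-like index entry for one segment of a raw miniseed file
raw_doc = {
    'dir': '/data/usarray/2012',   # directory holding the raw file
    'dfile': 'event_041.msd',      # file name (dir/dfile pair as in wfdisc)
    'foff': 0,                     # byte offset to the first record
    'format': 'mseed',             # needed because raw data come in mixed formats
    'net': 'TA', 'sta': '034A', 'chan': 'BHZ', 'loc': '',
    'starttime': 1325376000.0,     # epoch times spanned by the indexed segment
    'endtime': 1325376600.0,
}
db.raw_data.insert_one(raw_doc)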

What I'd like to start for discussion today is the issue of linking raw data to station and event metadata. I built the following simple wiring diagram for the station components (attached figure: station_metadata).

Some points about this proposed schema:

We should discuss this in detail during our weekly meeting on Thursday.

pavlis commented 3 years ago

Worked on this for the past few hours and want to suggest a few changes. Here is the revised schema (attached figure: station_metadata_v2). There are a few key differences here:

Note there is a prototype in mspasspy.db for site, but it is (a) incomplete, and (b) may not be as robust as it should be. In particular, I think it has a bug in that if you give it a file with multiple stations it may not work right. I don't remember how I used it before, but that needs to be handled one way or the other. Working on improving that between now and Thursday.

pavlis commented 3 years ago

I have implemented what is described above by modifications to the Database class (in db.py). There was an existing method called save_inventory that was close to what we needed. All I did was change it to also add entries for the new channels collection. I also cleaned up a few potential bugs, but more importantly I made a unilateral decision to not use the keys site_lat, site_lon, etc. in the site collection (i.e. I dropped the site_ prefix there). We will still need a qualifier like site_lat and site_lon in wf to avoid confusion with source coordinates, but that will be easy to implement in a new function to match site or channels data to wf (see the sketch below). I plan to do that the same way for wf and the raw_data collections. Not sure I'll get that done before our call tomorrow.
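
Something like this rough sketch is what I have in mind for that matching function (purely illustrative; the key names and matching rule are assumptions):

def link_site_to_wf(db, wf_doc):
    # hypothetical sketch:  find the site entry whose operating epoch contains
    # the wf start time and copy its coordinates into wf with a site_ prefix
    query = {
        'net': wf_doc['net'],
        'sta': wf_doc['sta'],
        'starttime': {'$lte': wf_doc['starttime']},
        'endtime': {'$gte': wf_doc['starttime']},
    }
    sitedoc = db.site.find_one(query)
    if sitedoc is None:
        return None
    update = {
        'site_id': str(sitedoc['_id']),
        'site_lat': sitedoc['lat'],
        'site_lon': sitedoc['lon'],
        'site_elev': sitedoc['elev'],
    }
    db.wf.update_one({'_id': wf_doc['_id']}, {'$set': update})
    return update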

An issue this brings up we should discuss tomorrow is the design of the Database class in db.py. Some things in there should be deprecated or turned into standalone functions. We might also want to consider a design where we have different classes for functionality that might be somewhat orthogonal to standard processing. I'm thinking the Inventory and Catalog stuff may be of that class - largely static data assembled for preprocessing but not needed within most processing workflows. Not sure if that is a good idea, but worth considering.

pavlis commented 3 years ago

Was reflecting on this before our call today and had some ideas I want to suggest we consider:

  1. I think the "rawdata" collection name above is too generic. I think a better name for the intended use is "seeddata" or "seed_data" to emphasize the purpose is to index passive array miniseed data. The format field would not be necessary then either.
  2. I suggest we handle other raw formats as separate but similar collections, e.g. one obvious choice is "wfdisc" that would do a simple one-to-one map of css3.0 wfdisc table attributes to document keys for this collection. An antelope reader or writer could then be split off easily from mspass without concerns about licensed software.
  3. This model would keep format conversions easily isolated, e.g. we could define a segy collection or a seismic_unix collection for active source data.
pavlis commented 3 years ago

I want to record a problem we will need to deal with at some point, but which I will punt down the road for now.

The issue I've uncovered is that obspy's mass downloader, which I am using for retrieving the usarray data set we are using for large scale testing, has a somewhat obnoxious behavior in handling station data. It becomes difficult to assure you actually get all the time epochs where metadata changes. The reason is this quote from their documentation:

Note: If the StationXML file already exists, it will be opened to see what is in the file. In case it does not contain all necessary channels, it will be deleted and only those channels needed in the current run will be downloaded again. Pass a custom function to the stationxmlpath argument if you require different behavior as documented in the following section.

Requesting all the usarray data in a single run would not be prudent for a whole lot of reasons, so I did the requests by year. I did not take on the problem of defining a custom function as this note suggests. As a result, I find the union of my xml files has holes. I know this because when I try to run the prototype cross referencing function to link the seed data to the site and channels collections I get some failures. I can look at that collection's data and see why.

I know of two solutions to this for different contexts:

  1. For this specific case I could probably solve a lot of problems by using antelope's db2stationxml and loading the resulting xml file(s) into mongo.
  2. We could design a special web services function that would do a custom web service query to try to fill holes found by indexing as I did here.

Before I take on any of that I want to see how all this behaves when I do a larger scale test with a whole year of data.
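
For the record, the kind of hole check I am talking about amounts to something like this (hypothetical sketch; the collection and key names are assumptions):

def find_metadata_holes(db):
    # flag raw data index entries with no channels document covering their start time
    holes = []
    for wf in db.raw_data.find():
        match = db.channels.find_one({
            'net': wf['net'],
            'sta': wf['sta'],
            'chan': wf['chan'],
            'starttime': {'$lte': wf['starttime']},
            'endtime': {'$gte': wf['starttime']},
        })
        if match is None:
            holes.append((wf['net'], wf['sta'], wf['chan'], wf['starttime']))
    return holes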

pavlis commented 3 years ago

I want to get feedback from you guys before I push the next rev of the Database class that implements the ideas on this thread for raw data.

First, here is what the revised wiring diagram looks like for the site and channels collection:

station_metadata_v3

Note two key differences from above:

  1. I made a decision to call the raw data collection that stores the indexing data seedwf. That emphasizes that the schema is specialized for seed data. I recommend we plan to use different collection names for data with a different structure (e.g. we could have a segy collection for active source data if this develops and we can get funding to do something like that; the OBS data support we have proposed, for example, could use that approach).
  2. I moved the pickle serialization of the response to be accessed by the (very verbose) key "serialized_channel_data". I thought about using something simpler like "response" but elected not to do so. The reason is that stationxml and obspy's inventory constructed from such data contain a lot of other metadata that might be useful to preserve. It turns out that their actual "response" class is a member of their channels class. Hence, if a user wants to do a response correction they would need this algorithm:
    import pickle
    curs=find_channel(db,args)  # custom function to find a specific channel; probably needs net:sta:chan:loc:time
    # assume curs is a MongoDB cursor returned by a find
    chandoc=curs.next()              # first matching channels document
    respdata=chandoc['serialized_channel_data']
    chanobj=pickle.loads(respdata)   # the pickled obspy channel object
    r=chanobj.response               # the response object it contains

    or something like that. Note that the reason we have to do it this way instead of saving the full inventory object in site is twofold:

  3. We would need a custom web service request to ask for the complete metadata definition for each station by name for all channels. The method I'm using does not structure the xml files that way. The metadata would be incomplete if I used the files I have as is.
  4. The inventory would be subject to bloat unless we used their "select" filter on the inventory object for each and every entry. That would be tedious and error prone. My original test stored the full inventory read from a year of downloaded data in every site entry - not right.

So, I think this is a better solution.
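
As an aside, the find_channel helper used in the snippet above could be little more than a query like this (a sketch; the key names are the short seed codes assumed elsewhere in this thread):

def find_channel(db, net, sta, chan, loc, time):
    # return a cursor of channels documents matching the seed codes whose
    # operating epoch contains the requested time
    return db.channels.find({
        'net': net,
        'sta': sta,
        'chan': chan,
        'loc': loc,
        'starttime': {'$lte': time},
        'endtime': {'$gte': time},
    })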

Finally, we are creating an attribute name mismatch with obspy I had not noticed before. Our current schema documentation defines the keys "net", "sta", "chan", and "loc". The obspy Trace object stats uses "network", "station", "channel", and "location" for these keys. I am using the short form as illustrated in the wiring diagram. The main argument is that it maps more cleanly onto site_lat, site_lon, etc., which we put in wf for processing, and we have to translate those already. i.e. site and channels are auxiliary data to processing and would not normally be hit. I think this is a minor issue, but it should be addressed right now. Do we want to use the short or long names for generic attributes (i.e. lat versus latitude and similar) in site and channels? I'm inclined to use the short names, but it is an open question.

JiaoMaWHU commented 3 years ago

I can't understand all the ideas here but the design of the find_channel function looks good to me.

wangyinz commented 3 years ago

Short name works fine with me, and that is also what's defined in CSS3.0. If I were to guess why ObsPy used the long ones, it is probably following the zen of Python - explicit is better than implicit. I don't think we have to follow that thought.

pavlis commented 3 years ago

A few details about a more final implementation of these ideas worth putting here for the record:

  1. Following our conference call today I changed source_id, site_id, and chan_id to be the string representation of the ObjectId ("_id" key) of the document holding the original record. i.e. every document in the "site" collection will have a "site_id" attribute set as str(oid), where oid is the ObjectId (python object) representation of "_id" (see the snippet after this list).
  2. I made an executive decision to make the collection "channel" instead of "channels" for consistency - source and site are singular. The alternative would have been to use "sites" and "sources" but I chose singular because for some of us "site" is memorable - a css3.0 table name.
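
The snippet referred to in item 1 amounts to this (sketch; it assumes the site collection, but source and channel work the same way):

# sketch:  every document carries a string form of its own ObjectId
for doc in db.site.find({'site_id': {'$exists': False}}):
    db.site.update_one({'_id': doc['_id']},
                       {'$set': {'site_id': str(doc['_id'])}})
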
pavlis commented 3 years ago

The experience downloading usarray data with obspy's web services has revealed some issues I'd like to put down here for the record. These can and should fold into our database design. Here are some things I want to put in here for the record in no particular order:

I have workarounds for all of these problems for the current data set, but the experience should be kept in mind as we move from prototype to supported procedures for handling raw data. Initially we should probably punt this as a "user problem" and supply what tools we can, with documentation to help users do appropriate preprocessing. The first step is that documentation, which I think means two main things: (1) a raw data processing or "preprocessing" reference page, and (2) a good jupyter notebook tutorial showing the use of the tools we end up supporting.

pavlis commented 3 years ago

In developing an ensemble reader I immediately hit a show stopping performance issue that is informative and illustrates why we want to do testing on a large data set. There are two issues that require different solutions:

  1. It comes as no surprise to me, at least, that the algorithm I tried to implement immediately hit a memory barrier. The algorithm is naive because it has to deal with an inadequacy of obspy's seed reader, which eats up any seed file and returns a Stream object. When the seed file is the concatenation of the somewhat extreme data request I used, this is a problem. I created event files, which contain all the data for one earthquake. For a typical 2012 event that is about 3800 channels for my request of all broadband sensors in the lower 48. That makes seed files that are around 650 Mbytes. Even a decade ago just eating a file that size would present an issue, but it is even worse when the reader creates a Stream object because the samples in a numpy array are doubles. Hence, 650 Mb of data quickly becomes at least 5.2 Gb. My naive algorithm tried to make a second copy by calling Trace2TimeSeries on each successive member of the Stream object, pushing the result to a TimeSeriesEnsemble. That process brought my poor desktop machine to a standstill as it started to thrash when it ran out of memory.
  2. On top of the memory issue there was a clear performance problem I noticed before it slowed to a halt from memory problems. The reason is obvious from this loop in the python code for Trace2TimeSeries:
    # Here, we do not use append, which is equivalent to the C++ method
    # push_back, for efficiency.
    for i in range(ns):
        dout.s[i] = trace.data[i]

    In spite of the comment in the code I think this process is not that efficient. I can't prove that yet as I don't have timing data at that level of granularity at this point, but I think that is a reasonable hypothesis. I suspect this is a classic example of a loop best not done in python when ns is large (in this case around 160,000).

To give you an idea how bad this is, the seed reader could eat up this file in about 2 or 3 seconds (I do not have precise timing data, but that is the right order of magnitude). The ensemble loader ran for more than 30 minutes before it pretty much brought the machine to a standstill. Not sure if it will ever finish.

Fixing item 1 clearly requires changing the algorithm. Fixing item 2, I think, could be done by some more clever use of a lower level copy. I'm not sure what the solution there is.

For item 1 I wonder if there is a way to have the algorithm immediately redefine the ensemble data as an RDD and let spark handle breaking it up for subsequent processing? I do not understand spark well enough yet to know how to proceed. Lacking that, the only way I could see to fix that issue is to write the TimeSeries objects immediately to gridfs to reduce memory bloat.
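
Something like the sketch below is what I am imagining for the spark option, though I stress it is untested and the spark calls are my best guess:

from pyspark import SparkContext
from obspy import read

from mspasspy.io.converter import Trace2TimeSeries

sc = SparkContext('local[*]', 'seed_ensemble_test')
st = read('event_041.msd', format='MSEED')   # hypothetical event file
# hand the Trace objects to spark immediately so the full ensemble never
# has to live in one process's memory; the conversion then runs on workers
rdd = sc.parallelize(list(st))
ts_rdd = rdd.map(Trace2TimeSeries)

Whether that actually reduces the memory footprint depends on how spark serializes the Trace objects, which I do not yet understand.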

This also shows a general need to automatically handle large data objects that would be problematic if we try to eat them up like this without a sanity check.

pavlis commented 3 years ago

Update - that program eventually crashed this computer. Ubuntu PC with 20 GB of RAM and a minimal virtual memory space. Didn't see it crash, but pretty clear that is what happened.

pavlis commented 3 years ago

Another update - The good news is I'm unable to recreate the memory eating problem and resulting system crash. This thing is definitely a memory pig, but in a retry with the system monitor running I watched it grow to about 70% of the 16 Gb limit of this toy machine when it finished processing. I suspect I may have hit a bug in spyder3, which I was using for running the code with break points. It is kind of a memory pig itself and seems to hog memory to allow debugging. Trying again running my test program under just a plain python command run, I think it will only get to just over 50% of memory use based on what I saw in the last run. So, the big memory issue won't have to be dealt with immediately, BUT it still shows the need for some kind of sanity check on inputs.

The performance issue of the python code remains. The run noted above under plain python instead of spyder3 took 916 s to process 3827 TimeSeries objects, or slightly less than 0.24 s per TimeSeries. That neglects read time of just a few seconds, which demonstrates how awful this is in comparison to the miniseed reader that has to do a lot more complicated manipulations but is based on a C library. Useful data - how can we speed this up short of writing a wrapper function in C?

pavlis commented 3 years ago

I think the performance problem with Trace2TimeSeries is easily solved by adding a new constructor for TimeSeries with this signature:

TimeSeries(const Metadata& md, const vector<double>& d);

or

TimeSeries(const Metadata& md, const double *d, const int nsamp);

I think the std::vector container maps more cleanly onto a numpy array, but I'm not sure. Both would require handling ProcessingHistory independently in the Trace2TimeSeries code. I think the new decorators have a way to handle that already - is that correct?

I'm going to proceed with this approach unless one of you have another way that is purely pythonic.

pavlis commented 3 years ago

When I sat down to do this, I realized there was probably a cleaner way to do this and that is in the pybind11 wrappers. I found your example for a customized constructor for Metadata that had this construct:

.def(py::init([](py::dict d) {
      auto md = new Metadata();
      for(auto i : d) {
        if(py::isinstance<py::float_>(i.second))
          md->put(std::string(py::str(i.first)), (i.second.cast<double>()));
        else if(py::isinstance<py::bool_>(i.second))
          md->put(std::string(py::str(i.first)), (i.second.cast<bool>()));
        else if(py::isinstance<py::int_>(i.second))
          md->put(std::string(py::str(i.first)), (i.second.cast<long>()));
        else if(py::isinstance<py::bytes>(i.second))
          md->put_object(std::string(py::str(i.first)), py::reinterpret_borrow<py::object>(i.second));
        else if(py::isinstance<py::str>(i.second))
          md->put(std::string(py::str(i.first)), std::string(py::str(i.second)));
        else
          md->put_object(std::string(py::str(i.first)), py::reinterpret_borrow<py::object>(i.second));
      }
      return md;
    }))

That suggests we might be able to replace Trace2TimeSeries with a constructor you could use in python like this:

d=function(x) # some function that returns an obspy Trace object for illustration
x=TimeSeries(d)

To handle history a function could call the load_history mechanism in TimeSeries.

I haven't hacked on this, but the problem I see is how to define what an obspy Trace object is as the argument on the C++ wrapper. I think pybind11 has some kind of prebuilt binding for py::dict that makes the Metadata example work. Is that the way you understand it? If not, do you know a trick to put the right incantation in the arg list to define a Trace-generating constructor for TimeSeries?

If that isn't possible it would, I think, be pretty easy to build a wrapper with this signature:

.def(py::init([](py::dict d,double *)  

or whatever the right arg definition is to pass a numpy array pointer. If either of these proves feasible I need help on two issues you guys may know about - maybe I can avoid some hacking time:

  1. What is the mechanism to pass a numpy data vector to C++ through pybind11? This is complicated by the fact that we cannot just pass a pointer and use it directly because of collisions in memory management. i.e. no matter what we do, the data in the numpy array has to undergo a deep copy into the C++ std::vector container in TimeSeries. Reminder that the fundamental problem is that I found doing that copy in python was an unambiguous performance problem.
  2. Can we pass the "Stats" directly as something like py::Stats or do we have to convert it to a dict first and use the py::dict wrapper as used above? As an aside, why the obspy authors chose to create the "Stats" class for their equivalent of Metadata instead of just using a dict is beyond me. It seems to me they should have implemented Trace as a subclass of dict, but their design seems to view "Stats" as a separate object where "Trace has a Stats" rather than "Trace is a Stats", which is the choice I made in the design for mspass.

For efficiency I'm going to put this aside until I find out if either of you can make the path forward clearer.

wangyinz commented 3 years ago

So, all we need here is a way to efficiently copy an array into the vector under TimeSeries, is that right? Basically, you can use Trace2TimeSeries for now, but it is slow due to this loop https://github.com/wangyinz/mspass/blob/d0be6190caafcdf7398de76a208782aa92546328/python/mspasspy/io/converter.py#L234-L235 I guess the solution has to be in the buffer protocol of pybind11. Probably we could implement a custom constructor with something similar to the dmatrix one: https://github.com/wangyinz/mspass/blob/d0be6190caafcdf7398de76a208782aa92546328/cxx/python/utility/utility_py.cc#L431-L437 Note that we probably can and should use py::buffer instead of py::array_t here.

The Stats question is easy. If I remember right, ObsPy's Stats object can be used as a dict directly. You can try it out with the dict constructor of Metadata; it should just work.

pavlis commented 3 years ago

Thanks, that is very helpful. What I need for TimeSeries, in fact, is easier, I think, than dmatrix since the data are just a vector and not a matrix. I'll see what I can produce.

We may have similar issues with the inverse and comparable Seismogram converters.

wangyinz commented 3 years ago

Do you want me to implement this? I think something like this should work:

    .def(py::init([](py::dict d, py::array_t<double, py::array::f_style | py::array::forcecast> b) {
      py::buffer_info info = b.request();
      if (info.ndim != 1)
        throw MsPASSError("CoreTimeSeries constructor:  Incompatible buffer dimension!", ErrorSeverity::Invalid);
      size_t npts = info.shape[0];
      auto v = new CoreTimeSeries(BasicTimeSeries(), py::cast<Metadata>(py::module_::import("mspasspy.ccore.utility").attr("Metadata")(d)));
      v->set_npts(npts);
      memcpy(v->s.data(), info.ptr, sizeof(double) * npts);
      return v;
    }))    
wangyinz commented 3 years ago

btw, I realize that we probably should clean up the constructor definitions in our pybind11 code. There are certain constructors missing while some not quite pythonic ones included. Because we expect the users to only interact with the objects without Core or Basic, we need to make sure the constructors in those make sense.

pavlis commented 3 years ago

I'm working on implementing your suggestions above. I'll leave any housecleaning of constructor oddities up to you to do at your leisure. I'm not good at sorting out what is and is not pythonic.

pavlis commented 3 years ago

Progress on this, but there remains a sticking point. The good news is I was able to build on the suggestion above to get something that works (sort of - more in a moment). Here is the wrapper code I derived from what you gave above:

.def(py::init([](py::dict d, py::array_t<double, py::array::f_style | py::array::forcecast> b) {
      py::buffer_info info = b.request();
      if (info.ndim != 1)
        throw MsPASSError("CoreTimeSeries constructor:  Incompatible buffer dimension!", ErrorSeverity::Invalid);
      size_t npts = info.shape[0];
      auto v = new CoreTimeSeries(BasicTimeSeries(), py::cast<Metadata>(py::module_::import("mspasspy.ccore.utility").attr("Metadata")(d)));
      v->set_npts(npts);
      memcpy(v->s.data(), info.ptr, sizeof(double) * npts);
      /* We set these from stats which should be passed through arg 1.
      There may be a more efficient way to do this - this was borrowed from
      how this is handled in other wrappers (notably ErrorLogger) */
      for(auto i : d){
        if(std::string(py::str(i.first)) == "delta"){
          double delta;
          delta=i.second.cast<double>();
          v->set_dt(delta);
        }
        else if(std::string(py::str(i.first)) == "starttime"){
          double t0;
          t0=i.second.cast<double>();
          v->set_t0(t0);
        }
      }
      v->set_tref(TimeReferenceType::UTC);
      v->set_live();
      return v;
    }))

The bad news is if I try to run it directly using the stats attributes in obspy Trace as if it were a dict I get this:

from obspy import read                                                  
st = read('http://examples.obspy.org/RJOB_061005_072159.ehz.new')       
tr=st[0]         
# note running this in the build/python directory so this is the form                                                       
from seismic import CoreTimeSeries                                      
d=CoreTimeSeries(tr.stats,tr.data)                                      
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-5-26da24029f20> in <module>
----> 1 d=CoreTimeSeries(tr.stats,tr.data)

TypeError: __init__(): incompatible constructor arguments. The following argument types are supported:
    1. mspasspy.ccore.seismic.CoreTimeSeries()
    2. mspasspy.ccore.seismic.CoreTimeSeries(arg0: mspasspy.ccore.seismic.CoreTimeSeries)
    3. mspasspy.ccore.seismic.CoreTimeSeries(arg0: int)
    4. mspasspy.ccore.seismic.CoreTimeSeries(arg0: dict, arg1: numpy.ndarray[numpy.float64])

Invoked with: Stats({'sampling_rate': 200.0, 'delta': 0.005, 'starttime': UTCDateTime(2005, 10, 6, 7, 21, 59, 850000), 'endtime': UTCDateTime(2005, 10, 6, 7, 24, 59, 845000), 'npts': 36000, 'calib': 0.0949, 'network': '', 'station': 'RJOB', 'location': '', 'channel': 'Z', 'gse2': AttribDict({'auxid': 'RJOB', 'datatype': 'CM6', 'calper': 1.0, 'instype': '', 'hang': -1.0, 'vang': -1.0, 'lat': -999.0, 'lon': -999.0, 'coordsys': '', 'elev': -0.999, 'edepth': -0.999}), '_format': 'GSE2'}), array([-38,  12,  -4, ..., -14,  -3,  -9], dtype=int32)

However, if I create a dict with the minimum attributes required (delta and starttime) it works fine.

h={'delta' : tr.stats['delta']}                                         
h['starttime']=tr.stats['starttime']                                    
print(h)                                                               
{'delta': 0.005, 'starttime': UTCDateTime(2005, 10, 6, 7, 21, 59, 850000)}
d=CoreTimeSeries(h,tr.data)                                            
print(d)                                                               
{'delta': 0.005000, 'npts': 36000, 'starttime': 1128583319.850000}

So, your guess that stats would alias as a dict is not valid. I can create a workaround for a new version of Trace2TimeSeries, but wonder if there is a cleaner solution? My code above is inadequate in several ways anyway - notably it does not copy the full stats array as it should - and needs some more work. I'm going to be working on this problem more this morning, so if you have any ideas how to pass the stats class directly let me know.

pavlis commented 3 years ago

For the record, I found a good solution to the TimeSeries problem with the following code now in the wrappers for TimeSeries:

      .def(py::init([](py::dict d, py::array_t<double, py::array::f_style | py::array::forcecast> b) {
        py::buffer_info info = b.request();
        if (info.ndim != 1)
          throw MsPASSError("CoreTimeSeries constructor:  Incompatible buffer dimension!", ErrorSeverity::Invalid);
        size_t npts = info.shape[0];
        Metadata md;
        md=py::cast<Metadata>(py::module_::import("mspasspy.ccore.utility").attr("Metadata")(d));
        BasicTimeSeries bts;
        double dt=md.get_double("delta");
        bts.set_dt(dt);
        double t0=md.get_double("starttime");
        bts.set_t0(t0);
        /* We invoke the BasicTimeSeries method set_npts which sets the
        internal protected npts attribute of the base class.  We then set
        npts in the metadata.   This trick allows initialization of
        the std::vector container only once with the push_back below.
        Otherwise we would do an initialization to zeros followed by insertion.
        This algorithm will be slightly faster. */
        bts.set_npts(npts);
        md.put("npts",npts);  // don't assume npts is set in metadata
        /* We only support UTC for this constructor assuming it is only used
        to go back and forth from obspy trace objects. */
        bts.set_tref(TimeReferenceType::UTC);
        ProcessingHistory emptyph;
        double *dptr;
        vector<double> sbuf;
        sbuf.reserve(npts);   // A standard efficiency trick for std::vector
        /* Initialize dptr here for clarity instead of putting it inside the
        initialization block of the for loop - clearer to me anyway */
        dptr=(double*)(info.ptr);
        for(size_t i=0;i<npts;++i,++dptr) sbuf.push_back(*dptr);
        auto v = new TimeSeries(bts,md,emptyph,sbuf);
        return v;
      }))

This is about as efficient as it can be using several tricks that are potentially fragile if we alter some details in how the TimeSeries constructor I used here behaves. This algorithm does only a single copy operation using only the std::vector reserve to alloc the TimeSeries buffer and push_back to load the data. It works fine in my tests. The next stage is to see how much this improved performance - I suspect a lot.

For the record as well, this currently requires a conversion of an obspy Trace stats to a dict. There is also a not so trivial detail about handling the starttime and endtime attributes hard wired into Trace and TimeSeries. There is most definitely a type collision as Trace wants these to be UTCDateTime objects while TimeSeries insists they be epoch times (double). The Trace2TimeSeries code I'm working on now uses this basic code (without the error checking the production code probably should have):

h={}
for k in tr.stats.keys(): 
    h[k]=tr.stats[k]
t=h['starttime']
h['starttime']=t.timestamp
t=h['endtime']
h['endtime']=t.timestamp
d=TimeSeries(h,tr.data)

The inverse function (TimeSeries2Trace) needs to do the reverse.

pavlis commented 3 years ago

I can already see the inverse function (TimeSeries2Trace) will have a performance problem as it does the same thing Trace2TimeSeries did to copy data between the C and python sides. Here, in fact, is the loop of concern:

dresult.data = np.ndarray(ts.npts)
for i in range(ts.npts):
        dresult.data[i] = ts.data[i]

where ts is the TimeSeries object input and dresult is what the function will return. As the first line says dresult.data is a numpy array. I wonder if there is a way to vectorize that loop in a simple way?

wangyinz commented 3 years ago

Cool! For the stats to dict conversion, you can just do dict(tr.stats). It should give better performance than that loop.

wangyinz commented 3 years ago

Regarding the TimeSeries2Trace issue, I tested a little bit on the behavior of ts.data which is a DoubleVector created by pybind11. I found that we can easily convert it into a numpy array with np.array(ts.data). So, I guess we should just change

dresult.data = np.ndarray(ts.npts)
for i in range(ts.npts):
        dresult.data[i] = ts.data[i]

into

dresult.data = np.array(ts.data)

It should fix the performance issue (I think).

pavlis commented 3 years ago

Simple solution for the inverse function - bravo. Won't have time to test it for a while, but a trivial change really.

Nice trick on converting the stats array - it makes sense after seeing the solution. One more example of why having multiple heads on the problem helps on a project like this.

pavlis commented 3 years ago

Some benchmark numbers after revising the Trace2TimeSeries function. Note above the pure python implementation took 916 s to do this task:

  1. Under spyder with print statements removed: 6.9 s.
  2. Run with the plain python interpreter: 5.93 s and 5.99 s in two successive runs on a machine doing nothing else.
  3. After the change to use dict(trace.stats): 5.87 s (useful but, not surprisingly, far less significant than the 916/6 ≈ 152-fold reduction above).
pavlis commented 3 years ago

Ian, maybe you can help me find a solution to a problem I encountered in fixing the inverse function, TimeSeries2Trace. Based on what you said I implemented the following new version of TimeSeries2Trace:

def TimeSeries2Trace(ts):
    """
    Converts a TimeSeries object to an obspy Trace object.

    MsPASS can handle scalar data either as an obspy Trace object or
    with the mspass TimeSeries object.  They capture nearly the same
    concepts.  The main difference is that TimeSeries supports the
    error logging and history features of mspass while obspy, which is
    a separate package, does not.  Obspy has a number of useful
    algorithms that operate on scalar data, however, so it is frequently
    useful to switch between Trace and TimeSeries formats.  The user is
    warned, however, that converting a TimeSeries to a Trace object
    with this function will result in the loss of any error log information.
    For production runs, unless the data set is huge, we recommend saving
    the intermediate result AFTER calling this function if there is any
    possibility there are errors posted on any data.  We say after because
    some warning errors from this function may be posted in elog.  Since
    python uses call by reference ts may thus be altered.

    :param ts: is the TimeSeries object to be converted
    :type ts: :class:`~mspasspy.ccore.TimeSeries`
    :return: an obspy Trace object from conversion of ts.  An empty Trace
        object will be returned if ts was marked dead
    :rtype: :class:`~obspy.core.trace.Trace`
    """
    dresult = obspy.core.Trace()
    dresult.dead_mspass = True
    # Silently return an empty trace object if the data are marked dead now
    if not ts.live:
        return dresult
    # We first deal with attributes in BasicTimeSeries that have to
    # be translated into an obspy stats dictionary like object
    dresult.dead_mspass = False
    dresult.stats['delta'] = ts.dt
    dresult.stats['npts'] = ts.npts
    dresult.stats['starttime'] = obspy.core.UTCDateTime(ts.t0)
    # It appears obspy computes endtime - this throws an AttributeError if
    # included. Ratained for reference to keep someone from putting this back
    #dresult.stats['endtime']=obspy.core.UTCDateTime(ts.endtime())
    # todo relative time attribute
    # These are required by obspy but optional in mspass.  Hence, we have
    # to extract them with caution.  Note defaults are identical to
    # Trace constructor
    if ts.is_defined('net'):
        dresult.stats['network'] = ts.get_string('net')
    else:
        dresult.stats['network'] = ''
    if ts.is_defined('sta'):
        dresult.stats['station'] = ts.get_string('sta')
    else:
        dresult.stats['station'] = ''
    if ts.is_defined('chan'):
        dresult.stats['channel'] = ts.get_string('chan')
    else:
        dresult.stats['channel'] = ''
    if ts.is_defined('loc'):
        dresult.stats['location'] = ts.get_string('loc')
    else:
        dresult.stats['location'] = ''
    if ts.is_defined('calib'):
        dresult.stats['calib'] = ts.get_double('calib')
    else:
        dresult.stats['calib'] = 1.0

    # We have to copy other metadata to preserve them too.  That is 
    # complicated by the fact that some (notably endtime) are read only
    # and will abort the program if we just naively copy them. 
    # The list below are the keys to exclude either because they 
    # are computed by Trace (i.e. endtime) or are already set above
    do_not_copy=["delta","npts","starttime","endtime","network","station",
                   "channel","location","calib"]
    for k in ts.keys():
        if not (k in do_not_copy):
            dresult.stats[k] = ts[k]
    #dresult.data = np.ndarray(ts.npts)
    #for i in range(ts.npts):
        #dresult.data[i] = ts.data[i]
    #tsdata=np.ndarray(ts.data)
    print('debug - calling ndarray constructor')
    tsdata=np.ndarray(shape=(ts.npts,),buffer=ts.data,dtype='float64')
    dresult.data=tsdata.copy()
    return dresult

The key problem is that this line

tsdata=np.ndarray(shape=(ts.npts,),buffer=ts.data,dtype='float64')

throws this exception:


  File "/home/pavlis/testdata/test_ensemble_dbsave.py", line 23, in <module>
    y=cv.TimeSeries2Trace(x)

  File "/home/pavlis/testdata/converter.py", line 134, in TimeSeries2Trace
    tsdata=np.ndarray(shape=(ts.npts,),buffer=ts.data,dtype='float64')

TypeError: a bytes-like object is required, not 'mspasspy.ccore.seismic.DoubleVector'

I have seen code in the pybind11 wrappers that suggests you (Ian) know some tricks in defining the buffer object to make this work. Rather than hack on this to see if I can find a combination that will work, I thought I'd ask you for ideas first.

btw - don't pay any attention to the second to last line of this code. I don't think that is necessary. It is a relic of an earlier version that had a different problem. The key question is how to use the ndarray constructor with a DoubleVector array as input.

pavlis commented 3 years ago

For the record while this is fresh in my mind, the experience rewriting Trace2TimeSeries and its inverse shows that we have two parallel problems in comparable code for Seismogram objects:

  1. We have a metadata name alias issue that I had to address here that needs to be handled with Seismogram data as well. A solution for obspy Trace is seen above, but we perhaps need a more generic approach. The issue is the same thing I implemented with aliases in MetadataDefinitions. Not appropriate to use that here, however, since this code should largely be hidden inside the engine. We do, however, perhaps need a global dictionary of obspy to mspass aliases comparable to that above (see the sketch after this list). It would be cleaner to have those in one place instead of scattered in different functions like above. Unclear how that would be best implemented in the framework.
  2. We need a way to standardize the transfer of history data through these conversions, and to define how we would record an obspy processing chain in that history. That is important since a common pattern will be a sequence of convert to obspy, run some obspy algorithms, and convert back to mspass. I think it could be maintained in wrappers for obspy modules, but I'm not sure. I do not know if what Jinxin has done can handle this or not, but it is worthwhile discussing in our conference call this week.
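
The global alias dictionary mentioned in item 1 could be as simple as this sketch (nothing here exists yet; it is only for discussion):

# single module-level map shared by all obspy converters (hypothetical)
OBSPY_TO_MSPASS = {
    'network': 'net',
    'station': 'sta',
    'channel': 'chan',
    'location': 'loc',
}
MSPASS_TO_OBSPY = {v: k for k, v in OBSPY_TO_MSPASS.items()}

def rename_keys(d, alias_map):
    # return a copy of dict d with any aliased keys renamed
    return {alias_map.get(k, k): v for k, v in d.items()}
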
pavlis commented 3 years ago

New benchmark result. Ran this function on 10 usarray data ensembles. The SEED files were each just a bit smaller than 1 Gb. The elapsed time was 39 s and 38.8 s for two identical runs. So, the time is about 3.9 s per 1 GB ensemble. That time is totally dominated by the time to read and crack the seed files. The function below actually writes some data into the seed_data.ensemble collection, but although I haven't actually done the timing I seriously doubt that is a significant factor given 1 GB in about 4 s is roughly a 250 MB/s transfer rate. That number will be much poorer on a conventional disk system - this test was on a system reading from a solid state disk.

Note this algorithm creates what I think MongoDB calls a nested document with the ensemble data being at the top level and the members being subdocuments under the key "members". I think that is a useful model for an ensemble as it maps cleanly into the actual data structure.

import os
from pathlib import Path

from obspy import read


def dbsave_seed_ensemble_file(db,file,gather_type="event",
                keys=None):
    """
    Prototype reader for SEED files that are already assembled in a 
    "gather" meaning the data have some relation through one or more 
    keys.   The association may be predefined by input though a
    keys array or left null for later association.   There is a large
    overhead in this function as it has to read the entire seed file 
    to acquire the metadata it needs.  This version has a bigger
    memory footprint than required because it uses obspy's seed reader
    that always eats up the whole file and returns a list of
    Trace objects.  A less memory intensive approach would be to
    scan the seed blockettes to assemble the metadata, but 
    that would be a future development.  

    This function writes records into a seed_data.ensemble collection.
    Be warned that the "." in the name is a common convention in 
    MongoDB databases but really only defines a unique name and 
    does not do any hierarchy of collections as the name might 
    suggest (O'Reilly book on MongoDB by Shannon et al. 2019).  
    It is a useful tag here, however, to distinguish it from seed_data 
    that contains an index to individual channels that can be 
    used to construct TimeSeries (or Trace) objects.   

    The records written are a hierarchy expressed in json (bson) of
    how an Ensemble object is defined: i.e. ensemble Metadata
    combined with a container of TimeSeries or Seismogram objects.
    Because SEED data always defines something that directly maps to 
    TimeSeries this only works to create an index to build 
    TimeSeriesEnsemble objects.   

    This prototype was written to index ensembles that are passive
    array common event (source) gathers.  These are the passive array
    equivalent of a shot gather in reflection processing.   The 
    format of the json(bson) document used for the index, however, 
    is not limited to that case.  The gather type is defined by a 
    metadata key (for this prototype the key is "gather_type").  The 
    concept is the gather_type can be used as both a filter to select
    data and as a hint to readers on how to handle the data bundle.

    The gather (ensemble) metadata include a list of dict
    data that define a json/bson document defining the members of
    an ensemble.   Other documentation will be needed to define this
    concept more clearly with figures.  

    A design constraint we impose for now is that one file generates
    one document in the seed_data.ensemble collection.   This means if
    the data for an ensemble is spread through several files it would 
    have to be constructed in pieces.  That will require implementing 
    a function that merges ensemble data.  That model should make this 
    more generic as an end member is an ensemble created by merging
    files with one TimeSeries per file.   

    :param db:  MongoDB database pointer - may also be a mspass Database 
      class
    :param file:  seed file containing the data to be indexed.
    :param gather_type: character string defining a name that defines 
      a particular ensemble type.  Default is "event", which is the 
      only currently supported format.  (other keywords will cause an
      error to be thrown)  Anticipated alternatives are:  "common_receiver"
      or "station", "image_point", and "time_window".  
    """

    try:
        dbh=db['seed_data.ensemble']
        pr=Path(file)
        fullpath=pr.absolute()
        [dirself,dfileself]=os.path.split(fullpath)
        dseis=read(file,format='mseed')
        # This holds the ensemble metadata
        ensemblemd={'dir':dirself}
        ensemblemd['dfile']=dfileself
        ensemblemd['format']='mseed'
        # this is a placeholder not really necessary for seed data
        # as seed data by definition yield TimeSeries type data although
        # not necessarily seismic data (e.g. MT data are distributed as mseed)
        ensemblemd['member_type']='TimeSeries'
        ensemblemd['mover']='obspy_seed_ensemble_reader'
        members=[]   # this list will contain one dict for each dseis Trace
        # we want to put the time range of the data into ensemblemd - we use these lists for that
        stimes=[]
        etimes=[]
        for d in dseis:
            mddict={}
            mddict['net']=d.stats['network']
            mddict['sta']=d.stats['station']
            mddict['chan']=d.stats['channel']
            mddict['loc']=d.stats['location']
            st=d.stats['starttime']
            et=d.stats['endtime']
            mddict['starttime']=st.timestamp
            mddict['endtime']=et.timestamp
            stimes.append(st.timestamp)
            etimes.append(et.timestamp)
            mddict['sampling_rate']=d.stats['sampling_rate']
            mddict['delta']=d.stats['delta']
            mddict['npts']=d.stats['npts']
            mddict['calib']=d.stats['calib']
            members.append(mddict)
        ensemblemd['members'] = members
        tmin=min(stimes)
        tmax=max(etimes)
        ensemblemd['starttime']=tmin
        ensemblemd['endtime']=tmax
        result=dbh.insert_one(ensemblemd)
        return result.inserted_id
    except:
        print('something threw an exception - this needs detailed handlers')
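
Typical usage in the benchmark runs described above looks like this (the file path is just an example):

from pymongo import MongoClient

db = MongoClient()['mspass']
ensemble_id = dbsave_seed_ensemble_file(db, '/data/usarray/2012/event_041.msd')
print('saved ensemble document with id', ensemble_id)
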
wangyinz commented 3 years ago

The key problem is that this line

tsdata=np.ndarray(shape=(ts.npts,),buffer=ts.data,dtype='float64')

throws this exception:


  File "/home/pavlis/testdata/test_ensemble_dbsave.py", line 23, in <module>
    y=cv.TimeSeries2Trace(x)

  File "/home/pavlis/testdata/converter.py", line 134, in TimeSeries2Trace
    tsdata=np.ndarray(shape=(ts.npts,),buffer=ts.data,dtype='float64')

TypeError: a bytes-like object is required, not 'mspasspy.ccore.seismic.DoubleVector'

Well, the DoubleVector from pybind11 is not a buffer, so that error makes sense here. You should just do this:

dresult.data = np.array(ts.data)

Note that np.array is different from np.ndarray; the latter would not work because it will falsely treat the DoubleVector as a multidimensional array somehow (I have no idea why).

pavlis commented 3 years ago

Ahh, that was an easy fix. Works now. I'll check in the revision, noting that the Seismogram related converters are likely subject to multiple bugs.

wangyinz commented 3 years ago
  1. We have a metadata name alias issue that I had to address here that needs to be handled with Seismogram data as well. A solution for obspy Trace is seen above, but we perhaps need a more generic approach. The issue is the same thing I implemented with aliases in MetadataDefinitions. Not appropriate to use that here, however, since this code should largely be hidden inside the engine. We do, however, perhaps need a global dictionary of obspy to mspass aliases comparable to that above. It would be cleaner to have those in one place instead of scattered in different functions like above. Unclear how that would be best implemented in the framework.

Yeah, this is something we had discussed before. We decided to remove MetadataDefinitions from these converters because we want them to be agnostic about the schema, and the schema only makes sense to the database related functions. My current design is to have a default MetadataDefinitions defined in the constructor of our database object. Some methods of the database object will also provide an option to override the default.
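
i.e. something along these lines (a sketch only; the names and the import path for MetadataDefinitions are assumptions and the real signature is still open):

from mspasspy.ccore.utility import MetadataDefinitions   # assumed import path

class Database:
    # sketch only - the real class lives in mspasspy.db
    def __init__(self, client, dbname, mdef=None):
        self._db = client[dbname]
        # schema knowledge is attached to the database handle, not the converters;
        # methods would use this default unless the caller passes an override
        self.metadata_def = mdef if mdef is not None else MetadataDefinitions()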

  1. We need a way to standardize the transfer of history data through these conversions, and to define how we would record an obspy processing chain in that history. That is important since a common pattern will be a sequence of convert to obspy, run some obspy algorithms, and convert back to mspass. I think it could be maintained in wrappers for obspy modules, but I'm not sure. I do not know if what Jinxin has done can handle this or not, but it is worthwhile discussing in our conference call this week.

The history data is also something irrelevant to a converter, because there is no counterpart in ObsPy to properly convert it to, and no functions operating on ObsPy objects would expect a history record or properly use it. You are absolutely correct that this is something to be handled in the wrapper, and Jinxin has already implemented that. You can find it here: https://github.com/wangyinz/mspass/blob/90b8a86cd1b2265f76e8d22681c044baf8d33b15/python/mspasspy/util/decorators.py#L22-L49

pavlis commented 3 years ago

This particular gem could have appeared on several issues pages or even made a separate issue, but I'm putting it here because it arose during construction of a prototype raw data handling workflow.

This issue is how data should be properly saved to handle the issue of UTC versus a relative time standard. I discovered a bug in the way we handle this right now for what will be a common case. In the prototype workflow I cut data in a relative time window defined by using a P wave arrival time as a reference point. In particular, the window for the case in point was defined as 20 s before P to something like 2 minutes after P. When the TimeSeries objects were saved with our current Database handle class I get these kinds of records in the wf_TimeSeries collection:

{'_id': ObjectId('5fecdef708853b3376a7ede3'), 'calib': 612.5183810935765, 'delta': 0.025, 'npts': 6801, 'sampling_rate': 40.0, 'site_id': ObjectId('5fecca8cf27e30d55fe762e1'), 'starttime': -20.0, 'time_standard': 'UTC'}
{'_id': ObjectId('5fecdeff08853b3376a7ede8'), 'calib': 686.5646147363515, 'delta': 0.025, 'npts': 6801, 'sampling_rate': 40.0, 'site_id': ObjectId('5fecca8cf27e30d55fe762e1'), 'starttime': -20.0, 'time_standard': 'UTC'}
{'_id': ObjectId('5fecdf0108853b3376a7eded'), 'calib': 945.9932849652521, 'delta': 0.025, 'npts': 6801, 'sampling_rate': 40.0, 'site_id': ObjectId('5fecca8cf27e30d55fe762e1'), 'starttime': -20.0, 'time_standard': 'UTC'}
{'_id': ObjectId('5fecdf0a08853b3376a7edf2'), 'calib': 840.5740078092336, 'delta': 0.025, 'npts': 6801, 'sampling_rate': 40.0, 'site_id': ObjectId('5fecca8cf27e30d55fe7639e'), 'starttime': -20.0, 'time_standard': 'UTC'}

The issue is that starttime is saved as -20.0, which is the relative window time. Nothing is saved here to preserve the original UTC time stamp. Most definitely not what we want it to do.

Note there are two ways the time can be automatically and always correctly preserved:

  1. The writer could (after doing some safety checks) call the BasicTimeSeries rtoa method and restore the data to UTC. Then starttime will be a true UTC time. There are, however, two side effects of this approach. First, to avoid real confusion the writer would need to either call ator again before returning or make a copy before calling rtoa - an expensive solution. Second, unless we do something, the time reference used to define the original shift to relative time would be lost. Thus, I don't think this is a good solution.
  2. We could add some new attributes to handle this automatically.

Since I think 2 is the only reasonable solution even if we aimed to only support UTC time let's iron out a design for what those attributes should be.

There are four protected attributes in BasicTimeSeries that define all this. Here is the C++ code, stripping the attributes not related to this issue:

protected:

    /*!  Data start time.  That is the time of the first sample of data.
    **/
    double mt0;
    /*!
    Time reference standard for this data object.  Defined by enum Time_Reference
    this currently is only one of two things.  When set as "UTC" the time
    tandard is an epoch time.  When set as "relative" time has no relationship
    to any external standard but are relative to some arbitrary reference that must
    ascertained by the algorithm by some other means (in seispp this is normally
    done through a metadata object).  A classic example is multichannel data where
    channels have a time relative to a shot time.
    **/
    TimeReferenceType tref;
    /* We actually test for t0shift two ways.  If this is true we always accept it.
     * If false we check for nonzero t0shift and override if necessary.
     * */
    bool t0shift_is_valid;
    /*When ator or rtoa are called this variable defines the conversion back
     * and forth.  The shift method should be used to change it. */
    double t0shift;

But we have a set of getters and setters to deal with these since they are declared protected. The most subtle thing in this definition is the variable t0shift_is_valid. It exists as a kind of sanity check but it has to be initialized when appropriate or downstream errors in ccore will occur.

To sort this out I think it useful to enumerate the finite set of cases we need to handle. There are only three:

So, there are definitely more compact ways to do this, but would suggest we require 3 attributes in the wf table to define this:

  1. time_standard would be "UTC" or "Relative" and used to call the putter BasicTimeSeries::set_tref.
  2. starttime as we have it now. Probably best to have it saved and read verbatim and use time_standard and the next item to sort out the possibilities.
  3. t0shift would be used to set the t0shift value in BasicTimeSeries with the putter force_t0_shift or some logic checking time_standard and calling ator if necessary. The cleanest solution is to just use the template above for the 3 options for how this is handled.

If you concur, I will implement these in the schema file and hack the readers and writers to handle this correctly.
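
To be concrete, the reader/writer logic I have in mind looks roughly like the sketch below. set_tref, force_t0_shift, ator, and rtoa are the BasicTimeSeries methods named above; the getter names and the TimeReferenceType import path are assumptions for illustration:

from mspasspy.ccore.seismic import TimeReferenceType   # assumed import path

def time_attributes_for_save(ts):
    # writer side:  record starttime verbatim plus the two qualifiers
    doc = {'starttime': ts.t0}
    if ts.time_is_UTC():            # assumed getter for the tref attribute
        doc['time_standard'] = 'UTC'
    else:
        doc['time_standard'] = 'Relative'
        doc['t0shift'] = ts.time_reference()   # assumed getter for t0shift
    return doc

def restore_time_attributes(ts, doc):
    # reader side:  the three wf attributes restore the BasicTimeSeries state
    if doc['time_standard'] == 'UTC':
        ts.set_tref(TimeReferenceType.UTC)
    else:
        ts.set_tref(TimeReferenceType.Relative)
        if 't0shift' in doc:
            ts.force_t0_shift(doc['t0shift'])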

pavlis commented 3 years ago

PS to previous: I should have said the only new attribute here is t0shift. The rest is some code in the readers and writers to deal with the three possible permutations.

wangyinz commented 3 years ago

Yeah, that looks good to me. Maybe we should name the new attribute starttime_shift in the schema so that it is consistent with the unique name of starttime. The only caveat is that @JiaoMaWHU is also working on the reader and writer, so we need to make him aware of the changes and make sure it got merged correctly.

pavlis commented 3 years ago

My knee jerk reaction was to object to the name with the "_" since that somewhat implies a normalized table name like site_latitude, but I see there are plenty of other names in the schema that are regular attributes containing underscores. I will use starttime_shift.

I am also going to make two changes to Jinxin's Database.save_data code:

  1. The argument update_all must default to True for that function. It is passed to update_metadata and has the bad side effect of discarding most of the metadata by default. For save_data, in fact, update_all=False should not even be an option for the metadata. I need to poke into the code more, but the functionality we do need is a method that only updates metadata. That can be important for an expensive algorithm whose results can be reduced to a small quantity appropriate to post to metadata.
  2. This section is not the behavior we want in this kind of process:
       for k in copied_metadata:
            if k in exclude or k in skip_list:
                continue
            if update_metadata_def.is_defined(k):
                if type(copied_metadata[k]) != update_metadata_def.type(k):
                    raise TypeError('{} has type {}, forbidden by definition'.format(k, type(copied_metadata[k])))

    The issue is that this should never throw an exception, and it has the undesirable side effect in this situation of having already written the sample data to gridfs without a corresponding record in the relevant wf collection. Not good at all. I need to work on this a bit to get the right solution. I'm not sure the algorithm he is using is the most efficient for the most common situations. i.e. I think the existing algorithm may be saving things in a way that is not optimal because of how it handles the cross linking ids. Since we can and should assume most data have empty elog entries, most wf entries should not have an elog_id defined at all. The point is I think what this should do is save the metadata, and the line above should post to elog instead of raising an exception. Then the errors can be saved when elog is saved and the wf record can be updated with elog_id. At least that is my initial plan. I'll post here what I finally did to resolve this.

pavlis commented 3 years ago

Small update on this issue. When I thought through what this was doing I realized that what I need to do is replace the exception being raised with something more like this:

if type(copied_metadata[k]) != update_metadata_def.type(k):
    try:
        # change_type tries to convert feasible type changes (e.g. ints to floats),
        # throwing a type exception only if no conversion is possible
        repaired_value = change_type(copied_metadata[k], update_metadata_def.type(k))
        insert_dict[k] = repaired_value
    except TypeError as err:
        mspass_object.elog.log_error("informative message goes here using content of err message")

The main idea is change_type will try to convert the data passed to the requested type and will only throw an exception for something like trying to convert a string to a number. Implementation needs to be as robust as possible and only totally fail if there is no rational conversion. The logged messages should never be worse than a complaint as errors like this should never invalidate the result or cause an abort (which the current code does).
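
Here is roughly what I have in mind for change_type (a sketch only; the real version needs much more testing and probably more supported types):

from bson.objectid import ObjectId

def change_type(value, target_type):
    # try to coerce value to target_type; raise TypeError only when no
    # rational conversion exists (e.g. a non-numeric string to a number)
    if isinstance(value, target_type):
        return value
    try:
        if target_type is float or target_type is int:
            return target_type(value)
        if target_type is str:
            return str(value)
        if target_type is ObjectId:
            return ObjectId(value)   # works for 24 character hex strings
    except Exception:
        # bson raises its own InvalidId for bad ObjectId strings; treat any
        # failure here as "no rational conversion"
        pass
    raise TypeError('change_type:  cannot convert {} to {}'.format(
        type(value), target_type))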

pavlis commented 3 years ago

One more thing on this. The current yaml file only allows double, int, string, bytes, and ObjectId. I just thought of an additional strategy to make this more robust. If there is an unresolvable type collision, like trying to convert a string to an int, we can still save the value but under an altered key. e.g. suppose someone set source_id='nuke22', which could not be converted to an ObjectId. I think we can just save the data with some appendage to the key like #bad, _wrongtype, or some such thing (e.g. source_id might then become 'source_id#wrongtype'). This way people won't completely lose things they might unintentionally mishandle.
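
In code the fallback is nothing more than this (sketch, using the change_type function sketched above; the variable names are hypothetical):

def put_with_type_check(insert_dict, key, value, target_type):
    # keep an unconvertible value under a tagged key so it is not silently lost
    try:
        insert_dict[key] = change_type(value, target_type)
    except TypeError:
        insert_dict[key + '#wrongtype'] = value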

wangyinz commented 3 years ago

That sounds like a good strategy. Maybe we should let @JiaoMaWHU push his latest version to his branch and then revise it. I know he's doing some significant updates that are not yet pushed.

pavlis commented 3 years ago

Ok, I'll iron out the small, robust function to handle trying to convert and handling any failures. That is not a small job since it includes a lot of testing to sort out all the possible things that can go wrong. i.e. that will keep me busy until he can get his changes pushed.

JiaoMaWHU commented 3 years ago

Thanks for bringing it out. Replied in the email.