mspass-team / mspass

Massive Parallel Analysis System for Seismologists
https://mspass.org
BSD 3-Clause "New" or "Revised" License

How to tag data saved at different stages of processing? #43


pavlis commented 4 years ago

We need to come up with some ideas on how to tag objects to define what has been done to them to get them from some initial stage to their current state. "Provenance aware" was a key phrase in your original proposal, and this is where the rubber meets the road on that goal.

Now that I have worked with mongo for a while I have some ideas on how we might approach this. I'm still a rookie, however, so I think I'd like to hear your ideas before I start casting about for fish in waters I don't know that well. I'm thinking you may have thought about this some since it was a key goal in your funded proposal, so I'll let you go first.

wangyinz commented 4 years ago

This is definitely a core functionality of mspass. My design for data provenance is to have a from key defined in every document. The value of from can be either an _id that refers to the previous document that this document was generated from, or an embedded document that is an exact copy of the previous version. Note that in some cases it is necessary to make it a list of either type. The former is used in the case that we don't remove any documents from a collection, so that the older version can still be referenced by the newer one. The latter case can be used when a document is removed from the collection: we might automatically put a copy as an embedded document in the other documents that refer to it in their from keys. To make such reverse lookups efficient, we need to define a reverse to key that links to the products of the current document by their _id values. This essentially makes the documents a doubly linked list, and a remove operation would not break the list by design, so all provenance info is preserved.
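
For concreteness, here is a minimal sketch of what that doubly linked design might look like (hedged: the field names follow the description above, the use of pymongo's bson module is an assumption, and the ObjectId values are invented for illustration):

from bson.objectid import ObjectId

parent_id = ObjectId()
child_id = ObjectId()

parent_doc = {
    '_id': parent_id,
    'to': [child_id],     # forward link(s) to the products of this document
    # ... waveform attributes ...
}
child_doc = {
    '_id': child_id,
    'from': parent_id,    # link back to the document this one was generated from
    # ... waveform attributes ...
}

# If parent_doc is later removed from the collection, its contents would be
# copied into the child as an embedded document so the chain is not broken:
child_doc = {
    '_id': child_id,
    'from': parent_doc,   # embedded copy replaces the _id reference
}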

pavlis commented 4 years ago

During prototyping of the database reader and writers I came to realize we need to flesh this out a bit more. First, I like the basic idea of using a linked list to define the processing chain. I think we need a bit more than that, however, to make this functional. Here are the issues I recognized:

  1. A reader has immediate access to the ObjectId of the wf document from which a seismogram/timeseries is created. A writer, however, can't know that magic number until the insert succeeds. That means all writers must handle the problem. It would be easy if we had a save method in the C++ object, but because we are (wisely) avoiding mixing C++ and python database handles that isn't really possible. Hence, we have a maintenance problem for core writers and a documentation issue for when someone chooses to write a custom writer. Solvable, just not ideal.
  2. Should every seismogram load a full linked list of history, or should it only know its parent? Only knowing the parent is simpler but has a serious robustness issue: what happens if the user decides an intermediate result is no longer needed and deletes it? You break the chain and you lose the whole history. Thus, I think each seismogram on construction will need to load its full history chain.
  3. What should be in the history record? Bare bones, I would think, would be something like the ObjectId and a list of algorithms applied to create it. With Mongo at our disposal, however, I think we could do more. I'll suggest a class definition next.

Here is a first order suggestion on a possible ProcessHistory class:

class ProcessHistory
{
public:
  std::string rawdata_id;  // string representation of the raw waveform ObjectId
  std::list<Processes> history;  // actual linked list of processing history records
  void add(const Processes& newalg);  // append a new process record
};

The Processes class might look like this:

class Processes
{
public:
  std::string algorithm;   // name of the algorithm applied
  std::string oid_algorithm_parameters;  // ObjectId string of saved document of parameters used to run alg
  bool saved;   // true if saved to MongoDB
  std::string oid_raw;   // string representation of the raw data ObjectId that is the oldest ancestor
  std::string last_save_id;   // string representation of the ObjectId of the last save
};

That won't survive the first few lines of coding, but it gets out the key things I think we need to implement:

  1. The history log needs more than just the chain of processes
  2. History has to contain a record of algorithms that weren't saved to db
  3. History needs links to input files for each algorithm. This one needs the most thought.

Please think about this and add to this design or propose an alternative. Realize we need to do this now as this needs to be built into readers, writers, and any processing module.

pavlis commented 4 years ago

I have been pondering this issue some more, and have a very different idea we should consider. I started to look at how we would implement the ideas above, and realized it was really awkward because it created a lot of baggage for each data object to carry around. So, the alternative idea is: why not handle this with python and MongoDB directly?

A different thing this buys is that it can be made a capability independent of the C++ library. I can see some users who might want to borrow our history mechanism and nothing else if we can make this work.

The big unknown I have is how this would work in a Spark environment. Would the history logger need to be defined as a component of an RDD to assure all the components get preserved? This brings up a strategic point we may need to discuss for this development: what is our next critical step? We now have a working schema, which was one of our early project landmarks. The issue here came up out of that development and needs attention, but it seems the big thing we need to do next is implement a full Spark data flow to test our concepts - this one included.

What do you think? Actually there are two things - the strategic question is admittedly kind of out of place.

wangyinz commented 4 years ago

I agree that handling the provenance within Python is a much cleaner solution. I think it actually means that the C++ code will be purely internal, and only an experienced user will be using that.

If I understand your description correctly, this is suggesting a dedicated history collection to manage provenance information for all the data objects. So it is different from what I originally thought. I was thinking of saving the provenance info within the data object, which is an embedded data model, while yours follows the normalized data model. I think the core functionality is the same for the two models. The only tricky part is how to avoid breaking the history chain.

In both models, whenever an object is deleted, we can have the delete routine automatically put a copy of the current data object into the data provenance section of the next object down the chain. For the history collection in the normalized model, say we have:

---
last_save: Object_A_id
wf_id: Object_B_id
algorithm: filter
---
last_save: Object_B_id
wf_id: Object_C_id
algorithm: deconvolution

when Object_B_id is deleted in the wf collection, the history collection becomes:

---
last_save: 
  history: 
    last_save: Object_A_id
    wf_id: Object_B_id
    algorithm: filter
  wf:
    _id: Object_B_id
    t0: XXXX
    ...
wf_id: Object_C_id
algorithm: deconvolution

A similar procedure could be done for the embedded model, which just embeds the history documents within a wf document. I am not exactly sure which model works better (need to think carefully about how provenance data is most likely accessed), so I just leave the thinking here.
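
A minimal sketch of the embedded-model version of the same example (field names follow the normalized example above; this is illustrative, not settled schema):

wf_doc = {
    '_id': 'Object_C_id',
    't0': 0.0,            # ... other waveform attributes ...
    'history': {
        'algorithm': 'deconvolution',
        'last_save': {    # embedded copy standing in for the deleted Object_B
            'algorithm': 'filter',
            'last_save': 'Object_A_id',
        },
    },
}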

I guess when it comes to Spark, it is probably easier to have provenance stick to the data object, so the history is part of the RDD. The normalized model will be less straightforward in that case.

pavlis commented 4 years ago

I see the point you make about what would be needed if you delete a piece in the middle of the chain. That issue, in fact, points out why I think the normalized model is the only way to go here.

A key point I think you may have missed here and, in fact, was a duh moment for me as well: the main data for a "provenance" reconstruction are common to the ENTIRE DATA SET. What we need to preserve are:

  1. What algorithms were run on the data set in what order
  2. What tunable parameters were used to run each algorithm.

That means the history collection would contain only minimal data that have millions-to-one relationships. The only thing a wf document would need to contain is a reference to the ObjectId of the history document used to create it. That would be easily handled by writers: before writing any waveform, a writer would save the history to that stage and save that ObjectId to each datum before saving it. I can completely see how we can implement this part and will take it on shortly.
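
A minimal pymongo sketch of that writer flow (collection names, field names, the metadata_to_dict helper, and the db and data_to_save variables are all placeholders, not settled design):

from bson.objectid import ObjectId

# placeholder parameter-document ids (in practice, returned by earlier inserts)
filter_par_id, window_par_id, decon_par_id = ObjectId(), ObjectId(), ObjectId()

history_doc = {
    'algorithms': ['filter', 'window', 'decon'],
    'parameters': [filter_par_id, window_par_id, decon_par_id],
}
history_id = db.history.insert_one(history_doc).inserted_id

for d in data_to_save:
    wf_doc = metadata_to_dict(d)        # hypothetical Metadata -> dict helper
    wf_doc['history_id'] = history_id   # millions-to-one link to the history doc
    db.wf.insert_one(wf_doc)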

BUT we have another problem. How do we handle reducers? e.g. to be reproducible, any stack would need to define the data that were used to produce it. In reflection processing this is always treated as implicit. e.g. if you do a CDP sort, bin, and run stack, the algorithm is reproducible on a given data set. Input parameters make the result unambiguous assuming the algorithm used is stable. It would be easiest if we could just assume that model will work here. A general solution would be for any reducer to always produce a list of the wf ObjectIds used to produce a stack. The trouble I see is that might not be enough. e.g. how would you describe a pseudostation stack concisely? Seems to me it is cleaner just to let the input parameters to the algorithm define the reproducible behavior.

I am going to prototype the history logger assuming that keeping just the parameters and algorithms is enough to reproduce a result. You will almost certainly have a chance to comment on this before I get very far into it.

pavlis commented 4 years ago

I may be making this too complex for getting started, but realized there is a big problem with the scheme we have been discussing. A python script can do several things we may need to capture:

  1. We need to allow data to branch depending on some generic branching decision. In practice that means some step has to direct some objects into different paths depending on conditionals that could naturally arise in a python script. There are numerous examples where this will be necessary, ranging from something as basic as dropping data that don't pass some QC test to various kinds of binning.
  2. We have to allow iterations. In an actual processing script that means some kind of loop.

The simple mechanism we have been discussing could not handle that directly. This requires some thought. This must be a generic computer science problem. Can you beat the bushes among people you know to see if there are general solutions to this problem?

The lowest common denominator, which will be needed anyway, is to always preserve the python script that was used to drive a processing chain with the history collection in MongoDB. Maybe that is enough to make the result reproducible, which is our primary goal. It would also make it easy to alter the workflow and create a new result by modifying the processing script. I think we can start with that simple approach, but we need to evaluate whether it will be sufficient.
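
That lowest common denominator is nearly trivial with pymongo; a sketch (the field names are placeholders, and db and jobid are assumed to be defined by the job setup):

import sys

with open(sys.argv[0]) as f:
    script_text = f.read()   # the text of the driving python script
db.history.insert_one({'jobid': jobid, 'script': script_text})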

pavlis commented 4 years ago

Been pondering this some more and think we need to implement a history mechanism at the job level and object level. That is, I think history (provenance) information needs to be preserved in two ways.

  1. Required - each run needs a global history record for the job. The global provenance/history would record a unique jobid for each run and an algorithm registration. The latter would record the name of the algorithm and the parameters used to run it.
  2. Optional - object level history. The data area for each processed object should also contain a briefer history record. On construction, each object at the start of the processing chain would be tagged with the jobid. Each algorithm would need to register itself with the object, which would produce a record in the history log.

All history objects should use an inheritance mechanism. We should define a basic_history class that contains the required minimum information. Algorithms could post an extended history record, which would make it easier to extend the system when we discover things we didn't think about for this problem.
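
One possible shape for that base class (a sketch of the concept, not an implementation; all names here are illustrative):

class BasicHistory:
    """Required minimum history: the jobid and the ordered algorithm chain."""
    def __init__(self, jobid):
        self.jobid = jobid
        self.algorithms_applied = []

    def register(self, algorithm_name):
        """Called by each registered algorithm that touches this object."""
        self.algorithms_applied.append(algorithm_name)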

So, I'm going to try to write three things for an initial implementation of this:

  1. A python global history class. It will be the base class that handles global algorithm registration.
  2. Add a BasicHistory class to the private area of the MsPASSCoreTS object with getter/putter methods to be called by each registered algorithm.
  3. Database save functions for the global and object level history.

Sure I'll run into more detailed issues as this develops, but that is my plan. Let me know soon if you see any issues here.

pavlis commented 4 years ago

Made some progress on this yesterday. Created a prototype history logging mechanism at the job level. Had an interesting programming exercise requiring something I've only done once or twice before: recursive functions. Not surprisingly the most recent was the source of the need for recursion: the AntelopePf object. The issue was how to save parameters defined by a "parameter file" format into MongoDB. A Pf file is just one of what I now realize are many ways of defining hierarchic data that can be abstracted as a tree structure. One way to capture that concept, of course, is a python dict. Anyway, I realized a clean way to store the contents of an AntelopePf was to translate it into a python dict and use insert_one to store the dict. Nested parameters that would be defined by an Arr in a pf file (or an indented block in a YAML file) are translated into subdocuments in MongoDB when passed constructs like this:

{ 'simple1': 2,
  'simple2': 4.0,
  'nested': {
     'nest1': '666',
     'nest2': 'another',
     'foo': 'bar'
  }
}

As expected MongoDB handled that kind of construct just fine.

That was a nice example of a need for recursion in a program because traversing a tree structure is a classic example, along with factorials, of a problem most easily solved by recursion. It was tricky to debug. It would have been much harder if I hadn't installed and learned an interactive debugger for python. I found Spyder works pretty well, although like most packages it has some quirks. Not sure what you are using for debugging python programs, but after using it a bit I'd highly recommend Spyder.

One departure of a Pf from a pure tree structure is the use of Tbl& to allow blocks of text that can define more complicated data input. That was readily solved by translating the contents of a Tbl to a python list of strings. e.g. if we had a pf construct like this:

tbl_example &Tbl{
AAK 2.0 3.4
ABC 1.0 2.0
XYZ 4.0 5.0
}

It would get translated to this (in pretty form):

{'tbl_example': ['AAK 2.0 3.4',
     'ABC 1.0 2.0',
     'XYZ 4.0 5.0'
    ]
}
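
A hedged sketch of the recursive translation described above. The AntelopePf accessor names used here (keys, arr_keys, tbl_keys, get, get_branch, get_tbl) are assumptions for illustration; the real object's API may differ:

def pf_to_dict(pf):
    result = {}
    for k in pf.keys():        # simple key-value parameters
        result[k] = pf.get(k)
    for k in pf.tbl_keys():    # Tbl& blocks become lists of strings
        result[k] = list(pf.get_tbl(k))
    for k in pf.arr_keys():    # Arr& blocks recurse into subdocuments
        result[k] = pf_to_dict(pf.get_branch(k))
    return result

# db.history.insert_one(pf_to_dict(AntelopePf('deconparameters.pf')))
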
pavlis commented 4 years ago

I just pushed an experimental branch that implements the concepts discussed in this issue. It has a fairly sophisticated global processing history mechanism and a very primitive, but presumably efficient, mechanism for data level history.

First, the global processing history. Probably the best way to describe it is to make up a simple processing workflow and show how it would be used. Not all the functions noted below are defined, but hopefully this will give the idea.

Suppose the workflow was defined as 3 processing steps: (1) simple filter, (2) cut the raw data window down to a smaller time period, and (3) deconvolution. Suppose the functions called would look like this:

d=loaddata(db)  # not the right name but this is pseudocode
filtstr="BW 0.1 3 10 3"
d=filter(d,filtstr)
d=WindowData(d,-2.0,5.0)
pf=AntelopePf('deconparameters.pf')
d=decon(d,pf)    # not the way I'd do this, but again this is pseudocode

The history mechanism devised here would precede the above with something like this:

... Mongo connection stuff...
db=client['mydatabase']
h=HistoryLogger(db)
filterpar={'filter_string':'BW 0.01 3 10 3'}
h.register('filter',filterpar)
windowpar={'start':-2.0,'end':5.0}
h.register('window',windowpar)
pf=AntelopePf('deconparameters.pf')  # loaded here instead of later for the processing script above
h.register('decon',pf)
# here, or later in the script, issue the save:
h.save()

The save preserves the data in a history collection as a (potentially complex) python dict container defining documents and subdocuments.
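
A minimal sketch of what HistoryLogger could reduce to (assuming pymongo; an AntelopePf argument would first be translated to a dict as sketched in the previous comment; this is not the actual prototype code):

class HistoryLogger:
    def __init__(self, db, collection='history'):
        self.col = db[collection]
        self.steps = []   # ordered list of registered processing steps

    def register(self, algorithm, parameters):
        self.steps.append({'algorithm': algorithm, 'parameters': parameters})

    def save(self):
        return self.col.insert_one({'steps': self.steps}).inserted_id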

By contrast, the data level history mechanism is trivial. I added a new method to Metadata called append_chain. We would want to define a constant key for the "chain", something like "algorithms_applied" or something less verbose. To use the mechanism, any algorithm in mspass would simply have the following at the top of the function body:

d.append_chain("algorithms_applied","myalgname",":");

Note all of these things could be treated as optional. I recommend everything we supply contain the algorithm level history mechanism, but user functions could use it or not. Setting up the global history preservation would definitely be optional as it requires setup in the workflow script.
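
For reference, the semantics of append_chain expressed as a pure python sketch (the real method is on the C++ Metadata class; the accessor names is_defined, get_string, and put are assumptions here):

def append_chain(d, key, algname, separator=':'):
    if d.is_defined(key):
        d.put(key, d.get_string(key) + separator + algname)
    else:
        d.put(key, algname)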

pavlis commented 4 years ago

Ian, this is the section you need to refer to to update obspy2mspass to correctly create a history mechanism.

I think there is an error in the repository as I don't find the append_chain method in the master branch I have on this laptop. I'll check against my ubuntu machine at home.

pavlis commented 4 years ago

I think I know what is wrong, but don't want to do this willy nilly. I think I must have neglected to issue a pull request on the branch called cxx_add_historychaining. I can resolve this problem if I can establish that the cxx_add_historychaining branch was never merged to master. I used to have a tool to show that visually, but do not have it any longer and don't remember what it was even called. Anyway, if you can confirm the branch is isolated I'll work on merging it into master. I don't think we need to do it as a pull request. I think it would be just as easy to resolve with my local copy.

pavlis commented 4 years ago

Realized how to do this and merged the cxx_add_historychaining branch without going through the pull request mechanism. Assuming you agree with these changes since they were discussed in this issues page.

wangyinz commented 4 years ago

While writing the test for the db module, I realized that we might need to further narrow down the details of how to do the provenance. (btw, it is extremely helpful to write tests that cover as large a percentage of the code as possible - they can uncover not only bugs but also subtle issues that are usually ignored.) This came from the db.save3C() call when smode='unchanged'. Right now, the code will insert a new record into the wf collection and update the wf_id field of the Seismogram object. The problem here is that the old record associated with the data object will be preserved but left stranded. It will be confusing when loading the data later on as the two records will share identical fields. We could just overwrite the old record instead of inserting a new one, but I think that would be equally problematic as part of the "history" would be missing. Basically, the problem here is that we need to implement the history feature in these database operations.

Based on the design above, we now have two different history mechanisms here - global level and object level. After actually manipulating the database, I realize both will need some improvement to become really useful. For the global level, which is implemented as the history module, I can see the issue is that it only records the steps of a workflow; the connection of the workflow to the data objects being created or modified in the database is missing. A straightforward solution is just adding a list of wf_id to each subdocument within a history document. It might still be difficult to index when the number of data is large, but at least it's better than nothing. This way, we create a link from the global level history to individual objects.

For the object level history, I think we need something more sophisticated than the append_chain method, which only records the names of algorithms. The goal here is to create a connection to the global level history so that we can identify which workflows an object has been through. The challenge is that one object could be modified by multiple workflows (uniquely identified by the jobid in the history collection), and it may not have been through all the steps of each workflow. To capture this, we need to preserve a list like [job1.alg1, job1.alg4, job2.alg2, ...]. This could still work as a long string with some small tweaks to the append_chain method.

Then, there is still the issue of removing a record. When an object is removed from the database, it will 1) break the link from the global level history with that wf_id and 2) drop the object level history associated with it. So, I am thinking that we will need a way to preserve a deleted wf_id as well as its object history. This makes me think that we probably need some form of the embedded model implemented in the Metadata object. Say we have a history key; the value would look like:

history: 
  algorithms_applied: 
    - job3:
      - alg1
      - alg4
    - job4:
      - alg2
  previous:
    - wf_id: "string_of_objectid"
      history: 
          algorithms_applied:
            - job2:
              - alg1
    - wf_id: "string_of_objectid"
      history: 
          algorithms_applied:
            - job1:
              - alg3

If we have something like this, the history chain will be preserved as much as possible. The only potential issue with this design is that we still need a way to update the global level history to make it aware that a wf_id has become a subdocument of another wf_id, so that it will be convenient to look up.

The question here is how we can implement this in Metadata. Such an embedded structure definitely works cleanly for Python's dict type and MongoDB. We could probably also make it work with the any type within C++ (with some complex syntax). What I'm not sure of is whether we can also translate it to Python with pybind11...

pavlis commented 4 years ago

Extremely good observations here. I didn't think we had this history mechanism sorted out and you have confirmed that and come up with some important points. Making history itself an object seems a good solution.

I think the design might be cleaner if we created a child of Metadata with some variant of this as the definition:

// name is a prototype mnemonic for Provenance Preserving Metadata
class PPMetadata : public Metadata, public History
{
  // --methods--
};

where the History object implements the ideas above and has appropriate methods.

The advantage of this is we could then redefine Seismogram and TimeSeries to inherit PPMetadata instead of Metadata.

That design would preserve the concept that Seismogram and TimeSeries are the core objects for processing. It also encapsulates all the History stuff in one place making it easier for other objects to link into MsPASS.

pavlis commented 4 years ago

Had an additional thought about this. If we go with this design and add an object for history, it could and should include ErrorLogger. Logging errors is a key issue for history preservation in one sense. The design should depend upon our "concept" of what history is and what it should do.

One good thing about this is a laziness issue. If we put the declaration ErrorLogger elog; in History, and History was a parent of Seismogram and TimeSeries, no changes to any elog-related code would be needed.

pavlis commented 4 years ago

Another thought: we need something more than wf_id to tag a waveform. There are many cases where the parent of a waveform is volatile and never saved. In fact, that should be the norm. Hence, whatever is in History must be able to reconstruct a complete chain to reproduce any waveform from a parent. So I think the model is a parent wf_id with a linked list of algorithms applied to produce what might be called the current version. When saved, a new wf_id would be defined and saved with this history record. Needs more details to flesh out for sure. My time is limited right now due to a grandkid visit, but let's continue this dialogue.

pavlis commented 4 years ago

Here is a tentative class design to do all this in C++.

/*! \brief Data structure holding full process information.

This class could be declared a struct as it is a set of named variables
that are all public.  This class is used by ProcessingController to provide a
mechanism for a running algorithm to put a stamp on each data object it
modifies.
*/
class ProcessEntry
{
public:
  ProcessEntry(const std::string alg, const std::string par_form,
    const std::string poid, const std::string itype, const std::string otype);
  ProcessEntry(const ProcessEntry& parent);
  ~ProcessEntry(){};   //Explicitly declared null - this could be a struct
  ProcessEntry& operator=(const ProcessEntry& parent);

  std::string algorithm;
  std::string param_format;
  std::string param_oidstr;
  std::string input_data_type;
  std::string output_data_type;
  bool modifies_metadata;
};

/*! \brief Global job control for MsPASS.

A processing system needs a means for algorithms to be registered and run
under a common executive.  In conventional seismic reflection systems this
is controlled by an executive program, that is the alpha program of the system.
As a research system MsPASS was designed to be more flexible so a dogmatic
executive could be a barrier to innovation.  This class then implements a
manager that is more hands off and acts more like an advisor.  That is, it
provides information but does not limit what can be done.  The methods
provide information algorithms use to register themselves and log their
impact on data objects as they are altered in the system.

The idea is this object should be created in a python job script for MsPASS
as one of the first steps of the processing.  Properly registered algorithms
would then ask for information from this object to define information that
is used to preserve the processing history of each data object.
*/
class ProcessingController
{
public:
  /* Would need a few constructor options */
  ProcessEntry get_full_process_info(const std::string alg) const;
  std::string get_param_format(const std::string alg) const;
  std::string get_oidstr(const std::string alg) const ;
  std::string get_input_data_type(const std::string alg) const;
  std::string get_output_data_type(const std::string alg) const;
  bool modifies_metadata(const std::string alg) const;
  std::string get_jobname()const {return jobname;};
  unsigned int get_jobid()const {return jobid;};
  std::string get_job_script() const {return job_script;};
private:
  std::map<std::string,ProcessEntry> process_to_run;
  std::string jobname;
  unsigned int jobid;
  std::string job_script;
};

/*! \brief struct used to hold processing history data to store with an object.

This is a struct defined as a class as a matter of style.   It contains the
minimum data required to uniquely define a process applied to a data object.
*/
class HistoryRecord
{
public:
  /*! Defines the ObjectId of parameter document for this algorithm in History collection*/
  ProcessEntry info;
  std::string jobname;
  unsigned int jobid;
  double process_starttime;  // Epoch time algorithm registers itself
};
class ProcessingHistory
{
public:
  /* This would need a set of appropriate constructors, copy constructor, and
  operator=.   */
  ProcessingHistory();
  /*! \brief Put an entry that says an algorithm was run on these data.

  Processing History is always expected to be a child of a data object
  being processed by MsPASS (currently TimeSeries and Seismogram objects).
  This method should be called by any algorithm applied to the data to assure
  the history of what was done to it is saved.

  \param pc - the global ProcessingController for this job.
  \param alg - algorithm being run.

  \return position of the inserted history record.  This object keeps a
    list of each algorithm that has been applied to this data object.  The
    size of that list grows by one with each call to this method.

  \exception will throw a SeisppError if alg is not registered with the
    ProcessingController.
  */
  int register_algorithm(const ProcessingController& pc, const std::string alg);
  ErrorLogger elog;
private:
  std::list<HistoryRecord> history;
};

That is incomplete, but the idea is to launch a ProcessingController at the top of any mspass script. Seismogram and TimeSeries would be children of ProcessingHistory. Then any algorithm would call the register_algorithm method before trying to handle a piece of data.

There are surely huge holes in this. Sat down to write this while my grandson was taking a nap and I was on childcare duties. He is waking up, so this may be incomplete.

pavlis commented 4 years ago

Just modified the previous slightly, but have a short window this morning and wanted to add a few explanations of my thinking on the preliminary design above:

  1. ProcessingController is an object that centralizes an idea currently set up as the history collection described in the current documentation. With a pybind11 wrapper, a python program should call a constructor for ProcessingController at the top of the script, much like the rule for MetadataDefinitions. ProcessingController provides the "global-level" control you discuss above over how processing history is preserved.
  2. Each data object (Seismogram, TimeSeries, or Ensembles for now) would inherit ProcessingHistory.
  3. Algorithms would update the ProcessingHistory section of each data object they handle. In this design that would be done through the register_algorithm method, but it is unclear if that is the right way to do it. The "concept" is that an algorithm needs to get from some master control (global-level) the information needed to reconstruct the full processing history. This shows one way to do that, but view that detail critically.
  4. The design needs to recognize that there are many cases where the input "parameters" would be a fair amount of data. An example is my particle motion code where the "multiwavelets" are loaded from a pf file in a large Tbl. Whatever we do, we must be conscious of the fact that "parameters" are really auxiliary data that could be huge. In this design I suggest we handle this by having parameter data stored in MongoDB, as the prototype history collection does, with the ProcessingController providing the link to that data through an ObjectId (actually an oid string).
  5. The chain of processes applied to each object is preserved in the linked list called "history" in the private area of ProcessingHistory. I see the above is missing a necessary getter to retrieve that history when data are saved.
  6. Handling Ensembles versus atomic TimeSeries and Seismogram objects is definitely not complete here. In fact, on reflection it seems we may need a more generic history mechanism to provide support for any data object.

Hopefully that will make the above clearer. Look at this carefully and critically so we can get this right.

So glad you raised this point. You were so right.

pavlis commented 4 years ago

An idea for handling Ensembles and other data objects. For that case operator+= will be needed for the ProcessingHistory. Necessary in case an Ensemble is decomposed to atomic Seismogram or TimeSeries objects.

Also, the more generic problem is that the above does not fully handle the issue that an algorithm may input one type and emit another. A simple example would be an obspy 3C converter that inputs scalar TimeSeries or Trace objects and outputs Seismogram or (alternatively) an Ensemble of Seismogram objects. The input/output type variables may or may not be enough. This requires some thought. This may be a case where we need to research other solutions to this generic problem.

wangyinz commented 4 years ago

Sorry for the delay - saw this over the weekend but did not have the time to completely absorb it. That's a significant amount of progress over my initial thoughts. The C++ code is clearly a good starting point for recording history. In addition to the points you listed above, it is also not clear to me how this design works for removing a record from the database. The HistoryRecord struct may need to have the embedded structure I described earlier to handle that.

It is a great point that including the error log as part of the history is important. I agree that it is an essential part of the history that ought to be handled consistently with the other parts. Actually, I tweaked a little bit what is contained in elog under Python, and it now contains the name of the current function as well as the full traceback that we normally see in a Python error (for example). With that additional information, I think the error log is more useful and preserves the history information needed to trace back any problems.

I agree that wf_id alone is not sufficient for the purpose of reconstructing a job (or workflow). There are actually two parts to consider: the global level and the object level. For the global level history (which is stored as the history collection), each history document entry is a job defined by the jobid. We might just keep a list of parent wf_ids and a list of output wf_ids. This is actually tricky as the output is not necessarily the same thing as the final product of the job. Some waveforms could be saved during different stages of the job, and it is also possible that a saved waveform is later removed in the same job. Since there is no intuitive way to define which is the final product of the job, I think the safe behavior is to just keep all the output wf_ids. The only exception is the case when a waveform is removed, when we should also remove it from the output list. That part of the history is actually at a finer level and should be preserved in the object level history anyway.
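
A hedged sketch of a global level history document following that description (all field names and id values are placeholders):

global_history_doc = {
    'jobid': 'job3',
    'algorithms': [{'name': 'alg1', 'parameter_id': 'parameter_id1'}],
    'parent_wf_ids': ['wf_id_1', 'wf_id_2'],   # inputs read by the job
    'output_wf_ids': ['wf_id_3'],              # every save; pruned on delete
}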

For the object level history, I think my previous schema design is still valid with the additional parent key. For example, if we start with a wf collection with these records:

- wf_id: "1"
  history: 
    parent: "0"
    algorithms_applied:
      - job1:
        - alg1
        - alg4
      - job2:
        - alg3
- wf_id: "2"
  history: 
    parent: "1"
    algorithms_applied:
      - job3:
        - alg1
- wf_id: "3"
  history: 
    parent: "2"
    algorithms_applied:
      - job4:
        - alg2

If we remove the wf_id == "2" record, it becomes:

- wf_id: "1"
  history: 
    parent: "0"
    algorithms_applied:
      - job1:
        - alg1
        - alg4
      - job2:
        - alg3
- wf_id: "3"
  history: 
    parent: "1"
    algorithms_applied:
      - job3:
        - alg1
      - job4:
        - alg2
    previous:
      - wf_id: "2"
        history: 
          parent: "1"
          algorithms_applied:
            - job3:
              - alg1

Then, we remove wf_id == "1":

- wf_id: "3"
  history: 
    parent: "0"
    algorithms_applied: 
      - job1:
        - alg1
        - alg4
      - job2:
        - alg3
      - job3:
        - alg1
      - job4:
        - alg2
    previous:
      - wf_id: "2"
        history: 
          parent: "1"
          algorithms_applied:
            - job3:
              - alg1
      - wf_id: "1"
        history: 
          parent: "0"
          algorithms_applied:
            - job1:
              - alg1
              - alg4
            - job2:
              - alg3

This way, the current waveform can easily be reconstructed using the parent waveform identified by the parent key and the jobs and algorithms using the parameters saved in the global level history document. The previous field is used to record any waveforms being removed. I do think there are issues in this schema that I am not aware of. Definitely need to think more carefully to cover all scenarios.

I think the design of the register_algorithm method is fine. We probably want to hide the details from the user by using Python Decorators somehow. Not exactly sure how we want to implement that yet, but I found we might do something similar to this project.

I agree that parameters are not necessarily a bunch of strings. We definitely need to include an implementation of putting them into gridfs. Actually, we might just pickle any parameters into gridfs so that we can support virtually any format.
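
A sketch of that idea using pymongo's standard gridfs module (here parameters stands for any picklable object and db for an open database handle):

import pickle
import gridfs

fs = gridfs.GridFS(db)
param_id = fs.put(pickle.dumps(parameters))       # store; returns an ObjectId
restored = pickle.loads(fs.get(param_id).read())  # retrieve later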

The handling of ensembles is not clear to me at this point. Actually, given that we've been talking about wf_id so far, I guess it implies that the data object is a waveform (TimeSeries or Seismogram). I am not sure how we want to handle ensembles with the database yet. If we want to keep an ensemble as an atomic data object, maybe we should have something like an ensemble_id in an ensemble collection? Or shall we just always disassemble the ensemble and find a way to keep only its metadata? Not quite sure yet...

The more important question here is how to handle the history of any generic data object. I agree that we shouldn't limit the history mechanism to the TimeSeries and Seismogram objects. In fact, I can imagine people will be using the Trace object a lot as that is the standard in ObsPy. I actually think a better way is to not restrict ourselves to C++ code. Right now, our Python code is pretty much centered around our ccore module. I can see that we will be doing a lot of conversions back and forth with ObsPy's data types to be able to leverage that package. I think we might want to create a core module that has TimeSeries and Seismogram objects inherited from ObsPy's Trace object (or Stream object) to provide compatibility with ObsPy. We can add some conversion methods to make them compatible with the ccore types and to use the algorithms from the C++ code. Then, within the new core objects, we can just add the history and elog mechanisms easily.

pavlis commented 4 years ago

A lot for me to absorb now too, with limited time to think through everything. Some immediate thoughts come to mind though:

  1. I think we don't need a complete rollback mechanism to cover data deleted during processing. Let's keep in mind the main goal here: enable reproducible science. That means the primary goal is that the end product of a chain of processing needs to be reproducible from raw data. I think that means a linked list that always has a raw waveform (data object) as the first entry pretty much defines what is needed at the object level.
  2. We will need to have our readers set up to test if a waveform is a root (raw) or a partially processed result. If the latter, the reader should automatically load the previous history and append to it. If each history record for any wf is cumulative we won't lose any history from pruned branches that are dead ends, BUT we need to preserve a mechanism that defines why a branch (in this case a waveform that gets marked bad or marked for deletion for any reason) was pruned. I think the way to preserve that most easily is to make sure the python script(s) that created the result are preserved.
  3. Reproducibility is impossible if there is any human interaction for editing. We need to think of a generic mechanism to define that process in a history record - a new wrinkle.
  4. We need to consider how to handle other data types than TimeSeries and Seismogram to make our design extendible, but let's keep this development under control and try to stay strategic. That is a warning to me more than you.
  5. Not sure I agree about making obspy Trace and Stream objects native. There are two problems with that. First, we are adding features that are important to our development for which obspy has no concepts. Second, all the external package dependencies we have will create maintenance issues, and obspy seems a bit less stable than other things we depend upon. In support of your argument, however, is that we will definitely want to run some obspy algorithms in parallel, and the subject of this thread (processing history) will not transport without some effort. Perhaps we need to discuss how hard it would be to create a history mechanism that could be utilized by a function using obspy methods? That requires some thought by someone like you who is a better python programmer than me.

All for now. Let's keep this conversation going.

wangyinz commented 4 years ago

  1. That's a good point. In that case, we will need to define in the database what a raw waveform is, which varies for different workflows. Also, I just realized that even with the complete rollback mechanism I proposed, it would break when the "raw waveform" is removed (in the example above that would be the wf_id: "0" record). I guess we need to preserve in the raw waveform the information of where it came from. This could be an external file or an obspy download procedure. Then, the real "raw waveform" can always be defined as something external. In the case that the waveform is removed, we can still reconstruct waveforms derived from it.
  2. I agree with this approach, but will add that the script used to import the raw waveforms should also be preserved. I guess we can just use that to test if a waveform is root or not.
  3. I guess we will have to live with that as it is unreasonable to not let the user edit the database manually (e.g. through mongo shell). But, if it is edited with the API we provide, we have no reason to not capture the history record. In fact that's why I started this discussion - to figure out the correct behavior of a database update operation done by mspasspy.db.Database.save3C. It seems to me based on the discussion so far, we will just overwrite the record, preserve the information of current operation in the object level history, and update the corresponding wf_id field in the global level history.
  4. Well, instead of making Trace and Stream native, I think the real point here is to add new methods to them to make them behave like our Timeseries and Seismogram object. For example, to add the history mechanism to Trace object, we could create our own mspasspy.core.Trace class that is inherited from both the obspy.Trace class and the ProcessingHistory class. Actually, I am not sure how or whether we should do that to the Stream class as that is very different from the Seismogram concept but more like our Ensemble.
pavlis commented 4 years ago

Responses to two points:

  1. On this item I wasn't thinking about database updates, but that is a good point too. I was thinking about the classic way of measuring arrival times - "hand" picks (actually always now computer graphic picking). That is irreproducible. At best we can preserve the data like we do today in an arrival table. A simple solution might be to add a boolean uses_human_interaction to the ProcessEntry struct(class).

  2. I don't know how to do an extension of a type in python, but that sounds like exactly what we need to make obspy Trace and Stream native. I would like to hand you that task as my knowledge of python programming is still pretty rudimentary - I'm only as far as I am because python is close to the 20th computer language I've learned (counting various shell-like scripting languages).

wangyinz commented 4 years ago

Absolutely! In fact, we can just do multiple inheritance to extend any type. e.g.

import numpy as np
import obspy
import mspasspy.ccore

class Trace(obspy.Trace, mspasspy.ccore.Metadata, mspasspy.ccore.ErrorLogger):
    def __init__(self, data=np.array([]), header=None, elog=None):
        if elog is None:
            super(mspasspy.ccore.BasicMetadata, self).__init__()
        else:
            super(mspasspy.ccore.BasicMetadata, self).__init__(elog)
        if isinstance(header, mspasspy.ccore.Metadata):
            stats = Metadata2dict(header)
            super().__init__(data, stats)
            super(obspy.Trace, self).__init__(header)
        else:
            super().__init__(data, header)
            md = dict2Metadata(self.stats, MetadataDefinitions(), self)
            super(obspy.Trace, self).__init__(md)

Note that the syntax is a bit weird because of the complex inheritance across pybind11 and python code here (it actually took me a while to figure out a bug related to the use of pybind11). Anyway, the idea is to create our own Trace object that is compatible with ObsPy and our existing code. The problem is pretty obvious: while we keep obspy's stats in sync with the Metadata in the constructor, there is no way to guarantee that for other methods, and we will have to put in some sort of sync method.

Another approach is to have Metadata as an attribute like this:

class Trace(obspy.Trace):
    def __init__(self, data=np.array([]), header=None, mdef=MetadataDefinitions(), elog=ErrorLogger()):
        super().__init__(data, header)
        self.mdef = mdef
        self.elog = elog

    @property
    def metadata(self):
        return dict2Metadata(self.stats, self.mdef, self.elog)

The @property decorator will turn that method into a property, so a call on trace.metadata is equivalent to trace.metadata(). Because we have the conversion done on the fly here, it is guaranteed that the metadata is in sync. The only problem here is that we will need to implement a bunch of wrapper methods to make it behave like Metadata. The best way to implement certain methods may not be obvious such as Metadata.modified().

Also, I realize the use of MetadataDefinitions above is awkward. Need to think of a way to hide this object as much as possible as I don't think it is something that changes frequently in a job.

pavlis commented 4 years ago

Interesting. I suspect this will take some experimentation to keep it from being very confusing. i.e. it will be confusing if mspass.Trace and obspy.Trace behave very differently. This would be particularly bad if people used the tempting line:

from mspass.ccore import Trace

Then, I think, they could just refer to the object as Trace and get it muddled with obspy.Trace downstream. It might be less error prone to just demand a converter, but this would be extremely convenient for using obspy functions. Overall, a case in point that we need to experiment with processing some actual data to understand the pitfalls. Suspect it is best to put this idea on the shelf for now and bring it up when we have had more experience with spark interactions - that seems to me the critical major item we need to be addressing right now.

wangyinz commented 4 years ago

Well, the name is not really a problem here as internally Python would treat them as different objects under different namespaces. We can surely name it as something like mspTrace. Then, for those who don't want to change the names in an existing ObsPy code, they can just use a line like:

from mspass.ccore import mspTrace as Trace

Anyway, I agree this is just something we could explore. The point really is that we are capable of doing all of this under Python.

Actually I was going to reply here earlier and then figured there was something about pybind11 that I was not sure of, so I went off doing a bunch of experiments... Anyway, I found something interesting. I see part of the issue here is that our Metadata is fixed to a handful of types that we defined in our schema. However, to have more complex history logging, we probably want something more adaptive. Actually, that was the reason we use boost::any in Metadata. I found that our python binding is where the bottleneck is, given that the C++ side of Metadata and the Python dictionary can both virtually support any data type. So, I figured out how to make our Metadata Python binding behave more like a python dictionary, and all we need is just adding a couple more lines. Anyway, I will push those changes after I clean up all the debug stuff that's floating around.

wangyinz commented 4 years ago

Well, found a problem... serialization does not work on the python object type. I will see how to fix that.

wangyinz commented 4 years ago

OK, I just got pickle working. It should be bulletproof as long as something like this is avoided:

import pickle
from mspasspy.ccore import Metadata

a = Metadata()
a.put('md', a)     # a now contains a reference to itself
pickle.dumps(a)    # pickling this circular reference is the case to avoid

pavlis commented 4 years ago

I sat down to start the next level of design for this history mechanism. This thread has gotten really long, and after reading through all of it again I decided step 1 was to summarize where we are in this design. Before I start doing anything concrete we should think this through carefully to be sure what we are planning is the best we can do.

First, on reflection I think we should ignore the specific class and implementation issues for now and first settle the functional specifications for what we are aiming to accomplish here. So, let's begin with the goals of what we are discussing:

  1. Provide a mechanism to reproduce a processing workflow from start to finish. This is important to address the modern problem of irreproducible results in data processing.
  2. Provide a top-level mechanism to validate a workflow to minimize job failures during a long processing run. That has two parts: (a) validate input seismic data, and (b) validate parametric input to all algorithms to catch as many input errors as feasible.
  3. Preserve the history of processing for all processed data saved in the database.
  4. Preserve all error messages ranging in severity from informational to errors that cause a seismogram to be marked dead. This is essential to assure bad data in one seismogram does not abort a complete job sequence.

You make an excellent point that to achieve these goals requires global-level and object-level features. I suggest the global level should have these components:

  1. We should use the proven init/exec model used in reflection processing.
  2. In a research environment it is important that init not be too restrictive. In particular, we must provide a mechanism to use an experimental algorithm within a workflow that does not completely implement the history mechanism.
  3. init needs to validate input data. That means validate all required Metadata are defined and assure the starting waveform data is available to run the processing sequence.
  4. init needs to validate (as much as possible) parametric input. This is a nontrivial, generic problem that may require a separate design issues page here on github.
  5. We need to provide a generic mechanism to define a processing step as requiring human interaction.
  6. init needs to pass an abstraction of the workflow to drive the generic exec program. By that I mean the user should not have to look at the output of init, but simply use it as an input to exec. This would implement the model I know from ProMAX and other reflection processing systems I'm aware of where step one is to run init something and then run exec something only when init succeeds.
  7. The data transferred from init to exec should probably be written by init into MongoDB and then read by exec. What collection that should be and what it should contain is a lower level design issue.

There is a lot to implement above, but I do think it is a necessary design feature. Item 2 is important as it should be common to write a script to do some processing that doesn't implement any of the history preservation baggage. Not only will that give the user important flexibility, but it will allow us to develop other key features in mspass without having to have this init/exec completely working.

Now the object level functions:

  1. Each data object needs a mechanism to preserve the chain of algorithms that were applied to produce it. That chain should be automatic for all mspass algorithms.
  2. The history must allow the same algorithm to be applied multiple times within a workflow.
  3. The history store mechanism must work for iterative algorithms - a special case of 2 where the same tag appears sequentially for n passes.
  4. The history storage must be scalable to reduce the chances of bloat. i.e. the history mechanism should be lightweight in size.
  5. The history should include an error log.
  6. The history should be stored automatically for all database writes.
  7. To reduce history bloat database reads should start a new chain with a link back to the parent. (Note: this is a new point I don't think we covered earlier. Point is a job starting with partially processed data should not need to load the previous history, but have only something like the oid of the history document from the dbsave defining the partially processed data.)
  8. For history we need to define what is atomic for history preservation. For seismic data I think that means TimeSeries and Seismogram objects.
  9. You note above a mechanism to potentially extend obspy Trace objects to include the history mechanism object. That is important for making this as functional as possible without recreating all the algorithms now in obspy. I think that means generalized Trace objects should also be allowed as atomic data for history preservation.
  10. We need a generic, scalable mechanism to define an ensemble for history preservation. By generic I mean it should allow groupings of any atomic data (initial TimeSeries and Seismogram but perhaps also generalized obspy Trace). By scalable I mean it must be lightweight and not bloat the data stored with each atomic object.

Please read over this critically and look for holes and potential tar babies (i.e. things we could commit to do that would bog down the development and not allow time for other critical functions we need to make mspass successful).

One last random thought. I think writing an init/exec program is a standalone problem that might be a good task for the student assistant for which you have funding.

wangyinz commented 4 years ago

> 3. init needs to validate input data. That means validate all required Metadata are defined and assure the starting waveform data is available to run the processing sequence.

The metadata validation can be done through MetadataDefinitions.

> 3. The history store mechanism must work for iterative algorithms - a special case of 2 where the same tag appears sequentially for n passes.

Maybe we can add an iteration key to the algorithm entry in our schema:

- wf_id: "3"
  history: 
    parent: "0"
    algorithms_applied: 
      - job1:
        - alg1:
          - parameter: parameter_id1
          - iteration: 100
          - completed: True
          - elog: elog_id1
> 7. To reduce history bloat database reads should start a new chain with a link back to the parent. (Note: this is a new point I don't think we covered earlier. Point is a job starting with partially processed data should not need to load the previous history, but have only something like the oid of the history document from the dbsave defining the partially processed data.)

I thought we already covered this with the parent key that points to an object id of a waveform.

> 8. For history we need to define what is atomic for history preservation. For seismic data I think that means TimeSeries and Seismogram objects.

I was also thinking about how we should deal with Ensembles, but probably that's not necessarily a problem as we will not treat them as atomic for any database operations, will we?

> 9. You note above a mechanism to potentially extend obspy Trace objects to include the history mechanism object. That is important for making this as functional as possible without recreating all the algorithms now in obspy. I think that means generalized Trace objects should also be allowed as atomic data for history preservation.

While we could extend Trace objects, it will not be trivial to make the history mechanism work automatically within the object. i.e. functions that interact with the Trace object will not trigger the history attribute to record the action. One way to implement that implicitly may be to use the __getattribute__ method, but it will not be able to (easily?) discriminate multiple accesses to the same attribute within an algorithm. There are definitely more complex ways to achieve this perfectly, but it can easily get overwhelming.

> 10. We need a generic, scalable mechanism to define an ensemble for history preservation. By generic I mean it should allow groupings of any atomic data (initial TimeSeries and Seismogram but perhaps also generalized obspy Trace). By scalable I mean it must be lightweight and not bloat the data stored with each atomic object.

In our current history schema, parent, parameter, and elog are all stored as object id pointing to a different collection, so it shouldn't bloat even if we are keeping one entry per atomic data in an ensemble. If we really want to better optimize it, maybe we can just allow the wf_id and parent to be arrays:

- wf_id: 
   - wf_id3
   - wf_id4
  history: 
    parent: 
      - wf_id1
      - wf_id2
    algorithms_applied: 
      - job1:
        - alg1:
          - parameter: parameter_id1
          - iteration: 100
          - completed: True
          - elog: elog_id1

Another thought I have reading through this is that we should probably focus on the object level history for now as it is almost perfectly isolated from the global level history. We may consider implementing the global level history in Python with certain features linked to Spark since that will be the main driver of the workflows. We can come back to this once we get a workflow running under the Spark framework.

wangyinz commented 4 years ago
  1. You note above a mechanism to potentially extend obspy Trace objects to include the history mechanism object. That is important for making this as functional as possible without recreating all the algorithms now in obspy. I think that means generalized Trace objects should also be allowed as atomic data for history preservation.

Just realized that this can actually be done using python's decorators.

pavlis commented 4 years ago

Helpful comments. One point to note: the idea of yesterday's post was to abstract the lengthy discussion in this issue into some functional specifications.

I concur that the object level implementation can proceed without the full functionality of the global provenance mechanism being implemented. It is essential I do that anyway as I have to make decisions on it to finish the API changes I am working on. Those are stalled until this is settled.

What do you think of the idea of making the global provenance implementation a student project? Strikes me as achievable and appropriate for a student as it has a clear focus. We will need to control the design.

pavlis commented 4 years ago

Started more serious thinking on object level history. Realized a hole in the concept we had not considered fully.

We had considered the obvious need to handle data created from averaging other data (e.g. a simple average stack). The problem I recognized when thinking about how to preserve that chain is handling the general problem of averages of averages. Let me explain more.

If all we had to do was save averages of data stored in wf we could define a stack history as some variation of this:

list<string> wf_oids;
string algorithm_name;
parameter_definition_object obj;  //generic concept of parameters passed to algorithm_name

The averages of averages comes up as an issue if a second average from multiple data objects is to be created from intermediate results not saved in wf. That would create a problem: wf_oids would not work to define the intermediate result because its oid was never defined. The same problem would be created, in fact, by an issue you discussed earlier - deleting an intermediate result making its objectid disappear.

We need a practical solution to this problem to ever make history work right. It will be complicated by requiring that this work correctly in a multiprocessor mode running under spark. I say that because one potential solution is something like antelope's idserver: a daemon that could be asked for the next valid id to assign to any new waveform. I think that is a bad idea as it creates a scalability issue if every function call in any processing step requires interaction with an id server - an inevitable bottleneck, or worse a deadlock if the idserver died. The whole idea of an idserver, in fact, is at odds with map-reduce. Further, one would have to assure the id given could be linked to a unique record saved in wf. Not that hard, but it might collide with spark if a processor working on an rdd died and the work it was assigned had to be repeated. It might work, but that is another complication showing what a bad idea an idserver is.

The conclusion I make is we need some chaining method that handles averages of averages to arbitrary levels. This needs some thought and may require some graphics to clarify the tree structures I think will be necessary to capture this. We may want to do a telecon to simplify that process. Let me hear your thoughts on this first.

wangyinz commented 4 years ago

What do you think of the idea of making the global provenance implementation a student project? Strikes me as achievable and appropriate for a student as it has a clear focus. We will need to control the design.

I agree. This will also be in line with the timeline as we will not have a student any time soon.

The averages of averages comes up as an issue if a second average from multiple data objects is to be created from intermediate results not saved in wf.

That's a very good abstraction. I think there are two aspects to this issue. First of all, our history document is designed to capture the history of a workflow (of several algorithms) and save it with a reference to the wf_id of the end product. So, even if the intermediate results are not saved, the parent of such an end product is still the waveforms that the workflow started with, and the parameters of the algorithms should reflect the multiple stackings in the workflow. However, the real problem here is that it is hard to reproduce the workflow based on the parameters alone because the intermediate results are not saved and are ambiguous for subsequent processing steps (i.e. parameters alone may not describe a step without specifying the inputs in the correct order). Of course, it depends on the algorithm. For those that are order independent, we should be good with the current schema that captures the parameters.

The same problem would be created, in fact, by an issue you discussed earlier - deleting an intermediate result making its objectid disappear.

Yes, that is a different way to look at this same problem - assuming the intermediate results were assigned a wf_id but then got deleted. I am not sure there is an easy way to deal with such a problem (especially when manual deletion is possible), but I do think that it is OK to have some minor holes in our history schema design. Full-blown provenance preservation might add unnecessary baggage to our goal here. I would consider the history mechanism something nice to have for data processing, but not something to depend upon for data recovery. We might just warn users that certain (complex) workflows may not be easily reproduced.

We need a practical solution to this problem to ever make history work right. It will be complicated by requiring that this work correctly in a multiprocessor mode running under spark. I say that because one potential solution is something like antelope's idserver...

Yep, we will not consider an idserver at all. The purpose of using object id is to resolve the id collision issue without limiting scalability.

pavlis commented 4 years ago

Ok, I think we are making progress.

As I actually started to implement this (well, actually I mean defining a class structure) I realized that we have to at least design the global history schema and data structures needed to cover all cases. The reason is that achieving the goal of keeping the history lightweight requires an implementation with an API defined for building the history data.

It helped my thinking to make a couple of rough figures. They aren't worth posting because I think I can describe what I learned from them well enough for the record in words:

  1. We need an abstraction that defines what a "processing step" is. A full generalization, I think, would get out of control as it is essentially unlimited in complexity. Consider, for example, a function call in a language like python where the arguments can be anything. I think we need to limit the universe by classifying a "processing step" as defined by three things: (a) type of primary data input, (b) type of primary data output, and (c) a generic parameter definition that defines all auxiliary quantities that build a unique instance of the algorithm (something that could be reduced to an antelope pf or some yaml file). Our current implementation mostly follows this rule but does not currently deal with input and output data types. A key point is that a particular instance of an algorithm should be uniquely defined by this data structure. The definition must support not just a single algorithm keyed by something like a name, but multiple instances of the same algorithm with different defining parameters, e.g. a series of bandpass filters with different filter parameters.

  2. If we limit a "processing step" to be a single primary data input and output type, there are only four things possible that are more or less what map/reduce means anyway: one-to-one, one-to-many, many-to-one, and many-to-many. In reality I think the first three are special cases of many-to-many so if we can do that in a lightweight way we cover everything. I think I may have just such a solution, but be skeptical as you read on.

  3. If we accept my abstraction of a "processing step", what we preserve to define the process chain for a piece of data at a given step requires preserving only the following pair of linked lists:

    a) list of unique tags defining algorithm instances including preserved parameter data
    b) list of inputs used to construct this output object at each processing step

    Here is where we need to address the issue: what is atomic? One solution is to have Ensembles be atomic objects along with TimeSeries and Seismogram. I don't particularly like that model as I can't think of a clean way to abstract the ensemble definition as a distinct type that wouldn't be cumbersome or have unintended consequences. I think a simpler model making only TimeSeries and Seismogram atomic can work, and it covers iteration as no different than anything else. In fact, it is already there in the definition above - a linked list of tags for all data used to create "this" data object in a given step. Consider, for example, any stacking that implements a many-to-one reduction; the list gives a tag defining each of the inputs and is stored in the history for the result. I think the more complicated many-to-many case works the same way even if the output of the algorithm is a set of objects that have an auxiliary relationship. Consider as an example the output of pwstack used as input to pwmig. That is a many-to-many operation, but the ensembles in pwmig are sorted on a different key than the natural output of pwstack. A mongodb implementation of pwstack would write outputs as Seismogram objects to MongoDB, and pwmig would need to form gathers sorted and grouped by event:slowness_vector. Each input would retain the tags defining the inputs to it.

Now for any of that to work, however, we need a way to produce a unique tag for a particular data object at a particular stage of processing. As the earlier discussion in this issue makes clear, this must handle volatile data that only appear as intermediate steps passed between processes as well as raw data to which nothing has been done. We also need to be able to preserve a history where an intermediate waveform appeared for a time and then was deleted (outputs of pwstack would be an example of data that would need to be saved in the db and could then be destroyed when pwmig had been run). Solving that is the challenge.

I think a solution is to have each algorithm post a linked list of history records of all contributing data objects that created the output data object to which they are assigned. The first entry in this list would need a mechanism to define a list of initial data objects in the wf collection. The idea I have is that during a run subsequent algorithms post a list of history records extracted from each data object used as input for the object being created by the new algorithm. If we design the history record right, whenever we call a dbwrite for data with these records, the writer would need to extend the indexes defined in the history record to allow the full chain to be reconstructed. I think that means starting at the beginning of the list and unraveling which inputs were assembled to make each output, the data types in and out, and constructing a unique tag for each datum even if it were transient. That information would be saved for each saved waveform. That is what I'm thinking right now, anyway, but I am not really sure it will work. I'm thinking a way to get a unique id for transient waveforms is to use the oid string of the saved history record as the writer walks down the list, i.e. it would need to do two db transactions for each datum created during processing (whether it is saved or not): (1) create a new empty document to get an oid, and (2) update the document to add the history data to it. The oids are guaranteed to be unique, so as the writer works down the list it could fill in hooks for transient waveforms used as input to subsequent algorithms.

pavlis commented 4 years ago

This is without doubt not the ideal way to implement this, but it can illustrate the idea I have above in a more concrete form:

class HistoryRecord
{
public:
  bool is_root;  // true only for origin reading wf
  string parent;  //oid string of parent waveform
  string input_type;
  string output_type;
  string algorithm;
  string param_oid;
  string myoid;   //filled in by writer
  list<vector<HistoryRecord>> chain;
};

Each algorithm would generate one of these for each datum it outputs. It would set the algorithm information by a scheme yet to be determined fully, but presumably some mix of getting algorithm and parameter info from some process manager (called exec above). Input and output types would be hard coded into the algorithm. During processing myoid would be flagged as undefined - to be filled in by the writer as noted and explained a bit more below. The chain list-vector would be filled in from the inputs (members of ensembles, or just one entry for one-to-one algorithms), e.g. for a stack the vector would contain the history records extracted from each of the members stacked to produce the output.

A dbwriter in this model would have to do a lot of work. It would start at the top of the list writing one document for each HistoryRecord in the double container. As each entry is saved myoid would be filled in. The tricky part is that after the first entry, the writer would have to look back at the previous member of the list to get the unique ids from the myoid entries.

I may not have the start and end of this writer sequence right, but I think it might just do the job.
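To make the writer walk above concrete, here is a minimal sketch. It is NOT the actual MsPASS API: Database, new_document, and add_history are hypothetical stand-ins, where new_document is assumed to insert an empty document and return its oid string, and add_history to update that document with the record contents.

#include <list>
#include <string>
#include <vector>
// assumes the HistoryRecord class defined above

// Hypothetical database handle, invented for this sketch only.
struct Database
{
  std::string new_document();  // insert an empty document, return its oid string
  void add_history(const std::string& oid, const HistoryRecord& rec);
};

void save_history(Database& db, HistoryRecord& rec)
{
  // Walk the chain from the beginning: give each record an oid before saving
  // it, so later entries can reference inputs that were transient and never
  // saved in wf.
  for (std::vector<HistoryRecord>& level : rec.chain) {
    for (HistoryRecord& input : level) {
      input.myoid = db.new_document();    // transaction 1: reserve a unique oid
      db.add_history(input.myoid, input); // transaction 2: fill in the history data
    }
  }
  rec.myoid = db.new_document();
  db.add_history(rec.myoid, rec);
}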

wangyinz commented 4 years ago

I have not fully absorbed this yet, but I can see that the issue here is having the writer keep track of all the myoids in the chain. I am not sure that is actually feasible, especially in the case where all objects are not saved at once. Maybe we need to generate the myoid when the record is first created, and that will require introducing the bsoncxx library. I also came across this article that mentioned UUIDs could also work here.

Another thought I have is that maybe we can make the chain pointers (list<vector<HistoryRecord*>> chain)? If so, there is no need to track the change of myoid anymore. We will need to make sure the memory doesn't get deallocated unexpectedly.

pavlis commented 4 years ago

This was very helpful. My conclusion is that using a uuid is the way to go to make this efficient and much much simpler. Based on that article's practical guidelines and how I understand our problem that seems the clear choice. Far far simpler than the ugly chain idea I described yesterday.

I think what we need to do is generate a uuid for every created data object and save that uuid, whenever the object is saved, as just another attribute. Need to think a bit more about how this would fold into the history mechanism.

I think the obvious choice for generating uuids in the C++ code is the boost implementation. In fact, a clean and consistent approach may be to have Seismogram and TimeSeries contain a uuid that will be generated by constructors. On reading some I think we will need to make a decision about whether copies produced by copy constructors or operator= should get a new uuid. I think that would simplify things at a low cost, but I am not certain about that so will leave it here as a residual issue. I will pursue a full design using these ideas further today and see where it leads.
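For reference, a minimal sketch of what constructor-generated uuids with boost might look like. The class and member names are invented for illustration; only the boost calls are real. Note that boost's random_generator is not synchronized, so threaded use would need more care than shown here.

#include <boost/uuid/uuid.hpp>            // boost::uuids::uuid
#include <boost/uuid/uuid_generators.hpp> // boost::uuids::random_generator
#include <boost/uuid/uuid_io.hpp>         // boost::uuids::to_string
#include <iostream>
#include <string>

class TaggedObject   // stand-in for TimeSeries or Seismogram
{
public:
  TaggedObject() : id(generator()) {};  // every new object gets a fresh uuid
  std::string id_string() const
  {
    return boost::uuids::to_string(id);
  };
private:
  // One shared generator amortizes the (reportedly large) startup cost of
  // constructing a random_generator.
  static boost::uuids::random_generator generator;
  boost::uuids::uuid id;
};
boost::uuids::random_generator TaggedObject::generator;

int main()
{
  TaggedObject a, b;
  std::cout << a.id_string() << std::endl
            << b.id_string() << std::endl;  // two distinct ids
  return 0;
}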

wangyinz commented 4 years ago

OK, that is going to be a major change of API and potentially the schema (replacing some oid with uuid). We will see how it goes.

pavlis commented 4 years ago

Thought it would take longer to get this to the point of needing your input, but I made some progress so here goes. Be warned this is a long post because I will be pasting in draft include files that have some complexity.

First, this design is shaped by a couple of assumptions. If they are wrong, this design may be seriously flawed.

  1. We need to make the processing history lightweight in memory use. It builds on the idea of version 1 of our history mechanism that writes parameter data to documents stored with mongodb.

  2. The history mechanism considers TimeSeries and Seismogram atomic. Ensemble algorithms will thus need to have code to properly define the history mechanism. An alternative design would be to generalize the ProcessingHistory class defined below to handle ensembles as a more generic concept.

  3. We have two ways to produce forms of UUIDs: (a) objectids from MongoDB that are unique when linked to database entities, and (b) true UUIDs created by some other mechanism.

  4. I'm assuming the use of boost's uuid library. I also assume we would plan to use a random number generation method for the uuids based on my reading. An alternative method might simplify the api in ways we should consider. I was influenced by a comment in the web discussions of boost's uuid library that stated their random number uuid generator had a large startup overhead. I have no real data to know if that comment is true, but it strongly torques this design. It requires the history mechanism to have access to a ProcessManager that hands out uuids. We could remove that element of this design if there were another mechanism. It is not yet clear if requiring a ProcessManager is a bad thing or not, but this is a key question.

With those assumptions stated, here is my design. It might help to first emphasize this design uses four intermingled class definitions:

  1. ProcessingHistoryRecord is really a struct that holds a unique definition of one processing step.
  2. AlgorithmDefinition defines a unique instance of a particular function/algorithm. It is defined here by a unique name and an id used to define the unique set of parameters that define that instance. ProcessingHistoryRecord contains an instance of AlgorithmDefinition plus other things that extend the concept.
  3. ProcessingHistory is the class that becomes a parent class for TimeSeries and Seismogram objects. It replaces and extends what previously had the awful name MsPASSCoreTS. It has elog plus a key generalization of the history chain using a linked list of ProcessingHistoryRecord objects.
  4. ProcessManager is an alpha design for a global executive manager for a job. The idea is that, like MetadataDefinitions, it would be created at the top of any mspass python script and functions in any job could access it. I see no way to avoid that and keep the history records lightweight.

With that, here is each of these ideas cast as C++ class definitions. Note none of these have been checked for basic syntax - they are here to make this more concrete.

ProcessingHistoryRecord

class ProcessingHistoryRecord
{
public:
  /*! Default constructor.  Creates object with no data. */
  ProcessingHistoryRecord();
  /*! Standard copy constructor.  This copies the uuid set in id. */
  ProcessingHistoryRecord(const ProcessingHistoryRecord& parent);
  /*! Partial copy constructor.   Clones the old history data but creates a
  new uuid assigned to id. */
  ProcessingHistoryRecord(const vector<ProcessingHistoryRecord>& parent);
  bool is_root;  // true only for origin reading wf
  mspass::AlgorithmDefinition alg;
  string myid;
  vector<ProcessingHistoryRecord> inputs;
};

Note that myid in this case is a MongoDB ObjectID string defining the document saving the related parameter set. The inputs vector is a weird construct that might cause compilation problems, but I hope not. The idea is that something like a stack has input from multiple objects, and this would preserve the history of each component that went into that stack.
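To make the intent of inputs concrete, here is a hedged sketch of how a stacking function might populate a record. The function name and the current_record() accessor are invented for illustration and assume each input datum carries its current ProcessingHistoryRecord:

// Sketch only: builds the record for a many-to-one stack.
ProcessingHistoryRecord make_stack_record(
    const mspass::AlgorithmDefinition& stack_alg,
    const std::vector<Seismogram>& members)
{
  ProcessingHistoryRecord rec;
  rec.is_root = false;
  rec.alg = stack_alg;   // which stack instance (name + parameter oid)
  rec.myid = "";         // filled in later by the writer
  for (const Seismogram& d : members)
    rec.inputs.push_back(d.current_record());  // one entry per datum stacked
  return rec;
}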

AlgorithmDefinition

/*! \brief Lightweight data structure to completely describe an algorithm.

Processing data always involves application of one or more algorithms.
Most algorithms have one to many parameters that define the algorithm's
detailed behavior.  Because the size of parametric input can sometimes
be huge MsPASS needed a way to carry a concise summary of algorithms applied
to data.   The issue is complicated by the fact that the same algorithm
may be applied to data at different stages with different parameters (e.g.
bandpass filters applied before and after deconvolution).  To address this
problem we save input parameters for any instance of an algorithm in MongoDB
as a document in the history collection.  This implementation uses a
simple string to define a particular instance.  In the current implementation
of MsPASS that is set as the string representation of the ObjectID defined
for that document.   An alternative would be UUID as used in TimeSeries and
Seismogram objects, but we prefer the ObjectID string here as it is a guaranteed
unique key by MongoDB, always has an index defined for the collection, and
reduces the size of the history collection by not requiring a uuid attribute.

Any algorithm for which no optional parameters are needed will have the id
field empty.

A special case is readers that act as origins from "raw"  which may not
literally be "raw" but is just a signal that a reader initiated a history
chain.   For readers input_type is set to "NotApplicable" and output_type
is to be defined for that reader.  Readers may or may not have control
parameters.
*/
class AlgorithmDefinition
{
public:
  /*! Default constructor.

  This constructor is really the same as what would be automatically generated.
  We define it to be clear and because I think pybind11 may need this
  declaration to allow a python wrapper for a default constructor.  */
  AlgorithmDefinition() : nm(),myid(),input_type(),output_type() {};
  /*! Primary constructor.

  This constructor sets the two primary attributes of this object.
  name is a descriptive (unique) name assigned to the algorithm and
  id is a unique id key.   In MsPASS it is the string representation of the
  ObjectID assigned to the MongoDB document holding the parameter data
  that defines this instance of an algorithm.

  \param name is the algorithm's (unique) name
  \param typin is the type of the primary data input
  \param typout is the type of the primary data output
  \param id is a unique id string defining the parameters that were used for
    this instance of an algorithm.
  */
  AlgorithmDefinition(const string name, const string typin,
    const string typout, const string id);
  AlgorithmDefinition(const AlgorithmDefinition& parent);
  string name() const;
  /*! \brief return the id as a string.

  In MsPASS the id is normally a MongoDB ObjectID string representation of
  the document saved in the database that holds the parameters defining a
  particular algorithm instance.  If the algorithm has no parameters
  this string will be null. Callers should test that condition by calling
  the length method of std::string to verify the id is not zero length */
  string id() const;
  /*! Set a new id string.

  The id string is used to define a unique instance of an algorithm
  for a particular set of parameters.  This is the only putter for this
  class because it is the only attribute that should ever be changed
  after construction.  The reason is the name and type constraints are
  fixed, but id defines a particular instance that may be variable. */
  void set_id(const string id){myid=id;};
  //void set_name(const string name){nm=name;};
  AlgorithmDefinition& operator=(const AlgorithmDefinition& parent);
private:
  string nm;
  string myid;
  string input_type;
  string output_type;
};

An instance of an algorithm is assumed to be defined by a combination of a name keyword and an id, which here will be a MongoDB objectid string. The type names may not be essential, but seem necessary to clarify the full history. For now they are one of three things: TimeSeries, Seismogram, or some key defining this as a reader. It might be better to signal readers and writers by a different mechanism. That is something I hadn't considered until I started to put this down.
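A short usage sketch may clarify how two instances of the same algorithm are distinguished by the id; the id strings below are fabricated placeholders, not real ObjectIDs:

// Two bandpass instances applied at different stages: same name, different
// parameter documents (ids are made-up placeholders).
AlgorithmDefinition bp_predecon("bandpass", "TimeSeries", "TimeSeries",
                                "5f0c9e2a1b2c3d4e5f6a7b8c");
AlgorithmDefinition bp_postdecon("bandpass", "TimeSeries", "TimeSeries",
                                 "5f0c9e2a1b2c3d4e5f6a7b8d");
// A reader acting as the origin of a history chain has no data input type.
AlgorithmDefinition reader("dbread", "NotApplicable", "Seismogram", "");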

ProcessingHistory

First, a base class. This may not be essential, but it defines the lowest order concept of a definition of a unique job run.

template <typename JobIDType=std::string>
class BasicProcessingHistory
{
public:
  BasicProcessingHistory();
  /*! Return a list of algorithm name tags that have been applied.

  All concrete implementations of this base will need to supply a list of
  name tags that define the algorithms applied to the data to produce the
  current state.  Implementations will need to store more than these names.
  The linked list returned must define the order the algorithms are applied.
  The same algorithm may appear sequentially with or without a change in
  parameters that define it's properties.   Classes that implement this
  virtual will want to deal with such ambiguities.
  */
  virtual std::list<std::string> algorithms_applied() const =0;
  /*! Return number of processing algorithms applied to produce these data.

  Implementations of this method need to return a count of the number of
  processing steps that have been applied to put the data in the current state.
  Note that computing this number is complicated by the fact that most data
  processing is not completed in a single run, but with multiple "jobs".
  */
  virtual size_t count()=0;
  JobIDType jobid()
  {
    return jid;
  };
  void set_jobid(const JobIDType& newjid)
  {
    jid=newjid;
  };
private:
  JobIDType jid;
};

This is the design for ProcessingHistory:

class ProcessingHistory : public BasicProcessingHistory<std::string>
{
public:
  ErrorLogger elog;
  ProcessingHistory();
  ProcessingHistory(const ProcessingHistory& parent);
  std::list<string> algorithms_applied() const;
  std::string jobname() const;
  void set_jobname(const std::string jobname);
  size_t count();
  void set_as_parent(const ProcessManager& pm, const string reader_name);
  size_t append(const ProcessManager& pm, const string algname, const size_t instance);
  size_t append(const ProcessingHistoryRecord& rec);
  ProcessingHistory& operator+=(ProcessingHistoryRecord);
private:
  list<ProcessingHistoryRecord> history;
};

The linked list history is the primary data - it is an ordered list that I think can uniquely define any processing sequence provided all algorithms register themselves properly with this mechanism. Note the append methods and the += operator are different ways to do the same thing - define a data object as a new level in the processing chain.
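For example, registering one processing step on a datum d might look like the following sketch; the algorithm name "agc" and the variable names are invented, and pm is the ProcessManager described below:

// Sketch only: record one processing step on a Seismogram-like object d.
ProcessingHistoryRecord rec;
rec.is_root = false;
rec.alg = pm.algorithm("agc", 0);  // look up this instance's definition
d.append(rec);                     // equivalent to: d += rec;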

ProcessManager

class ProcessManager
{
public:
  ProcessManager();
  ProcessManager(string fname);
  AlgorithmDefinition algorithm(const string name,
    const size_t instance=0) const;
  std::string jobname() const
  {
    return jobnm;
  };
  std::string jobid() const
  {
    return boost::uuids::to_string(job_uuid);
  };
  /*! \brief Get string representation of a UUID.

  MsPASS data objects are tagged with a UUID to properly handle
  processing history.  Procedures can call his method to get a
  uuid based on boost's random_generator.   We keep the generator
  in this object because web conversations suggest construction of
  random_generator is expensive in time and needs to not be done
  at the object level. */
  string new_uuid()
  {
    boost::uuids::uuid id;
    id=gen();
    return boost::uuids::to_string(id);
  }
private:
  string jobnm;
  boost::uuids::uuid job_uuid;
  boost::uuids::random_generator gen;
  std::map<std::string,std::vector<AlgorithmDefinition>> algs;
};

The idea is that a ProcessManager would be created near the top of any mspass job script. It has a dual purpose: (a) centralize the history record definitions, and (b) serve as a central id server. The first requires something like an init phase or having the user construct a definition of all the algorithms used in a python script. Said another way, an init program could parse the python script and generate a file that could be used to construct this object automatically. That may require some additional changes later, but that is the idea I'm thinking of for the "string fname" constructor.

This uses a fairly complicated data structure to allow an application to fetch the unique information for an instance of a particular algorithm. It is thus wise to hide that implementation detail - the map container.

The id server is in this object because of the assumption made at the top of this post - it assumes the uuid generator is expensive to create and needs to be available from a global source. That component could be removed from this object if we can find a more lightweight way to generate a uuid.
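A usage sketch under those assumptions (the file name and algorithm name are invented placeholders):

ProcessManager pm("jobdef.pf");   // constructed once at the top of a job script
AlgorithmDefinition bp = pm.algorithm("bandpass");  // instance 0 by default
std::string uid = pm.new_uuid();  // unique tag for a newly created data object
std::string jid = pm.jobid();     // id string identifying this run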

pavlis commented 4 years ago

You may well be reading this as I type, but I already see one issue with the above. ProcessManager should not use a uuid created by boost. The id there should be an objectid string created by MongoDB for a given run. Seems to me this shows the need for yet another collection - let's call it jobs. The jobs collection would have documents that contain the python script run and maybe the file created by the planned "init" program that could be used to construct ProcessManager. The id for a given run would then define a particular document in this "jobs" collection.

wangyinz commented 4 years ago

OK. It took me a while to absorb this, and I still don't think I got 100% of it, but it should be close. I think this design pretty much summarized all our discussions above and should function as expected. Well done!

We might discover more issues with it down the road, but for now I can only see a couple minor ones. The first one is definitely what you mentioned above. The uuid in ProcessManager appears unnecessary now. I agree that we should replace it with oid. Then, the use of id will appear less confusing in the whole package here. We will only have two different kinds of IDs, the application level ID and the database level ID. The former will be uuid, and it is only used internally by the application (within C++). The latter will be oid, and it is used whenever something directly interacts with the database (mostly on the Python side).

Another thing I noticed is the use of copies in ProcessingHistoryRecord. It seems to me that the alg can just be a pointer to an AlgorithmDefinition object, and the inputs can be a vector of shared_ptr to other records. Explicitly keeping copies may easily overflow the memory when the workflow gets complicated. Pointers should work as these records shouldn't be altered throughout the processing.

btw, if we remove the uuid server functionality in ProcessManager, it actually becomes a container only for job name and id, which makes it no longer an essential component of the object level history here. We should consider how to properly implement it so that we can handle situations where we are not only dealing with algorithms within mspass but also calling other external packages (e.g. SAC). Similarly, some more thought should be put into the AlgorithmDefinition class.

pavlis commented 4 years ago

Ok, I can move forward now with a few points I take from the above:

  1. Already put the uuid generator at the object level. I decided it was so trivial (a getter and putter) that it was unnecessary baggage to put in a common class shared by TimeSeries and Seismogram. I just put the same one-line getter and two-line putter code in TimeSeries.h and Seismogram.h.

    /*! Return string representation of the unique id for this object. */
    std::string id_string() const
    {
    return boost::uuids::to_string(id);
    };
    /*! Putter for id. */
    void set_id(const string newid)
    {
    boost::uuids::string_generator gen;
    id=gen(newid);
    };
    private:
    boost::uuids::uuid id;

    Well, on reflection that isn't all. Need to think through when constructors do and don't set the uuid. I'll work on that.

  2. Very good idea to use shared_ptr to reference the history records. As an old school C programmer converted to C++ I acquired a great aversion to raw pointers in C++ code, and the older (deprecated) auto_ptr that is more drilled into my head would not have worked for this example. shared_ptr should work, so that is an excellent idea.

  3. The point about ProcessManager is interesting and requires some more thought. Splitting the object level history mechanism from ProcessManager has a useful side effect for us too - we don't need to solve that problem immediately. It is a different, self-contained problem we can work on later.

wangyinz commented 4 years ago

Yep, I agree adding another class inheritance is unnecessary there.

That's exactly why we want to separate the two. We will come back to that issue later on when everything else is more clear.

pavlis commented 4 years ago

In implementing this I ran into a barrier that has me a bit concerned. I aimed to implement the idea you had above of using shared_ptr to reduce the memory load of the history mechanism. As I experimented with a couple of options for how to do that, I ran across a fundamental problem that is pretty subtle and to which I am not sure I know the answer. The problem is that if you use pointers of any kind in the history you run the risk of a worse memory bloat caused by (possibly) delayed freeing of transient seismic data. The reason is that ProcessingHistory in this design is a base class for Seismogram and TimeSeries objects. We want to make it so that any future extensions with other types of data could utilize the same mechanism, so inheritance is a clean way to do that.

The subtle issue I don't know is whether a pointer to any part of a complex object like Seismogram or TimeSeries, which have multiple base classes, locks the whole object or just the portion pointed to by the base class to which it is linked. Either way it causes a problem. If it locks the whole object, memory bloat will be rapid and extreme. If it only locks part, it could be subject to some weird memory management bugs. The only sure way I could see to avoid this is to pass around some global store for history data, but that would put us back to the need for data to hook to a global entity like the ProcessManager idea floated above.

Unless you have some great insight on this, I think I will back up and only copy history data. The API I've designed hides this as an implementation detail, so if this proves to be a problem down the road we can maybe revisit the issue. I'll hold off any action until I hear your thoughts on this. If you want my current version, I could push this branch to github. Let me know how you want to proceed.
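For what it's worth, standard shared_ptr semantics answer part of the question above: a shared_ptr aimed at a base subobject shares ownership of the whole object, so the complete object (payload included) stays alive. A minimal sketch with invented type names:

#include <iostream>
#include <memory>

struct History { /* stand-in for a ProcessingHistory base class */ };
struct BigData : History { double payload[1000000]; };  // stand-in for Seismogram

int main()
{
  std::shared_ptr<BigData> d = std::make_shared<BigData>();
  std::shared_ptr<History> h = d;  // upcast: shares ownership of the WHOLE object
  d.reset();  // payload is NOT freed; h still keeps the entire BigData alive
  std::cout << h.use_count() << std::endl;  // prints 1
  return 0;
}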

wangyinz commented 4 years ago

hmm... I thought we were only making the list of ProcessingHistoryRecord pointers:

class ProcessingHistory : public BasicProcessingHistory<std::string>
{
public:
  ErrorLogger elog;
  ProcessingHistory();
  /*...*/
private:
  list<shared_ptr<ProcessingHistoryRecord>> history;
};

Because the ProcessingHistoryRecord is not the base class and the ProcessingHistory is not pointed to by any pointers, it shouldn't cause the problem you described. Did I misunderstand something? Maybe I should take a look at the code and really think it through.

pavlis commented 4 years ago

The problem comes not there but from the contents of ProcessingHistoryRecord. It links back to others within the inputs variable:

vector<ProcessingHistoryRecord> inputs;

If those are shared_ptr's they link back to the earlier parents that first created them. Since that is the place bloat will occur the most (e.g. a several-hundred-fold stack) it might make sense to have

vector<shared_ptr<ProcessingHistoryRecord>> inputs;

Then you definitely have links back to things that should be long gone. If they aren't pointers but copies, the gain is small if pointers are only used in the history linked list.

wangyinz commented 4 years ago

Yes, I forgot to say it, but the inputs vector should definitely also be pointers. I thought it is OK to link as many ProcessingHistoryRecords as needed through shared_ptr because ownership of the records is shared. It doesn't matter if the actual data (e.g. TimeSeries) linked with a particular record is gone or not, because the data doesn't necessarily own the record anyway, and the record, being part of the history, should be retained regardless of the status of the data object that first created it.

pavlis commented 4 years ago

My answer to that is maybe. I don't understand well enough how the std::shared_ptr implementation works in combination with C++ destructor rules. I THINK if I'm very careful to always use new within shared pointer constructors to build these and never let the raw pointer appear it might work. As I read it, shared_ptr just uses an internal reference counter to handle destruction. It is not obvious to me that this couldn't create an oddity if the pointers are buried inside two layers of std containers (a list and a vector). I'll go forward with this. I think there is a way to check the reference count in a shared_ptr. I just checked and indeed the use_count method does that. However, this statement in the documentation worries me:

Returns the number of different shared_ptr instances (this included) managing the current object. If there is no managed object, ​0​ is returned.

In multithreaded environment, the value returned by use_count is approximate (typical implementations use a memory_order_relaxed load) 

We should be able to use this for testing, but I worry about the complexity this whole approach might add relative to the memory savings. Let me work a bit and get an estimate of the difference in memory cost.

wangyinz commented 4 years ago

I definitely don't fully understand shared_ptr, but based on my understanding, because we used shared_ptr, we will want to make sure all references to any ProcessingHistoryRecord object are through shared_ptrs. We are already ensuring this internally by having:

list<shared_ptr<ProcessingHistoryRecord>> history;
vector<shared_ptr<ProcessingHistoryRecord>> inputs;

Anything else that accesses/references a ProcessingHistoryRecord will be external, and they will be read access anyway, so it doesn't matter whether a shared_ptr is used or not (as long as the memory is not freed up by the last instance of shared_ptr).

I think multiple layers of containers shouldn't be a problem, but it is never a bad idea to verify that, so I did the following test:

#include <iostream>
#include <memory>
#include <vector>

struct Base
{
    Base() { std::cout << "  Base::Base()\n"; }
    ~Base() { std::cout << "  Base::~Base()\n"; }
};

int main()
{
    std::shared_ptr<Base> p = std::make_shared<Base>();

    std::vector<std::shared_ptr<Base>> v,v2;

    std::cout << "Created a shared Base\n"
              << "  p.get() = " << p.get()
              << ", p.use_count() = " << p.use_count() << '\n';
    v.push_back(p);
    std::cout << "Shared ownership within a vector\n"
              << "  v[0].get() = " << v[0].get()
              << ", v[0].use_count() = " << v[0].use_count() << '\n';
    v2 = v;
    std::cout << "Shared ownership within another vector\n"
              << "  v2[0].get() = " << v2[0].get()
              << ", v2[0].use_count() = " << v2[0].use_count() << '\n';
    p.reset(); // release ownership
    std::cout << "After p released\n"
              << "  p.get() = " << p.get()
              << ", p.use_count() = " << p.use_count() << '\n';
    std::cout << "After p released\n"
              << "  v[0].get() = " << v[0].get()
              << ", v[0].use_count() = " << v[0].use_count() << '\n';
    v2.clear(); // clear vector
    std::cout << "After v2 cleared\n"
              << "  v[0].get() = " << v[0].get()
              << ", v[0].use_count() = " << v[0].use_count() << '\n';
    std::cout << "removing an element from v\n";
    v.pop_back();
    std::cout << "All tests completed, the last one deleted Base\n";
}

and the output looks good:

  Base::Base()
Created a shared Base
  p.get() = 0x681be0, p.use_count() = 1
Shared ownership within a vector
  v[0].get() = 0x681be0, v[0].use_count() = 2
Shared ownership within another vector
  v2[0].get() = 0x681be0, v2[0].use_count() = 3
After p released
  p.get() = 0, p.use_count() = 0
After p released
  v[0].get() = 0x681be0, v[0].use_count() = 2
After v2 cleared
  v[0].get() = 0x681be0, v[0].use_count() = 1
removing an element from v
  Base::~Base()
All tests completed, the last one deleted Base

Also, based on my understanding of the documentation as well as many discussions that can be found online, shared_ptr is thread safe for its control block, which means the increment/decrement of its internal reference counter is atomic. However, because use_count is only a read access, it is not guaranteed to give the accurate count in a multithreaded environment, but I think that's reasonable for multithreading anyway.