Open shankari opened 1 year ago
yeah sure.
Understood some of the logic of the data/code flow where changes might be required, in:
emission.pipeline.intake_stage.run_intake_pipeline_for_user
The LABEL_INFERENCE stage occurs after MODE_INFERENCE. Present in: emission.analysis.classification.inference.labels.pipeline as eacilp
Inside the main pipeline function these are called:
inferred_trip = ecwe.Entry.create_entry(user_id, "analysis/inferred_trip", cleaned_trip_dict)
# Inserts inference/labels
results = self.compute_and_save_algorithms(inferred_trip)
# Inserts analysis/inferred_labels
ensemble = self.compute_and_save_ensemble(inferred_trip, results)
# Finally analysis/inferred_trip inserted
self.ts.insert(inferred_trip)
In the existing execution logs, I can only see these 3 keys and not inference/trip_model. It does say no model found, so perhaps with a model available, it will perform the model loading step; without a model, execution seems to occur really fast between trips.
2023-09-22 13:02:29,454:DEBUG:4550551040:getting key model_type in config
2023-09-22 13:02:29,454:DEBUG:4550551040:getting key model_storage in config
2023-09-22 13:02:29,459:DEBUG:4550551040:no GREEDY_SIMILARITY_BINNING model found for user 946c7bfb-7285-44d0-8c04-bfad40895e48
2023-09-22 13:02:29,459:DEBUG:4550551040:In predict_cluster_confidence_discounting: n=-1; returning as-is
2023-09-22 13:02:29,459:DEBUG:4550551040:insert_data called
2023-09-22 13:02:29,459:DEBUG:4550551040:insert called with entry of type <class 'emission.core.wrapper.entry.Entry'>
2023-09-22 13:02:29,459:DEBUG:4550551040:entry was fine, no need to fix it
2023-09-22 13:02:29,459:DEBUG:4550551040:Inserting entry Entry({'_id': ObjectId('650df2d58435f07026152b80'), 'user_id': UUID('946c7bfb-7285-44d0-8c04-bfad40895e48'), 'metadata': Metadata({'key': 'inference/labels', 'platform': 'server', 'write_ts': 1695412949.4593122, 'time_zone': 'America/Los_Angeles', 'write_local_dt': LocalDate({'year': 2023, 'month': 9, 'day': 22, 'hour': 13, 'minute': 2, 'second': 29, 'weekday': 4, 'timezone': 'America/Los_Angeles'}), 'write_fmt_time': '2023-09-22T13:02:29.459312-07:00'}), 'data': Labelprediction({'trip_id': ObjectId('650df2d58435f07026152b7f'), 'algorithm_id': 7, 'prediction': [], 'start_ts': 1440168891.095, 'end_ts': 1440169080.326})}) into timeseries
2023-09-22 13:02:29,460:DEBUG:4550551040:insert_data called
2023-09-22 13:02:29,461:DEBUG:4550551040:insert called with entry of type <class 'emission.core.wrapper.entry.Entry'>
2023-09-22 13:02:29,461:DEBUG:4550551040:entry was fine, no need to fix it
2023-09-22 13:02:29,461:DEBUG:4550551040:Inserting entry Entry({'_id': ObjectId('650df2d58435f07026152b81'), 'user_id': UUID('946c7bfb-7285-44d0-8c04-bfad40895e48'), 'metadata': Metadata({'key': 'analysis/inferred_labels', 'platform': 'server', 'write_ts': 1695412949.461068, 'time_zone': 'America/Los_Angeles', 'write_local_dt': LocalDate({'year': 2023, 'month': 9, 'day': 22, 'hour': 13, 'minute': 2, 'second': 29, 'weekday': 4, 'timezone': 'America/Los_Angeles'}), 'write_fmt_time': '2023-09-22T13:02:29.461068-07:00'}), 'data': Labelprediction({'trip_id': ObjectId('650df2d58435f07026152b7f'), 'start_ts': 1440168891.095, 'end_ts': 1440169080.326, 'algorithm_id': 7, 'prediction': []})}) into timeseries
2023-09-22 13:02:29,462:DEBUG:4550551040:insert called with entry of type <class 'emission.core.wrapper.entry.Entry'>
2023-09-22 13:02:29,462:DEBUG:4550551040:entry was fine, no need to fix it
2023-09-22 13:02:29,462:DEBUG:4550551040:Inserting entry Entry({'_id': ObjectId('650df2d58435f07026152b7f'), 'user_id': UUID('946c7bfb-7285-44d0-8c04-bfad40895e48'), 'metadata': Metadata({'key': 'analysis/inferred_trip', 'platform': 'server', 'write_ts': 1695412949.4539552, 'time_zone': 'America/Los_Angeles', 'write_local_dt': LocalDate({'year': 2023, 'month': 9, 'day': 22, 'hour': 13, 'minute': 2, 'second': 29, 'weekday': 4, 'timezone': 'America/Los_Angeles'}), 'write_fmt_time': '2023-09-22T13:02:29.453955-07:00'}), 'data': {'source': 'DwellSegmentationTimeFilter', 'end_ts': 1440169080.326, 'end_local_dt': {'year': 2015, 'month': 8, 'day': 21, 'hour': 7, 'minute': 58, 'second': 0, 'weekday': 4, 'timezone': 'America/Los_Angeles'}, 'end_fmt_time': '2015-08-21T07:58:00.326000-07:00', 'end_loc': {'type': 'Point', 'coordinates': [-122.0770814, 37.3947974]}, 'raw_trip': ObjectId('650df2d28435f070261529e0'), 'start_ts': 1440168891.095, 'start_local_dt': {'year': 2015, 'month': 8, 'day': 21, 'hour': 7, 'minute': 54, 'second': 51, 'weekday': 4, 'timezone': 'America/Los_Angeles'}, 'start_fmt_time': '2015-08-21T07:54:51.095000-07:00', 'start_loc': {'type': 'Point', 'coordinates': [-122.0822358, 37.3926163]}, 'duration': 189.23099994659424, 'distance': 601.605978572294, 'start_place': ObjectId('650df2d58435f07026152b73'), 'end_place': ObjectId('650df2d58435f07026152b74'), 'cleaned_trip': ObjectId('650df2d38435f07026152a0f'), 'inferred_labels': []}}) into timeseries
@MukuFlash03 in machine learning, there are two main steps:
We apply the model on every pipeline run
But we build the model separately, and currently, once a day
See the crontab/scripts that we run in the analysis container, and bin/build_label_model.py
What I've read up on:
I found this issue from March 2022, which gave a much better understanding of the two separate pipelines: model building and analysis.
I did take a look at the crontab and observed that there is a cronjob that runs bin/intake_multiprocess.py
every hour.
This file uses the scheduler to create parallel processes to execute epi.run_intake_pipeline
for different sets of uuids.
Also examined bin/build_label_model.py
which updates the trip_model:
eamur.update_trip_model(user_id, model_type, model_storage, min_trips)
I've also gone through the various pipeline stages and their respective called functions in epi.run_intake_pipeline
to see at which stage the "model loading" occurs, but I'm unable to find it as yet.
What I'm stuck at:
I am not making much progress, as I'm stuck on understanding where to start in the code base. I'm unable to see where exactly the model is being loaded for each pipeline run, and I am still unclear on what "model loading" means in the server codebase.
update_trip_model calls save_model in emission/analysis/modelling/trip_model/model_storage.py, which goes through emission/storage/modifiable/abstract_model_storage.py and emission/storage/modifiable/builtin_model_storage.py down to edb.get_model_db().
In compute_and_save_algorithms, predict_labels_with_n is called once for every trip.
So it is $O(n)$ where $n$ is the number of trips we are trying to predict.
And because of the way it is currently implemented, we are also loading the model $O(n)$ times,
but there is only one model per user, so we should load it only $O(1)$ times.
the flow we have now is:
for t in trips:
load
predict
so rolled out, we get load, predict, load, predict, load, predict...
The flow we want is:
load
for t in trips:
predict
so rolled out, we should get load, predict, predict, predict...
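The difference between the two flows can be sketched by counting loads (load_model and predict here are hypothetical stand-ins, not the real eamur functions):

```python
# Count how many times the "expensive" load runs in each flow
load_calls = 0

def load_model(user_id):
    global load_calls
    load_calls += 1          # pretend this is the expensive DB fetch
    return {"user_id": user_id}

def predict(model, trip):
    return []                # stand-in prediction

trips = list(range(5))

# Current flow: load inside the loop -> O(n) loads
load_calls = 0
for t in trips:
    model = load_model("u1")
    predict(model, t)
per_trip_loads = load_calls  # one load per trip

# Desired flow: hoist the load out of the loop -> O(1) loads
load_calls = 0
model = load_model("u1")
for t in trips:
    predict(model, t)
single_load = load_calls     # one load total
```

With 5 trips, the first flow performs 5 loads and the second performs 1.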
I thought of some approaches for incorporating code changes to load the model only once for each user, and not for every trip of every user. These approaches are:
1) Refactor eacilp.run_prediction_pipeline()
to extract the model loading step out of the prediction iteration loop.
2) Utilize the Singleton design pattern concept in the core deeper level classes which deal with the model
Now for the reasons why I chose to implement changes in both the functions mentioned above: edb.get_model_db() and bims.get_current_model():
As I haven't seen model data in Stage_updateable_models
, I am unsure whether the slowness with the model loading operation is due to creating indexes or sorting data:
Creating indexes: Model fetching from database followed by subsequent index creation present in edb.get_model_db():
ModelDB = _get_current_db().Stage_updateable_models
ModelDB.create_index([("user_id", pymongo.ASCENDING)])
ModelDB.create_index([("metadata.key", pymongo.ASCENDING)])
ModelDB.create_index([("metadata.write_ts", pymongo.DESCENDING)])
Querying and sorting data: in builtin_model_storage.get_current_model():
result_it = edb.get_model_db().find(find_query).sort("metadata.write_ts", -1).limit(1)
Thus, I've added a class variable in edb
and an instance variable in builtin_model_storage
(separate instance for each user) along with getter and setter methods to access these variables.
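A minimal sketch of what I mean by the cached variable plus getter/setter (the class and method names here are illustrative, not the actual edb/builtin_model_storage code):

```python
class CachedModelStorage:
    # Illustrative per-user cache; not the actual builtin_model_storage code
    _model_cache = {}   # class-level: user_id -> loaded model

    def __init__(self, user_id):
        self.user_id = user_id

    def _query_db(self):
        # stand-in for the find/sort/limit query against Stage_updateable_models
        return {"user_id": self.user_id, "model": "GREEDY_SIMILARITY_BINNING"}

    def get_current_model(self):
        # hit the DB only on the first access for this user
        if self.user_id not in CachedModelStorage._model_cache:
            CachedModelStorage._model_cache[self.user_id] = self._query_db()
        return CachedModelStorage._model_cache[self.user_id]

    def set_current_model(self, model):
        CachedModelStorage._model_cache[self.user_id] = model
```

On the second call for the same user, get_current_model returns the cached object without touching the DB.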
With regards to actually running the pipeline, testing it out, and observing the time delays, I am unclear on how to use the all_ceo_db dataset.
I had a few queries regarding dataset itself being downloaded and extracted properly which I was able to clarify with help from @humbleOldSage:
Was the all_ceo tar dataset downloaded properly? A: Yes, confirmed the 20.94 GB size of the tar file for Satyam as well.
Was it extracted properly? (Stage.updateable_models.bson had no data.) A: Yes, even Satyam had 0 bytes for this file after extracting the tar.
If there is no data in this collection, does Satyam have data in this collection (Stage_updateable_models) in his container? A: Satyam did have data in the container for this collection. I assume that I am missing some step to actually generate/build the model.
How to generate/build the model so data is loaded into Stage.updateable_models? A.
High level comments:
Refactor eacilp.run_prediction_pipeline() to extract the model loading step out of the prediction iteration loop.
This is what I had in mind for this change.
Additionally, while this would solve the current problem specific to the LABEL_INFERENCE stage, similar changes would be required in other parts wherever the model is being loaded.
The model is not being loaded anywhere else. Feel free to verify.
This would need exposing model-related parameters outside of the model-related classes and would not provide good abstraction, as in this part of the pipeline code we should not be concerned with what our model looks like.
I don't understand. Which model related parameter would we be exposing outside the model related classes?
Utilize the Singleton design pattern concept in the core deeper level classes which deal with the model
That is an interesting concept
Thus, I've added a class variable in edb and an instance variable in builtin_model_storage (separate instance for each user) along with getter and setter methods to access these variables.
I'd have to see the code, but this seems like overkill. Note that there is not really "the model" in the interface. Instead, all the interface methods take a key. Right now, we have only one model (the trip model), but we may have a user model or a tour model or other models in the future. That will likely add additional complexity to your code.
I agree that the singleton pattern is possible, and may be the correct design choice if refactoring eacilp.run_prediction_pipeline()
is too complicated. But I am not convinced that refactoring is that complicated or will lead to poor abstraction. Please provide additional clarification!
The model is not being loaded anywhere else. Feel free to verify.
Yes, verified, so it should be alright.
I don't understand. Which model related parameter would we be exposing outside the model related classes?
The iterative loop for each trip for a user is in eacilp.run_prediction_pipeline. This is where we’ll need to take the model load out of the loop.
Current execution flow for model loading is as follows:
eacilp.run_prediction_pipeline -> eacilp.compute_and_save_algorithms -> eacili.predict_cluster_confidence_discounting -> eamur.predict_labels_with_n -> eamur._load_stored_trip_model -> eamums.load_model -> esma.ModelStorage.get_model_storage.get_current_model
Currently it is eamur.predict_labels_with_n that loads and stores the model. Here, we'll need these parameters [model_type, model_storage] in eacilp.run_prediction_pipeline, which is one level higher than eacili.predict_cluster_confidence_discounting (in inferrers.py), where these parameters are first introduced in the execution flow, which is why I felt some abstraction might be lost. I am expecting the refactoring to look like this:
# In eacilp.run_prediction_pipeline:
def run_prediction_pipeline(self, user_id, time_range):
    model_type = eamtc.get_model_type()
    model_storage = eamtc.get_model_storage()
    model = _load_stored_trip_model(user_id, model_type, model_storage, model_config)
    for cleaned_trip in self.toPredictTrips:
        results = self.compute_and_save_algorithms(inferred_trip, model)

def compute_and_save_algorithms(self, trip, model):
    predictions = []
    for algorithm_id, algorithm_fn in primary_algorithms.items():
        prediction = algorithm_fn(trip, model)

# In eacili.predict_cluster_confidence_discounting():
def predict_cluster_confidence_discounting(trip, model):
    # load application config
    model_type = eamtc.get_model_type()
    model_storage = eamtc.get_model_storage()
    labels, n = eamur.predict_labels_with_n(trip, model_type, model_storage, model)

# In eamur.predict_labels_with_n():
# Model is now available from the parameters; no need to call _load_stored_trip_model()
# (the required `model` parameter must come before the defaulted ones)
def predict_labels_with_n(
        trip: ecwc.Confirmedtrip,
        model,
        model_type = eamumt.ModelType.GREEDY_SIMILARITY_BINNING,
        model_storage = eamums.ModelStorage.DOCUMENT_DATABASE,
        model_config = None):
New execution flow would look like:
eacilp.run_prediction_pipeline -> eacilp.compute_and_save_algorithms -> eacili.predict_cluster_confidence_discounting -> eamur.predict_labels_with_n
I'd have to see the code, but this seems like overkill. Note that there is not really "the model" in the interface. Instead, all the interface methods take a key. Right now, we have only one model (the trip model), but we may have a user model or a tour model or other models in the future. That will likely add additional complexity to your code.
As mentioned above, an advantage of refactoring the pipeline code seems to be that the execution flow would avoid these function calls for each trip which reduces the execution stack size:
eamur._load_stored_trip_model -> eamums.load_model -> esma.ModelStorage.get_model_storage.get_current_model
However, the current code I've changed as per the Singleton pattern isn't too complicated either. It's just a simple if-else check on whether the model has been loaded previously or not. Similar changes were made in emission.storage.modifiable.builtin_model_storage
# In edb, with a module-level cache variable for the model DB handle
_ModelDB = None

def get_model_db():
    global _ModelDB
    if _ModelDB is None:
        logging.debug("Started model load in edb.get_model_db()...")
        _ModelDB = _get_current_db().Stage_updateable_models
        # Create indexes (only on first access)
    return _ModelDB
I see what you are saying, but I don't think that your solution is correct.
Here, we’ll need these parameters [model_type, model_storage] in eacilp.run_prediction_pipeline which is one level higher than eacili.predict_cluster_confidence_discounting where these parameters are first introduced in the execution flow.
We will actually need them in compute_and_save_algorithms. This doesn't seem to be particularly terrible, since the configs come from eamtc, which is not dependent on inferrer or pipeline.
compute_and_save_algorithms currently also takes a trip input. Passing a trip_list instead of one trip at a time to compute_and_save_algorithms and predict_cluster_confidence_discounting, and then iterating over the trips inside predict_cluster_confidence_discounting, will also fix it.
As for the ModelDB caching: get_current_db() is already a singleton, and getting a reference to the DB is not the same as loading the data. You really need to load the data and understand what is going on.
Of course, both methods can be made to work. Might be worthwhile to think through the pros and cons of:
Spent this week on two things:
1. Loading model data
Faced some issues initially but figured them out:
Issue 1: Was unable to execute the delete_pipeline_state command; incorrect outputs were received. It was returning a count of 0 for all collections, and no entries were being found.
Reason: Added debug statements to find this. Then realized that Stage_database was being used as the default DB, but in this snapshot, openpath_stage was the default DB with data.
Solution: Changed this in edb by updating Stage_database to openpath_stage as the default DB just for development/testing purposes, and reverted back before committing and pushing code.
Issue 2: Couldn't find appropriate users and models that take a lot of time to execute.
Observation: For some users that I ran the pipeline on, the execution completed really fast. Thought I might need to choose a user with more features; I was using a user with 14 features, so I tried another user with 48 features, but it still ran pretty quickly.
Additionally, found that not all users with models were executed completely on re-running the pipeline: there were 10 distinct users in the stage snapshot dataset, and of these only 2 ran the pipeline completely; the others aborted at `updating incoming user inputs`.
Solution: I ended up using just the 2 users anyway for testing, as I was able to see that the model load is done only once when I tested the pipeline with my refactored code.
2. Refactoring label inference pipeline by passing trip_list
Execution flow will now look like this: the model is loaded once via edb.get_model_db(), before eamur.predict_labels_with_n is called with the trip list.
Deciding what to return from predict_labels_with_n() all the way back to run_prediction_pipeline():
Currently eamur.predict_labels_with_n returns: predictions, n = model.predict(trip).
I traced this to emission.analysis.modeling.trip_model.greedy_similarity_binning
where
def predict(self, trip: ecwc.Confirmedtrip) -> Tuple[List[Dict], int]:
-> returns list of predictions for a single trip
Hence, I will now need to return a prediction_list which would be a list of tuples with a list of predictions for each trip and features count for each trip. This is because now instead of a single trip, I'll be passing a trip_list.
predict_labels_with_n()
will now return predictions in the following format:
Current: Tuple[List[Dict], int] -> New: List[Tuple[List[Dict], int]]
eacili.predict_cluster_confidence_discounting()
returns:
Current: List[Dict] -> New: List[List[Dict]]
eacilp.compute_and_save_algorithms()
returns:
Current: List -> New: Dict[List]
eacilp.compute_and_save_ensemble()
returns:
Current: ecwl.Labelprediction() -> New: List[ecwl.Labelprediction()]
Changed the predictions results returned in eacilp.compute_and_save_algorithms()
from:
results -> results_list -> results_dict
Moved from results -> results_list since we are now passing a trip_list instead of a single trip. Initially, I thought this should be enough to keep the rest of the execution flow same and correctly process the returned data.
But I realized this was incorrect, since in this function, we are fetching predictions for a trip using each of the different available algorithms (we may have only one algorithm now, but can be more in the future.)
On using a list of results, I had inadvertently converted this current format of returned data into:
For all trips -> List [trip1_algo1, trip2_algo1, trip3_algo1, ..., trip1_algo2, trip2_algo2, trip3_algo2, ... ]
This is essentially an i x j
matrix of algorithms versus trips, where i (rows) represents algorithms and j (columns) represents trips.
While this could be okay, it was messing up the logic in eacilp.compute_and_save_ensemble(), since in this function the original expected data format looked like:
For trip_1 -> List [ trip1_algo1, trip1_algo2, ... ],
...
So, it was running the ensemble function on predictions made by different algorithms on the same trip.
However, contrasting with the above data format in my latest changes, it would now run the ensemble function on predictions made by different algorithms on different trips.
Hence, I decided to use a dictionary which would store predictions in the following format:
results_dict = {
trip_1 : List [ trip1_algo1, trip1_algo2, trip1_algo3, ... ],
trip_2 : List [ trip2_algo1, trip2_algo2, trip2_algo3, ... ],
...
}
This would ensure that, like the original implementation, ensemble() receives a list of predictions made by different algorithms for the same trip.
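The regrouping from the flat list into the per-trip dictionary can be sketched as follows (the trip/algorithm labels are illustrative placeholders):

```python
from collections import defaultdict

# Flat list as inadvertently produced by looping algorithms in the outer loop:
# (trip_id, algorithm_id, prediction) triples
flat_results = [
    ("trip_1", "algo_1", []), ("trip_2", "algo_1", []),
    ("trip_1", "algo_2", []), ("trip_2", "algo_2", []),
]

# Regroup so each trip maps to its own list of per-algorithm predictions,
# which is the shape the ensemble step expects for a single trip
results_dict = defaultdict(list)
for trip_id, algo_id, prediction in flat_results:
    results_dict[trip_id].append((algo_id, prediction))
```

Each results_dict entry now holds all algorithm predictions for one trip, matching the original per-trip contract of the ensemble function.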
There were 10 distinct users in the stage snapshot dataset; of these only 2 ran pipeline completely:
You have not loaded the data correctly. There are way more than 10 users in the stage snapshot. Please see the thread starting here: https://github.com/e-mission/op-admin-dashboard/issues/76#issuecomment-1773455384 and https://openpath-stage.nrel.gov/public/
7 users - Pipeline doesn't run completely; aborts at `updating incoming user inputs`.
Without the stacktrace, nobody can help you figure this out. The goal of these updates is not a status report, but a technical collaboration for implementation.
1 user - AssertionError: curr_state.curr_run_ts = 1695312650.8540428.
There are only approximately 100 issues for this error (j/k, hyperbole). The pipeline died/was killed in the middle. Reset it.
Thank you for the note on incorrect data loading. I was in fact loading incomplete data as not enough memory / RAM was allocated to my Docker container and hence the mongorestore process was being killed:
root@c2b013d69003:/tmp# mongorestore ./dump/ --numParallelCollections=1
2023-11-20T19:06:04.662+0000 preparing collections to restore from
2023-11-20T19:06:04.664+0000 reading metadata for openpath_stage.Stage_Profiles from dump/openpath_stage/Stage_Profiles.metadata.json
...
...
2023-11-20T19:10:30.212+0000 restoring openpath_stage.Stage_updateable_models from dump/openpath_stage/Stage_updateable_models.bson
2023-11-20T19:10:31.662+0000 [########################] openpath_stage.Stage_updateable_models 1.46GB/1.46GB (100.0%)
2023-11-20T19:10:34.667+0000 [########################] openpath_stage.Stage_updateable_models 1.46GB/1.46GB (100.0%)
Killed
Earlier limit was 8 GB while the restoration needed about 14 GB. I solved this by going to Settings -> Resources in Docker Desktop and increased the Memory to 16 GB, which was sufficient.
2023-11-20T19:47:40.852+0000 [########################] openpath_stage.Stage_timeseries 6.69GB/6.69GB (100.0%)
2023-11-20T19:47:40.852+0000 restoring indexes for collection openpath_stage.Stage_timeseries from metadata
2023-11-20T19:49:31.513+0000 finished restoring openpath_stage.Stage_timeseries (11295656 documents, 0 failures)
2023-11-20T19:49:31.516+0000 12860756 document(s) restored successfully. 0 document(s) failed to restore.
I now observed that the size of the database in mongo had increased, and there were more entries in the collections. Apologies, I see that I had not mentioned the collection name in my earlier comment but I intended to talk about Stage_updateable_models specifically.
I ran the below MongoDB query commands as soon as I mongorestored the stage dump data.
db.Stage_uuids.distinct("uuid").length
O/P: 76
# Count of documents per user
db.Stage_updateable_models.aggregate([
{
$group: {
_id: "$user_id",
count: { $sum: 1 }
}
},
{
$project: {
_id: 0,
user_id: "$_id",
count: 1
}
},
{
$sort: {user_id: 1}
}
]).pretty()
{ "count" : 265, "user_id" : BinData(3,"FsLTzW1iQtyY322SfNmjyA==") }
{ "count" : 40, "user_id" : BinData(3,"KDiFIC3fTvaNYMVXuwSZGA==") }
{ "count" : 4, "user_id" : BinData(3,"KkrK2O6YRf+0tTIjRXAGRg==") }
{ "count" : 27, "user_id" : BinData(3,"QvU5OiIiTXmep8B7hgWXhw==") }
{ "count" : 196, "user_id" : BinData(3,"nAhO9C+XQZa9N5UMF5OOxg==") }
{ "count" : 16, "user_id" : BinData(3,"ofHAH9MLQ/C8eLek2X1Xag==") }
{ "count" : 29, "user_id" : BinData(3,"rUlr7kCvTOOQPvcozHKqFw==") }
{ "count" : 29, "user_id" : BinData(3,"sIsl6VUhQOqIEhwyGm2ZNQ==") }
{ "count" : 28, "user_id" : BinData(3,"05lL7qtbRXqCV4A6bfBmfw==") }
{ "count" : 293, "user_id" : BinData(3,"2DpDod9rQu2Yb/W19hUCIQ==") }
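The same per-user count can also be run from the Python shell with pymongo's aggregate; a hedged sketch, assuming a local mongod with the restored openpath_stage dump (the function name is mine, not part of the codebase):

```python
# Pipeline mirrors the mongo shell aggregation above
MODEL_COUNT_PIPELINE = [
    {"$group": {"_id": "$user_id", "count": {"$sum": 1}}},
    {"$project": {"_id": 0, "user_id": "$_id", "count": 1}},
    {"$sort": {"user_id": 1}},
]

def model_counts_per_user(db):
    """Per-user document counts in Stage_updateable_models; `db` is a
    pymongo Database, e.g. MongoClient("localhost")["openpath_stage"]."""
    return list(db.Stage_updateable_models.aggregate(MODEL_COUNT_PIPELINE))
```

Usage: `for doc in model_counts_per_user(db): print(doc)`.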
7 users - Pipeline doesn't run completely; aborts at `updating incoming user inputs`.
Without the stacktrace, nobody can help you figure this out. The goal of these updates is not a status report, but a technical collaboration for implementation.
After loading the data completely, the pipeline runs completely for these 7 users as well. It somehow solved itself automatically, so I should be good here.
Anyways, as for the stack trace, there was no error; it just stopped abruptly, so I wasn't sure what was happening and why.
ERROR:root:habitica not configured, game functions not supported
Traceback (most recent call last):
File "/Users/mmahadik/Documents/GitHub/e-mission-server/emission/net/ext_service/habitica/proxy.py", line 22, in <module>
key_file = open('conf/net/ext_service/habitica.json')
FileNotFoundError: [Errno 2] No such file or directory: 'conf/net/ext_service/habitica.json'
storage not configured, falling back to sample, default configuration
URL not formatted, defaulting to "openpath_stage"
Connecting to database URL localhost
analysis.debug.conf.json not configured, falling back to sample, default configuration
Nominatim Query URL Configured: None
Nominatim URL not configured, place decoding must happen on the client
overpass not configured, falling back to default overleaf.de
transit stops query not configured, falling back to default
analysis.trip_model.conf.json not configured, falling back to sample, default configuration
expectations.conf.json not configured, falling back to sample, default configuration
2023-11-16T15:54:26.097164-07:00**********UUID d3994bee-ab5b-457a-8257-803a6df0667f: moving to long term**********
New entry count == 0 and skip_if_no_new_data = False, continuing
2023-11-16T15:54:26.131746-07:00**********UUID d3994bee-ab5b-457a-8257-803a6df0667f: updating incoming user inputs**********
Have fixed the failing tests and one-time model loading implementation looks good so far. Details on failing tests here in the PR.
I did some manual testing by adding log statements to measure time elapsed between operations:
Outputs shown below confirm that:
A. Time elapsed for the current repeated model load:
2023-11-21T10:20:53.122137-07:00**********UUID 16c2d3cd-6d62-42dc-98df-6d927cd9a3c8: moving to long term**********
New entry count == 0 and skip_if_no_new_data = False, continuing
2023-11-21T10:20:53.669595-07:00**********UUID 16c2d3cd-6d62-42dc-98df-6d927cd9a3c8: updating incoming user inputs**********
2023-11-21T10:20:53.927615-07:00**********UUID 16c2d3cd-6d62-42dc-98df-6d927cd9a3c8: filter accuracy if needed**********
2023-11-21T10:20:53.938999-07:00**********UUID 16c2d3cd-6d62-42dc-98df-6d927cd9a3c8: segmenting into trips**********
2023-11-21T10:20:54.350146-07:00**********UUID 16c2d3cd-6d62-42dc-98df-6d927cd9a3c8: segmenting into sections**********
2023-11-21T10:20:54.388056-07:00**********UUID 16c2d3cd-6d62-42dc-98df-6d927cd9a3c8: smoothing sections**********
2023-11-21T10:20:54.431719-07:00**********UUID 16c2d3cd-6d62-42dc-98df-6d927cd9a3c8: cleaning and resampling timeline**********
2023-11-21T10:20:54.499884-07:00**********UUID 16c2d3cd-6d62-42dc-98df-6d927cd9a3c8: inferring transportation mode**********
2023-11-21T10:20:54.566852-07:00**********UUID 16c2d3cd-6d62-42dc-98df-6d927cd9a3c8: inferring labels**********
2023-11-21T10:21:17.391582-07:00 Inside run_prediction_pipeline: Model load time = 14.285613000000001
2023-11-21T10:21:17.391645-07:00 Inside run_prediction_pipeline: Predictions complete for trip_list in time = 3.8428109999999913
2023-11-21T10:21:17.391683-07:00 Inside run_prediction_pipeline: Saving inference_labels total time = 0.13518700000000017
2023-11-21T10:21:17.391703-07:00 Inside run_prediction_pipeline: Saving inferred_labels total time = 0.1299359999999945
2023-11-21T10:21:17.391721-07:00 Inside run_prediction_pipeline: Saving inferred_trip total time = 0.09874099999998975
2023-11-21T10:21:17.398696-07:00**********UUID 16c2d3cd-6d62-42dc-98df-6d927cd9a3c8: populating expectations**********
2023-11-21T10:21:17.430616-07:00**********UUID 16c2d3cd-6d62-42dc-98df-6d927cd9a3c8: creating confirmed objects **********
2023-11-21T10:21:17.489745-07:00**********UUID 16c2d3cd-6d62-42dc-98df-6d927cd9a3c8: creating composite objects **********
B. Time elapsed for the new refactored pipeline with one-time model load:
2023-11-20T18:25:55.946773-07:00**********UUID 16c2d3cd-6d62-42dc-98df-6d927cd9a3c8: moving to long term**********
New entry count == 0 and skip_if_no_new_data = False, continuing
2023-11-20T18:25:56.492612-07:00**********UUID 16c2d3cd-6d62-42dc-98df-6d927cd9a3c8: updating incoming user inputs**********
2023-11-20T18:25:56.984888-07:00**********UUID 16c2d3cd-6d62-42dc-98df-6d927cd9a3c8: filter accuracy if needed**********
2023-11-20T18:25:56.997506-07:00**********UUID 16c2d3cd-6d62-42dc-98df-6d927cd9a3c8: segmenting into trips**********
2023-11-20T18:25:58.908610-07:00**********UUID 16c2d3cd-6d62-42dc-98df-6d927cd9a3c8: segmenting into sections**********
2023-11-20T18:25:58.957242-07:00**********UUID 16c2d3cd-6d62-42dc-98df-6d927cd9a3c8: smoothing sections**********
2023-11-20T18:25:59.006705-07:00**********UUID 16c2d3cd-6d62-42dc-98df-6d927cd9a3c8: cleaning and resampling timeline**********
2023-11-20T18:25:59.074585-07:00**********UUID 16c2d3cd-6d62-42dc-98df-6d927cd9a3c8: inferring transportation mode**********
2023-11-20T18:25:59.129728-07:00**********UUID 16c2d3cd-6d62-42dc-98df-6d927cd9a3c8: inferring labels**********
2023-11-20T18:25:59.441397-07:00 Inside predict_labels_n: Model load time = 0.15951699999999924
2023-11-20T18:25:59.441452-07:00 Inside predict_labels_n: Predicting...
2023-11-20T18:26:03.121773-07:00 Inside predict_labels_n: Predictions complete for trip_list in time = 3.6762179999999995
2023-11-20T18:26:03.348690-07:00 Inside compute_and_save_algorithms: Saving inference/labels total time = 0.11739799999999967
2023-11-20T18:26:03.559746-07:00 Inside compute_and_save_ensemble: Saving inferred_labels total time = 0.116753000000001
2023-11-20T18:26:03.763400-07:00 Inside run_prediction_pipeline: Saving inferred_trip total time = 0.08415500000000087
2023-11-20T18:26:03.770997-07:00**********UUID 16c2d3cd-6d62-42dc-98df-6d927cd9a3c8: populating expectations**********
2023-11-20T18:26:03.803700-07:00**********UUID 16c2d3cd-6d62-42dc-98df-6d927cd9a3c8: creating confirmed objects **********
2023-11-20T18:26:03.866073-07:00**********UUID 16c2d3cd-6d62-42dc-98df-6d927cd9a3c8: creating composite objects **********
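The timing lines in the logs above were produced by simple elapsed-time instrumentation, roughly like this (the operations are stand-ins, not the real pipeline calls):

```python
import logging
import time

logging.basicConfig(level=logging.DEBUG)

def timed(label, fn, *args):
    """Run fn, log the elapsed wall-clock time with a label, and
    return (result, elapsed)."""
    start = time.perf_counter()
    result = fn(*args)
    elapsed = time.perf_counter() - start
    logging.debug("Inside run_prediction_pipeline: %s time = %s", label, elapsed)
    return result, elapsed

# stand-ins for the one-time model load and the trip-list prediction
model, load_time = timed("Model load", lambda: {"model": "stub"})
preds, predict_time = timed("Predictions complete for trip_list",
                            lambda m: [[] for _ in range(3)], model)
```

This keeps the measurement in one place instead of sprinkling start/end timestamps around each call.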
Anyways, as for the stack trace, there was no error, it just stopped abruptly. So, I wasn't sure what was happening and why.
If it happens again, you might want to add additional logs into the stage that failed to see why. It might just be the lack of data due to your previous poor load
However, I still see that there are 10 distinct users in the Stage_updateable_models collection. I'm specifically considering these users since the model is loaded from this collection. I'm guessing the model is built only for these users.
This should not be the case - let's check and compare against the real database.
As an aside, I also note that you seem to be using the mongo shell to explore the data - you might want to use the python shell instead to be more consistent with the way that the rest of the app accesses data. This doesn't really matter for this analysis, but just a note for the future.
Comparing the real DB with https://github.com/e-mission/e-mission-docs/issues/950#issuecomment-1819769393, I do see roughly the same number of users
$ ./e-mission-py.bash
Python 3.9.18 | packaged by conda-forge | (main, Aug 30 2023, 03:52:10)
[Clang 15.0.7 ] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import emission.core.get_database as edb
>>> edb.get_uuid_db().count_documents({})
79
However, I see way more models (two orders of magnitude)
>>> edb.get_model_db().count_documents({})
967
But they are from 10 users!
>>> all_users = edb.get_model_db().distinct("user_id")
>>> len(all_users)
10
So it looks like every user has ~ 100 models stored!
These duplicates are not uniformly distributed:
>>> for i, u in enumerate(all_users):
... print("Model count for user %s = %s" % (i, edb.get_model_db().count_documents({"user_id": u})))
...
Model count for user 0 = 38
Model count for user 1 = 39
Model count for user 2 = 265
Model count for user 3 = 4
Model count for user 4 = 27
Model count for user 5 = 303
Model count for user 6 = 40
Model count for user 7 = 29
Model count for user 8 = 196
Model count for user 9 = 26
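The loop above issues one `count_documents()` call per user; the same breakdown can come from a single aggregation query. The pipeline below is a sketch against the model collection, and the pure-Python helper just illustrates the grouping logic:

```python
import collections

# one-shot MongoDB equivalent of the per-user count loop:
pipeline = [
    {"$group": {"_id": "$user_id", "count": {"$sum": 1}}},
    {"$sort": {"count": -1}},
]
# on a live DB: edb.get_model_db().aggregate(pipeline)

# the same grouping logic in pure Python, for illustration:
def count_per_user(entries):
    return collections.Counter(e["user_id"] for e in entries)
```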
Aha! We are not cleaning up old/obsolete models - it is marked as a TODO in
emission/storage/modifiable/builtin_model_storage.py
def upsert_model(self, key:str, model: ecwb.WrapperBase):
...
ins_result = edb.get_model_db().insert_one(entry)
## TODO: Cleanup old/obsolete models
return ins_result.inserted_id
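A minimal sketch of what that cleanup could look like, kept independent of MongoDB so the retention policy itself is easy to test. The function name and the use of `metadata.write_ts` as the recency key are my assumptions, not the project's API:

```python
def model_ids_to_delete(model_entries, k):
    """Return the _ids of all model entries for one user except the k
    most recent, ordered by metadata.write_ts (newest first)."""
    by_recency = sorted(model_entries,
                        key=lambda e: e["metadata"]["write_ts"],
                        reverse=True)
    # everything past the first k entries is stale
    return [e["_id"] for e in by_recency[k:]]
```

In `upsert_model`, after the `insert_one`, something like `edb.get_model_db().delete_many({"_id": {"$in": stale_ids}})` would then enforce the bound.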
@MukuFlash03 after you finish the PR for this issue, can you change this to keep only the most recent `k` models and delete anything older? `k` should be a compile-time constant or a config file parameter. Note that this is similar to https://github.com/e-mission/e-mission-docs/issues/1020
Notes on Testing on Staging
I learned that the staging environment we have set up internally isn't something you can just execute and test against; it has already been executed, and we can observe the logs directly to see whether things are working properly.
I set the time frame for searching from 12/03 to 12/16.
Since this implementation included model loading for the LABEL_INFERENCE pipeline stage, I figured the correct CloudWatch log group to observe would be openpath-stage-analysis-intake.
It looks like the model isn't loaded for the two users present in the time range checked (12/03 to 12/16). Additionally, some of my print statements are not being logged. I am not sure whether that's because I used print() instead of logging.debug(), since the pipeline stage messages with (***) are also logged using print().
Here, I searched for the print statements I had added for model loading time; I did not find them, suggesting that the model loading was not executed.
Testing on staging environment - Part 2:
Found some exceptions when checking the openpath-stage-analysis-build-trip-model log group.
I believe this deals with building/storing the trip model in emission/analysis/modelling/trip_model/run_model.py (imported as eamur).
| 2023-12-16T18:00:17.802-07:00 | Traceback (most recent call last):
| 2023-12-16T18:00:17.802-07:00 | File "/usr/src/app/emission/analysis/modelling/trip_model/run_model.py", line 77, in update_trip_model
| 2023-12-16T18:00:17.802-07:00 | model.fit(trips)
| 2023-12-16T18:00:17.802-07:00 | File "/usr/src/app/emission/analysis/modelling/trip_model/greedy_similarity_binning.py", line 146, in fit
| 2023-12-16T18:00:17.803-07:00 | self._generate_predictions()
| 2023-12-16T18:00:17.803-07:00 | File "/usr/src/app/emission/analysis/modelling/trip_model/greedy_similarity_binning.py", line 298, in _generate_predictions
| 2023-12-16T18:00:17.803-07:00 | unique_labels = user_label_df.groupby(group_cols).size().reset_index(name='uniqcount')
| 2023-12-16T18:00:17.803-07:00 | File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pandas/core/groupby/groupby.py", line 2420, in size
| 2023-12-16T18:00:17.803-07:00 | result = self.grouper.size()
| 2023-12-16T18:00:17.803-07:00 | File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pandas/core/groupby/ops.py", line 912, in size
| 2023-12-16T18:00:17.803-07:00 | ids, _, ngroups = self.group_info
| 2023-12-16T18:00:17.803-07:00 | File "pandas/_libs/properties.pyx", line 36, in pandas._libs.properties.CachedProperty.__get__
| 2023-12-16T18:00:17.803-07:00 | File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pandas/core/groupby/ops.py", line 946, in group_info
| 2023-12-16T18:00:17.804-07:00 | comp_ids, obs_group_ids = self._get_compressed_codes()
| 2023-12-16T18:00:17.804-07:00 | File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pandas/core/groupby/ops.py", line 977, in _get_compressed_codes
| 2023-12-16T18:00:17.804-07:00 | return ping.codes, np.arange(len(ping.group_index), dtype=np.intp)
| 2023-12-16T18:00:17.804-07:00 | File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pandas/core/groupby/grouper.py", line 621, in codes
| 2023-12-16T18:00:17.804-07:00 | return self._codes_and_uniques[0]
| 2023-12-16T18:00:17.804-07:00 | File "pandas/_libs/properties.pyx", line 36, in pandas._libs.properties.CachedProperty.__get__
| 2023-12-16T18:00:17.804-07:00 | File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pandas/core/groupby/grouper.py", line 692, in _codes_and_uniques
| 2023-12-16T18:00:17.804-07:00 | codes, uniques = algorithms.factorize( # type: ignore[assignment]
| 2023-12-16T18:00:17.804-07:00 | File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pandas/core/algorithms.py", line 822, in factorize
| 2023-12-16T18:00:17.804-07:00 | codes, uniques = factorize_array(
| 2023-12-16T18:00:17.804-07:00 | File "/root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/pandas/core/algorithms.py", line 578, in factorize_array
| 2023-12-16T18:00:17.804-07:00 | uniques, codes = table.factorize(
| 2023-12-16T18:00:17.804-07:00 | File "pandas/_libs/hashtable_class_helper.pxi", line 5943, in pandas._libs.hashtable.PyObjectHashTable.factorize
| 2023-12-16T18:00:17.804-07:00 | File "pandas/_libs/hashtable_class_helper.pxi", line 5857, in pandas._libs.hashtable.PyObjectHashTable._unique
| 2023-12-16T18:00:17.805-07:00 | TypeError: unhashable type: 'dict'
| 2023-12-16T18:00:17.805-07:00 | The above exception was the direct cause of the following exception:
| 2023-12-16T18:00:17.805-07:00 | Traceback (most recent call last):
| 2023-12-16T18:00:17.805-07:00 | File "/usr/src/app/bin/build_label_model.py", line 93, in <module>
| 2023-12-16T18:00:17.805-07:00 | eamur.update_trip_model(user_id, model_type, model_storage, min_trips)
| 2023-12-16T18:00:17.805-07:00 | File "/usr/src/app/emission/analysis/modelling/trip_model/run_model.py", line 96, in update_trip_model
| 2023-12-16T18:00:17.805-07:00 | raise IOError(msg) from e
| 2023-12-16T18:00:17.805-07:00 | OSError: failure updating user label pipeline state for user 520819a4-abc4-437a-bb9f-7e2ab4b12767
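The underlying `TypeError: unhashable type: 'dict'` happens when one of the groupby columns contains dict values, since pandas has to hash each group key. An illustrative reproduction and defensive workaround, not necessarily the right fix for `greedy_similarity_binning` (the column name here is made up):

```python
import pandas as pd

user_label_df = pd.DataFrame({
    # a dict-valued label mixed in with plain strings triggers the error
    "mode_confirm": ["bike", {"base_mode": "bike"}, "walk", "bike"],
})

# groupby hashes each group key, so a dict value raises
# "TypeError: unhashable type: 'dict'". One defensive workaround is to
# serialize non-hashable values before grouping:
safe = user_label_df.assign(
    mode_confirm=user_label_df["mode_confirm"].map(
        lambda v: str(v) if isinstance(v, dict) else v))
unique_labels = safe.groupby(["mode_confirm"]).size().reset_index(name="uniqcount")
```

The real fix may instead be to normalize the labels upstream, but this shows why the traceback bottoms out in `PyObjectHashTable.factorize`.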
While looking into why one of the pipelines was stuck in the LABEL_INFERENCE stage, I found that we appear to re-load the full trip model before each trip inference while the LABEL_INFERENCE stage is being run. At a high level, it takes ~10 minutes to load the model every time for this user, so this stage is roughly O(600n), where n is the number of trips we are processing.
If we can load the model just once, at the beginning of the LABEL_INFERENCE stage, we can get it down to O(n+600).
Note also that, strictly speaking, we don't know whether the slowness is due to time spent saving the inferred_trip or time spent loading the trip model. But given that the previous saves (inference/labels, analysis/inferred_labels) are very fast, I am fairly sure that the slowness is from the model load.
@humbleOldSage, can you handle this next?
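The O(600n) to O(n+600) change amounts to hoisting the load out of the per-trip loop. A sketch of the before/after shape (all names here are illustrative stand-ins, not the actual pipeline code):

```python
def load_model(user_id):
    # stand-in for the ~10-minute model load
    return {"user_id": user_id}

def predict_one(model, trip):
    # stand-in for per-trip inference
    return (model["user_id"], trip)

# before: the model is re-loaded for every trip -> O(600n)
def predict_per_trip(user_id, trips):
    return [predict_one(load_model(user_id), t) for t in trips]

# after: the model is loaded once at the start of the
# LABEL_INFERENCE stage -> O(n + 600)
def predict_batch(user_id, trips):
    model = load_model(user_id)
    return [predict_one(model, t) for t in trips]
```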
You might want to verify the root cause and the speedup by resetting the pipeline state for a single user:
./e-mission-py.bash bin/monitor/delete_single_pipeline_state_and_objects.py -s LABEL_INFERENCE -d inference/labels analysis/inferred_labels analysis/inferred_trip -u <uuid>
and re-running the pipeline. This will allow you to verify that the model load is in fact the problem, and once you have fixed it, verify the (100x?!) speedup for the stage.