e-mission / e-mission-docs

Repository for docs and issues. If you need help, please file an issue here. Public conversations are better for open source projects than private email.
https://e-mission.readthedocs.io/en/latest
BSD 3-Clause "New" or "Revised" License
15 stars 34 forks source link

Loading full 1.5 year CanBikeCO dataset and adding sensed sections to confirmed trips is slow #854

Open allenmichael099 opened 1 year ago

allenmichael099 commented 1 year ago

I started running my (not actually parallel) parallel_add_sensed_sections.py script in allenmicael099's emission-eval-private-data to add sensed section fields to the confirmed trips and was able to successfully load the database after an unknown number of hours, but the script is still updating trip documents. I started it at 11:40ish pm and it is now 3:50 pm Eastern time - ~16 hours

I’m not sure how long it will take to finish updating. At a minimum I think the updating trips process should take 7 hours since doing so for PC took 64 minutes when the data was up to May 2022.

Looking at the code more carefully, I think it could take several days.

When I’m finding sections for one trip at a time with emission.storage.decorations.trip_queries > get_sections_for_trip(key, user_id, trip_id), it looks like I’m getting a new database object each time.

On a related note with the analysis notebooks: When I fetch trip dataframes with each get_time_series call, does a new Stage_timeseries database object have to be created?

for u in user_list:
    ts = esta.TimeSeries.get_time_series(u)
    ct_df = ts.get_data_df("analysis/confirmed_trip")
shankari commented 1 year ago

To answer your questions:

When I fetch trip dataframes with each get_time_series call, does a new Stage_timeseries database object have to be created?

I think you are a bit confused between the database, a connection to the database, and the database wrapper object. There is only one timeseries collection in the database. We open a connection to the database every time we create a wrapper object. We create a wrapper object every time we call get_time_series.

I don't think that the slowdown is due to the creation of wrapper objects. My gut feeling is that it is due to making multiple queries to the database, each of which is slow. A faster approach would be to get_data_df for the inferred sections as well so that they would all get into memory and then merge the two dataframes.

Note that this approach can also run into issues with limited memory, so you probably want to process the data in batches of a month or so at a time.

allenmichael099 commented 1 year ago

Here is the error where it stopped:

(emission) mallen2-31552s:Error_bars mallen2$ source $HOME/OpenPATH_Data/e-mission-server/e-mission-py.bash parallel_add_sensed_sections.py
storage not configured, falling back to sample, default configuration
URL not formatted, defaulting to "Stage_database"
Connecting to database URL localhost
Finding all confirmed trips
Updating trip documents
Traceback (most recent call last):
  File "parallel_add_sensed_sections.py", line 76, in <module>
    update_trip(ct)
  File "parallel_add_sensed_sections.py", line 35, in update_trip
    ct["data"]["section_modes"], ct["data"]["section_distances"] = get_section_modes_and_distances(ct)
  File "parallel_add_sensed_sections.py", line 21, in get_section_modes_and_distances
    segments = esdtq.get_sections_for_trip(key = "analysis/inferred_section", user_id = ct["user_id"], trip_id = ct["data"]['cleaned_trip'])
  File "/Users/mallen2/alternate_branches/eval-compatible-server/e-mission-server/emission/storage/decorations/trip_queries.py", line 62, in get_sections_for_trip
    return [ecwe.Entry(doc) for doc in section_doc_cursor]
  File "/Users/mallen2/alternate_branches/eval-compatible-server/e-mission-server/emission/storage/decorations/trip_queries.py", line 62, in <listcomp>
    return [ecwe.Entry(doc) for doc in section_doc_cursor]
  File "/opt/anaconda3/envs/emission/lib/python3.7/site-packages/pymongo/cursor.py", line 1207, in next
    if len(self.__data) or self._refresh():
  File "/opt/anaconda3/envs/emission/lib/python3.7/site-packages/pymongo/cursor.py", line 1124, in _refresh
    self.__send_message(q)
  File "/opt/anaconda3/envs/emission/lib/python3.7/site-packages/pymongo/cursor.py", line 1001, in __send_message
    address=self.__address)
  File "/opt/anaconda3/envs/emission/lib/python3.7/site-packages/pymongo/mongo_client.py", line 1372, in _run_operation_with_response
    exhaust=exhaust)
  File "/opt/anaconda3/envs/emission/lib/python3.7/site-packages/pymongo/mongo_client.py", line 1471, in _retryable_read
    return func(session, server, sock_info, slave_ok)
  File "/opt/anaconda3/envs/emission/lib/python3.7/site-packages/pymongo/mongo_client.py", line 1366, in _cmd
    unpack_res)
  File "/opt/anaconda3/envs/emission/lib/python3.7/site-packages/pymongo/server.py", line 137, in run_operation_with_response
    first, sock_info.max_wire_version)
  File "/opt/anaconda3/envs/emission/lib/python3.7/site-packages/pymongo/helpers.py", line 168, in _check_command_response
    max_wire_version)
pymongo.errors.OperationFailure: error while multiplanner was selecting best plan :: caused by :: 
Sort exceeded memory limit of 104857600 bytes, but did not opt in to external sorting., 
full error: {'ok': 0.0, 'errmsg': 'error while multiplanner was selecting best plan :: 
caused by :: Sort exceeded memory limit of 104857600 bytes, 
but did not opt in to external sorting.', 'code': 292, 'codeName': 'QueryExceededMemoryLimitNoDiskUseAllowed'}
shankari commented 1 year ago

so python automatically manages memory through reference counting and garbage collection. So you want to use del and gc.collect() to free up memory

so you would do something like

for u in user_list:
    ts = get_timeseries(u)
    for month in month_list:
         time_query_for_month = ...
         ct_df = ts.get_data_df("analysis/confirmed_trip", time_query_for_month)
         is_df = ts.get_data_df("analysis/inferred_section", time_query_for_month)
         # merge ct_df and is_df based on the trip key and find the percentages
         del ct_df
         del is_df
         gc.collect()

This should make it faster, but might or might not fix the error above, which is in the database and not in your script. If the database error recurs, you should probably look into the "external sorting" option and configure your database to use it.

allenmichael099 commented 1 year ago

One question I just thought of: how do I update trips in the database after getting a dataframe? When each ct was a mongo document I used:

ct["data"]["section_modes"], ct["data"]["section_distances"] = modes, distances
# Update the corresponding confirmed_trip entry in the database.
estbt.BuiltinTimeSeries.update(ecwe.Entry(ct))

But now ct is a row of a dataframe.

shankari commented 1 year ago

ok, don't use get_data_df, use get_entries and match against the in-memory entries or use get_entries, then use to_data_df (https://github.com/e-mission/e-mission-server/blob/b402927f427d7b6fdf4f759e7e36979ca06834e8/emission/storage/timeseries/builtin_timeseries.py#L278) to convert it to a dataframe, do the matching, and then find the trip in the entry list using the _id

Lots of options here