e-mission / e-mission-docs

Repository for docs and issues. If you need help, please file an issue here. Public conversations are better for open source projects than private email.
https://e-mission.readthedocs.io/en/latest
BSD 3-Clause "New" or "Revised" License

Data copy from one server to the other: new data is not processed #845

Open shankari opened 1 year ago

shankari commented 1 year ago

As we have separate enclaves for different projects, we need to have mechanisms for copying data from one enclave to another.

The second example has actually occurred, and I copied over the data using a combination of bin/debug/extract_timeline_for_day_range_and_user.py and bin/debug/load_multi_timeline_for_range.py

The data was copied over correctly, and the user confirmed that it was displayed correctly. However, data collected after the migration is not processed correctly and always remains in draft mode.

On investigating further, this is because the CLEAN_AND_RESAMPLE stage fails with the following error:


2023-01-14T06:16:40.952Z    2023-01-14 06:16:40,952:DEBUG:140037033350976:get_entry_at_ts query = {'user_id': [redacted], 'metadata.key': 'background/filtered_location', 'data.ts': 1672435958.12}
    2023-01-14T06:16:41.521Z    2023-01-14 06:16:41,521:DEBUG:140037033350976:get_entry_at_ts result = None
    2023-01-14T06:16:41.522Z    2023-01-14 06:16:41,521:ERROR:140037033350976:Found error 'NoneType' object is not iterable while processing trip Entry({'_id': ObjectId('63b1db470f8d9992c7b4455f'), 'user_id': [redacted], 'metadata': {'key': 'segmentation/raw_trip', 'platform': 'server', 'write_ts': 1672600391.668694, 'time_zone': 'America/Los_Angeles', 'write_local_dt': {'year': 2023, 'month': 1, 'day': 1, 'hour': 11, 'minute': 13, 'second': 11, 'weekday': 6, 'timezone': 'America/Los_Angeles'}, 'write_fmt_time': '2023-01-01T11:13:11.668694-08:00'}, 'data': {'source': 'DwellSegmentationTimeFilter', 'start_ts': 1672586043.121, 'start_local_dt': {'year': 2023, 'month': 1, 'day': 1, 'hour': 8, 'minute': 14, 'second': 3, 'weekday': 6, 'timezone': 'America/Denver'}, 'start_fmt_time': '2023-01-01T08:14:03.121000-07:00', 'start_place': ObjectId('63af64b6aef87808843e480c'), 'start_loc': {'type': 'Point', 'coordinates': [-104.9209856, 39.9478752]}, 'end_ts': 1672588188.026, 'end_local_dt': {'year': 2023, 'month': 1, 'day': 1, 'hour': 8, 'minute': 49, 'second': 48, 'weekday': 6, 'timezone': 'America/Denver'}, 'end_fmt_time': '2023-01-01T08:49:48.026000-07:00', 'end_place': ObjectId('63b1db470f8d9992c7b44560'), 'end_loc': {'type': 'Point', 'coordinates': [-105.056373, 40.0321814]}, 'duration': 2144.9049999713898, 'distance': 14863.150328118767}})
    2023-01-14T06:16:41.522Z    Traceback (most recent call last):
    2023-01-14T06:16:41.522Z    File "/usr/src/app/emission/analysis/intake/cleaning/clean_and_resample.py", line 121, in save_cleaned_segments_for_timeline
    2023-01-14T06:16:41.522Z    filtered_trip = get_filtered_trip(ts, trip)
    2023-01-14T06:16:41.522Z    File "/usr/src/app/emission/analysis/intake/cleaning/clean_and_resample.py", line 187, in get_filtered_trip
    2023-01-14T06:16:41.522Z    (filtered_section_entry, point_list) = get_filtered_section(filtered_trip_entry, section)
    2023-01-14T06:16:41.522Z    File "/usr/src/app/emission/analysis/intake/cleaning/clean_and_resample.py", line 292, in get_filtered_section
    2023-01-14T06:16:41.522Z    with_speeds_df = get_filtered_points(section, filtered_section_data)
    2023-01-14T06:16:41.522Z    File "/usr/src/app/emission/analysis/intake/cleaning/clean_and_resample.py", line 391, in get_filtered_points
    2023-01-14T06:16:41.522Z    filtered_loc_df = _add_start_point(filtered_loc_df, raw_start_place, ts, section.data.sensed_mode)
    2023-01-14T06:16:41.522Z    File "/usr/src/app/emission/analysis/intake/cleaning/clean_and_resample.py", line 562, in _add_start_point
    2023-01-14T06:16:41.522Z    raw_start_place_enter_loc_entry = _get_raw_place_enter_loc_entry(ts, raw_start_place)
    2023-01-14T06:16:41.522Z    File "/usr/src/app/emission/analysis/intake/cleaning/clean_and_resample.py", line 696, in _get_raw_place_enter_loc_entry
    2023-01-14T06:16:41.522Z    raw_place.data.enter_ts))
    2023-01-14T06:16:41.522Z    File "/usr/src/app/emission/core/wrapper/wrapperbase.py", line 42, in __init__
    2023-01-14T06:16:41.522Z    super(WrapperBase, self).__init__(*args, **kwargs)
    2023-01-14T06:16:41.522Z    File "/root/miniconda-4.12.0/envs/emission/lib/python3.7/site-packages/attrdict/dictionary.py", line 17, in __init__
    2023-01-14T06:16:41.522Z    super(AttrDict, self).__init__(*args, **kwargs)
    2023-01-14T06:16:41.522Z    TypeError: 'NoneType' object is not iterable

We need to see why the start point is not available.

shankari commented 1 year ago

Related code is

def _get_raw_place_enter_loc_entry(ts, raw_place):
    if raw_place.data.enter_ts is not None:
        raw_start_place_enter_loc_entry = ecwe.Entry(
            ts.get_entry_at_ts("background/filtered_location", "data.ts",
                               raw_place.data.enter_ts))
    else:
        # These are not strictly accurate because the exit ts for the place
        # corresponds to the ts of the first point in the section. We are trying
        # to determine the correct exit_ts here. But its a reasonable estimate,
        # at least for the time zone, which is required when we extrapolate
        # note that this will fail for the specific case in which the first point outside
        # the geofence of the first place in a trip chain is in a different timezone
        # than the point itself. We can work around that by storing the enter_ts even
        # for the first place.
        dummy_section_start_loc_doc = {
            "loc": raw_place.data.location,
            "latitude": raw_place.data.location.coordinates[1],
            "longitude": raw_place.data.location.coordinates[0],
            "ts": raw_place.data.exit_ts,
            "fmt_time": raw_place.data.exit_fmt_time,
            "local_dt": raw_place.data.exit_local_dt
        }
        raw_start_place_enter_loc_entry = ecwe.Entry.create_entry(raw_place.user_id,
                                                                  "background/filtered_location",
                                                                  dummy_section_start_loc_doc)
    logging.debug("Raw place is %s and corresponding location is %s" %
                  (raw_place.get_id(), raw_start_place_enter_loc_entry.get_id()))
    return raw_start_place_enter_loc_entry
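
The traceback suggests that get_entry_at_ts returned None and that the result was passed straight into the Entry constructor, which ends up in a dict-style `__init__`. A minimal sketch of that failure mode and of a guard that reports what was missing; `lookup_entry_or_fail` and `fake_lookup` are hypothetical names, and a plain `dict` stands in for `ecwe.Entry`:

```python
def lookup_entry_or_fail(get_entry_at_ts, key, ts_field, ts_value):
    """Return the matching document as a dict, or raise a descriptive error.

    get_entry_at_ts is any callable taking (key, ts_field, ts_value), like the
    call in the snippet above; it may return None when nothing matches.
    """
    doc = get_entry_at_ts(key, ts_field, ts_value)
    if doc is None:
        raise LookupError(
            f"no {key} entry with {ts_field} == {ts_value}; "
            "was the timeseries copied completely?")
    return dict(doc)


def fake_lookup(key, ts_field, ts_value):
    # Hypothetical stand-in for a timeseries with no matching location point.
    return None


# Passing None to a dict-style constructor reproduces the original TypeError.
try:
    dict(None)
except TypeError as e:
    original_error = str(e)

# The guarded version names the missing key and timestamp instead.
try:
    lookup_entry_or_fail(fake_lookup, "background/filtered_location",
                         "data.ts", 1672435958.12)
except LookupError as e:
    guarded_error = str(e)
```

This does not fix the missing data; it only makes the log line point at the missing location entry rather than at the wrapper constructor.
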
shankari commented 1 year ago

So my current speculation is that the raw place for this trip was copied over from the old data and the link up with the new data is broken somehow. However, if we copied everything, we should have copied over all of the location timestamps as well. Why was this missing?

Let's first get the raw place (unnecessary fields redacted):

{'_id': ObjectId('63af64b6aef87808843e480c'), 'metadata': {'key': 'segmentation/raw_place', 'write_fmt_time': '2022-12-30T14:22:46.134360-08:00'}, 'data': {'source': 'DwellSegmentationTimeFilter', 'enter_ts': 1672435958.12, 'enter_fmt_time': '2022-12-30T14:32:38.120000-07:00', 'ending_trip': ObjectId('63af64b6aef87808843e480b'), 'exit_ts': 1672586043.121, 'exit_fmt_time': '2023-01-01T08:14:03.121000-07:00', 'starting_trip': ObjectId('63b1db470f8d9992c7b4455f'), 'duration': 150085.00100016594}}

As expected, the enter timestamp is from the 30th, so before the copy. Let's confirm that there really isn't a location point with that timestamp. There isn't, even if we expand the range a bit.

>>> edb.get_timeseries_db().find_one({'user_id': UUID(...), 'metadata.key': 'background/filtered_location', 'data.ts': 1672435958.12})
>>> edb.get_timeseries_db().find_one({'user_id': UUID(...), 'metadata.key': 'background/filtered_location', 'data.ts': {"$gte": 1672435900, "$lte": 1672435800}})
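
Note that the second filter above has its lower bound (1672435900) above its upper bound (1672435800), so it cannot match any document regardless of what was copied. A small sketch of a helper that builds a symmetric window around a timestamp; `ts_window_query` and the placeholder user id are hypothetical, and the filter shape simply mirrors the queries in this thread:

```python
def ts_window_query(user_id, key, center_ts, half_width_s):
    """Build a MongoDB-style filter for entries within +/- half_width_s of center_ts."""
    if half_width_s < 0:
        raise ValueError("half_width_s must be non-negative")
    return {
        "user_id": user_id,
        "metadata.key": key,
        # Lower bound is always <= upper bound by construction.
        "data.ts": {"$gte": center_ts - half_width_s,
                    "$lte": center_ts + half_width_s},
    }


query = ts_window_query("user-placeholder", "background/filtered_location",
                        1672435958.12, 60)
```

The resulting dict could be passed to find_one in the same way as the hand-written filters above.
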
shankari commented 1 year ago

Did all the locations not get exported properly? I was worried about that, but I checked the number of entries and it seemed to match. Let's double-check the location profile. Now that we have loaded a bunch of other data on top, our queries have to be more complicated. #$#$# trying to do this at the last minute...

shankari commented 1 year ago

Hm, searching for the last three entries before the start_ts, they are from Oct

>>> list(edb.get_timeseries_db().find({'user_id': UUID('...'), 'metadata.key': 'background/filtered_location', 'data.ts': {"$lte": 1672435800}}).sort("data.ts", -1).limit(3))
               data.fmt_time
0  2022-10-02T08:47:59-06:00
1  2022-10-02T08:47:56-06:00
2  2022-10-02T08:47:53-06:00
shankari commented 1 year ago

let's look for everything a little after the exit_ts to see if there is a transition

>>> pd.json_normalize(list(edb.get_timeseries_db().find({'user_id': UUID('...'), 'metadata.key': 'background/filtered_location', 'data.ts': {"$lte": 1672586043.121 + 10 * 60}}).sort("data.ts", -1).limit(20)))[["data.fmt_time"]]
                       data.fmt_time
0   2023-01-01T08:23:48.014000-07:00
1   2023-01-01T08:23:17.079000-07:00
2   2023-01-01T08:22:47.037000-07:00
3   2023-01-01T08:21:14.033000-07:00
4   2023-01-01T08:20:12.074000-07:00
5   2023-01-01T08:19:41.096000-07:00
6   2023-01-01T08:19:11.055000-07:00
7   2023-01-01T08:18:40.019000-07:00
8   2023-01-01T08:18:09.021000-07:00
9   2023-01-01T08:17:38.002000-07:00
10  2023-01-01T08:16:36.016000-07:00
11  2023-01-01T08:16:05.105000-07:00
12  2023-01-01T08:15:35.090000-07:00
13  2023-01-01T08:15:05.083000-07:00
14  2023-01-01T08:14:35.025000-07:00
15  2023-01-01T08:14:03.121000-07:00
16         2022-10-02T08:47:59-06:00
17         2022-10-02T08:47:56-06:00
18         2022-10-02T08:47:53-06:00
19         2022-10-02T08:47:50-06:00
shankari commented 1 year ago

Checked the mongodump and confirmed that there are location entries there. So we need to change bin/debug/extract_timeline_for_day_range_and_user.py to retry until we get to the actual end of the list. Depending on how we do this, we may end up with a few duplicate entries at the borders. So we will need to change bin/debug/load_multi_timeline_for_range.py to deal with duplicate entries.

I could also write a special script just for this that finds the missing entries from the timeseries and copies them over, but it seems like the more general fix addresses the underlying problem as well, and will fix this one too.

>>> pd.json_normalize(list(edb.get_timeseries_db().find({'user_id': UUID('...'), 'metadata.key': 'background/filtered_location', 'data.ts': {"$lte": 1672586043.121 + 10 * 60}}).sort("data.ts", -1).limit(20)))[["data.fmt_time"]]
                       data.fmt_time
0   2022-12-30T14:35:41.025000-07:00
1   2022-12-30T14:35:10.056000-07:00
2   2022-12-30T14:34:40.047000-07:00
3   2022-12-30T14:34:09.100000-07:00
4   2022-12-30T14:33:38.201000-07:00
5   2022-12-30T14:33:08.162000-07:00
6   2022-12-30T14:32:38.120000-07:00
7   2022-12-30T14:32:08.073000-07:00
8   2022-12-30T14:31:37.184000-07:00
9   2022-12-30T14:31:07.156000-07:00
10  2022-12-30T14:30:37.096000-07:00
11  2022-12-30T14:30:07.053000-07:00
12  2022-12-30T14:29:36.122000-07:00
13  2022-12-30T14:29:06.083000-07:00
14  2022-12-30T14:28:36.065000-07:00
15  2022-12-30T14:28:01.315000-07:00
16  2022-12-30T14:27:31.296000-07:00
17  2022-12-30T14:27:01.227000-07:00
18  2022-12-30T14:26:31.197000-07:00
19  2022-12-30T14:26:01.190000-07:00

Checking to see whether this happens for the recreated locations as well. I'm guessing not, since the original copy seemed to work fine.

>>> pd.json_normalize(list(edb.get_analysis_timeseries_db().find({'user_id': UUID('...'), 'metadata.key': 'analysis/recreated_location', 'data.ts': {"$lte": 1672586043.121 + 10 * 60}}).sort("data.ts", -1).limit(20)))[["data.fmt_time"]]
                       data.fmt_time
0   2022-12-30T14:32:38.120000-07:00
1   2022-12-30T14:32:16.888177-07:00
2   2022-12-30T14:31:46.888177-07:00
3   2022-12-30T14:31:16.888177-07:00
4   2022-12-30T14:30:46.888177-07:00
5   2022-12-30T14:30:16.888177-07:00
6   2022-12-30T14:29:46.888177-07:00
7   2022-12-30T14:29:16.888177-07:00
8   2022-12-30T14:28:46.888177-07:00
9   2022-12-30T14:28:16.888177-07:00
10  2022-12-30T14:27:46.888177-07:00
11  2022-12-30T14:27:16.888177-07:00
12  2022-12-30T14:26:46.888177-07:00
13  2022-12-30T14:26:16.888177-07:00
14  2022-12-30T14:25:46.888177-07:00
15  2022-12-30T14:25:16.888177-07:00
16  2022-12-30T14:24:46.888177-07:00
17  2022-12-30T14:24:16.888177-07:00
18  2022-12-30T14:23:46.888177-07:00
19  2022-12-30T14:23:16.888177-07:00

BINGO! As expected, the recreated locations have in fact been copied over correctly.

shankari commented 1 year ago

Tried obvious solution to retry until we get to the end, similar to https://github.com/MobilityNet/mobilitynet-analysis-scripts/blob/master/emeval/input/spec_details.py#L173

Code diff
```diff
diff --git a/bin/debug/extract_timeline_for_day_range_and_user.py b/bin/debug/extract_timeline_for_day_range_and_user.py
index 9fe0ee2a..e9ff091e 100644
--- a/bin/debug/extract_timeline_for_day_range_and_user.py
+++ b/bin/debug/extract_timeline_for_day_range_and_user.py
@@ -18,6 +18,7 @@ import json
 import bson.json_util as bju
 import arrow
 import argparse
+import copy
 
 import emission.core.wrapper.user as ecwu
 import emission.storage.timeseries.abstract_timeseries as esta
@@ -28,6 +29,32 @@ import emission.storage.timeseries.cache_series as estcs
 # https://github.com/e-mission/e-mission-docs/issues/356#issuecomment-520630934
 import emission.net.usercache.abstract_usercache as enua
 
+def check_done(curr_batch, list_so_far, query):
+    if len(curr_batch) == 0:
+        logging.debug(f"No more entries, finished reading for {query}")
+        return True
+    elif len(curr_batch) == 1 and curr_batch[0]["_id"] == list_so_far[-1]["_id"]:
+        logging.debug(f"Re-read the same single entry, finished reading for {query}")
+        return True
+    else:
+        logging.debug(f"curr_batch has length {len(curr_batch)}, not finished reading for {query}")
+        return False
+
+def get_with_retry(user_id, in_query):
+    # Let's clone the query since we are going to modify it
+    query = copy.copy(in_query)
+    # converts "data.ts" = ["data", "ts"]
+    timeTypeSplit = query.timeType.split(".")
+    list_so_far = []
+    done = False
+    while not done:
+        curr_batch = list(estcs.find_entries(user_id, key_list=None, time_query=query))
+        logging.debug(f"Retrieved {len(curr_batch)} entries for {query}")
+        done = check_done(curr_batch, list_so_far, query)
+        list_so_far.extend(curr_batch)
+        query.startTs = curr_batch[-1][timeTypeSplit[0]][timeTypeSplit[1]]
+    return list_so_far
+
```

However, find_entries in cache_series (estcs) returns a concatenation of the timeseries (ts) and usercache (uc) data. So if the timeseries has more than "limit" entries, we will retrieve only the first "limit" entries; let's say that the last of these is at t1. We then retrieve the usercache, the last entry of which may be at t2 > t1. Since we resume from the last entry of the combined batch, we will again skip the timeseries entries between t1 and t2.

In practice, while testing with the user above, it looks like t1 > t2, so we end up with an infinite loop:

DEBUG:root:Retrieved 2787 entries for TimeQuery data.ts with range [1672195088.341, 1703980800)
DEBUG:root:curr_batch has length 2787, not finished reading for TimeQuery data.ts with range [1672195088.341, 1703980800)
DEBUG:root:curr_query = {'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'), 'data.ts': {'$lte': 1703980800, '$gte': 1672195088.341}}, sort_key = data.ts
DEBUG:root:orig_ts_db_keys = None, analysis_ts_db_keys = None
DEBUG:root:finished querying values for None, count = 2491
DEBUG:root:finished querying values for None, count = 295
DEBUG:root:orig_ts_db_matches = 2491, analysis_ts_db_matches = 295
DEBUG:root:last timeseries entry is 1672435958.12
DEBUG:root:Found 1 messages in response to query {'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'), '$or': [{'metadata.type': 'message'}, {'metadata.type': 'sensor-data'}, {'metadata.type': 'rw-document'}], 'data.ts': {'$lte': 1703980800, '$gte': 1672195088.341}}
DEBUG:root:last uc entry is 1672195088.341
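
Both failure modes can be reproduced with a toy model. This is a sketch under the assumption that each source is filtered and capped separately and the results are then concatenated; the `find_entries` below is a simplified stand-in written for this example, not the real function, and timestamps are plain integers:

```python
def find_entries(ts_entries, uc_entries, start_ts, limit):
    """Toy model: each source is filtered and capped separately, then concatenated."""
    ts_part = [t for t in ts_entries if t >= start_ts][:limit]
    uc_part = [t for t in uc_entries if t >= start_ts][:limit]
    return ts_part + uc_part


timeseries = list(range(1, 11))   # timestamps 1..10, more than the limit
LIMIT = 5

# Case 1: the usercache ends later than the truncated timeseries batch (t2 > t1).
batch = find_entries(timeseries, [8, 9], start_ts=1, limit=LIMIT)
next_start = batch[-1]            # resume point taken from the last element
skipped = [t for t in timeseries if t not in batch and t < next_start]

# Case 2: the usercache ends earlier (t1 > t2), so the resume point never advances.
first = find_entries(timeseries, [2, 3], start_ts=1, limit=LIMIT)
second = find_entries(timeseries, [2, 3], start_ts=first[-1], limit=LIMIT)
stuck = first[-1] == second[-1]
```

In case 1 the timeseries entries 6 and 7 are never read; in case 2 every retry resumes from the same usercache timestamp, which matches the repeating log lines above.
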

Regardless, we need to check the timeseries and the usercache separately, which means that we need to change find_entries. Let's add a flag (retry) for backwards compatibility and set it to false. For the common case where we are requesting data for a week at a time, we will not hit the limit and will not need to take the performance hit of retrying.

shankari commented 1 year ago

Added the retry flag and some more logging. And even without the retry, we do read all the points successfully while reading from a restored mongodump. Not quite sure why we didn't do this on the server - maybe we had dropped down the max limit of entries read? Alas, the server is now shut down, so I can't verify.

DEBUG:root:start_day_ts = 1577836800 (2020-01-01T00:00:00+00:00), end_day_ts = 1703980800 (2023-12-31T00:00:00+00:00)
WARNING:root:Called find_entries with retry=False but duration = 126144000 > 6 months, result is likely TRUNCATED
DEBUG:root:curr_query = {'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'), 'data.ts': {'$lte': 1703980800, '$gte': 1577836800}}, sort_key = data.ts
DEBUG:root:orig_ts_db_keys = None, analysis_ts_db_keys = None
DEBUG:root:finished querying values for None, count = 601169
DEBUG:root:finished querying values for None, count = 103488
DEBUG:root:orig_ts_db_matches = 601169, analysis_ts_db_matches = 103488
DEBUG:root:after reading 353488 entries, last timeseries entry is 1672435958.12(2022-12-30T21:32:38.120000+00:00)
DEBUG:root:Found 1030 messages in response to query {'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'), '$or': [{'metadata.type': 'message'}, {'metadata.type': 'sensor-data'}, {'metadata.type': 'rw-document'}], 'data.ts': {'$lte': 1703980800, '$gte': 1577836800}}
DEBUG:root:after reading 1030 entries, last uc entry is 1672195088.341(2022-12-28T02:38:08.341000+00:00)

Or maybe there's just something very weird going on locally - we should be limiting to 250k, but the orig_ts_db_matches is 601k.

{
    "timeseries": {
        "url": "localhost",
        "result_limit": 250000
    }
}
shankari commented 1 year ago

Actually, even retrying at the cache_series level is not enough, because we call find_entries in the timeseries to read the entries from the timeseries, and even that is a concatenation of the timeseries and analysis timeseries results. We have to read all of them separately to retry.

DEBUG:root:curr_query = {'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'), 'data.ts': {'$lte': 1703980800, '$gte': 1672435958.12}}, sort_key = data.ts
DEBUG:root:orig_ts_db_keys = None, analysis_ts_db_keys = None
DEBUG:root:finished querying values for None, count = 363
DEBUG:root:finished querying values for None, count = 1
DEBUG:root:orig_ts_db_matches = 363, analysis_ts_db_matches = 1
DEBUG:root:Retrieved 364 entries for TimeQuery data.ts with range [1672435958.12, 1703980800)
DEBUG:root:curr_batch has length 364, not finished reading for TimeQuery data.ts with range [1672435958.12, 1703980800)
DEBUG:root:curr_query = {'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'), 'data.ts': {'$lte': 1703980800, '$gte': 1672435958.12}}, sort_key = data.ts
DEBUG:root:orig_ts_db_keys = None, analysis_ts_db_keys = None
DEBUG:root:finished querying values for None, count = 363
DEBUG:root:finished querying values for None, count = 1
DEBUG:root:orig_ts_db_matches = 363, analysis_ts_db_matches = 1
DEBUG:root:Retrieved 364 entries for TimeQuery data.ts with range [1672435958.12, 1703980800)
DEBUG:root:curr_batch has length 364, not finished reading for TimeQuery data.ts with range [1672435958.12, 1703980800)
shankari commented 1 year ago

wrt https://github.com/e-mission/e-mission-docs/issues/845#issuecomment-1384431347 this is the same reason. Note that there are 601,169 entries in the timeseries, and 103,488 entries in the analysis timeseries, but our final batch size is only 353,488 entries, which matches 250,000 (the limit) from the timeseries plus all 103,488 from the analysis timeseries. The last entry is in December, but it is from the analysis timeseries (recreated_location) that was appended to the end.

DEBUG:root:curr_query = {'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'), 'data.ts': {'$lte': 1703980800, '$gte': 1577836800}}, sort_key = data.ts
DEBUG:root:orig_ts_db_keys = None, analysis_ts_db_keys = None
DEBUG:root:finished querying values for None, count = 601169
DEBUG:root:finished querying values for None, count = 103488
DEBUG:root:orig_ts_db_matches = 601169, analysis_ts_db_matches = 103488
DEBUG:root:after reading 353488 entries, last timeseries entry is 1672435958.12(2022-12-30T21:32:38.120000+00:00)
DEBUG:root:Found 1030 messages in response to query {'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'), '$or': [{'metadata.type': 'message'}, {'metadata.type': 'sensor-data'}, {'metadata.type': 'rw-document'}], 'data.ts': {'$lte': 1703980800, '$gte': 1577836800}}
DEBUG:root:after reading 1030 entries, last uc entry is 1672195088.341(2022-12-28T02:38:08.341000+00:00)
DEBUG:root:Found 68 messages in response to query {'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'), '$or': [{'metadata.key': 'background/motion_activity'}], 'metadata.write_ts': {'$lte': 1703980800, '$gte': 1577836800}}
shankari commented 1 year ago

Note also that the dump script has a validate_truncation function, but it compares the list length to the exact max limit (250k). Since we combine the timeseries and analysis results, we actually end up with 353k > 250k, but the checks are for equality, so they no longer work.

def validate_truncation(loc_entry_list, trip_entry_list, place_entry_list):
    MAX_LIMIT = 25 * 10000
    if len(loc_entry_list) == MAX_LIMIT:
        logging.warning("loc_entry_list length = %d, probably truncated" % len(loc_entry_list))
    if len(trip_entry_list) == MAX_LIMIT:
        logging.warning("trip_entry_list length = %d, probably truncated" % len(trip_entry_list))
    if len(place_entry_list) == MAX_LIMIT:
        logging.warning("place_entry_list length = %d, probably truncated" % len(place_entry_list))
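
The counts in the logs are consistent with each source being capped at the limit separately before concatenation. A sketch of that arithmetic and of a threshold-based check; `combined_batch_size` and `probably_truncated` are hypothetical names, and the per-source cap is an assumption inferred from the logged numbers:

```python
RESULT_LIMIT = 250_000  # "result_limit" from the config shown earlier


def combined_batch_size(source_counts, limit=RESULT_LIMIT):
    """Size of a batch built by capping each source at `limit` and concatenating."""
    return sum(min(count, limit) for count in source_counts)


def probably_truncated(entry_list_len, limit=RESULT_LIMIT):
    """Flag any list that reached the limit, not only lists of exactly that size."""
    return entry_list_len >= limit


# 601,169 timeseries matches and 103,488 analysis matches, as in the log above.
size = combined_batch_size([601_169, 103_488])
```

A `>=` check can also fire when two untruncated sources together exceed the limit (for example 200k + 100k), so it is only a warning heuristic; checking each source against the limit separately would be more precise.
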

The only principled solution is to actually read the three types of entries directly from the database and to retry each of them separately.
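
A minimal sketch of what reading each source separately with retry could look like. Everything here is illustrative: `make_source` is an in-memory stand-in for one collection, `read_all` is a hypothetical helper rather than the get_with_retry from the diff, and entries are assumed to be sorted by timestamp with an inclusive lower bound on each query.

```python
def make_source(entries):
    """Return a fetch function over an in-memory list sorted by ts."""
    def fetch_batch(start_ts, limit):
        return [e for e in entries if e["ts"] >= start_ts][:limit]
    return fetch_batch


def read_all(fetch_batch, start_ts, limit):
    """Page through one source, skipping entries already seen at batch borders."""
    seen_ids, result = set(), []
    while True:
        batch = fetch_batch(start_ts, limit)
        fresh = [e for e in batch if e["_id"] not in seen_ids]
        result.extend(fresh)
        seen_ids.update(e["_id"] for e in fresh)
        # Stop when the source is exhausted or the batch added nothing new.
        if len(batch) < limit or not fresh:
            break
        start_ts = batch[-1]["ts"]
    return result


sources = {
    "timeseries": make_source([{"_id": f"ts{i}", "ts": i} for i in range(10)]),
    "analysis": make_source([{"_id": f"an{i}", "ts": i} for i in range(4)]),
    "usercache": make_source([]),
}
exported = {name: read_all(fetch, start_ts=0, limit=3)
            for name, fetch in sources.items()}
```

Because the resume point is inclusive, the entry at each border is fetched twice and dropped by the `_id` check. One known limitation of this sketch: if more than `limit` entries share the same timestamp, the loop stops early.
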

shankari commented 1 year ago

Porting the changes from master to the gis branch, which we actually use everywhere, was a bit challenging since all the export code in the gis branch is pulled out into a separate file (emission/export/export.py). I went through the changes carefully, and copied the new get_with_retry over exactly.

To test, I ran the export pipeline on both master and the gis branch. The master export had one additional location entry.

<             "$oid": "6377b6074202ae5631d96451"
<         "metadata": {
<             "key": "background/filtered_location",
<         "data": {
<             "fmt_time": "2022-11-18T09:09:11.620000-07:00",

Re-generating to see how the logs differ

shankari commented 1 year ago

On master:

DEBUG:root:finished querying values for None, count = 79243
DEBUG:root:Retrieved batch of size 79243, cumulative 79243 entries of total 79243 documents for TimeQuery data.ts with range [1664582400, 1703980800)
DEBUG:root:finished querying values for None, count = 12768
DEBUG:root:Retrieved batch of size 12768, cumulative 12768 entries of total 12768 documents for TimeQuery data.ts with range [1664582400, 1703980800)
DEBUG:root:For 802667b6-371f-45b2-9c7a-bb051244836a, found 664 messages in usercache
DEBUG:root:Found 664 messages in response to query {'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'), '$or': [{'metadata.type': 'message'}, {'metadata.type': 'sensor-data'}, {'metadata.type': 'rw-document'}], 'data.ts': {'$lte': 1703980800, '$gte': 1664582400}}
DEBUG:root:Retrieved batch of size 664, cumulative 664 entries of total 664 documents for TimeQuery data.ts with range [1664582400, 1703980800)

On GIS

DEBUG:root:finished querying values for None, count = 79242
DEBUG:root:Retrieved batch of size 79242, cumulative 79242 entries of total 79242 documents for TimeQuery data.ts with range [1664582400, 1703980800)
DEBUG:root:finished querying values for None, count = 12768
DEBUG:root:Retrieved batch of size 12768, cumulative 12768 entries of total 12768 documents for TimeQuery data.ts with range [1664582400, 1703980800)
DEBUG:root:For 802667b6-371f-45b2-9c7a-bb051244836a, found 664 messages in usercache
DEBUG:root:Found 664 messages in response to query {'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'), '$or': [{'metadata.type': 'message'}, {'metadata.type': 'sensor-data'}, {'metadata.type': 'rw-document'}], 'data.ts': {'$lte': 1703980800, '$gte': 1664582400}}
DEBUG:root:Retrieved batch of size 664, cumulative 664 entries of total 664 documents for TimeQuery data.ts with range [1664582400, 1703980800)

So get_entries_for_timeseries actually returns different results on master and gis? This seems whack

shankari commented 1 year ago

Running the query directly against the database while on the GIS branch does give the right number

>>> edb.get_timeseries_db().count_documents({'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'),'data.ts': {'$lte': 1703980800, '$gte': 1664582400}})
79243
shankari commented 1 year ago

ok, so the difference is due to the invalid check

>>> edb.get_timeseries_db().count_documents({'invalid': {'$exists': False}, 'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'), 'data.ts': {'$lte': 1703980800, '$gte': 1664582400}})
79242
>>> edb.get_timeseries_db().count_documents({'user_id': UUID('802667b6-371f-45b2-9c7a-bb051244836a'), 'data.ts': {'$lte': 1703980800, '$gte': 1664582400}})
79243

Double-checking, the new entry is in fact invalid

>>> edb.get_timeseries_db().find_one({"_id": boi.ObjectId("6377b6074202ae5631d96451")})
{'_id': ObjectId('6377b6074202ae5631d96451'), 'user_id': UUID('...'), 'metadata': {'key': 'background/filtered_location', 'write_fmt_time': '2022-11-18T09:09:15.153000-07:00'}, 'data': {'fmt_time': '2022-11-18T09:09:11.620000-07:00'}, 'invalid': True}

Also true in the diff

<             "$oid": "6377b6074202ae5631d96451"
<         },
...
<         },
<         "invalid": true
<     },

And the GIS branch does in fact include the invalid check in the query

    def _get_query(self, key_list = None, time_query = None, geo_query = None,
                   extra_query_list = []):
...
        ret_query = {"invalid": {"$exists": False}}
        ret_query.update(self.user_query)

while the master branch does not

    def _get_query(self, key_list = None, time_query = None, geo_query = None,
                   extra_query_list = []):
...
        ret_query = {}
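
The effect of the extra filter can be reproduced on toy data. This is a minimal sketch, assuming the only relevant difference is the initial value of ret_query shown in the two snippets; `base_query` and `matches` are hypothetical helpers, and `matches` implements just enough filter semantics for this example rather than MongoDB's full behavior:

```python
def base_query(user_query, exclude_invalid):
    """Mimic the first lines of _get_query on the two branches."""
    ret_query = {"invalid": {"$exists": False}} if exclude_invalid else {}
    ret_query.update(user_query)
    return ret_query


def matches(doc, query):
    """Tiny matcher supporting only equality and {"$exists": False}."""
    for field, cond in query.items():
        if cond == {"$exists": False}:
            if field in doc:
                return False
        elif doc.get(field) != cond:
            return False
    return True


docs = [
    {"_id": 1, "user_id": "u"},
    {"_id": 2, "user_id": "u", "invalid": True},  # like the entry found above
]
gis_count = sum(matches(d, base_query({"user_id": "u"}, True)) for d in docs)
master_count = sum(matches(d, base_query({"user_id": "u"}, False)) for d in docs)
```

With one invalid entry in the toy data, the two counts differ by exactly one, the same pattern as 79242 versus 79243.
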

Phew! I thought I was going crazy for a minute there

shankari commented 1 year ago

As a side note, it looks like we don't have the invalid key as an index in the database. Since we include it in every single search, we should probably add it; it should improve performance.

shankari commented 1 year ago

restored for the test user

INFO:root:Loading file or prefix /tmp/test_smaller_retrieve_gis_2
INFO:root:Found 2 matching files for prefix /tmp/test_smaller_retrieve_gis_2
INFO:root:files are ['/tmp/test_smaller_retrieve_gis_2_....gz', '/tmp/test_smaller_retrieve_gis_2_pipelinestate_....gz'] ... ['/tmp/test_smaller_retrieve_gis_2_....gz']
INFO:root:==================================================
INFO:root:Loading data from file /tmp/test_smaller_retrieve_gis_2_....gz
INFO:root:Analyzing timeline...
INFO:root:timeline has 97206 entries
INFO:root:timeline has data from 1 users
INFO:root:timeline has the following unique keys {'statemachine/transition', 'analysis/inferred_section', 'stats/client_time', 'background/location', 'analysis/inferred_trip', 'segmentation/raw_untracked', 'inference/prediction', 'background/filtered_location', 'stats/pipeline_error', 'manual/mode_confirm', 'segmentation/raw_section', 'analysis/cleaned_stop', 'analysis/cleaned_untracked', 'stats/client_nav_event', 'analysis/inferred_labels', 'background/motion_activity', 'background/battery', 'analysis/cleaned_section', 'stats/client_error', 'manual/replaced_mode', 'analysis/expected_trip', 'segmentation/raw_place', 'analysis/cleaned_place', 'manual/purpose_confirm', 'stats/pipeline_time', 'analysis/confirmed_trip', 'analysis/cleaned_trip', 'segmentation/raw_trip', 'segmentation/raw_stop', 'inference/labels', 'analysis/recreated_location', 'stats/server_api_time'}
INFO:root:timeline for user ... contains analysis results
For uuid ..., finished loading 7 entries into the usercache and 0 entries into the timeseries
Loading pipeline state for ... from /tmp/test_smaller_retrieve_gis_2_pipelinestate_....gz
INFO:root:ignoring duplicate key error while restoring pipeline state
INFO:root:Creating user entries for 1 users
INFO:root:pattern = user-%01d
INFO:root:For 1 users, loaded 345442 raw entries, 66001 processed entries and 13 pipeline states

we now have those location entries

pd.json_normalize(list(edb.get_timeseries_db().find({'user_id': UUID('...'), 'metadata.key': 'background/filtered_location', 'data.ts': {"$lte": 1672586043.121 + 10 * 60}}).sort("data.ts", -1).limit(20)))[["data.fmt_time"]]
                       data.fmt_time
0   2023-01-01T08:23:48.014000-07:00
1   2023-01-01T08:23:17.079000-07:00
2   2023-01-01T08:22:47.037000-07:00
3   2023-01-01T08:21:14.033000-07:00
4   2023-01-01T08:20:12.074000-07:00
5   2023-01-01T08:19:41.096000-07:00
6   2023-01-01T08:19:11.055000-07:00
7   2023-01-01T08:18:40.019000-07:00
8   2023-01-01T08:18:09.021000-07:00
9   2023-01-01T08:17:38.002000-07:00
10  2023-01-01T08:16:36.016000-07:00
11  2023-01-01T08:16:05.105000-07:00
12  2023-01-01T08:15:35.090000-07:00
13  2023-01-01T08:15:05.083000-07:00
14  2023-01-01T08:14:35.025000-07:00
15  2023-01-01T08:14:03.121000-07:00
16  2022-12-30T14:35:41.025000-07:00
17  2022-12-30T14:35:10.056000-07:00
18  2022-12-30T14:34:40.047000-07:00
19  2022-12-30T14:34:09.100000-07:00
shankari commented 1 year ago

And the pipeline has since run successfully

For ...: on samsung, trip count = 984, location count = 64374, first trip = 2022-01-02T08:57:02.028000-07:00, last trip = 2023-01-17T13:38:56.016000-07:00, confirmed_pct  = approximately 0.98, last install: 2023-01-17T05:25:49.791000-07:00
shankari commented 1 year ago

Will close this once we have copied over data for all the other participants as well

shankari commented 1 year ago

Looking at the other participants

| id | last copied over location entry | last trip | last location entry | Issue | OS |
|---|---|---|---|---|---|
| 12 | 2022-12-31T22:50:21.373000-07:00 | 2023-01-02T11:36:53.495000-07:00 | 2023-01-02T11:39:11.811000-07:00 | No more location points; app uninstall? | TCL |
| 13 | 2022-12-05T23:25:32.409000-07:00 | 2022-12-30T13:30:14.693000-07:00 | 2023-01-14T18:54:40.730000-07:00 | Copy over missing data | motorola |
| 14 | 2022-12-04T10:43:39.999537-07:00 | 2022-10-27T23:41:00.000556+10:00 | 2023-01-18T15:44:07.005393-07:00 | Copy over missing data | Apple |
| 15 | 2022-12-31T03:03:39.999470-07:00 | 2023-01-04T14:30:00.000063-07:00 | 2023-01-04T14:30:00.000063-07:00 | No more location points, app uninstall? | Apple |
| 16 | 2022-12-15T14:38:05.979000-07:00 | 2022-12-30T06:16:34.483000-07:00 | 2023-01-17T14:46:00.200000-07:00 | Copy over missing data | Samsung |
| 17 | 2022-09-29T19:22:23.999000-06:00 | 2023-01-06T16:42:31.998000-07:00 | 2023-01-06T19:07:04.114000-07:00 | No more location points, app uninstall? | TCL |

[1] But we have data until 12-31, and the missing point is from 12-30? The data from 12-31 is from the new app; the previous data copy only goes until 12-05.

[2] We have data from Dec, but the missing location point is from 2022-10-27 and the location entries before that are from 2022-02-04?! Through searching, it looks like after Feb, we only have entries from 2022-11-05. So we will likely need to copy in multiple batches.

shankari commented 1 year ago

Copied over:

shankari commented 1 year ago

So 16 is weird. The missing item is not the location entry, but the place for the trip

2023-01-18 13:59:13,231:ERROR:140693321615168:Sectioning failed for user ...
2023-01-18 13:59:13,226:INFO:140693321615168:++++++++++++++++++++Processing trip 61d1ddc9200e4476992912a8 for user ....++++++++++++++++++++
Traceback (most recent call last):
File "/usr/src/app/emission/analysis/intake/segmentation/section_segmentation.py", line 52, in segment_current_sections
segment_trip_into_sections(user_id, trip_entry, trip_entry.data.source)
File "/usr/src/app/emission/analysis/intake/segmentation/section_segmentation.py", line 66, in segment_trip_into_sections
distance_from_place = _get_distance_from_start_place_to_end(trip_entry)
File "/usr/src/app/emission/analysis/intake/segmentation/section_segmentation.py", line 200, in _get_distance_from_start_place_to_end
start_place = esda.get_object(esda.RAW_PLACE_KEY, start_place_id)
File "/usr/src/app/emission/storage/decorations/analysis_timeseries_queries.py", line 45, in get_object
return get_entry(key, object_id).data
AttributeError: 'NoneType' object has no attribute 'data'

The missing place is from 2021-12-31, and the trip that refers to it is from 2022-01-02. The reason we were processing these old trips is that the section segmentation stage was reset completely:

2023-01-18 13:59:13,019:INFO:140693321615168:For stage PipelineStages.SECTION_SEGMENTATION, start_ts is None

Although it was not reset in the original mongodump.

>>> list(edb.get_pipeline_state_db().find({"user_id": UUID("..."), "pipeline_stage": 2}))
[{'_id': ObjectId('60d532c55d607068e6913d4c'), 'user_id': UUID('...'), 'pipeline_stage': 2, 'curr_run_ts': None, 'last_processed_ts': 1672406199.483, 'last_ts_run': 1672416825.4551017}]

>>> list(edb.get_pipeline_state_db().find({"user_id": UUID(".."), "pipeline_stage": 2}))
[{'_id': ObjectId('63b19ee41ed87bba98d73510'), 'user_id': UUID('...'), 'pipeline_stage': 2, 'curr_run_ts': None, 'last_processed_ts': None, 'last_ts_run': None}]

And we are missing analysis data for the user before Jan 2022

{'user_id': UUID('...'), 'metadata': {'key': 'analysis/confirmed_trip', 'end_fmt_time': '2021-06-25T06:42:50-06:00', 'start_fmt_time': '2021-06-25T06:36:50-06:00'}

{'user_id': UUID('fbff5e08-b7f2-4a94-ab4b-2d7371999ef7'), 'metadata': {'key': 'analysis/confirmed_trip', 'end_fmt_time': '2022-01-02T09:31:06-07:00', 'start_fmt_time': '2022-01-02T07:55:07.322381-07:00'}

Ok, so it looks like I copied over only data from 2022-01-01 onwards. I then reset the pipeline (at least for the section stage). This meant that the start place of the first trip was not present, causing us to fail.

(1) One fix is to reset the pipeline completely.
(2) A second fix is to get data from 2021 and copy it over.

Let's go with (2) so that we retain the object ids in case we want to handle any more fixes.
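
Before re-running the pipeline after a partial copy, it may help to check for trips whose start place was not copied. A sketch on plain dicts; `find_dangling_start_places` is a hypothetical helper, the ids are made up for illustration, and only the `start_place` field name is taken from the raw trip shown earlier:

```python
def find_dangling_start_places(trips, places):
    """Return ids of trips whose start_place is set but absent from places."""
    place_ids = {p["_id"] for p in places}
    return [t["_id"] for t in trips
            if t["data"].get("start_place") is not None
            and t["data"]["start_place"] not in place_ids]


places = [{"_id": "place-2022-01-02"}]
trips = [
    # The start place predates the copied range, so it is missing.
    {"_id": "trip-a", "data": {"start_place": "place-2021-12-31"}},
    {"_id": "trip-b", "data": {"start_place": "place-2022-01-02"}},
    # A trip without a start place is not reported.
    {"_id": "trip-c", "data": {}},
]
dangling = find_dangling_start_places(trips, places)
```

Any id reported here would identify a date range that still needs to be copied, as with option (2).
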

shankari commented 1 year ago

Found another gap for user 16 from 2022-08-09 to 2022-09-12

0   2022-09-12
1   2022-09-12
2   2022-09-12
3   2022-09-12
4   2022-08-09
5   2022-08-09
6   2022-08-09
7   2022-08-09
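
Gaps like this one could be found in a single pass instead of by repeated manual queries. A minimal sketch, assuming a sorted list of location timestamps in seconds; `find_gaps` is a hypothetical helper and the 7-day threshold is an arbitrary choice, since a participant may legitimately not travel for a while:

```python
def find_gaps(sorted_ts, max_gap_s):
    """Return (prev_ts, next_ts) pairs whose spacing exceeds max_gap_s."""
    return [(a, b) for a, b in zip(sorted_ts, sorted_ts[1:]) if b - a > max_gap_s]


DAY = 24 * 60 * 60
# Toy timestamps: three points close together, then a jump of about 40 days.
ts_list = [0, 30, 60, 40 * DAY, 40 * DAY + 30]
gaps = find_gaps(ts_list, max_gap_s=7 * DAY)
```

Each reported pair marks a candidate range to compare against the original dump before deciding whether to copy another batch.
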
shankari commented 1 year ago

After loading that gap, am currently at 2022-06-11T16:14:11-06:00