e-mission / e-mission-docs

Repository for docs and issues. If you need help, please file an issue here. Public conversations are better for open source projects than private email.
https://e-mission.readthedocs.io/en/latest
BSD 3-Clause "New" or "Revised" License

Create NREL-hosted instance of OpenPATH for ease of use by public agencies #721

Closed shankari closed 2 years ago

shankari commented 2 years ago

@aguttman As we discussed, the plan is to have multiple enclaves, one for each program or study.

shankari commented 2 years ago

@aguttman

The first step for this will be to create one enclave for internal NREL use. Creating this enclave will require permission from multiple NREL departments:

Since this is a complicated endeavor we will use an internal NREL project as an example to get started. @shankari will continue coordinating this since she has the context.

In parallel, we should start preparing for the more complex solution above. The immediate tasks that I can list to prepare include:

shankari commented 2 years ago

This should fix https://github.com/e-mission/e-mission-docs/issues/326 (once we expand from the NREL internal project to an open enrollment option) and provide a template for fixing https://github.com/e-mission/e-mission-docs/issues/452

aGuttman commented 2 years ago

Of these, I am first focusing on compatibility issues between mongodb and documentDB. I will familiarize myself with both to the extent needed to understand the problems present and what a proper solution should be like.

Secondarily, I will be attentive of @shankari 's communication and progress on creating the internal NREL enclave so that I can understand the process and be able to take point on future enclave creation.

shankari commented 2 years ago

Another, more serious compatibility issue wrt DocumentDB vs. mongodb: https://github.nrel.gov/nrel-cloud-computing/emissionlhd/issues/10#issuecomment-38430

Extracting the issue out here: the main problem is that operations on DocumentDB appear to be asynchronous, while mongodb's are synchronous. In particular, a write followed by a read does not return the newly written value.

Sample test

>>> import emission.core.get_database as edb
>>> from uuid import UUID

>>> def test_save_and_list():
...     edb.save(edb.get_pipeline_state_db(), {'user_id': UUID('39bf2b1c-e254-48da-aef1-e3cc7e6cf27f'), 'pipeline_stage': 6, 'curr_run_ts': 1622065279.3911324, 'last_processed_ts': None, 'last_ts_run': None})
...     print(list(edb.get_pipeline_state_db().find()))
...
>>> edb.get_pipeline_state_db().delete_many({})
<pymongo.results.DeleteResult object at 0x7f5c7a9a5280>
>>> test_save_and_list()
[]

Adding a sleep fixes it

>>> import time
>>> def test_save_and_list():
...     edb.save(edb.get_pipeline_state_db(), {'user_id': UUID('39bf2b1c-e254-48da-aef1-e3cc7e6cf27f'), 'pipeline_stage': 6, 'curr_run_ts': 1622065279.3911324, 'last_processed_ts': None, 'last_ts_run': None})
...     time.sleep(15)
...     print(list(edb.get_pipeline_state_db().find()))
...
>>> edb.get_pipeline_state_db().delete_many({})
<pymongo.results.DeleteResult object at 0x7f5c7a9a2b90>
>>> test_save_and_list()
[{'_id': ObjectId('60aec4b6be757e10c9069a27'), 'user_id': UUID('39bf2b1c-e254-48da-aef1-e3cc7e6cf27f'), 'pipeline_stage': 6, 'curr_run_ts': 1622065279.3911324, 'last_processed_ts': None, 'last_ts_run': None}]
shankari commented 2 years ago

I have currently worked around this on the NREL setup by adding a sleep to every DB call, but that is so bad and hacky that I will never merge it to master (https://github.nrel.gov/nrel-cloud-computing/emissionlhd/pull/16)

shankari commented 2 years ago

As pointed out by @jgu2 here are the pages on DocumentDB related to the issue:

https://docs.aws.amazon.com/documentdb/latest/developerguide/how-it-works.html#how-it-works.replication https://docs.aws.amazon.com/documentdb/latest/developerguide/transactions.html#transactions-isolation-level https://docs.aws.amazon.com/documentdb/latest/developerguide/how-it-works.html#durability-consistency-isolation.read-consistency

The DocumentDB cluster is distributed: it has a primary node and replicas. The primary node supports reads and writes, while the replicas support reads only. Reads from a read replica are eventually consistent, often with less than 100ms of replica lag. To keep read-after-write consistency, you can set the read preference:

db.example.find().readPref('primary')
or 
db.example.find().readPref('primaryPreferred')

Since we should be using the timeseries interface (emission.storage.timeseries.abstract_timeseries) for most calls, there should not be a lot of direct calls to find(). So maybe we should start with this fix and see if it works?

shankari commented 2 years ago

Although the real fix should be to look at the control flow and see where we have read-after-write dependencies. Here are all the write calls:

$ egrep -R -l "insert_one|replace_one|update_one" emission | grep -v tests | grep -v .pyc

which currently returns

emission/net/usercache/builtin_usercache.py
emission/net/ext_service/push/notify_interface_impl/firebase.py
emission/net/api/usercache.py
emission/core/wrapper/user.py
emission/core/wrapper/client.py
emission/core/get_database.py
emission/storage/timeseries/builtin_timeseries.py
shankari commented 2 years ago

Tasks for the documentDB support:

shankari commented 2 years ago

Tasks for May 3rd @aGuttman

shankari commented 2 years ago

We also need to have a demographic survey for each user. We have typically used external surveys (Qualtrics/Google Forms) before, but those have the following limitations:

A solution that would address all those issues is to store the survey information in mongodb, just like everything else. But we don't want to create a survey builder. Instead, we will use kobotoolbox to create a survey which we will display using the enketo library.

The UNSW group has already integrated with enketo core in the https://github.com/e-mission/e-mission-phone/tree/rciti branch, so let's start by exploring that approach.

This feature is tracked in https://github.com/e-mission/e-mission-docs/issues/727

shankari commented 2 years ago

From @jgu2:

It looks like you are using a hard-coded db name, "Stage_database", in the emission code. For best practice, it's recommended to use a different db for each deploy environment; as we are moving towards a production deploy, a production db will be created on the same docdb cluster.

What's more, since OpenPATH needs multiple containers for different centers/organizations, ideally each center/organization would have its own db setup.

We can set the DB name in an environment variable, for example DB_NAME, and you can access the corresponding db via that variable. If we go this way, you will need to change your code to adapt to it. Otherwise, all your stage/production data from the different centers/orgs would in the future be written into the same db. What are your thoughts?

From @shankari:

Makes sense.

For the record, the reason that the DB/collection name is hardcoded is because I typically run multiple different MongoDB containers for the different instances. Too bad that isn’t an option on the AWS cluster, but we can adapt it pretty easily, I think.

@aGuttman, can you take care of this after the index changes and the readPrefs?

shankari commented 2 years ago

@aGuttman will have the primary DB changes done by tomorrow:

Then he will work on #724 And then he will work on at least part of #709, the part about storing the generated models into the database instead of a file.

aGuttman commented 2 years ago

@aGuttman will have the primary DB changes done by tomorrow:

  • [x] Fix indices
  • [x] Deal with read replicas and readPreferred
  • [ ] Configure DB name

Then he will work on #724 And then he will work on at least part of #709, the part about storing the generated models into the database instead of a file.

shankari commented 2 years ago

@aGuttman

URL when connecting to remote documentDB instance can include a read preference setting. Setting this to primaryPreferred should avoid read-after-write problem without changing every find() call in code to find().readPref('primaryPreferred').

can you confirm that you tested this? the simple test-after-set function above should be sufficient

shankari commented 2 years ago

Not sure if this is the best way to do it, but after seeing the URL solution to read preference, it seemed attractive to keep all these considerations in a single part of the already-used db.config file.

@aGuttman Agree that this is attractive. What are some additional design options, and can you list the pros and cons before choosing this one?

shankari commented 2 years ago

@aGuttman wrt this list, here's my initial pass-through. ** marks the files that you need to investigate and potentially adapt. The others can either be ignored or removed, although double check their history once as well 😄

    ./emission/core/__pycache__/get_database.cpython-37.pyc : compile artifact, ignore
    ./emission/core/get_database.py  : already identified
    ./emission/analysis/classification/inference/Classifier.ipynb : fairly obsolete, ignore
    **./emission/integrationTests/storageTests/TestMongodbAuth.py** : this is a test of the authenticated connections to mongodb. Use it to test the solution we come up with
    **./emission/storage/timeseries/abstract_timeseries.py**: Look at the history/blame for the code which includes it, and see if it can be rewritten to use get_database.
    ./emission/storage/timeseries/__pycache__/abstract_timeseries.cpython-37.pyc: compile artifact, ignore
    ./bin/historical/migrations/rename_client_ts_to_ts.py: historical script, ignore. Maybe add a readme to the directory indicating that these are for reference and are not guaranteed to work now.
    ./bin/plot_trip.py: Pretty sure this is obsolete. Check history and remove.
    **./setup/db_auth.py**: This is a script to setup the mongodb auth properly based on username and password. adapt to handle the DB name as well.
shankari commented 2 years ago

@aGuttman we don't store anything outside that database, so do you even need to parse it out? Maybe if you pass in a URL with the database embedded, you don't need to specify the database any more. From the SO post that you linked, it looks like maybe something like the following would just work

MongoClient("mongodb://<user>:<password>@documentdb-moderate-1.<rest_of_host>:27017/<dbName>?ssl=true&ssl_ca_certs=<path_to_certs>/rds-combined-ca-bundle.pem&replicaSet=rs0&readPreference=primaryPreferred&retryWrites=false").Stage_uuids.count_documents()

you could then replace

print("Connecting to database URL "+url)
_current_db = MongoClient(url).Stage_database

with

print("Connecting to database URL "+url)
_current_db = MongoClient(url)
aGuttman commented 2 years ago

@aGuttman

URL when connecting to remote documentDB instance can include a read preference setting. Setting this to primaryPreferred should avoid read-after-write problem without changing every find() call in code to find().readPref('primaryPreferred').

can you confirm that you tested this? the simple test-after-set function above should be sufficient

I did test this with the test_save_and_list() example typed out above and it did work, though I'm confused about how to interpret the behavior. primary, primaryPreferred, and secondaryPreferred all produced the correct output; secondary times out. Invalid settings cause a warning, but the correct output is still produced. In my testing I haven't been able to get an empty list back like the example, and I'm not including any sleeps.

Using primary read preference in the URL to avoid read-after-write problems is definitely an intended feature, as seen in the first example under Multiple Connection Pools here: https://docs.aws.amazon.com/documentdb/latest/developerguide/connect-to-replica-set.html

Also, if we don't take advantage of replica sets, it seems like we could just leave out the rs0 part of the URL.

If you omit /?replicaSet=rs0, the client routes all requests to the cluster endpoint, that is, your primary instance

shankari commented 2 years ago

In my testing I haven't been able to get an empty list back like the example. I'm not including any sleeps.

That might be because your connection to the database is slow, so it acts as an implicit sleep. That is also probably why the tests take a long time to run. I think we can rely on the documentation for now (maybe leave out the rs0) and revisit if we still have issues.

aGuttman commented 2 years ago

@aGuttman we don't store anything outside that database, so do you even need to parse it out? Maybe if you pass in a URL with the database embedded, you don't need to specify the database any more. From the SO post that you linked, it looks like maybe something like the following would just work

MongoClient("mongodb://<user>:<password>@documentdb-moderate-1.<rest_of_host>:27017/<dbName>?ssl=true&ssl_ca_certs=<path_to_certs>/rds-combined-ca-bundle.pem&replicaSet=rs0&readPreference=primaryPreferred&retryWrites=false").Stage_uuids.count_documents()

you could then replace

print("Connecting to database URL "+url)
_current_db = MongoClient(url).Stage_database

with

print("Connecting to database URL "+url)
_current_db = MongoClient(url)

I don't think this works. I'm trying to do something like it now and it's returning errors like TypeError: 'Collection' object is not callable. If you meant to call the 'delete_many' method on a 'Database' object it is failing because no such method exists. I might be misunderstanding something, though I am able to get it to work parsing out the name and using dictionary style access.

url = config_data["timeseries"]["url"]
parsed=pymongo.uri_parser.parse_uri(url)
...
print("Connecting to database URL "+url)
_current_db = MongoClient(url)[parsed['database']]

It would be nice to avoid more parsing than necessary, but I think the name does need to be specified on the client object (?) in order to use it. Given that, it needs to be passed somehow from an external source. We can make it its own field in a json, but that just changes where the parse happens. Maybe the pymongo parser is slower, but we only need it once at the start and it just feels cleaner to me to specify it all in the url instead of adding another field to the config (or making an additional config file). Those are the only options I'm able to think of right now, and of those, I just like the url option better, even at the cost of using a potentially slightly less effective parser.

aGuttman commented 2 years ago

Also, let's hold off on my pull request for a minute. tests/netTests/TestBuiltinUserCacheHandlerInput.py is failing. I can roll back all my changes except the ones that need to be there (swap ASCENDING for HASHED, remove GEOSPHERE entirely or just remove sparse=True), but testMoveDuplicateKey and testTwoLongTermCalls still fail. I'm going to look at what those two do.

shankari commented 2 years ago

If you see https://github.nrel.gov/nrel-cloud-computing/emissionlhd/issues/10#issuecomment-38429, laptop to DocumentDB always worked with the read preference.

on the laptop, the communication overhead for the call is high, which effectively functions as a sleep

You need to get @jgu2 to create an AWS EC2 instance (maybe to ssh into the staging instance?) and then run the script from there.

shankari commented 2 years ago

Given that, it needs to be passed somehow from an external source. We can make it its own field in a json, but that just changes where the parse happens. Maybe the pymongo parser is slower, but we only need it once at the start and it just feels cleaner to me to specify it all in the url instead of adding another field to the config (or making an additional config file). Those are the only options I'm able to think of right now, and of those, I just like the url option better, even at the cost of using a potentially slightly less effective parser.

This makes sense to me. Please submit a PR for this as well. You probably want to communicate the new URL format (after it is finalized) with @jgu2 so he can set the environment variables correctly.

shankari commented 2 years ago

@aGuttman every line is different because the timestamps are different. If you grep -v all the lines related to Updated result for and cut -d ":" -f ... to remove all the timestamp stuff, you should be able to see exactly what is different. Or at least, narrow it down significantly.

aGuttman commented 2 years ago

Ran 237 tests in 39327.855s

FAILED (failures=2, errors=15)

Just about 11 hours to run the tests remotely this time. No sleeping this time. I guess the timer from the last run didn't count that time either.

aGuttman commented 2 years ago

For the TestBuiltinUserCacheHandlerInput.py test failures:

Now, we have two sets of entries, so we will have 60 entries in longterm

self.assertEqual(len(self.uc1.getMessage()), 0)

Again, adding sleeps around this call seems to help sometimes, but not always.

Looking at the definition of `moveToLongTerm()` and seeing that the problem is that the cache is failing to be emptied, it seems like the problem would be in its call to `clearProcessedMessages()` in `emission/net/usercache/builtin_usercache.py` but that just boils down to

    def clearProcessedMessages(self, timeQuery, key_list=None):
        del_query = self._get_msg_query(key_list, timeQuery)
        logging.debug("About to delete messages matching query %s" % del_query)
        del_result = self.db.delete_many(del_query)
        logging.debug("Delete result = %s" % del_result)


There doesn't seem to be much discussion of documentDB issues online in general, and I'm not finding much about synchronization issues with `delete_many()` specifically. I'm not sure what to do from here.
shankari commented 2 years ago

@aGuttman I would:

  1. document what you did with the sleeps and the amount of the sleep and document the failures that you encountered
  2. try my prior sleep hack and see if that fixes it

Note that if we have moveToLongTerm() followed by an assertion to 0 but the actual value is 30, this could also just be a read of stale information.

shankari commented 2 years ago

The next thing I would try is to come up with an MRE that doesn't use any of the openpath code, but just pymongo directly. For example, something like:

test_db = MongoClient(url).Test_inconsistency_database
test_coll = test_db.Test_collection
lengths = [10, 100, 1000, 10000]
for l in lengths:
    # Insert l entries for l = 10, 100...
    for i in range(l):
        test_coll.insert_one({"i": i})
    # busy loop until we see l documents
    while test_coll.count_documents({}) < l:
        # potentially add sleep here
        print(test_coll.count_documents({}))
    # Delete all the entries
    test_coll.delete_many({})
    # busy loop until we see 0 documents
    while test_coll.count_documents({}) > 0:
        # potentially add sleep here
        print(test_coll.count_documents({}))
aGuttman commented 2 years ago

I believe I've pinpointed where the error is happening, but I don't understand it and am not able to reproduce it outside of the e-mission code.

By adding logs, I can see that moveToLongTerm() (in emission/net/usercache/builtin_usercache_handler.py) consistently sees the correct number of entries. After processing them, it passes the same query that it uses to correctly find these entries into clearProcessedMessages() (in emission/net/usercache/builtin_usercache.py). clearProcessedMessages() uses these arguments to call _get_msg_query(), but when this query is used, it sometimes fails to find any entries.

The odd part is that moveToLongTerm() uses getMessage() (in emission/net/usercache/builtin_usercache.py) with arguments None and time_query to create curr_entry_it, which it iterates through to find each entry. getMessage() simply calls _get_msg_query() (in emission/net/usercache/builtin_usercache.py) with None and time_query and returns the result as a list. clearProcessedMessages() calls _get_msg_query() with None and time_query, the exact same call and arguments that previously worked, but uses the result directly as a filter to delete_many(). When used this way, delete_many() will often fail to find any entries and delete nothing.

I'm trying to get the entries that clearProcessedMessages() should delete as a list and delete them iteratively, since that approach seems to work for moveToLongTerm() for some reason. I can get the right number of entries now, but at the moment I'm a bit fried and can't properly think through deleting them correctly, though it shouldn't be too hard. That said, I have no idea why one approach would work and not the other.

aGuttman commented 2 years ago
emission/net/usercache/builtin_usercache_handler.py
99
-logging.debug("moveToLongTerm: Deleting all entries for query %s" % time_query)
+logging.debug("moveToLongTerm: Deleting all %d entries for query %s" % (len(unified_entry_list), time_query))
emission/net/usercache/builtin_usercache.py
153
+del_list = self.getMessage(key_list, timeQuery)
+logging.debug("clearProcessedMessages: itr: deleting %d entries" % len(del_list))
+logging.debug("clearProcessedMessages: deleting %d entries" % self.db.count_documents(del_query))
log
2022-05-12 16:20:04,089:DEBUG:4542789120:moveToLongTerm: Deleting all 30 entries for query <emission.storage.timeseries.timequery.TimeQuery object at 0x7fba41101910>
...
2022-05-12 16:20:04,140:DEBUG:4542789120:clearProcessedMessages: itr: deleting 0 entries
2022-05-12 16:20:04,191:DEBUG:4542789120:clearProcessedMessages: deleting 0 entries
aGuttman commented 2 years ago

emission/tests/netTests/TestBuiltinUserCacheHandlerInput.py

aGuttman commented 2 years ago

Note that wrapping the calls to moveToLongTerm() in a loop such as below does work:

        # Then, we move entries for the ios user into longterm
        count = 0
        while True:
            enuah.UserCacheHandler.getUserCacheHandler(self.testUserUUIDios).moveToLongTerm()
            if len(self.ucios.getMessage()) == 0:
                break
            count += 1
            logging.debug("move fail count: %d" % count)

        self.assertEqual(len(self.ucios.getMessage()), 0)

Doing this we can consistently pass the tests, but I've seen the number of retries per failure range from 3 to >300. Median is about 60 from my experience.
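A bounded variant of that retry loop would avoid hanging forever when the entries never drain. This is an illustrative stdlib helper, not something in the e-mission codebase:

```python
import time

def wait_until(predicate, timeout=30.0, interval=0.1):
    """Poll predicate() until it returns True or the timeout expires.
    Returns True on success, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    return False

# usage sketch (hypothetical names, mirroring the test loop above):
# while not wait_until(lambda: len(ucios.getMessage()) == 0, timeout=5):
#     handler.moveToLongTerm()
```

With a timeout, a genuinely stuck delete surfaces as a test failure instead of an 11-hour hang.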

aGuttman commented 2 years ago

Setting read preference at database creation such as below had no effect:

from pymongo import MongoClient, ReadPreference

_current_db = MongoClient(url, read_preference=ReadPreference.PRIMARY).Stage_database

The per call setting suggested by amazon's documentation does not seem to be supported by pymongo:

db.example.find().readPref('primary')

This causes errors, even after trying various imports, and I could not find any usage of it with pymongo online.

aGuttman commented 2 years ago

I'm seeing a lot more failures like this today, as opposed to the != 0 fails that I was focused on yesterday. I wonder if the timing conditions of working at home vs in office have an effect.

======================================================================
FAIL: testMoveToLongTerm (__main__.TestBuiltinUserCacheHandlerInput)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "emission/tests/netTests/TestBuiltinUserCacheHandlerInput.py", line 123, in testMoveToLongTerm
    self.assertEqual(edb.get_timeseries_db().estimated_document_count(), 60)
AssertionError: 62 != 60

======================================================================
FAIL: testTwoLongTermCalls (__main__.TestBuiltinUserCacheHandlerInput)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "emission/tests/netTests/TestBuiltinUserCacheHandlerInput.py", line 329, in testTwoLongTermCalls
    self.assertEqual(edb.get_timeseries_db().estimated_document_count(), 120)
AssertionError: 122 != 120

----------------------------------------------------------------------
Ran 4 tests in 222.911s

FAILED (failures=2)
shankari commented 2 years ago

On the other hand, I am in CA, I just re-ran the tests and got

======================================================================
FAIL: testMoveDuplicateKey (__main__.TestBuiltinUserCacheHandlerInput)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "emission/tests/netTests/TestBuiltinUserCacheHandlerInput.py", line 161, in testMoveDuplicateKey
    self.assertEqual(len(self.uc1.getMessage()), 30)
AssertionError: 0 != 30

======================================================================
FAIL: testMoveToLongTerm (__main__.TestBuiltinUserCacheHandlerInput)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "emission/tests/netTests/TestBuiltinUserCacheHandlerInput.py", line 87, in testMoveToLongTerm
    self.assertEqual(len(self.uc1.getMessage()), 30)
AssertionError: 0 != 30

======================================================================
FAIL: testMoveWhenEmpty (__main__.TestBuiltinUserCacheHandlerInput)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "emission/tests/netTests/TestBuiltinUserCacheHandlerInput.py", line 130, in testMoveWhenEmpty
    self.assertEqual(len(self.uc1.getMessage()), 0)
AssertionError: 30 != 0

======================================================================
FAIL: testTwoLongTermCalls (__main__.TestBuiltinUserCacheHandlerInput)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "emission/tests/netTests/TestBuiltinUserCacheHandlerInput.py", line 259, in testTwoLongTermCalls
    self.assertEqual(len(self.uc1.getMessage()), 0)
AssertionError: 30 != 0

----------------------------------------------------------------------
Ran 4 tests in 209.991s

FAILED (failures=4)

Although I still have secondaryPreferred

shankari commented 2 years ago

@aGuttman If we are both going to test at the same time, we probably want to work on separate databases. If you have the dbname fix, could you submit it as well? Rebase the PR to something other than master, I will pull from it, and we can pick different database names.

aGuttman commented 2 years ago

Parsing the db name out of the config url works, but I was light on my testing when I put it together the other day. Using the name "Stage_database" in the url, everything gets parsed fine and I can connect and run, but using any other name gives an auth error.

pymongo.errors.OperationFailure: Authorization failure, full error: {'ok': 0.0, 'operationTime': Timestamp(1652462420, 1), 'code': 13, 'errmsg': 'Authorization failure'}

I believe this is a problem with the server or credentialing, not the e-mission code/setup, but I don't understand enough to know. Even manually changing every instance of 'Stage_database' to a new name and rerunning the setup doesn't allow you to connect to a database with a different name.

i.e.:

    find/replace "Stage_database" with "NewName" across the whole project
    rerun source setup/setup.sh
    run source setup/activate.sh
    create an unrelated file tested.py with

    test_db = MongoClient(url).NewName
    test_db.test_coll.insert({"a": 1})

yields

    pymongo.errors.OperationFailure: Authorization failure, full error: {'ok': 0, 'operationTime': Timestamp(1652464086, 1), 'code': 13, 'errmsg': 'Authorization failure'}

Doing the same with test_db = MongoClient(url).Stage_database works

I will email Jianli about it, but please let me know if it looks like I'm misunderstanding something.

shankari commented 2 years ago

@aGuttman I think that is because we have access only to the Stage_database; accessing a different database will likely require a different username and password. Can you get @jgu2 to create two dev databases (one for each of us) and give us the credentials to connect to them?

aGuttman commented 2 years ago

I'm noticing this from logging time_query, but I don't know what to make of it. Here, we fail to find entries to delete:

2022-05-13 10:57:41,066:DEBUG:4712445440:moveToLongTerm: Deleting all 30 entries for query <emission.storage.timeseries.timequery.TimeQuery object at 0x7fc348145610>
2022-05-13 10:57:41,066:DEBUG:4712445440:time_query: call clear with: type: metadata.write_ts start: 1652465.999 end: 1652460.999
2022-05-13 10:57:41,066:DEBUG:4712445440:time_query: clearProcessedMessages: type: metadata.write_ts start: 1652465.999 end: 1652460.999
2022-05-13 10:57:41,440:DEBUG:4712445440:Found 0 messages in response to query {'user_id': UUID('4c689acb-16ea-4a84-bda0-5d5ab13cb87e'), '$or': [{'metadata.type': 'message'}, {'metadata.type': 'sensor-data'}, {'metadata.type': 'rw-document'}], 'metadata.write_ts': {'$lte': 1652460.999, '$gte': 1652465.999}}
2022-05-13 10:57:41,440:DEBUG:4712445440:About to delete messages matching query {'user_id': UUID('4c689acb-16ea-4a84-bda0-5d5ab13cb87e'), '$or': [{'metadata.type': 'message'}, {'metadata.type': 'sensor-data'}, {'metadata.type': 'rw-document'}], 'metadata.write_ts': {'$lte': 1652460.999, '$gte': 1652465.999}}
2022-05-13 10:57:41,440:DEBUG:4712445440:clearProcessedMessages: list: deleting 0 entries
2022-05-13 10:57:41,504:DEBUG:4712445440:clearProcessedMessages: count_docs: deleting 0 entries

I am looping on these failures, so we try again and succeed:

2022-05-13 10:57:42,688:DEBUG:4712445440:moveToLongTerm: Deleting all 30 entries for query <emission.storage.timeseries.timequery.TimeQuery object at 0x7fc380575350>
2022-05-13 10:57:42,688:DEBUG:4712445440:time_query: call clear with: type: metadata.write_ts start: 1652465.999 end: 1652460999
2022-05-13 10:57:42,688:DEBUG:4712445440:time_query: clearProcessedMessages: type: metadata.write_ts start: 1652465.999 end: 1652460999
2022-05-13 10:57:42,778:DEBUG:4712445440:Found 30 messages in response to query {'user_id': UUID('4c689acb-16ea-4a84-bda0-5d5ab13cb87e'), '$or': [{'metadata.type': 'message'}, {'metadata.type': 'sensor-data'}, {'metadata.type': 'rw-document'}], 'metadata.write_ts': {'$lte': 1652460999, '$gte': 1652465.999}}
2022-05-13 10:57:42,778:DEBUG:4712445440:About to delete messages matching query {'user_id': UUID('4c689acb-16ea-4a84-bda0-5d5ab13cb87e'), '$or': [{'metadata.type': 'message'}, {'metadata.type': 'sensor-data'}, {'metadata.type': 'rw-document'}], 'metadata.write_ts': {'$lte': 1652460999, '$gte': 1652465.999}}
2022-05-13 10:57:42,778:DEBUG:4712445440:clearProcessedMessages: list: deleting 30 entries
2022-05-13 10:57:42,848:DEBUG:4712445440:clearProcessedMessages: count_docs: deleting 30 entries

Notice end/lte changed from 1652460.999 to 1652460999 from one attempt to the next. The decimal disappeared. Also notice that for the first try, start: 1652465.999 > end: 1652460.999.

shankari commented 2 years ago

Aha! The initial write_ts format seems wrong. It is supposed to be in seconds since the epoch, but 1652460.999 is way too small; it has the decimal point in the wrong place. Concretely,

>>> arrow.get(1652460.999)
<Arrow [1970-01-20T03:01:00.999000+00:00]>
>>> arrow.get(1652460999)
<Arrow [2022-05-13T16:56:39+00:00]>
aGuttman commented 2 years ago

Yes! It seems that all successful deletes have lte as a 10-digit number and all failing ones have lte as 7 digits with 3 decimals.

2022-05-13 10:59:00,731:DEBUG:4712445440:moveToLongTerm: Deleting all 30 entries for query <emission.storage.timeseries.timequery.TimeQuery object at 0x7fc33002d410>
2022-05-13 10:59:00,731:DEBUG:4712445440:time_query: call clear with: type: metadata.write_ts start: 1652466.117 end: 1652461.117
2022-05-13 10:59:00,731:DEBUG:4712445440:time_query: clearProcessedMessages: type: metadata.write_ts start: 1652466.117 end: 1652461.117
2022-05-13 10:59:00,799:DEBUG:4712445440:Found 0 messages in response to query {'user_id': UUID('1effebb2-472c-4004-aee2-898e52c64776'), '$or': [{'metadata.type': 'message'}, {'metadata.type': 'sensor-data'}, {'metadata.type': 'rw-document'}], 'metadata.write_ts': {'$lte': 1652461.117, '$gte': 1652466.117}}
2022-05-13 10:59:00,799:DEBUG:4712445440:About to delete messages matching query {'user_id': UUID('1effebb2-472c-4004-aee2-898e52c64776'), '$or': [{'metadata.type': 'message'}, {'metadata.type': 'sensor-data'}, {'metadata.type': 'rw-document'}], 'metadata.write_ts': {'$lte': 1652461.117, '$gte': 1652466.117}}
2022-05-13 10:59:00,799:DEBUG:4712445440:clearProcessedMessages: list: deleting 0 entries
2022-05-13 10:59:00,874:DEBUG:4712445440:clearProcessedMessages: count_docs: deleting 0 entries

The next attempt

2022-05-13 10:59:02,223:DEBUG:4712445440:moveToLongTerm: Deleting all 30 entries for query <emission.storage.timeseries.timequery.TimeQuery object at 0x7fc350180a90>
2022-05-13 10:59:02,223:DEBUG:4712445440:time_query: call clear with: type: metadata.write_ts start: 1652466.117 end: 1652461117.0
2022-05-13 10:59:02,223:DEBUG:4712445440:time_query: clearProcessedMessages: type: metadata.write_ts start: 1652466.117 end: 1652461117.0
2022-05-13 10:59:02,288:DEBUG:4712445440:Found 30 messages in response to query {'user_id': UUID('1effebb2-472c-4004-aee2-898e52c64776'), '$or': [{'metadata.type': 'message'}, {'metadata.type': 'sensor-data'}, {'metadata.type': 'rw-document'}], 'metadata.write_ts': {'$lte': 1652461117.0, '$gte': 1652466.117}}
2022-05-13 10:59:02,288:DEBUG:4712445440:About to delete messages matching query {'user_id': UUID('1effebb2-472c-4004-aee2-898e52c64776'), '$or': [{'metadata.type': 'message'}, {'metadata.type': 'sensor-data'}, {'metadata.type': 'rw-document'}], 'metadata.write_ts': {'$lte': 1652461117.0, '$gte': 1652466.117}}
2022-05-13 10:59:02,288:DEBUG:4712445440:clearProcessedMessages: list: deleting 30 entries
2022-05-13 10:59:02,373:DEBUG:4712445440:clearProcessedMessages: count_docs: deleting 30 entries

I don't yet understand how this whole pipeline works. How is this value changing from run to run?

shankari commented 2 years ago

@aGuttman it almost seems like something assumes that this data is in milliseconds instead of seconds and is converting it to seconds by dividing by 1000. Not sure how that can be DocumentDB, though; it should theoretically not even know that this is a timestamp.
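
If something were interpreting this seconds-since-epoch value as milliseconds and dividing by 1000, it would produce exactly the malformed values in the logs. A quick sanity check (illustrative only, not code from the pipeline):

```python
# Illustration of the suspected ms -> s conversion (hypothetical, not
# pipeline code). 1652460999 is the correct seconds-since-epoch value
# seen in the successful query above.
correct_ts = 1652460999

# Dividing by 1000, as if the value were milliseconds, reproduces the
# malformed 'lte' value from the failing query.
broken_ts = correct_ts / 1000
print(broken_ts)  # 1652460.999
```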

aGuttman commented 2 years ago

Running locally against mongo, I can confirm that the numbers are always appropriately large. No dividing by 1000.

Looking at the time_query where moveToLongTerm first sees these values, the format seems correct. Is the problem coming from here, then? unified_entry = enuf.convert_to_common_format(entry)

Does any of this look suspicious?

def convert_to_common_format(entry):
    # look up the platform/key-specific formatter and apply it
    format_fn = get_formatter(entry)
    return format_fn(entry)

def get_formatter(entry):
    # dynamically import the formatter module named by the entry's metadata
    module_name = get_module_name(entry.metadata)
    logging.debug("module_name = %s" % module_name)
    module = importlib.import_module(module_name)
    return getattr(module, "format")

def get_module_name(metadata):
    # e.g. emission.net.usercache.formatters.<platform>.<key suffix>
    return "emission.net.usercache.formatters.{platform}.{key}".format(
        platform=metadata.platform,
        key=metadata.key.split("/")[1])
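
For context, get_module_name assembles the formatter module path purely from the entry's metadata. With made-up metadata values (the platform "ios" and key "background/location" below are hypothetical, not taken from the failing entries), the resolution looks like:

```python
# Hypothetical metadata values showing how the formatter module path is
# built; these are not taken from the failing entries.
class FakeMetadata:
    platform = "ios"
    key = "background/location"

module_name = "emission.net.usercache.formatters.{platform}.{key}".format(
    platform=FakeMetadata.platform,
    key=FakeMetadata.key.split("/")[1])
print(module_name)  # emission.net.usercache.formatters.ios.location
```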

Did entry.metadata on the server get messed up somehow?

shankari commented 2 years ago

There are currently no entries in the usercache on DocumentDB, so the entries must be inserted as part of the test.

>>> import emission.core.get_database as edb
Connecting to database URL mongodb://...
>>> edb.get_usercache_db().count_documents({})
0

I do recall having some older entries in which the data was in milliseconds, and having a special formatter for them. But then we should be using the same formatter on both mongodb and DocumentDB.

shankari commented 2 years ago

@aGuttman Looking more closely at moveToLongTerm, the original time query is not based on the entries in the usercache. Only on the entries in the pipeline state.

        time_query = esp.get_time_range_for_usercache(self.user_id)
def get_time_range_for_stage(user_id, stage):
...
        start_ts = curr_state.last_processed_ts
        end_ts = time.time() - END_FUZZ_AVOID_LTE

Let's check the start timestamps in the database, although it seems like end_ts, which uses time.time(), should always be correct.
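
Note that a swapped range like this can never match anything: no timestamp satisfies $gte: start and $lte: end when start > end, which is consistent with the "Found 0 messages" result earlier. A plain-Python illustration of the comparison semantics (not an actual DB query):

```python
# Values from the failing query: start ('$gte') is larger than end ('$lte').
start, end = 1652466.117, 1652461.117

def in_range(ts, gte, lte):
    # mirrors the semantics of {'$gte': gte, '$lte': lte}
    return gte <= ts <= lte

# No timestamp can satisfy gte <= ts <= lte when gte > lte,
# so the query matches zero documents.
print(in_range(1652461117.0, start, end))  # False
```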

shankari commented 2 years ago

Start times in the pipeline state DB

[{'_id': .. 'pipeline_stage': 0, 'curr_run_ts': None, 'last_processed_ts': 1652461122.0, 'last_ts_run': 1652461136.7204711}, 
{'_id': .. 'pipeline_stage': 0, 'curr_run_ts': None, 'last_processed_ts': 1652461122, 'last_ts_run': 1652461138.545055}]

These also seem to be correct.

shankari commented 2 years ago

And we don't override the start_ts on the timequery anywhere in moveToLongTerm. So where does metadata.write_ts start: 1652465.999 come from?

It can't be from any existing entries in the DB because there's no entry with that number either in the pipeline state or in the usercache. And if it is created as part of the test, how does DocumentDB know that it is supposed to be a timestamp?

shankari commented 2 years ago

Since it does work eventually, I wonder if the issue is that an earlier save (as part of the test) is messed up?

shankari commented 2 years ago

Here's a way to set up the database with all our entries. We can then run the test line by line and check the state of the database manually at each step.

$ ./e-mission-py.bash
Python 3.7.10 | packaged by conda-forge | (default, Feb 19 2021, 15:59:12)
[Clang 11.0.1 ] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import emission.core.get_database as edb
Connecting to database URL mongodb://
>>> list(edb.get_pipeline_state_db().find())
[{'_id': , 'user_id': , 'pipeline_stage': 0, 'curr_run_ts': None, 'last_processed_ts': 1652477888.0, 'last_ts_run': 1652477905.4214709}, {'_id': , 'user_id': , 'pipeline_stage': 0, 'curr_run_ts': None, 'last_processed_ts': 1652477888, 'last_ts_run': 1652477907.719307}]
>>> import emission.tests.netTests.TestBuiltinUserCacheHandlerInput as test
>>> test.TestBuiltinUserCacheHandlerInput().setUp()
collections = ['Stage_usercache', 'Stage_pipeline_state', 'Stage_timeseries']
Dropping collection Stage_usercache
Dropping collection Stage_pipeline_state
Dropping collection Stage_timeseries
>>> edb.get_usercache_db().count_documents({})
90
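
While stepping through, one quick check is to scan the usercache for write_ts values too small to be plausible seconds-since-epoch. This is a debugging sketch, not part of the codebase; the 1e9 cutoff (mid-2001) is an assumed threshold for this dataset:

```python
# Debugging sketch (not e-mission code): flag entries whose
# metadata.write_ts is too small to be a recent seconds-since-epoch
# timestamp. The 1e9 cutoff is an assumed threshold.
def find_suspicious_write_ts(entries, cutoff=1e9):
    return [e for e in entries
            if e.get("metadata", {}).get("write_ts", 0) < cutoff]

sample = [
    {"metadata": {"write_ts": 1652461117.0}},  # plausible
    {"metadata": {"write_ts": 1652466.117}},   # suspiciously small
]
print(find_suspicious_write_ts(sample))
# [{'metadata': {'write_ts': 1652466.117}}]
```

In a live session this could be run over the cursor from edb.get_usercache_db().find() after each test step to pinpoint which save first introduces the malformed values.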