infoderm / patients

:face_with_thermometer: Patients meteor app
GNU Affero General Public License v3.0

Try to exploit Change Streams instead of `observe`/`observeChanges` #999

Open make-github-pseudonymous-again opened 1 month ago

make-github-pseudonymous-again commented 1 month ago

observeChangesAsync can have extreme lag when combined with raw collection operations (e.g. transactions). We should try to exploit Change Streams instead. Might also help with migrating away from Meteor.

See:

make-github-pseudonymous-again commented 1 month ago
  1. Run find in a transaction with an explicit ClientSession
  2. Retrieve ClientSession#operationTime
  3. Watch collection starting at that operationTime
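The three steps above could be sketched as follows. The `ClientLike`, `SessionLike` and `CollectionLike` interfaces are hypothetical structural stand-ins for the subset of the MongoDB Node.js driver used here (`startSession`, `withTransaction`, `operationTime`, `find`, `watch` with `startAtOperationTime`); with the real driver, a `MongoClient` and a `Collection` would be passed instead. Whether `operationTime` read at this point is the right resume point is the assumption made in this thread, not something the sketch verifies.

```typescript
// Hypothetical stand-ins mirroring the driver calls used below.
interface SessionLike<TTime> {
  readonly operationTime?: TTime;
  withTransaction(fn: () => Promise<void>): Promise<unknown>;
  endSession(): Promise<unknown>;
}

interface ClientLike<TTime> {
  startSession(): SessionLike<TTime>;
}

interface CollectionLike<TDoc, TTime, TStream> {
  find(filter: object, options: {session: SessionLike<TTime>}): {toArray(): Promise<TDoc[]>};
  watch(pipeline: object[], options: {startAtOperationTime?: TTime}): TStream;
}

const findThenWatch = async <TDoc, TTime, TStream>(
  client: ClientLike<TTime>,
  collection: CollectionLike<TDoc, TTime, TStream>,
  filter: object,
): Promise<{snapshot: TDoc[]; stream: TStream}> => {
  const session = client.startSession();
  try {
    let snapshot: TDoc[] = [];
    // 1. Run find in a transaction with an explicit session.
    await session.withTransaction(async () => {
      snapshot = await collection.find(filter, {session}).toArray();
    });
    // 2. Retrieve the session's operationTime.
    const startAtOperationTime = session.operationTime;
    // 3. Watch the collection starting at that operationTime.
    //    The empty pipeline watches everything; a real implementation would
    //    narrow it down to a superset of `filter`.
    const stream = collection.watch([], {startAtOperationTime});
    return {snapshot, stream};
  } finally {
    await session.endSession();
  }
};
```

The snapshot is captured in a closure variable rather than returned from `withTransaction`, so the sketch does not depend on what that call resolves to.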
make-github-pseudonymous-again commented 1 month ago

One problem with step 3 is that watch does not seem to handle every query that find handles. A possible solution is to fall back to polling: re-run the query with find whenever watch emits an event for a reasonable superset of the input query.
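A minimal sketch of that "re-poll on event" idea, assuming two hypothetical functions supplied by the caller: `poll`, which re-runs the find query, and `onResult`, which receives the new and previous responses (for instance to diff them). Triggers that arrive while a poll is in flight are coalesced into a single follow-up poll.

```typescript
// `createRepoller`, `poll` and `onResult` are illustrative names, not part of
// Meteor or the MongoDB driver.
const createRepoller = <T>(
  poll: () => Promise<T>,
  onResult: (next: T, previous: T | undefined) => void,
): (() => Promise<void>) => {
  let running = false;
  let pending = false;
  let previous: T | undefined;

  return async (): Promise<void> => {
    if (running) {
      // A poll is already in flight: remember that one more is needed.
      pending = true;
      return;
    }

    running = true;
    try {
      do {
        pending = false;
        const next = await poll();
        onResult(next, previous);
        previous = next;
      } while (pending);
    } finally {
      running = false;
    }
  };
};
```

With the Node.js driver, the returned function could be attached to a change stream with something like `changeStream.on('change', () => void trigger())`.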

For ordered callbacks (addedBefore, movedBefore), the different query responses can be compared using a longest increasing subsequence (LIS) algorithm in order to trigger the corresponding callbacks:

  1. map old response items to their position in the new response (dropping any item that is not in the new response)
  2. compute the LIS of these positions; this is the sequence of items that stayed in place and for which we need to call at most changed callbacks
  3. items not part of the LIS need to be moved from their previous position to their new position (movedBefore)
  4. items part of the old response but not part of the new response need to be removed
  5. items part of the new response but not part of the old response need to be addedBefore

This is implemented by Meteor's diff-sequence package:

https://github.com/meteor/meteor/blob/7e2abe9464c215aace47d20f737e59243a819871/packages/diff-sequence/diff.js#L64
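For illustration, the LIS-based ordered diff could be sketched as below. This is not Meteor's diff-sequence code: `diffOrdered`, `OrderedCallbacks` and `longestIncreasingSubsequence` are names made up for this sketch, it works on lists of unique ids only, and it leaves out the changed callbacks.

```typescript
interface OrderedCallbacks {
  addedBefore?: (id: string, before: string | null) => void;
  movedBefore?: (id: string, before: string | null) => void;
  removed?: (id: string) => void;
}

// Returns the indices of one longest strictly increasing subsequence.
const longestIncreasingSubsequence = (values: number[]): number[] => {
  const tails: number[] = []; // tails[k]: index ending the best subsequence of length k + 1
  const previous: number[] = new Array(values.length).fill(-1);
  for (let i = 0; i < values.length; ++i) {
    let lo = 0;
    let hi = tails.length;
    while (lo < hi) {
      const mid = (lo + hi) >> 1;
      if (values[tails[mid]] < values[i]) lo = mid + 1;
      else hi = mid;
    }
    if (lo > 0) previous[i] = tails[lo - 1];
    tails[lo] = i;
  }
  const result: number[] = [];
  let i = tails.length > 0 ? tails[tails.length - 1] : -1;
  for (; i !== -1; i = previous[i]) result.push(i);
  return result.reverse();
};

const diffOrdered = (oldIds: string[], newIds: string[], callbacks: OrderedCallbacks): void => {
  const newPosition = new Map(newIds.map((id, index) => [id, index] as const));
  const oldIdSet = new Set(oldIds);

  // Items only in the old response are removed.
  for (const id of oldIds) {
    if (!newPosition.has(id)) callbacks.removed?.(id);
  }

  // Map surviving old items to their position in the new response.
  const survivors = oldIds.filter((id) => newPosition.has(id));
  const positions = survivors.map((id) => newPosition.get(id)!);

  // Items on the LIS keep their relative order and never move.
  const stable = new Set(longestIncreasingSubsequence(positions).map((index) => survivors[index]));

  // Walk the new response backwards so that `before` always refers to an
  // item that is already in its final relative position.
  let before: string | null = null;
  for (let index = newIds.length - 1; index >= 0; --index) {
    const id = newIds[index];
    if (!oldIdSet.has(id)) callbacks.addedBefore?.(id, before);
    else if (!stable.has(id)) callbacks.movedBefore?.(id, before);
    before = id;
  }
};
```

For the old order a, b, c, d, e and the new order c, a, b, f, e, this emits removed(d), addedBefore(f, e) and movedBefore(c, a): the LIS keeps a, b and e in place, so only c has to move.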

For unordered callbacks (added, removed), the solution is simpler: at most changed for what is in both the old and the new response, removed for what is in the old response but not in the new one, and added for what is in the new response but not in the old one.

This is implemented by Meteor's diff-sequence package:

https://github.com/meteor/meteor/blob/7e2abe9464c215aace47d20f737e59243a819871/packages/diff-sequence/diff.js#L28
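As a rough sketch of the unordered case (again not Meteor's implementation), the comparison reduces to two passes over the responses. `diffUnordered`, `Doc` and `UnorderedCallbacks` are hypothetical names, and the `JSON.stringify` comparison is a naive, key-order-sensitive stand-in for a proper deep equality check.

```typescript
type Doc = {_id: string; [key: string]: unknown};

interface UnorderedCallbacks {
  added?: (id: string, doc: Doc) => void;
  changed?: (id: string, newDoc: Doc, oldDoc: Doc) => void;
  removed?: (id: string) => void;
}

const diffUnordered = (oldDocs: Doc[], newDocs: Doc[], callbacks: UnorderedCallbacks): void => {
  const oldById = new Map(oldDocs.map((doc) => [doc._id, doc] as const));
  const newIds = new Set(newDocs.map((doc) => doc._id));

  // In the old response but not in the new one: removed.
  for (const doc of oldDocs) {
    if (!newIds.has(doc._id)) callbacks.removed?.(doc._id);
  }

  for (const doc of newDocs) {
    const previous = oldById.get(doc._id);
    if (previous === undefined) {
      // In the new response but not in the old one: added.
      callbacks.added?.(doc._id, doc);
    } else if (JSON.stringify(previous) !== JSON.stringify(doc)) {
      // In both responses with different contents: changed.
      callbacks.changed?.(doc._id, doc, previous);
    }
  }
};
```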

make-github-pseudonymous-again commented 1 month ago

Note that Meteor's current implementation does not support every query that find supports either: it falls back to polling when minimongo cannot create a matcher for the query.

https://github.com/meteor/meteor/blob/628dcd24d7685e8c9a5e4182aaa564736b3cc8fc/packages/mongo/mongo_driver.js#L1536-L1546

make-github-pseudonymous-again commented 1 month ago

Note also that Meteor's oplog observe driver becomes quite complicated once limit is involved:

https://github.com/meteor/meteor/blob/628dcd24d7685e8c9a5e4182aaa564736b3cc8fc/packages/mongo/oplog_observe_driver.js#L51-L68

Unless watch turns out to be free of this inherent complexity, it would be wiser to start by implementing a simple polling solution.

make-github-pseudonymous-again commented 2 days ago

If resorting to pure change stream updates instead of polling, the following ChangeStream change event handler may make sense:

import type {ChangeStreamDocument} from 'mongodb';
// Assumed to be the publication handler type from Meteor's typings.
import type {Subscription} from 'meteor/meteor';

// Translates a single change stream event into the matching publication call.
const handleChangeStreamEvents = <TSchema extends { _id: string, [key: string]: any }>(
  self: Subscription,
  collectionName: string,
  doc: ChangeStreamDocument<TSchema>
) => {
  switch (doc.operationType) {
    case "replace":
      // NOTE: fields dropped by the replacement are not unset on the client here.
      if (doc.fullDocument) {
        self.changed(collectionName, doc.documentKey._id, doc.fullDocument);
      }
      break;
    case "insert":
      if (doc.fullDocument) {
        self.added(collectionName, doc.documentKey._id, doc.fullDocument);
      }
      break;
    case "delete":
      self.removed(collectionName, doc.documentKey._id);
      break;
    case "update": {
      // Removed fields are sent as undefined so that the client unsets them.
      const {removedFields, updatedFields} = doc.updateDescription;
      const fields = Object.fromEntries([
        ...(removedFields?.map((key: keyof TSchema) => [key, undefined] as const) ?? []),
        ...Object.entries(updatedFields ?? {}),
      ]) as Partial<TSchema>;
      self.changed(collectionName, doc.documentKey._id, fields);
      break;
    }
    // The watched collection is gone or the stream is no longer usable.
    case "drop":
    case "dropDatabase":
    case "rename":
    case "invalidate":
      self.stop();
      break;
    default:
      break;
  }
};

See also: https://forums.meteor.com/t/pub-sub-with-mongodb-change-stream/57495.