CDCgov / IDWA

Intelligent Data Workflow Automation

RL Synthea Performance tests #59

Closed · ericbuckley closed this 3 months ago

ericbuckley commented 4 months ago

Pull Request

Description

These changes build on the record linkage performance testing compose environment by introducing the scripts/test_synthea_data.sh script, which runs tests against patient data generated by Synthea. The script breaks the process down into four steps (a minimal sketch of the flow follows the list):

  1. truncates the MPI database (scripts/reset_db.sh)
  2. generates Synthea data (scripts/generate_synthetic_data.sh)
  3. extracts patient data and sends it to the linkage API (scripts/send_linkage_requests.sh)
  4. hangs the script so the running environment can be inspected and analyzed
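
For orientation, here is a minimal Python sketch of that flow. The real scripts/test_synthea_data.sh is a shell script; this rendering is illustrative only and assumes the four scripts exist at the paths named above.

# Hypothetical sketch of the four-step flow in scripts/test_synthea_data.sh
# (the real implementation is a shell script; this mirrors the list above).
import subprocess
import time

def run(script: str) -> None:
    # Fail fast if any step exits non-zero.
    subprocess.run(["bash", script], check=True)

def main() -> None:
    run("scripts/reset_db.sh")                 # 1. truncate the MPI database
    run("scripts/generate_synthetic_data.sh")  # 2. generate Synthea data
    run("scripts/send_linkage_requests.sh")    # 3. send patient data to the linkage API
    while True:                                # 4. hang so the environment can be analyzed
        time.sleep(60)

if __name__ == "__main__":
    main()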

Related Issues

Additional Notes

Recordings on Running the Performance Tests

Running the tests

docker compose run --build --rm runner scripts/test_synthea_data.sh
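
Here --build rebuilds the runner image before starting and --rm removes the container when the script exits, so repeated runs pick up local changes and don't leave stopped containers behind.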

Environment variables

The environment variables are now split between an rlpt.env file and the environment attributes in compose.yml; see the Environment Variables section in README.md for the rationale behind the split.
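
To illustrate the precedence this split implies (a hypothetical sketch, not the project's loading code; docker compose itself merges env_file and environment, with environment winning):

# Hypothetical sketch of compose-style precedence: explicit environment
# values (modeled here by os.environ) override defaults read from rlpt.env.
import os

def load_env_file(path: str) -> dict[str, str]:
    defaults: dict[str, str] = {}
    with open(path) as f:
        for line in f:
            line = line.strip()
            if line and not line.startswith("#") and "=" in line:
                key, _, value = line.partition("=")
                defaults[key.strip()] = value.strip()
    return defaults

config = {**load_env_file("rlpt.env"), **os.environ}  # explicit env wins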

Notes on src/link.py

The link.py module was copied from the phdi.linkage.link module and modified to test a performance change (see the Performance Test Parameters section in README.md for details). We decided a copied version would be easier to edit and maintain than a diff file, but for the sake of this review a diff patch is provided below to make the changes easier to see.

--- /Users/buckley/Downloads/link.py    2024-03-30 07:56:36
+++ src/link.py 2024-03-30 07:58:52
@@ -1,8 +1,10 @@
+import collections
 import copy
 import datetime
 import hashlib
 import json
 import logging
+import os
 import pathlib
 from itertools import combinations
 from math import log
@@ -32,6 +34,11 @@
     "sex": "Patient.gender",
     "mrn": "Patient.identifier.where(type.coding.code='MR').value",
 }
+
+
+REDUCE_COMPARES = (os.environ.get("REDUCE_COMPARES", "0").lower() in ["true", "t", "1",])
+if REDUCE_COMPARES:
+    logging.warning("REDUCE COMPARE MODE: Only comparing first record in each cluster.")

 def block_data(data: pd.DataFrame, blocks: List) -> dict:
@@ -639,12 +646,21 @@
             logging.info(
                 f"Done with _group_patient_block_by_person at:{datetime.datetime.now().strftime('%m-%d-%yT%H:%M:%S.%f')}"  # noqa
             )
+            
+            if REDUCE_COMPARES:
+                clusters = _consolidate_person_clusters(clusters)

             # Check if incoming record should belong to one of the person clusters
             kwargs = linkage_pass.get("kwargs", {})
+            logging.warning(f"Person Size: {len(clusters)}")
             for person in clusters:
                 num_matched_in_cluster = 0.0
-                for linked_patient in clusters[person]:
+                linked_patients = clusters[person]
+                if REDUCE_COMPARES:
+                    count = float(len(linked_patients))
+                    linked_patients = linked_patients[0:1]
+                logging.warning(f"Patient Size: {len(linked_patients)}")
+                for linked_patient in linked_patients:
                     logging.info(
                         f"Starting _compare_records at:{datetime.datetime.now().strftime('%m-%d-%yT%H:%M:%S.%f')}"  # noqa
                     )
@@ -661,7 +677,10 @@
                     )

                     if is_match:
-                        num_matched_in_cluster += 1.0
+                        if REDUCE_COMPARES:
+                            num_matched_in_cluster += count
+                        else:
+                            num_matched_in_cluster += 1.0

                 # Update membership score for this person cluster so that we can
                 # track best possible link across multiple passes
@@ -669,6 +688,8 @@
                     f"Starting to update membership score at:{datetime.datetime.now().strftime('%m-%d-%yT%H:%M:%S.%f')}"  # noqa
                 )
                 belongingness_ratio = num_matched_in_cluster / len(clusters[person])
+                if REDUCE_COMPARES:
+                    person = person[0]
                 if belongingness_ratio >= linkage_pass.get("cluster_ratio", 0):
                     logging.info(
                         f"belongingness_ratio >= linkage_pass.get('cluster_ratio', 0): {datetime.datetime.now().strftime('%m-%d-%yT%H:%M:%S.%f')}"  # noqa
@@ -1306,7 +1327,6 @@
     )
     if not list_of_address_objects:
         return None
-
     if field == "address":
         list_of_address_lists = [
             ao.get(LINKING_FIELDS_TO_FHIRPATHS[field].split(".")[-1], [])
@@ -1321,7 +1341,6 @@
             list_of_usable_address_elements.append(
                 address_object.get(LINKING_FIELDS_TO_FHIRPATHS[field].split(".")[-1])
             )
-
     return list_of_usable_address_elements

@@ -1392,6 +1411,19 @@
     return clusters

+def _consolidate_person_clusters(clusters: dict[str, List]) -> dict[tuple[str, str], List]:
+    """
+    Helper method that consolidates the clusters into groups of patients based on
+    similar attributes for the patients.
+    """
+    consolidated_clusters = collections.defaultdict(list)
+    for person_id, patients in clusters.items():
+        for patient in patients:
+            hash_key = '\t'.join(str(x).lower().strip() for x in patient[2:])
+            consolidated_clusters[(person_id, hash_key)].append(patient)
+    return consolidated_clusters
+
+
 def _map_matches_to_record_ids(
     match_list: Union[List[tuple], List[set]], data_block, cluster_mode: bool = False
 ) -> List[tuple]:

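To make the consolidation step concrete, here is a toy example (fabricated data, with a simplified stand-in for the function above). Records whose attribute columns (index 2 onward) normalize to the same string collapse into a single (person_id, hash_key) cluster, so one comparison against the first record stands in for the whole group, weighted by its count:

import collections

# Simplified stand-in for _consolidate_person_clusters in the diff above.
def consolidate(clusters: dict) -> dict:
    consolidated = collections.defaultdict(list)
    for person_id, patients in clusters.items():
        for patient in patients:
            # Normalize the attribute columns (index 2 onward) into a key.
            hash_key = "\t".join(str(x).lower().strip() for x in patient[2:])
            consolidated[(person_id, hash_key)].append(patient)
    return consolidated

# Toy data: records 1 and 2 have identical attributes after normalization.
clusters = {
    "p1": [
        (1, "p1", "JOHN ", "doe"),
        (2, "p1", "john", "DOE "),
        (3, "p1", "jane", "doe"),
    ]
}
for key, patients in consolidate(clusters).items():
    print(key, len(patients))
# ('p1', 'john\tdoe') 2   -> one compare, weighted by count=2
# ('p1', 'jane\tdoe') 1

This composite key is also why the patch later does person = person[0]: after consolidation, the original person id has to be unpacked from the (person_id, hash_key) tuple before updating membership scores.
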

ericbuckley commented 3 months ago

@zdeveloper @colekettler @arinkulshi @jonchang @knguyenrise8 I wanted to tag the five of you in case you're interested in learning about some of the performance testing work I'm doing with the phdi record-linkage API. If you pull down the branch, it's pretty easy to get up and running with just one docker compose command. There are also a couple of recordings linked in the PR description if you prefer those.