google / fhir-data-pipes

A collection of tools for extracting FHIR resources and analytics services on top of that data.
https://google.github.io/fhir-data-pipes/
Apache License 2.0

Problem while using Encounter Constraints with SparkQuery #251

Closed gdevanla closed 8 months ago

gdevanla commented 2 years ago

Using the provided notebook, I tried adding EncounterConstraints with typeCode.

patient_query.encounter_constraints(typeCode=['5021b1a1-e7f6-44b4-ba02-da2f2bcf8718'])
flat_enc_df = patient_query.get_patient_encounter_view(BASE_URL)
flat_enc_df.head()

But it looks like the flattening is not sufficient, since the coding element inside type is nested at an extra level. I get the following error.

AnalysisException                         Traceback (most recent call last)
Input In [6], in <cell line: 8>()
      1 # Add encounter location constraint
      2 
      3 #x = patient_query._flatten_encounter(BASE_URL)
      4 #x = patient_query._enc_df.select('type').where()
      7 patient_query.encounter_constraints(typeCode=['5021b1a1-e7f6-44b4-ba02-da2f2bcf8718'])
----> 8 flat_enc_df = patient_query.get_patient_encounter_view(BASE_URL)
      9 flat_enc_df.head()

File ~/fsf/openmrs-fhir-analytics/dwh/query_lib.py:402, in _SparkPatientQuery.get_patient_encounter_view(self, base_url, force_location_type_columns)
    400 self._make_sure_patient()
    401 self._make_sure_encounter()
--> 402 flat_enc = self._flatten_encounter(base_url + 'Encounter/',
    403                                    force_location_type_columns)
    404 column_list = ['encPatientId']
    405 if self._enc_constraint.has_location() or force_location_type_columns:

File ~/fsf/openmrs-fhir-analytics/dwh/query_lib.py:441, in _SparkPatientQuery._flatten_encounter(self, base_encounter_url, force_location_type_columns)
    437   column_list += [
    438       F.col('typeFlat.coding.system').alias('encTypeSystem'),
    439       F.col('typeFlat.coding.code').alias('encTypeCode')]
    440 print(flat_df.select(column_list))
--> 441 return flat_df.select(column_list).where(self._enc_constraint.sql())

File ~/fsf/openmrs-fhir-analytics/.env38/lib/python3.8/site-packages/pyspark/sql/dataframe.py:1731, in DataFrame.filter(self, condition)
   1706 """Filters rows using the given condition.
   1707 
   1708 :func:`where` is an alias for :func:`filter`.
   (...)
   1728 [Row(age=2, name='Alice')]
   1729 """
   1730 if isinstance(condition, str):
-> 1731     jdf = self._jdf.filter(condition)
   1732 elif isinstance(condition, Column):
   1733     jdf = self._jdf.filter(condition._jc)

File ~/fsf/openmrs-fhir-analytics/.env38/lib/python3.8/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File ~/fsf/openmrs-fhir-analytics/.env38/lib/python3.8/site-packages/pyspark/sql/utils.py:117, in capture_sql_exception.<locals>.deco(*a, **kw)
    113 converted = convert_exception(e.java_exception)
    114 if not isinstance(converted, UnknownException):
    115     # Hide where the exception came from that shows a non-Pythonic
    116     # JVM exception message.
--> 117     raise converted from None
    118 else:
    119     raise

AnalysisException: cannot resolve 'encTypeCode['code']' due to data type mismatch: argument 2 requires integral type, however, ''code'' is of string type.; line 1 pos 0;
'Filter ((true AND encTypeCode#492[code] IN (5021b1a1-e7f6-44b4-ba02-da2f2bcf8718)) AND true)
+- Project [encounterId#460, subject#87.patientId AS encPatientId#467, period#92.start AS first#468, period#92.end AS last#469, locationFlat#471.location.LocationId AS locationId#479, locationFlat#471.location.display AS locationDisplay#480, typeFlat#482.coding.system AS encTypeSystem#491, typeFlat#482.coding.code AS encTypeCode#492]
   +- Project [subject#87, id#74, location#98, type#85, period#92, encounterId#460, locationFlat#471, typeFlat#482]
      +- Generate explode(type#85), true, [typeFlat#482]
         +- Project [subject#87, id#74, location#98, type#85, period#92, encounterId#460, locationFlat#471]
            +- Generate explode(location#98), true, [locationFlat#471]
               +- Project [subject#87, id#74, location#98, type#85, period#92, regexp_replace(id#74, Encounter/, , 1) AS encounterId#460]
                  +- Project [subject#87, id#74, location#98, type#85, period#92]
                     +- Relation [id#74,meta#75,implicitRules#76,language#77,text#78,contained#79,identifier#80,status#81,statusHistory#82,class#83,classHistory#84,type#85,priority#86,subject#87,episodeOfCare#88,incomingReferral#89,participant#90,appointment#91,period#92,length#93,reason#94,diagnosis#95,account#96,hospitalization#97,... 3 more fields] parquet

I think the problem is the data type of type, as shown here. Notice that the coding element inside type, which _flatten_encounter is expected to flatten, remains an array after the explode.

 |-- type: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- coding: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |-- system: string (nullable = true)
 |    |    |    |    |-- version: string (nullable = true)
 |    |    |    |    |-- code: string (nullable = true)
 |    |    |    |    |-- display: string (nullable = true)
 |    |    |    |    |-- userSelected: boolean (nullable = true)
 |    |    |-- text: string (nullable = true)

You can try this and notice that we need to use array_contains to make this query work:

x = patient_query._flatten_encounter(BASE_URL)
x.select('encTypeCode').where('encTypeCode is not null').where('array_contains(encTypeCode, "5021b1a1-e7f6-44b4-ba02-da2f2bcf8718")').show()
gdevanla commented 2 years ago

I think the solution is to change

https://github.com/GoogleCloudPlatform/openmrs-fhir-analytics/blob/master/dwh/query_lib.py#L150

to

type_code_str = 'array_contains(encTypeCode, {})'.format(temp_str)
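Note that array_contains matches a single value, so supporting a list of codes means OR-ing one clause per code. A hedged sketch of that string construction; the helper name and codes parameter are assumptions for illustration, not the library's actual API:

```python
def type_code_sql(codes):
    """Build a Spark SQL predicate matching any of the given type codes.

    After flattening, encTypeCode is an array of strings, so each code
    needs its own array_contains(); multiple codes are OR-ed together.
    """
    clauses = ['array_contains(encTypeCode, "{}")'.format(c) for c in codes]
    return '(' + ' OR '.join(clauses) + ')'

print(type_code_sql(['5021b1a1-e7f6-44b4-ba02-da2f2bcf8718']))
# -> (array_contains(encTypeCode, "5021b1a1-e7f6-44b4-ba02-da2f2bcf8718"))
```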

gdevanla commented 2 years ago

Fixing this as part of https://github.com/GoogleCloudPlatform/openmrs-fhir-analytics/pull/256

bashir2 commented 2 years ago

Thanks for finding/flagging this bug @gdevanla. I finally managed to take a look, and I think the proper fix is to do another explode_outer on typeFlat.coding before here. The idea behind these flat views is that each output column should be a simple data type (i.e., not nested and not repeated). For example, for location we remove the repeated element here and then create aliases for the sub-elements of locationFlat. We want to do the same for typeFlat, but as you pointed out, typeFlat.coding is still a repeated field that needs to be flattened as well.

bashir2 commented 8 months ago

Closing this issue as we are deprecating the old query library.