ehrbase / openEHR_SDK

A SDK to facilitate the development of openEHR applications
Other
48 stars 28 forks source link

Performance - Serialization and Validation #418

Open sidharthramesh opened 1 year ago

sidharthramesh commented 1 year ago

Configuration information

I'm trying to directly load compositions into a Postgres database using the SDK. The data is in the Simplified Flat format, and this needs to be validated and converted into the Database Native format.

The input data is a JSON array of multiple compositions (batches of 1000) that look like this:

[{
    "composition": {
      "clinikk.prescription_pad.v2/context/_health_care_facility|id": "19700ddf-6b3a-54ad-9e3b-c73f49149130",
      "clinikk.prescription_pad.v2/context/_health_care_facility|id_namespace": "Clinikk",
      "clinikk.prescription_pad.v2/context/_health_care_facility|id_scheme": "Clinikk",
      "clinikk.prescription_pad.v2/context/_health_care_facility|name": "Clinikk",
      "ctx/time": "2021-08-02T00:00:00",
      "ctx/composer_name": "Migration Agent",
      "ctx/language": "en",
      "ctx/territory": "IN",
      "clinikk.prescription_pad.v2/pulse_oximetry/spo|denominator": 100.0,
      "clinikk.prescription_pad.v2/pulse_oximetry/spo|type": 3,
      "clinikk.prescription_pad.v2/pulse_oximetry/spo": 0.97,
      "clinikk.prescription_pad.v2/pulse_oximetry/spo|numerator": 97,
      "clinikk.prescription_pad.v2/blood_pressure/systolic|magnitude": 140,
      "clinikk.prescription_pad.v2/blood_pressure/systolic|unit": "mm[Hg]",
      "clinikk.prescription_pad.v2/blood_pressure/diastolic|magnitude": 88,
      "clinikk.prescription_pad.v2/blood_pressure/diastolic|unit": "mm[Hg]",
      "clinikk.prescription_pad.v2/respiration/rate|magnitude": 30,
      "clinikk.prescription_pad.v2/respiration/rate|unit": "/min",
      "clinikk.prescription_pad.v2/height_length/height_length|magnitude": 164,
      "clinikk.prescription_pad.v2/height_length/height_length|unit": "cm",
      "clinikk.prescription_pad.v2/body_weight/any_event:0/weight|magnitude": 93,
      "clinikk.prescription_pad.v2/body_weight/any_event:0/weight|unit": "kg",
      "clinikk.prescription_pad.v2/body_mass_index/body_mass_index|magnitude": 34.58,
      "clinikk.prescription_pad.v2/body_mass_index/body_mass_index|unit": "kg/m2",
      "clinikk.prescription_pad.v2/body_temperature/temperature|magnitude": 97,
      "clinikk.prescription_pad.v2/body_temperature/temperature|unit": "[degF]",
      "clinikk.prescription_pad.v2/reason_for_encounter/presenting_problem:0": "k/co asthma \nwith acute exacerbation\n",
      "clinikk.prescription_pad.v2/story_history/any_event:0/story:0": "patient came to opd with c/o wheeze and breathing difficulty since yesterday\nh\nl/c/o t2 dm with asthma\nno h/o cough/fever",
      "clinikk.prescription_pad.v2/physical_examination_findings/description": "rs rhonchi b/l \nrest examination nad",
      "clinikk.prescription_pad.v2/problem_diagnosis:0/problem_diagnosis_name": "asthma\n",
      "clinikk.prescription_pad.v2/follow_up/current_activity:0/description": "avoid triggers",
      "clinikk.prescription_pad.v2/follow_up/current_activity:0/service_due": "2021-08-05T18:30:00Z",
      "clinikk.prescription_pad.v2/follow_up/current_activity:0/service_name": "Follow up",
      "clinikk.prescription_pad.v2/ha_follow_up/current_activity:0/service_name": "HA Follow up",
      "clinikk.prescription_pad.v2/ha_follow_up/current_activity:0/description": "wheeze"
    },
    "id": "19700ddf-6b3a-54ad-9e3b-c73f49149130",
    "template": "clinikk.prescription_pad.v2",
    "ehrId": "b7701e59-e345-5060-818a-0394bb0b8a90"
  }]

A snippet from the script that does the conversion looks like:

byte[] bytes = IOUtils.toByteArray(inputStream);
JSONArray inputArray = new JSONArray(new String(bytes));
for (int i = 0; i < inputArray.length(); i++) {
  JSONObject input = inputArray.getJSONObject(i);
  JSONObject composition = input.getJSONObject("composition");
  String id = input.getString("id");
  String ehrId = input.getString("ehrId");
  Composition flatComposition = unflattner.unmarshal(composition.toString(),
      this.webTemplate);

  List<ConstraintViolation> result = validator.validate(flatComposition,
  template);
  if (result.size() > 0) {
  throw new ProcessException(id + "- Validation failed: " + result);
  }

  String rawJson = new RawJson().marshal(flatComposition);
  System.out.println("marshalling " + id);

  String sqlString = "call put_composition('" + ehrId + "', '" + id +
      "', '" + template.getTemplateId().getValue() + "', '" + rawJson +
      "','" + systemId + "', '" + id + "');\n";
  IOUtils.write(sqlString.getBytes(), outputStream);
}

The put_composition is a stored procedure on Postgres that will do what's necessary to create a composition, contribution, party and entries in the database.

This takes about < 30ms / composition

Actual result

Validation and transforming 3013 compositions in total took a total of - 661.1 seconds running on an M1 Macbook Air (running Java without Rosetta emulation).

The batches of x compositions each took:

1274 compositions - 297.6s
1108 compositions - 221.8s
631 compositions - 141.7s

Averaging at 219ms per composition.

Expected result (Acceptance Criteria)

Running validation and transformation operations should at least be as fast as the database insert operation ~ 30ms to not make the validation and transformation process the choking point on ETL pipelines.

Any other suggestions or workarounds to speed up the process would also be much appreciated!

Definition of Done

stefanspiska commented 1 year ago

@sidharthramesh

Replace validator.validate(flatComposition, template);

with validator.validate(flatComposition, this.webTemplate);

stefanspiska commented 1 year ago

@sidharthramesh Also you might want to take a look at https://github.com/ehrbase/performance-test/blob/main/src/main/java/org/ehrbase/webtester/service/LoaderServiceImp.java

stefanspiska commented 1 year ago

cc: @vidi42

sidharthramesh commented 1 year ago

@stefanspiska thank you for the quick reply. With just that 1 change, it's already much faster:

1274 compositions - 87.4s
1108 compositions - 56.7s
631 compositions - 37.1s

= 181.2s ( 3.6 x faster)

Averaging around 60ms per composition - so much better. But still technically the bottleneck xD. I'll look at the Performance Test Loader and see if we can get it lower to around 30ms.

For context, We're building a Nifi Processor that can ingest compositions in bulk after multiple other ETL pipelines.

stefanspiska commented 1 year ago

@sidharthramesh

I do not thing what you are trying to do is a good idea. Its one thing to directly insert data to generate a test set or a initial import. But ehrbase is build on the assumption that it has exclusive written access to the data. The DB-Structure can and will change. Also you might hit concurrency and integrity issues. And in the end in the best case you just replicate our service layer.

If you need a batch which run in one transaction you can do that via the contribution Endpoint. (Now supported in the sdk). If you want throughput your pipeline needs to sent request in parallel. (Ideally the contribution Service would use parallel processing, but this is not yet implemented )

And finally if you do not want to have the rest overhead some other protocols could be added via a plugin, but plugins is a beta feature right now.

cc @birgerhaarbrandt , @vidi42 , @HolgerReiseVSys

sidharthramesh commented 1 year ago

Hey @stefanspiska,

I understand my solution is hacky, and yes, I totally expect the database schema to change over time. The points you made about concurrency and integrity are also bothering me now, and it's probably best to seek a proper solution to this - will come in handy for many clients.

2 key requirements to be able to do ETL well - idempotency and batching.

We tried using the EHRbase REST API first, but didn't meet these requirements: