google / fhir-data-pipes

A collection of tools for extracting FHIR resources and analytics services on top of that data.
https://google.github.io/fhir-data-pipes/
Apache License 2.0

Streaming pipeline stops syncing when number of patients = jdbcMaxPoolSize #220

Status: Closed (omarismail94 closed this issue 2 years ago)

omarismail94 commented 2 years ago

Problem

The streaming pipeline stops syncing patients once the number of fetched patients equals jdbcMaxPoolSize.

Pre-requisites to reproduce the issue

  1. Change the openmrs_convert method of the Patient class in resources.py to the code in [1]. This removes the code that prevents a patient from being uploaded more than once; having duplicates does not affect the reproduction.

  2. In utils/dbz_event_to_fhir_config.json, change the database hostname value from openmrs-fhir-mysql to localhost.

  3. Check that you can run jps and jstack; both are part of your JDK installation.

  4. Make sure bunsen and the pipelines are compiled. If they are not, run:

    mvn -B -e -f bunsen install; mvn -B  -e -f pipelines install
  5. Make sure OpenMRS and the HAPI FHIR server are up and running. If they are not, run:

    docker-compose -f ./docker/openmrs-compose.yaml -f ./docker/sink-compose.yml up --force-recreate --remove-orphans -d

Repro Instructions

You need three terminal windows: one to run the pipeline, one to upload data, and one to inspect the log.

First, run the pipeline using the command:

java -jar pipelines/streaming-binlog/target/fhir-binlog-streaming-etl-bundled-0.1.0-SNAPSHOT.jar \
  --fhirDebeziumConfigPath=utils/dbz_event_to_fhir_config.json \
  --openmrsServerUrl=http://localhost:8099/openmrs \
  --jdbcInitialPoolSize=4 \
  --jdbcMaxPoolSize=6 \
  --outputParquetPath=e2e-tests/STREAMING \
  --fhirSinkPath=http://localhost:8098/fhir \
  --sinkUserName=hapi \
  --sinkPassword=hapi \
  --secondsToFlushParquetFiles=15 &> log.log

In the second terminal, run:

python3 synthea-hiv/uploader/main.py http://localhost:8099/openmrs/ws/fhir2/R4 \
  --input_dir synthea-hiv/sample_data --convert_to_openmrs

This will upload 79 patients to OpenMRS.

Next, open the log file and search for the string:

Fetching FHIR resource at /Patient

You will see that the number of matches stops at jdbcMaxPoolSize (6 in the command above), even though more patients are uploaded.
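To make the stall easy to spot, you can count the matches instead of eyeballing the log (a quick check, assuming the log file is log.log as in the pipeline command above):

```shell
# Count Patient fetches in the pipeline log. With --jdbcMaxPoolSize=6 the
# count stalls at 6, even though 79 patients were uploaded.
grep -c "Fetching FHIR resource at /Patient" log.log
```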

Cleanup

To stop the pipeline, first run jps to find the process ID (PID) of the Java process running the pipeline. Once you have that, run kill -9 PID_NUMBER to kill the process.

After each time you stop the pipeline, make sure to remove the data and streaming folders by running:

   sudo rm -rf ./data/; sudo rm -rf e2e-tests/STREAMING/

[1]

  def openmrs_convert(self):
    """Add fields to make Patient uploadable to OpenMRS."""   
    import random 
    openmrs_id = idgen.luhn_id_generator(random.randrange(0,100000))

    self.base.json['identifier'].append({
        'extension': [{
            'url': 'http://fhir.openmrs.org/ext/patient/identifier#location',
            'valueReference': {
                'reference': 'Location/8d6c993e-c2cc-11de-8d13-0010c6dffd0f',
                'type': 'Location',
                'display': 'Unknown Location'
            }
        }],
        'use': 'official',
        'type': {
            'text': 'OpenMRS ID'
        },
        'value': openmrs_id,
        'id': openmrs_id
    })

    self._inject_id()
omarismail94 commented 2 years ago

The issue is that the JDBC Connection is not closed. We close the Statement and ResultSet, but not the Connection, so every fetch permanently consumes a pooled connection until the pool (jdbcMaxPoolSize) is exhausted. PR #219 fixes the issue.
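The leak follows the classic JDBC pattern of closing only the inner resources. A minimal, self-contained sketch of the fix (the Resource class below is a hypothetical stand-in for java.sql's Connection/Statement/ResultSet, not the pipeline's actual code) shows how try-with-resources closes all three, in reverse declaration order, even if the body throws:

```java
public class ConnectionLeakSketch {
  // Hypothetical stand-in mimicking the AutoCloseable contract that
  // java.sql.Connection, Statement, and ResultSet all implement.
  static class Resource implements AutoCloseable {
    static final StringBuilder log = new StringBuilder();
    final String name;

    Resource(String name) { this.name = name; }

    @Override
    public void close() { log.append("closed ").append(name).append("; "); }
  }

  public static void main(String[] args) {
    // try-with-resources closes every declared resource, in reverse order,
    // even if the body throws -- the Connection is always released.
    try (Resource connection = new Resource("Connection");
         Resource statement = new Resource("Statement");
         Resource resultSet = new Resource("ResultSet")) {
      // ... fetch the FHIR resource here ...
    }
    System.out.println(Resource.log);
    // prints: closed ResultSet; closed Statement; closed Connection;
  }
}
```

Allocating the Connection in the same try-with-resources header as the Statement and ResultSet (or closing it in a finally block) guarantees the pooled connection is returned, so the pool is never exhausted.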

omarismail94 commented 2 years ago

PR #219 is merged into master. Closing.