This directory contains a reference Cloud Dataflow pipeline to convert HL7v2 messages to FHIR resources. Please note that additional configurations and hardening are required before processing PHI data with this pipeline.
The source HL7v2 store must be configured with Pub/Sub NotificationConfigs and a schematized ParserConfig.

Make sure you have enough permissions to run Cloud Dataflow jobs.
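As a sketch, a store like this could be created via the Cloud Healthcare REST API (the field names below follow the public `hl7V2Stores` reference, but treat this as an assumption and consult the current API docs before using it for a production configuration):

```bash
# Hypothetical sketch: create an HL7v2 store with a Pub/Sub
# NotificationConfig and a schematized parser. Assumes PROJECT,
# LOCATION, DATASET, HL7V2_STORE and TOPIC are already set, and that
# the Pub/Sub topic exists.
curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://healthcare.googleapis.com/v1/projects/${PROJECT?}/locations/${LOCATION?}/datasets/${DATASET?}/hl7V2Stores?hl7V2StoreId=${HL7V2_STORE?}" \
  -d '{
    "notificationConfigs": [
      {"pubsubTopic": "projects/'"${PROJECT?}"'/topics/'"${TOPIC?}"'"}
    ],
    "parserConfig": {
      "schema": {"schematizedParsingType": "SOFT_FAIL"}
    }
  }'
```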
The Cloud Dataflow Controller Service Account needs the following roles:

- `roles/pubsub.subscriber`
- `roles/healthcare.hl7V2Consumer`
- `roles/healthcare.fhirResourceEditor`
- `roles/storage.objectAdmin`
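Granting them could look like the following (a hedged sketch: `CONTROLLER_SA` is a placeholder for your controller service account's email, which by default is the Compute Engine default service account):

```bash
# Hypothetical sketch: grant each required role to the Dataflow
# controller service account. CONTROLLER_SA is assumed to hold the
# account email, e.g. <project-number>-compute@developer.gserviceaccount.com.
for role in roles/pubsub.subscriber \
            roles/healthcare.hl7V2Consumer \
            roles/healthcare.fhirResourceEditor \
            roles/storage.objectAdmin; do
  gcloud projects add-iam-policy-binding "${PROJECT?}" \
    --member="serviceAccount:${CONTROLLER_SA?}" \
    --role="${role}"
done
```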
Build a fat JAR of the pipeline by running the following from the project directory:

```bash
# Generate wrapper classes.
gradle wrapper --gradle-version 7.6
./build_deps.sh && ./gradlew shadowJar
```

A JAR file should be generated in the `build/libs` folder.
Now run the pipeline with the following command:

```bash
# Please set the environment variables in the following command.
java -jar build/libs/converter-0.1.0-all.jar \
  --pubSubSubscription="projects/${PROJECT?}/subscriptions/${SUBSCRIPTION?}" \
  --readErrorPath="gs://${ERROR_BUCKET?}/read/" \
  --writeErrorPath="gs://${ERROR_BUCKET?}/write/" \
  --mappingErrorPath="gs://${ERROR_BUCKET?}/mapping/" \
  --mappingPath="gs://${MAPPING_BUCKET?}/hl7v2_fhir.wstl" \
  --fhirStore="projects/${PROJECT?}/locations/${LOCATION?}/datasets/${DATASET?}/fhirStores/${FHIRSTORE?}" \
  --importRoot="${MAPPING_ROOT_FOLDER?}" \
  --runner=DataflowRunner \
  --region="${REGION?}" \
  --project="${PROJECT?}"
```
A few notes:

- By default, streaming pipelines do not have autoscaling enabled. Use either `--enableStreamingEngine` (recommended) or a combination of `--autoscalingAlgorithm=THROUGHPUT_BASED` and `--maxNumWorkers=N` to enable it manually. See the Dataflow autoscaling documentation for more details.
- To export worker metrics to Cloud Monitoring, add `--experiments=enable_stackdriver_agent_metrics` as an option (you will need to grant `roles/monitoring.metricWriter` to the Dataflow controller service account as well); see the Dataflow monitoring documentation for more details. Additionally, we highly recommend limiting the number of threads on each worker, e.g. `--numberOfWorkerHarnessThreads=10`. You can tune the limit based on your workload.
- To create a Dataflow template instead of running the pipeline directly, add `--stagingLocation=gs://${STAGING_LOCATION}` and `--templateLocation=gs://${TEMPLATE_LOCATION}` to the above command (a hypothetical launch command is sketched after these notes).

Please take a look at the `PipelineRunner` class to see the concrete meaning of each argument.
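If you generated a template, launching a job from it could look like the following (a hedged sketch: the job name `hl7v2-to-fhir` is a placeholder, and the exact flags for classic templates may vary with your gcloud version):

```bash
# Hypothetical sketch: run a job from the staged classic template.
gcloud dataflow jobs run hl7v2-to-fhir \
  --gcs-location="gs://${TEMPLATE_LOCATION?}" \
  --region="${REGION?}" \
  --project="${PROJECT?}"
```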
You should be able to verify that a Dataflow pipeline is running from the Cloud Console UI. Data should start flowing through the pipeline and arrive at the FHIR store. Use the SearchResources API to verify that FHIR resources are written correctly.
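For example (a hedged sketch using the FHIR search REST method; `Patient` is just an illustrative resource type, substitute whatever your mappings produce):

```bash
# Hypothetical sketch: search the FHIR store for Patient resources
# produced by the pipeline.
curl -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  "https://healthcare.googleapis.com/v1/projects/${PROJECT?}/locations/${LOCATION?}/datasets/${DATASET?}/fhirStores/${FHIRSTORE?}/fhir/Patient"
```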
This directory contains a reference Cloud Dataflow pipeline to convert a DICOM Study to a FHIR ImagingStudy resource.
Make sure you have enough permissions to run Cloud Dataflow jobs.
The Cloud Dataflow Controller Service Account needs the following roles:

- `roles/pubsub.subscriber`
- `roles/healthcare.dicomEditor`
- `roles/healthcare.fhirResourceEditor`
- `roles/storage.objectAdmin`
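The pipeline reads notifications from a Pub/Sub subscription. Creating one could look like this (a hedged sketch, assuming the DICOM store already publishes notifications to `${TOPIC}`, which is a placeholder here):

```bash
# Hypothetical sketch: create the subscription the pipeline will read from.
gcloud pubsub subscriptions create "${SUBSCRIPTION?}" \
  --topic="${TOPIC?}" \
  --project="${PROJECT?}"
```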
Build a fat JAR of the pipeline by running the following from the project directory:

```bash
# Generate wrapper classes.
gradle wrapper
./build_deps.sh && ./gradlew shadowJar -PmainClass=com.google.cloud.healthcare.etl.runner.dicomtofhir.DicomToFhirStreamingRunner
```

A JAR file should be generated in the `build/libs` folder.
Now run the pipeline with the following command:

```bash
# Please set the environment variables in the following command.
java -jar build/libs/converter-0.1.0-all.jar \
  --pubSubSubscription="projects/${PROJECT?}/subscriptions/${SUBSCRIPTION?}" \
  --readErrorPath="gs://${ERROR_BUCKET?}/read/" \
  --writeErrorPath="gs://${ERROR_BUCKET?}/write/" \
  --mappingErrorPath="gs://${ERROR_BUCKET?}/mapping/" \
  --mappingPath="gs://${MAPPING_BUCKET?}/main.textproto" \
  --fhirStore="projects/${PROJECT?}/locations/${LOCATION?}/datasets/${DATASET?}/fhirStores/${FHIRSTORE?}" \
  --runner=DataflowRunner \
  --region="${REGION?}" \
  --project="${PROJECT?}"
```
A few notes:

- By default, streaming pipelines do not have autoscaling enabled. Use either `--enableStreamingEngine` (recommended) or a combination of `--autoscalingAlgorithm=THROUGHPUT_BASED` and `--maxNumWorkers=N` to enable it manually. See the Dataflow autoscaling documentation for more details.
- To export worker metrics to Cloud Monitoring, add `--experiments=enable_stackdriver_agent_metrics` as an option (you will need to grant `roles/monitoring.metricWriter` to the Dataflow controller service account as well); see the Dataflow monitoring documentation for more details. Additionally, we highly recommend limiting the number of threads on each worker, e.g. `--numberOfWorkerHarnessThreads=10`. You can tune the limit based on your workload.
- To create a Dataflow template instead of running the pipeline directly, add `--stagingLocation=gs://${STAGING_LOCATION}` and `--templateLocation=gs://${TEMPLATE_LOCATION}` to the above command. In this case, reference the mapping configurations with `gcs_location:` instead of `local_path:`.

Please take a look at the `PipelineRunner` class to see the concrete meaning of each argument.
You should be able to verify that a Dataflow pipeline is running from the Cloud Console UI. Data should start flowing through the pipeline and arrive at the FHIR store. Use the SearchResources API to verify that FHIR resources are written correctly.
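For example (a hedged sketch using the FHIR search REST method to look for the `ImagingStudy` resources this pipeline produces):

```bash
# Hypothetical sketch: search the FHIR store for ImagingStudy resources.
curl -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  "https://healthcare.googleapis.com/v1/projects/${PROJECT?}/locations/${LOCATION?}/datasets/${DATASET?}/fhirStores/${FHIRSTORE?}/fhir/ImagingStudy"
```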
This directory contains a reference Cloud Dataflow pipeline to convert custom/non-standard messages to FHIR resources. Please note that additional configurations and hardening are required before processing PHI data with this pipeline.
Make sure you have enough permissions to run Cloud Dataflow jobs.
The Cloud Dataflow Controller Service Account needs the following roles:

- `roles/pubsub.subscriber`
- `roles/healthcare.fhirResourceEditor`
- `roles/storage.objectAdmin`
- `roles/pubsub.viewer`
- `roles/dataflow.worker`
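If you run the workers as a dedicated service account (as the `--serviceAccount` flag below does), creating it could look like this (a hedged sketch; the account name is a placeholder):

```bash
# Hypothetical sketch: create a dedicated controller service account,
# then grant it the roles listed above as shown in the HL7v2 section.
gcloud iam service-accounts create dataflow-pipeline \
  --project="${PROJECT?}" \
  --display-name="Dataflow controller for the converter pipeline"
```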
Build a fat JAR of the pipeline by running the following from the project directory:

```bash
# Generate wrapper classes.
./build_deps.sh && gradle wrapper --gradle-version 7.6
./gradlew shadowJar
```

A JAR file should be generated in the `build/libs` folder.
The `shadowJar` task in `build.gradle` already defaults to this pipeline's main class, so no `-PmainClass` override is needed:

```groovy
shadowJar {
    mainClassName = project.findProperty('mainClass') ?: 'com.google.cloud.healthcare.etl.runner.customtofhir.CustomToFhirStreamingRunner'
    dependsOn('buildDeps')
}

// sourceCompatibility = 11
sourceCompatibility = 1.8
```

Now run the pipeline with the following command:
```bash
# Please set the environment variables in the following command.
java -jar build/libs/converter-0.1.0-all.jar \
  --pubSubSubscription="projects/${PROJECT?}/subscriptions/${SUBSCRIPTION?}" \
  --readErrorPath="gs://${ERROR_BUCKET?}/read/" \
  --writeErrorPath="gs://${ERROR_BUCKET?}/write/" \
  --mappingErrorPath="gs://${ERROR_BUCKET?}/mapping/" \
  --mappingPath="gs://${MAPPING_BUCKET?}/mapping.textproto" \
  --fhirStore="projects/${PROJECT?}/locations/${LOCATION?}/datasets/${DATASET?}/fhirStores/${FHIRSTORE?}" \
  --runner=DataflowRunner \
  --region="${REGION?}" \
  --project="${PROJECT?}" \
  --serviceAccount="${SERVICE_ACCOUNT?}"
```
A few notes:

- By default, streaming pipelines do not have autoscaling enabled. Use either `--enableStreamingEngine` (recommended) or a combination of `--autoscalingAlgorithm=THROUGHPUT_BASED` and `--maxNumWorkers=N` to enable it manually. See the Dataflow autoscaling documentation for more details.
- To export worker metrics to Cloud Monitoring, add `--experiments=enable_stackdriver_agent_metrics` as an option (you will need to grant `roles/monitoring.metricWriter` to the Dataflow controller service account as well); see the Dataflow monitoring documentation for more details. Additionally, we highly recommend limiting the number of threads on each worker, e.g. `--numberOfWorkerHarnessThreads=10`. You can tune the limit based on your workload.
- To create a Dataflow template instead of running the pipeline directly, add `--stagingLocation=gs://${STAGING_LOCATION}` and `--templateLocation=gs://${TEMPLATE_LOCATION}` to the above command.

Please take a look at the `PipelineRunner` class to see the concrete meaning of each argument.
You should be able to verify that a Dataflow pipeline is running from the Cloud Console UI. Data should start flowing through the pipeline and arrive at the FHIR store. Use the SearchResources API to verify that FHIR resources are written correctly.
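To exercise the pipeline end to end, you can publish a test message to the input topic (a hedged sketch: the topic name and the message body are placeholders, and the payload format your pipeline accepts depends on your mapping configuration):

```bash
# Hypothetical sketch: publish a test message for the pipeline to pick up
# via its Pub/Sub subscription.
gcloud pubsub topics publish "${TOPIC?}" \
  --project="${PROJECT?}" \
  --message='{"example": "custom message payload"}'
```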
Please file GitHub issues if you encounter any problems.