google / fhir-data-pipes

A collection of tools for extracting FHIR resources and analytics services on top of that data.
https://google.github.io/fhir-data-pipes/
Apache License 2.0

FhirEtl misses resources #1109

Closed: jakubadamek closed this issue 1 month ago

jakubadamek commented 2 months ago

When I start FhirEtl, it outputs:

Number of resources for Patient search is 79370
Number of resources for Encounter search is 3994387
Number of resources for Observation search is 16928057

After I finish a run, I check the number of rows in the three Parquet outputs. For example, this run:

java -cp ./pipelines/batch/target/batch-bundled.jar com.google.fhir.analytics.FhirEtl --fhirServerUrl=http://10.128.0.7:8080/fhir --runner=FlinkRunner --resourceList=Patient,Encounter,Observation --batchSize=100 --parallelism=67 --fasterCopy=true

produces:

Patient: Total RowCount: 79370
Encounter: Total RowCount: 3356287
Observation: Total RowCount: 4226100

The closest I have gotten since I started checking this is a Dataflow run which created all Patients and Encounters, but not all Observations:

java -cp ./pipelines/batch/target/batch-bundled.jar com.google.fhir.analytics.FhirEtl --fhirServerUrl=http://10.128.0.14:8080/fhir --runner=DataflowRunner --resourceList=Patient,Encounter,Observation --batchSize=1000 --region=us-central1 --numWorkers=67 --gcpTempLocation=gs://fhir-analytics-test/dataflow_temp

Patient: Total RowCount: 79370
Encounter: Total RowCount: 3994387
Observation: Total RowCount: 16922000
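For reference, row counts like the "Total RowCount" numbers above can be verified directly against the Parquet output. A minimal sketch using the parquet-hadoop API (the class name and the directory argument are placeholders, not part of the pipeline):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class CountParquetRows {
  public static void main(String[] args) throws IOException {
    // args[0]: directory holding the Parquet files for one resource type, e.g. .../Patient
    List<java.nio.file.Path> files;
    try (var stream = Files.list(Paths.get(args[0]))) {
      files = stream.filter(p -> p.toString().endsWith(".parquet")).collect(Collectors.toList());
    }
    long total = 0;
    for (java.nio.file.Path p : files) {
      try (ParquetFileReader reader = ParquetFileReader.open(
          HadoopInputFile.fromPath(new Path(p.toString()), new Configuration()))) {
        total += reader.getRecordCount(); // row count summed over all row groups in the file
      }
    }
    System.out.println("Total RowCount: " + total);
  }
}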

jakubadamek commented 2 months ago

So the 79 and 768 datasets are fine with Flink (even with batchSize 1000). But the 7885 dataset is not: it fetches 395664 out of 396664 Encounters and 1667000 out of 1695261 Observations.

java -cp ./pipelines/batch/target/batch-bundled.jar com.google.fhir.analytics.FhirEtl --fhirServerUrl=http://10.128.0.14:8080/fhir --runner=FlinkRunner --resourceList=Patient,Encounter,Observation --batchSize=1000 --parallelism=50 --fasterCopy=true

Here are all the errors from the logs: https://paste.googleplex.com/6260051377651712 There are two kinds:

1) 18:00:42.564 [CHAIN DataSource (at Create.Values3/Read(CreateSource) (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat)) -> FlatMap (FlatMap at FetchResources3/ParDo(Search)/ParMultiDo(Search)) -> FlatMap (FlatMap at FetchResources3/ParDo(Search)/ParMultiDo(Search).output) (43/50)#0] ERROR c.g.fhir.analytics.FhirSearchUtil com.google.fhir.analytics.FhirSearchUtil.searchByUrl:70 - Failed to search for url: http://10.128.0.14:8080/fhir?_getpages=4af31a3d-8778-4504-b935-c228f03c2041&_getpagesoffset=105000 ; Exception: ca.uhn.fhir.rest.client.exceptions.FhirClientConnectionException: HAPI-1361: Failed to parse response from server when performing GET to URL http://10.128.0.14:8080/fhir?_getpages=4af31a3d-8778-4504-b935-c228f03c2041&_getpagesoffset=105000&_count=1000&_summary=data - java.net.SocketTimeoutException: Read timed out

2) 18:01:02.476 [CHAIN DataSource (at Create.Values/Read(CreateSource) (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat)) -> FlatMap (FlatMap at FetchResources/ParDo(Search)/ParMultiDo(Search)) -> FlatMap (FlatMap at FetchResources/ParDo(Search)/ParMultiDo(Search).output) (23/50)#0] ERROR c.g.fhir.analytics.FhirSearchUtil com.google.fhir.analytics.FhirSearchUtil.searchByUrl:70 - Failed to search for url: http://10.128.0.14:8080/fhir?_getpages=d6761bd6-2b1c-405a-9c94-396f9939b18f&_getpagesoffset=1518000 ; Exception: ca.uhn.fhir.rest.server.exceptions.InternalErrorException: HTTP 500 : HAPI-1163: Request timed out after 60056ms

bashir2 commented 2 months ago

Thanks for sharing and summarizing the logs; I think they explain what is happening. Some of the requests to the FHIR server are failing, and those errors are silently ignored because we catch the exception and do not re-throw it. Looking at the code, this bug seems to have been around for a very long time, but I have never experienced it myself. It is probably hitting you more because you are using a large batchSize (in one of your examples you have 100, which is odd to cause time-outs; there it is probably the large number of worker threads overwhelming the FHIR server).

I have a small PR to fix this which I'll send soon.
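To make the failure mode concrete, here is a sketch of the pattern described above (illustrative only, not the actual FhirSearchUtil code): the page fetch is wrapped in a try/catch that only logs, so a failed page silently drops its resources; the fix is to propagate the exception so the pipeline fails instead of writing incomplete output.

import ca.uhn.fhir.rest.client.api.IGenericClient;
import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
import org.hl7.fhir.r4.model.Bundle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class PageFetcher {
  private static final Logger log = LoggerFactory.getLogger(PageFetcher.class);

  // Sketch: fetch one paging URL (the _getpages links from the logs above).
  Bundle fetchPage(IGenericClient client, String pageUrl) {
    try {
      return client.search().byUrl(pageUrl).returnBundle(Bundle.class).execute();
    } catch (BaseServerResponseException e) {
      log.error("Failed to search for url: {}", pageUrl, e);
      // Before the fix: swallowing the exception here means this page's resources
      // never reach the Parquet output, hence the missing rows.
      // After the fix: re-throw so the runner fails the work item instead.
      throw e;
    }
  }
}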

jakubadamek commented 2 months ago

The PR helps by making the pipeline fail if there is any error. It doesn't make it retry, which would probably be better.

It turns out that 3 parallel Flink workers are fine (and finish with 100% of the data) but 5 are not, even though the CPU load never reaches 100%.

See here for FhirEtl exceptions: https://paste.googleplex.com/4803996126806016 Here for HAPI exceptions: https://paste.googleplex.com/6550778385006592

bashir2 commented 2 months ago

> The PR helps by making the pipeline fail if there is any error. It doesn't make it retry, which would probably be better.

We can add more retry logic, e.g., in the HTTP client or in Flink's re-execution parameters for failed bundles (I thought that is enabled in Flink by default), but regardless there is a limit to these. We are basically overwhelming the FHIR server, and even if we retry more, at some point we run out of HTTP/worker retries (unless we make them infinite, which is not a good idea) and the pipeline has to fail.
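For illustration, bounded client-side retries could look roughly like the hypothetical helper below (not part of the pipeline; once the attempts are exhausted the exception is re-thrown, so the failure still surfaces, which is the point above):

import java.util.function.Supplier;
import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;

final class Retries {
  // Retry a FHIR call a bounded number of times with exponential backoff.
  static <T> T withRetries(Supplier<T> call, int maxAttempts) {
    BaseServerResponseException last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return call.get();
      } catch (BaseServerResponseException e) {
        last = e;
        try {
          Thread.sleep(1000L << (attempt - 1)); // back off 1s, 2s, 4s, ...
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw e;
        }
      }
    }
    throw last; // bounded: the pipeline still has to fail eventually
  }
}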

> It turns out that 3 parallel Flink workers are fine (and finish with 100% of the data) but 5 are not, even though the CPU load never reaches 100%.

Assuming that the pipeline, FHIR server, and the DB are all on the same machine, if the load is far from 100% then maybe the number of threads for the FHIR server can be increased? There must be some resource starvation somewhere if the HTTP query is timing out (note that in the client we set the timeout to 200 seconds, but from your earlier logs it seems the HAPI JPA server has a maximum timeout of 60 seconds; I have not investigated this much).
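As a side note on the 200-second client timeout mentioned above: it is a HAPI client factory setting and is independent of the server-side 60-second cap that the HAPI-1163 errors come from. A minimal sketch, assuming an R4 context and reusing the server URL from the earlier commands as a placeholder:

import ca.uhn.fhir.context.FhirContext;
import ca.uhn.fhir.rest.client.api.IGenericClient;

class ClientTimeoutExample {
  static IGenericClient newClient() {
    FhirContext ctx = FhirContext.forR4();
    // Client-side socket timeout in milliseconds (200 s); raising it does not
    // change the HAPI JPA server's own search timeout.
    ctx.getRestfulClientFactory().setSocketTimeout(200 * 1000);
    return ctx.newRestfulGenericClient("http://10.128.0.14:8080/fhir");
  }
}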

When you say the pipeline fails with 5 workers, is it with the default batchSize of 100? I frequently run the pipeline with 40+ workers (i.e., number of cores on my machine) and have never experienced this timeout issue with batchSize=100.

Correction: I realized my large runs over the last year or so have been using the JDBC mode. I just tried the FHIR Search API with a large dataset and can reproduce the timeout issue you are experiencing. It seems to come from this line in the HAPI JPA server, and I don't see a way to override the default 60 seconds; I will investigate more later. I am also changing this bug to a P0.

jakubadamek commented 2 months ago

Thanks. Yes, the limit is hard-coded to 1 minute here.

jakubadamek commented 2 months ago

The pipeline and FHIR server are on the same machine. Sometimes I use two different machines, with similar results. The DB is separate and usually has a very low load, probably due to caching since I repeat the same tests.

bashir2 commented 1 month ago

Some updates on this issue; the TL;DR is that I am pretty sure this is a new issue on the HAPI server side that has been introduced over the last two years. I am going to file an issue there and reference it here too.

Details: Two years ago, @rayyang29 did some extensive evaluations of the FHIR Search mode against large HAPI servers/DBs; his findings are documented in Issue #266. As is clear from the referenced documents, it was possible to use the FHIR Search mode for large datasets and even to saturate the CPU with database processes. Here is a sample graph in which the postgres processes use most of the available 48 cores:

[CPU usage graph: postgres processes consuming most of the 48 available cores]

But in my experiments, I was never able to do this! The time-out issue does not happen for small datasets. I started from a dataset with ~1.7M Observations and the following succeeded:

java -cp ./pipelines/batch/target/batch-bundled.jar com.google.fhir.analytics.FhirEtl --fhirServerUrl=http://localhost:8094/fhir --runner=FlinkRunner --resourceList=Observation --outputParquetPath=[PATH] --batchSize=10 --parallelism=20

But when I tried something similar for a dataset of ~3x size (i.e., ~5.2M Observations), it failed with time-outs:

org.apache.beam.sdk.util.UserCodeException: ca.uhn.fhir.rest.server.exceptions.InternalErrorException: HTTP 500 : HAPI-1163: Request timed out after 60088ms

Also, although I increased the HAPI server's connection pool size to 40, it seems that during the pipeline run only one postgres process was consuming a significant amount of CPU (even though the HAPI server had 40 connections to the database).
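For context, if this is the stock hapi-fhir-jpaserver-starter, the pool-size change above usually corresponds to the Hikari maximum pool size (spring.datasource.hikari.maximum-pool-size in the Spring Boot configuration). The equivalent programmatic Hikari setting, shown only to make the knob concrete (the JDBC URL is a placeholder):

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

class PoolSizeExample {
  static HikariDataSource newDataSource() {
    HikariConfig cfg = new HikariConfig();
    cfg.setJdbcUrl("jdbc:postgresql://db-host:5432/hapi"); // placeholder JDBC URL
    cfg.setMaximumPoolSize(40); // at most 40 concurrent DB connections from the server
    return new HikariDataSource(cfg);
  }
}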

Doing more investigation with just one worker thread (i.e., --parallelism=1), I can see that, consistently, after ~2000 resources are fetched one of the http://localhost:8094/fhir?_getpages=ID&_getpagesoffset=OFFSET page queries gets stuck for tens of seconds (the others return almost instantly). I can even reproduce this behavior outside our pipeline with a simple bash script:

# ID is the _getpages session id returned by the initial search (kept as a placeholder here).
# Page through the results 10 at a time and save each page to a file.
for ((i = 0; i < 5000; i += 10)); do
  echo $i
  curl -X GET -H "Content-Type: application/fhir+json;charset=utf-8" \
    "http://localhost:8094/fhir?_getpages=ID&_getpagesoffset=${i}&_count=10&_summary=data" \
    -o tmp/Observation_${i}.json
done

and, as explained above, at about i=2000 one query suddenly takes tens of seconds. Note that the pipeline does not fail with --parallelism=1.

jakubadamek commented 1 month ago

Great work, Bashir, that makes a lot of sense. And you even reproduced it with bash.

bashir2 commented 1 month ago

Filed this issue on the HAPI side.