aehrc / pathling

Tools that make it easier to use FHIR® and clinical terminology within data analytics, built on Apache Spark.
https://pathling.csiro.au
Apache License 2.0
86 stars 13 forks source link

feature request: read FHIR resources from Kafka stream #452

Closed kapsner closed 2 years ago

kapsner commented 2 years ago

We are currently searching for a solution to read FHIR resources from a Kafka Stream into Spark and found this repo as a potential solution for that.

Using pyspark 3.2.1 and the spark streaming API, we were indeed able to read the FHIR resources into a spark.SQL.dataframe, however, the last step of enfolding them into spark using bunsen failed due to issues mostly related to outdated dependencies of Bunsen.

Would there be any chance that pathling could somehow fill this gap?

Is pathling able (or will be in the future) to use a Kafka stream a source?

And: is there a Python API to pathling encoder?

johngrimes commented 2 years ago

@piotrszul Do we need anything extra to be able to access the Pathling encoders from within Python (assuming that the Pathling encoders JAR is already on Spark's classpath)?

johngrimes commented 2 years ago

We would be very open to implementation of a Kafka consumer within Pathling - would you be interested in submitting a pull request in relation to this?

kapsner commented 2 years ago

Well, our python script is fairly simple right now and I would currently not know, where to integrate it into pathling exactly in order to submit a PR (however, we would be indeed very interested in helping to get this working).

Here is our current script:

#!/bin/python

appName = "Kafka, Spark and FHIR Data"
master = "spark://spark:7077"
kafka_topic = "fhir.post-gateway-kdb"

# https://engineering.cerner.com/bunsen/0.5.10-SNAPSHOT/
from bunsen.r4.bundles import from_json, extract_entry
from pyspark.sql import SparkSession

if __name__ == "__main__":

    spark = SparkSession \
        .builder \
        .appName(appName) \
        .master(master) \
        .getOrCreate()

    df = spark \
      .readStream  \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "kafka1:19092") \
      .option("subscribe", kafka_topic) \
      .option("startingOffsets", "earliest") \
      .load()

    df.printSchema()

    query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
            .writeStream \
            .queryName("gettable")\
            .format("memory")\
            .start()

    # close connection after 15 seconds
    query.awaitTermination(15)

    mydf = spark.sql("select * from gettable")
    mydf.show()
    type(mydf)

    df = mydf.toPandas()
    df

    mydf_rdd = mydf.rdd
    type(mydf_rdd) # convert to javaRDD for bunsen

    # the bunsen part is failing due to using old pyspark=2.4.4
    #bundles = from_json(mydf_rdd, 'value')

df.printSchema() root |-- key: binary (nullable = true) |-- value: binary (nullable = true) |-- topic: string (nullable = true) |-- partition: integer (nullable = true) |-- offset: long (nullable = true) |-- timestamp: timestamp (nullable = true) |-- timestampType: integer (nullable = true)

df +-------------------+--------------------+ | key| value| +-------------------+--------------------+ | Patient/patient|{"resourceType": ...| |Condition/condition|{"resourceType": ...| |Condition/condition|{"resourceType": ...| |Procedure/procedure|{"resourceType": ...| |Encounter/encounter|{"resourceType": ...| +-------------------+--------------------+

If that would help, my colleague @jasminziegler could perhaps provide our dockerized development setup for a reproducible example of this code.

Edit: Our code was mostly adapted from https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html if that helps. The code snippets are also available in Scala and Java

piotrszul commented 2 years ago

@piotrszul Do we need anything extra to be able to access the Pathling encoders from within Python (assuming that the Pathling encoders JAR is already on Spark's classpath)?

Well, I would think that we would have to provide a python API similar to that of bunsen for converting a stream/dataframe of of JSON values to the stream/dataframe structured fhir entries. This would probably involve extracting parts of 'import' functionality to a function callable from python and then providing a python wrapper to call it.

kapsner commented 2 years ago

That would be really great, if adding a python API similar to bunsen would be possible in order to convert FHIR-resources from a stream to the structured dataframes.

Should this go into another feature-request, or would this already be covered with #194 ?

jasminziegler commented 2 years ago

Dear @johngrimes and @piotrszul, would it be of any help for your further development if we provided a dockerized demo setup of our current state? I am happy to provide you with anything needed to ease the upcoming steps :-)

Looking forward to hearing form you again, Jasmin

johngrimes commented 2 years ago

We're working on a pathling pip package that would have a very similar API to Bunsen.

It will have a way of encoding a dataframe of FHIR JSON strings, so you should be able to select the appropriate column from the dataframe you are getting from the Kafka stream and feed it in.

We're thinking that maybe we could give you access to an "alpha" release of this, so that you could test it and let us know if it works for your use case?

jasminziegler commented 2 years ago

This sounds amazing, thank you! We would be happy to test an alpha release of the pathling pip package. Kindly let us know when it's ready!

I summarized the most important steps from our current pipeline for you in a demo here: https://github.com/miracum/demo-kafka-spark-pipeline Here you can see the kafka setup and a mock data loader loading R4 FHIR resources into a kafka topic together with a Spark setup reading the data from this kafka topic into spark. It also contains the part where Bunsen is failing. You can find all Setup instructions in the Readme.

I hope this is helpful! Let me know in case you have any doubts or questions and we are happy to support you in any other way.