opentargets / issues

Issue tracker for Open Targets Platform and Open Targets Genetics Portal
https://platform.opentargets.org https://genetics.opentargets.org
Apache License 2.0

Prototype a data quality suite in the data generation step #2691

Closed ireneisdoomed closed 1 year ago

ireneisdoomed commented 2 years ago

Context

Data quality checks are a common task of the Data and Genetics team and they take a significant amount of time. The basic current data flow in the Platform and in Genetics is as follows:

graph TD
    A[Platform raw data] --> C[OT Harmonisation]
    B[Genetics raw data] --> C
    C --> D[Data generation]
    D -->|Integration| E[ETL]
    E --> F[Exposure:<br> FE, API, BigQuery, Dumps]

We want to build a solid system that accounts for data quality checks so that we increase confidence in the outputs of the data generation step. This way we would reduce manual checks in the ETL.

Data quality is a very broad topic and there are at least 2 ways we could tackle it:

So far this functionality is partially covered by the OT validator, which runs checks against the structure and the content in a high-level manner. However JSON Schemas:

Proposal

The approach with the best return on effort is to enforce the schemas first. Most of our pipelines are written in Spark, which makes it very easy to inspect the schema with df.schema.json().
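A minimal sketch of how such a check could look, assuming the expected schema is stored as the JSON string that `df.schema.json()` produces. The helper name `schema_diff` and the example columns are hypothetical, and only top-level fields are compared; nested structs would need a recursive version.

```python
import json
from typing import List


def schema_diff(actual_json: str, expected_json: str) -> List[str]:
    """Compare two Spark schemas serialised with df.schema.json().

    Only top-level fields are compared (name, type and nullability).
    Returns a list of human-readable differences; empty means they match.
    """
    def fields(schema_json: str) -> dict:
        return {f["name"]: f for f in json.loads(schema_json)["fields"]}

    actual, expected = fields(actual_json), fields(expected_json)
    problems = []
    for name in sorted(expected.keys() - actual.keys()):
        problems.append(f"missing column: {name}")
    for name in sorted(actual.keys() - expected.keys()):
        problems.append(f"unexpected column: {name}")
    for name in sorted(actual.keys() & expected.keys()):
        for key in ("type", "nullable"):
            if actual[name][key] != expected[name][key]:
                problems.append(
                    f"{name}: {key} is {actual[name][key]!r}, "
                    f"expected {expected[name][key]!r}"
                )
    return problems


# Hypothetical schemas in the JSON layout that df.schema.json() emits.
expected_schema = json.dumps({"type": "struct", "fields": [
    {"name": "gene_id", "type": "string", "nullable": True, "metadata": {}},
    {"name": "h4", "type": "double", "nullable": True, "metadata": {}},
]})
actual_schema = json.dumps({"type": "struct", "fields": [
    {"name": "gene_id", "type": "string", "nullable": True, "metadata": {}},
    {"name": "h4", "type": "string", "nullable": True, "metadata": {}},
]})

differences = schema_diff(actual_schema, expected_schema)
```

In a test, `actual_json` would come from `df.schema.json()` and `expected_json` from a stored schema file, with the test asserting that the returned list is empty.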

At the same time, I would also like a solution for the second scenario, checking the content, that makes it easy to add rules iteratively as we become aware of them (e.g., no variant in the credible set of type 'gwas' should have a gene_id assigned).

This way, the process could look like this:

graph TD
    A[Platform raw data] -->C[OT Harmonisation] 
    B[Genetics raw data] -->C
    C --> D[Data Generation]
    subgraph Quality
        DA[Schema enforcement] --> D
        DB[Data constraints] --> D
    end
    D -->|Integration| E[ETL]
    E --> F[Exposure:<br> FE, API, BigQuery, Dumps]

We are currently in an interesting phase where we are working on the Genetics pipelines to avoid many of the common issues that happen in the release process. We could take one of the new repos as an example to find out the best way to run these tests on the data. For example, the new version of coloc or the dev branch of v2g with @DSuveges's latest work on the Intervals datasets.

A proposed project structure:

|--root
|    |-- src
|    |    |-- parser.py
|    |-- tests
|    |    |-- test_coloc.py
|    |    |-- schemas
|    |    |    |--  coloc.json

Potential tools

Some tools that could fit this purpose.

1. Chispa
[Github repo](https://github.com/MrPowers/chispa) The library provides useful functions to test the content of two dataframes. We could define some expected data and test that the produced output is in line with the controlled data.
2. Great Expectations
[Github repo](https://github.com/great-expectations/great_expectations) Huge suite with a lot of functionality to test and monitor the data. It even allows setting up Slack notifications based on the result of the assessments. It is the library that I see referenced the most, and Spark DataFrames are supported. However, the initial setup seems to require some time to understand, in particular the "data context", which I don't know whether it is mandatory. You can see a basic example [here](https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/968100988546031/3944681434720936/8836542754149149/latest.html)
3. Pandera
[Github repo](https://github.com/unionai-oss/pandera) Similar scope to Great Expectations but with an easier learning curve (probably at the expense of some functionality). It looks interesting, however, as you can define data and schema constraints (through Pydantic-style schema models). It is natively built to work with Pandas dfs but they have documentation on handling Spark dfs. You can see a basic example [here](https://pandera.readthedocs.io/en/latest/fugue.html).
4. Deequ
[Github repo](https://github.com/awslabs/deequ) Very interesting library developed by AWS, intended explicitly for writing data unit tests in a Spark environment. The native language is Scala, although there is also a [Python API](https://github.com/awslabs/python-deequ). You can see how it works in detail [here](https://www.velotio.com/engineering-blog/test-data-quality-at-scale-using-deequ-and-apache-spark). Main characteristics are:
  - Spark native support
  - anomaly detection: compare the current dataset with the previously analyzed one, for instance to detect data regressions in terms of dataset size. Deequ comes with a native implementation of this feature.
  - global and unitary checks: you can verify attributes of the whole dataset, like the number of rows, as well as every column with a fine-grained rule.
  - metrics storage: Deequ has the concept of a repository where you can output generated metrics. Thanks to the repository you can query the metrics and visualize them on a dashboard.
  - evolving datasets: when you need to measure metrics over an evolving dataset, for example one increasing in size every day, you can use a state store to persist the measures on an HDFS-compatible file system (object storage) or simply in memory.
  - constraints suggestion: Deequ can analyze your whole dataset, or only part of it, and suggest validation constraints from there.
  - profiling: gives you information about one specific column, like its completeness (missing values), the number of distinct values or the data type.
5. Apache Griffin - More obscure to implement
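To make the anomaly detection item in the Deequ entry concrete, the sketch below shows the underlying idea of a dataset size regression check in plain Python. It does not use Deequ's API; the function name `row_count_regressed` and the 10% threshold are assumptions for illustration.

```python
def row_count_regressed(previous: int, current: int, max_drop: float = 0.10) -> bool:
    """Return True if the dataset shrank by more than max_drop (a fraction)
    relative to the previously recorded row count."""
    if previous <= 0:
        # Nothing to compare against, so no regression can be reported.
        return False
    relative_drop = (previous - current) / previous
    return relative_drop > max_drop


# Hypothetical row counts from two consecutive runs.
small_change = row_count_regressed(previous=1_000_000, current=950_000)
large_drop = row_count_regressed(previous=1_000_000, current=700_000)
```

The previous count would have to be persisted between runs, which is the role of the metrics repository mentioned in the list.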

Actions

DSuveges commented 2 years ago

I have the impression that it is still too costly to put together such an infrastructure. However, I think something like Miguel's checkomatic could be considered. It was a rule-based validation and QC system that provided a dataset-agnostic tool, which could be used with a custom set of rules to check any data. I think we can develop something similar.

  1. Having a rule agnostic tool that does the tests.
  2. The actual tests are defined by the creator of the dataset when developing or updating the component, so the burden is smaller. E.g. as I'm working on the intervals dataset, I am the most qualified to define the necessary rules in a yaml file that describes it, e.g. the schema, the content of the columns etc. You can imagine the rules would be pyspark expressions/filters that are applied to the loaded dataset, and the tool asserts the expected results.
  3. This yaml file could be stored in the repository of the code.
  4. A separate ETL step could be to call the test tool on the data given the set of rules that are expected to be passed.
  5. If we are rigorous enough we can build github actions around it.
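A minimal sketch of the rule-agnostic idea in the list above, not an existing tool. The rules are shown as a Python list standing in for the parsed yaml file; each rule is a SQL filter expression that selects the violating rows. The names `RULES`, `run_rules` and `count_in_sqlite`, the table, and the column `type` are hypothetical. The runner only needs a function that counts the rows matching an expression; with Spark this could be `lambda expr: df.filter(expr).count()`, while here an in-memory SQLite table stands in for the dataframe so that the example runs without Spark.

```python
import sqlite3
from typing import Callable, Dict, List

# Stand-in for the parsed yaml rule file; each expression selects violations.
RULES: List[Dict[str, str]] = [
    {"name": "gwas_without_gene",
     "violation": "type = 'gwas' AND gene_id IS NOT NULL"},
    {"name": "variant_id_present",
     "violation": "variant_id IS NULL"},
]


def run_rules(rules: List[Dict[str, str]],
              count_matching: Callable[[str], int]) -> Dict[str, int]:
    """Return the number of violating rows per rule name."""
    return {rule["name"]: count_matching(rule["violation"]) for rule in rules}


# In-memory SQLite table used here instead of a Spark dataframe.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE credible_set (variant_id TEXT, type TEXT, gene_id TEXT)")
conn.executemany(
    "INSERT INTO credible_set VALUES (?, ?, ?)",
    [
        ("1_100_A_T", "gwas", None),
        ("1_200_G_C", "gwas", "ENSG00000000001"),  # breaks the first rule
        ("2_300_C_T", "eqtl", "ENSG00000000002"),
    ],
)


def count_in_sqlite(expression: str) -> int:
    # Rule expressions are trusted configuration, so interpolation is acceptable here.
    query = f"SELECT COUNT(*) FROM credible_set WHERE {expression}"
    return conn.execute(query).fetchone()[0]


report = run_rules(RULES, count_in_sqlite)
failed = [name for name, violations in report.items() if violations > 0]
```

Keeping the expressions in a file next to the pipeline code means the dataset author can add a rule without touching the runner, and a CI step only has to fail when `failed` is not empty.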

But I'm not entirely sure this is the best way to spend our time to be honest. I think now we are having all sorts of problems, because none of us knows the Genetics Pipelines really well and of course the pipelines are very fragile and leaking everywhere. I believe if we invest into writing things properly there would be less of a need for such a tool. Building this would be a huge investment in time, and still would not guarantee anything.

DSuveges commented 2 years ago

If we are especially concerned about the correctness of the schema, what we can do is ensure that the last step of every process is a .select statement listing all the columns with their types.

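from pyspark.sql import functions as F, types as T

(
  df
  # some processes
  .select(
    # casting every output column pins both its name and its type
    F.col('column1').cast(T.StringType()),
    F.col('column2').cast(T.IntegerType()),
  )
  .write.parquet('...')
)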

Any time output is generated by such a script, there's no option but to output the correct schema. There's also no burden of updating a repository with the schemas, and no need to develop another process to check correctness. And when a code update involves changes to the schema, there's no way to make those changes without updating the select statement.

ireneisdoomed commented 2 years ago

Just to give another idea, we can also cast the schema of a df to a preset one by passing the schema as a parameter to the createDataFrame function. We first have to convert the df to an RDD, which is then converted back into a df with the controlled schema. I read somewhere that this is not an expensive operation. Something like:

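import json
from pyspark.sql.types import StructType

schema_path = 'path_to_the_schema.json'
# read the schema file; json.loads on the path string itself would fail
with open(schema_path) as f:
    controlled_schema = StructType.fromJson(json.load(f))

final_df = spark.createDataFrame(df.rdd, schema=controlled_schema)

tskir commented 2 years ago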

I've investigated how fastjsonschema could be used for validating data directly inside Spark dataframes, while still retaining the JSON schema as the source of truth, which can be relevant for this ticket: https://github.com/opentargets/issues/issues/2722

DSuveges commented 1 year ago

@ireneisdoomed, @tskir Currently I don't think there's any actionable item under this ticket that is waiting for completion. Although I agree that data QC should be further improved, I don't see it as an immediate priority. At the same time, I don't want to lose all the exploration done here, so moving the ticket to icebox.