opentargets / issues

Issue tracker for Open Targets Platform and Open Targets Genetics Portal
https://platform.opentargets.org https://genetics.opentargets.org
Apache License 2.0

Schema validation misses nested arrays #3545

Open DSuveges opened 3 days ago

DSuveges commented 3 days ago

@project-defiant identified a bug in the VEP-to-variant-index parser, which was traced back to an incorrect type: when the parser ran, the inSilicoPredictors column was an array of array of struct instead of an array of struct. What was striking is that all tests were passing, so the generated dataset had to pass schema validation. This led to the identification of another bug: schema validation doesn't work on highly nested columns.

Demonstrating the behaviour:

# Imports assumed for the snippets below; `session` is a gentropy Session
# object exposing a Spark session as `session.spark`.
from dataclasses import dataclass

from pyspark.sql import functions as f
from pyspark.sql.types import ArrayType, LongType, StringType, StructField, StructType

from gentropy.dataset.dataset import Dataset


@dataclass
class Test(Dataset):
    def __post_init__(self) -> None:
        self.validate_schema()

    @classmethod
    def get_schema(cls) -> StructType:
        return StructType(
            [
                StructField(
                    "a",
                    ArrayType(
                        StructType(
                            [
                                StructField("b", StringType(), True),
                                StructField("c", LongType(), True),
                            ]
                        ),
                        True,
                    ),
                    True,
                )
            ]
        )
# test data matching the expected schema:
df = session.spark.createDataFrame([("c", 2)], ["b", "c"]).select(
    f.array(f.struct(f.col("b"), f.col("c"))).alias("a")
)

t = Test(_df=df, _schema=Test.get_schema())
t.df.printSchema()
root
 |-- a: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: long (nullable = true)

All good. This is what we expect. If something is wrong inside the array, it raises an error as expected:

df = session.spark.createDataFrame([("c", 2)], ["b", "c"]).select(
    f.array(f.struct(f.col("b"), f.col("c").alias('d'))).alias("a")
)

t = Test(_df=df, _schema=Test.get_schema())

Failing:

ValueError: The ['a.d'] fields are not included in DataFrame schema: [Field(name='a', dataType=ArrayType(StructType([]), True)), Field(name='a.b', dataType=StringType()), Field(name='a.c', dataType=LongType())]

Interestingly, if the data contains an array of array, the validation passes (this reuses the df above with the mismatched d field):

t = Test(_df=df.withColumn("a", f.array(f.col("a"))), _schema=Test.get_schema())
t.df.printSchema()

This is the schema:

root
 |-- a: array (nullable = false)
 |    |-- element: array (containsNull = false)
 |    |    |-- element: struct (containsNull = false)
 |    |    |    |-- b: string (nullable = true)
 |    |    |    |-- d: long (nullable = true)

Things are even worse, because anything validates inside an array of array:

df = session.spark.createDataFrame([("c", 2)], ["b", "c"]).select(
    f.array(f.array(f.struct(f.col("b"), f.col("c").alias("d")))).alias("a")
)

t = Test(_df=df, _schema=Test.get_schema())
t.df.printSchema()

Gives:

root
 |-- a: array (nullable = false)
 |    |-- element: array (containsNull = false)
 |    |    |-- element: struct (containsNull = false)
 |    |    |    |-- b: string (nullable = true)
 |    |    |    |-- d: long (nullable = true)

Essentially, we need to recursively walk the observed and expected schemas and identify any discrepancies.

DSuveges commented 2 days ago

A proposed solution involves a recursive approach that checks nested struct and array fields. The schema comparison walks through the observed and expected schemas and collects all the discrepancies.

observed = StructType(
    [
        StructField(
            'a',
            ArrayType(
                ArrayType(
                    StructType(
                        [
                            StructField('a', IntegerType(), False),
                            StructField('c', StringType(), True),
                            StructField('c', StringType(), True)
                        ]
                    ),
                    False
                ),
                False
            ),
            False
        ),
    ]
)

expected = StructType(
    [
        StructField(
            'a',
            ArrayType(
                ArrayType(
                    StructType(
                        [
                            StructField('b', IntegerType(), False),
                            StructField('c', StringType(), True),
                            StructField('d', StringType(), True)
                        ]
                    ),
                    False
                ),
                False
            ),
            False
        ),
    ]
)

discrepancies = compare_struct_schemas(observed, expected)
if discrepancies:
    raise SchemaValidationError('Schema validation failed', discrepancies)

Between the observed and expected schemas there are a handful of differences, all captured by the validation error:

---------------------------------------------------------------------------
SchemaValidationError                     Traceback (most recent call last)
Cell In[303], line 47
     45 discrepancies = compare_struct_schemas(observed, expected)
     46 if discrepancies:
---> 47     raise SchemaValidationError('Schema validation failed', discrepancies)

SchemaValidationError: Schema validation failed
Errors:
  duplicated_columns: a[][].c
  missing_mandatory_columns: a[][].b
  unexpected_columns: a[][].a

DSuveges commented 2 days ago

Upon integrating the schema validation with the Dataset class, the behaviour is:

@dataclass
class Test(Dataset):

    def __post_init__(self) -> None:
        self.validate_schema()

    @classmethod
    def get_schema(cls) -> StructType:
        return StructType(
            [
                StructField(
                    'a',
                    ArrayType(
                        StructType(
                            [
                                StructField("b", StringType(), True),
                                StructField("c", LongType(), True)
                            ]
                        ),
                        True
                    ), True
                )
            ]
        )

t = Test(
    _df=df,
    _schema=Test.get_schema(),
)
t.df.printSchema()

All good:

root
 |-- a: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- b: string (nullable = true)
 |    |    |-- c: long (nullable = true)

However this raises an error:

t = Test(
    _df=df.withColumn('a', f.array(f.col('a'))),
    _schema=Test.get_schema(),
)

The error looks like this:

SchemaValidationError: Schema validation failed for Test
Errors:
  columns_with_non_matching_type: For column "a[][]" found array instead of struct

DSuveges commented 2 days ago

When running the tests with the updated schema validation, the variant index tests fail:

E           gentropy.common.schemas.SchemaValidationError: Schema validation failed for VariantIndex
E           Errors:
E             columns_with_non_matching_type: For column "inSilicoPredictors[][]" found array instead of struct

src/gentropy/dataset/dataset.py:152: SchemaValidationError