opentargets / issues

Issue tracker for Open Targets Platform and Open Targets Genetics Portal
https://platform.opentargets.org https://genetics.opentargets.org
Apache License 2.0

`safe_array_union` handles incorrectly <array<struct>> when struct fields are not sorted #3551

Closed project-defiant closed 1 month ago

project-defiant commented 1 month ago

Describe the bug

The `safe_array_union` function from gentropy cannot merge arrays of structs whose fields are ordered differently, for example array<struct<a,b>> with array<struct<b,a>>.

Observed behaviour

An AnalysisException is raised.

Expected behaviour

Merging the arrays should not raise an AnalysisException.

To Reproduce

from gentropy.common.session import Session
from gentropy.common.spark_helpers import safe_array_union
from pyspark.sql import functions as f
from pyspark.sql import types as t
session = Session()
schema = t.StructType(
    [
        t.StructField(
            "arr",
            t.ArrayType(
                t.StructType(
                    [
                        t.StructField("a", t.StringType()),
                        t.StructField("b", t.IntegerType())
                    ]
                )
            )
        ),
        t.StructField("id", t.IntegerType())
    ]
)
schema2 = t.StructType(
    [
        t.StructField(
            "arr2",
            t.ArrayType(
                t.StructType(
                    [
                        t.StructField("b", t.IntegerType()),
                        t.StructField("a", t.StringType())
                    ]
                )
            )
        ),
        t.StructField("id", t.IntegerType())
    ]
)
df1 = session.spark.createDataFrame([([("a", 1,)], 1),], schema=schema).cache()
df2 = session.spark.createDataFrame([([(1,"a",), (2, "c")], 1),], schema=schema2).cache()
j = df1.join(df2, on=["id"], how="left")
j.select(safe_array_union(f.col("arr"), f.col("arr2"))).show()

raises

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
Cell In[182], line 37
     35 df2 = session.spark.createDataFrame([([(1,"a",), (2, "c")], 1),], schema=schema2).cache()
     36 j = df1.join(df2, on=["id"], how="left")
---> 37 j.select(safe_array_union(f.col("arr"), f.col("arr2"))).show()

File ~/Public/projects/ot-analysis/.venv/ot-analysis/lib/python3.10/site-packages/pyspark/sql/dataframe.py:2023, in DataFrame.select(self, *cols)
   2002 def select(self, *cols: "ColumnOrName") -> "DataFrame":  # type: ignore[misc]
   2003     """Projects a set of expressions and returns a new :class:`DataFrame`.
   2004
   2005     .. versionadded:: 1.3.0
   (...)
   2021     [Row(name='Alice', age=12), Row(name='Bob', age=15)]
   2022     """
-> 2023     jdf = self._jdf.select(self._jcols(*cols))
   2024     return DataFrame(jdf, self.sparkSession)

File ~/Public/projects/ot-analysis/.venv/ot-analysis/lib/python3.10/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File ~/Public/projects/ot-analysis/.venv/ot-analysis/lib/python3.10/site-packages/pyspark/sql/utils.py:196, in capture_sql_exception.<locals>.deco(*a, **kw)
    192 converted = convert_exception(e.java_exception)
    193 if not isinstance(converted, UnknownException):
    194     # Hide where the exception came from that shows a non-Pythonic
    195     # JVM exception message.
--> 196     raise converted from None
    197 else:
    198     raise

AnalysisException: cannot resolve 'array_union(arr, arr2)' due to data type mismatch: input to function array_union should have been two arrays with same element type, but it's [array<struct<a:string,b:int>>, array<struct<b:int,a:string>>];
'Project [unresolvedalias(CASE WHEN (isnotnull(arr#6451) AND isnotnull(arr2#6465)) THEN array_union(arr#6451, arr2#6465) ELSE coalesce(arr#6451, arr2#6465) END, Some(org.apache.spark.sql.Column$$Lambda$3685/0x000000080174a840@237b24c4))]
+- Project [id#6452, arr#6451, arr2#6465]
   +- Join LeftOuter, (id#6452 = id#6466)
      :- LogicalRDD [arr#6451, id#6452], false
      +- LogicalRDD [arr2#6465, id#6466], false

Additional context

This issue was discovered when trying to merge the transcriptConsequences fields between the VEP-based VariantIndex and the gnomAD-based VariantIndex. See the Dataproc job and the conversation about the schema issues in https://github.com/opentargets/gentropy/pull/790#issuecomment-2378784157
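As a possible workaround for the reproduction above (a minimal sketch, not the fix adopted in gentropy), the second array can be rebuilt element by element so that its struct fields follow the order of the first array before the union. The helper below only builds a Spark SQL expression string from the `transform` and `named_struct` SQL functions; the helper name `reorder_struct_array_expr` is hypothetical, and it assumes field names contain no backticks or single quotes.

```python
from typing import Sequence


def reorder_struct_array_expr(column: str, field_order: Sequence[str]) -> str:
    """Build a Spark SQL expression that rebuilds every struct of an array column.

    Hypothetical helper: each element `x` of `column` is replaced by a new
    struct whose fields appear in `field_order`, so the element type matches
    an array that already uses that order.
    """
    # named_struct takes alternating name/value arguments: 'a', x.`a`, 'b', x.`b`, ...
    fields = ", ".join(f"'{name}', x.`{name}`" for name in field_order)
    return f"transform(`{column}`, x -> named_struct({fields}))"


# Reorder arr2 (struct<b,a>) to match arr (struct<a,b>) from the reproduction.
expr = reorder_struct_array_expr("arr2", ["a", "b"])
print(expr)
# transform(`arr2`, x -> named_struct('a', x.`a`, 'b', x.`b`))
```

The resulting string could be passed to `f.expr(...)` and used in place of `f.col("arr2")` in the failing `select`. One caveat of this approach: a null struct element would come back as a struct with null fields rather than as a null element.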

ireneisdoomed commented 1 month ago

For context, the original scope of this function was unnested arrays. From the docs: "The function assumes the array columns have the same schema. Otherwise, the function will fail." The issue is present in the VEP parsing step because we are unioning arrays of structs.

project-defiant commented 1 month ago

> For context, the original scope of this function was unnested arrays. From the docs: "The function assumes the array columns have the same schema. Otherwise, the function will fail." The issue is present in the VEP parsing step because we are unioning arrays of structs.

To me, that says the schema has to be the same; there is no mention of non-nested structs. Furthermore, the fix will still only work for one level of nesting.
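To illustrate the nesting concern, here is a rough sketch of how field order could be normalised at every nesting level rather than just one. It is not gentropy code: the function name `sorted_fields_expr` is hypothetical, and it assumes the schema is given as the nested dict that `StructType.jsonValue()` / `ArrayType.jsonValue()` return in PySpark. It walks that dict recursively and emits a Spark SQL expression built from `transform` and `named_struct`, sorting struct fields by name. Map types are left untouched, and field names are assumed to contain no backticks or single quotes.

```python
from typing import Any


def sorted_fields_expr(path: str, dtype: Any, depth: int = 0) -> str:
    """Return a Spark SQL expression that rebuilds `path` with sorted struct fields.

    Hypothetical helper: `dtype` follows the dict layout of PySpark's
    `DataType.jsonValue()`; primitive types are plain strings such as "integer".
    """
    if isinstance(dtype, dict) and dtype.get("type") == "struct":
        parts = []
        # Sort fields by name so both sides of a union end up in the same order.
        for field in sorted(dtype["fields"], key=lambda fld: fld["name"]):
            name = field["name"]
            child = sorted_fields_expr(f"{path}.`{name}`", field["type"], depth)
            parts.append(f"'{name}', {child}")
        return f"named_struct({', '.join(parts)})"
    if isinstance(dtype, dict) and dtype.get("type") == "array":
        # Use a distinct lambda variable per nesting level to avoid shadowing.
        var = f"x{depth}"
        inner = sorted_fields_expr(var, dtype["elementType"], depth + 1)
        return f"transform({path}, {var} -> {inner})"
    # Primitive (or unsupported) type: keep the value as it is.
    return path


# Schema of arr2 from the reproduction, in jsonValue() layout.
arr2_type = {
    "type": "array",
    "containsNull": True,
    "elementType": {
        "type": "struct",
        "fields": [
            {"name": "b", "type": "integer", "nullable": True, "metadata": {}},
            {"name": "a", "type": "string", "nullable": True, "metadata": {}},
        ],
    },
}
flat_expr = sorted_fields_expr("arr2", arr2_type)
print(flat_expr)
# transform(arr2, x0 -> named_struct('a', x0.`a`, 'b', x0.`b`))
```

Applying the same normalisation to both columns (for example through `f.expr(...)`) before calling `array_union` would give them an identical element type regardless of the original field order, at the cost of rebuilding every element. As with any `named_struct` rebuild, a null struct would become a struct with null fields.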