apache / sedona

A cluster computing framework for processing large-scale geospatial data
https://sedona.apache.org/
Apache License 2.0

Errors reading data written with sedona 1.3.1 #1031

Closed sebbegg closed 10 months ago

sebbegg commented 1 year ago

Expected behavior

Reading data previously written with earlier Sedona versions should succeed.

Actual behavior

We're trying to migrate to Sedona 1.4.1 on datasets that were previously created/written using Sedona 1.3.1. Reading the existing parquet files (from a Delta table) fails, with the actual cause probably being this:

Caused by: org.apache.spark.sql.AnalysisException: Invalid Spark read type: expected optional group gps_dr_position (LIST) {
  repeated group list {
    required int32 element (INTEGER(8,true));
  }
} to be list type but found Some(BinaryType)

(full stacktrace attached)

I'm guessing that this is related to the change in GeometryUDT.sqlType, which was a list of bytes in 1.3.1 and is now a BinaryType:

https://github.com/apache/sedona/blob/cc213b1e08e461bc46fd576b3686f610a16cc3b2/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/UDT/GeometryUDT.scala#L29-L30 vs (1.3.1): https://github.com/apache/sedona/blob/95502ef0cb7d4ee798d3f67a7307b835c904214a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/UDT/GeometryUDT.scala#L29-L30

I can think of several ways around this, e.g. re-writing all data as "plain" WKB without the UDT, loading it with Sedona 1.4.1 and re-writing it in the new format (roughly sketched below). However, all of this causes a lot of interruption, e.g. because we're consuming some of the datasets as streaming sources.
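For illustration, a rough PySpark sketch of that migration idea (the column name "geom" and the paths are placeholders, and it assumes ST_AsBinary is available in the 1.3.1 function set):

from pyspark.sql.functions import expr

# With Sedona 1.3.1: strip the UDT by serializing the geometry column to plain WKB bytes
old = spark.read.format("delta").load("path/to/table")
old.withColumn("geom", expr("ST_AsBinary(geom)")) \
    .write.format("delta").mode("overwrite").save("path/to/plain_wkb")

# With Sedona 1.4.1: read the plain WKB back and re-create the geometry column
migrated = spark.read.format("delta").load("path/to/plain_wkb") \
    .withColumn("geom", expr("ST_GeomFromWKB(geom)"))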

So, is there any way that we might get around re-writing all the data with the new sedona version?

sedona-stacktrace.txt

Steps to reproduce the problem

Settings

Sedona version = 1.4.1

Apache Spark version = 3.3.2 (Databricks Runtime 12.2)

Apache Flink version = -

API type = Scala, Java, Python?

Scala version = 2.12

JRE version = 1.8

Python version = ?

Environment = Databricks

Sebastian Eckweiler sebastian.eckweiler@mercedes-benz.com on behalf of MBition GmbH.

jiayuasu commented 1 year ago

@sebbegg I wonder how you read the Parquet file in Sedona 1.4.1. The Spark parquet reader and writer do not understand GeometryUDT, so in 1.3.1 the geometry column in a parquet file is just an array of bytes representing the WKB format of the geometry.

Maybe you can try to use ST_GeomFromWKB in 1.4.1 to read the 1.3.1 geometry column directly:

df = spark.read.format("parquet").load("...")
df = df.selectExpr("ST_GeomFromWKB(geom) AS geom")
df.show()

If this does not work, you might have to write a UDF to convert ArrayType[binary] to BinaryType. This might help: https://stackoverflow.com/questions/57847527/how-do-i-convert-arrayfloattype-to-binarytype-in-spark-dataframes-using-scala
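For example, a minimal PySpark sketch of such a UDF (the column name "geom" is a placeholder; it assumes the legacy column comes back as an array of signed bytes):

from pyspark.sql.functions import udf, col
from pyspark.sql.types import BinaryType

# Pack the list of signed bytes (tinyint) back into a Python bytes object, which Spark maps to BinaryType
to_binary = udf(lambda arr: bytes((b & 0xFF) for b in arr) if arr is not None else None, BinaryType())

df = df.withColumn("geom", to_binary(col("geom")))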

sebbegg commented 1 year ago

Hi @jiayuasu ,

thanks for the feedback, we'll give it a try. I get the point that ArrayType[Byte] is basically the same as BinaryType, but it seems that the parquet reader doesn't "understand" this. We read/write data with Delta (https://delta.io/), which is basically parquet files plus a transaction log, so in the end this should be re-using the plain Spark parquet data sources, as is, I think, visible in the traceback:

Caused by: org.apache.spark.sql.AnalysisException: Invalid Spark read type: expected optional group gps_dr_position (LIST) {
  repeated group list {
    required int32 element (INTEGER(8,true));
  }
} to be list type but found Some(BinaryType)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkConversionRequirement(ParquetSchemaConverter.scala:728)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.$anonfun$convertGroupField$3(ParquetSchemaConverter.scala:325)
    at scala.Option.fold(Option.scala:251)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertGroupField(ParquetSchemaConverter.scala:306)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertField(ParquetSchemaConverter.scala:174)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.$anonfun$convertInternal$3(ParquetSchemaConverter.scala:133)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.$anonfun$convertInternal$3$adapted(ParquetSchemaConverter.scala:103)

sebbegg commented 1 year ago

So we gave it a try - without success.

Also when using spark.read.format('parquet'), spark tries to read the geometry column as the UDT, since that information is encoded in the parquet file's metadata. So even with plain parquet, spark tries to convert the parquet schema/types to the spark schema when reading the file, leading to the errors in the stacktrace.

So from my perspective, I think we would need to force spark to "ignore" the UDT when reading the data in order to be able to cast/change it. Not sure if this is possible...

One not directly relevant observation: GeometryUDT does not override the sql and catalogString properties, which leads to hard-to-understand error messages like this one:

AnalysisException: [DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE] Cannot resolve "st_geomfromwkb(gps_dr_position)" due to data type mismatch: parameter 1 requires ("STRING" or "BINARY") type, however, "gps_dr_position" is of "BINARY" type.; line 1 pos 0

The above happens since gps_dr_position is already of the geometry type, but the message reports it as "BINARY".

jiayuasu commented 1 year ago

@sebbegg

Two possible solutions:

  1. Disable the vectorized Parquet reader in Spark, since it only handles primitive/atomic types, not array types (see https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-vectorized-parquet-reader.html and https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-DataType.html): set spark.sql.parquet.enableVectorizedReader to false and spark.sql.parquet.enableNestedColumnVectorizedReader to false, as sketched right after this list.
  2. If step 1 does not work, try to force a schema: https://stackoverflow.com/questions/52387021/spark-read-parquet-with-custom-schema
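A minimal sketch of the configuration change for step 1, assuming a PySpark session named spark (the same settings can also go into the cluster's Spark config):

# Disable both the flat and the nested vectorized Parquet readers
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.conf.set("spark.sql.parquet.enableNestedColumnVectorizedReader", "false")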

jiayuasu commented 1 year ago

@sebbegg We think the solution is gonna be:

Step 1: force the schema for the native parquet reader. Force the geom field to be ArrayType(ByteType):

from pyspark.sql.types import (StructType, StructField, IntegerType,
                               StringType, FloatType, ArrayType, ByteType)

ProductCustomSchema = StructType([
        StructField("id_sku", IntegerType(), True),
        StructField("flag_piece", StringType(), True),
        StructField("flag_weight", StringType(), True),
        StructField("ds_sku", StringType(), True),
        StructField("qty_pack", FloatType(), True),
        StructField("geom", ArrayType(ByteType()), True)])

spark.read.format("parquet").schema(ProductCustomSchema).load("")

Step 2: if the loader can load the data, you might then need to convert the ArrayType geom column to BinaryType using a Scala UDF like this:

import org.apache.spark.sql.functions.udf

// Array[Byte] maps to Spark's BinaryType, so converting the WrappedArray is enough
val binUdf = udf((arr: scala.collection.mutable.WrappedArray[Byte]) => arr.toArray)
df.withColumn("geom", binUdf($"geom")).printSchema

Step 3: then use ST_GeomFromWKB to create the geom column in Sedona 1.4.1
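For example, a minimal PySpark sketch of Step 3, assuming the column has already been converted to BinaryType and the Sedona SQL functions are registered:

from pyspark.sql.functions import expr

# Binary WKB -> geometry (GeometryUDT) in Sedona 1.4.1
df = df.withColumn("geom", expr("ST_GeomFromWKB(geom)"))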

jiayuasu commented 10 months ago

In a few days, we will add a legacy mode to allow Sedona 1.4+ users to read old parquet files written by Sedona 1.3.1 and earlier versions. This will be shipped in Sedona 1.5.1.

jiayuasu commented 9 months ago

@sebbegg Sedona 1.5.1 has fixed this by providing a legacy mode in GeoParquet reader. If you upgrade your Sedona to 1.5.1, you will be able to read parquet files written by Sedona 1.3.1 and earlier: https://sedona.apache.org/1.5.1/api/sql/Reading-legacy-parquet/

df = sedona.read.format("geoparquet").option("legacyMode", "true").load("path/to/legacy-parquet-files")