harsha2010 / magellan

Geo Spatial Data Analytics on Spark

Magellan "index" column in parquet not reused for joins #203

Closed (zebehringer closed this issue 6 years ago)

zebehringer commented 6 years ago

I wanted to pre-generate the index for a very large set of polygons (loaded from a Shapefile) and store it as Parquet so that I can reuse it in frequent production jobs, but the ZOrderCurve column named "index" seems to be ignored when joining the Parquet data with a list of points.

import org.apache.spark.sql.types._
import magellan.{Point, Polygon}
import org.apache.spark.sql.magellan.dsl.expressions._
import spark.implicits._ // for the $"..." column syntax (already in scope in spark-shell)

val schema = new StructType(Array(
    StructField("latitude",           DoubleType,       false),
    StructField("longitude",          DoubleType,       false)
))

val sample = spark.read.schema(schema).option("header",true).csv("./sample.csv.gz")

// Inject Magellan's optimizer rules so spatial joins can take advantage of the Z-order index
magellan.Utils.injectRules(spark)

// The "shapes" table was created once from the shapefile, with the index precomputed at precision 15:
//spark.read.format("magellan").load("s3://myBucket/my_shapefile_folder")
//    .withColumn("index", $"polygon" index 15)
//    .selectExpr("polygon", "index", "metadata.ID AS id")
//    .write.saveAsTable("shapes")

sample.join(spark.table("shapes"), point($"longitude",$"latitude") within $"polygon").explain()

Here's the plan:

== Physical Plan ==
*Project [id#7, longitude#1, latitude#0, polygon#5, index#6]
+- *BroadcastHashJoin [curve#245], [curve#247], Inner, BuildLeft, ((relation#248 = Within) || Within(pointconverter(longitude#1, latitude#0), polygon#5))
   :- BroadcastExchange HashedRelationBroadcastMode(List(input[2, struct<xmin:double,ymin:double,xmax:double,ymax:double,precision:int,bits:bigint>, true]))
   :  +- Generate inline(indexer(pointconverter(longitude#1, latitude#0), 30)), true, false, [curve#245, relation#246]
   :     +- *FileScan csv [latitude#0,longitude#1] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/ec2-user/sample.csv.gz], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<latitude:double,longitude:double>
   +- Generate inline(indexer(polygon#5, 30)), true, false, [curve#247, relation#248]
      +- *FileScan parquet default.df3[polygon#5,index#6,id#7] Batched: false, Format: Parquet, Location: InMemoryFileIndex[file:/home/ec2-user/spark-warehouse/df3], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<polygon:struct<type:int,xmin:double,ymin:double,xmax:double,ymax:double,indices:array<int>...
harsha2010 commented 6 years ago

@zebehringer can you give this PR a try? I think the issue is that the column's nullability is reset (a bug in Spark SQL) when Spark SQL writes to Parquet, and when we read the data back this causes a schema mismatch.
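
One way to check this is to compare the schema of a freshly indexed DataFrame against the schema read back from the saved table. The snippet below is only a rough sketch, assuming the same shapefile path, precision, and "shapes" table name as the commented-out code above; whether the difference actually shows up in the nullable flag of the "index" column is what this would confirm or rule out.

val fresh = spark.read.format("magellan").load("s3://myBucket/my_shapefile_folder")
  .withColumn("index", $"polygon" index 15)
  .selectExpr("polygon", "index", "metadata.ID AS id")
val persisted = spark.table("shapes")

// If the Parquet round trip resets nullability, these two StructFields should
// agree on name and data type but differ in their nullable flag.
println(fresh.schema("index"))
println(persisted.schema("index"))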