apache / sedona

A cluster computing framework for processing large-scale geospatial data
https://sedona.apache.org/

JoinQuery and partitioning #120

Closed fmarchand closed 6 years ago

fmarchand commented 7 years ago

I'm trying to do a JoinQuery between 1,458,000 points and 104 polygons. It works, but it seems very slow to me. Here is what I do:

First, I load the points into a DataFrame:

val train = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("hdfs://namenode-0-node.hdfsnoha.autoip.dcos.thisdcos.directory:9001/train.csv")

train.cache
train.count

Then I load the shapefile with all the polygons:

val shapefileRDD = new ShapefileRDD(sc, "hdfs://namenode-0-node.hdfsnoha.autoip.dcos.thisdcos.directory:9001/nycbb")

val spatialRDD = new PolygonRDD(shapefileRDD.getPolygonRDD, StorageLevel.MEMORY_ONLY)

I create the PointRDD from my raw data (I repartition on purpose to match the default parallelism of my Spark context, just to be sure 😄):

val geometryFactory = new GeometryFactory()

val rawPointRDD = train.rdd.repartition(16).map { row =>
  geometryFactory.createPoint(new Coordinate(
    row.getDouble(row.fieldIndex("pickup_longitude")),
    row.getDouble(row.fieldIndex("pickup_latitude"))))
}

val nycPointRDD = new PointRDD(rawPointRDD, StorageLevel.MEMORY_ONLY)

I spatially partition the points and apply the same partitioning to the polygons (that's what I understood from the showcase examples):

nycPointRDD.spatialPartitioning(GridType.QUADTREE)
spatialRDD.spatialPartitioning(nycPointRDD.partitionTree)

// nycPointRDD.buildIndex(IndexType.RTREE,true)

nycPointRDD.spatialPartitionedRDD.persist(StorageLevel.MEMORY_ONLY)
spatialRDD.spatialPartitionedRDD.persist(StorageLevel.MEMORY_ONLY)
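
Just to check my expectation here (a quick sanity sketch of my own, not from the docs): spatialPartitionedRDD is a plain JavaRDD, so after applying the same partitioner both sides should report the same number of partitions:

// Sanity check: both RDDs should now have the same number of spatial partitions.
println(nycPointRDD.spatialPartitionedRDD.getNumPartitions)
println(spatialRDD.spatialPartitionedRDD.getNumPartitions)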

And finally, I run the join:

val joinResult = JoinQuery.SpatialJoinQuery(nycPointRDD, spatialRDD, false, true)
joinResult.count

NB: I deliberately haven't indexed the points yet.
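
For reference, I believe enabling the index later would just reuse the calls above, something like this (an untested sketch on my side; I understand the second argument of buildIndex builds the index on the spatially partitioned RDD, and the third argument of SpatialJoinQuery is the useIndex flag):

// Untested sketch: build an R-tree on the spatially partitioned points,
// then tell the join to actually use it.
nycPointRDD.buildIndex(IndexType.RTREE, true)
val indexedJoinResult = JoinQuery.SpatialJoinQuery(nycPointRDD, spatialRDD, true, true)
indexedJoinResult.count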

My question is: when I run this, I notice that only one worker is active. From what I understood, since I applied the same spatial partitioning to the points and the polygons, all the workers should be processing the join at the same time.

Cached, all my polygons take 3.2 MB in memory and all my points take 92 MB.

I think I'm doing something wrong (I'm sure of it!) but I can't figure out what...
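
For what it's worth, here is how I would count points per spatial partition to look for skew (a plain-Spark sketch; .rdd unwraps the JavaRDD):

// Sketch: count how many points landed in each spatial partition.
// Heavy skew here could explain a single busy worker.
val counts = nycPointRDD.spatialPartitionedRDD.rdd
  .mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size)))
  .collect()
counts.foreach { case (idx, n) => println(s"partition $idx: $n points") }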

jiayuasu commented 7 years ago

@fmarchand My two cents:

  1. Are you sure that nycPointRDD has more than one partition?
  2. GeoSpark doesn't change your Spark cluster settings, so please check your cluster configuration: did you successfully connect to the Spark master, e.g., did you explicitly specify the master's IP address? Quick checks for both points are sketched below.
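
For example, something like this should make both checks quick (a sketch reusing the names from your snippets):

// 1. Partition count: should print 16 after your repartition.
println(rawPointRDD.getNumPartitions)

// 2. Master URL: should point at your cluster, e.g. spark://<master-ip>:7077,
//    not local[*]; defaultParallelism should reflect all your workers' cores.
println(sc.master)
println(sc.defaultParallelism)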