drubbo / SparkGIS

GIS extension for SparkSQL
Apache License 2.0

java.lang.NullPointerException on ST_Buffer #9

Open netanel246 opened 8 years ago

netanel246 commented 8 years ago

I got this Exception:

16/04/06 03:49:09 INFO GenerateUnsafeProjection: Code generated in 190.332219 ms
16/04/06 03:49:10 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.NullPointerException
    at org.betterers.spark.gis.GisGeometry$ImplicitConversions$.fromGeometry(GisGeometry.scala:389)
    at org.betterers.spark.gis.Geometry$ImplicitConversions$.toImpl(Geometry.scala:94)
    at org.betterers.spark.gis.udf.Functions$.ST_Buffer(Functions.scala:232)
    at SparkGis$$anonfun$1.apply(SparkGis.scala:42)
    at SparkGis$$anonfun$1.apply(SparkGis.scala:42)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$32.apply(RDD.scala:912)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

When I run this piece of code:

import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.betterers.spark.gis._
import org.betterers.spark.gis.udf.Functions

object SparkGis {
  val conf = new SparkConf()
    .setAppName("Spark Gis Test")
    .setMaster("local[*]")

  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  def main(args: Array[String]) {
    val layerSchema = StructType(Seq(
      StructField("id", LongType),
      StructField("geometry", GeometryType.Instance)))

    val polygons =
      sqlContext.read.format("json")
        .schema(layerSchema)
        .load("data/countries.json")

    val points =
      sqlContext.read.format("json")
        .schema(layerSchema)
        .load("data/points.json")

    bufferDistanceExample(polygons, points)
  }

  def bufferDistanceExample(poly: DataFrame, points: DataFrame): Unit = {
    poly.registerTempTable("polygons")

    val pointsBuffer = poly
      .map(c => Functions.ST_Buffer(c.getAs[Geometry](1), 10))
    pointsBuffer.foreach(println)

    val pointsBufferRow = pointsBuffer
      .map(x => org.apache.spark.sql.Row(x))
    pointsBufferRow.foreach(println)

    val pointsBufferDf = sqlContext
      .createDataFrame(pointsBufferRow, StructType(Seq(StructField("geometry", GeometryType.Instance))))
    pointsBufferDf.show()

    pointsBufferDf.registerTempTable("pointsBuffer")
    val bufferJoin = pointsBufferDf.sqlContext
      .sql("select p.geometry, pol.geometry from pointsBuffer p join polygons pol")
    bufferJoin.show()
  }
}

with this data: data.zip

I can see the geometries, but when I tried to do a join between them and print the result I got this exception. When I ran it without the buffer section, it worked. The data is valid, because before the join I can print the features of the layers fine. In addition, I compiled the package (SparkGIS) against Scala 2.10.5 instead of using Scala 2.11 (all the tests pass).

drubbo commented 8 years ago

Can you provide the dataset, please?

netanel246 commented 8 years ago

I added the data in a zip file. Thanks.

drubbo commented 8 years ago

I'll give it a look ASAP, but for a start I notice that your id field should have a StringType rather than a LongType.
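
For reference, a minimal sketch of the suggested schema change, reusing the field names from the snippet above:

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.betterers.spark.gis.GeometryType

// "id" declared as StringType instead of LongType, as suggested above
val layerSchema = StructType(Seq(
  StructField("id", StringType),
  StructField("geometry", GeometryType.Instance)))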

netanel246 commented 8 years ago

I tried with StringType and got a different exception:

16/04/20 04:39:27 ERROR Executor: Exception in task 1.0 in stage 2.0 (TID 5)
java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.unsafe.types.UTF8String
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getUTF8String(rows.scala:46)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:221)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoin$$anonfun$2.apply(BroadcastNestedLoopJoin.scala:105)
    at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoin$$anonfun$2.apply(BroadcastNestedLoopJoin.scala:96)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Update: I tried running it with Scala 2.11.8 and it still throws this exception.

drubbo commented 8 years ago

The issue is not ST_Buffer itself; my fear is that the problem is related to this: https://issues.apache.org/jira/browse/SPARK-10352

That's internal Spark black magic, and possibly they are still lacking some conversion / UDT handling. Maybe I'll take a look at the current Spark code to see if I can find the culprit and provide a quick fix, but it will take time.

About your code, there are a few things I don't understand.

Why don't you do something along these lines instead:

val polys = sql.read.schema(schema).json(sc.parallelize(polygonsJSON))

val buf = polys.map(p => Row(p(0), p(1), Functions.ST_Buffer(p.getAs[Geometry](1), 10)))
val bufSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("geometry", GeometryType.Instance),
  StructField("buffer", GeometryType.Instance)))
val bufDF = sql.createDataFrame(buf, bufSchema)
bufDF.show()

+---+--------------------+--------------------+
| id|            geometry|              buffer|
+---+--------------------+--------------------+
|  0|POLYGON ((61.2108...|POLYGON ((50.5495...|
|  1|POLYGON ((20.5902...|POLYGON ((9.3699 ...|
|  2|POLYGON ((51.5795...|POLYGON ((41.7131...|
+---+--------------------+--------------------+
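
Continuing that sketch (assumed, not verified here): registering the buffered frame and re-running the SQL join from the original report is where the thread indicates the exception would show up again.

// Continuation of the sketch above; the failure reported in this issue
// appears at the SQL join step, not at ST_Buffer itself.
bufDF.registerTempTable("buffered")
polys.registerTempTable("polygons")
sql.sql("select b.buffer, pol.geometry from buffered b join polygons pol").show()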

netanel246 commented 8 years ago

Hi, I didn't have time to do it right; I just wanted to run this POC as quickly as I could. I found that the problem is in the join section. Also, I've tried the same operation in SparkGIS 0.2.0 (Scala 2.10, Spark 1.5.1) and it works great. We also found that it works with RDDs.

Is there something I can check to find the bug quickly? Or some workaround I can use?

Update: I did some tests and found that this bug appears only from Spark 1.6 onwards, so for now we have downgraded to 1.5.2 until it is solved. Thanks.
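
For reference, a rough sketch of the RDD-based path mentioned above (illustrative only; names follow the original snippet, and the cartesian mirrors the predicate-less SQL join):

// Buffer each polygon geometry, then pair every buffer with every point,
// bypassing the DataFrame/UDT join entirely.
val bufferedRdd = polygons.rdd
  .map(r => (r.get(0), Functions.ST_Buffer(r.getAs[Geometry](1), 10)))
val pointsRdd = points.rdd
  .map(r => (r.get(0), r.getAs[Geometry](1)))

bufferedRdd.cartesian(pointsRdd).foreach {
  case ((polyId, buffer), (pointId, point)) =>
    println(s"$polyId / $pointId: $buffer ~ $point")
}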

drubbo commented 8 years ago

I think we really need to talk with the Spark guys (@rxin?) about this. It sounds like a regression, and my guess is that no unit test covering this scenario is present in the Spark codebase.