WikiWatershed / mmw-geoprocessing

A Spark Job Server job for Model My Watershed geoprocessing.
Apache License 2.0
6 stars 6 forks source link

Raster Lines Join #31

Closed rajadain closed 8 years ago

rajadain commented 8 years ago

Overview

Creates an endpoint that given a polygon, a set of lines, and a raster layer ID, will return a histogram of number of cells of each type in the raster which intersect with the set of lines within the polygon. These values can be used for deriving values for MapShed.

Approach

Since we can have a very large set of lines, for example 25K+ https://goo.gl/YFvX3F (20 MB), we first create a range tree recording which lines are present in each tile of the raster clipped to the polygon. Then, for each tile, we calculate the intersection of the rasterized line to the base raster, and record the types of cells in the intersection. Finally, we collect all those individual counts across tiles into one collection which maps the raster cell value to its count.

Timing

If the tiles are cached, the processing time for the three requests are as follows:

MapshedJob_NHD.json
real    0m1.173s
user    0m0.003s
sys     0m0.006s

MapshedJob_DRB_Small.json
real    0m1.210s
user    0m0.000s
sys     0m0.012s

MapshedJob_DRB_Large.json
real    0m5.767s
user    0m0.002s
sys     0m0.070s

When tile downloading is taken into account, the times are roughly doubled.

Connects #28

jamesmcclain commented 8 years ago

It looks pretty good ... are there any substantive differences between the code in this branch and the code here: https://github.com/WikiWatershed/mmw-geoprocessing/tree/feature/tt/mapshed-raster-values ?

rajadain commented 8 years ago

Mainly, the differences include:

The diff for MapshedJob.scala is:

diff --git a/summary/src/main/scala/MapshedJob.scala b/summary/src/main/scala/MapshedJob.scala
index 49f42ac..ef398ec 100644
--- a/summary/src/main/scala/MapshedJob.scala
+++ b/summary/src/main/scala/MapshedJob.scala
@@ -19,9 +19,9 @@ import scala.collection.mutable

 sealed trait MapshedJobParams

-case class RasterVectorJobParams(
+case class RasterLinesJobParams(
   polygon: Seq[MultiPolygon],
-  vector: Seq[Line],
+  lines: Seq[Line],
   rasterLayerId: LayerId
 ) extends MapshedJobParams

@@ -34,11 +34,13 @@ object MapshedJob extends SparkJob with JobUtils {

   override def runJob(sc: SparkContext, config: Config): Any = {
      parseConfig(config) match {
-      case RasterVectorJobParams(polygon, vector, rasterLayerId) =>
+      case RasterLinesJobParams(polygon, lines, rasterLayerId) =>
         val extent = GeometryCollection(polygon).envelope
         val rasterLayer = queryAndCropLayer(catalog(sc), rasterLayerId, extent)
-        rasterVectorJoin(rasterLayer, vector)
-       case _ =>
+
+        rasterLinesJoin(rasterLayer, lines)
+
+      case _ =>
          throw new Exception("Unknown Job Type")
     }
   }
@@ -47,35 +49,33 @@ object MapshedJob extends SparkJob with JobUtils {
     val crs: String => geotrellis.proj4.CRS = getCRS(config, _)

     config.getString("input.operationType") match {
-      case "RasterVectorJoin" =>
+      case "RasterLinesJoin" =>
         val rasterCRS = crs("input.rasterCRS")
         val polygonCRS = crs("input.polygonCRS")
-        val vectorCRS = crs("input.vectorCRS")
+        val linesCRS = crs("input.vectorCRS")
         val polygon = config.getStringList("input.polygon").asScala.map({ str => parseGeometry(str, polygonCRS, rasterCRS) })
-        val vector = config.getStringList("input.vector").asScala.map({ str => str.parseJson.convertTo[Line].reproject(vectorCRS, rasterCRS) })
+        val lines = config.getStringList("input.vector").asScala.map({ str => str.parseJson.convertTo[Line].reproject(linesCRS, rasterCRS) })
         val zoom = config.getInt("input.zoom")
         val rasterLayerId = LayerId(config.getString("input.raster"), zoom)

-        RasterVectorJobParams(polygon, vector, rasterLayerId)
+        RasterLinesJobParams(polygon, lines, rasterLayerId)
+
       case _ =>
         throw new Exception("Unknown Job Type")
     }
   }

-
-  def rasterVectorJoin(rasterLayer: TileLayerRDD[SpatialKey], vector: Seq[Line]): Map[Int, Int] = {
+  def rasterLinesJoin(rasterLayer: TileLayerRDD[SpatialKey], lines: Seq[Line]): Map[Int, Int] = {
     val rtree = new STRtree

-    vector.foreach({lineString =>
+    lines.foreach({lineString =>
       val Extent(xmin, ymin, xmax, ymax) = lineString.envelope
       rtree.insert(new Envelope(xmin, xmax, ymin, ymax), lineString)
     })

     rasterLayer
       .map({ case (key, tile) => {
-        val metadata = rasterLayer.metadata
-        val mapTransform = metadata.mapTransform
-        val extent = mapTransform(key)
+        val extent = rasterLayer.metadata.mapTransform(key)
         val Extent(xmin, ymin, xmax, ymax) = extent
         val rasterExtent = RasterExtent(extent, tile.cols, tile.rows)
         val pixels = mutable.ListBuffer.empty[(Int, Int)]
@@ -98,38 +98,4 @@ object MapshedJob extends SparkJob with JobUtils {
       .reduce({ (left, right) => left ++ right})
       .groupBy(_._1).map({ case (k: Int, list: List[(Int, Int)]) => (k -> list.map(_._2).sum) })
   }
-
-  def rasterVectorJoinSimple(raster: TileLayerRDD[SpatialKey], vector: Seq[Line]): Map[Int, Int] = {
-    raster map { case (key, tile) =>
-      val extent = raster.metadata mapTransform key
-      val rasterExtent = RasterExtent(extent, tile.cols, tile.rows)
-      val counts = mutable.Map.empty[Int, Int]
-      val pixels = mutable.Set.empty[(Int, Int)]
-
-      vector foreach {lineString =>
-        lineString & extent match {
-          case LineResult(line) =>
-            Rasterizer.foreachCellByLineString(line, rasterExtent)(
-              new Callback {
-                def apply(col: Int, row: Int): Unit = {
-                  pixels += ((col, row))
-                }
-              }
-            )
-          case _ =>
-        }
-      }
-
-      pixels foreach {case (col, row) => {
-        val nlcd = tile.get(col, row)
-        counts += nlcd -> (counts.getOrElse(nlcd, 0) + 1)
-      }}
-
-      counts.toMap
-    } reduce { (left, right) =>
-      left ++ (right map { case (key, count) =>
-        key -> (count + left.getOrElse(key, 0))
-      })
-    }
-  }
 }
jamesmcclain commented 8 years ago

:+1: Looks good to me.

One more word about performance: if you find that further improvements are needed, the first thing that I would look to do is find a way to efficiently render the lines into a bitmap instead of calling the callback on line 90 of MapshedJob.scala so many times.