Open lossyrob opened 6 years ago
The Await
is required because I was running in a standalone process. In an akka-http
route, you could simply return the Future
, in which case the top portion of code should read:
val layerReader = S3CollectionLayerReader("datahub-catalogs-us-east-1", "")
val queryPoly: MultiPolygon = ???
val nlcdLayerId = LayerId("nlcd-2011-30m-epsg5070-int8", 0)
val soilLayerId = LayerId("ssurgo-hydro-groups-30m-epsg5070-int8", 0)
val nlcdFetch =
Future {
baseLayerReader
.query[SpatialKey, Tile, TileLayerMetadata[SpatialKey]](nlcdLayerId)
.where(Intersects(poly)).result
}
val soilFetch =
Future {
baseLayerReader
.query[SpatialKey, Tile, TileLayerMetadata[SpatialKey]](soilLayerId)
.where(Intersects(poly)).result
}
for(
nlcdLayer <- nlcdFetch;
soilLayer <- soilFetch
) yield {
histograms(
nlcdLayer,
soilLayer,
queryPoly
)
}
(I apologize in advance for the length of this comment.)
Hi @lossyrob,
In our production code, we have to support 1-3 layers. Thus, we cannot work with specific ones as shown in the code above. To support a dynamic number of layers, we usually do something like:
val layerReader = S3CollectionLayerReader("datahub-catalogs-us-east-1", "")
def layerFetch (
layerId: LayerId,
multiPolygon: MultiPolygon
): TileLayerCollection[SpatialKey] =
layerReader
.query[SpatialKey, Tile, TileLayerMetadata[SpatialKey]](layerId)
.where(Intersects(multiPolygon))
.result
def doTheThing (
input: InputJson
) = {
val layerIds = input.layerIds.map(lid => LayerId(lid, input.zoom))
val multiPolygon = input.polygons.unionGeometries.asMultiPolygon.get
val layers = layerIds.map(layerFetch(_, multiPolygon))
rasterGroupedCount(layers, multiPolygon)
}
Where doTheThing
will be called by the proper AKKA-Http route.
We adapted your histogram
from above into rasterGroupedCount
to work with a sequence of layers instead of two explicit ones:
def rasterGroupedCount (
layers: Seq[TileLayerCollection[SpatialKey]],
multiPolygon: MultiPolygon
): Map[Seq[Int], Long] = {
val mapTransform = layers.head.metadata.mapTransform
joinCollectionLayers(layers)
.par
.map { case (key, tiles) =>
val tileResult = mutable.Map[Seq[Int], Long]()
val re = RasterExtent(mapTransform(key), tiles.head.cols, tiles.head.rows)
if (multiPolygon.contains(re.extent)) {
cfor(0)(_ < re.cols, _ + 1) { col =>
cfor(0)(_ < re.rows, _ + 1) { row =>
val vs = tiles.map(_.get(col, row))
if (!tileResult.contains(vs)) {
tileResult(vs) = 0
}
tileResult(vs) += 1
}
}
} else {
multiPolygon.foreach(re, Options(includePartial=true, sampleType=PixelIsArea)) { (col, row) =>
val vs = tiles.map(_.get(col, row))
if (!tileResult.contains(vs)) {
tileResult(vs) = 0
}
tileResult(vs) += 1
}
}
tileResult.toMap
}
.reduce { (m1, m2) =>
(m1.toSeq ++ m2.toSeq)
.groupBy(_._1)
.map { case (key, values) => key -> values.map(_._2).sum }
}
}
Here is an alternative implementation of rasterGroupedCount
that @echeipesh came up with a few months ago:
def rasterGroupedCount (
layers: Seq[TileLayerCollection[SpatialKey]],
multiPolygon: MultiPolygon
): Map[Seq[Int], Long] = {
val init = () => new LongAdder
val update = (_: LongAdder).increment()
val metadata = layers.head.metadata
val pixelGroups: TrieMap[Seq[Int], LongAdder] = TrieMap.empty
joinCollectionLayers(layers)
.par
.foreach { case (key, tiles) =>
val re = RasterExtent(metadata.mapTransform(key), metadata.layout.tileCols, metadata.layout.tileRows)
Rasterizer.foreachCellByMultiPolygon(multiPolygon, re) { case (col, row) =>
val pixelGroup: Seq[Int] = tiles.map(_.get(col, row))
val acc = pixelGroups.getOrElseUpdate(pixelGroup, init())
update(acc)
}
}
pixelGroups.mapValues(_.sum())
}
where joinCollectionLayers
used in both implementations is:
def joinCollectionLayers(layers: Seq[TileLayerCollection[SpatialKey]]): Map[SpatialKey, Seq[Tile]] = {
val maps: Seq[Map[SpatialKey, Tile]] = layers.map((_: Seq[(SpatialKey, Tile)]).toMap)
val keySet: Array[SpatialKey] = maps.map(_.keySet).reduce(_ union _).toArray
for (key: SpatialKey <- keySet) yield {
val tiles: Seq[Tile] = maps.map(_.apply(key))
key -> tiles
}
}.toMap
In our benchmarks, we found the fastest implementations, quickest first, to be:
histograms
with two explicit layers (avg 2.6s for ~3800 sq.km.)rasterGroupedCount
with a list of layers (avg 3.25s for ~3800 sq.km.)rasterGroupedCount
above with cfor
with a list of layers (avg 3.8s for ~3800 sq.km.)Given that we do want to operate on a list of layers, my questions are:
joinCollectionLayers
be made faster, since that seems to be holding the cfor
implementation back?cfor
over the more concise TrieMap
/ LongAdder
variant?Future
since we wanted to focus on the calculations, hoping to add asynchrony later. Should we be using that here somewhere, and would it increase performance?I tried some optimizations, but it looks like nothing I did significantly (or insignificantly) improved on the TrieMap
implementation.
My suggestion for further optimization is to use Futures for fetching layers in parallel. This would mean changing this method to return a Future[Seq[TileLayerCollection[SpatialKey]]]
, and spidering the Futures through the codebase. akka-http
lets you return a Future
from the route, so you could carry the future all the way through to the route.
So the start of the refactor starts with moving the method to something like
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
def cropRastersToAOI(
rasterIds: List[String],
zoom: Int,
aoi: MultiPolygon
): Future[Seq[TileLayerCollection[SpatialKey]]] =
Future.sequence {
val futures: Seq[Future[TileLayerCollection[SpatialKey]]] =
rasterIds
.map { str =>
Future {
cropSingleRasterToAOI(str, zoom, aoi)
}
}
futures
}
and fixing compiler errors from there, by changing everything to work with futures by mapping into the future. E.g.:
def getRasterGroupedCount(input: InputData): Future[ResultInt] = {
val aoi = createAOIFromInput(input)
val rasterLayers = cropRastersToAOI(input.rasters, input.zoom, aoi)
rasterLayers.map { layers => ResultInt(rasterGroupedCount(layers, aoi)) }
}
This code provides a starting point for the Collections API optimizations, and is basically an optimized version of
def histograms(nlcd: TileLayerRDD[SpatialKey], soil: TileLayerRDD[SpatialKey], multiPolygons: Seq[MultiPolygon]): Seq[Map[(Int, Int), Int]]
that uses the Collections API:where
histograms
is: