locationtech-labs / geopyspark

GeoTrellis for PySpark

Update pyramid tile image #685

mingnet opened this issue 5 years ago

mingnet commented 5 years ago

I am trying to ingest a batch of large TIFF images, but my Spark cluster doesn't have a lot of memory or resources, so I am trying to ingest the images in multiple batches.

My plan is to generate a pyramid of the first TIFF image and write it to disk, then generate a pyramid of the second TIFF image and update it into the same catalog, and so on for the rest.

To support this, I am trying to add update code:

./geopyspark-backend/geotrellis/src/main/scala/geopyspark/geotrellis/io/LayerWriterWrapper.scala

  // Update an existing layer in place with the tiles of the given RDD;
  // GeoTrellis requires the new keys to fall within the layer's key index space.
  def update(
    layerName: String,
    spatialRDD: TiledRasterLayer[SpatialKey]
  ): Unit = {
    val id =
      spatialRDD.zoomLevel match {
        case Some(zoom) => LayerId(layerName, zoom)
        case None => LayerId(layerName, 0)
      }
    layerWriter.update(id, spatialRDD.rdd)
  }

  // Like update, but new tiles replace existing tiles where the keys collide.
  def overwrite(
    layerName: String,
    spatialRDD: TiledRasterLayer[SpatialKey]
  ): Unit = {
    val id =
      spatialRDD.zoomLevel match {
        case Some(zoom) => LayerId(layerName, zoom)
        case None => LayerId(layerName, 0)
      }
    layerWriter.overwrite(id, spatialRDD.rdd)
  }

./geopyspark/geotrellis/catalog.py

def update(uri,
           layer_name,
           tiled_raster_layer,
           store=None):
    if tiled_raster_layer.zoom_level is None:
        Log.warn(tiled_raster_layer.pysc, "The given layer does not have a zoom_level. Writing to zoom 0.")

    if store:
        store = AttributeStore.build(store)
    else:
        store = AttributeStore.cached(uri)

    pysc = tiled_raster_layer.pysc

    writer = pysc._gateway.jvm.geopyspark.geotrellis.io.LayerWriterWrapper(
        store.wrapper.attributeStore(), uri)

    writer.update(layer_name, tiled_raster_layer.srdd)

def overwrite(uri,
              layer_name,
              tiled_raster_layer,
              store=None):
    if tiled_raster_layer.zoom_level is None:
        Log.warn(tiled_raster_layer.pysc, "The given layer does not have a zoom_level. Writing to zoom 0.")

    if store:
        store = AttributeStore.build(store)
    else:
        store = AttributeStore.cached(uri)

    pysc = tiled_raster_layer.pysc

    writer = pysc._gateway.jvm.geopyspark.geotrellis.io.LayerWriterWrapper(
        store.wrapper.attributeStore(), uri)

    writer.overwrite(layer_name, tiled_raster_layer.srdd)

Then I ingest the data like this.

def writefile(srcpath, key, destpath):
    raster_layer = gps.geotiff.get(layer_type=gps.LayerType.SPATIAL, uri=srcpath, num_partitions=100)
    raster_layer.persist(pyspark.StorageLevel.MEMORY_AND_DISK_SER)
    tiled_layer = raster_layer.tile_to_layout(layout=gps.GlobalLayout(), target_crs=3857)
    pyramid = tiled_layer.pyramid()
    for layer in pyramid.levels.values():
        gps.write(destpath, key, layer, time_unit=gps.TimeUnit.DAYS)
    raster_layer.unpersist()

def updatefile(srcpath, key, destpath):
    raster_layer = gps.geotiff.get(layer_type=gps.LayerType.SPATIAL, uri=srcpath, num_partitions=100)
    raster_layer.persist(pyspark.StorageLevel.MEMORY_AND_DISK_SER)
    tiled_layer = raster_layer.tile_to_layout(layout=gps.GlobalLayout(), target_crs=3857)
    pyramid = tiled_layer.pyramid()
    for layer in pyramid.levels.values():
        gps.update(destpath, key, layer)
    raster_layer.unpersist()

tiflist = [os.path.join(basepath, b) for b in filelist]
writefile(tiflist[0], key, destpath)
for t in tiflist[1:]:
    updatefile(t, key, destpath)

Running this raises the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling o131.update. : geotrellis.spark.io.package$LayerOutOfKeyBoundsError: Updating rdd is out of the key index space for Layer(name = "201706", zoom = 14): KeyBounds(SpatialKey(13341,6446),SpatialKey(13460,6569)). You must reindex this layer with large enough key bounds for this update.

What should I do? Any advice would be appreciated.

jbouffard commented 5 years ago

Hey, @mingnet! As I mentioned in your other issue, I'm really sorry about responding to your issue so late :slightly_frowning_face:

You're getting that error because LayerWriter.update fails when trying to update a saved catalog with a layer whose KeyBounds fall outside of the saved layer's (see here). Unfortunately, this means your implementation won't work, as each layer will have different KeyBounds.
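
If it helps, here's a sketch of a pre-flight check for whether an update's bounds fit inside a stored layer's bounds. It assumes geopyspark's read_layer_metadata function and the Bounds/SpatialKey attribute names from 0.4, so double-check those against your version:

import geopyspark as gps

def update_fits(uri, layer_name, tiled_raster_layer):
    # Compare the update's key bounds against the stored layer's metadata
    # bounds (for a layer written the usual way, these match the key index space).
    zoom = tiled_raster_layer.zoom_level or 0
    stored = gps.read_layer_metadata(uri, layer_name, zoom).bounds
    update = tiled_raster_layer.layer_metadata.bounds
    return (stored.minKey.col <= update.minKey.col and
            stored.minKey.row <= update.minKey.row and
            update.maxKey.col <= stored.maxKey.col and
            update.maxKey.row <= stored.maxKey.row)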

The most straightforward way around this would be to read all of your files at once and then save them together as one layer (see the sketch below). I know you said that you're working with a small cluster, but if you can show me the script you're using and give me some info about your cluster, I may be able to point out places where you could improve performance. I think we should try this first before going to the next alternative (which is more involved/complicated).
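
Since gps.geotiff.get accepts a list of URIs, the single-layer approach is a small change to your writefile function. A rough sketch, assuming the same imports as your snippets above:

def write_all(tif_paths, layer_name, destpath):
    # Read every TIFF at once by passing the full list of URIs.
    raster_layer = gps.geotiff.get(layer_type=gps.LayerType.SPATIAL,
                                   uri=tif_paths, num_partitions=100)
    raster_layer.persist(pyspark.StorageLevel.MEMORY_AND_DISK_SER)
    tiled_layer = raster_layer.tile_to_layout(layout=gps.GlobalLayout(), target_crs=3857)
    # One pyramid over the combined mosaic, written once as a single layer.
    for layer in tiled_layer.pyramid().levels.values():
        gps.write(destpath, layer_name, layer)
    raster_layer.unpersist()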

mingnet commented 5 years ago

I have tried another solution, but I still have some problems. Maybe you'd be interested to know about it. I tried to generate global KeyBounds at the beginning, by writing another function in the file (./geopyspark-backend/geotrellis/src/main/scala/geopyspark/geotrellis/io/LayerWriterWrapper.scala):

  def writeSpatialGlobal(
    layerName: String,
    spatialRDD: TiledRasterLayer[SpatialKey],
    indexStrategy: String
  ): Unit = {
    val id =
      spatialRDD.zoomLevel match {
        case Some(zoom) => LayerId(layerName, zoom)
        case None => LayerId(layerName, 0)
      }
    // Index over the whole layout grid so that later updates anywhere in the
    // world fall inside the key index space. Keys are zero-based, so the
    // maximum key is (layoutCols - 1, layoutRows - 1).
    val layout = spatialRDD.rdd.metadata.layout
    val indexKeyBounds = KeyBounds[SpatialKey](
      SpatialKey(0, 0),
      SpatialKey(layout.layoutCols - 1, layout.layoutRows - 1))
    val indexMethod = getSpatialIndexMethod(indexStrategy)
    val keyIndex = indexMethod.createIndex(indexKeyBounds)
    layerWriter.write(id, spatialRDD.rdd, keyIndex)
  }

I plan to call this function when processing the first batch so that the layer is written with global KeyBounds, and then update it with the data from the other batches. But this function is very, very slow to execute; it was very difficult to even finish the first batch. Because I don't know enough about GeoTrellis, I don't understand why: it only generates a different index, so I would expect it to be as fast as the writeSpatial function. The Python-side binding I use to call it is sketched below.
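
For completeness, the Python-side binding would presumably mirror the update/overwrite wrappers earlier in the thread. A hypothetical sketch in catalog.py, with the "zorder" strategy string as an assumption:

def write_global(uri, layer_name, tiled_raster_layer,
                 index_strategy="zorder", store=None):
    # Hypothetical binding for writeSpatialGlobal, following the same
    # pattern as the update/overwrite wrappers in catalog.py above.
    if store:
        store = AttributeStore.build(store)
    else:
        store = AttributeStore.cached(uri)

    pysc = tiled_raster_layer.pysc
    writer = pysc._gateway.jvm.geopyspark.geotrellis.io.LayerWriterWrapper(
        store.wrapper.attributeStore(), uri)
    writer.writeSpatialGlobal(layer_name, tiled_raster_layer.srdd, index_strategy)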

jbouffard commented 5 years ago

@mingnet I see. Based on the work you showed, it looks like everything should work okay. What backend are you trying to write to? There can be a lot of I/O involved for some of them, which could greatly increase the running time. Other than what I just mentioned, there could be other causes for slowdown, but I won't be able to say for sure without seeing your Python code.