locationtech / geotrellis

GeoTrellis is a geographic data processing engine for high performance applications.
http://geotrellis.io
Other
1.33k stars 362 forks source link

After merge rdd, regrid get wrong row / col info #3468

Closed imperio-wxm closed 2 years ago

imperio-wxm commented 2 years ago

Describe the bug

Regrid operation after merging two MultibandTileLayerRDD[K], get Row/Col error message

To Reproduce

Full unit test


// There is no business logic of my own in this code, it is completely geotrells own api implementation.

it("merge rdd") { def genSourceRddWrapper(sc: SparkContext, files: Seq[String], targetCRS: CRS, layoutScheme: LayoutScheme) = { val sourceRDD: RDD[RasterSource] = sc.parallelize(files) .map(uri => { if (Objects.nonNull(targetCRS)) { GeoTiffRasterSource(uri).reproject(targetCRS): RasterSource } else { GeoTiffRasterSource(uri): RasterSource } }) val summary = RasterSummary.fromRDD(sourceRDD) val LayoutLevel(zoom, layout2) = summary.levelFor(layoutScheme) RasterSourceRDD.tiledLayerRDD(sourceRDD, layout2, KeyExtractor.spatialKeyExtractor, rasterSummary = Some(summary)) }

val image1 = "/Downloads/LC08_L2SP_126032_20220408_20220412_02_T1_SR_B2.TIF"
val image2 = "/Downloads/LC08_L2SP_126033_20220408_20220412_02_T1_SR_B2.TIF"

val rdd1 = genSourceRddWrapper(sparkContext, Seq(image1), WebMercator, FloatingLayoutScheme(512))
val rdd2 = genSourceRddWrapper(sparkContext, Seq(image2), WebMercator, FloatingLayoutScheme(512))

val resultRdd = rdd1.merge(rdd2)

resultRdd.regrid(1024).toGeoTiffs().foreach {
    case (key, mTile) =>
        val localPath = s"/Downloads/merge_$key.tif"
        GeoTiffWriter.write(MultibandGeoTiff(mTile.tile, mTile.extent, mTile.crs), localPath, optimizedOrder = true)
}

}


> Get error

```scala
22/06/27 11:38:52 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 7)
java.lang.AssertionError: assertion failed: Row/col intervals must begin before they end
    at scala.Predef$.assert(Predef.scala:219)
    at geotrellis.spark.regrid.Regrid$Interval.<init>(Regrid.scala:34)
    at geotrellis.spark.regrid.Regrid$Interval.intersect(Regrid.scala:38)
    at geotrellis.spark.regrid.Regrid$.$anonfun$apply$6(Regrid.scala:119)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
    at scala.collection.Iterator.foreach(Iterator.scala:944)
    at scala.collection.Iterator.foreach$(Iterator.scala:944)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1432)
    at scala.collection.IterableLike.foreach(IterableLike.scala:71)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:234)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at geotrellis.spark.regrid.Regrid$.$anonfun$apply$4(Regrid.scala:111)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:241)
    at scala.collection.Iterator.foreach(Iterator.scala:944)
    at scala.collection.Iterator.foreach$(Iterator.scala:944)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1432)
    at scala.collection.IterableLike.foreach(IterableLike.scala:71)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:241)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:238)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at geotrellis.spark.regrid.Regrid$.$anonfun$apply$2(Regrid.scala:106)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:482)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:488)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Screenshots

Before merge

LC08_L2SP_126032_20220408_20220412_02_T1_SR_B2.TIF Metadata image

LC08_L2SP_126033_20220408_20220412_02_T1_SR_B2.TIF Metadata image

After merge

image

After the merge, the values ​​of cols and rows of the layout are still the previous values, and have not changed, which is very strange.

Environment

pomadchin commented 2 years ago

Hey @imperio-wxm, TLDR; two tiles on two different layouts, tiled differently are tried to be merged.

I didn't pay attention to that, but had these thoughts:

So they are located close to each other, and I guess the thing you're trying is to merge two 'layers' (RDDs that represent each scene) into a single one.

image

There are two problems with it in the code:

  1. FloatingLayoutScheme is used, which performs relative to the tile grid tiling:
    • rdd1.md.bounds: KeyBounds(SpatialKey(0,0),SpatialKey(15,15))
    • rdd2.md.bounds: KeyBounds(SpatialKey(0,0),SpatialKey(15,15)) It means that even though two scenes are positioned differently, the keys are the same.
    • Metadatas are slightly different (metadata used to compute tiles relative position on a grid, and used to compute the target keys after the regrid):
      • Extent(1.2319029048418062E7, 4742829.94013792, 1.264605352776789E7, 5069854.419487747), CellSize(39.91998038938323,39.91998038938323)
      • Extent(1.2271730055460626E7, 4540358.837452589, 1.2591363907295156E7, 4859992.689287118), CellSize(39.01780417901978,39.01780417901966)
  2. The merge is done by keys, and assumes, that rdds are tiled according to the same layout, and keys represent the same tile on a layout.
    • Even if merge could happen, it would behave incorrectly. Merging of incorrect keys is going to happen.
    • Some keys after resampling go into negative values due to going out of bounds of the layout (java.lang.AssertionError: assertion failed: Row/col intervals must begin before they end error is thrown) due to differences in it.

To solve it we need to be sure that inputs are in the same projection and on the same layout:

it("merge rdd") {
  val images = List(
    "/Users/.../Downloads/LC08_L2SP_126032_20220408_20220412_02_T1_SR_B2.TIF",
    "/Users/.../Downloads/LC08_L2SP_126033_20220408_20220412_02_T1_SR_B2.TIF"
  )

  val layoutScheme = FloatingLayoutScheme(512)

  val source: RDD[RasterSource] = sc.parallelize(images).map { RasterSource(_).reproject(WebMercator): RasterSource }
  // summary of all rasters
  val summary = RasterSummary.fromRDD(source)
  // layout includes all rasters
  val LayoutLevel(_, layout) = summary.levelFor(layoutScheme)

  // tiling 
  val rdd = RasterSourceRDD.tiledLayerRDD(source, layout, KeyExtractor.spatialKeyExtractor, rasterSummary = Some(summary))

  // regrid
  rdd.regrid(1024).toGeoTiffs().foreach {
    case (key, mTile) =>
      val localPath = s"/Users/.../Downloads/merge_$key.tif"
      MultibandGeoTiff(mTile.tile, mTile.extent, mTile.crs).write(localPath, optimizedOrder = true)
  }
}

If you want to keep the style of the code in the issue description (which is a bit inefficient):

it("merge rdd (behaves in fact like above, but manually and slower)") {
  val image1 = "/Users/.../Downloads/LC08_L2SP_126032_20220408_20220412_02_T1_SR_B2.TIF"
  val image2 = "/Users/.../Downloads/LC08_L2SP_126033_20220408_20220412_02_T1_SR_B2.TIF"

  val layoutScheme = FloatingLayoutScheme(512)

  // read raster1
  val source1: RDD[RasterSource] = sparkContext.parallelize(image1 :: Nil).map { RasterSource(_).reproject(WebMercator): RasterSource }
  // read raster2
  val source2: RDD[RasterSource] = sparkContext.parallelize(image2 :: Nil).map { RasterSource(_).reproject(WebMercator): RasterSource }

  // collect summaries 
  val summary1 = RasterSummary.fromRDD(source1)
  val summary2 = RasterSummary.fromRDD(source2)

  // combine summaries
  val summary = summary1.combine(summary2)
  // get the unified layout
  val LayoutLevel(_, layout) = summary.levelFor(layoutScheme)

  // tile rdds to the same layout
  val rdd1 = RasterSourceRDD.tiledLayerRDD(source1, layout, KeyExtractor.spatialKeyExtractor, rasterSummary = Some(summary))
  val rdd2 = RasterSourceRDD.tiledLayerRDD(source2, layout, KeyExtractor.spatialKeyExtractor, rasterSummary = Some(summary))

  // merge
  val rdd = rdd1.merge(rdd2)

  // regrid
  rdd.regrid(1024).toGeoTiffs().foreach {
    case (key, mTile) =>
      val localPath = s"/Users/.../Downloads/merge_$key.tif"
      MultibandGeoTiff(mTile.tile, mTile.extent, mTile.crs).write(localPath, optimizedOrder = true)
  }
}
imperio-wxm commented 2 years ago

@pomadchin hi, summary must be generated by RDD[RasterSource], If the rdd of the merge to be the intermediate calculation result instead of the original RasterSourceRdd, the following code: read the two images and do the division and addition respectively and then merge them.

How to generate summary through rdd1New、rdd2new and put them into the same layout in this case? What I want to do is actually, the merge of rdds generated by complex operations is no longer the original RasterDataSouce at this time.

it("merge rdd 2") {
    def genSourceRddWrapper(sc: SparkContext, files: Seq[String], targetCRS: CRS, layoutScheme: LayoutScheme) = {
    val sourceRDD: RDD[RasterSource] = sc.parallelize(files)
        .map(uri => {
        if (Objects.nonNull(targetCRS)) {
            GeoTiffRasterSource(uri).reproject(targetCRS): RasterSource
        } else {
            GeoTiffRasterSource(uri): RasterSource
        }
        })

    val summary = RasterSummary.fromRDD(sourceRDD)
    val LayoutLevel(zoom, layout2) = summary.levelFor(layoutScheme)
    RasterSourceRDD.tiledLayerRDD(sourceRDD, layout2, KeyExtractor.spatialKeyExtractor, rasterSummary = Some(summary))
    }

    val image1 = "/Users/weiximing/Downloads/LC08_L2SP_126032_20220408_20220412_02_T1_SR_B2.TIF"
    val image2 = "/Users/weiximing/Downloads/LC08_L2SP_126033_20220408_20220412_02_T1_SR_B2.TIF"

    val rdd1 = genSourceRddWrapper(sparkContext, Seq(image1), WebMercator, FloatingLayoutScheme(512))
    val rdd2 = genSourceRddWrapper(sparkContext, Seq(image2), WebMercator, FloatingLayoutScheme(512))

    ///// This may be a very complex calculation, or reprojection, resampling, crop extent may be performed /////
    val rdd1New = rdd2.withContext {
        rdd =>
           val result = rdd.map { case (key, tile) => (key, MultibandTile(tile.bands.map(_.localDivide(100)))) }
           result
    }

    val rdd2New = rdd2.withContext {
       rdd =>
          val result = rdd.map { case (key, tile) => (key, MultibandTile(tile.bands.map(_.localAdd(10)))) }
          result
    }
    ///////////////////////////

    val resultRdd = rdd1New.merge(rdd2New)

    resultRdd.regrid(1024).toGeoTiffs().foreach {
    case (key, mTile) =>
        val localPath = s"/Users/weiximing/code/temp_code/gcs-spark/src/test/scala/com/alibaba/aie/gcs/mean_$key.tif"
        GeoTiffWriter.write(MultibandGeoTiff(mTile.tile, mTile.extent, mTile.crs), localPath, optimizedOrder = true)
    }
}
pomadchin commented 2 years ago

hey @imperio-wxm read the example #2:

it("merge rdd (behaves in fact like above, but manually and slower)") {
  val image1 = "/Users/.../Downloads/LC08_L2SP_126032_20220408_20220412_02_T1_SR_B2.TIF"
  val image2 = "/Users/.../Downloads/LC08_L2SP_126033_20220408_20220412_02_T1_SR_B2.TIF"

  val layoutScheme = FloatingLayoutScheme(512)

  // read raster1
  val source1: RDD[RasterSource] = sparkContext.parallelize(image1 :: Nil).map { RasterSource(_).reproject(WebMercator): RasterSource }
  // read raster2
  val source2: RDD[RasterSource] = sparkContext.parallelize(image2 :: Nil).map { RasterSource(_).reproject(WebMercator): RasterSource }

  // collect summaries 
  val summary1 = RasterSummary.fromRDD(source1)
  val summary2 = RasterSummary.fromRDD(source2)

  // combine summaries
  val summary = summary1.combine(summary2)
  // get the unified layout
  val LayoutLevel(_, layout) = summary.levelFor(layoutScheme)

  // tile rdds to the same layout
  val rdd1 = RasterSourceRDD.tiledLayerRDD(source1, layout, KeyExtractor.spatialKeyExtractor, rasterSummary = Some(summary))
  val rdd2 = RasterSourceRDD.tiledLayerRDD(source2, layout, KeyExtractor.spatialKeyExtractor, rasterSummary = Some(summary))

  // merge
  val rdd = rdd1.merge(rdd2)

  // regrid
  rdd.regrid(1024).toGeoTiffs().foreach {
    case (key, mTile) =>
      val localPath = s"/Users/.../Downloads/merge_$key.tif"
      MultibandGeoTiff(mTile.tile, mTile.extent, mTile.crs).write(localPath, optimizedOrder = true)
  }
}
imperio-wxm commented 2 years ago

hey @imperio-wxm read the example #2:

it("merge rdd (behaves in fact like above, but manually and slower)") {
  val image1 = "/Users/.../Downloads/LC08_L2SP_126032_20220408_20220412_02_T1_SR_B2.TIF"
  val image2 = "/Users/.../Downloads/LC08_L2SP_126033_20220408_20220412_02_T1_SR_B2.TIF"

  val layoutScheme = FloatingLayoutScheme(512)

  // read raster1
  val source1: RDD[RasterSource] = sparkContext.parallelize(image1 :: Nil).map { RasterSource(_).reproject(WebMercator): RasterSource }
  // read raster2
  val source2: RDD[RasterSource] = sparkContext.parallelize(image2 :: Nil).map { RasterSource(_).reproject(WebMercator): RasterSource }

  // collect summaries 
  val summary1 = RasterSummary.fromRDD(source1)
  val summary2 = RasterSummary.fromRDD(source2)

  // combine summaries
  val summary = summary1.combine(summary2)
  // get the unified layout
  val LayoutLevel(_, layout) = summary.levelFor(layoutScheme)

  // tile rdds to the same layout
  val rdd1 = RasterSourceRDD.tiledLayerRDD(source1, layout, KeyExtractor.spatialKeyExtractor, rasterSummary = Some(summary))
  val rdd2 = RasterSourceRDD.tiledLayerRDD(source2, layout, KeyExtractor.spatialKeyExtractor, rasterSummary = Some(summary))

  // merge
  val rdd = rdd1.merge(rdd2)

  // regrid
  rdd.regrid(1024).toGeoTiffs().foreach {
    case (key, mTile) =>
      val localPath = s"/Users/.../Downloads/merge_$key.tif"
      MultibandGeoTiff(mTile.tile, mTile.extent, mTile.crs).write(localPath, optimizedOrder = true)
  }
}

I know, this case is very simple, what I want to ask is how to do summary combine through rdd which is not RasterSource. Because RasterSummary.fromRDD must be RasterSource, for example an RDD like this: RDD[(K, MultibandTile)] with Metadata[TileLayerMetadata[K]]

pomadchin commented 2 years ago

@imperio-wxm layouts should match upfront, if they are not global and / or do not match somehow else than there is nothing imbuilt you can do - keys are totally different, and you should recompute them somehow to make layers match.

If you have any cool ideas about how to handle / suggest some API around it - that is very welcome btw; but it doesn't sound like a very trivial or a cheap task.

The alternative which may help is to use the global ZommedLayoutScheme for all layers, and in this case you worry only about the resolution, layers zoom levels should match.

imperio-wxm commented 2 years ago

@pomadchin Hi, if I want to recalculate the keys and re-layout, is there any API or code I can refer to?

pomadchin commented 2 years ago

hey @imperio-wxm, I am afraid no. I think you may want smth like tileToLayout but for any TileLayerRDD which is not implemented.

pomadchin commented 2 years ago

I'm closing this for now, feel free to reopen it / create a new issue!