Open-EO / openeo-geotrellis-extensions

Java/Scala extensions for Geotrellis, for use with OpenEO GeoPySpark backend.
Apache License 2.0
5 stars 3 forks source link

sample_by_feature creates empty tiles in some cases #275

Open JeroenVerstraelen opened 6 months ago

JeroenVerstraelen commented 6 months ago

The following code:

import openeo
c = openeo.connect("openeo.vito.be").authenticate_oidc()
# Load the collection
temp = c.load_collection(
    "AGERA5",
    temporal_extent=["2018-08-30", "2018-09-15"], 
    # temporal_extent=["2018-08-30", "2020-03-02"],
    bands=["temperature-mean"],
)
geoms = c.load_url("https://artifactory.vgt.vito.be/artifactory/auxdata-public/gfmap-temp/openeogfmap_dataframe_0.parquet", format='parquet')
result = temp.filter_spatial(geometries=geoms)
result.execute_batch("result.nc", format="NetCDF", sample_by_feature=True)

Results in this error: OpenEO batch job failed: Exception during Spark execution: geotrellis.raster.GeoAttrsError: invalid cols: 0

Stack trace:

Stage error: Job aborted due to stage failure: Task 0 in stage 7.0 failed 4 times, most recent failure: Lost task 0.3 in stage 7.0 (TID 970) (epod065.vgt.vito.be executor 41): geotrellis.raster.GeoAttrsError: invalid cols: 0
    at geotrellis.raster.GridExtent$mcI$sp.<init>(GridExtent.scala:45)
    at geotrellis.raster.RasterExtent.<init>(RasterExtent.scala:79)
    at geotrellis.raster.RasterExtent$.apply(RasterExtent.scala:149)
    at geotrellis.raster.Raster.rasterExtent(Raster.scala:69)
    at org.openeo.geotrellis.netcdf.NetCDFRDDWriter$.writeToDisk(NetCDFRDDWriter.scala:509)
    at org.openeo.geotrellis.netcdf.NetCDFRDDWriter$.$anonfun$groupByFeatureAndWriteToNetCDF$1(NetCDFRDDWriter.scala:386)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
    at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
    at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
    at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
    at scala.collection.AbstractIterator.to(Iterator.scala:1431)
    at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
    at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
    at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
    at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
    at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1019)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2303)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

Job id: j-240320965131422cb2699a7fbe31bcd1

JeroenVerstraelen commented 6 months ago

The issue occurs at NetCDFRDDWriter.scala:509:

def writeToDisk(rasters: Seq[Raster[MultibandTile]], ...)
  val maxExtent: Extent = rasters.map(_._2).reduce((a, b) => if (a.area > b.area) a else b)
  val equalRasters = rasters.map(raster =>
    if (raster.extent != maxExtent) raster.crop(maxExtent, CropOptions(clamp = false, force = true)) else raster
  )
  equalRasters.head.rasterExtent

So either some of the initial rasters have size 0, or an issue occurred when cropping to maxExtent.

I will add a try/catch and some extra logging.