Open-EO / openeo-geotrellis-extensions

Java/Scala extensions for Geotrellis, for use with OpenEO GeoPySpark backend.
Apache License 2.0

fix recent tsservice performance drop #184

Closed: bossie closed this issue 11 months ago

bossie commented 1 year ago

[Yesterday 1:17 PM] Stijn C.

The performance of the TimeSeries service is often slow, causing the monitoring checks to go into warning/critical state. Can you please have a look at what might have caused this performance drop and how to remediate it? I logged this ticket: EP-4234

More details in said ticket.

jdries commented 1 year ago

@JeroenVerstraelen do you still remember the fix that we wanted to do for this one? Was it something with the count/partitioner in FileLayerProvider?

JeroenVerstraelen commented 1 year ago

@jdries I believe you are referring to this issue: https://github.com/Open-EO/openeo-geotrellis-extensions/issues/186

jdries commented 1 year ago

First fixes have been implemented: we got rid of accumulating cached RDDs, but there is still some increase in memory, leading to a slowdown. Some of the slow tasks have huge deserialization times, indicating that something large is being shipped in the task context.
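
For illustration, a minimal sketch of the suspected pattern (hypothetical code, assuming plain Spark APIs, not code from this repo): a large value captured in a task closure is serialized into every task and inflates deserialization time, whereas a broadcast ships it to each executor only once.

    import org.apache.spark.{SparkConf, SparkContext}

    object ClosureVsBroadcast {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[*]").setAppName("closure-vs-broadcast"))

        // Stand-in for whatever large object is being "shipped in the context".
        val bigLookup: Map[Int, String] = (1 to 1000000).map(i => i -> s"v$i").toMap

        // Anti-pattern: bigLookup is captured by the closure and serialized
        // into every single task, inflating task deserialization time.
        val slow = sc.parallelize(1 to 100).map(i => bigLookup.getOrElse(i, ""))

        // Better: broadcast it once per executor; the closure then only
        // captures the small broadcast handle.
        val bc = sc.broadcast(bigLookup)
        val fast = sc.parallelize(1 to 100).map(i => bc.value.getOrElse(i, ""))

        println(slow.count() + fast.count())
        sc.stop()
      }
    }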

jdries commented 1 year ago

Also took a heap dump. A lot of objects were only released by Spark's periodic GC, so I made it run more frequently to avoid this memory buildup. There were also a lot of SerializableConfiguration objects, so I tried to improve cleanup of the broadcast that was holding on to them.
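
A hedged sketch of those two levers, assuming standard Spark APIs (illustrative, not the actual patch): spark.cleaner.periodicGC.interval controls how often the ContextCleaner forces a driver-side JVM GC (default 30min), which is what lets unreferenced broadcasts and cached RDDs be reclaimed; a broadcast can also be released explicitly.

    import org.apache.spark.{SparkConf, SparkContext}

    object PeriodicGcAndBroadcastCleanup {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("local[*]")
          .setAppName("broadcast-cleanup")
          // Make the ContextCleaner force a driver-side GC more often than
          // the 30min default, so unreferenced broadcasts are reclaimed sooner.
          .set("spark.cleaner.periodicGC.interval", "5min")
        val sc = new SparkContext(conf)

        val bc = sc.broadcast(Array.fill(1 << 20)(0.toByte)) // ~1 MiB payload
        println(sc.parallelize(1 to 10).map(_ => bc.value.length).reduce(_ + _))

        // unpersist() drops the executor copies but keeps the broadcast
        // reusable; destroy() frees it everywhere, after which any task that
        // still references it fails (see the next comment).
        bc.destroy()
        sc.stop()
      }
    }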

bossie commented 1 year ago

I reverted the broadcast cleanup because this integration test started failing:

openeo-collection-tests.test_layers.test_layer[TERRASCOPE_S1_GAMMA0_V1]

openeo.rest.OpenEoApiError: [500] Internal: Server error: Exception during Spark execution: Failed to get broadcast_251_piece0 of broadcast_251 (ref: r-5050849ea1ba4489be3854783159cd09)

This exception originates in the executors:

org.apache.spark.SparkException: Failed to get broadcast_251_piece0 of broadcast_251
    at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBlocks$1(TorrentBroadcast.scala:219)
    at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.broadcast.TorrentBroadcast.readBlocks(TorrentBroadcast.scala:191)
    at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:275)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:253)
    at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:248)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1495)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:248)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:109)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.openeo.geotrellisaccumulo.GeotrellisAccumuloRDD.getConf(GeotrellisAccumuloRDD.scala:108)
    at org.openeo.geotrellisaccumulo.GeotrellisAccumuloRDD.org$openeo$geotrellisaccumulo$GeotrellisAccumuloRDD$$createJobContextWithTokens(GeotrellisAccumuloRDD.scala:169)
    at org.openeo.geotrellisaccumulo.GeotrellisAccumuloRDD$$anon$1.<init>(GeotrellisAccumuloRDD.scala:196)
    at org.openeo.geotrellisaccumulo.GeotrellisAccumuloRDD.compute(GeotrellisAccumuloRDD.scala:193)
    at org.openeo.geotrellisaccumulo.GeotrellisRasterRDD.compute(GeotrellisRasterRDD.scala:36)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at geotrellis.spark.ContextRDD.compute(ContextRDD.scala:36)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at geotrellis.spark.ContextRDD.compute(ContextRDD.scala:36)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at geotrellis.spark.ContextRDD.compute(ContextRDD.scala:36)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at geotrellis.spark.ContextRDD.compute(ContextRDD.scala:36)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at geotrellis.spark.ContextRDD.compute(ContextRDD.scala:36)
    at org.openeo.geotrellis.SpatialToSpacetimeJoinRdd.compute(SpatialToSpacetimeJoinRdd.scala:62)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:139)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

It looks like the executors are still trying to use a broadcast variable that was destroyed in the meantime.
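
A minimal sketch of that failure mode (hypothetical code, plain Spark): RDDs are lazy, so destroying a broadcast before the job that references it actually runs leaves tasks pointing at blocks that no longer exist.

    import org.apache.spark.{SparkConf, SparkContext}

    object DestroyedBroadcastRepro {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[*]").setAppName("destroyed-broadcast"))

        val bc = sc.broadcast(Map(1 -> "a", 2 -> "b"))
        // RDDs are lazy: no job has run yet, but the closure references bc.
        val rdd = sc.parallelize(Seq(1, 2)).map(i => bc.value(i))

        bc.destroy() // overeager cleanup removes the broadcast blocks

        // Fails: depending on timing, on the driver ("Attempted to use
        // Broadcast after it was destroyed") or, when the removal races with
        // already-running tasks, on the executors with the
        // "Failed to get broadcast_X_piece0" seen in the trace above.
        rdd.collect()
      }
    }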

jdries commented 11 months ago

Memory use and performance are still stable after the revert, so this was probably fixed in some other way, for instance by tuning garbage collection.
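
As an example of the kind of GC tuning meant here (hypothetical flag values, not the configuration actually deployed), executor JVM garbage collection can be adjusted through the Spark conf:

    import org.apache.spark.SparkConf

    object GcTunedConf {
      // Illustrative G1 settings: shorter pause target and earlier
      // concurrent collection cycles on the executors.
      val conf: SparkConf = new SparkConf()
        .set("spark.executor.extraJavaOptions",
          "-XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:InitiatingHeapOccupancyPercent=35")
    }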