locationtech / geotrellis

GeoTrellis is a geographic data processing engine for high performance applications.
http://geotrellis.io

"Task not serializable" when writing to S3 layer #3077

Closed echeipesh closed 4 years ago

echeipesh commented 4 years ago

This issue is against master. When trying to write a layer to S3 with:

val pyramid = Pyramid.fromLayerRDD(layer, resampleMethod = Sum)
val store = AttributeStore(catalog)
val writer = LayerWriter(store, catalog)
pyramid.write(layerName, writer, ZCurveKeyIndexMethod)
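
For context, the snippet assumes roughly the following imports (paths as of GeoTrellis master at the time; layer, catalog, and layerName are defined elsewhere in the job):

import geotrellis.raster.resample.Sum
import geotrellis.spark.pyramid.Pyramid
import geotrellis.spark.store.LayerWriter
import geotrellis.store.AttributeStore
import geotrellis.store.index.ZCurveKeyIndexMethod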

The following exception will be thrown:

geotrellis.store.package$LayerWriteError: Failed to write Layer(name = "MWI", zoom = 6)
    at geotrellis.spark.store.s3.S3LayerWriter._write(S3LayerWriter.scala:125)
    at geotrellis.spark.store.s3.S3LayerWriter._write(S3LayerWriter.scala:49)
    at geotrellis.spark.store.LayerWriter$class.write(LayerWriter.scala:150)
    at geotrellis.spark.store.s3.S3LayerWriter.write(S3LayerWriter.scala:49)
    at geotrellis.spark.pyramid.Pyramid$$anonfun$write$1.apply$mcVI$sp(Pyramid.scala:68)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at geotrellis.spark.pyramid.Pyramid.write(Pyramid.scala:67)
    at geotrellis.sdg.OutputPyramid$.saveLayer(OutputPyramid.scala:53)
    at geotrellis.sdg.PopulationNearRoadsJob$$anonfun$9$$anonfun$apply$11.apply(PopulationNearRoadsJob.scala:171)
    at geotrellis.sdg.PopulationNearRoadsJob$$anonfun$9$$anonfun$apply$11.apply(PopulationNearRoadsJob.scala:148)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:934)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:933)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:933)
    at geotrellis.spark.store.s3.S3RDDWriter.update(S3RDDWriter.scala:78)
    at geotrellis.spark.store.s3.S3RDDWriter.write(S3RDDWriter.scala:51)
    at geotrellis.spark.store.s3.S3LayerWriter._write(S3LayerWriter.scala:129)
    ... 16 more
Caused by: java.io.NotSerializableException: geotrellis.spark.store.s3.S3SparkLayerProvider
Serialization stack:
    - object not serializable (class: geotrellis.spark.store.s3.S3SparkLayerProvider, value: geotrellis.spark.store.s3.S3SparkLayerProvider@67eb115a)
    - field (class: geotrellis.spark.store.s3.S3SparkLayerProvider$$anonfun$layerWriter$1, name: $outer, type: class geotrellis.spark.store.s3.S3SparkLayerProvider)
    - object (class geotrellis.spark.store.s3.S3SparkLayerProvider$$anonfun$layerWriter$1, <function0>)
    - field (class: geotrellis.spark.store.s3.S3RDDWriter, name: geotrellis$spark$store$s3$S3RDDWriter$$s3Client, type: interface scala.Function0)
    - object (class geotrellis.spark.store.s3.S3RDDWriter, geotrellis.spark.store.s3.S3RDDWriter@75503aa7)
    - field (class: geotrellis.spark.store.s3.S3RDDWriter$$anonfun$update$1, name: $outer, type: class geotrellis.spark.store.s3.S3RDDWriter)
    - object (class geotrellis.spark.store.s3.S3RDDWriter$$anonfun$update$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
    ... 28 more
19/09/23 13:42:57 INFO ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: geotrellis.store.package$LayerWriteError: Failed to write Layer(name = "MWI", zoom = 6)

For clarity, the serialization stack is:

- object not serializable (class: geotrellis.spark.store.s3.S3SparkLayerProvider, value: geotrellis.spark.store.s3.S3SparkLayerProvider@67eb115a)
- field (class: geotrellis.spark.store.s3.S3SparkLayerProvider$$anonfun$layerWriter$1, name: $outer, type: class geotrellis.spark.store.s3.S3SparkLayerProvider)
- object (class geotrellis.spark.store.s3.S3SparkLayerProvider$$anonfun$layerWriter$1, <function0>)
- field (class: geotrellis.spark.store.s3.S3RDDWriter, name: geotrellis$spark$store$s3$S3RDDWriter$$s3Client, type: interface scala.Function0)
- object (class geotrellis.spark.store.s3.S3RDDWriter, geotrellis.spark.store.s3.S3RDDWriter@75503aa7)
- field (class: geotrellis.spark.store.s3.S3RDDWriter$$anonfun$update$1, name: $outer, type: class geotrellis.spark.store.s3.S3RDDWriter)
- object (class geotrellis.spark.store.s3.S3RDDWriter$$anonfun$update$1, <function1>)

The first problem is that s3Client is what triggers the serialization. I'm unclear, however, how S3SparkLayerProvider gets pulled into this serialization chain.
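
A minimal sketch of the capture pattern the serialization stack describes (hypothetical names, not GeoTrellis code): a function value built inside a method of a non-serializable class keeps an $outer reference to that class, so serializing the closure drags the whole provider along.

import org.apache.spark.{SparkConf, SparkContext}

// Stand-in for S3SparkLayerProvider: not Serializable.
class Provider {
  def client(): String = "client"
  // The anonymous function captures $outer = this Provider instance.
  def clientBuilder: () => String = () => client()
}

object Repro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("repro").setMaster("local[2]"))
    val getClient: () => String = new Provider().clientBuilder
    // Fails with SparkException: Task not serializable,
    // caused by java.io.NotSerializableException: Provider.
    sc.parallelize(1 to 10).foreachPartition(_ => println(getClient()))
    sc.stop()
  }
}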

echeipesh commented 4 years ago

Actually this seems pretty straightforward: the s3Client gets used here when making a layerWriter:

https://github.com/locationtech/geotrellis/blob/3609bf707d811960b44f6f9eed0cafbf7f7099cd/s3-spark/src/main/scala/geotrellis/spark/store/s3/S3SparkLayerProvider.scala#L41

That s3Client is ultimately a field on a class extended by S3SparkLayerProvider: https://github.com/locationtech/geotrellis/blob/3609bf707d811960b44f6f9eed0cafbf7f7099cd/s3/src/main/scala/geotrellis/store/s3/S3CollectionLayerProvider.scala#L34

The knee-jerk reaction is to make the provider Serializable, but it's slightly wrong to keep pulling more classes into the closure. What that field does is ensure that every call to the provider re-uses the same client, which is a good thing. However, looking at it deeper (separate issue, really):

https://github.com/locationtech/geotrellis/blob/master/s3/src/main/scala/geotrellis/store/s3/S3ClientProducer.scala

... we see that S3ClientProducer already does the same thing:

https://github.com/locationtech/geotrellis/blob/3609bf707d811960b44f6f9eed0cafbf7f7099cd/s3/src/main/scala/geotrellis/store/s3/S3ClientProducer.scala#L37

... gets used by default, which will cache. But if the user uses set:

https://github.com/locationtech/geotrellis/blob/3609bf707d811960b44f6f9eed0cafbf7f7099cd/s3/src/main/scala/geotrellis/store/s3/S3ClientProducer.scala#L71-L72

they're providing a function which may or may not have the same caching behavior, depending on its implementation. That's the kind of non-obvious consequence here (see the sketch below).
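
To make the distinction concrete, here is an illustrative sketch (hypothetical object, not the GeoTrellis API): the default producer behaves like cachingDefault below, handing back one memoized client per JVM, while a function passed to set could just as easily be freshEachCall.

import software.amazon.awssdk.services.s3.S3Client

object ProducerSketch {
  // Built once per JVM, like the default S3ClientProducer behavior.
  private lazy val cached: S3Client = S3Client.create()

  // Memoizing producer: every call returns the same client instance.
  val cachingDefault: () => S3Client = () => cached

  // A user-supplied function with no caching: builds a new client per call.
  val freshEachCall: () => S3Client = () => S3Client.create()
}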