simplexspatial / osm4scala

Scala and Spark library focused on reading OpenStreetMap Pbf files.
https://simplexspatial.github.io/osm4scala/
MIT License

Unable to run Spark jobs on an Azure Databricks cluster: java.lang.NullPointerException at org.apache.spark.util.LocalHadoopConfiguration.set(SparkHadoopConfiguration.scala:196) #102

Closed jochenhebbrecht closed 3 years ago

jochenhebbrecht commented 3 years ago

Hi,

Since last Friday, 5th of November, we seem to be unable to use the osm4scala library on an Azure Databricks cluster with configuration: 8.3 (includes Apache Spark 3.1.1, Scala 2.12).

When we try to run this simple command in a notebook:

spark.read
     .format("osm.pbf")
     .load("dbfs:/FileStore/shared_uploads/andorra_latest_osm.pbf")
     .select("id","type")
     .show()

... we're getting the following exception:

Caused by: java.lang.NullPointerException
    at org.apache.spark.util.LocalHadoopConfiguration.set(SparkHadoopConfiguration.scala:196)
    at org.apache.hadoop.conf.Configuration.set(Configuration.java:1115)
    at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2877)
    at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
    at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
    at org.apache.spark.SerializableWritable.$anonfun$readObject$1(SerializableWritable.scala:45)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1609)
    at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:41)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
    at org.apache.spark.broadcast.TorrentBroadcast$.$anonfun$unBlockifyObject$4(TorrentBroadcast.scala:372)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1643)
    at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:374)
    at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:279)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:253)
    at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:248)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1609)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:248)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:118)
    at org.apache.spark.broadcast.Broadcast.$anonfun$value$1(Broadcast.scala:80)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:78)
    at com.acervera.osm4scala.spark.OsmPbfFormat.$anonfun$buildReader$1(OsmPbfFormat.scala:86)
    at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:155)
    at org.apache.spark.sql.execution.datasources.FileFormat$$anon$1.apply(FileFormat.scala:142)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:331)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:475)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.$anonfun$hasNext$1(FileScanRDD.scala:300)
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:295)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:757)
    at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)
    at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:178)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:91)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:812)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1643)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:815)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:671)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Because we have run osm4scala many times before and changed nothing on our side, we escalated this to Azure as a critical support case.

Unfortunately, this is the feedback we've received:

If we follow the stacktrace, you'll notice we'll end up at this part of your code:

com.acervera.osm4scala.spark.OsmPbfFormat:86

... which is this line:

val fs = path.getFileSystem(broadcastedHadoopConf.value.value)

Do you have any insight into what could be causing this very annoying behavior?

It is also very unclear where the following classes originate. I would assume they are specific to Azure Databricks, but Azure support is not confirming this.

org.apache.spark.util.LocalHadoopConfiguration
org.apache.spark.util.SparkHadoopConfiguration

jochenhebbrecht commented 3 years ago

Aha, I was able to get it back working again:

The result is that it started to work again. So I can tell you with 100% confidence that we introduced a new bug in this commit: https://github.com/simplexspatial/osm4scala/commit/5f54f8d8ab6e627d43f95649a75af62967029329, but it was only recently exposed (due to an underlying Azure Databricks upgrade).

Unfortunately, I cannot yet tell you what exactly is wrong in OsmPbfFormat.scala.

angelcervera commented 3 years ago

Hi @jochenhebbrecht, which version of osm4scala are you using? It looks like line 86 is a comment.

I'm going to try to reproduce the error using the latest version, v1.0.10.

BTW, the last time I tested it in Azure was 7 months ago, so the version tested there should be 1.0.7.

jochenhebbrecht commented 3 years ago

Yes, sorry, we are using 1.0.9 in production. We haven't upgraded to 1.0.10 yet because we still have to revert the workaround with the timestamp. So we're talking about this line. I've tested it with 1.0.10 as well, and the problem remains.

In the meantime, Azure support gave us the option to revert to the previous Databricks environment, but according to them, their underlying upgrade just exposes a bug in osm4scala, and we would need to dig deeper into that one.

angelcervera commented 3 years ago

I was able to reproduce it in Azure. The bug appears to be located at com.acervera.osm4scala.spark.OsmPbfFormat.$anonfun$buildReader$1(OsmPbfFormat.scala:153), which is the equivalent of line 86 in the other version.

It is weird, because this line only gets the file system, so nothing special. Also, the NPE is triggered in a class that is not related to the reader. It looks like the Hadoop configuration is not being broadcast properly.

vanhove commented 3 years ago

Yes, it looks like the line val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableWritable(hadoopConf)) would need to be reverted to use SerializableConfiguration (similarly to ParquetFileFormat).

I noticed that this change was made in relation to a bug, but it's not clear what the cause was: https://github.com/simplexspatial/osm4scala/pull/62
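
For illustration, here is a minimal sketch of such a wrapper. It mirrors the general shape of Spark's SerializableConfiguration, but the class name SerializableHadoopConf is hypothetical and is not part of osm4scala or Spark; it is defined locally so that it does not depend on Spark internals. Unlike SerializableWritable, which goes through ObjectWritable on read (see the stack trace above), it always deserializes into a plain Configuration. Whether this avoids the NPE on Databricks is an assumption that has not been verified here.

```scala
import java.io.{ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.conf.Configuration

// Hypothetical wrapper (not an osm4scala or Spark class): makes a Hadoop
// Configuration usable with Java serialization, e.g. inside a broadcast.
class SerializableHadoopConf(@transient var value: Configuration) extends Serializable {

  // Called by Java serialization: write the entries via Hadoop's Writable API.
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out)
  }

  // Called by Java serialization: rebuild a plain Configuration (no default
  // resources loaded) and fill it with the serialized entries.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false)
    value.readFields(in)
  }
}
```

With a wrapper like this, the broadcast would become sparkSession.sparkContext.broadcast(new SerializableHadoopConf(hadoopConf)), and the existing broadcastedHadoopConf.value.value access would keep the same shape.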

angelcervera commented 3 years ago

That fixed a backward-compatibility problem with Spark 2. So the problem could be a combination of the Spark version and the Hadoop version. I have no time to work on that now. I hope to do it this evening.

Could you try to run it using Spark 2.4? Just to find out whether it still works there.

angelcervera commented 3 years ago

In AWS EMR with Hadoop 3.2.1 and Spark 3.1.2, version 1.0.10 is working without problems. It looks like there is something weird in Azure Databricks.

jochenhebbrecht commented 3 years ago

Thanks Ángel for already having a look at this.

When downgrading to 6.4 Extended Support (includes Apache Spark 2.4.5, Scala 2.11) and installing the osm4scala-spark2-shaded_2.11-1.0.10.jar library, I can no longer reproduce the problem.

We still have the support case open and Azure are also looking further into this problem on their side. I'll provide this feedback as well. It is still perfectly possible something is broken on Azure Databricks side, and that your code is actually perfectly fine.

We are also still waiting for the source code from their side, to learn more about where the NPE occurs.

angelcervera commented 3 years ago

I will research more using Azure, but I will not have time until the weekend. Also, this type of error requires infrastructure that is not free. Do you know if Microsoft or Databricks would give me resources for testing? That would be great. Could you ask them? Thanks

jochenhebbrecht commented 3 years ago

That sounds like a great idea; it's definitely something I would like to achieve. I will contact the Azure support team and ask whether you could get some resources to investigate this bug further.

jochenhebbrecht commented 3 years ago

I cannot reproduce the problem on 8.3 (includes Apache Spark 3.1.1, Scala 2.12) and osm4scala:1.0.10 with cluster mode set to Single Node (instead of Standard). This would indeed tell us that the problem lies in serializing the Hadoop configuration.

jochenhebbrecht commented 3 years ago

Feedback from Azure

Databricks team have already identified that the issue is caused by a maintenance release. They are currently investigating what change specifically caused the issue and a permanent fix to this issue will be planned for that.

So I would not worry too much about your codebase; let's give the support team some time and freedom to first investigate what they potentially broke.

With regard to asking for resources to reproduce it on your end:

Thank you for letting us know about this. During the investigation if we get to a stage where help from third party library owner is required, we would definitely explore this option. Given that the investigation is underway, this is not immediately needed. But we appreciate your suggestion and willingness to collaborate.

I would suggest closing this ticket for now. We can always reopen it if Azure Databricks comes back to us indicating that something is not valid in the osm4scala codebase.

angelcervera commented 3 years ago

Hi @jochenhebbrecht, thanks for your update. I will close the ticket, but let's recap to keep it in mind. Maybe it is necessary to change something in the connector related to this, such as finding a way to avoid serializing the HadoopConfig object.

Tested in cloud providers with Spark 3.

It looks like the error is related to Hadoop config serialization. That makes sense because in single-node execution serialization is not needed, so it does not fail.
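
The executor-side step that single-node mode skips can be approximated locally with a plain Java serialization round trip. The sketch below uses only the JDK; RoundTrip and roundTrip are hypothetical names, and it assumes that Spark's default Java serializer behaves like ObjectOutputStream/ObjectInputStream for this purpose (the trace does go through JavaDeserializationStream and ObjectInputStream.readObject). It would not necessarily reproduce a failure that depends on Databricks-specific classes unless it is run on that runtime.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical helper: serialize an object to bytes and read it back,
// similar to what happens when a broadcast value reaches another JVM.
object RoundTrip {
  def roundTrip[T <: java.io.Serializable](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()

    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    // Any readObject hook of the wrapped class runs here, so a failure
    // during deserialization surfaces as an exception from this call.
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Passing the object that is broadcast (for example new SerializableWritable(hadoopConf)) through roundTrip on the target runtime would be expected to fail in the same place as the broadcast read if deserialization is indeed the culprit.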

FYI: I created ticket #103 to be able to test these types of problems quickly.

vanhove commented 2 years ago

Just wanted to confirm that Databricks fixed the issue on Azure.