dotnet / spark

.NET for Apache® Spark™ makes Apache Spark™ easily accessible to .NET developers.
https://dot.net/spark
MIT License

[BUG]: downloadDriverFile failing on Utils.fetchFile using microsoft-spark-3-2_2.12-2.1.0 on a 10.4 LTS (includes Apache Spark 3.2.1, Scala 2.12) #1048

Open strtdusty opened 2 years ago

strtdusty commented 2 years ago

Describe the bug: I am trying to upgrade my spark-submit job in Azure Databricks to use the newest .NET for Spark version. I have upgraded the version number in db-init.sh, upgraded my NuGet package, and updated the spark-submit params. I see there was a recent change in downloadDriverFile because the signature of fetchFile changed. I'm guessing I just missed something in the upgrade.

To Reproduce

Steps to reproduce the behavior:

  1. Create an app that uses Microsoft.Spark v2.1. Set up db-init.sh to pull the 2.1 version: https://github.com/dotnet/spark/releases/download/v2.0.0/Microsoft.Spark.Worker.netcoreapp3.1.linux-x64-2.1.0.tar.gz
  2. Upload the artifacts to dbfs
  3. Create a Databricks job using spark-submit params similar to ["--class","org.apache.spark.deploy.dotnet.DotnetRunner","dbfs:/FileStore/OO/OO-TestApp/microsoft-spark-3-2_2.12-2.1.0.jar","dbfs:/FileStore/OO/OO-TestApp/SparkTestApp.zip","SparkTestApp"]
  4. Use a 10.4 LTS cluster (includes Apache Spark 3.2.1, Scala 2.12)

Standard Error:

Warning: Ignoring non-Spark config property: libraryDownload.sleepIntervalSeconds
Warning: Ignoring non-Spark config property: libraryDownload.timeoutSeconds
Warning: Ignoring non-Spark config property: eventLog.rolloverIntervalSeconds
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.util.Utils$.fetchFile$default$7()Z
  at org.apache.spark.deploy.dotnet.DotnetRunner$.downloadDriverFile(DotnetRunner.scala:222)
  at org.apache.spark.deploy.dotnet.DotnetRunner$.main(DotnetRunner.scala:77)
  at org.apache.spark.deploy.dotnet.DotnetRunner.main(DotnetRunner.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
  at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:956)
  at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
  at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
  at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
  at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1045)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1054)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

log4j:

22/04/11 16:23:30 INFO AzureNativeFileSystemStore: URI scheme: wasbs, using https for connections
22/04/11 16:23:30 INFO NativeAzureFileSystem: Delete with limit configurations: deleteFileCountLimitEnabled=false, deleteFileCountLimit=-1
22/04/11 16:23:30 INFO DBFS: Initialized DBFS with DBFSV2 as the delegate.
22/04/11 16:23:31 INFO Utils: Fetching dbfs:/FileStore/OO/OO-TestApp/microsoft-spark-3-2_2.12-2.1.0.jar to /local_disk0/tmp/spark-ba7cd3c5-19f9-4461-b80e-f0e7dc01fcda/fetchFileTemp4257510112956878374.tmp
22/04/11 16:23:31 WARN SparkConf: The configuration key 'spark.akka.frameSize' has been deprecated as of Spark 1.6 and may be removed in the future. Please use the new key 'spark.rpc.message.maxSize' instead.
22/04/11 16:23:31 INFO DotnetRunner: Copying user file dbfs:/FileStore/OO/OO-TestApp/SparkTestApp.zip to /databricks/driver
22/04/11 16:23:31 INFO ShutdownHookManager: Shutdown hook called
22/04/11 16:23:31 INFO ShutdownHookManager: Deleting directory /local_disk0/tmp/spark-ba7cd3c5-19f9-4461-b80e-f0e7dc01fcda

strtdusty commented 2 years ago

Further, it looks like there is no overload of fetchFile in Spark 3.2.1 that takes the 6 parameters DotnetRunner's main path passes. It looks like there are 2 overloads, one with 7 params and one with 5.
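
For reference, the failing call in downloadDriverFile looks roughly like the following (paraphrased, not an exact copy of the DotnetRunner source; variable names are approximations). It passes six arguments and relies on the default for shouldUntar, which is why the JVM goes looking for fetchFile$default$7:

// Approximate shape of the failing call (paraphrased; names are assumptions).
Utils.fetchFile(
  hdfsFilePath,                 // e.g. dbfs:/FileStore/OO/OO-TestApp/SparkTestApp.zip
  new File(driverDir),          // e.g. /databricks/driver
  sparkConf,
  hadoopConf,
  System.currentTimeMillis(),
  useCache = false)             // shouldUntar omitted, so the compiler emits a call to fetchFile$default$7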

alastairs commented 2 years ago

Unfortunately I've hit this with my first foray into the world of .NET on Spark. Is there a known-good set of libraries and cluster configuration that I should use as a workaround instead?

strtdusty commented 2 years ago

Version 2.0 of DotnetRunner and version 3.1 of Spark work.

alastairs commented 2 years ago

Thank you @strtdusty 🙏🏻

strtdusty commented 2 years ago

@Niharikadutta it looks like you might be familiar with the newer dotnetrunner code. Have you seen this issue in the wild?

Niharikadutta commented 2 years ago

@strtdusty Open source Spark has the following signature for the fetchFile util function:

def fetchFile(
      url: String,
      targetDir: File,
      conf: SparkConf,
      hadoopConf: Configuration,
      timestamp: Long,
      useCache: Boolean,
      shouldUntar: Boolean = true)

Since the last parameter is optional, the 6 parameters passed in from DotnetRunner match the required parameter list, and we have not seen this issue otherwise. However, it is very possible that Databricks has their own implementation of this function, and as their code is closed source we do not have visibility into it. Can you try running on an open-source-based cluster (say, your local machine) and see if you still face this issue?
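
To make the binary-compatibility angle concrete, here is a minimal, self-contained sketch (toy names, not Spark's) of how Scala encodes an omitted default argument as a call to a generated $default$N accessor, which is exactly the method named in the NoSuchMethodError above:

object ToyUtils {
  // For the default parameter, the compiler also emits a synthetic accessor,
  // roughly: def fetchFile$default$3(): Boolean = true
  def fetchFile(url: String, useCache: Boolean, shouldUntar: Boolean = true): String =
    s"fetched $url (untar=$shouldUntar)"
}

object Caller {
  def main(args: Array[String]): Unit = {
    // Omitting shouldUntar compiles to, in effect:
    //   ToyUtils.fetchFile(url, useCache, ToyUtils.fetchFile$default$3())
    // If the class actually on the cluster classpath has a different parameter
    // list, that accessor does not exist and the JVM throws NoSuchMethodError
    // at runtime, matching the fetchFile$default$7()Z error reported above.
    println(ToyUtils.fetchFile("dbfs:/some/file.zip", useCache = false))
  }
}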

JoshRosen commented 2 years ago

Hi, I'm an engineer from Databricks who works on Databricks Runtime.

In Apache Spark 3.2.0, the signature of the Utils.fetchFile method was changed in https://github.com/apache/spark/pull/30945 as part of code cleanup. The org.apache.spark.util.Utils class is declared as private[spark] and is not intended to be used outside of Spark's own code.

Databricks Runtime 10.0+ currently does not include the change from https://github.com/apache/spark/pull/30945, so fetchFile continues to use the old method signature that requires a SecurityManager:

  def fetchFile(
      url: String,
      targetDir: File,
      conf: SparkConf,
      securityMgr: SecurityManager,
      hadoopConf: Configuration,
      timestamp: Long,
      useCache: Boolean,
      shouldUntar: Boolean = true): File = {

Although the method signature requires a SecurityManager, that argument is not used in the method itself (so it is safe to pass null).

Short-term workaround:

For now, I think the quickest route to unblocking your use of the .NET connector on Databricks Runtime 10.0+ is to modify DotnetRunner.scala to invoke this method using reflection.

The code snippet below defines a callFetchFileViaReflection helper method that automatically detects whether fetchFile requires the SecurityManager argument, allowing it to be compatible with both Apache Spark 3.2.0 and current Databricks Runtime 10.0+ releases. I manually tested this reflection code in both Databricks Runtime 10.4 and Apache Spark 3.2.0. You can replace the call to Utils.fetchFile() with a call to this helper method.

import org.apache.spark.SparkConf
import org.apache.spark.SecurityManager
import org.apache.hadoop.conf.Configuration
import org.apache.spark.util.Utils
import java.io.File
import java.lang.NoSuchMethodException
import java.lang.reflect.InvocationTargetException

def callFetchFileViaReflection(
    url: String,
    targetDir: File,
    conf: SparkConf,
    hadoopConf: Configuration,
    timestamp: Long,
    useCache: Boolean,
    shouldUntar: Boolean = true): File = {

  // org.apache.spark.util.Utils is declared private[spark], so look up the
  // singleton object via reflection instead of referencing it directly.
  val utilsClass = Class.forName("org.apache.spark.util.Utils$")
  val utilsObject = utilsClass.getField("MODULE$").get(null)

  val signatureWithSecurityManager = Array(
    classOf[String],
    classOf[File], 
    classOf[SparkConf],
    classOf[SecurityManager],
    classOf[Configuration],
    java.lang.Long.TYPE,
    java.lang.Boolean.TYPE,
    java.lang.Boolean.TYPE
  )

  val signatureWithoutSecurityManager = Array(
    classOf[String],
    classOf[File], 
    classOf[SparkConf],
    classOf[Configuration],
    classOf[Long],
    classOf[Boolean],
    classOf[Boolean]
  )

  val (needSecurityManagerArg, method) = {
    try {
      (true, utilsClass.getMethod("fetchFile", signatureWithSecurityManager: _*))
    } catch {
      case _: NoSuchMethodException => 
        (false, utilsClass.getMethod("fetchFile", signatureWithoutSecurityManager: _*))
    }
  }

  val args: Seq[Any] = 
    Seq(
      url,
      targetDir,
      conf
    ) ++ (if (needSecurityManagerArg) Seq(null) else Nil) ++ Seq(
      hadoopConf,
      timestamp,
      useCache, 
      shouldUntar)

  // Unwrap InvocationTargetException to preserve exception in case of errors:
  try {
    method.invoke(utilsObject, args.map(_.asInstanceOf[Object]): _*).asInstanceOf[File]
  } catch {
    case e: InvocationTargetException => 
      throw e.getCause()
  }
}
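
For example (variable names here are assumptions following the sketch above, not the exact DotnetRunner source), the existing direct call in downloadDriverFile could then become:

// Hypothetical replacement for the direct Utils.fetchFile call in
// DotnetRunner.downloadDriverFile; variable names are assumed.
callFetchFileViaReflection(
  hdfsFilePath,
  new File(driverDir),
  sparkConf,
  hadoopConf,
  System.currentTimeMillis(),
  useCache = false)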

michael-damatov commented 2 years ago

Any update on this issue?

Unless resolved, it forces us to stay on Databricks Runtime 9.1 (the last available runtime supporting Spark 3.1.2).

ValtorX commented 2 years ago

Based on @JoshRosen's workaround, I created this jar that works with 10.4 LTS.

microsoft-spark-3-2_2.12-2.1.1.jar.zip

michael-damatov commented 1 year ago

Any update on this issue?

Unless resolved, it still forces us to stay on Databricks Runtime 9.1 (the last available runtime supporting Spark 3.1.2).

grazy27 commented 4 weeks ago

It's not obligatory to use the unzipping functionality; there's a string check that allows passing the path to the executable directly. So if your init.sh script manually copies the executables to the driver, you can point the job at that path directly and avoid the broken API call, as sketched below.
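
For illustration only (the executable path and argument layout below are made-up examples; the exact string check should be verified against the DotnetRunner source), the spark-submit params could then reference the copied executable instead of the zip, e.g. ["--class","org.apache.spark.deploy.dotnet.DotnetRunner","dbfs:/FileStore/OO/OO-TestApp/microsoft-spark-3-2_2.12-2.1.0.jar","/databricks/driver/SparkTestApp/SparkTestApp"], so downloadDriverFile and the broken fetchFile call are never exercised.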
