dotnet / spark

.NET for Apache® Spark™ makes Apache Spark™ easily accessible to .NET developers.
https://dot.net/spark
MIT License

[BUG]: Error while running a .net application containing a UDF against remote databricks apache spark (e.g. Azure Databricks) #384

Open zwitbaum opened 4 years ago

zwitbaum commented 4 years ago

Describe the bug A clear and concise description of what the bug is.

To Reproduce

Prerequisites: a simple .NET application with a simple SQL query and a UDF, which runs properly against a local Apache Spark.

Steps to reproduce the behavior:

  1. Set up a connection to a remote Databricks Apache Spark (I tested against Azure Databricks) on your local dev machine using "databricks-connect configure". Test the connection using "databricks-connect test".
  2. Test the .NET application without the UDF first: comment out the UDF-related lines of code and make sure the simple query runs against the configured remote Azure Databricks.
  3. Uncomment the UDF-related lines of code.
  4. See the error.
Error (copied from the command line):

```text
[2020-01-06T08:04:29.4589366Z] [L-020538381857] [Error] [JvmBridge] JVM method execution failed: Nonstatic method showString failed for class 15 when called with 3 arguments ([Index=1, Type=Int32, Value=20], [Index=2, Type=Int32, Value=20], [Index=3, Type=Boolean, Value=False], )
[2020-01-06T08:04:29.4590049Z] [L-020538381857] [Error] [JvmBridge] org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.139.64.14, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 342, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 3.7 than that in driver 0.4.0, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:534)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:488)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:640)
    at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:62)
    at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:159)
    at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:158)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
    at org.apache.spark.scheduler.Task.run(Task.scala:113)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:533)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:539)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2362)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2350)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2349)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2349)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1102)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1102)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2581)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2529)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2517)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:897)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2280)
    at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:270)
    at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:280)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:80)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:86)
    at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:508)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollectResult(limit.scala:55)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:300)
    at com.databricks.service.SparkServiceImpl$$anonfun$executePlan$1$$anonfun$apply$2.apply(SparkServiceImpl.scala:84)
    at com.databricks.service.SparkServiceImpl$$anonfun$executePlan$1$$anonfun$apply$2.apply(SparkServiceImpl.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:111)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:240)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:97)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:170)
    at com.databricks.service.SparkServiceImpl$$anonfun$executePlan$1.apply(SparkServiceImpl.scala:77)
    at com.databricks.service.SparkServiceImpl$$anonfun$executePlan$1.apply(SparkServiceImpl.scala:74)
    at com.databricks.logging.UsageLogging$$anonfun$recordOperation$1.apply(UsageLogging.scala:417)
    at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:239)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:234)
    at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:18)
    at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:276)
    at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:18)
    at com.databricks.logging.UsageLogging$class.recordOperation(UsageLogging.scala:398)
    at com.databricks.spark.util.PublicDBLogging.recordOperation(DatabricksSparkUsageLogger.scala:18)
    at com.databricks.spark.util.PublicDBLogging.recordOperation0(DatabricksSparkUsageLogger.scala:55)
    at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:98)
    at com.databricks.spark.util.UsageLogger$class.recordOperation(UsageLogger.scala:67)
    at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:67)
    at com.databricks.spark.util.UsageLogging$class.recordOperation(UsageLogger.scala:342)
    at com.databricks.service.SparkServiceImpl$.recordOperation(SparkServiceImpl.scala:54)
    at com.databricks.service.SparkServiceImpl$.executePlan(SparkServiceImpl.scala:74)
    at com.databricks.service.SparkServiceRPCHandler.com$databricks$service$SparkServiceRPCHandler$$execute0(SparkServiceRPCHandler.scala:487)
    at com.databricks.service.SparkServiceRPCHandler$$anonfun$executeRPC0$1.apply(SparkServiceRPCHandler.scala:376)
    at com.databricks.service.SparkServiceRPCHandler$$anonfun$executeRPC0$1.apply(SparkServiceRPCHandler.scala:317)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at com.databricks.service.SparkServiceRPCHandler.executeRPC0(SparkServiceRPCHandler.scala:317)
    at com.databricks.service.SparkServiceRPCHandler$$anon$3.call(SparkServiceRPCHandler.scala:272)
    at com.databricks.service.SparkServiceRPCHandler$$anon$3.call(SparkServiceRPCHandler.scala:260)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at com.databricks.service.SparkServiceRPCHandler$$anonfun$executeRPC$1.apply(SparkServiceRPCHandler.scala:304)
    at com.databricks.service.SparkServiceRPCHandler$$anonfun$executeRPC$1.apply(SparkServiceRPCHandler.scala:284)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at com.databricks.service.SparkServiceRPCHandler.executeRPC(SparkServiceRPCHandler.scala:283)
    at com.databricks.service.SparkServiceRPCServlet.doPost(SparkServiceRPCServer.scala:124)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:848)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:585)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:134)
    at org.eclipse.jetty.server.Server.handle(Server.java:539)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:333)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:283)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:108)
    at org.eclipse.jetty.io.SelectChannelEndPoint$2.run(SelectChannelEndPoint.java:93)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.executeProduceConsume(ExecuteProduceConsume.java:303)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceConsume(ExecuteProduceConsume.java:148)
    at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:136)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:671)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:589)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/worker.py", line 342, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 3.7 than that in driver 0.4.0, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:534)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:488)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:640)
    at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:62)
    at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:159)
    at org.apache.spark.sql.execution.collect.Collector$$anonfun$2.apply(Collector.scala:158)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
    at org.apache.spark.scheduler.Task.run(Task.scala:113)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:533)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:539)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
[2020-01-06T08:04:29.5052265Z] [L-020538381857] [Exception] [JvmBridge] JVM method execution failed: Nonstatic method showString failed for class 15 when called with 3 arguments ([Index=1, Type=Int32, Value=20], [Index=2, Type=Int32, Value=20], [Index=3, Type=Boolean, Value=False], )
   at Microsoft.Spark.Interop.Ipc.JvmBridge.CallJavaMethod(Boolean isStatic, Object classNameOrJvmObjectReference, String methodName, Object[] args)
Unhandled exception. System.Exception: JVM method execution failed: Nonstatic method showString failed for class 15 when called with 3 arguments ([Index=1, Type=Int32, Value=20], [Index=2, Type=Int32, Value=20], [Index=3, Type=Boolean, Value=False], )
   at Microsoft.Spark.Interop.Ipc.JvmBridge.CallJavaMethod(Boolean isStatic, Object classNameOrJvmObjectReference, String methodName, Object[] args)
   at Microsoft.Spark.Interop.Ipc.JvmBridge.CallNonStaticJavaMethod(JvmObjectReference objectId, String methodName, Object[] args)
   at Microsoft.Spark.Interop.Ipc.JvmObjectReference.Invoke(String methodName, Object[] args)
   at Microsoft.Spark.Sql.DataFrame.Show(Int32 numRows, Int32 truncate, Boolean vertical)
   at TestApp.Program.Main(String[] args) in C:\_projects\TestApp\test-data-science\Sources\TestApp\Program.cs:line 40
```

Expected behavior

A .NET application containing a UDF works properly over a connection to a remote Databricks cluster.

Additional context

A similar program in Python (same SQL query and a similar UDF) works properly over the same configured remote connection.

The application was started using spark-submit, i.e. `%SPARK_HOME%\bin\spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --master local microsoft-spark-2.4.x-0.7.0.jar dotnet TestApp.dll`

imback82 commented 4 years ago

@zwitbaum did you follow this https://github.com/dotnet/spark/blob/master/deployment/README.md#databricks?

cc: @suhsteve @Niharikadutta @elvaliuliuliu

zwitbaum commented 4 years ago

@imback82 : Microsoft.Spark.Worker is already deployed on Azure Databricks, and the .NET application works properly when it runs entirely in the cluster. But what I mean is something else.

The issue occurs only if the .NET app is started on the local Windows machine (my dev PC) and queries data from the remote Azure Databricks. So what I mean

Have you ever tested such a scenario (a local run against a remote Databricks cluster with calls to UDFs)?

imback82 commented 4 years ago

I see. Can you try to set the environment variable DOTNET_WORKER_DIR to the remote path where worker binaries are installed?

elvaliuliuliu commented 4 years ago

Moreover, I see that the error message is: "Exception: Python in worker has different version 3.7 than that in driver 0.4.0, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set."

Can you check and verify the version?

zwitbaum commented 4 years ago

@elvaliuliuliu: I determined the correct values and set PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON as follows, but that does not solve the problem:

@imback82: what do you mean by "remote path where worker binaries are installed"? A URL to the cluster (e.g. https://northeurope.azuredatabricks.net/?o=xxxxxx) or to the worker file in the cluster? Or the file path to the worker file in the cluster (i.e. the worker is installed in dbfs:/FileStore/job-jars/microsoft-spark-2.4.x-0.7.0.jar)? Can you please provide an example?

imback82 commented 4 years ago

I meant the path on the cluster. So if the worker binaries are installed under /usr/bin, you would set DOTNET_WORKER_DIR to /usr/bin on the driver side. Looking at your error log, /databricks/spark/python/pyspark/worker.py is being launched instead of the .NET worker. Also, how do you plan to ship the DLLs that contain your UDFs? I don't believe we have tested your scenario, but you are always welcome to contribute back your findings!

zwitbaum commented 4 years ago

If I change DOTNET_WORKER_DIR on my local Windows machine, the app writes to cmd, e.g.: "Using the environment variable to construct .NET worker path: /usr/bin/Microsoft.Spark.Worker\Microsoft.Spark.Worker.exe", which IMO makes no sense. If you mean that the environment variable should instead be set on the cluster by db-init.sh or install-worker.sh, please describe how to extend one of these scripts. Currently, we use the default version of the scripts described at https://docs.microsoft.com/en-ie/dotnet/spark/tutorials/databricks-deployment.

I will try to describe the whole desired development and deployment process later, so you can better understand how we want and need to use the .net for apache spark in our software development process, in order to develop our application locally in the VS by using the remote azure databricks.

imback82 commented 4 years ago

After setting the env variable, does /databricks/spark/python/pyspark/worker.py still get launched? The worker path gets inserted into UDF on the driver side if the env is set, otherwise, it will look for the binary under PATH. See this function https://github.com/dotnet/spark/blob/5e9c08b430b4bc56b5f42252c4b73437377afaed/src/csharp/Microsoft.Spark/Services/ConfigurationService.cs#L58
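The lookup described above can be sketched roughly as follows. This is a simplified illustration, not the library's actual code: the class `WorkerPathSketch` and method `Resolve` are hypothetical names, and the Windows/non-Windows choice of executable name is an assumption. Only the variable name `DOTNET_WORKER_DIR` and the worker file name come from this thread.

```csharp
using System;
using System.IO;
using System.Runtime.InteropServices;

// Hypothetical helper that mirrors the behaviour described in the comment above;
// it is NOT the code from ConfigurationService.cs.
public static class WorkerPathSketch
{
    private const string WorkerDirEnvVar = "DOTNET_WORKER_DIR";

    public static string Resolve()
    {
        // Assumption: the file name depends on the OS of the process doing the
        // lookup (the driver), not on the OS of the cluster nodes.
        string exeName = RuntimeInformation.IsOSPlatform(OSPlatform.Windows)
            ? "Microsoft.Spark.Worker.exe"
            : "Microsoft.Spark.Worker";

        var workerDir = Environment.GetEnvironmentVariable(WorkerDirEnvVar);

        // Variable set: combine directory and file name.
        // Variable not set: return the bare name so it is resolved through PATH.
        return string.IsNullOrEmpty(workerDir)
            ? exeName
            : Path.Combine(workerDir, exeName);
    }

    public static void Main()
    {
        // Set only for this process, to show the effect of a Linux-style path.
        Environment.SetEnvironmentVariable(WorkerDirEnvVar, "/usr/bin");
        Console.WriteLine(Resolve());
    }
}
```

Because `Path.Combine` uses the separator of the machine it runs on, a Linux-style directory combined on a Windows driver would be expected to produce a mixed-separator path similar to the one quoted in the previous comment.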

If worker.py is still getting launched, there could be a missing link. @zwitbaum can you share the exact repro steps so we can try on our side?

@elvaliuliuliu / @Niharikadutta does any one of you want to take this up?

elvaliuliuliu commented 4 years ago

@imback82 I will follow up on this.

zwitbaum commented 4 years ago

@imback82 If I set DOTNET_WORKER_DIR to the Azure worker path on my local dev PC, e.g. "setx DOTNET_WORKER_DIR /usr/bin" (or /usr/local/bin, or /usr/bin/Microsoft.Spark.Worker), /databricks/spark/python/pyspark/worker.py still gets launched. Must DOTNET_WORKER_DIR be set on the Azure Databricks cluster instead? If yes, please advise how to do it in the init script mentioned above.

elvaliuliuliu commented 4 years ago

Can you try setting DOTNET_WORKER_DIR to the local path of Microsoft.Spark.Worker? Please make sure the path is set correctly and that Microsoft.Spark.Worker.exe is in that path.

imback82 commented 4 years ago

@zwitbaum before moving further, can you please provide the exact command you use to submit your Spark application to Azure Databricks?

zwitbaum commented 4 years ago

@elvaliuliuliu : DOTNET_WORKER_DIR was already set to the local path of Microsoft.Spark.Worker before I posted the bug (setx DOTNET_WORKER_DIR "C:\bin\Microsoft.Spark.Worker-0.7.0"). Unfortunately, it does not solve the problem.

@imback82 : please take a look at the description. The .NET app is not submitted to Azure Databricks. It is started on the local dev PC, but uses the data from the remote Azure Databricks through the connection configured by "databricks-connect configure". The submit command is as described above: %SPARK_HOME%\bin\spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --master local microsoft-spark-2.4.x-0.7.0.jar dotnet TestApp.dll. As I already mentioned, the app works well with the remote Azure Databricks as long as it does not use any UDFs. We don't want to start or deploy the app on Azure Databricks during development (or debugging), because that is too time-consuming. We want to start it locally and work against the big data on the remote cluster instead. Usually a developer does not have big data on their local machine :)

zwitbaum commented 4 years ago

Once again, to describe our requirements for .NET Spark software development: We want to develop our .NET Spark application on a local dev PC with Visual Studio and use all the IDE benefits such as the debugger, source control, static code checker, refactoring, etc. This means we must be able to "hit F5" and run the app from Visual Studio. Because we typically don't have the necessary Spark "big data" on the local machine, we need to connect to the remote Spark and use its data (as well as test queries against it). As soon as a development iteration is done, the app is deployed to the cluster and runs there. As I already mentioned, most of our dev scenarios already work, except when we need to use a UDF. This is a huge blocker for us, because we can only test our code after deploying it to the cluster. We have tested both alternatives: placing the UDF in the executing assembly, or moving it to a separate (referenced) assembly. Both have failed so far.

imback82 commented 4 years ago

Can't you do the following?

  1. Develop/debug your application locally. We have docs on how to debug your application: https://github.com/dotnet/spark/blob/master/docs/developer-guide.md
  2. Once you are ready to deploy your app, follow this https://github.com/dotnet/spark/blob/master/deployment/README.md#databricks.

Again, can you share the exact command you are using to submit so we can get a better idea how you are doing this? If you want to submit a job from outside databricks, you can follow this: https://docs.databricks.com/dev-tools/api/latest/jobs.html#jobssparksubmittask

cc: @suhsteve

zwitbaum commented 4 years ago

@imback82 : we develop and debug our application on the windows local machine exactly as described in the link you've provided: %SPARK_HOME%\bin\spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --master local microsoft-spark-2.4.x-0.7.0.jar debug

Because we want to query the data from Azure Databricks, we do the following:

  1. Let's imagine the following query:

    var spark = SparkSession.Builder().AppName(nameof(ActiveDevices)).GetOrCreate();
    var dataFrame = spark.Table("bigdata.table1").Select("testData");
    dataFrame.Show();

    Because the data is stored in Azure Databricks rather than on the local machine, we need to configure a connection to this Databricks.

  2. We configured the connection to Azure Databricks using "databricks-connect configure" and tested it with "databricks-connect test", as described here: https://docs.microsoft.com/en-us/azure/databricks/dev-tools/databricks-connect This requires that SPARK_HOME is set properly: `setx SPARK_HOME "c:\users\zwitbaum\appdata\local\programs\python\python37\lib\site-packages\pyspark"`

  3. Now we can run our .NET app:
     -- open a command prompt and start %SPARK_HOME%\bin\spark-submit ... debug as described above
     -- in Visual Studio, set a breakpoint, e.g. on the first line of the Main method, and hit F5
     -- the app runs properly and shows the queried data.

  4. Now let's extend the application with a UDF, like

    spark.Sql("SELECT T1 FROM VALUES ('one'), ('two'), ('three') as (T1)").CreateOrReplaceTempView("Test");
    var sqlDf = spark.Sql("SELECT T1 from Test");
    sqlDf.Show();

    spark.Udf().Register<string, string>("my_udf", UserDefinedFunctions.Udf.ToUpper);
    sqlDf = spark.Sql("SELECT *, my_udf(*) FROM Test");
    sqlDf.Show();

Start the app in Visual Studio again by hitting F5. The app fails with the exception given in the description of the issue.

Those are the steps you can follow to reproduce the error easily.
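The repro references `UserDefinedFunctions.Udf.ToUpper` without showing it. A hypothetical stand-in is sketched below, assuming it is a `Func<string, string>` that upper-cases its argument, which is the delegate shape that `Register<string, string>` takes. The class and namespace names are copied from the call in the repro; the body is an assumption. The sketch calls the delegate directly, so it runs without Spark.

```csharp
using System;

namespace UserDefinedFunctions
{
    // Hypothetical stand-in for the class referenced in the repro;
    // the real implementation is not shown in the thread.
    public static class Udf
    {
        // Upper-cases the input; a null input stays null.
        public static readonly Func<string, string> ToUpper =
            value => value?.ToUpperInvariant();
    }

    public static class Demo
    {
        public static void Main()
        {
            // Same sample values as the temp view in the repro.
            foreach (var s in new[] { "one", "two", "three" })
            {
                Console.WriteLine(Udf.ToUpper(s)); // prints ONE, TWO, THREE (one per line)
            }
        }
    }
}
```

Exercising the delegate this way only checks the UDF logic itself; it says nothing about whether the worker process on the cluster can load the assembly that contains it.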

What we additionally tested before submitting the bug:

- Check whether the same app runs properly **with a local Windows Spark: yes, it works**
-- install Spark on the local Windows machine (as described at https://docs.microsoft.com/en-ie/dotnet/spark/tutorials/get-started)
-- switch to the local Spark: setx SPARK_HOME C:\bin\spark-2.4.1-bin-hadoop2.7\
-- open a command prompt and start %SPARK_HOME%\bin\spark-submit ... debug as described above
-- comment out the lines that relate to the remote Azure Databricks:

    //var dataFrame = spark.Table("bigdata.table1").Select("testData");
    //dataFrame.Show();

Start the app in Visual Studio again by hitting F5. The app runs properly and shows the queried data. The UDF works well, converting the values to upper case.

- Check, whether a **similar app with a UDF written in python** can run locally using the data from the remote azure databricks:  **yes, it works**
-- switch to the remote databricks connection by `setx SPARK_HOME "c:\users\zwitbaum\appdata\local\programs\python\python37\lib\site-packages\pyspark"` and testing it with "databricks-connect test"
-- run the python script locally: it works properly

I hope this information is useful for investigating the bug.

imback82 commented 4 years ago

Thanks @zwitbaum! @elvaliuliuliu will investigate this.

elvaliuliuliu commented 4 years ago

@zwitbaum We have been investigating this scenario, and I can reproduce the error you mentioned. It looks like some architectural differences cause the PySpark worker to be invoked instead of the SparkDotnet worker. We may need more time to investigate, but I am on top of this. Thanks for your patience, and feel free to let me know if you have any new findings or updates!

The settings for my repro are as follows:

set SPARK_HOME=c:\users\ruinliu\appdata\local\continuum\anaconda3\lib\site-packages\pyspark
rem Remote SparkDotnet worker path on the DBx cluster:
set DOTNET_WORKER_DIR=/usr/local/bin/spark-dotnet/Microsoft.Spark.Worker-0.8.0/

I queried a data table stored on the cluster and ran a UDF on it, and I got the same error as you. The logical plan from the submission is shown below: the SparkDotnet worker path is passed correctly, but the worker is not invoked.

env_vars {
  key: "DOTNET_WORKER_SPARK_VERSION"
  nullableValue: "2.4.3"
}
env_vars {
  key: "DOTNET_ASSEMBLY_SEARCH_PATHS"
  nullableValue: "C:\\Users\\<user name>\\AppData\\Local\\Temp\\spark-57c30aba-9b9e-4529-8bc3-de075ff4c76b\\userFiles-260a2a14-346e-434f-9401-650b5add8d30"
}
python_exec: "/usr/local/bin/spark-dotnet/Microsoft.Spark.Worker-0.8.0/Microsoft.Spark.Worker.exe"
python_ver: "0.4.0"

elvaliuliuliu commented 4 years ago

@zwitbaum: Just FYI, we have opened a ticket with Azure Databricks, this is currently being investigated. I will update you once I hear back from them.

elvaliuliuliu commented 4 years ago

@zwitbaum : We had a discussion with Databricks today and will get back to you once there are any updates from their side.

alexoss68 commented 4 years ago

Another workaround would be to use a compatible Python version in the .NET app so that it matches the Spark cluster. How does the driver write UDFs and send them to the Spark cluster? It could be a bug on the driver side.

elvaliuliuliu commented 4 years ago

Which compatible Python version do you mean here? The error reports that Python version "0.4.0" mismatches, but this version comes from Microsoft.Spark.Worker.exe. So it looks like the logical plan is correct, but when it is sent to the driver in the cluster through databricks-connect, Microsoft.Spark.Worker.exe is overridden by worker.py.
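To illustrate why "0.4.0" shows up in a Python version error, here is a simplified C# model of the comparison visible in the traceback, where the worker formats its own `major.minor` version and compares it with the version string received from the driver. The real check is Python code in PySpark's worker.py; the names `VersionCheckSketch` and `Check` are hypothetical, and the message is shortened.

```csharp
using System;

// Simplified model of the check seen in the traceback; not PySpark's code.
public static class VersionCheckSketch
{
    public static void Check(Version workerPython, string versionFromDriver)
    {
        // The worker only looks at "major.minor" of its own interpreter.
        string workerMajorMinor = $"{workerPython.Major}.{workerPython.Minor}";

        // Any string that is not exactly equal counts as a mismatch.
        if (workerMajorMinor != versionFromDriver)
        {
            throw new InvalidOperationException(
                $"Python in worker has different version {workerMajorMinor} " +
                $"than that in driver {versionFromDriver}");
        }
    }

    public static void Main()
    {
        try
        {
            // "0.4.0" is the python_ver value from the logical plan in this thread.
            Check(new Version(3, 7), "0.4.0");
        }
        catch (InvalidOperationException e)
        {
            // prints: Python in worker has different version 3.7 than that in driver 0.4.0
            Console.WriteLine(e.Message);
        }
    }
}
```

Under this model, a version string meant for the .NET worker can never equal a Python `major.minor` string, so the check fails whenever worker.py is launched in place of Microsoft.Spark.Worker.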

alexoss68 commented 4 years ago

Are you sure that the Spark cluster runs Microsoft.Spark.Worker.exe? Why does Microsoft.Spark.Worker.exe use Python version "0.4.0", which is quite old?

mllab-nl commented 4 years ago

Might be related to this one: https://github.com/dotnet/spark/issues/494

zwitbaum commented 3 years ago

Any progress in the version 2.0?

selvavm commented 2 years ago

Hi, may I know if this is fixed?

zwitbaum commented 2 years ago

My last test indicated that the issue is still not fixed.

selvavm commented 2 years ago

I came to update that I was able to make this work with UDF and Databricks. Moreover, I was able to do debugging in Visual studio as well. However, it is not practical because I will have to copy the dlls available in Debug folder to Databricks cluster each time I make a code change in udf. This impacts productivity :(.

Update: In the case of Python, I can edit the udf anytime and get it executed in Databricks. It is productive but my personal opinion is dotnet is better at productivity because of its tooling (visual studio, unit testing, nugets, and so on).