springml / spark-sftp

Spark connector for SFTP
Apache License 2.0

Algorithm negotiation fail saving csv to SFTP from Azure Databricks #75

Open AndyAldersley opened 4 years ago

AndyAldersley commented 4 years ago

I'm receiving the above error when trying to write a csv file to SFTP from Azure Databricks.

Code snippet:

    df.write.
      format("com.springml.spark.sftp").
      option("host", "sftp_host").
      option("username", "username").
      option("password", "pwd").
      option("fileType", "csv").
      option("tempLocation", "/dbfs/mnt/sandbox/test/").
      option("hdfsTempLocation", "/mnt/sandbox/test/").
      save("/test.csv")

Which is producing the following error log:

    com.jcraft.jsch.JSchException: Algorithm negotiation fail
        at com.jcraft.jsch.Session.receive_kexinit(Session.java:582)
        at com.jcraft.jsch.Session.connect(Session.java:320)
        at com.jcraft.jsch.Session.connect(Session.java:183)
        at com.springml.sftp.client.SFTPClient.createSFTPChannel(SFTPClient.java:275)
        at com.springml.sftp.client.SFTPClient.copyToFTP(SFTPClient.java:102)
        at com.springml.spark.sftp.DefaultSource.upload(DefaultSource.scala:160)
        at com.springml.spark.sftp.DefaultSource.createRelation(DefaultSource.scala:126)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:147)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:135)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:188)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:184)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:135)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:118)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:116)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:112)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:241)
        at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:98)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:171)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:710)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:306)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:292)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:235)
        at line8d8b6a0390294c468569f4d9bc480baa67.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-2234296783096833:17)
        at line8d8b6a0390294c468569f4d9bc480baa67.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-2234296783096833:88)
        at line8d8b6a0390294c468569f4d9bc480baa67.$read$$iw$$iw$$iw$$iw.<init>(command-2234296783096833:90)
        at line8d8b6a0390294c468569f4d9bc480baa67.$read$$iw$$iw$$iw.<init>(command-2234296783096833:92)
        at line8d8b6a0390294c468569f4d9bc480baa67.$read$$iw$$iw.<init>(command-2234296783096833:94)
        at line8d8b6a0390294c468569f4d9bc480baa67.$read$$iw.<init>(command-2234296783096833:96)
        at line8d8b6a0390294c468569f4d9bc480baa67.$read.<init>(command-2234296783096833:98)
        at line8d8b6a0390294c468569f4d9bc480baa67.$read$.<init>(command-2234296783096833:102)
        at line8d8b6a0390294c468569f4d9bc480baa67.$read$.<clinit>(command-2234296783096833)
        at line8d8b6a0390294c468569f4d9bc480baa67.$eval$.$print$lzycompute(<notebook>:7)
        at line8d8b6a0390294c468569f4d9bc480baa67.$eval$.$print(<notebook>:6)
        at line8d8b6a0390294c468569f4d9bc480baa67.$eval.$print(<notebook>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:793)
        at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1054)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:645)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:644)
        at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
        at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
        at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:644)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:576)
        at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:572)
        at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:215)
        at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply$mcV$sp(ScalaDriverLocal.scala:202)
        at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:202)
        at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:202)
        at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:699)
        at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:652)
        at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:202)
        at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:385)
        at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$9.apply(DriverLocal.scala:362)
        at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:251)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:246)
        at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:49)
        at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:288)
        at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:49)
        at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:362)
        at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
        at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
        at scala.util.Try$.apply(Try.scala:192)
        at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:639)
        at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:485)
        at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:597)
        at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:390)
        at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
        at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
        at java.lang.Thread.run(Thread.java:748)

I followed previous instructions to add the temp and hdfs location options to the write function; without these I get a NullPointerException. Is this error a bug, or can someone advise? It appears I am using spark-sftp version 1.1.3, as available through the Databricks package installer's link to Maven.

nonsignificantp commented 4 years ago

I've got the same error. I tried it on local Spark and it works flawlessly, so I'm guessing it's an error in how Databricks manages temp directories. Has anybody found a workaround?

nonsignificantp commented 4 years ago

An update to my previous message: this has worked for me:

    val df = spark.read.format("com.springml.spark.sftp")
      .option("host", "HOST")
      .option("username", "USER")
      .option("password", "PASS")
      .option("tempLocation", "/dbfs/tmp/")
      // hdfsTempLocation deliberately resolves to dbfs:/dbfs/tmp/ (see the EDIT below)
      .option("hdfsTempLocation", "/dbfs/dbfs/tmp/")
      .option("fileType", "csv")
      .option("createDF", "true")
      .load("FILEPATH")

Using Apache Spark 2.4.3, Scala 2.11 and com.springml:spark-sftp_2.11:1.1.3 on Azure Databricks

EDIT: OK, after many hours of trying different solutions and reading the source code and tracebacks, I found this trick to be a solution for now. Before using it, make sure to create dbfs:/dbfs/tmp using:

dbutils.fs.mkdirs("dbfs:/dbfs/tmp/")

The main problem seems to be how Databricks manages temp directories; by creating a folder called /dbfs/ you patch the issue. But keep in mind that this IS NOT a real tmp folder, so you should probably delete its contents manually using dbutils at the end of the application, as sketched below.
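A minimal cleanup sketch along those lines (assuming the dbfs:/dbfs/tmp/ staging path created above):

    // The staged files are not in a managed tmp dir, so Databricks will not
    // clean them up automatically; remove them once the transfer is done.
    dbutils.fs.rm("dbfs:/dbfs/tmp/", true) // second argument = recurse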

eyalshafran commented 4 years ago

@nonsignificantp - I created a tmp folder and modified my code as you suggested (just with write instead of read) and I still got an error.

My code:

data.write.
      format("com.springml.spark.sftp").
      option("host", HOST).
      option("username", USER).
      option("pem", PEM_PATH).
      option("tempLocation", "/dbfs/tmp/").
      option("hdfsTempLocation", "/dbfs/dbfs/tmp/").
      option("fileType", "csv").
      option("createDF", "true").
      save("test.csv")

EDIT: I'm using Apache Spark 2.4.5, Scala 2.11, com.springml:spark-sftp_2.11:1.1.5 on AWS Databricks

alexandrugrigore commented 4 years ago

I had the same problem with a custom library that uses the same SFTP implementation used here, com.jcraft.jsch. The problem is that on the executors the static block in com.jcraft.jsch.JSch is not initialized correctly, so the static java.util.Hashtable config = new java.util.Hashtable(); needs to be populated on each executor. On top of that, the classes needed by some encryption algorithms are not available on the executors, so the jar needs to be passed to spark-submit using --jars and --conf spark.executor.extraClassPath=
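For illustration, a minimal sketch of what populating that static config can look like before a session is opened — the key-exchange algorithm names below are assumptions and have to match whatever the SFTP server actually offers:

    import com.jcraft.jsch.JSch

    // Hypothetical example: widen the key-exchange algorithms JSch will
    // negotiate ("kex" is JSch's static config key for key exchange).
    JSch.setConfig("kex",
      "diffie-hellman-group-exchange-sha256," +
        "diffie-hellman-group14-sha1," +
        "diffie-hellman-group-exchange-sha1")

    // If the failure happens on an executor, this must also run in the
    // executor JVM (e.g. inside the closure that opens the connection),
    // because static state is not shared between driver and executors.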

ezra-at-lumedic commented 3 years ago

Hi @alexandrugrigore, could you share further details? I'm getting the same error and think it has the same cause: missing encryption algorithms on the Spark nodes. I tried the temp location fixes suggested by @nonsignificantp, but that didn't resolve the issue. Thx!

ashitabh commented 1 year ago

> I had the same problem with a custom library that uses the same SFTP implementation used here, com.jcraft.jsch. The problem is that on the executors the static block in com.jcraft.jsch.JSch is not initialized correctly, so the static java.util.Hashtable config = new java.util.Hashtable(); needs to be populated on each executor. On top of that, the classes needed by some encryption algorithms are not available on the executors, so the jar needs to be passed to spark-submit using --jars and --conf spark.executor.extraClassPath=

@alexandrugrigore Please let me know which additional jars I need to add

nova-jj commented 6 months ago

January 2024 and this is still an issue within Azure Databricks.

vijaythamalla commented 5 months ago

did anyone have a solution that works?

nova-jj commented 5 months ago

> did anyone have a solution that works?

I've given up on this library after realizing it is only compatible with Spark v2, and our Databricks clusters are now using Spark 3.3+. I don't expect this to ever work for newer Spark projects unless SpringML picks it up again or someone forks and updates it.

vijaythamalla commented 5 months ago

> did anyone have a solution that works?

> I've given up on this library after realizing it is only compatible with Spark v2, and our Databricks clusters are now using Spark 3.3+. I don't expect this to ever work for newer Spark projects unless SpringML picks it up again or someone forks and updates it.

Yeah, that's so sad. This is the only library I could find that loads the data directly into a DataFrame. I was able to use pysftp, but I had hoped this could work. What other library did you choose to SFTP from Databricks?

nova-jj commented 5 months ago

> What other library did you choose to SFTP from Databricks?

We didn't. We ended up building an ADF pipeline to sync from our SFTP storage container into our data lake. There's a big functionality gap here, in my opinion, on the Databricks-SFTP connectivity piece. Not really DB's problem, but an easy value add if they recognized how much batch interchange still takes place in the world.