springml / spark-sftp

Spark connector for SFTP
Apache License 2.0

Error when using in Azure DataBricks #43

Open brynnborton opened 5 years ago

brynnborton commented 5 years ago

I receive the error below when trying to use this connector inside an Azure Databricks notebook. After some investigation, I believe the issue arises when the connector copies the file to a local directory and then reads it again: when writing you need to specify "file:" at the beginning of the path, but when reading you do not need the "file:" part. I have managed to get it working by using the SFTPClient directly, e.g. this works:

dbutils.fs.cp(partition_path, "file:/tmp/test3.csv")
sftpClient.copyToFTP("/tmp/test3.csv", "/")
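
For anyone hitting the same thing, here is a rough sketch of the full manual workaround (the output path and the part-file lookup are only illustrative; sftpClient is the com.springml SFTPClient instance I already had constructed):

// Sketch of the manual workaround, assuming an existing DataFrame df and an
// already-constructed com.springml SFTPClient called sftpClient.
// Paths are illustrative only.

// 1. Write the DataFrame as a single CSV part file to DBFS
df.coalesce(1)
  .write
  .option("header", "true")
  .mode("overwrite")
  .csv("dbfs:/tmp/sftp_out")

// 2. Locate the part file Spark produced
val partition_path = dbutils.fs.ls("dbfs:/tmp/sftp_out")
  .map(_.path)
  .find(_.endsWith(".csv"))
  .getOrElse(sys.error("no part file found"))

// 3. Copy it from DBFS to the driver's local filesystem ("file:" prefix needed when writing)
dbutils.fs.cp(partition_path, "file:/tmp/test3.csv")

// 4. Push the local file to the SFTP server with the client directly
sftpClient.copyToFTP("/tmp/test3.csv", "/")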

Hence, I think that to fix it I need another option, called something like "writetemplocation".

java.lang.NullPointerException
    at scala.collection.mutable.ArrayOps$ofRef$.newBuilder$extension(ArrayOps.scala:190)
    at scala.collection.mutable.ArrayOps$ofRef.newBuilder(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:246)
    at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
    at scala.collection.mutable.ArrayOps$ofRef.filter(ArrayOps.scala:186)
    at com.springml.spark.sftp.DefaultSource.copiedFile(DefaultSource.scala:282)
    at com.springml.spark.sftp.DefaultSource.writeToTemp(DefaultSource.scala:262)
    at com.springml.spark.sftp.DefaultSource.createRelation(DefaultSource.scala:124)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:72)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:88)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:150)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:138)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:190)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:187)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:108)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:108)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:683)
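
Looking at the trace, the NPE comes out of DefaultSource.copiedFile, which appears to filter a listing of the temp directory. My guess (not verified against the source) is that java.io.File.listFiles() returns null when the temp path does not exist on the driver's local filesystem, and filtering that null array produces exactly this error:

import java.io.File

// If the directory does not exist on the driver's local filesystem,
// listFiles() returns null rather than an empty array...
val listed: Array[File] = new File("/tmp/does-not-exist").listFiles()

// ...and filtering the null array throws the same NullPointerException
// inside scala.collection.mutable.ArrayOps as in the trace above.
listed.filter(_.getName.endsWith(".csv"))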

samuel-pt commented 5 years ago

@brynnborton - Will add this fix. Thanks.

oleksandr-yatsuk commented 5 years ago

Hello guys. I am getting the same error using the latest version (1.1.5) on Databricks. Is it really fixed now?

dirceusemighinieleflow commented 4 years ago

I've made a fix for this; what's the best way to push it?

yuvapraveen commented 4 years ago

@brynnborton @dirceusemighinieleflow I am still running into an NPE even after using 1.1.5 on Databricks. Any suggestions to resolve this, please?

dircsem commented 4 years ago

> @brynnborton @dirceusemighinieleflow I am still running into an NPE even after using 1.1.5 on Databricks. Any suggestions to resolve this, please?

Can you describe what you have done? What environment are you on, and which code are you running?

yuvapraveen commented 4 years ago

@dirceusemighini I am running this from Databricks and trying to SFTP a file; code is below. Also, some forums say that hdfstemplocation should be used. Help me understand how this connector uses hdfstemplocation and templocation. Can hdfstemplocation be dbfs:/ (the Databricks file system)?

%scala
val df = spark.read.text("dbfs:/databricks-datasets/online_retail/data-001/data.csv")
display(df)
df.write.
  format("com.springml.spark.sftp").
  option("host", "XXXXXX").
  option("username", "XXXXX").
  option("password", "XXXXXX").
  option("fileType", "csv").
  option("tempLocation", "dbfs:/databricks/test/").
  save("XXXXXXXX")

Error below:

    at scala.collection.mutable.ArrayOps$ofRef$.newBuilder$extension(ArrayOps.scala:190)
    at scala.collection.mutable.ArrayOps$ofRef.newBuilder(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:246)
    at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
    at scala.collection.mutable.ArrayOps$ofRef.filter(ArrayOps.scala:186)
    at com.springml.spark.sftp.DefaultSource.copiedFile(DefaultSource.scala:275)
    at com.springml.spark.sftp.DefaultSource.writeToTemp(DefaultSource.scala:264)
    at com.springml.spark.sftp.DefaultSource.createRelation(DefaultSource.scala:130)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:72)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:88)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:146)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:134)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:187)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:183)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:116)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:111)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:240)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:97)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:170)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:710)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:306)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:292)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:235)
    at linef54d033b830e42cbbaacd889ae183bb139.$read$$iw$$iw$$iw$$iw$$iw$$iw.(command-3082876545442976:11)
    at linef54d033b830e42cbbaacd889ae183bb139.$read$$iw$$iw$$iw$$iw$$iw.(command-3082876545442976:58)
    at linef54d033b830e42cbbaacd889ae183bb139.$read$$iw$$iw$$iw$$iw.(command-3082876545442976:60)
    at linef54d033b830e42cbbaacd889ae183bb139.$read$$iw$$iw$$iw.(command-3082876545442976:62)
    at linef54d033b830e42cbbaacd889ae183bb139.$read$$iw$$iw.(command-3082876545442976:64)
    at linef54d033b830e42cbbaacd889ae183bb139.$read$$iw.(command-3082876545442976:66)
    at linef54d033b830e42cbbaacd889ae183bb139.$read.(command-3082876545442976:68)
    at linef54d033b830e42cbbaacd889ae183bb139.$read$.(command-3082876545442976:72)
    at linef54d033b830e42cbbaacd889ae183bb139.$read$.(command-3082876545442976)
    at linef54d033b830e42cbbaacd889ae183bb139.$eval$.$print$lzycompute(:7)
    at linef54d033b830e42cbbaacd889ae183bb139.$eval$.$print(:6)
    at linef54d033b830e42cbbaacd889ae183bb139.$eval.$print()
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:793)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1054)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:645)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:644)
    at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
    at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:644)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:576)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:572)
    at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:215)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply$mcV$sp(ScalaDriverLocal.scala:197)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:197)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:197)
    at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:685)
    at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:638)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:197)
    at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$8.apply(DriverLocal.scala:373)
    at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$8.apply(DriverLocal.scala:350)
    at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:238)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:233)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:48)
    at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:271)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:48)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:350)
    at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
    at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:644)
    at scala.util.Try$.apply(Try.scala:192)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:639)
    at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:485)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:597)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:390)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:337)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:219)
    at java.lang.Thread.run(Thread.java:748)
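
One thing I plan to try next (not verified) is pointing tempLocation at the local FUSE path /dbfs/... instead of dbfs:/..., since the trace suggests the connector reads the temp directory back with plain local file APIs on the driver:

// Untested variation of the same write: use the FUSE view of DBFS for the
// temp location, since "dbfs:/..." is not visible to plain java.io access.
df.write.
  format("com.springml.spark.sftp").
  option("host", "XXXXXX").
  option("username", "XXXXX").
  option("password", "XXXXXX").
  option("fileType", "csv").
  option("tempLocation", "/dbfs/databricks/test/").
  save("XXXXXXXX")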

dircsem commented 4 years ago

My main problem, if I recall correctly, is that Azure Databricks messes up the temp file location.

val tempFolder = "/dbfs/mount_point of/MYFOLDER/"
df.write.
            format("com.springml.spark.sftp").
            option("host", salesforceHost).
            option("username", user).
            option("password", pw).
            option("fileType", "csv").
            option("delimiter", ",").
            option("azuremountpoint", tempFolder).
            option("templocation", s"abfss://MYFOLDER"). 
            option("gen","gen2").
            save(s"SFTP_FOLDER.csv")

The temp folder was the same as templocation, but azuremountpoint was the mount point of my Gen2 temp folder. Using the jar compiled from the code that I've committed here, I could write to SFTP with the code shown above.
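
Roughly, the idea behind the change (a sketch only, not the exact code I committed; the helper name below is made up) is to locate the copied temp file through the local mount point instead of the abfss:// URI, so the driver-side java.io file APIs can actually see it:

import java.io.File

// Sketch: Spark writes the temp output to the abfss:// templocation, but the
// produced part file is then looked up through the /dbfs mount point
// (azuremountpoint), which is readable with plain java.io.File on the driver.
def locateCopiedFile(azureMountPoint: String): Option[File] =
  Option(new File(azureMountPoint).listFiles())      // null if the directory is missing
    .getOrElse(Array.empty[File])
    .find(f => f.getName.startsWith("part-") && !f.getName.endsWith(".crc"))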