Closed zqhxuyuan closed 7 years ago
It seems the temp file is not created properly in your case. Can you try providing the tempLocation option? Please make sure to provide a folder that you have write access to.
Something like,
scala> df.write.format("com.springml.spark.sftp").
| option("host", "xxx").
| option("port", "xxx").
| option("username", "xxx").
| option("password", "xx").
| option("fileType", "csv").
| option("tempLocation", "/home/user/temp").
| save("/sample.csv")
I also get the same error. I tried passing option("tempLocation", "/home/user/temp"), but that didn't help either.
In our Spark cluster, the CSV file was generated on HDFS, not on the local driver, so uploading the file to FTP/SFTP throws an NPE. My workaround is to invoke HDFS copyToLocalFile after saving the CSV, and it works!
The code goes in DefaultSource.scala at writeToTemp (the hdfs handle here is obtained from the Hadoop Configuration):
val conf = new org.apache.hadoop.conf.Configuration()
val hdfs = org.apache.hadoop.fs.FileSystem.get(conf)
hdfs.copyToLocalFile(false, new Path(tempLocation + "/part-00000"), new Path(tempLocation), true)
tempLocation
I am testing this using a Jupyter notebook and tried all combinations of tempLocation, but nothing seems to work.
In my case the file is in both HDFS and the tempLocation. I also checked that the folder has the right write permissions.
Here is what I am trying; my data is already in a DataFrame:
df.write.format("com.springml.spark.sftp").
  option("host", "<IP_ADDRESS>").
  option("username", "<USERNAME>").
  option("password", "<PASSWORD>").
  option("fileType", "csv").
  option("tempLocation", "/home/sshuser/").
  save("/home/<sftp_user>/MYCSVFILE.csv")
Here is the error I am getting:
java.lang.NullPointerException
at scala.collection.mutable.ArrayOps$ofRef$.newBuilder$extension(ArrayOps.scala:190)
at scala.collection.mutable.ArrayOps$ofRef.newBuilder(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:246)
at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
at scala.collection.mutable.ArrayOps$ofRef.filter(ArrayOps.scala:186)
at com.springml.spark.sftp.DefaultSource.copiedFile(DefaultSource.scala:227)
at com.springml.spark.sftp.DefaultSource.writeToTemp(DefaultSource.scala:207)
at com.springml.spark.sftp.DefaultSource.createRelation(DefaultSource.scala:113)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:457)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194)
... 47 elided
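This trace is consistent with java.io.File#listFiles returning null: it returns null (not an empty array) when the directory does not exist on the driver's local filesystem, and filtering that null array produces exactly this NullPointerException inside ArrayOps. A minimal sketch of the failure mode and a null-safe alternative (findPartFile is a hypothetical helper name, not the library's):

```scala
import java.io.File

// File#listFiles returns null -- not an empty array -- when the path
// does not exist or is not a directory. Calling .filter on that null
// throws an NPE from inside ArrayOps, matching the trace above.
def findPartFile(tempDir: String): Option[File] = {
  Option(new File(tempDir).listFiles())               // guard against null
    .flatMap(_.find(_.getName.startsWith("part-")))   // pick the part file
}

// A path that exists only on HDFS, not on the driver's local disk,
// yields None here instead of an NPE.
val missing = findPartFile("/no/such/local/dir")
```

This matches the cluster-mode reports in this thread: the CSV lands in HDFS, the driver's local tempLocation is empty or absent, and listing it locally blows up.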
Team, any update on this issue?
If you run in cluster mode on a Cloudera cluster or something similar, the temp files are saved to HDFS, as a previous poster noted. Will there be an update adding copyToLocalFile? That way the library would copy the file to the edge node and could then SFTP it out.
hdfs.copyToLocalFile(false, new Path(tempLocation + "/part-00000"), new Path(tempLocation), true)
This is fixed as suggested by @teweiliou. Thanks @teweiliou.
Changes are available as part of https://github.com/springml/spark-sftp/commit/3720389daba7e325b8dfcba1c176c4878b7eddf3
Please note that this fix is not yet pushed to the Maven repository. If you need it immediately, please build this package and include the assembly jar using the --jars option.
Will this work in Azure Databricks? I have the same error as above, even with the latest code. I have tried setting a tempLocation, but no joy.
java.lang.NullPointerException
at scala.collection.mutable.ArrayOps$ofRef$.newBuilder$extension(ArrayOps.scala:190)
at scala.collection.mutable.ArrayOps$ofRef.newBuilder(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:246)
at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
at scala.collection.mutable.ArrayOps$ofRef.filter(ArrayOps.scala:186)
at com.springml.spark.sftp.DefaultSource.copiedFile(DefaultSource.scala:282)
at com.springml.spark.sftp.DefaultSource.writeToTemp(DefaultSource.scala:262)
at com.springml.spark.sftp.DefaultSource.createRelation(DefaultSource.scala:124)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:72)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:88)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:150)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:138)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:190)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:187)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:108)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:108)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:683)
I am getting the exact error described when attempting to write a DataFrame as a CSV to an SFTP server from Databricks. I have also tried the possible solutions mentioned above.
java.lang.NullPointerException
at scala.collection.mutable.ArrayOps$ofRef$.newBuilder$extension(ArrayOps.scala:190)
at scala.collection.mutable.ArrayOps$ofRef.newBuilder(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:246)
at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
at scala.collection.mutable.ArrayOps$ofRef.filter(ArrayOps.scala:186)
at com.springml.spark.sftp.DefaultSource.copiedFile(DefaultSource.scala:276)
at com.springml.spark.sftp.DefaultSource.writeToTemp(DefaultSource.scala:264)
at com.springml.spark.sftp.DefaultSource.createRelation(DefaultSource.scala:130)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:72)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:88)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:150)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:138)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:190)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:187)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:108)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:108)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:683)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:683)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:89)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:175)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:84)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:126)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:683)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:287)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:281)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
at line8475d466735342bf944d916006977a09164.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1878019:22)
at line8475d466735342bf944d916006977a09164.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1878019:120)
at line8475d466735342bf944d916006977a09164.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1878019:122)
at line8475d466735342bf944d916006977a09164.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-1878019:124)
at line8475d466735342bf944d916006977a09164.$read$$iw$$iw$$iw$$iw.<init>(command-1878019:126)
at line8475d466735342bf944d916006977a09164.$read$$iw$$iw$$iw.<init>(command-1878019:128)
at line8475d466735342bf944d916006977a09164.$read$$iw$$iw.<init>(command-1878019:130)
at line8475d466735342bf944d916006977a09164.$read$$iw.<init>(command-1878019:132)
at line8475d466735342bf944d916006977a09164.$read.<init>(command-1878019:134)
at line8475d466735342bf944d916006977a09164.$read$.<init>(command-1878019:138)
at line8475d466735342bf944d916006977a09164.$read$.<clinit>(command-1878019)
at line8475d466735342bf944d916006977a09164.$eval$.$print$lzycompute(<notebook>:7)
at line8475d466735342bf944d916006977a09164.$eval$.$print(<notebook>:6)
at line8475d466735342bf944d916006977a09164.$eval.$print(<notebook>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:199)
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply$mcV$sp(ScalaDriverLocal.scala:189)
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:189)
at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:189)
at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:493)
at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:448)
at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:189)
at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$3.apply(DriverLocal.scala:248)
at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$3.apply(DriverLocal.scala:228)
at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:188)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:183)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:40)
at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:221)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:40)
at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:228)
at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:595)
at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:595)
at scala.util.Try$.apply(Try.scala:192)
at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:590)
at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:474)
at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:548)
at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:380)
at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:327)
at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:215)
at java.lang.Thread.run(Thread.java:748)
Will this work in Azure Databricks? I have the same error with the updated library:
Py4JJavaError: An error occurred while calling o1606.save.
: java.lang.NullPointerException
at scala.collection.mutable.ArrayOps$ofRef$.newBuilder$extension(ArrayOps.scala:190)
at scala.collection.mutable.ArrayOps$ofRef.newBuilder(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:246)
at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
at scala.collection.mutable.ArrayOps$ofRef.filter(ArrayOps.scala:186)
at com.springml.spark.sftp.DefaultSource.copiedFile(DefaultSource.scala:282)
at com.springml.spark.sftp.DefaultSource.writeToTemp(DefaultSource.scala:262)
at com.springml.spark.sftp.DefaultSource.createRelation(DefaultSource.scala:124)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:72)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:88)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:143)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:183)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:131)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:114)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:114)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:690)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:690)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:99)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:228)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:85)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:158)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:690)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:284)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:232)
at sun.reflect.GeneratedMethodAccessor1144.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
@brynnborton, @Rs111, @TejashwiniK, @springml Did you find any solution yet?
@dibyenduc I raised a separate thread https://github.com/springml/spark-sftp/issues/43#issuecomment-442811210 a while back describing the problem, but I assume it hasn't been fixed yet.
I temporarily got a version working by downloading the code and creating my own UploadFromBlobToSFTP function. Not ideal, but it worked.
def UploadFromBlobToSFTP(
    blobPath: String,
    sftpConnectionString: String,
    sftpPath: String): Unit = {
  // Extract the file name from the blob path
  val fileName = blobPath.split('/').last
  // Parse the connection string into host/username/password
  val sftpProperties = sftp_connection_string_deconstructor(sftpConnectionString)
  val host = sftpProperties("host").replace("ftps://", "")
  val sftpClient = getSFTPClient(Some(sftpProperties("username")), Some(sftpProperties("password")), None, null, host, Some("22"), null, null)
  // Copy the file from DBFS to the driver's local disk, upload it, then clean up
  dbutils.fs.cp("dbfs:" + blobPath, "file:/tmp/" + fileName)
  sftpClient.copyToFTP("/tmp/" + fileName, sftpPath)
  dbutils.fs.rm("file:/tmp/" + fileName, true)
}
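For context, sftp_connection_string_deconstructor and getSFTPClient in the function above are the poster's own helpers and are not shown in the thread. A purely illustrative sketch of what the deconstructor might do, assuming a "ftps://username:password@host" style string (the real helper may expect a different format; note it evidently keeps the ftps:// prefix on the host value, hence the .replace in the function above, whereas this sketch strips it up front):

```scala
// Hypothetical parser for a "ftps://username:password@host" connection
// string. The poster's actual helper may use a different format entirely.
def sftpConnectionStringDeconstructor(cs: String): Map[String, String] = {
  // Drop the scheme, then split credentials from host at the "@"
  val Array(creds, host) = cs.stripPrefix("ftps://").split("@", 2)
  // Split username from password at the first ":"
  val Array(user, pass) = creds.split(":", 2)
  Map("username" -> user, "password" -> pass, "host" -> host)
}
```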
I checked the error line in the source code:
https://github.com/springml/spark-sftp/blob/master/src/main/scala/com/springml/spark/sftp/DefaultSource.scala#L113
Is there something about the temp file not being created?