springml / spark-sftp

Spark connector for SFTP
Apache License 2.0

Null Pointer Exception When Writing csv to FTP in Databricks #59

Closed Rs111 closed 5 years ago

Rs111 commented 5 years ago

I am getting an NPE when attempting to write a csv file to an FTP via Databricks. This is a Databricks-specific problem; the exact same code block below works fine when Spark is run locally via IntelliJ. Note: I have tried running the code with the default temp folder locations, as well as with custom folder locations (such as the ones commented out).

After logging, I believe the culprit is the copiedFile function. Specifically, I believe this value is null: baseTemp.listFiles().
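A quick illustration of that failure mode (hypothetical path, purely for demonstration): java.io.File#listFiles returns null, not an empty array, when the path does not exist or is not a directory, so the connector's filter call blows up.

import java.io.File

// listFiles() returns null for a missing directory; calling .filter on
// that null is what produces the NullPointerException below.
val baseTemp = new File("/no/such/dir")  // hypothetical path
println(baseTemp.listFiles() == null)    // prints: true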

df
  .write
  .format("com.springml.spark.sftp")
  .option("host", ftpHost)
  .option("username", ftpUserName)
  .option("password", ftpPassword)
  .option("fileType", "csv")
//  .option("tempLocation", "dbfs:/FileStore/ftp_temp")
//  .option("hdfsTempLocation", hdfsLocation)
  .save(ftpBasePath + "testFile4.csv")

java.lang.NullPointerException
at scala.collection.mutable.ArrayOps$ofRef$.newBuilder$extension(ArrayOps.scala:190)
    at scala.collection.mutable.ArrayOps$ofRef.newBuilder(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:246)
    at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
    at scala.collection.mutable.ArrayOps$ofRef.filter(ArrayOps.scala:186)
    at com.springml.spark.sftp.DefaultSource.copiedFile(DefaultSource.scala:276)
    at com.springml.spark.sftp.DefaultSource.writeToTemp(DefaultSource.scala:264)
    at com.springml.spark.sftp.DefaultSource.createRelation(DefaultSource.scala:130)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:72)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:88)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:150)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:138)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:190)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:187)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:108)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:108)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:683)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:683)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:89)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:175)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:84)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:126)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:683)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:287)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:281)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
    at line6709443c8c4144f29eee46954436a0e9104.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(command-1878019:23)
    at line6709443c8c4144f29eee46954436a0e9104.$read$$iw$$iw$$iw$$iw$$iw.<init>(command-1878019:79)
    at line6709443c8c4144f29eee46954436a0e9104.$read$$iw$$iw$$iw$$iw.<init>(command-1878019:81)
    at line6709443c8c4144f29eee46954436a0e9104.$read$$iw$$iw$$iw.<init>(command-1878019:83)
    at line6709443c8c4144f29eee46954436a0e9104.$read$$iw$$iw.<init>(command-1878019:85)
    at line6709443c8c4144f29eee46954436a0e9104.$read$$iw.<init>(command-1878019:87)
    at line6709443c8c4144f29eee46954436a0e9104.$read.<init>(command-1878019:89)
    at line6709443c8c4144f29eee46954436a0e9104.$read$.<init>(command-1878019:93)
    at line6709443c8c4144f29eee46954436a0e9104.$read$.<clinit>(command-1878019)
    at line6709443c8c4144f29eee46954436a0e9104.$eval$.$print$lzycompute(<notebook>:7)
    at line6709443c8c4144f29eee46954436a0e9104.$eval$.$print(<notebook>:6)
    at line6709443c8c4144f29eee46954436a0e9104.$eval.$print(<notebook>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
    at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
    at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
    at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:199)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply$mcV$sp(ScalaDriverLocal.scala:189)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:189)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal$$anonfun$repl$1.apply(ScalaDriverLocal.scala:189)
    at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:493)
    at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:448)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:189)
    at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$3.apply(DriverLocal.scala:248)
    at com.databricks.backend.daemon.driver.DriverLocal$$anonfun$execute$3.apply(DriverLocal.scala:228)
    at com.databricks.logging.UsageLogging$$anonfun$withAttributionContext$1.apply(UsageLogging.scala:188)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at com.databricks.logging.UsageLogging$class.withAttributionContext(UsageLogging.scala:183)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:40)
    at com.databricks.logging.UsageLogging$class.withAttributionTags(UsageLogging.scala:221)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:40)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:228)
    at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:595)
    at com.databricks.backend.daemon.driver.DriverWrapper$$anonfun$tryExecutingCommand$2.apply(DriverWrapper.scala:595)
    at scala.util.Try$.apply(Try.scala:192)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:590)
    at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:474)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:548)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:380)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:327)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:215)
    at java.lang.Thread.run(Thread.java:748)
mi7flat5 commented 5 years ago

This looks like an issue with your tempLocation.


Rs111 commented 5 years ago

Yup; java.io.File is seemingly no good for Databricks file paths (including the tempLocation).
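For illustration, assuming the usual Databricks layout where DBFS is also exposed through a local /dbfs FUSE mount (an assumption about the cluster, not something verified in this thread):

import java.io.File

// java.io.File resolves paths on the driver's local filesystem only,
// so a "dbfs:/..." URI is invisible to it.
new File("dbfs:/FileStore/ftp_temp").exists()  // false
// The FUSE view of the same location can work, when the mount is present:
new File("/dbfs/FileStore/ftp_temp").exists()  // true only if the mount exists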

Rs111 commented 5 years ago

Note: closed by accident and re-opened!

ankushreddy commented 5 years ago

Do we have a workaround for this issue? Thanks.

samuel-pt commented 5 years ago

@ankushreddy - The only workaround is to specify an HDFS location as the temp location.
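Something like the following sketch, reusing the names from the original snippet (host, credentials, and the temp path are placeholders):

df
  .write
  .format("com.springml.spark.sftp")
  .option("host", ftpHost)
  .option("username", ftpUserName)
  .option("password", ftpPassword)
  .option("fileType", "csv")
  // Stage the intermediate file on HDFS/DBFS instead of the driver's local disk.
  .option("hdfsTempLocation", "/mnt/some/temp/dir")  // placeholder path
  .save(ftpBasePath + "testFile4.csv")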

sshah90 commented 5 years ago

It looks like in Databricks, whenever a write operation is triggered, it saves _committed_ and _started_ files which are not filtered out in the copiedFile function; that's why, even when you give tempLocation explicitly, it is not able to pick up the csv file.

I made the following changes, and it is at least able to pick up the CSV file:

private def copiedFile(tempFileLocation: String): String = {
  val baseTemp = new File(tempFileLocation)
  // Keep only the data file: skip directories, hidden files, .crc checksums,
  // the SUCCESS marker, and Databricks' _committed_/_started_ marker files.
  val files = baseTemp.listFiles().filter { x =>
    (!x.isDirectory()
      && !x.getName.contains("SUCCESS")
      && !x.isHidden()
      && !x.getName.contains(".crc")
      && !x.getName.contains("_committed_")
      && !x.getName.contains("_started_"))
  }
  files(0).getAbsolutePath
}

copiedFile("/dbfs/mnt/sandbox/work_smit/spark_sftp_connection_temp_1a145402-35c5-4581-a936-8c39247f5c98/")

Output: res28: String = /dbfs/mnt/sandbox/work_smit/spark_sftp_connection_temp_1a145402-35c5-4581-a936-8c39247f5c98/part-00000-tid-2716892591428142706-9b9c9e5c-0eca-430a-a08d-d638ba1f4dc9-167-c000.csv

I haven't tested end-to-end (source to SFTP) with these changes in Databricks.

samuel-pt commented 5 years ago

@sshah90 - Can you create a PR with your changes?

sshah90 commented 5 years ago

@samuel-pt Done.

Please review.

allen-healthdirect commented 5 years ago

I have the same issue (Spark 2.4 on Databricks) and am eager to see this fixed. Please advise if you need help with testing.

samuel-pt commented 5 years ago

@sshah90 Merged the changes. Thanks for your PR. @allen-healthdirect Can you test the fix in your Databricks environment? Please let us know the result. Once the fix is confirmed, I'll publish the changes to the Maven repository and spark-packages.org.

ahnugent commented 5 years ago

@samuel-pt Will do, but can't happen before Monday. Will advise.

allen-healthdirect commented 5 years ago

I managed to sit down with an engineer and try this today. We got this error: java.lang.NoClassDefFoundError: com/springml/sftp/client/SFTPClient

We don't know if this is an issue with the source code or a defect in our building of the jar.

I invoked the method as follows:

outDF.write \
  .format("com.springml.spark.sftp") \
  .option("host", host) \
  .option("username", username) \
  .option("password", password) \
  .option("fileType", "csv") \
  .option("delimiter", ",") \
  .option("header", "true") \
  .option("codec", "bzip2") \
  .save(path)

Here is the Databricks output: Databricks error output.txt

Please advise if you need to see logs.

sshah90 commented 5 years ago

It's actually weird, because I am having the same issue as you, and I thought it was a networking issue.

By the way, in Databricks you need to explicitly mention the temp location paths, like below:

df_final.write
  .format("com.springml.spark.sftp")
  .option("tempLocation", "/dbfs/mnt/sandbox/work_smit/")
  .option("hdfsTempLocation", "/mnt/sandbox/work_smit/")
  .option("host", "sftp.com")
  .option("username", "username")
  .option("password", "password")
  .option("fileType", "csv")
  .option("delimiter", "|") 
  .save("/incoming/test_2019.csv")

@allen-healthdirect

allen-healthdirect commented 5 years ago

@sshah90 , @samuel-pt : Looks like I forgot to flag you when I posted 5 days ago - sorry. Waiting for instructions.

samuel-pt commented 5 years ago

@allen-healthdirect - How are you importing the jar into Databricks? I hope you are building an assembly jar and adding that assembly jar to Databricks. Please let me know.
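For anyone unsure what that means: a sketch, assuming the project is built with sbt and the sbt-assembly plugin (an assumption; adjust for your actual build), of how the uber jar is produced:

// project/plugins.sbt (sketch; the plugin version is illustrative)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")

Running sbt assembly then emits a single jar under target/ that bundles the connector together with its dependencies (including the com.springml SFTP client), which is what should be attached to the Databricks cluster instead of the thin jar.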

sshah90 commented 5 years ago

I created a package and tested it in Databricks by pushing files to an SFTP location. These changes are working fine.

@samuel-pt please publish these changes to Maven.

samuel-pt commented 5 years ago

@sshah90 - What changes did you make? Please create a PR with your changes. I'll push the changes to Maven once the new code is verified.

shashankupadhye7 commented 5 years ago

Hi @samuel-pt -

I am trying to access my SFTP server from a Scala script. For that, I deployed the jar file "spark-sftp_2.11-1.1.3.jar" in the library location.

After deploying, I followed the instructions in the link (https://github.com/springml/spark-sftp) to access the SFTP server from a Scala script. However, I also got the error listed above:

java.lang.NoClassDefFoundError: com/springml/sftp/client/SFTPClient

But as I was going through this discussion, I see an issue has been fixed by @sshah90. Would this fix address the issue we are facing too?

Thanks for your help.

Shashank Upadhye

sshah90 commented 5 years ago

@shashankupadhye7 - my PR has already been approved and merged to master.

Please check this comment: https://github.com/springml/spark-sftp/issues/59#issuecomment-486198998

We were waiting to test this change in Databricks, and I have done that.

samuel-pt commented 5 years ago

@shashankupadhye7 - No, it is different. Please make sure you used the uber jar in your environment

shashankupadhye7 commented 5 years ago

Thanks @samuel-pt.

Can you please provide more information on this? Is there any documentation surrounding that?

Also, is the link (https://github.com/springml/spark-sftp) not relevant in that case?

Please suggest. Thanks.

samuel-pt commented 5 years ago

@shashankupadhye7 - Please create a new issue for NoClassDefFoundError

samuel-pt commented 5 years ago

@sshah90 - The latest changes have been pushed to the Maven repo: https://mvnrepository.com/artifact/com.springml/spark-sftp_2.11/1.1.5

fabian-fuentealba commented 4 years ago

The problem still exists: java.lang.ArrayIndexOutOfBoundsException: 0, even with 1.1.5. I'm using k8s.

fabian-fuentealba commented 4 years ago

And if you use .option("tempLocation", "file:/tmp"), the error is: java.lang.NullPointerException.

mashroomxl commented 3 years ago

The root cause is here:

// DefaultSource.scala
private def copyFromHdfs(sqlContext: SQLContext, hdfsTemp: String,
                         fileLocation: String): String = {
  val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
  val hdfsPath = new Path(hdfsTemp)
  val fs = hdfsPath.getFileSystem(hadoopConf)
  // root cause: the file is copied to the local temp dir only when the
  // filesystem scheme is exactly "hdfs"
  if ("hdfs".equalsIgnoreCase(fs.getScheme)) {
    fs.copyToLocalFile(new Path(hdfsTemp), new Path(fileLocation))
    fs.deleteOnExit(new Path(hdfsTemp))
    return fileLocation
  } else {
    return hdfsTemp
  }
}

private def copiedFile(tempFileLocation: String): String = {
  val baseTemp = new File(tempFileLocation)
  // NPE encountered here: listFiles() returns null (not an empty array)
  // for a non-existent folder, so the filter call throws.
  val files = baseTemp.listFiles().filter { x =>
    (!x.isDirectory()
      && !x.getName.contains("SUCCESS")
      && !x.isHidden()
      && !x.getName.contains(".crc")
      && !x.getName.contains("_committed_")
      && !x.getName.contains("_started_"))
  }
  files(0).getAbsolutePath
}

If the scheme of the fs does not start with "hdfs", it just returns the hdfsTemp location, but that temp directory does not actually exist on the local machine, so the NPE follows.

Solution 1: set the hdfsTempLocation option to a value qualified with the hdfs scheme.
Solution 2: override DefaultSource so that fs.copyToLocalFile is always invoked when using HDFS.
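A minimal defensive sketch (not the project's code; copiedFileSafe is a hypothetical name) that wraps listFiles() in Option so a missing temp directory fails with a clear error instead of an NPE:

import java.io.File

def copiedFileSafe(tempFileLocation: String): String = {
  val baseTemp = new File(tempFileLocation)
  // Option(...) turns the null from listFiles() into None, letting us
  // report the real problem: the temp directory was never created locally.
  val listed = Option(baseTemp.listFiles()).getOrElse(
    sys.error(s"Temp location does not exist or is not a directory: $tempFileLocation"))
  val files = listed.filter { x =>
    !x.isDirectory && !x.isHidden &&
      !x.getName.contains("SUCCESS") &&
      !x.getName.contains(".crc") &&
      !x.getName.contains("_committed_") &&
      !x.getName.contains("_started_")
  }
  files.headOption.map(_.getAbsolutePath).getOrElse(
    sys.error(s"No copied data file found under $tempFileLocation"))
}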