akka / alpakka

Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/libraries/alpakka/current/

S/FTP sink materializes a successful IOResult on move/delete even if result is permission denied #1381

Closed UrosLF closed 4 years ago

UrosLF commented 5 years ago

Hi all,

I saw a similar issue here, which led me to open this one.

Snippets to move / delete a file are:

  private def moveSftpFile(sftpFile: FtpFile): Future[Try[Unit]] = {
    Source
      .single(sftpFile)
      .runWith(Sftp.move(processedFileName, sftpSettings))
      .map { ioResult ⇒
        if (ioResult.wasSuccessful) {
          logger.info(s"Successfully moved the file [$ioResult]")
          Success(())
        } else {
          logger.error(
            s"Failed while moving remote file [${sftpFile.name}]",
            ioResult.getError
          )
          Failure(ioResult.getError)
        }
      }
      .recover {
        case NonFatal(err) ⇒
          logger.error(
            s"Failed while moving remote file [${sftpFile.name}]",
            err
          )

          Failure(err)
      }
  }

  private def deleteSftpFile(sftpFile: FtpFile): Future[Try[Unit]] = {
    Source
      .single(sftpFile)
      .runWith(Sftp.remove(sftpSettings))
      .map { ioResult ⇒
        if (ioResult.wasSuccessful) {
          logger.info(s"Successfully deleted the file [$ioResult]")
          Success(())
        } else {
          logger.error(
            s"Failed while deleting remote file [${sftpFile.name}]",
            ioResult.getError
          )
          Failure(ioResult.getError)
        }
      }
      .recover {
        case NonFatal(err) ⇒
          logger.error(
            s"Failed while deleting remote file [${sftpFile.name}]",
            err
          )
          Failure(err)
      }
  }

Those are fed from the result of listRemoteFiles:

  private def listRemoteFiles(path: String): Future[List[FtpFile]] = {
    Sftp
      .ls(path, sftpSettings)
      .runWith(Sink.collection[FtpFile, List[FtpFile]])
      .recover {
        case NonFatal(err) ⇒
          logger.error(s"Failed while listing remote files", err)
          List.empty
      }
  }

To reproduce:

Result I get:

INFO  Moving file [test1]
INFO  Successfully moved the file [IOResult(1,Success(Done))]
ERROR [sftp-spec-akka.actor.default-dispatcher-3] [akka://sftp-spec/system/StreamSupervisor-0/flow-5-0-unnamed] Error in stage [akka.stream.alpakka.ftp.impl.FtpSourceFactory$$anon$4@52a5e5c7]: Permission denied
net.schmizz.sshj.sftp.SFTPException: Permission denied
    at net.schmizz.sshj.sftp.Response.error(Response.java:137)
    at net.schmizz.sshj.sftp.Response.ensureStatusIs(Response.java:130)
    at net.schmizz.sshj.sftp.Response.ensureStatusPacketIsOK(Response.java:122)
    at net.schmizz.sshj.sftp.SFTPEngine.rename(SFTPEngine.java:231)
    at net.schmizz.sshj.sftp.SFTPClient.rename(SFTPClient.java:120)
    at akka.stream.alpakka.ftp.impl.SftpOperations$class.move(SftpOperations.scala:147)
    at akka.stream.alpakka.ftp.impl.FtpLike$$anon$3.move(FtpLike.scala:50)
    at akka.stream.alpakka.ftp.impl.FtpLike$$anon$3.move(FtpLike.scala:50)
    at akka.stream.alpakka.ftp.impl.FtpMoveSink$$anon$3$$anon$7.onPush(FtpIOGraphStage.scala:251)
    at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:519)
    at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:411)
    at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:588)
    at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:558)
    at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:679)
    at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:727)
    at akka.actor.Actor$class.aroundPreStart(Actor.scala:528)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:670)
    at akka.actor.ActorCell.create(ActorCell.scala:652)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:523)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:545)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:283)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
INFO  Deleting file [test1]
INFO  Successfully deleted the file [IOResult(1,Success(Done))]
ERROR [12/18/2018 00:51:41.880] [sftp-spec-akka.actor.default-dispatcher-3] [akka://sftp-spec/system/StreamSupervisor-0/flow-5-0-unnamed] Error in stage [akka.stream.alpakka.ftp.impl.FtpSourceFactory$$anon$5@7705045]: Permission denied
net.schmizz.sshj.sftp.SFTPException: Permission denied
    at net.schmizz.sshj.sftp.Response.error(Response.java:137)
    at net.schmizz.sshj.sftp.Response.ensureStatusIs(Response.java:130)
    at net.schmizz.sshj.sftp.Response.ensureStatusPacketIsOK(Response.java:122)
    at net.schmizz.sshj.sftp.SFTPEngine.remove(SFTPEngine.java:205)
    at net.schmizz.sshj.sftp.SFTPClient.rm(SFTPClient.java:125)
    at akka.stream.alpakka.ftp.impl.SftpOperations$class.remove(SftpOperations.scala:150)
    at akka.stream.alpakka.ftp.impl.FtpLike$$anon$3.remove(FtpLike.scala:50)
    at akka.stream.alpakka.ftp.impl.FtpLike$$anon$3.remove(FtpLike.scala:50)
    at akka.stream.alpakka.ftp.impl.FtpRemoveSink$$anon$4$$anon$8.onPush(FtpIOGraphStage.scala:290)
    at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:519)
    at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:411)
    at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:588)
    at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:558)
    at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:679)
    at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:727)
    at akka.actor.Actor$class.aroundPreStart(Actor.scala:528)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:670)
    at akka.actor.ActorCell.create(ActorCell.scala:652)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:523)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:545)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:283)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

As you can see, the Permission denied error isn't part of the ioResult (it reports Success(Done)), and I believe it should be.

Scala version and ftp lib version:

scalaVersion := "2.11.12"
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-ftp" % "1.0-M1"

Let me know if you need any additional details

eshantandon commented 5 years ago

Facing the same issue.

2m commented 5 years ago

Completing the future with failed IOResult is missing in FtpMoveSink:

https://github.com/akka/alpakka/blob/9d2aafb058c39e6812c59cb0f41917f25d76e5c2/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpIOGraphStage.scala#L249-L252

and in FtpRemoveSink

https://github.com/akka/alpakka/blob/9d2aafb058c39e6812c59cb0f41917f25d76e5c2/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpIOGraphStage.scala#L289-L292

A call to matFailure should be added, similar to how it is done in FTPIoSinkStage and others:

https://github.com/akka/alpakka/blob/9d2aafb058c39e6812c59cb0f41917f25d76e5c2/ftp/src/main/scala/akka/stream/alpakka/ftp/impl/FtpIOGraphStage.scala#L163-L172
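
To illustrate the idea, here is a minimal sketch in plain Scala, without Akka. It is not the actual Alpakka code: OpResult, MoveSinkLogic, onPush and onUpstreamFinish are hypothetical stand-ins for IOResult and the sink stage's handlers. It only shows the pattern of completing the materialized promise with a failed result when the remote operation throws, instead of letting a later success win.

```scala
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

// Hypothetical stand-in for akka.stream.IOResult (count plus a status).
final case class OpResult(count: Long, status: Try[Unit])

// Hypothetical stand-in for the logic of a move/remove sink stage.
// `move` plays the role of the remote operation and may throw.
final class MoveSinkLogic(move: String => Unit) {
  private val matValue = Promise[OpResult]()
  private var moved = 0L
  private var failed = false

  // The value handed back to the caller when the stream is materialized.
  def result: Future[OpResult] = matValue.future

  def onPush(path: String): Unit =
    if (!failed) {
      try {
        move(path)
        moved += 1
      } catch {
        case NonFatal(e) =>
          failed = true
          // The step described above as missing: record the failure
          // in the materialized value before the stage goes down.
          matValue.trySuccess(OpResult(moved, Failure(e)))
          ()
      }
    }

  // Only takes effect if no failure was recorded earlier,
  // because trySuccess is a no-op on a completed promise.
  def onUpstreamFinish(): Unit = {
    matValue.trySuccess(OpResult(moved, Success(())))
    ()
  }
}
```

With a `move` function that throws, the future returned by `result` completes with a failed status and a count of zero, so a caller checking the result would take the error branch.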

The trickier part is going to be configuring the FTP servers for the test case where it tries to remove or move a file and gets a permission denied exception.

UrosLF commented 5 years ago

To simulate that behavior, I've found that changing the permissions on the directory containing the files to be moved / deleted so that writing is prohibited works. When a file (or folder) is uploaded to SFTP, it keeps its permissions, so that can be used for tests.
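
A rough sketch of preparing such a fixture on a POSIX file system, assuming the directory is later uploaded to (or served by) the SFTP server under test. PermissionFixture and readOnlyDirWithFile are hypothetical names, and the directory prefix is arbitrary:

```scala
import java.nio.file.{Files, Path}
import java.nio.file.attribute.PosixFilePermissions

// Hypothetical test helper, not part of Alpakka.
object PermissionFixture {
  // Creates a temporary directory containing one file, then drops
  // write permission on the directory so that its entries cannot be
  // renamed or deleted by a non-root user.
  def readOnlyDirWithFile(): Path = {
    val dir = Files.createTempDirectory("sftp-perm-test")
    Files.createFile(dir.resolve("test1"))
    Files.setPosixFilePermissions(dir, PosixFilePermissions.fromString("r-xr-xr-x"))
    dir
  }
}
```

Note that this only has the intended effect when the SFTP user is not root, since root bypasses these permission checks.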

tgambet commented 5 years ago

To get a permission denied exception, you could try moving or deleting .ssh/ or upload/, now that the tests are run against Docker.

ennru commented 4 years ago

This was fixed with #2111.