zio / zio-ftp

A simple, idiomatic (S)FTP client for ZIO
https://zio.dev/zio-ftp
Apache License 2.0

SFtp.upload error #253

Closed Fristi closed 2 years ago

Fristi commented 2 years ago

On ZIO 1

Svedi export exited with failure* : Traced(Die(net.schmizz.sshj.sftp.SFTPException: EOF while reading packet),ZTrace(Id(1656419394410,4343),List(SourceLocation(ZManaged.scala,zio.ZManaged$,fromAutoCloseable,1876), SourceLocation(ZIO.scala,zio.ZIO$,effectSuspendTotal,2837), SourceLocation(ZIO.scala,zio.ZIO,bracket_,291), SourceLocation(FiberContext.scala,zio.internal.FiberContext,evaluateNow,559), SourceLocation(ZIO.scala,zio.ZIO,zipWith,2294), SourceLocation(ZIO.scala,zio.ZIO$,foreach,3028), SourceLocation(ZIO.scala,zio.ZIO,flatten,700), SourceLocation(ZRef.scala,zio.ZRef$Atomic,modify,215), SourceLocation(ZIO.scala,zio.ZIO$,effectSuspendTotal,2837), SourceLocation(ZManaged.scala,zio.ZManaged,use,1088), SourceLocation(ZIO.scala,zio.ZIO,run,1764), SourceLocation(FiberContext.scala,zio.internal.FiberContext$InterruptExit$,apply,153), SourceLocation(FiberContext.scala,zio.internal.FiberContext$InterruptExit$,apply,160), SourceLocation(FiberContext.scala,zio.internal.FiberContext$InterruptExit$,apply,153), SourceLocation(ZIO.scala,zio.ZIO$,effectSuspendTotal,2837), SourceLocation(ZIO.scala,zio.ZIO$,bracketExit,2437), SourceLocation(ZIO.scala,zio.ZIO$,effectSuspendTotal,2837), SourceLocation(ZManaged.scala,zio.ZManaged$ReleaseMap$$anon$1,releaseAll,1410), SourceLocation(ZIO.scala,zio.ZIO$,foreach,3030), SourceLocation(ZIO.scala,zio.ZIO$,foreach,3029), SourceLocation(ZIO.scala,zio.ZIO,run,1764), SourceLocation(FiberContext.scala,zio.internal.FiberContext$InterruptExit$,apply,160), SourceLocation(FiberContext.scala,zio.internal.FiberContext$InterruptExit$,apply,153), SourceLocation(ZIO.scala,zio.ZIO$,effectSuspendTotal,2837), SourceLocation(ZIO.scala,zio.ZIO$,bracketExit,2437), SourceLocation(FiberContext.scala,zio.internal.FiberContext,evaluateNow,563), SourceLocation(ZIO.scala,zio.ZIO$,effectSuspendTotal,2837), SourceLocation(ZIO.scala,zio.ZIO,bracket_,291), SourceLocation(ZIO.scala,zio.ZIO,run,1764), 
SourceLocation(FiberContext.scala,zio.internal.FiberContext$InterruptExit$,apply,160), SourceLocation(FiberContext.scala,zio.internal.FiberContext$InterruptExit$,apply,153), SourceLocation(ZManaged.scala,zio.ZManaged$,finalizerRef,1689), SourceLocation(ZRef.scala,zio.ZRef$Atomic,get,203), SourceLocation(ZIO.scala,zio.ZIO$,effectSuspendTotal,2837), SourceLocation(ZIO.scala,zio.ZIO,bracket_,291), SourceLocation(FiberContext.scala,zio.internal.FiberContext,evaluateNow,559), SourceLocation(ZIO.scala,zio.ZIO,zipWith,2294), SourceLocation(ZIO.scala,zio.ZIO$,foreach,3028), SourceLocation(ZIO.scala,zio.ZIO,flatten,700), SourceLocation(ZRef.scala,zio.ZRef$Atomic,modify,215), SourceLocation(ZIO.scala,zio.ZIO$,effectSuspendTotal,2837), SourceLocation(ZManaged.scala,zio.ZManaged,use,1088), SourceLocation(ZIO.scala,zio.ZIO,run,1764), SourceLocation(FiberContext.scala,zio.internal.FiberContext$InterruptExit$,apply,153), SourceLocation(ZManaged.scala,zio.ZManaged,use,1089), SourceLocation(FiberContext.scala,zio.internal.FiberContext$InterruptExit$,apply,160), SourceLocation(FiberContext.scala,zio.internal.FiberContext$InterruptExit$,apply,153), SourceLocation(ZIO.scala,zio.ZIO$,effectSuspendTotal,2837), SourceLocation(ZIO.scala,zio.ZIO$,bracketExit,2437), SourceLocation(FiberContext.scala,zio.internal.FiberContext,evaluateNow,563), SourceLocation(ZIO.scala,zio.ZIO$,effectSuspendTotal,2837), SourceLocation(ZIO.scala,zio.ZIO,bracket_,291), SourceLocation(ZIO.scala,zio.ZIO,run,1764), SourceLocation(FiberContext.scala,zio.internal.FiberContext$InterruptExit$,apply,153), SourceLocation(FiberContext.scala,zio.internal.FiberContext$InterruptExit$,apply,160), SourceLocation(FiberContext.scala,zio.internal.FiberContext$InterruptExit$,apply,153), SourceLocation(ZIO.scala,zio.ZIO$,effectSuspendTotal,2837), SourceLocation(ZIO.scala,zio.ZIO$,bracketExit,2437), SourceLocation(FiberContext.scala,zio.internal.FiberContext,evaluateNow,563), 
SourceLocation(ZIO.scala,zio.ZIO$,effectSuspendTotal,2837), SourceLocation(ZIO.scala,zio.ZIO,bracket_,291), SourceLocation(ZIO.scala,zio.ZIO,run,1764), SourceLocation(FiberContext.scala,zio.internal.FiberContext$InterruptExit$,apply,153), SourceLocation(ZManaged.scala,zio.ZManaged,mapM,517), SourceLocation(ZStream.scala,zio.stream.ZStream,runManaged,2653), SourceLocation(ZSink.scala,zio.stream.ZSink,dropLeftover,505), SourceLocation(ZSink.scala,zio.stream.ZSink$,foldChunksM,712), SourceLocation(ZRef.scala,zio.ZRef$Atomic,get,203), SourceLocation(ZStream.scala,zio.stream.ZStream,runManaged,2649), SourceLocation(ZIO.scala,zio.ZIO,flatten,700), SourceLocation(ZRef.scala,zio.ZRef$Atomic,modify,215), SourceLocation(ZIO.scala,zio.ZIO,flatten,700), SourceLocation(ZRef.scala,zio.ZRef$Atomic,modify,215), SourceLocation(ZStream.scala,zio.stream.ZStream,flatMap,1424), SourceLocation(ZIO.scala,zio.ZIO$,effectSuspendTotal,2837), SourceLocation(ZManaged.scala,zio.ZManaged$ReleaseMap$$anon$1,releaseAll,1410), SourceLocation(ZIO.scala,zio.ZIO$,foreach,3030), SourceLocation(ZIO.scala,zio.ZIO$,foreach,3029), SourceLocation(ZIO.scala,zio.ZIO,run,1764), SourceLocation(FiberContext.scala,zio.internal.FiberContext$InterruptExit$,apply,160), SourceLocation(FiberContext.scala,zio.internal.FiberContext$InterruptExit$,apply,153), SourceLocation(ZIO.scala,zio.ZIO$,effectSuspendTotal,2837), SourceLocation(ZIO.scala,zio.ZIO$,bracketExit,2437), SourceLocation(FiberContext.scala,zio.internal.FiberContext,evaluateNow,563), SourceLocation(ZIO.scala,zio.ZIO$,effectSuspendTotal,2837), SourceLocation(ZIO.scala,zio.ZIO,bracket_,291), SourceLocation(ZIO.scala,zio.ZIO,run,1764), SourceLocation(FiberContext.scala,zio.internal.FiberContext$InterruptExit$,apply,153), SourceLocation(ZManaged.scala,zio.ZManaged$,finalizerRef,1689), SourceLocation(ZRef.scala,zio.ZRef$Atomic,get,203), SourceLocation(ZIO.scala,zio.ZIO$,effectSuspendTotal,2837), SourceLocation(ZIO.scala,zio.ZIO,bracket_,291), 
SourceLocation(FiberContext.scala,zio.internal.FiberContext,evaluateNow,559), SourceLocation(ZIO.scala,zio.ZIO,zipWith,2294), SourceLocation(ZIO.scala,zio.ZIO$,foreach,3028), SourceLocation(ZIO.scala,zio.ZIO,flatten,700), SourceLocation(ZRef.scala,zio.ZRef$Atomic,modify,215), SourceLocation(ZStream.scala,zio.stream.ZStream,flatMap,1394), SourceLocation(ZRef.scala,zio.ZRef$Atomic,getAndSet,206), SourceLocation(ZStream.scala,zio.stream.ZStream,flatMap,1418)),List(SourceLocation(ZIO.scala,zio.ZIO,run,1764), SourceLocation(ZIO.scala,zio.ZIO,bracket_,291), SourceLocation(ZIO.scala,zio.ZIO,run,1764), SourceLocation(ZIO.scala,zio.ZIO$,foreach,3029), SourceLocation(ZIO.scala,zio.ZIO$,foreach,3030), SourceLocation(ZManaged.scala,zio.ZManaged$ReleaseMap$$anon$1,releaseAll,1410), SourceLocation(ZIO.scala,zio.ZIO$,bracketExit,2437), SourceLocation(SecureFtp.scala,zio.ftp.SecureFtp,upload,109), SourceLocation(SecureFtp.scala,zio.ftp.SecureFtp,upload,109), SourceLocation(SvediExporter.scala,io.dhlparcel.inventory.tasks.SvediExporter$,action,58), SourceLocation(SvediExporter.scala,io.dhlparcel.inventory.tasks.SvediExporter$,doExport,73), SourceLocation(ZIO.scala,zio.ZIO,run,1764), SourceLocation(ZIO.scala,zio.ZIO,bracket_,291), SourceLocation(ZIO.scala,zio.ZIO,run,1764), SourceLocation(ZManaged.scala,zio.ZManaged,use,1088), SourceLocation(ZIO.scala,zio.ZIO,retryOrElseEither,1704), SourceLocation(ZIO.scala,zio.ZIO,retryOrElseEither,1706), SourceLocation(ZIO.scala,zio.ZIO,retryOrElse,1690)),Some(ZTrace(Id(1656419394277,4336),List(SourceLocation(ServerInterpreter.scala,sttp.tapir.server.interpreter.ServerInterpreter$ResultOrValue,flatMap,230), SourceLocation(ServerInterpreter.scala,sttp.tapir.server.interpreter.ServerInterpreter$ResultOrValue,flatMap,230), SourceLocation(ServerInterpreter.scala,sttp.tapir.server.interpreter.ServerInterpreter$ResultOrValueFrom,apply,248), 
SourceLocation(ServerInterpreter.scala,sttp.tapir.server.interpreter.ServerInterpreter$ResultOrValue,flatMap,230), SourceLocation(ServerInterpreter.scala,sttp.tapir.server.interpreter.ServerInterpreter$$anonfun$$nestedInanonfun$tryServerEndpoint$6$1,apply,64), SourceLocation(ServerInterpreter.scala,sttp.tapir.server.interpreter.ServerInterpreter,tryServerEndpoint,108), SourceLocation(ServerInterpreter.scala,sttp.tapir.server.interpreter.ServerInterpreter$ResultOrValue,flatMap,230), SourceLocation(ServerInterpreter.scala,sttp.tapir.server.interpreter.ServerInterpreter$ResultOrValue,map,236), SourceLocation(ServerInterpreter.scala,sttp.tapir.server.interpreter.ServerInterpreter$ResultOrValue,flatMap,230), SourceLocation(ServerInterpreter.scala,sttp.tapir.server.interpreter.ServerInterpreter$ResultOrValue,flatMap,230)),List(SourceLocation(InternalRoutes.scala,io.dhlparcel.inventory.api.routes.InternalRoutes$,exportSVEDIRoute,45), SourceLocation(ZIO.scala,zio.ZIO$,zio$ZIO$$_succeedRight,4495), SourceLocation(ZIO.scala,zio.ZIO,unrefineWith,2183), SourceLocation(ServerInterpreter.scala,sttp.tapir.server.interpreter.ServerInterpreter$$anon$2,onDecodeSuccess,195), SourceLocation(ServerLogInterceptor.scala,sttp.tapir.server.interceptor.log.ServerLogInterceptor$$anon$1,onDecodeSuccess,19), SourceLocation(ServerLogInterceptor.scala,sttp.tapir.server.interceptor.log.ServerLogInterceptor$$anon$1$$anonfun$onDecodeSuccess$6,apply,12), SourceLocation(ExceptionInterceptor.scala,sttp.tapir.server.interceptor.exception.ExceptionInterceptor$$anon$1$$anonfun$onDecodeSuccess$2,apply,16), SourceLocation(ServerInterpreter.scala,sttp.tapir.server.interpreter.ServerInterpreter,tryServerEndpoint,130), SourceLocation(ServerInterpreter.scala,sttp.tapir.server.interpreter.ServerInterpreter$ResultOrValueFrom,value,265), 
SourceLocation(ServerInterpreter.scala,sttp.tapir.server.interpreter.ServerInterpreter$ResultOrValue,map,236)),Some(ZTrace(Id(1656419394277,4335),List(),List(SourceLocation(GenSpawn.scala,cats.effect.kernel.GenSpawn,race,321), SourceLocation(Http1ServerStage.scala,org.http4s.blaze.server.Http1ServerStage,raceTimeout,391), SourceLocation(http4s.scala,io.dhlparcel.seed.http4s.package$Http4s$$anon$1$$anonfun$apply$1,apply,96), SourceLocation(Http1ServerStage.scala,org.http4s.blaze.server.Http1ServerStage$$anon$2,run,209), SourceLocation(ZIO.scala,zio.ZIO$,zio$ZIO$$_succeedRight,4495), SourceLocation(Http1ServerStage.scala,org.http4s.blaze.server.Http1ServerStage$$anon$2,run,211), SourceLocation(Dispatcher.scala,cats.effect.std.Dispatcher$$anon$2,unsafeToFutureCancelable,193), SourceLocation(ApplicativeError.scala,cats.ApplicativeError,onError,241), SourceLocation(cats.scala,zio.interop.ZioMonadError,as,389), SourceLocation(ZIO.scala,zio.ZIO,ensuring,591)),Some(ZTrace(Id(1656419285093,95),List(SourceLocation(Supervisor.scala,cats.effect.std.Supervisor$$anon$1,supervise,135), SourceLocation(ZioAsync.scala,zio.interop.ZioAsync,unique,21), SourceLocation(Supervisor.scala,cats.effect.std.Supervisor$$anon$1,supervise,134), SourceLocation(cats.scala,zio.interop.ZioConcurrent,ref,209), SourceLocation(ZRef.scala,zio.ZRef$,make,756), SourceLocation(Dispatcher.scala,cats.effect.std.Dispatcher$,apply,162), SourceLocation(Dispatcher.scala,cats.effect.std.Dispatcher$,apply,148), SourceLocation(Dispatcher.scala,cats.effect.std.Dispatcher$,apply,145), SourceLocation(Dispatcher.scala,cats.effect.std.Dispatcher$,apply,145), SourceLocation(cats.scala,zio.interop.ZioMonadError,tailRecM,413)),List(SourceLocation(cats.scala,zio.interop.ZioConcurrent,start,215), SourceLocation(Supervisor.scala,cats.effect.std.Supervisor$$anon$1,supervise,138), SourceLocation(Dispatcher.scala,cats.effect.std.Dispatcher$,apply,175), SourceLocation(cats.scala,zio.interop.ZioMonadError,as,389), 
SourceLocation(cats.scala,zio.interop.ZioMonadError,as,389), SourceLocation(cats.scala,zio.interop.ZioMonadError,tailRecM,413)),Some(ZTrace(Id(1656419285092,94),List(SourceLocation(ZIO.scala,zio.ZIO$,effectSuspendTotal,2837), SourceLocation(ZIO.scala,<unknown>,<unknown>,0), SourceLocation(Resource.scala,cats.effect.kernel.Resource,allocatedCase,400), SourceLocation(Resource.scala,cats.effect.kernel.Resource$,apply,681), SourceLocation(Resource.scala,cats.effect.kernel.Resource$,make,755), SourceLocation(Dispatcher.scala,cats.effect.std.Dispatcher$,apply,138), SourceLocation(Resource.scala,cats.effect.kernel.Resource,allocatedCase,440), SourceLocation(ZioAsync.scala,zio.interop.ZioAsync,executionContext,18), SourceLocation(ZIO.scala,zio.ZIO$,executor,2879), SourceLocation(Resource.scala,cats.effect.kernel.Resource,allocatedCase,440)),List(SourceLocation(cats.scala,zio.interop.ZioConcurrent,start,215), SourceLocation(Resource.scala,cats.effect.kernel.Resource$,make,755), SourceLocation(Resource.scala,cats.effect.kernel.Resource$,apply,681), SourceLocation(Resource.scala,cats.effect.kernel.Resource,allocatedCase,400), SourceLocation(ZIO.scala,zio.ZIO,run,1764), SourceLocation(ZIO.scala,zio.ZIO,onExit,1053), SourceLocation(ApplicativeError.scala,cats.ApplicativeError,onError,241), SourceLocation(ZIO.scala,zio.ZIO,run,1764), SourceLocation(ZIO.scala,zio.ZIO,onExit,1053), SourceLocation(ApplicativeError.scala,cats.ApplicativeError,onError,241)),None))))))))))

This should not be an overwrite, since our filename is based on a timestamp.

regis-leray commented 2 years ago

Not sure it is related to file permissions.

Can you copy and paste the code you are using to call the upload method? What is the source of the data you are uploading?

Fristi commented 2 years ago

It turns out the source is an fs2 stream converted to a ZIO stream. I've converted that code to see if it helps.

I also checked our old code, which used fs2 to create the SFTP client. It uses the same underlying sshj library:

def write(filename: String): Pipe[F, Byte, Unit] =
  stream =>
    fs2.Stream
      .resource(remoteFile(sftpClient, filename, NonEmptyList.of(OpenMode.WRITE, OpenMode.CREAT, OpenMode.TRUNC)))
      .flatMap(file => io.writeOutputStream[F](Async[F].delay(new file.RemoteFileOutputStream), closeAfterUse = true).apply(limitChunkSize(stream)))

def remoteFile[F[_]: Async](sftpClient: SFTPClient, filename: String, modes: NonEmptyList[OpenMode]): Resource[F, RemoteFile] =
  Resource.fromAutoCloseable {
    Async[F].delay(sftpClient.open(filename, java.util.EnumSet.of(modes.head, modes.tail: _*)))
  }

We use different flags here and we don't override the RemoteFile.
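To make the flag difference concrete, here is a minimal sketch of the sshj open-mode sets involved. The first set is the one the old fs2 code above passes to `sftpClient.open`. The second is a hypothetical variant that is not from this thread: in the SFTP protocol, `EXCL` combined with `CREAT` asks the server to fail if the file already exists, which may suit timestamped filenames that should never be overwritten. The object and value names are made up for illustration.

```scala
import java.util.EnumSet
import net.schmizz.sshj.sftp.OpenMode

object UploadModes {
  // Flags used by the old fs2-based code: open for writing, create the file
  // if it is missing, and truncate any existing content.
  val createOrTruncate: EnumSet[OpenMode] =
    EnumSet.of(OpenMode.WRITE, OpenMode.CREAT, OpenMode.TRUNC)

  // Hypothetical alternative (an assumption, not from the thread): EXCL with
  // CREAT requests a failure when the target file already exists.
  val createOnly: EnumSet[OpenMode] =
    EnumSet.of(OpenMode.WRITE, OpenMode.CREAT, OpenMode.EXCL)
}
```

Either set can be handed to `sftpClient.open(filename, modes)` in the same way `remoteFile` does above.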

I think it has to do with the fs2 stream conversion. I will get back to you on that.

Fristi commented 2 years ago

The issue was in the conversion from an fs2 stream to a ZIO stream. I've replaced the cormorant fs2 code with a ZIO kantan.csv version, and that works 🎉
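For anyone hitting the same error, here is a rough sketch of the idea behind the fix: build the byte stream natively in ZIO instead of converting it from fs2. This assumes ZIO 1 (as in this issue). The `CsvBytes` object, the row type, and the naive comma join are assumptions standing in for the actual kantan.csv encoding, which is not shown in this thread.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import zio.Chunk
import zio.stream.ZStream

object CsvBytes {
  // Hypothetical stand-in for a real CSV encoder: joins cells with commas and
  // does no quoting or escaping. A real exporter would use kantan.csv here.
  def render(rows: Iterable[List[String]]): ZStream[Any, Nothing, Byte] =
    ZStream
      .fromIterable(rows)
      .map(_.mkString(",") + "\n")
      // Flatten each rendered line into its UTF-8 bytes.
      .mapConcatChunk(line => Chunk.fromArray(line.getBytes(UTF_8)))
}

object CsvBytesDemo {
  def main(args: Array[String]): Unit = {
    val rows  = List(List("sku", "qty"), List("A-1", "3"))
    // ZIO 1 runtime call; collects the whole stream into memory for the demo.
    val bytes = zio.Runtime.default.unsafeRun(CsvBytes.render(rows).runCollect)
    println(new String(bytes.toArray, UTF_8))
  }
}
```

A stream of bytes built this way never passes through the fs2 interop layer, so it can be used as the upload source directly.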

Closing this, but leaving #251 open