zio / zio-process

A simple ZIO library for interacting with external processes and command-line programs
https://zio.dev/zio-process
Apache License 2.0
64 stars 13 forks source link

Process.stdout.linesStream is not interruptible #425

Open haja opened 3 months ago

haja commented 3 months ago

Hey, if I run this the program hangs for the full 3 seconds of sleep 3, instead of failing after 1 second by the timeoutFail(..).

import zio.durationInt
import zio.stream.ZSink
import zio.Clock
import zio.Scope
import zio.ZIO
import zio.ZIOAppArgs
import zio.ZIOAppDefault

object ProcessBug extends ZIOAppDefault {

  override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] =
    for {
      process <- ZIO.acquireRelease(
        ZIO.attempt(zio.process.Process(new ProcessBuilder("sleep", "3").start())).orDie
      )(process =>
        ZIO.debug("process cleanup") *>
          process.killTreeForcibly.orDie)

      _         <- ZIO.debug("sleep started")
      timestamp <- Clock.instant

      _ <- process.stdout.linesStream
        .run(ZSink.foreach(line => ZIO.debug(line)))
        .timeoutFail("stream timeout")(1.second)
        .tapError(_ =>
          for {
            afterTimeout <- Clock.instant
            _            <- ZIO.debug(s"time until error: ${afterTimeout.toEpochMilli - timestamp.toEpochMilli}ms")
          } yield ())
    } yield ()
}

sample output:

sleep started
time until error: 2971ms
process cleanup
timestamp=2024-07-31T09:10:09.799879109Z level=ERROR thread=#zio-fiber-1 message="" cause="Exception in thread "zio-fiber-11" java.lang.String: stream timeout
    at <empty>.ProcessBugWorkaround.run(ProcessBug.scala:53)
    at <empty>.ProcessBugWorkaround.run(ProcessBug.scala:52)"

The issue seems to be that zio.process.ProcessStream.stream (called by linesStream) uses ZStream.fromInputStreamZIO, which in turn opens the InputStream with ZIO.attemptBlockingIO, which is not interruptible.

reibitto commented 3 months ago

Yeah, that's definitely not intended. You're right that the cause is inside ZStream.fromInputStream. We'll likely have to essentially take that implementation and then change the ZIO.attemptBlockingIO to ZIO.attemptBlockingCancelable in order to support proper interruption for the stream combinators.

I'll see if I can fix this over the weekend and cut a new release. There's been a bunch of changes for Scala.js and Scala Native support since I last did anything significant on this project so it looks like I have to clean up some stuff before I can do a full release.

TobiasPfeifer commented 2 months ago

@reibitto I think this constructor should be fixed or provided by zio-streams instead of reimplement a variant of ZStream.fromInputStream in zio-process. I've created an issue for zio-streams zio/zio#9084 But feel free to implement a workaround if you can fix it before a new version of zio-streams gets released

reibitto commented 2 months ago

Sorry, Scala.js and Scala Native were giving me problems but I created the PR and should merge and release soon: #430

I agree it would be nice to get https://github.com/zio/zio/issues/9084 in, but I fixed the issue locally in the meantime. Actually I cancel the process explicitly because attemptBlockingCancelable has less overhead than attemptBlockingInterrupt.