oconnor663 / duct.rs

a Rust library for running child processes
MIT License

Live streaming output #69

Closed: spl closed this issue 4 years ago

spl commented 5 years ago

Would it be possible to implement something like tail -f on the output of running a command? Currently, I believe the entire stdout and stderr output are buffered until the command completes. For long-running commands that incrementally produce output, it would be useful to see the output as it is produced.

I understand this may be too complex and/or beyond the scope of duct, but I think it is a useful feature to consider.

spl commented 5 years ago

Possibly related:

oconnor663 commented 5 years ago

One thing you can do is grab a pipe() from os_pipe, and pass its write end in as the stdout to your child. That's also what duct is doing for you under the covers to set up pipelines and things. But I've had several people ask about it, so maybe at least I should call it out more strongly in the documentation. What do you think?

One of the downsides of telling people to do this is that there are several steps. You have to 1) spawn the process the right way, 2) read output in a loop, and then 3) wait on the child to avoid leaving a zombie. I suppose we could imagine an API that somehow automatically waits on the child after reading EOF? I would probably want to come up with two or three specific use cases to help think about designing an API. Another factor here is that the std::process::Command API already adequately handles some of the most obvious use cases, like spawning a single child and reading its output as a stream. Some of my intuition before was that, if you need enough manual control over your pipes to care about this, then it might be that duct gets in your way more than it helps. But yeah, it's a big broad question, and I'd love to hear more of your thoughts.

spl commented 5 years ago

I'm just getting started with Rust, so I'm not familiar with the APIs and I don't follow everything you're saying. But, just to put something concrete down, here's a simple shell script that produces output incrementally with delay:

#!/bin/bash
for c in A B C D E F G H I J K L M N O P Q R S T U V W X Y Z; do
  echo $c
  sleep 1
done

At the simplest, I think some form of blocking iterators for stdout and stderr (akin to this example with BufReader?) would work. At first glance, the nice thing about BufReader for shell output is the lines iterator.
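One way to picture the "blocking iterator over stdout and stderr" idea, using only the standard library, is sketched below. The names `Line`, `forward`, and `spawn_lines` are hypothetical and are not part of duct or std. The sketch also assumes a POSIX `sh` is on the PATH. Each pipe is read on its own thread and the lines are merged into one channel, whose receiver blocks until the next line arrives.

```rust
use std::io::{self, BufRead, BufReader, Read};
use std::process::{Command, Stdio};
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

/// Hypothetical tag recording which stream a line came from.
#[derive(Debug, PartialEq)]
enum Line {
    Out(String),
    Err(String),
}

/// Hypothetical helper: forwards each line of `reader` into the channel.
/// Read errors simply end the loop; a fuller version would report them.
fn forward<R: Read + Send + 'static>(
    reader: R,
    tx: Sender<Line>,
    wrap: fn(String) -> Line,
) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        for line in BufReader::new(reader).lines() {
            match line {
                Ok(text) => {
                    if tx.send(wrap(text)).is_err() {
                        break; // the receiver was dropped
                    }
                }
                Err(_) => break,
            }
        }
    })
}

/// Hypothetical helper: runs `script` under `sh -c` and returns a receiver
/// that yields lines from both streams as the child produces them.
fn spawn_lines(script: &str) -> io::Result<Receiver<Line>> {
    let mut child = Command::new("sh")
        .arg("-c")
        .arg(script)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()?;
    let stdout = child.stdout.take().expect("stdout was piped");
    let stderr = child.stderr.take().expect("stderr was piped");

    let (tx, rx) = mpsc::channel();
    let out_thread = forward(stdout, tx.clone(), Line::Out);
    let err_thread = forward(stderr, tx, Line::Err);

    // Wait on the child once both pipes reach EOF, so it is not left a zombie.
    thread::spawn(move || {
        let _ = out_thread.join();
        let _ = err_thread.join();
        let _ = child.wait();
    });
    Ok(rx)
}

fn main() -> io::Result<()> {
    let script = "for c in A B C; do echo $c; echo \"warn $c\" >&2; sleep 1; done";
    // Iterating the receiver blocks until the next line is available and
    // ends when both pipes have been closed.
    for line in spawn_lines(script)? {
        match line {
            Line::Out(text) => println!("{} :)", text),
            Line::Err(text) => eprintln!("{} :(", text),
        }
    }
    Ok(())
}
```

The relative order of stdout and stderr lines is not guaranteed, since the two pipes are drained by independent threads. Only the order within each stream is preserved.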

spl commented 5 years ago

Just to clarify for my own understanding, when you say “read output in a loop,” is this (read_one_byte) an example of what you mean?

use std::io::{ErrorKind, Read, Result};

fn read_one_byte(reader: &mut dyn Read) -> Option<Result<u8>> {
    let mut buf = [0];
    loop {
        return match reader.read(&mut buf) {
            Ok(0) => None,
            Ok(..) => Some(Ok(buf[0])),
            Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
            Err(e) => Some(Err(e)),
        };
    }
}

oconnor663 commented 5 years ago

Here's a couple examples of what I was thinking. Tell me how close this is to what you were looking for:

extern crate duct;
extern crate os_pipe;

use duct::cmd;
use std::error::Error;
use std::io::prelude::*;
use std::io::BufReader;
use std::process::{Command, Stdio};

const BASH_PROG: &str = "
for c in A B C; do
  echo $c
  sleep 1
done
";

fn main() -> Result<(), Box<dyn Error>> {
    // We're going to read each line of the output above, and print it out with
    // a smiley face added on. First, let's do it using duct.
    let (pipe_reader, pipe_writer) = os_pipe::pipe()?;
    let child = cmd!("bash", "-c", BASH_PROG)
        .stdout_handle(pipe_writer)
        .start()?;
    // Note that because the duct::Expression object above is a temporary, the
    // pipe writer it's holding has dropped by this point. Otherwise we'd need
    // to manually drop it, to avoid blocking the read loop forever.
    println!("duct");
    for line in BufReader::new(pipe_reader).lines() {
        println!("{} :)", line?);
    }
    child.wait()?;

    // Now do the same thing with std::process::Command.
    let mut child = Command::new("bash")
        .arg("-c")
        .arg(BASH_PROG)
        .stdout(Stdio::piped())
        .spawn()?;
    println!("std::process");
    for line in BufReader::new(child.stdout.take().ok_or("stdout missing")?).lines() {
        println!("{} (:", line?);
    }
    // NOTE: I forgot this line until @spl reminded me below. Editing it in
    // here in case anyone copy-pastes this code.
    child.wait()?;

    Ok(())
}

spl commented 5 years ago

Thank you for kindly writing out the different approaches. I now have a better understanding of what you were saying (and a bit better understanding of the duct and std::process APIs). It does indeed look like it's relatively easy to do with either approach.

Can you help me understand why you use child.wait() for the duct approach and not for the std::process approach?

Given all this, nothing occurs to me on how to extend the duct API to cover use cases like the above in general.

oconnor663 commented 5 years ago

> Can you help me understand why you use child.wait() for the duct approach and not for the std::process approach?

Lol, that's a bug! I totally should've used wait in both cases, and my second example leaves a zombie child that will turn into a resource leak if the parent lives long enough for it to matter. Kind of highlights how easy it is to get this stuff wrong without a wrapper library :p (I edited the code above with your correction.)
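For the std::process case, one possible way to make a forgotten wait harder to hit is a small guard that waits on the child when it goes out of scope. This is only a sketch: `ReapOnDrop` and `read_lines` are hypothetical names, not part of duct or std, and a POSIX `sh` is assumed to be on the PATH.

```rust
use std::io::{self, BufRead, BufReader};
use std::process::{Child, Command, Stdio};

/// Hypothetical guard: waits on the wrapped child when dropped, so the child
/// is reaped even if the caller returns early with an error.
struct ReapOnDrop(Child);

impl Drop for ReapOnDrop {
    fn drop(&mut self) {
        // Errors cannot be propagated out of drop, so they are ignored here.
        let _ = self.0.wait();
    }
}

/// Hypothetical helper: runs `script` under `sh -c` and returns its stdout lines.
fn read_lines(script: &str) -> io::Result<Vec<String>> {
    let mut guard = ReapOnDrop(
        Command::new("sh")
            .arg("-c")
            .arg(script)
            .stdout(Stdio::piped())
            .spawn()?,
    );
    let stdout = guard
        .0
        .stdout
        .take()
        .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "stdout missing"))?;

    let mut lines = Vec::new();
    for line in BufReader::new(stdout).lines() {
        // An early return via `?` drops the reader first and the guard last,
        // so the pipe is closed before the blocking wait happens.
        lines.push(line?);
    }
    Ok(lines)
}

fn main() -> io::Result<()> {
    for line in read_lines("for c in A B C; do echo $c; done")? {
        println!("{} (:", line);
    }
    Ok(())
}
```

The trade-off is that a wait inside `drop` can block for as long as the child keeps running, and its exit status is thrown away. A caller that needs the status would still call `wait` explicitly.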

spl commented 5 years ago

> Lol, that's a bug!

Okay, whew! 😌 I was really struggling to figure out what the difference was.

Just for speculation, let's identify the components needed for the example as written (https://github.com/oconnor663/duct.rs/issues/69#issuecomment-443269084). We'll use the duct approach to reduce the scope and because I think we'd want to take advantage of the cross-platform shell support. We can potentially generalize later. Here's a rough list:

  1. os_pipe::pipe() -> Result<(PipeReader, PipeWriter)>
  2. pipe_reader: PipeReader
  3. pipe_writer: PipeWriter
  4. duct::Expression
  5. duct::Expression::stdout_handle<T: IntoRawFd>(&self, handle: T) -> Expression
  6. duct::Expression::start(&self) -> Result<Handle>
  7. drop the duct::Expression
  8. BufReader::new(inner: R) -> BufReader<R>
  9. child: duct::Handle
  10. duct::Handle::wait(&self) -> Result<&Output>
  11. BufRead::lines(self) -> Lines<Self>

Now, what do we start with and what do we end with? I think it would be a good assumption that the first thing provided is an Expression. You already have Expression::read(&self) -> Result<String> as a wrapper around a presumably common pattern, so it follows that we could define something like Expression::read_other(&self) -> Result<?>, where ? is, at this point, unknown for our purposes.

As for the end, I see the following options, with comments attached:

  1. Lines: This seems too limiting. I'm not sure it is reasonable to assume this is the most common usage.
  2. BufReader: This is more flexible than Lines.
  3. PipeReader: This appears to be the most flexible. An alternative to this could be to provide some kind of impl Read, which would allow hiding the low-level os_pipe details.

Supposing we return Result<...>, where ... is filled in with one of the above, I see at least two issues that need to be resolved.

First, there's the issue of dropping pipe_writer. I adapted your code to what I'm working on, and I actually keep a cmd: Expression for logging a debug message with it. So, unlike you, I have to call drop(cmd).

Second, there's the issue of calling child.wait() after reading from pipe_reader. If I understand it correctly, we wouldn't want to call child.wait() before reading because that would block and defeat the purpose of the whole exercise. One option would be to return the Handle and ask the programmer to do it in documentation. Another option might be to create an implementation of BufRead (or something like it) and wrap BufReader in such a way that child.wait() is called after all the input is consumed.

Anyway, these are just some rough thoughts coming from an ignorant person. Maybe you can make something coherent out of them.

spl commented 5 years ago

> Another option might be to create an implementation of BufRead (or something like it) and wrap BufReader in such a way that child.wait() is called after all the input is consumed.

I suppose, actually, that this could just be an implementation of the Read trait. When the underlying read() returns Ok(0), then we wait().

oconnor663 commented 4 years ago

Apologies, somehow I dropped this thread on the ground in December.

I think we could define a new type, something like a ReaderHandle. It would implement Read, and internally it would hold a PipeReader and Handle. Once the underlying PipeReader returned EOF, the ReaderHandle would make a blocking call to Handle::wait before returning the EOF itself. This could satisfy the relatively common use case of wanting to read from a child process in a stream, without the footgun of forgetting to call wait after reading and leaving a zombie (or the other footgun of mistakenly calling wait before reading and risking a deadlock). We could expose this type through a method like .reader(). I'll file a separate issue for it.
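To make the shape of that idea concrete, here is a minimal sketch built on std::process rather than on duct's PipeReader and Handle. It is not duct's implementation. `WaitOnEofReader` and its `reaped` flag are hypothetical names, and a POSIX `sh` is assumed to be on the PATH.

```rust
use std::io::{self, BufRead, BufReader, Read};
use std::process::{Child, ChildStdout, Command, Stdio};

/// Hypothetical stand-in for the proposed ReaderHandle: owns the child and
/// the read end of its stdout pipe.
struct WaitOnEofReader {
    stdout: ChildStdout,
    child: Child,
    reaped: bool,
}

impl WaitOnEofReader {
    /// Spawns `command` with stdout piped back to this reader.
    fn spawn(mut command: Command) -> io::Result<Self> {
        let mut child = command.stdout(Stdio::piped()).spawn()?;
        let stdout = child
            .stdout
            .take()
            .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "stdout missing"))?;
        Ok(WaitOnEofReader { stdout, child, reaped: false })
    }
}

impl Read for WaitOnEofReader {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.stdout.read(buf)?;
        // A zero-length read into a non-empty buffer means EOF. Wait on the
        // child once, before handing the EOF back to the caller.
        if n == 0 && !buf.is_empty() && !self.reaped {
            self.child.wait()?;
            self.reaped = true;
        }
        Ok(n)
    }
}

fn main() -> io::Result<()> {
    let mut command = Command::new("sh");
    command.arg("-c").arg("for c in A B C; do echo $c; sleep 1; done");
    let reader = WaitOnEofReader::spawn(command)?;
    // Lines are printed as the child produces them; the child has been
    // waited on by the time the loop ends.
    for line in BufReader::new(reader).lines() {
        println!("{} :)", line?);
    }
    Ok(())
}
```

One caveat with this design: EOF on stdout only means every write end of the pipe has been closed, not that the child has exited, so the wait inside `read` can still block if the child closes stdout and keeps running. A reader that is dropped before reaching EOF also never waits, so a fuller version would need to handle that case too.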

oconnor663 commented 4 years ago

I finally got around to implementing this. Let me know if you get a chance to try it.

https://github.com/oconnor663/duct.rs/commit/0e04a2742cdc97cded1b3bef542fac2295d08972