oconnor663 / duct.rs

a Rust library for running child processes
MIT License

How to set pipe buffer size? #85

Open · Boscop opened this issue 4 years ago

Boscop commented 4 years ago

How can I set the size of the pipe buffer with duct? I need a buffer of 100 MB for reading video frames from an ffmpeg child process (streaming, i.e. reading each frame before the process terminates) through a pipe, like pipe = sp.Popen(command, stdout=sp.PIPE, bufsize=10**8) in Python, but in Rust with duct :)

Is the pipe unbuffered by default, and if I do BufReader::with_capacity(100_000_000, cmd!(...).reader()?), will that be equivalent to that Python code?


And for writing video frames to another ffmpeg child process, how can I incrementally feed data to its stdin? (Using duct as well.) I don't see a .writer() method.

And how can I then close the stdin of the video-writing ffmpeg child process to tell it that no more frames are incoming, so that it can finalize writing the video? (I don't want to forcefully kill it when my process terminates, because then it wouldn't finalize the video file.)

oconnor663 commented 4 years ago

Is the pipe unbuffered by default and if I do BufReader::with_capacity(100_000_000, cmd!(...).reader()?), will that be equivalent to that python code?

Yes, I think that's all correct. The ReaderHandle returned by reader() contains an unwrapped os_pipe::PipeReader, which represents an unbuffered OS pipe. If you want in-process buffering (as opposed to the OS's own pipe buffering), this would be the right way to do it.
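For example, something like this (a minimal sketch; the ffmpeg arguments and frame size are placeholders):

```rust
use std::io::{BufReader, Read};

fn main() -> std::io::Result<()> {
    // Placeholder ffmpeg invocation; substitute your real arguments.
    let reader = duct::cmd!("ffmpeg", "-i", "input.mp4", "-f", "rawvideo", "pipe:1")
        .reader()?;

    // Wrap the unbuffered OS pipe in a large in-process buffer.
    let mut buffered = BufReader::with_capacity(100_000_000, reader);

    // Read one frame at a time (assuming raw RGB 1080p, for illustration).
    let mut frame = vec![0u8; 1920 * 1080 * 3];
    buffered.read_exact(&mut frame)?;
    Ok(())
}
```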

And for writing video frames to another ffmpeg childprocess, how can I incrementally feed data to its stdin? (Using duct as well.) I don't see a .writer() method.. And how can I then close the stdin of the video-writing ffmpeg child-process to tell it that no more frames are incoming, so that it can finalize writing the video? (I don't want to forcefully kill it when my process terminates, because then it wouldn't be finalizing the video file.)

Right, there's no convenience wrapper for this right now. What you can do is call os_pipe::pipe() to create a reader/writer pair, and then pass the reader end to your child process using the stdin_file() method. You can then write to the writer end after the child is spawned, and when you close (that is, drop) the writer end, the child process will see EOF.
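Something like this (untested sketch; the ffmpeg arguments are placeholders):

```rust
use std::io::Write;

fn main() -> std::io::Result<()> {
    // Create a raw OS pipe: the child will read from `reader`, we keep `writer`.
    let (reader, mut writer) = os_pipe::pipe()?;

    // Placeholder ffmpeg invocation; substitute your real arguments.
    let handle = duct::cmd!("ffmpeg", "-i", "pipe:0", "out.mp4")
        .stdin_file(reader)
        .start()?;

    // Feed frames incrementally.
    writer.write_all(b"...frame bytes...")?;

    // Dropping (closing) the writer delivers EOF to the child,
    // so it can finalize its output.
    drop(writer);

    handle.wait()?;
    Ok(())
}
```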

Note that this would also work with std::process::Command's stdin() method, if you didn't want to use the Stdio::piped() convenience wrapper. The standard library doesn't use os_pipe internally, but what happens on the inside is effectively the same.

Boscop commented 4 years ago

Thanks for the quick reply. I'll use os_pipe::pipe() then.

Btw, are there plans to add support for incremental writing in duct? (E.g. with a .writer() method returning a WriterHandle.) That would be nice :)

oconnor663 commented 4 years ago

Proooooobably not :) The main reason we have ReaderHandle is to save the caller from having to remember to wait() on the child after reading is done. In most API designs in this space, if the parent doesn't wait() on the child, a zombie process gets leaked. If the parent process is long-lived, this resource leak can be a serious problem. In duct, instead of that, we automatically wait() in ReaderHandle's destructor. And to prevent that wait from possibly blocking, we also kill() first. (Normally you'll drop the ReaderHandle after having read it to EOF, which will mean that the kill() is a no-op. But this can guarantee good cleanup in error cases that might come up before EOF.)
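To make that concrete, the usual pattern looks like this (sketch with a placeholder command):

```rust
use std::io::Read;

fn read_all() -> std::io::Result<Vec<u8>> {
    // Placeholder command.
    let mut reader = duct::cmd!("some_command").reader()?;
    let mut output = Vec::new();
    // If this read fails partway through, `reader` is dropped on the error
    // path, and its destructor kill()s and then wait()s on the child, so no
    // zombie process is leaked.
    reader.read_to_end(&mut output)?;
    Ok(output)
}
```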

So given all that, if we also defined a WriterHandle type, we would have to consider how it interacts with all of those design goals. Unlike a reader reaching EOF, which is a pretty good (if not actually guaranteed) sign that the child has exited, a writer doesn't usually receive any indication of this kind.

Boscop commented 4 years ago

@oconnor663 Why can't dropping the writer be used as that indication? And writing when the child process has already terminated would cause an I/O error.

Streaming into a child process is a common use case, so it would be nice if duct could support it as well. Please consider it :)

oconnor663 commented 4 years ago

Why can't dropping the writer be used as that indication?

Imagine a situation where you're writing something to the child (let's say some raw video), and the child is writing its output to disk (let's say the re-encoded video). Your write loop ends as soon as it writes the last byte to your in-process buffer. If you drop the writer at that point, your BufWriter's drop implementation will flush the in-process buffer to the OS pipe buffer before dropping the hypothetical contained duct::WriterHandle. However, at that point there's no guarantee that 1) the child has finished reading from the OS pipe buffer, or that 2) the child has finished writing its own output. The only indication the parent can receive about those things is waiting for the child to exit. So if WriterHandle is going to clean up the child, it's got to do a potentially long wait in its own destructor. Blocking in a destructor is really questionable. But we absolutely cannot kill the child to get rid of the wait, because the child may still be working.

Boscop commented 4 years ago

Can't the handle to the child process be decoupled from the writer? So that there's a separate ChildHandle that can outlive the WriterHandle?

Boscop commented 4 years ago

@oconnor663 It works now, but I'm getting a lot of dropped frames when reading from 2 webcams at the same time (separate ffmpeg child processes, even in separate threads). I tried different sizes for BufReader: framesize (w*h*3), framesize*4, and 100 MB like in that article. With all buffer sizes ffmpeg was dropping frames, and the 100 MB buffer increased the latency unbearably (because it takes so long until it's full). I think the issue is that maybe the bufsize passed to sp.Popen is for the OS-level buffer? Ideally my pipe would have a large buffer (not blocking the decoder) but I could already read from it before it's full. The OS-level pipe buffer behaves like that, right? How can I set the size of that? :)

(So I could set it to like 5*frame_size so that it's still providing enough backpressure to cause ffmpeg to drop frames if my process processes the frames too slowly, because I don't want to receive old frames way later.)

oconnor663 commented 4 years ago

Can't the handle to the child process be decoupled from the writer? So that there's a separate ChildHandle that can outlive the WriterHandle?

Maybe, but at that point is it any different from passing in an os_pipe::PipeWriter? Other than being easier to find in the docs?

With all buffer sizes ffmpeg was dropping frames and the 100 MB buffer increased the latency unbearably (because it takes so long until it's full).

If latency is the main issue, are you sure you want buffering at all? Why not read/write directly from/to your pipes, so that data is always immediately available to the recipient? It sounds like you're working with large enough frames that system call overhead shouldn't be an issue.

If you're using a large read buffer to avoid blocking the writer on the other end, that's not how in-process read buffers work, as it sounds like you've noticed below. Your reader process isn't going to try to fill its buffer until you call read(), so the writer process will still tend to block when it fills up the OS buffer, 64 KiB by default on Linux. If promptly draining the pipe is important, you might want to have a dedicated thread reading it.
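For example (rough sketch, untested):

```rust
use std::io::Read;
use std::thread;

// Sketch: a dedicated thread that drains a duct reader as fast as possible,
// so the writer on the other end doesn't stall on a full OS pipe buffer.
fn spawn_drain_thread(mut reader: duct::ReaderHandle) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        let mut chunk = [0u8; 65536];
        loop {
            match reader.read(&mut chunk) {
                Ok(0) | Err(_) => break, // EOF or error
                Ok(n) => {
                    // Hand the n bytes off to the rest of the pipeline here.
                    let _ = n;
                }
            }
        }
    })
}
```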

I think the issue is that maybe the bufsize passed to sp.Popen is for the OS-level buffer?

I don't think this is how it works. The Python docs say that bufsize is passed along to open(), which uses an in-process buffer.

It sounds like you're seeing different behavior between Python and Rust+duct+os_pipe, but I'm not clear on the exact differences. The Python version is working better? We're getting to the level of detail where it's hard for me to know what's going on without reading your code.

Ideally my pipe would have a large buffer (not blocking the decoder) but I could already read from it before it's full. The OS-level pipe buffer behaves like that, right? How can I set the size of that?

You'd need to write OS-specific code using libc. On Linux, I think you want to search for the F_SETPIPE_SZ fcntl flag. On other OS's, I have no idea.
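On Linux it would look something like this (sketch using the libc crate; untested):

```rust
#[cfg(target_os = "linux")]
fn set_pipe_size(fd: std::os::unix::io::RawFd, size: i32) -> std::io::Result<i32> {
    // F_SETPIPE_SZ returns the capacity actually allocated (the kernel may
    // round up), or -1 on error.
    let ret = unsafe { libc::fcntl(fd, libc::F_SETPIPE_SZ, size) };
    if ret < 0 {
        Err(std::io::Error::last_os_error())
    } else {
        Ok(ret)
    }
}
```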

providing enough backpressure to cause ffmpeg to drop frames if my process processes the frames too slowly

This sounds much more complicated than any use case I've considered before. Will ffmpeg start dropping frames as soon as any pipe write would block? Or will it do some amount of internal buffering? If so, is that amount configurable? These seem like critical details, and I have no idea how they work.

In my mind (without any experience doing anything this complicated with video pipes), it seems like the best approach would be to have our broker process be as dumb as possible, and just read-write data as quickly as possible using dedicated threads to avoid introducing any extra latency. Then we could trust ffmpeg to handle the more complicated details around buffering and backpressure, because it's the Expert System that knows what the heck is going on. If we try to tune our buffering in the middle based on what we think ffmpeg is going to do in response, we're likely to get that wrong?

oconnor663 commented 4 years ago

By the way, what is your broker process's actual job? Is it just copying between pipes, or does it do some processing in the middle?

Boscop commented 4 years ago

Can't the handle to the child process be decoupled from the writer? So that there's a separate ChildHandle that can outlive the WriterHandle?

Maybe, but at that point is it any different from passing in an os_pipe::PipeWriter? Other than being easier to find in the docs?

But it seems to be only possible on Unix. I'm on Windows 8.1 btw. Could a duct Expression have a similar API (like stdin_file) to work with a PipeWriter on Windows in the future? :)


If latency is the main issue, are you sure you want buffering at all? Why not read/write directly from/to your pipes, so that data is always immediately available to the recipient? It sounds like you're working with large enough frames that system call overhead shouldn't be an issue. If you're using a large read buffer to avoid blocking the writer on the other end, that's not how in-process read buffers work, as it sounds like you've noticed below. Your reader process isn't going to try to fill its buffer until you call read(), so the writer process will still tend to block when it fills up the OS buffer, 64 KiB by default on Linux. If promptly draining the pipe is important, you might want to have a dedicated thread reading it.

Yeah, it seems the BufReader only introduces latency and doesn't help at all, so I removed it and started a dedicated thread per webcam for reading ffmpeg's output, like this:

https://gist.github.com/Boscop/86c50317d407460fd5c2a8900fffbeec#file-ffmpeg_webcam_input-rs-L63-L75

The ffmpeg-calling thread sends each webcam frame over to the opengl thread, where it is then written to a texture, because each webcam is rendered as a virtual screen (quad) in the scene. This works better than the serial/blocking/buffering version; I tested it with 4 webcams in parallel. (I'm also sending the frame Vecs back to the decoding thread to be reused, to avoid memory allocations, but it doesn't seem to make much of a difference.) To minimize latency, I'm using a sync_channel of size 1, and the receiving thread uses the last frame (self.rx.try_iter().last()). Is there any theoretical reason why channel size 0 would be better? Or do you see any other opportunity to optimize this code? (I heard crossbeam channels are faster?)
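In essence the pattern is this (simplified sketch, not the exact gist code):

```rust
use std::sync::mpsc::sync_channel;
use std::thread;
use std::time::Duration;

fn main() {
    // Channel of size 1: the decoder thread blocks when a frame is already
    // queued, which pushes backpressure toward ffmpeg.
    let (tx, rx) = sync_channel::<Vec<u8>>(1);

    thread::spawn(move || loop {
        let frame = vec![0u8; 640 * 480 * 3]; // stand-in for a decoded frame
        if tx.send(frame).is_err() {
            break; // receiver hung up
        }
    });

    // Render loop: drain whatever is queued and keep only the newest frame.
    for _ in 0..10 {
        if let Some(latest) = rx.try_iter().last() {
            let _ = latest; // write to the texture here
        }
        thread::sleep(Duration::from_millis(16)); // ~60 fps tick
    }
}
```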

When running this with 2 webcams it works most of the time, sometimes even with 4. But not always: sometimes I still get dropped frames, and on some runs the 2nd webcam only produces a glitched frame that stays constant while the process is running. Any idea why that could be? (This is very bad.)

While running with 4 webcams I profiled using VerySleepy, and these were the top exclusive functions: [profiler screenshot]

Not sure which part of my code is the bottleneck, how much of the bottleneck is writing the frame data to the textures? Any idea what I can still optimize? :)


It sounds like you're seeing different behavior between Python and Rust+duct+os_pipe, but I'm not clear on the exact differences. The Python version is working better? We're getting to the level of detail where it's hard for me to know what's going on without reading your code.

I haven't tried the Python code but with that 100 MB buffer it probably has huge latency (I'm wondering why they are using such a large buffer). For my real-time use case I'm trying to achieve minimum latency (also soon with reading frames from video files in a similar way as input).

Will ffmpeg start dropping frames as soon as any pipe write would block? Or will it do some amount of internal buffering? If so, is that amount configurable? These seem like critical details, and I have no idea how they work.

Not sure what criteria ffmpeg has for dropping frames; I'll run some tests soon. Once I start reading frames from video files, dropped frames would be bad, because I want to record my opengl output as a video, which should have the same number of frames as the inputs, and they have to match up in the timeline.


By the way, what is your broker process's actual job? Is it just copying between pipes, or does it do some processing in the middle?

Not sure what you mean by "broker". The purpose of the process is to read in multiple inputs (each input either a webcam or a video) using ffmpeg (all at the same framerate), write them to textures that represent virtual screens in a 3d scene post-processed with music-synced shader effects and other synced physical objects/shaders (such that every change in the music has a visual equivalent), render the full scene as an equirectangular projection onto the screen, fetch each frame from the GPU, and pipe it into another ffmpeg process that writes a 360° video (in 8K resolution). (The spatial (ambisonic) music track will then be combined with the video, also using ffmpeg.) So the input and output frames have to match up, and I need the lowest latency possible (also to make it work for live-streaming). Even with the above multi-threaded code the latency is still slightly too high (it varies per run and is different for each camera), and I'm still getting dropped frames sometimes. Any idea why that could be? :)

oconnor663 commented 4 years ago

But it seems to be only possible on Unix. I'm on Windows 8.1 btw. Could a duct Expression have a similar API (like stdin_file) to work with a PipeWriter on Windows in the future?

I expect this to work on Windows. Are you hitting an error? Could you copy it here?

Is there any theoretical reason why channel size 0 would be better?

I suppose it all depends on where your bottleneck is expected to be, and on who's responsible for making the decision about dropping frames. Should you rely on ffmpeg to drop frames for you, and organize your piping and channels to provide backpressure as quickly as possible? Or should you take all the frames you can from ffmpeg, and figure out whether you need to drop frames once they're in your own buffers? I don't have any experience in this area, so I don't know which way is the Right Way, but I suspect it's important to pick one approach and stick to it consistently at each step of the pipeline.

oconnor663 commented 4 years ago

Not sure which part of my code is the bottleneck, how much of the bottleneck is writing the frame data to the textures? Any idea what I can still optimize? :)

No idea about optimizations, but you could consider wrapping your pipes and channels in heavy logging, so that you know when each read comes in and when each write finishes. That should give you an idea about where in the pipeline the bottleneck begins?
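For example, something like this around each reader (hypothetical sketch; LoggingReader is a made-up name):

```rust
use std::io::Read;
use std::time::Instant;

// Hypothetical wrapper that timestamps every read, so you can see which
// stage of the pipeline is falling behind.
struct LoggingReader<R> {
    inner: R,
    name: &'static str,
    start: Instant,
}

impl<R: Read> Read for LoggingReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let n = self.inner.read(buf)?;
        eprintln!("[{:?}] {}: read {} bytes", self.start.elapsed(), self.name, n);
        Ok(n)
    }
}
```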

Boscop commented 4 years ago

Ah thanks, it turns out it does work on Windows. I thought it was Unix-only because of the docs: the signature is stdin_file<T: IntoRawFd>, and the IntoRawFd docs say "This is supported on Unix only." But that's probably just because the docs can only be built for one platform at a time(?)

Btw, this is how I'm writing the video frames out now: https://gist.github.com/Boscop/6a990e32cc90b4861b71c5682e4be73e#file-ffmpeg_video_output-rs Spawning a new thread just for logging ffmpeg's output is unavoidable, right? (Because std's Read has no try_read, and reading in chunks would block.)

Btw, right now I'm doing stderr_to_stdout, but ideally I want to stream both stdout and stderr to separate log files. Is there any way to do that, i.e. to get a reader for each?

oconnor663 commented 4 years ago

Btw, right now I'm doing stderr_to_stdout, but ideally I want to stream both stdout and stderr to separate log files. Is there any way to do that, i.e. to get a reader for each?

If you just want to log both streams to disk, you can open both target files and then pass those files to the stdout_file and stderr_file methods respectively.
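For example (sketch with placeholder filenames and command):

```rust
use std::fs::File;

fn main() -> std::io::Result<()> {
    let out_log = File::create("stdout.log")?;
    let err_log = File::create("stderr.log")?;

    // Placeholder command; each stream is written to its own file as it's produced.
    duct::cmd!("ffmpeg", "-i", "input.mp4", "out.mp4")
        .stdout_file(out_log)
        .stderr_file(err_log)
        .run()?;
    Ok(())
}
```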

Boscop commented 4 years ago

@oconnor663 Thanks, that works. Btw, is there a crate for splitting a string into cmd args? I can't just split by whitespace, because whitespace might be escaped or inside a quoted string. I'd like to read cmd args from my gui's edit field, which is a string containing multiple args that needs to be split before I can call duct.

oconnor663 commented 4 years ago

Hmm, there likely is something, though I've never done it and don't know anything off the top of my head. Another option you could consider is https://crates.io/crates/duct_sh, which is a tiny wrapper library that handles spawning shell commands via duct on Unix and Windows. You could just pass the user's command directly to the shell? That's assuming your user is completely trusted, of course. And you'd be exposing users to the differences between the Windows shell and the Unix one(s), but I think that's essentially already the case just from launching commands by name.
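For example (sketch; sh_dangerous exists precisely because the input isn't validated):

```rust
// Run a user-typed command line through the platform shell. `sh_dangerous`
// accepts a runtime string, which is only OK here because the user typing
// it is fully trusted.
fn run_user_command(line: &str) -> std::io::Result<std::process::Output> {
    duct_sh::sh_dangerous(line).run()
}
```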