stripe / subprocess

A port of Python's subprocess module to Ruby
MIT License
209 stars 17 forks source link

Convenience Method for Line-Delimited I/O #59

Open rbroemeling opened 4 years ago

rbroemeling commented 4 years ago

Hello,

So I've written a slight "wrapper" for the Subprocess module that we use internally, and I'm filing this ticket both to get the code out there (if it could be useful for others), and to ask whether there's any interest in pulling this upstream into the Subprocess gem that Stripe is hosting.

The general thrust is similar to the request in #13 -- specifically, rather than having the output "blobbed" to the caller, provide a convenience method that issues callbacks, with one call per line of output. Generally speaking, I find that this is a common request/pattern within our code (dealing with output line-by-line is pretty usual for our needs), and if everyone else has similar needs then this is a really nice convenience wrapper to have.

The code that I have implemented looks like this, and just wraps Subprocess.popen to allow for line-delimited callbacks:

    CHUNK_SIZE = 1024

    def self.popen(*args, **kwargs)
      stderr = kwargs.fetch(:stderr, nil)
      stdout = kwargs.fetch(:stdout, nil)
      kwargs[:stderr] = ::Subprocess::PIPE
      kwargs[:stdout] = ::Subprocess::PIPE
      subprocess = ::Subprocess.popen(*args, **kwargs) do |chld|
        buffers  = { chld.stderr => "",     chld.stdout => "" }
        eof  = []
        handlers = { chld.stderr => stderr, chld.stdout => stdout }
        begin
          IO::select(buffers.keys).first.each do |fd|
            begin
              buffers[fd] += fd.read_nonblock(CHUNK_SIZE)
            rescue EOFError
              eof << fd
            end
          end
          buffers.each_key do |fd|
            while buffers[fd].include?("\n")
              i = buffers[fd].index("\n")
              handlers[fd].call(buffers[fd][0..i]) if handlers[fd]
              buffers[fd] = buffers[fd][(i+1)..-1]
            end
          end
          eof.each do |fd|
            remainder = buffers.delete(fd)
            handlers[fd].call(remainder) if remainder && remainder.length > 0
          end
        end until buffers.empty?
        chld.wait
      end
      subprocess.status
    end

A really simple example of using it looks like this:

[1] pry(main)> require "themis/subprocess"
=> true
[2] pry(main)> @stdout = []; @stderr = []
=> []
[3] pry(main)> Themis::Subprocess.popen(
[3] pry(main)*   ["ls", "-la", "/proc/1/cwd", "/proc/1/smaps"],
[3] pry(main)*   stderr: lambda { |l| @stderr << l },
[3] pry(main)*   stdout: lambda { |l| @stdout << l }
[3] pry(main)* )
=> #<Process::Status: pid 21537 exit 2>
[4] pry(main)> @stdout
=> ["lrwxrwxrwx 1 root root 0 Jul  4 01:52 /proc/1/cwd\n", "-r--r--r-- 1 root root 0 Jul  4 01:52 /proc/1/smaps\n"]
[5] pry(main)> @stderr
=> ["ls: cannot read symbolic link '/proc/1/cwd': Permission denied\n"]

If there is interest from Stripe/the maintainers of this gem, I'd love to prepare a PR and get this functionality pulled into Stripe's Subprocess gem.

Thanks!

pete-stripe commented 4 years ago

Thanks @rbroemeling! This looks interesting and useful. I.d be happy to take a look at a PR. If you can find a clean way to integrate it into Process.initialize that would be neat, otherwise a wrapper function would probably be the best approach.

rbroemeling commented 4 years ago

Hi @pete-stripe -- OK, great. Thanks. We've got a hackathon coming up at the end of this week, so I'll prioritize this up and see if I can get a PR together during that time.