akash-akya / ex_cmd

ExCmd is an Elixir library to run external programs and to communicate with back pressure
MIT License

Switch input streams and not kill process #11

Closed mpope9 closed 3 years ago

mpope9 commented 3 years ago

I was interested in switching from Porcelain. Neither library supports switching input streams while a process is executing (I think). Would supporting this be as 'simple' as exposing a way of creating a new sink from an ExCmd.Stream and killing the process spawned by start_input_streamer?

My full use case is having two processes, A and B. I want to be able to stream from process A to process B, and eventually kill process A and spawn C which then should replace process A as B's input stream.

akash-akya commented 3 years ago

Hi @mpope9

We are intentionally hiding ExCmd.Process behind the stream to free the user from external process/port life-cycle details. I'm assuming that for most use cases there will be only one producer.

exposing a way of creating a new sink from a ExCmd.Stream and killing the process spawned by start_input_streamer?

We previously had something like that, but I removed it later to simplify the interface; see the old usage example. My concern with that approach is that it leaks internal process details and there is a possibility of deadlock.

My full use case is having two processes, A and B. I want to be able to stream from process A to process B, and eventually kill process A and spawn C which then should replace process A as B's input stream.

I think in this case skipping ExCmd.Stream altogether and building a new "sink" and interacting with ExCmd.Process directly might be simpler.

Something like,

defmodule ExCmdSink do
  defstruct [:process]

  def new(process) do
    %__MODULE__{process: process}
  end

  defimpl Collectable do
    def into(%{process: process} = sink) do
      collector_fun = fn
        :ok, {:cont, x} ->
          :ok = ExCmd.Process.write(process, x)

        :ok, :done ->
          sink

        :ok, :halt ->
          :ok
      end

      {:ok, collector_fun}
    end
  end
end

defmodule Example do
  def run do
    {:ok, process} = ExCmd.Process.start_link(["base64"])

    # we *must* consume external process output to avoid blocking, even if we are not interested in output
    _ = Task.async(fn -> consume_output(process) end)

    sink = ExCmdSink.new(process)

    # sink can be used multiple times 
    binary_stream(1..100)
    |> Stream.into(sink)
    |> Stream.run()

    binary_stream(101..200)
    |> Stream.into(sink)
    |> Stream.run()

    ExCmd.Process.close_stdin(process)
    {:ok, 0} = ExCmd.Process.await_exit(process, 5_000)
  end

  defp consume_output(process) do
    case ExCmd.Process.read(process) do
      :eof ->
        :ok

      {:ok, _output} ->
        # Output is discarded here; bind it to `output` and call IO.puts(output) to inspect it.
        consume_output(process)
    end
  end

  defp binary_stream(range), do: Stream.map(range, &to_string/1)
end
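
The sink-reuse idea above can be tried in isolation, without ExCmd or an external program. The following is only a minimal sketch under stated assumptions: `BufferSink` is a hypothetical stand-in for `ExCmdSink`, and an `Agent` plays the role of process B, so writes are appended to an in-memory list rather than sent to `ExCmd.Process.write`. It shows one producer ("A") finishing and a second producer ("C") taking over the same sink.

```elixir
defmodule BufferSink do
  # Hypothetical stand-in for ExCmdSink: chunks go to an Agent
  # instead of an external process (assumption for illustration).
  defstruct [:agent]

  def new(agent), do: %__MODULE__{agent: agent}

  defimpl Collectable do
    def into(%{agent: agent} = sink) do
      collector_fun = fn
        acc, {:cont, chunk} ->
          # Prepend each chunk; the reader reverses the list later.
          :ok = Agent.update(agent, &[chunk | &1])
          acc

        _acc, :done ->
          sink

        _acc, :halt ->
          :ok
      end

      {:ok, collector_fun}
    end
  end
end

# The Agent plays the role of the long-lived consumer "B".
{:ok, agent} = Agent.start_link(fn -> [] end)
sink = BufferSink.new(agent)

# Producer "A" feeds the sink first.
1..3 |> Stream.map(&"a#{&1}") |> Stream.into(sink) |> Stream.run()

# Producer "C" later replaces "A" and writes to the same sink.
4..6 |> Stream.map(&"c#{&1}") |> Stream.into(sink) |> Stream.run()

received = agent |> Agent.get(& &1) |> Enum.reverse()
IO.inspect(received)
```

Because the sink only holds a reference to the consumer, finishing one stream does not close the consumer; closing stdin stays an explicit, separate step, as in the example above.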

Let me know what you think.

mpope9 commented 3 years ago

Interesting, will killing a process cause an :eof to be sent?

akash-akya commented 3 years ago

If by killing you mean killing the external OS process, then the reading process will get :eof. If the process terminates normally after close_stdin, we'll also get :eof. But if ExCmd.Process itself is killed abnormally, that's not the case and we'll get an exit signal instead. Basically, we try to handle it whenever we can; if someone forcefully kills ExCmd.Process, we can't do anything for the reading processes.

I missed handling exit signals from ExCmd.Process in the above code. Perhaps we should spawn consume_output with spawn/1 instead of Task.async/1, or handle exit signals from ExCmd.Process.read.
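
One way to handle such exits on the reading side can be sketched as follows. This is an illustration under assumptions, not ExCmd's own code: `SafeRead.safe_read/1` is a hypothetical helper, and an unlinked `Agent` stands in for ExCmd.Process, on the assumption that a read against a dead server process exits in the caller the way a `GenServer.call` does.

```elixir
defmodule SafeRead do
  # Hypothetical helper: runs `read_fun` and turns an exit raised
  # inside the call into an error tuple instead of crashing the caller.
  def safe_read(read_fun) when is_function(read_fun, 0) do
    read_fun.()
  catch
    :exit, reason -> {:error, {:exit, reason}}
  end
end

# Stand-in for ExCmd.Process: an Agent started without a link,
# so killing it does not take the current process down with it.
{:ok, pid} = Agent.start(fn -> "data" end)
"data" = SafeRead.safe_read(fn -> Agent.get(pid, & &1) end)

# Kill the stand-in abnormally and wait until it is confirmed down.
ref = Process.monitor(pid)
Process.exit(pid, :kill)

receive do
  {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
end

# The call now exits in the caller; safe_read converts that to a tuple.
result = SafeRead.safe_read(fn -> Agent.get(pid, & &1) end)
IO.inspect(result)
```

Catching the exit only covers the failed call itself. A consumer started with Task.async/1 is also linked to its caller, so a crash inside the task would still propagate through the link unless exits are trapped or the consumer is started unlinked, for example with spawn/1.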

mpope9 commented 3 years ago

I meant would calling ExCmd.Process.stop send the :eof, as that is what I would probably end up doing. But I do think that reusing the sink looks great! I will give it a spin today or tomorrow with two processes and let ya know how it looks.

mpope9 commented 3 years ago

This'll work for me. Thank you v much