socketry / async

An awesome asynchronous event-driven reactor for Ruby.

Blocking subprocess (popen3) #304

Closed schmijos closed 7 months ago

schmijos commented 7 months ago

I think I've got a conceptual misunderstanding somewhere or there is a bug. Consider the following code:

require 'open3'
require 'json'
require 'async'

Async do |task|
  File.open("tmp/test.log", "w") {}

  obj = task.async do
    Open3.popen3(%(tail -n 0 -f tmp/test.log | grep "id" --line-buffered)) do |stdin, stdout, stderr, thread|
      puts "wait for line"
      line = stdout.readline
      Process.kill("SIGINT", thread.pid)
      JSON.parse(line)
    end
  end

  File.write("tmp/test.log", '{"id": "1"}')
  puts "wrote"
  puts obj.wait['id']
end

The output is

wait for line
wrote

then it blocks. But when I append to the file from the outside exactly three times (with echo '{"id":1}'), the following happens:

    2m     warn: Async::Task [oid=0x2fc10] [ec=0x2fc24] [pid=95584] [2024-02-08 23:22:42 +0100]
               | Task may have ended with unhandled exception.
               |   LocalJumpError: unexpected return
               |   → (irb):67 in `block (3 levels) in <top (required)>'
               |     /Users/josua/.asdf/installs/ruby/3.3.0/lib/ruby/3.3.0/open3.rb:540 in `popen_run'
               |     /Users/josua/.asdf/installs/ruby/3.3.0/lib/ruby/3.3.0/open3.rb:235 in `popen3'
               |     (irb):63 in `block (2 levels) in <top (required)>'
               |     /Users/josua/.asdf/installs/ruby/3.3.0/lib/ruby/gems/3.3.0/gems/async-2.6.5/lib/async/task.rb:160 in `block in run'
               |     /Users/josua/.asdf/installs/ruby/3.3.0/lib/ruby/gems/3.3.0/gems/async-2.6.5/lib/async/task.rb:330 in `block in schedule'
=> #<Async::Task:0x000000000002fc10>

How is this all connected?

ioquatix commented 7 months ago

At a glance:

  1. You can't use return unless you put it in a method.
  2. You aren't writing a full line to the file.

Maybe check those two issues and let me know if that solves your problem.
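
For the second point, something like this (untested sketch, adjust the path and payload to your case) appends a complete, newline-terminated line, which tail and grep --line-buffered need before they will emit anything:

require 'json'

# Append one complete line: grep --line-buffered only forwards a match once it
# sees the trailing newline, so write it explicitly and flush.
File.open("tmp/test.log", "a") do |log|
  log.puts(JSON.generate({"id" => "1"}))  # puts adds the newline if it's missing
  log.flush
end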

ioquatix commented 7 months ago
require 'open3'
require 'json'
require 'async'

Async do |task|
  log = File.open("test.log", "a")

  obj = task.async do
    stdin, stdout, process = Open3.popen2(%(tail -n 0 -f test.log | grep "id" --line-buffered))
    stdin.close

    line = stdout.readline
    puts "got line: #{line}"
    Process.kill(:INT, process.pid)

    JSON.parse(line)
  end

  sleep 1

  log.puts('{"id": "1"}')
  log.flush
  puts "wrote"
  puts "result:", obj.wait.inspect
end

This works for me; maybe you can adapt it to your needs.

The sleep is required because there is a race condition between writing to the log and your command starting.

schmijos commented 7 months ago

Thank you very much!

You can't use return unless you put it in a method.

Sorry, the return was a leftover from before I broke the example down for this issue. I've removed it from the issue description to reduce confusion.

You aren't writing a full line to the file.

You're completely right. This may have led me astray while debugging.

sleep is required

This is a very good point that I forgot about, and it was actually the issue. Is there a way I could signal the process start to the reactor so that I don't need to actively wait?

One thing aside… Please tell me if you could use some financial help with the async gems. We've got an open source donations budget over at Renuo. Drop me an email at josua.schmid@renuo.ch if you have an idea.

schmijos commented 7 months ago

I'm banging my head against that sleep. How can I fix the race condition without a sleep? Is that even possible? Before readline it's too early to signal the fiber handover, and at the first byte it's already too late. readline itself does the waiting, so no Async::Condition will solve that.
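
To make that concrete, here is roughly the kind of hand-off I mean (a sketch only, and using Async::Variable rather than Async::Condition so the signal isn't lost if it fires before anyone is waiting). Resolving right after popen2 returns only tells the parent that the pipeline was spawned, not that tail is already following the file, so the write can still land too early and readline can still hang:

require 'open3'
require 'json'
require 'async'
require 'async/variable'

Async do |task|
  started = Async::Variable.new

  child = task.async do
    stdin, stdout, process = Open3.popen2(%(tail -n 0 -f test.log | grep "id" --line-buffered))
    stdin.close

    # Too early: the shell pipeline exists, but tail may not have opened
    # test.log yet, so a write that happens right after this can be missed.
    started.resolve

    line = stdout.readline # and from here on it is too late to signal anything
    Process.kill(:INT, process.pid)
    JSON.parse(line)
  end

  started.wait
  File.write("test.log", %({"id": "1"}\n), mode: "a")

  puts "result:", child.wait.inspect
end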

ioquatix commented 7 months ago

I have an idea for a solution but need to check it.

ioquatix commented 7 months ago

I don't know if your example is representative of the problem you are facing, but if it is, you can do it like this:

require 'async'
require 'json'

Async do |task|
  # Open the log file, and seek to the end of the file:
  stdin = File.open("test.log")
  stdin.seek(0, IO::SEEK_END)

  # Create a child task to read the log file:
  child = task.async do
    # Create a pipe to capture the output:
    input, output = IO.pipe
    pid = Process.spawn(%(grep "id" --line-buffered), in: stdin, out: output)

    # Close "stdin" as no longer needed by the parent process:
    stdin.close

    # Read the output from the pipe:
    line = input.readline
    puts "got line: #{line}"

    # Wait for the process to exit:
    Process.kill(:INT, pid)
    Process.wait(pid)

    # Return the result:
    JSON.parse(line)
  end

  # Append to the log file:
  log = File.open("test.log", "a")
  log.puts('{"id": "1"}')
  log.close # or flush

  puts "result:", child.wait.inspect
end

Basically, avoid the race by opening (and seeking) the log file before anything is written to it, and injecting that handle into the child process that handles the reading, rather than having the subprocess open the file at some non-deterministic time, which is what causes the race.

ioquatix commented 7 months ago

Regarding sponsorship, it's always welcome and allows me to spend more time on these projects.

See https://github.com/socketry/community/?tab=readme-ov-file#sponsorship for details :)

schmijos commented 7 months ago

This works well for me, thank you! Although I don't think that handing the file over to the async task is the issue; from my point of view, what matters is how I write to the file.

The following works:

require 'async'
require 'json'

Sync do |task|
  child = task.async do
    stdin = File.open("test.log")
    stdin.seek(0, IO::SEEK_END)

    input, output = IO.pipe
    pid = Process.spawn(%(grep "id" --line-buffered), in: stdin, out: output)
    stdin.close
    line = input.readline
    Process.kill(:INT, pid)
    Process.wait(pid)
    JSON.parse(line)
  end

  File.write("test.log", '{"id": "1"}', mode: "a")
  puts "result:", child.wait.inspect
end

While removing mode: "a" makes it block again:

require 'async'
require 'json'

Sync do |task|
  child = task.async do
    stdin = File.open("test.log")
    stdin.seek(0, IO::SEEK_END)

    input, output = IO.pipe
    pid = Process.spawn(%(grep "id" --line-buffered), in: stdin, out: output)
    stdin.close
    line = input.readline
    Process.kill(:INT, pid)
    Process.wait(pid)
    JSON.parse(line)
  end

  File.write("test.log", '{"id": "1"}')
  puts "result:", child.wait.inspect
end

ioquatix commented 7 months ago

I believe File.write is probably truncating the file, which causes the other file handle (which has done SEEK_END) to become invalid, i.e. to point beyond the new end of the file. If the file was empty, it might work, as SEEK_END in that case would be offset 0. You could try calling stdin.tell to see the offset.
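
For example (a rough sketch against a throwaway file), you can watch the truncation and the stale offset directly:

File.write("test.log", "old contents\n")           # file is now 13 bytes

reader = File.open("test.log")
reader.seek(0, IO::SEEK_END)
puts reader.tell                                   # => 13

File.write("test.log", '{"id": "1"}')              # default mode truncates first, then writes 11 bytes
puts File.size("test.log")                         # => 11
puts reader.tell                                   # => still 13, i.e. past the new end of the file

File.write("test.log", '{"id": "1"}', mode: "a")   # appending instead leaves the offset valid
puts File.size("test.log")                         # => 22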

schmijos commented 7 months ago

That was it. When starting with an empty file, stdin.tell is 0 after the seek; otherwise it is 11 (the file still contained the write from my last test). Thank you very much for your help!