socketry / async

An awesome asynchronous event-driven reactor for Ruby.
MIT License
2.05k stars 85 forks source link

Errno::EAGAIN errors when stress testing async with regular IO objects #255

Closed Math2 closed 11 months ago

Math2 commented 1 year ago

While experimenting with async (its ability to work with ordinary built-in IO objects in particular), I was getting some errors apparently caused by unexpected EAGAIN in CRuby's IO code. I can reproduce it with a simple echo client/server, but no idea how specific this might be to my computer (FreeBSD 13).

Server:

#!/usr/bin/env ruby

require 'socket'
require 'async'

Sync do
  server = TCPServer.new(nil, 4444)
  loop do
    s = server.accept
    Async do
      s.each_line do |b|
        s.write(b)
      end
    end
  end
end

Client:

#!/usr/bin/env ruby

require 'socket'
require 'async'

Sync do
  s = TCPSocket.new(nil, 4444)
  Async do # write as fast as possible
    loop do
      s.write("test\n")
    end
  end
  Async do # read as fast as possible
    n = 0
    t0 = Time.now
    s.each_line do |b|
      n += b.size
      if (t1 = Time.now) - t0 > 1 # show progress
        puts n
        t0 = t1
      end
    end
  end
end

Reading line-wise or block-wise doesn't seem to matter. The server can stop reading while it's blocked for writing (and vice versa), but since the client is supposed to keep doing both concurrently it shouldn't matter (I think?). That kind of code works with threads, so I was hoping it would "just work" with async too...

On ruby 3.2, I only get errors in the read path (so far):

  0.0s     warn: Async::Task [oid=0x3ac] [ec=0x3c0] [pid=33648] [2023-06-25 16:41:03 -0400]
               | Task may have ended with unhandled exception.
               |   Errno::EAGAIN: Resource temporarily unavailable @ io_fillbuf - fd:7 
               |   → test/test-echo-client-fiber.rb:16 in `each_line'
               |     test/test-echo-client-fiber.rb:16 in `block (2 levels) in <top (required)>'
               |     /usr/home/mathieu/.gem/ruby/3.2/gems/async-2.6.2/lib/async/task.rb:180 in `block in run'
               |     /usr/home/mathieu/.gem/ruby/3.2/gems/async-2.6.2/lib/async/task.rb:350 in `block in schedule'

But on 3.1, I also get some in the write path:

  0.0s     warn: Async::Task [oid=0x3ac] [ec=0x3c0] [pid=44813] [2023-06-25 16:41:11 -0400]
               | Task may have ended with unhandled exception.
               |   Errno::EAGAIN: Resource temporarily unavailable
               |   → test/test-echo-client-fiber.rb:10 in `write'
               |     test/test-echo-client-fiber.rb:10 in `block (3 levels) in <top (required)>'
               |     test/test-echo-client-fiber.rb:9 in `loop'
               |     test/test-echo-client-fiber.rb:9 in `block (2 levels) in <top (required)>'
               |     /usr/home/mathieu/.gem/ruby/3.1/gems/async-2.6.2/lib/async/task.rb:180 in `block in run'
               |     /usr/home/mathieu/.gem/ruby/3.1/gems/async-2.6.2/lib/async/task.rb:350 in `block in schedule'
ioquatix commented 1 year ago

Interesting, thanks for this report, I'll check it.

Math2 commented 12 months ago

I kept getting weird errors (including ruby interpreter dying with assertion errors due to IO failing with errno being 0 somewhere in io.c IIRC) so I looked into it some more. The problems happen with io-event's select backend. It's what is used on FreeBSD by default (the kqueue backend needs EV_UDATA_SPECIFIC which is an OS X thing). The problem doesn't happen on Linux with the epoll backend, but it does happen if you make it use the select backend with IO_EVENT_SELECTOR=Select env var.

I have another script that triggers the problem more easily (including on Linux with IO_EVENT_SELECTOR=Select):

#!/usr/bin/env ruby

require 'socket'
require 'async'

Sync do
  s1, s2 = Socket.pair(:UNIX, :STREAM)
  m = 3

  Async do # echo every line m times
    s2.each_line do |line|
      m.times do
        s2.write(line)
      end
    end
    s2.shutdown(:SHUT_WR)
  end

  n = 1_000_000
  Async do # send n lines
    n.times do |i|
      s1.puts("test #{i}")
    end
    s1.shutdown(:SHUT_WR)
  end
  Async do # drain and check received lines
    n.times do |i|
      m.times do
        line = s1.gets
        fail unless line == "test #{i}\n"
      end
    end
    fail unless s1.eof?
  end
end

With this hack, the script works with the select backend (on ruby 3.1 with the v1 scheduler interface):

diff --git i/lib/io/event/selector/select.rb w/lib/io/event/selector/select.rb
index e54fb47..3756980 100644
--- i/lib/io/event/selector/select.rb
+++ w/lib/io/event/selector/select.rb
@@ -211,7 +211,7 @@ module IO::Event

                        case result = blocking{io.read_nonblock(maximum_size, exception: false)}
                        when :wait_readable
-                           if length > 0
+                           if length > 0 || true
                                self.io_wait(fiber, io, IO::READABLE)
                            else
                                return EWOULDBLOCK

Not really sure what is going on though, this obviously doesn't seem like a correct fix.

Math2 commented 12 months ago

I think I found the problem.

The select backend has a queue of waiters for each IO object, but it always drops the queue as a whole even if some waiters didn't have their specific events triggered. That makes #io_wait return 0 sometimes (which makes some of CRuby's IO code give up on whatever it was doing and raise an exception for the EWOULDBLOCK it got earlier).

This patch requeues the waiters when none of their events were triggered. The code is probably getting a bit more complicated than it should be now though...

diff --git i/lib/io/event/selector/select.rb w/lib/io/event/selector/select.rb
index e54fb47..a76ba14 100644
--- i/lib/io/event/selector/select.rb
+++ w/lib/io/event/selector/select.rb
@@ -103,14 +103,23 @@ module IO::Event
                    self.fiber&.alive?
                end

-               def transfer(events)
+               def dispatch(events, &reactivate)
+                   tail = self.tail
                    if fiber = self.fiber
-                       self.fiber = nil
-                       
-                       fiber.transfer(events & self.events) if fiber.alive?
+                       if fiber.alive?
+                           revents = events & self.events
+                           if revents.zero?
+                               reactivate.call(self)
+                           else
+                               self.fiber = nil
+                               fiber.transfer(revents)
+                           end
+                       else
+                           self.fiber = nil
+                       end
                    end
-               
-                   self.tail&.transfer(events)
+
+                   tail&.dispatch(events, &reactivate)
                end

                def invalidate
@@ -349,7 +358,10 @@ module IO::Event
                end

                ready.each do |io, events|
-                   @waiting.delete(io).transfer(events)
+                   @waiting.delete(io).dispatch(events) do |waiter|
+                       waiter.tail = @waiting[io]
+                       @waiting[io] = waiter
+                   end
                end

                return ready.size
ioquatix commented 12 months ago

Do you mind making a PR to the io-event gem? Please include appropriate tests.

Math2 commented 12 months ago

Alright done.

ioquatix commented 12 months ago

The PR for reference: https://github.com/socketry/io-event/pull/63

Math2 commented 11 months ago

There are no more errors on 3.1, 3.2 and 3.3 no matter how much I try to stress it. Thanks for committing/releasing!

I'm going to take a whack at simplifying the queue handling. I have an idea (but not sure it's a good one). I'll submit another PR.

ioquatix commented 11 months ago

Thanks so much for testing this and helping to fix it!!