socketry / io-event

MIT License

Simplify Select's fiber IO waiting queues. #64

Closed Math2 closed 1 year ago

Math2 commented 1 year ago

To deal with multiple fibers waiting on the same IO, use nested hash tables rather than maintaining a linked list of Waiting objects.
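For readers who have not seen the diff, the nested-hash idea can be sketched roughly as below. This is a minimal illustration, not the code from this PR: the `WaitingTable` class and its method names are hypothetical, and symbols stand in for fibers.

```ruby
# Hypothetical sketch of an io => { fiber => events } table.
# Not the PR's actual implementation.
class WaitingTable
  def initialize
    # Key by object identity, since IOs and fibers are compared as objects.
    @waiting = {}.compare_by_identity
  end

  # Register a fiber as waiting for the given event mask on io.
  def add(io, fiber, events)
    (@waiting[io] ||= {}.compare_by_identity)[fiber] = events
  end

  # Remove a fiber; drop the per-IO hash once nobody waits on that IO.
  def remove(io, fiber)
    waiters = @waiting[io]
    return unless waiters
    waiters.delete(fiber)
    @waiting.delete(io) if waiters.empty?
  end

  # Union of the event masks of every fiber waiting on io.
  def interest(io)
    (@waiting[io] || {}).each_value.reduce(0) { |mask, events| mask | events }
  end

  def empty?
    @waiting.empty?
  end
end

table = WaitingTable.new
io = Object.new # stand-in for a real IO object
table.add(io, :fiber_a, IO::READABLE)
table.add(io, :fiber_b, IO::WRITABLE)
table.remove(io, :fiber_a)
```

The per-IO inner hash is where the extra allocations mentioned below come from.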

Types of Changes

Contribution

Comment

It allocates a lot of hashes, but I think it's much simpler to look at.

And the cleanup in #io_wait is more thorough.

I don't fully understand the big picture though, like in what kind of situations #io_wait might unwind, so I'm not certain what I'm doing is correct. I'm a bit wary of messing with it now that it works... but it passes all of my tests.

Math2 commented 1 year ago

To give you more insight into the kind of race conditions that can occur, let's say you had two fibers both waiting on the same IO.

* Fiber 1 is resumed because the IO operation is ready. It cancels fiber 2.

* Fiber 2 cancellation should remove itself from the list of fibers that can be woken up.

* The event loop, on Fiber 1 completing its work, shouldn't try to wake up Fiber 2.

This kind of concurrency issue can be exceedingly tricky to diagnose. The linked list's invariants make it a good data structure for this use case: no matter what, the IO is removed either by cancellation or by completion, and after each discrete operation the linked list of waiters is still valid.
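The three steps above can be sketched with a tiny linked list. This is an illustrative model only, not io-event's actual `Waiter` class: `SketchWaiter` and its methods are hypothetical names, and lambdas stand in for fibers so the example runs without an event loop.

```ruby
# Hypothetical singly linked list of waiters.
SketchWaiter = Struct.new(:callback, :events, :tail) do
  # Cancellation only clears the node. The links stay intact, so a
  # traversal that is already in progress remains valid.
  def invalidate
    self.callback = nil
  end

  # Wake this waiter if it is still valid, then continue down the list.
  def dispatch(ready)
    if (callback = self.callback)
      self.callback = nil # each waiter is woken at most once
      callback.call(ready & events)
    end
    tail&.dispatch(ready)
  end
end

woken = []
waiter2 = SketchWaiter.new(->(_events) { woken << :fiber2 }, IO::READABLE, nil)
# "Fiber 1" cancels "fiber 2" as soon as it is resumed.
waiter1 = SketchWaiter.new(
  ->(_events) { woken << :fiber1; waiter2.invalidate },
  IO::READABLE,
  waiter2
)

waiter1.dispatch(IO::READABLE)
```

In this model only `:fiber1` ends up in `woken`: the traversal still visits the second node, but finds it invalidated and skips it.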

Ohhh yeah I see there's something wrong with my code now. I think.

When a fiber gets cancelled, it would try to remove itself from the wrong hash. The IO ready loop at the end of #select deletes the whole IO hash from @waiting before it transfers to the fibers (and then it requeues them in a new hash if needed). And when unwinding, a fiber will try to delete itself from the new hash in @waiting. But the loop is still iterating over the old hash, so it could still process that fiber (assuming it is still #alive? by then). And if you make fibers remember their old IO hash, then you could be modifying the hash while the loop is iterating over it...

So here's a new version that "invalidates" waiting fibers. Each fiber remembers the IO hash it was added to, which is the hash the ready loop is still working with, and when it unwinds it sets its value in that hash (representing the waited-on events) to nil.
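A rough model of the stale-hash problem and the nil-invalidation workaround, assuming a simplified ready loop. Everything here is hypothetical (`run_ready_loop`, the symbols standing in for fibers and IOs); it is not the code from either commit.

```ruby
# Hypothetical model of the ready loop: detach the per-IO hash, then wake
# each waiter, skipping entries whose events were set to nil meanwhile.
def run_ready_loop(waiting, io, on_wake)
  waiters = waiting.delete(io) # detached before anyone is woken
  woken = []
  waiters.each do |fiber, events|
    next if events.nil? # invalidated while the loop was running
    woken << fiber
    on_wake.call(fiber, waiters)
  end
  woken
end

# First approach: the cancelled fiber looks itself up in the current table,
# but its entry lives in the detached hash, so the delete is a no-op.
waiting_naive = { io: { fiber_a: IO::WRITABLE, fiber_b: IO::WRITABLE } }
naive = run_ready_loop(waiting_naive, :io, lambda { |fiber, _waiters|
  waiting_naive[:io]&.delete(:fiber_b) if fiber == :fiber_a
})

# Second approach: the fiber remembers the hash it was added to and sets
# its value to nil. Updating an existing key is fine during iteration.
waiting_fixed = { io: { fiber_a: IO::WRITABLE, fiber_b: IO::WRITABLE } }
fixed = run_ready_loop(waiting_fixed, :io, lambda { |fiber, waiters|
  waiters[:fiber_b] = nil if fiber == :fiber_a
})
```

In the first run the cancelled `:fiber_b` is still woken. In the second it is skipped.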

Damn I don't know if that's really making the whole thing simpler now, it's just a little bit shorter. The complexity is still there.

ioquatix commented 1 year ago

I have to admit, the reason I know of these issues is that they were previously bugs, so there is some hard-won knowledge. Perhaps we can codify some of these behavioural issues into regression tests, i.e. maybe you can test your hypothesis with a test that fails on this PR but works with the linked list approach.

Math2 commented 1 year ago

Alright I got a test case. It works with Select on the current main branch, fails with the first commit in this PR, and works with the second commit in this PR.

diff --git a/test/io/event/selector.rb b/test/io/event/selector.rb
index 88dd994..fb1eca5 100644
--- a/test/io/event/selector.rb
+++ b/test/io/event/selector.rb
@@ -283,6 +283,49 @@ Selector = Sus::Shared("a selector") do
                :select, :readable, :readable
            ]
        end
+
+       it "can handle exception raised during wait from another fiber that was waiting on the same io" do
+           [false, true].each do |swapped| # Try both orderings.
+               writable1 = writable2 = false
+               error1 = false
+               raised1 = false
+               
+               boom = Class.new(RuntimeError)
+               
+               fiber1 = fiber2 = nil
+               
+               fiber1 = Fiber.new do
+                   begin
+                       selector.io_wait(Fiber.current, local, IO::WRITABLE)
+                   rescue boom
+                       error1 = true
+                       # Transfer back to the signaling fiber to simulate doing something similar to raising an exception in an asynchronous task or thread.
+                       fiber2.transfer
+                   end
+                   writable1 = true
+               end
+               
+               fiber2 = Fiber.new do
+                   selector.io_wait(Fiber.current, local, IO::WRITABLE)
+                   # Don't do anything if the other fiber was resumed before we were by the selector.
+                   unless writable1
+                       raised1 = true
+                       fiber1.raise(boom) # Will return here.
+                   end
+                   writable2 = true
+               end
+               
+               fiber1.transfer unless swapped
+               fiber2.transfer
+               fiber1.transfer if swapped
+               selector.select(0)
+               
+               # If fiber2 did manage to be resumed by the selector before fiber1, it should have raised an exception in fiber1, and fiber1 should not have been resumed by the selector since its #io_wait call should have been cancelled.
+               expect(error1).to be == raised1
+               expect(writable1).to be == !raised1
+               expect(writable2).to be == true
+           end
+       end
    end

    with '#io_read' do

Bad news is that it fails with EPoll. And most likely KQueue as well since it works similarly. No idea about URing.

So IIUC, it fails for the same reason Select would fail. EPoll/KQueue receive triggered events in bulk from the kernel. And when a fiber unwinds, it can unregister itself from the underlying kernel event mechanism, but it doesn't remove itself from that buffered list of received events the selector is currently processing.

The problem can be triggered when a fiber raises an exception in another fiber which is ahead of it in the list of buffered events, and the target fiber does not immediately die, but blocks on something. Then the selector will wake up the unwound fiber spuriously.

The interacting fibers don't need to be waiting on the same IO object, they just need to be in the same buffered events list.
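To make the buffered-events scenario concrete, here is a toy model in which two fibers wait on different IOs but land in the same batch. It is only a sketch: `ModelFiber`, the `waiting_on` field and the symbolic IO names are all assumptions for illustration, and real selectors do not track state this way.

```ruby
# Hypothetical stand-in for a fiber: a name plus what it currently blocks on.
ModelFiber = Struct.new(:name, :waiting_on)

a = ModelFiber.new(:a, :io1)
b = ModelFiber.new(:b, :io2)

# Events reported by the kernel in one batch (e.g. a single epoll_wait call).
# Both fibers are in it, even though they wait on different IOs.
batch = [[a, :io1], [b, :io2]]

spurious = []
batch.each do |fiber, io|
  if fiber.waiting_on != io
    # The buffered event no longer matches what the fiber is blocked on.
    # A selector that resumes the fiber here causes a spurious wake-up.
    spurious << [fiber.name, io, fiber.waiting_on]
    next
  end

  fiber.waiting_on = nil # the fiber is resumed for this event

  if fiber.equal?(a)
    # a raises in b. b unwinds out of its wait on io2, unregisters from the
    # kernel, and then blocks on something else (say, in an ensure block).
    b.waiting_on = :io3
  end
end
```

Unregistering `b` from the kernel does nothing about the `[b, :io2]` entry that is already sitting in `batch`, which is the stale event recorded in `spurious`.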

The async task API doesn't encourage you to raise arbitrary rescuable exceptions in tasks. But you could have code that blocks in ensure blocks:

require 'socket'
require 'async'

Sync do
  s1, s2 = Socket.pair(:UNIX, :STREAM)

  t1 = Async do
    s1.wait_readable
  ensure
    s2.wait_readable # should block forever
    fail '1'
  end

  t2 = Async do
    s1.wait_readable
    t1.stop
    s2.wait_readable # should block forever
    fail '2'
  end

  s2.write('x')
end

This reaches the fail '1' with IO_EVENT_SELECTOR=EPoll here.

In practice it must be hard to trigger because you'd have to be stopping a job, it would have to be in the current buffered events list of the selector, it would have to do blocking operations in reaction to the exception, and those blocking operations would have to actually block, and the spurious resume would have to cause some problem (most likely some EWOULDBLOCK errors, but I got CRuby assertions related to inconsistent errnos sometimes).

ioquatix commented 1 year ago

I'll need to dig into this a bit more.

Spurious wake-ups are a thing that can happen, and maybe the answer is, the caller needs to be robust enough to handle that situation. However, if that's leading to bugs, then we might have our hands tied w.r.t. handling these cases.
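As a sketch of what "robust enough" could look like on the caller side: treat a wake-up as a hint and re-check readiness in a loop. `read_robustly` is a hypothetical helper, not part of io-event or async; it only uses the standard `IO#read_nonblock` and `IO.select`.

```ruby
# Hypothetical helper: a read that tolerates spurious wake-ups by retrying.
def read_robustly(io, maxlen)
  loop do
    result = io.read_nonblock(maxlen, exception: false)
    # A String means data, nil means EOF; both end the loop.
    return result unless result == :wait_readable
    # Whether or not this wake-up is genuine, the next read_nonblock
    # re-checks readiness, so a spurious one just costs another iteration.
    IO.select([io])
  end
end

reader, writer = IO.pipe
writer.write('x')
data = read_robustly(reader, 16)
```

This does not help when the blocking call is a bare wait such as `wait_readable` with nothing after it to re-validate, which is the situation in the `ensure` example above.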

Do you mind making a PR with just the failing test case?

Math2 commented 1 year ago

Sure, done. #65

ioquatix commented 1 year ago

@Math2 we don't need to merge this right?

Math2 commented 1 year ago

> @Math2 we don't need to merge this right?

Nah. Probably better to keep the Waiter objects, it makes it more similar to EPoll/KQueue at this point.