socketry / async

An awesome asynchronous event-driven reactor for Ruby.
MIT License

Use Set instead of Array for Condition @waiting queue to avoid memory leak #186

Closed: oeoeaio closed this 1 year ago

oeoeaio commented 1 year ago

Related to #176

We found that calling dequeue on instances of Queue (and LimitedQueue) inside a Task#with_timeout block inside a loop causes fibers to accumulate in the Queue's @waiting list when no other fiber signals the Queue, leading to a memory leak.

Our short-term fix was to manually signal the Queue whenever the timeout was reached, but this only works when a single fiber is waiting on the queue with a timeout in a loop.

Switching from an Array to a Set to hold the list of waiting fibers resolves the core issue. I don't think this will break anything, as I can't see a use case for having the same fiber in the @waiting list multiple times.
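To illustrate why the Set helps, here is a minimal standalone sketch (the class names are hypothetical, not the actual Async::Condition implementation): when a timed-out fiber re-enters the wait on every loop iteration, an Array accumulates a stale entry each time, while a Set holds at most one entry per fiber.

```ruby
require 'set'

# Hypothetical stand-ins for the @waiting list, backed by Array vs Set.
class ArrayWaitList
  def initialize
    @waiting = []
  end

  def add(fiber)
    @waiting << fiber
  end

  def size
    @waiting.size
  end
end

class SetWaitList
  def initialize
    @waiting = Set.new
  end

  def add(fiber)
    @waiting << fiber # re-adding an existing member is a no-op for a Set
  end

  def size
    @waiting.size
  end
end

waiter = Object.new # stands in for the one fiber that times out and retries

array_list = ArrayWaitList.new
set_list = SetWaitList.new

10_000.times do
  # Each iteration: the wait times out, the stale entry is left behind,
  # and the same fiber re-enters the wait list on the next dequeue.
  array_list.add(waiter)
  set_list.add(waiter)
end

puts array_list.size # => 10000 (grows without bound)
puts set_list.size   # => 1 (at most one entry per fiber)
```

Note that Ruby's Set is backed by a Hash, so iteration follows insertion order, which is relevant to the FIFO-ordering concern raised below.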

Keen for input on what tests might be appropriate for this, if any. I suppose the main thing is that this change does not break the existing test suite.


Profiling code

#!/usr/bin/env ruby

require 'async'
require 'async/queue'
require 'memory_profiler'

def produce(input_channel)
  Async do
    input_channel.enqueue('some infrequent message')
  end
end

def consume(input_channel)
  Async do |task|
    buffer = []
    report = MemoryProfiler.report do
      10000.times do
        # This timeout is artificially tight to demonstrate the memory leak
        task.with_timeout(0.001) do
          buffer << input_channel.dequeue
        rescue Async::TimeoutError
          # timeout reached; fall through and retry on the next iteration
        end
        # do something with the buffer
      end
    end
    report.pretty_print
  end
end

Async do
  input_channel = Async::Queue.new
  produce(input_channel)
  consume(input_channel)
end

Profiling before this change

retained memory by location

399960  /usr/local/bundle/gems/async-2.2.1/lib/async/condition.rb:37
   584  /usr/local/bundle/gems/async-2.2.1/lib/async/scheduler.rb:73
   160  /usr/local/bundle/gems/timers-4.3.5/lib/timers/timer.rb:25
    88  /usr/local/bundle/gems/timers-4.3.5/lib/timers/group.rb:46
    40  /usr/local/bundle/gems/async-2.2.1/lib/async/scheduler.rb:273
    40  /usr/local/bundle/gems/timers-4.3.5/lib/timers/events.rb:62

retained memory by class

399960  Async::Condition::Queue
   584  Thread::Backtrace
   160  Proc
    88  Timers::Timer
    40  Async::TimeoutError
    40  Timers::Events::Handle

Profiling after this change

retained memory by location

   584  /usr/local/bundle/gems/async-2.2.1/lib/async/scheduler.rb:73
   384  /usr/local/lib/ruby/3.1.0/set.rb:522
   192  /usr/local/lib/ruby/3.1.0/set.rb:540
   160  /usr/local/bundle/gems/timers-4.3.5/lib/timers/timer.rb:25
    88  /usr/local/bundle/gems/timers-4.3.5/lib/timers/group.rb:46
    40  /usr/local/bundle/gems/async-2.2.1/lib/async/scheduler.rb:273
    40  /usr/local/bundle/gems/timers-4.3.5/lib/timers/events.rb:62

retained memory by class

   584  Thread::Backtrace
   576  Hash
   160  Proc
    88  Timers::Timer
    40  Async::TimeoutError
    40  Timers::Events::Handle
ioquatix commented 1 year ago

The ideal solution for this is a linked list. Short of a native implementation, we could fall back to a native Ruby mutex/queue. It might even be faster, but the semantics aren't quite the same: a Ruby mutex uses a linked list internally, and my goal was to preserve FIFO order. I like your PR, but I think we need to be a little more sensitive to ordering, and I'm not sure about performance either.

Do you mind opening another PR using Thread::Queue as the queue? Let’s see if it works. In Ruby 3+ Thread::Queue supports non-blocking fibers.
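A minimal sketch of the Thread::Queue semantics being suggested (this is not the follow-up PR, just plain stdlib Ruby): messages are delivered in FIFO order, pop blocks until an item arrives, and in Ruby 3+ a blocking pop made inside a non-blocking fiber yields to the fiber scheduler rather than blocking the whole thread.

```ruby
# Thread::Queue is thread-safe and FIFO; no manual @waiting list is needed,
# since the queue itself parks and wakes blocked consumers.
queue = Thread::Queue.new

consumer = Thread.new do
  3.times.map { queue.pop } # each pop blocks until a message is available
end

[1, 2, 3].each { |message| queue.push(message) }

p consumer.value # FIFO order is preserved: [1, 2, 3]
```

Under this approach the ordering concern above is handled by the queue itself, though (as noted) the timeout semantics would need checking, since Thread::Queue#pop has no built-in timeout before Ruby 3.2.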