mperham / connection_pool

Generic connection pooling for Ruby
MIT License

Extend dirty checking to the stack #70

Closed zanker closed 9 years ago

zanker commented 9 years ago

I originally thought this would take a bit more work, but realized we can just change how we store connections to use a hash, which makes it pretty easy. This builds on what @tamird did.

This is the least compatibility-breaking way of doing it. If you wanted to extend this so people can use checkout and checkin and get the same kind of dirty tracking automatically, you could give checkout an option to return the entry so they can set the flags themselves. In that case, encapsulating it in a class would make it a little cleaner.
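To make the idea concrete, here is a rough sketch of hash entries with a dirty flag kept on a per-thread stack. It is not the code from this PR: `TrackedPool`, the `:dirty` key, and the replace-on-dirty policy are all assumptions made for illustration, and it uses a plain `Queue` rather than the gem's internals.

```ruby
require 'thread'

# Hypothetical sketch, NOT the PR's implementation: each checkout pushes a
# hash entry onto a per-thread stack, and the entry stays dirty until the
# block finishes normally.
class TrackedPool
  def initialize(size, &factory)
    @factory = factory
    @available = Queue.new
    size.times { @available << factory.call }
    @key = :"tracked_pool_#{object_id}"
  end

  def with
    entry = checkout
    begin
      result = yield entry[:conn]
      entry[:dirty] = false # only reached when the block returned normally
      result
    ensure
      checkin
    end
  end

  # Returns the entry (not just the connection) so a caller could clear
  # the flag itself when using checkout/checkin directly.
  def checkout
    entry = { conn: @available.pop, dirty: true }
    stack << entry
    entry
  end

  def checkin
    entry = stack.pop
    # Assumed policy: a dirty connection is replaced with a fresh one.
    @available << (entry[:dirty] ? @factory.call : entry[:conn])
    nil
  end

  private

  def stack
    Thread.current[@key] ||= []
  end
end

pool = TrackedPool.new(1) { Object.new }
pool.with { |conn| conn } # clean run, the same connection goes back
```

In this sketch, anything that leaves the block early (an exception, a timeout, or even `break`) leaves the entry dirty, so the connection is replaced rather than reused. That is deliberately conservative.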

zanker commented 9 years ago

The only remaining problem at that point is a thread that's rarely called and gets into an invalid state via an ensure timeout: you will have a connection entry sitting in the thread that can never be GCed until the thread itself dies.

That was an issue prior to both this and Tamir's fix though. You would need to add some sort of reaper and basically reimplement all of the state tracking that ActiveRecord is doing. I think it's an extremely niche issue, where you have a long-running thread that only calls this once or twice, gets into an invalid state, and then never tries to check out another connection.

zanker commented 9 years ago

Unrelated to this, but I added the rescue around the thread in use_pool because if you have an error in with, you get test failures that are harder to debug than something that makes it clear the call itself errored.

zanker commented 9 years ago

Don't quite have time today to wrap this PR up; I'll get you something in the morning that is a bit more usable. @tenderlove I can integrate your example into a test to make sure the problem is resolved, unless you would like to open the PR?

zanker commented 9 years ago

I realized this is unnecessarily complicated. @mperham, by adding an in-use check when we check out a connection, we remove the ability to check out a connection multiple times in the same thread (e.g., the nested checkout test). If you're OK with that, then the entire PR can be simplified.

This is a 100% fix for the connection corruption. There's one more issue I can think of that's unrelated to this; I split that discussion out to https://github.com/mperham/connection_pool/issues/71 for sanity.

mperham commented 9 years ago

@zanker Will that break with any implicit recursive use of the connection, e.g.:

POOL = ConnectionPool.new(size: 2) { Redis.new }

def push_to_redis(array)
  return if array.empty?
  POOL.with do |conn|
    conn.lpush 'array', array[0]
    push_to_redis(array[1..-1])
  end
end

push_to_redis([1,2,3])

ActiveRecord has the same semantics, where a thread checks out a database connection and has free use of it until the request is complete.
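For illustration, here is roughly what the recursive case looks like if every checkout takes a distinct connection. This is a sketch under assumptions: `NonReentrantPool`, `Exhausted`, and `push_all` are made-up names, there is no Redis involved, and it fails immediately on an empty pool instead of waiting for a timeout the way the real gem would.

```ruby
require 'thread'

# Hypothetical non-reentrant pool: nested checkouts in the same thread
# each consume a separate connection.
class NonReentrantPool
  class Exhausted < StandardError; end

  def initialize(size, &factory)
    @available = Queue.new
    size.times { @available << factory.call }
  end

  def with
    conn = begin
      @available.pop(true) # non-blocking pop; raises ThreadError when empty
    rescue ThreadError
      raise Exhausted, "no free connection"
    end
    begin
      yield conn
    ensure
      @available << conn
    end
  end
end

pool = NonReentrantPool.new(2) { Object.new }

# Same shape as push_to_redis above, minus Redis: recurse once per element
# while still holding the connection. Returns the nesting depth reached.
def push_all(pool, array, depth = 1)
  return depth - 1 if array.empty?
  pool.with { |_conn| push_all(pool, array[1..-1], depth + 1) }
end

outcome = begin
  push_all(pool, [1, 2, 3])
rescue NonReentrantPool::Exhausted => e
  e.message
end
```

With a pool of two and three elements, the third nested checkout finds the pool empty. A reentrant pool that hands the thread its already checked-out connection would not hit this.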

mperham commented 9 years ago

I've released 2.1.1 with @tamird's fix. I think this is the necessary immediate fix. I understand that it's not a theoretical 100% fix but I think in practice it might be close enough. @tenderlove had to perform some serious thread contortions to break it on MRI, YMMV on JRuby. I'm happy to work with y'all on a better solution longer term and want to spend more time understanding @zanker's fix before I accept it.

tenderlove commented 9 years ago

@mperham AFAICT, it can't happen on MRI due to the GVL. You'll never get thread switches at the right time. Anyway, I redid the example using Thread.pass to get the scheduler to switch:

require 'connection_pool'
require 'thread'
require 'monitor'

# Add a latch class so we can manipulate thread order
class Latch
  def initialize count = 1
    @count = count
    @lock = Monitor.new
    @cv = @lock.new_cond
  end

  def release
    @lock.synchronize do
      @count -= 1 if @count > 0
      @cv.broadcast if @count.zero?
    end
  end

  def await
    @lock.synchronize { @cv.wait_while { @count > 0 } }
  end
end

memcached = Class.new(ConnectionPool) {
  def checkin
    # add a Thread.pass so we can give the scheduler a chance to switch
    Thread.pass
    super
  end

  def pop_connection
    # add a Thread.pass so we can give the scheduler a chance to switch
    Thread.pass
    super
  end
}.new(size: 5, timeout: 5) { Object.new }

Thread.abort_on_exception = true
queue = SizedQueue.new 2

2.times.map {
  Thread.new {
    loop do
      job_start_latch = Latch.new
      job_finish_latch = Latch.new
      worker = Thread.new do
        # This latch is to ensure the thread is up and running before we have
        # a different thread attempt to kill it.
        job_start_latch.await

        memcached.with do |connection|
          # This latch is to notify the "killer" thread that the work is done,
          # and we're ready to be killed
          job_finish_latch.release
        end
      end
      queue << [job_start_latch, job_finish_latch, worker]
    end
  }
}

2.times.map {
  Thread.new {
    loop do
      start, finish, thread = *queue.pop
      start.release # let the target thread know that we're ready to kill it
      finish.await  # wait on the target thread to tell us it's time
      thread.kill   # kill the target thread
    end
  }
}.each(&:join)

AFAIK, it's not possible to handle thread kills like this in a safe way 100% of the time (which is why it's being removed from Java).
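As a much smaller illustration of why a killed thread leaves things half done, the sketch below kills a worker in the middle of a begin/ensure block. It does not reproduce the race above. It only shows that the ensure clause runs while the rest of the body is skipped, which is what the checkin path has to cope with. The `events` and `started` queues are just scaffolding for the demo.

```ruby
require 'thread'

events = Queue.new
started = Queue.new

worker = Thread.new do
  begin
    started << true      # signal that we are inside the block
    sleep                # stand-in for work on a checked-out connection
    events << :completed # never reached, the thread is killed first
  ensure
    events << :ensure_ran # still runs when the thread is killed
  end
end

started.pop  # wait until the worker is inside the begin block
worker.kill
worker.join

log = []
log << events.pop until events.empty?
```

Here `log` ends up holding only `:ensure_ran`. The kill can land on any line, including between a checkout and the begin that guards it, and that window is the part that cannot be closed from inside the block.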

zanker commented 9 years ago

@mperham No problem, totally agree that Tamir's fix is the immediate one worth getting out.

This removes the ability to call it recursively and get the same connection. ActiveRecord will give you a new connection if you call checkout twice in the same thread though. Quick example on Rails 4.1.8 in a console:

conn = ActiveRecord::Base.connection_pool.checkout
puts "OBJ ID: #{conn.object_id}"
ActiveRecord::Base.connection_pool.checkin(conn)

conn = ActiveRecord::Base.connection_pool.checkout
puts "OBJ ID: #{conn.object_id}"
ActiveRecord::Base.connection_pool.checkin(conn)

conn1 = ActiveRecord::Base.connection_pool.checkout
conn2 = ActiveRecord::Base.connection_pool.checkout
puts "OBJ IDS #{conn1.object_id} / #{conn2.object_id}"

ActiveRecord::Base.connection_pool.checkin(conn1)
ActiveRecord::Base.connection_pool.checkin(conn2)

Gives

OBJ ID: 70115246133940
OBJ ID: 70115246133940
OBJ IDS 70115246133940 / 70115246115020