bkeepers / qu

a Ruby library for queuing and processing background jobs.
MIT License

Mongoid 3 (Moped) backend #56

Closed dwbutler closed 12 years ago

dwbutler commented 12 years ago

I was excited to find a queue/worker gem that supports MongoDB as a backend!

The current implementation uses the 10gen mongo driver, which works beautifully with MongoMapper and Mongoid 2. However, Mongoid 3 has implemented its own MongoDB driver, Moped.

To avoid having two different drivers loaded in my app, I ported the mongo driver over to use Mongoid's Moped driver. It is a separate gem/backend: qu-mongoid.

Thanks again for the awesome gem!

bkeepers commented 12 years ago

Hey @dwbutler,

Thanks for the pull request!

How would you feel about just releasing this as a separate gem on your own account? I don't have a problem linking to it and giving you all the love.

dwbutler commented 12 years ago

Sure, I can do that. I'll work on that today.

dwbutler commented 12 years ago

Alright, I've released the qu-mongoid gem on Rubygems. Following your convention, I set the version to 0.2.0 to match Qu's version.

bkeepers commented 12 years ago

Awesome! Want to add it to https://github.com/bkeepers/qu/wiki/Backends?

dwbutler commented 12 years ago

Alright, done!

johnae commented 11 years ago

@bkeepers, @dwbutler This doesn't seem to be thread safe. AFAIK the 10gen driver has its thread safety problems (though I've never encountered them myself) but in general is thread safe as is. The Moped driver, however, is different. I've been looking into this but haven't found a solution. My problem is that when running on JRuby with qu workers in threads instead of processes, I sooner or later encounter "ConcurrencyError: can not set IO blocking after select; concurrent select detected?". This was never a problem with the 10gen driver and the default qu/mongo code.

I've filed an issue regarding this here https://github.com/mongoid/moped/issues/119 but I think it's more to do with the qu-mongoid gem than moped since mongoid 3 IS thread safe. Haven't figured out exactly how Mongoid 3 does it though.

dwbutler commented 11 years ago

Hi @johnae,

qu-mongoid was more or less a direct port of qu-mongo. In other words, I took the qu-mongo gem and translated line-by-line to the Mongoid/Moped backend. It's possible that something in there became non thread safe, but nothing jumps out at me when I go back and look at the code.

My understanding is that Mongoid normally uses a separate Moped connection on each thread. Hence, it is thread safe because threads never share the same connection. However, when I go back and review the qu-mongoid code, Qu.backend is set to a single instance of a connection, which then gets shared between threads.

However, the same thing happens with qu-mongo, so I'm not sure why the 10gen driver gives you fewer problems. Perhaps the 10gen driver is more resilient to sharing the same connection between threads.

Anyway, probably the solution is as simple as not caching the Mongoid session locally in Qu. Rather, Mongoid should be called every time, which will delegate to the thread safe implementation of one connection per thread.
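To illustrate the difference between caching the session and asking for it on every call, here is a minimal sketch. FakeSessions is a hypothetical stand-in that only mimics the per-thread lookup; it is not the real Mongoid or qu-mongoid code, and both backend classes are made up for the example.

```ruby
# Hypothetical stand-in for Mongoid::Sessions: one session object per thread.
module FakeSessions
  def self.with_name(name)
    Thread.current[:fake_sessions] ||= {}
    Thread.current[:fake_sessions][name.to_sym] ||= Object.new
  end
end

# Caches the session in an instance variable, so every thread that
# touches this backend ends up sharing the first thread's session.
class CachingBackend
  def connection
    @connection ||= FakeSessions.with_name(:qu)
  end
end

# Asks for the session on every call, so each thread gets its own.
class DelegatingBackend
  def connection
    FakeSessions.with_name(:qu)
  end
end

caching = CachingBackend.new
delegating = DelegatingBackend.new

main_cached = caching.connection
main_fresh = delegating.connection

thread_cached, thread_fresh = Thread.new {
  [caching.connection, delegating.connection]
}.value

puts "cached session shared across threads:    #{main_cached.equal?(thread_cached)}"
puts "delegated session shared across threads: #{main_fresh.equal?(thread_fresh)}"
```

Only the caching variant hands the same object to both threads, which is the sharing described above.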

I can try out this fix on a branch. I'm not too JRuby savvy, so could you help me test the fix?

dwbutler commented 11 years ago

Go ahead and give this branch a try: https://github.com/dwbutler/qu-mongoid/tree/threadsafe

@bkeepers: I had to do some wrangling to get this to pass the shared backend spec. Since a backend expects to be able to set the connection attribute, I had to store the connection in Thread.current instead of in @connection. The specs also appear to be using more than one instance of the backend in a single thread, so I had to scope the thread-local variable by object id.

See https://github.com/dwbutler/qu-mongoid/commit/02410ffeff8be978c2a8ae3e5b86ff2aac37fb8e
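For readers who don't want to open the commit, the idea can be sketched roughly as follows. This is an illustration only: the class name, the key format, and default_connection are assumptions, not the code from the linked commit.

```ruby
# Hypothetical backend that keeps its connection in a thread-local slot
# instead of an instance variable.
class ThreadLocalBackend
  def connection
    Thread.current[connection_key] ||= default_connection
  end

  def connection=(conn)
    Thread.current[connection_key] = conn
  end

  private

  # Scoping the key by object_id keeps two backend instances that live
  # in the same thread from overwriting each other's connection.
  def connection_key
    :"qu_connection_#{object_id}"
  end

  # Placeholder for whatever would really build a session.
  def default_connection
    Object.new
  end
end

a = ThreadLocalBackend.new
b = ThreadLocalBackend.new
a.connection = :conn_for_a

# A new thread does not see the value set above; it gets its own default.
seen_in_other_thread = Thread.new { a.connection }.value
```

Note the consequence: a value assigned through connection= is only visible in the thread that assigned it.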

johnae commented 11 years ago

I looked at your code and it does look like it could work reasonably well. However, what if I do this:

Qu.configure do |c|
  c.connection = Mongoid::Sessions.with_name(:qu)
  c.logger = Log4jruby::Logger.get('QU')
end

That's definitely a problem.

johnae commented 11 years ago

I think I solved it in my app by doing something like this:

class QuMongoidSession

  def initialize(name)
    @name = name
  end

  def qu_mongoid_session
    Thread.current[:qu_mongoid_session] ||= ::Mongoid::Sessions.with_name(@name)
  end

  def method_missing(sym, *args, &block)
    qu_mongoid_session.send(sym, *args, &block)
  end

end

Qu.configure do |c|
  c.connection = QuMongoidSession.new(:qu)
  c.logger = Log4jruby::Logger.get('QU')
end

The above was just a test by the way and should probably include object_id or something as well in the thread local. I haven't seen any problems so far with my code and it makes it possible to still use the Qu.configure block.
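A slightly fuller sketch of the same wrapper idea, with the thread-local key scoped by object_id and a respond_to_missing? to go with method_missing. FakeSession and the factory block are hypothetical stand-ins so the example runs without Mongoid; in a real app the block would presumably call ::Mongoid::Sessions.with_name.

```ruby
# Wrapper that looks like a session but checks out a separate underlying
# session for each thread the first time that thread uses it.
class PerThreadSession
  def initialize(name, &factory)
    @name = name
    @factory = factory
  end

  def method_missing(sym, *args, &block)
    if session.respond_to?(sym)
      session.public_send(sym, *args, &block)
    else
      super
    end
  end

  def respond_to_missing?(sym, include_private = false)
    session.respond_to?(sym, include_private) || super
  end

  private

  # One slot per wrapper instance per thread.
  def session
    Thread.current[:"per_thread_session_#{object_id}"] ||= @factory.call(@name)
  end
end

# Hypothetical stand-in for a Moped session.
FakeSession = Struct.new(:name) do
  def ident
    object_id
  end
end

wrapper = PerThreadSession.new(:qu) { |name| FakeSession.new(name) }

main_ident = wrapper.ident
thread_ident = Thread.new { wrapper.ident }.value
```

The wrapper itself is created once and can be shared freely; only the object it delegates to differs per thread.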

dwbutler commented 11 years ago

The problem with both of these code samples is that each thread needs to have a separate connection. Your Qu.configure will only work within the current thread. Once you start a new thread, Qu.connection will either use the shared cached value (in the master branch) or will default to a new session (in the threadsafe branch).

This seems to be a limitation in how the Qu API is designed... It is assumed that Qu will only be configured once, and that a single connection will be shared among threads. @bkeepers, any suggestions on the best way to design this?

For now, I would suggest you configure Qu with a new connection each time a new thread is started. It should be sufficient to pass in Mongoid::Sessions.with_name(:qu), because this always gets stored in Thread.current. (See Mongoid code below)

      def with_name(name)
        Threaded.sessions[name.to_sym] ||= Sessions::Factory.create(name)
      end

johnae commented 11 years ago

The strange thing is that I had problems when passing in Mongoid::Sessions.with_name(:qu) but not with my code. I've tested pushing a lot of data through the workers with my code without the issues I saw previously. That doesn't mean my code is bug-free of course, and I really just did a quick and dirty test. I did wonder, however, why Mongoid::Sessions.with_name(:qu) didn't work, since it seemed to be doing "the right thing"...

With my code, i.e. the QuMongoidSession above, it will check out a new session in every thread that doesn't have one already. So QuMongoidSession is basically a duck type for a Moped Session, but it ensures that a new session is checked out for every thread.

dwbutler commented 11 years ago

This is getting confusing, huh? :)

As long as you always call Mongoid::Sessions.with_name(:qu), and don't cache the return value locally, then yes, you are guaranteed to have a thread-local session.

(Code extracted from Mongoid, lib/mongoid/threaded.rb)

    def sessions
      Thread.current["[mongoid]:sessions"] ||= {}
    end

However, your Qu.configure is only called once, and caches the return value from Mongoid. Hence, a new thread won't "see" a value for connection, and will create a new one. Your QuMongoidSession doesn't help unless Qu.configure is called separately in each thread. Even then, Mongoid already checks out a new session for each thread anyway, so QuMongoidSession shouldn't be necessary.

Hope that clarifies things.

dwbutler commented 11 years ago

So just to summarize, my recommendation is to run the following every time a new thread is created:

Qu.configure do |c|
  c.connection = Mongoid::Sessions.with_name(:qu)
end

dwbutler commented 11 years ago

It might be better to have a qu-mongoid specific configuration option such as:

Qu.configure do |c|
  c.session = :qu
end

That way, a connection can be automatically checked out with the right session in each thread.
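Roughly what I have in mind, as a sketch only. FakeSessions stands in for Mongoid's per-thread session lookup and SessionNamedBackend is a made-up name; the real backend may end up looking different.

```ruby
# Hypothetical stand-in for Mongoid::Sessions: one session per name per thread.
module FakeSessions
  def self.with_name(name)
    Thread.current[:fake_sessions] ||= {}
    Thread.current[:fake_sessions][name.to_sym] ||= Object.new
  end
end

# The backend stores only the session *name*. The session itself is
# looked up on every call, so it is never shared between threads.
class SessionNamedBackend
  attr_accessor :session

  def initialize
    @session = :default
  end

  def connection
    FakeSessions.with_name(session)
  end
end

backend = SessionNamedBackend.new
backend.session = :qu   # configured once, e.g. at boot

main_conn = backend.connection
thread_conn = Thread.new { backend.connection }.value
```

Since a symbol is all that gets cached, configuring once is enough and every worker thread still ends up with its own session.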

johnae commented 11 years ago

But really, this part here:

def qu_mongoid_session
   ## this code gets called inside the workers and will check out new sessions as needed
   ## it's used by way of method_missing
   Thread.current[:qu_mongoid_session] ||= ::Mongoid::Sessions.with_name(@name)
end

should actually do the trick, since it gets called EVERY time ANY method on "connection" gets called. And connection will be called in the workers that run in different threads, i.e. it ought to work since the Session isn't cached here. Perhaps it could be simplified down to:

def qu_mongoid_session
   ::Mongoid::Sessions.with_name(@name)
end

if Mongoid does its own thread locals. The nice thing about it is that what Qu.configure gets is NOT a ::Mongoid::Session but a wrapper that checks out sessions but in every other way works just like a Session does.

I've tested this wrapper and it really does seem to work. However, a solution directly in qu-mongoid would be preferable, I guess, if it's possible to do while maintaining flexibility.

Perhaps something like my QuMongoidSession can be used in the mongoid backend?

johnae commented 11 years ago

Your suggestion using only a symbol might be a good way of doing it in Qu.configure, so the code can be moved down into the backend.

dwbutler commented 11 years ago

Alright, I pushed an implementation that allows the Mongoid session to be configured. Thread safety is then handled completely within the Qu::Mongoid backend. Please pull and try the following in your app:

Qu.configure do |c|
  c.session = :qu
end

@bkeepers, I know this breaks the consistency of the configuration between different backends. Let me know what you think about this design. I think it wouldn't hurt to allow backend-specific configuration. Or at least tell users to do something like c.backend.session = :qu

johnae commented 11 years ago

It's late here now, need to go to bed. Just did a quick test against your "threadsafe"-branch but got an error:

NoMethodError: undefined method `session=' for Qu:Module

Not sure if I missed something. Will look into it tomorrow again. Thanks for taking your time with this!

dwbutler commented 11 years ago

Are you using bundler? Make sure you do:

gem 'qu-mongoid', :git => 'https://github.com/dwbutler/qu-mongoid.git', :branch => 'threadsafe'

and bundle update qu-mongoid to update to the latest version.

dwbutler commented 11 years ago

I'm thinking of changing the interface to c.connection = :qu to avoid introducing a qu-mongoid specific config value. I would overload connection= to accept a symbol and set up the correct session.

johnae commented 11 years ago

Looks as if this code:

module Qu
  def_delegators :backend, :session, :session=
end

is defined in:

qu-mongoid/spec/qu/backend/mongoid_spec.rb

so it wouldn't work outside the specs.

The commit I'm looking at is https://github.com/dwbutler/qu-mongoid/commit/1dc0275ae5c87f4b3bb0b8bec1723c3ac715b05c

If I patch Qu the same way in my app I get it running. I guess that code should be in, perhaps, lib/qu/qu.rb, lib/qu.rb or some other reasonable place that gets loaded on require.
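For reference, the kind of patch I mean looks roughly like this when written out in full. MiniQu and MiniBackend are stand-ins I made up so the snippet runs on its own, and using SingleForwardable here is my assumption about how the delegation could be wired up, not a claim about how Qu does it.

```ruby
require "forwardable"

# Stand-in for the Qu module: forwards session/session= to the backend.
module MiniQu
  extend SingleForwardable

  class << self
    attr_accessor :backend
  end

  def_delegators :backend, :session, :session=

  def self.configure
    yield self
  end
end

# Stand-in for the Mongoid backend, which owns the session setting.
class MiniBackend
  attr_accessor :session
end

MiniQu.backend = MiniBackend.new

MiniQu.configure do |c|
  c.session = :qu   # forwarded to MiniQu.backend.session=
end
```

Without the def_delegators line, the configure block fails with the same kind of NoMethodError as above.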

After patching the Qu module I tested it again and I get no concurrency issues so it seems it's working ok now. I will be testing some more today.

dwbutler commented 11 years ago

Oops! :)

I decided not to patch Qu, because session is Qu::Mongoid backend specific. I also decided not to overload connection= because this violates the Qu spec. So please do another pull and try the following:

Qu.configure do |c|
  c.backend.session = :qu
end

If you're happy with this fix I'll release a new version.

johnae commented 11 years ago

Yeah that'll work just fine. Just tested it doing 3500 jobs without any issues. Thanks!

dwbutler commented 11 years ago

Okay, I've updated documentation and pushed a new gem version, 0.2.1. Please give it a try. Thanks!