contribsys / faktory_worker_ruby

Faktory worker for Ruby
GNU Lesser General Public License v3.0

Client Initialization with Multiple Faktories #5

Closed bryanrite closed 6 years ago

bryanrite commented 6 years ago

Hello!

I was evaluating Faktory to replace our current job queue. Since Faktory server itself doesn't scale horizontally (yet), and our scale is very large, we decided to partition the jobs into two Faktory server instances (rather than different queues). Basically our web frontend will push jobs to Faktory-1 server, one cluster of workers will pull jobs off Faktory-1 server, do work, and potentially queue jobs on Faktory-2 server.

The issue we are having is that the way the Faktory::Client is initialized, you cannot override the server location if the environment variable is set. As long as FAKTORY_PROVIDER/FAKTORY_URL is set, Faktory::Client.new(url: ENV["FAKTORY_OTHER_URL"]) will never point to that server.

https://github.com/contribsys/faktory_worker_ruby/blob/master/lib/faktory/client.rb#L38

Could we change the initializer to be something like:

def initialize(url: uri_from_env || 'tcp://localhost:7419', debug: false)
  @debug = debug
  @location = URI(url)
  open
end

This would allow same default functionality but allow overriding the url if manually creating clients.
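To illustrate why the keyword default gives that behavior, here is a minimal sketch using a hypothetical stand-in class (`SketchClient` is not part of the gem and never opens a socket). The body of `uri_from_env` is an assumption about how the environment lookup works; only the method name comes from the proposal above.

```ruby
require "uri"

# Hypothetical stand-in for Faktory::Client; it only resolves the server URL.
class SketchClient
  DEFAULT_URL = "tcp://localhost:7419"

  attr_reader :location

  # The default expression runs only when the caller omits url:, so an
  # explicitly passed url always wins over the environment.
  def initialize(url: uri_from_env || DEFAULT_URL, debug: false)
    @debug = debug
    @location = URI(url)
  end

  private

  # Assumed lookup order: FAKTORY_PROVIDER names the variable that holds the
  # URL; otherwise FAKTORY_URL is read directly.
  def uri_from_env
    provider = ENV["FAKTORY_PROVIDER"]
    provider ? ENV[provider] : ENV["FAKTORY_URL"]
  end
end
```

With this shape, `SketchClient.new` still follows the environment, while `SketchClient.new(url: ENV["FAKTORY_OTHER_URL"])` points at the second server regardless of what `FAKTORY_URL` contains.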

I can submit a PR if you 👍

bryanrite commented 6 years ago

This request may only be partially complete. Sharing the same Faktory::Client across the workers to push jobs to the other Faktory results in random, funky errors if concurrency > 1, which I now realize is because the Client isn't pooled:

2017-12-07T14:49:32.127Z 1 TID-o3z80 WARN: {"context":"Job raised exception","job":null}
2017-12-07T14:49:32.128Z 1 TID-o3z80 WARN: Faktory::CommandError: ERR invalid character 'P' after top-level value

Looking through the code, I think I would need Faktory::Connection to accept a new url and pass it down to the Client, which is a more invasive change and probably not worth it, so I'm not sure how to proceed.

Currently I am monkey-patching initialize as above, then in an initializer, manually creating a ConnectionPool for the other Faktory so the workers can use pooled Clients.

# config/initializers/faktory.rb
size = Faktory.worker? ? (Faktory.options[:concurrency] + 2) : 5
$FAKTORY_OTHER_POOL = ConnectionPool.new(:timeout => Faktory.options[:pool_timeout] || 1, :size => size) do
  Faktory::Client.new(url: ENV["FAKTORY_OTHER_URL"])
end

Then within a job:

# app/jobs/test_job.rb
class TestJob
  include Faktory::Job
  def perform
    # Do Something
    $FAKTORY_OTHER_POOL.with do |conn|
      conn.push({jid: SecureRandom.hex(16), jobtype: "TestOtherJob", args: [1,2,'3']})
    end
  end
end

I'm guessing this use case isn't that common. I still think it would be good to change the Client initialize method so the location is settable, but I'm happy to create the connection pool manually so as not to make anything else more complicated.
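For readers wondering what the pool buys here, the sketch below shows the checkout/checkin idea with only the standard library. `SketchPool` and `FakeClient` are hypothetical stand-ins (not the connection_pool gem or Faktory::Client), and the sketch leaves out timeouts; it only demonstrates that each client is used by one thread at a time.

```ruby
require "securerandom"

# Minimal fixed-size pool built on the stdlib Queue; a stand-in for the
# connection_pool gem, without its timeout handling.
class SketchPool
  def initialize(size:)
    @available = Queue.new
    size.times { @available << yield } # build each client with the given block
  end

  # Check a client out, hand it to the block, and always return it.
  def with
    client = @available.pop # blocks while every client is checked out
    begin
      yield client
    ensure
      @available << client
    end
  end
end

# Hypothetical client that raises if two threads use it at the same time.
class FakeClient
  attr_reader :pushed

  def initialize
    @pushed = []
    @owner = nil
  end

  def push(job)
    raise "client shared across threads" if @owner && @owner != Thread.current
    @owner = Thread.current
    sleep 0.001 # widen the window in which a second thread could interfere
    @pushed << job
    @owner = nil
    job[:jid]
  end
end

clients = []
pool = SketchPool.new(size: 2) do
  client = FakeClient.new
  clients << client
  client
end

# Eight worker threads share two clients, but never the same one at once.
threads = 8.times.map do |i|
  Thread.new do
    pool.with do |conn|
      conn.push({ jid: SecureRandom.hex(12), jobtype: "TestOtherJob", args: [i] })
    end
  end
end
threads.each(&:join)
```

Because `with` removes the client from the queue for the duration of the block, two threads cannot interleave commands on the same client, which is the situation a single shared client cannot rule out.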

mperham commented 6 years ago

Hi Bryan, it's early days for Faktory so we don't have good support for things like sharding yet. That initializer looks better. :+1: You are welcome to submit a PR but please add a test too.

You definitely cannot share a global Faktory::Client instance; clients are not thread-safe. You need to pool it. FWR exposes the default Faktory.server for the worker process:

Faktory.server do |client|
  client.push(...)
end

Or you can build your own connection pool, as Faktory::Connection does. Sounds like you have already figured this out.

Please let me know how your evaluation goes; user feedback is extremely valuable for me right now. You can open a meta-issue with things we can improve or shoot me an email (it's in my GitHub profile).

bryanrite commented 6 years ago

Thanks Mike! I'll be putting it through its paces next week. So far (with the adjustment above) everything works super well. I'll let you know how things go and get a PR submitted for the initializer change.