contribsys / faktory_worker_ruby

Faktory worker for Ruby
GNU Lesser General Public License v3.0
214 stars 31 forks source link

List of queues passed to Faktory::Launcher is ignored #12

Closed jagthedrummer closed 6 years ago

jagthedrummer commented 6 years ago

In working on an ActiveJob adapter for Faktory I'm trying to start and stop workers programmatically using Faktory::Launcher. I can get workers to start and stop, but I can't ever seem to get them to watch any queue other than default.

I'm basically doing this:

require "faktory/launcher"
faktory = Faktory::Launcher.new(queues: ["blarf"],
                                 environment: "test",
                                 concurrency: 1,
                                 timeout: 1)
faktory.run

No matter what I put in the queues array it always acts like I've done queues: ["default"].

I'm running this against the branch from #7. I haven't been able to spot any changes there that I think might affect this but it's certainly possible I've missed something.

Here's the entire process I'm using to start/stop workers if it helps. (This code is adapted from the AJ test adapter for Sidekiq: https://github.com/rails/rails/blob/master/activejob/test/support/integration/adapters/sidekiq.rb)

  def start_workers
    continue_read, continue_write = IO.pipe
    death_read, death_write = IO.pipe

    @pid = fork do
      continue_read.close
      death_write.close

      # Faktory is not warning-clean :(
      $VERBOSE = false

      $stdin.reopen(File::NULL)
      $stdout.sync = true
      $stderr.sync = true

      logfile = Rails.root.join("log/faktory.log").to_s
      puts "logfile = #{logfile}"
      Faktory.logger = Logger.new(logfile)
      #Faktory::Logging.initialize_logger(logfile)

      self_read, self_write = IO.pipe
      trap "TERM" do
        self_write.puts("TERM")
      end

      Thread.new do
        begin
          death_read.read
        rescue Exception
        end
        self_write.puts("TERM")
      end

      require "faktory/cli"
      require "faktory/launcher"
      faktory = Faktory::Launcher.new(queues: ["blarf"],
                                       environment: "test",
                                       concurrency: 1,
                                       timeout: 1)
      #Faktory.average_scheduled_poll_interval = 0.5
      #Faktory.options[:poll_interval_average] = 1
      begin
        faktory.run
        continue_write.puts "started"
        while readable_io = IO.select([self_read])
          signal = readable_io.first[0].gets.strip
          raise Interrupt if signal == "TERM"
        end
      rescue Interrupt
      end

      puts "about to call faktory.stop"
      faktory.stop
      exit!
    end
    continue_write.close
    death_read.close
    @worker_lifeline = death_write

    raise "Failed to start worker" unless continue_read.gets == "started\n"
  end

  def stop_workers_hard
    if @pid
      Process.kill "TERM", @pid
      Process.wait @pid
    end
  end