contribsys / faktory_worker_ruby

Faktory worker for Ruby
GNU Lesser General Public License v3.0

Jobs submitted by other jobs are going to the wrong queue #67

Closed: gee-forr closed this issue 2 years ago

gee-forr commented 2 years ago

Hi there,

I have a queue for a fairly long-running job (> 90 s) that goes through a large CSV file and writes its contents to a DB.

At some point in that job, I submit several more jobs (one per row inserted) to another set of workers on another queue.

For a while (I can't determine how long or why), jobs submitted by the long-running worker go to the correct queue. Then something happens (again, I don't know what) and the jobs are instead added to the same queue as the long-running worker, completely ignoring the faktory_options queue: 'other_queue' setting.

These misrouted jobs clog up the long-running queue and, at the same time, delay the results of the other queue.

Here's a concrete example. The job below takes roughly 90 s to run, and there are normally a few dozen to process at a time:

class ParseBatchedCSVFileJob
  include Faktory::Job

  faktory_options queue: "default"

  def perform(file)
    return unless File.file?(file)

    job = ::ETL::CSVParser::Process.setup(file_path: file)

    ::ETL::CSVParser::Process.run(job)
  end
end

Somewhere in the ETL::CSVParser code above, there is more code that runs this line:

Rating::RateCDR.perform_async(cdr_id)

That class has the following faktory config and code.

module Rating
  class RateCDR
    class Error < StandardError; end

    include Faktory::Job
    extend ::LightService::Organizer

    faktory_options queue: "rating"

    def self.call(cdr)
      with(cdr: cdr).reduce(actions)
    end

    def self.actions
      [::Rating::AssociateRates, ::Rating::CalculateCharges]
    end

    def perform(cdr_id)
      cdr = CDR.find(cdr_id)

      response = self.class.call(cdr)
      raise(Error, response.message) if response.failure?
    end
  end
end

So the main job runs happily in the default queue and submits one rating job per CDR to the rating queue, but after something happens, those rating jobs get diverted to the default queue as well.

Luckily, all jobs run in the context of a Rails app that has access to the job code for every queue, so even when a Rating job is erroneously submitted to the default queue, it can still be executed.

Do you have any idea why jobs aren't being submitted to the queue their class specifies?

mperham commented 2 years ago

No, I can't explain that behavior.

gee-forr commented 2 years ago

Yeah, hoping there'd be a straightforward answer was a bit of a Hail Mary from me :)

@mperham, can you think of any tips I could use to get to the bottom of what could be causing this bug?

mperham commented 2 years ago

I don't have any magical debugging tricks. Try to reproduce the issue, fire up pry and start debugging.

gee-forr commented 2 years ago

Hey there, just closing the loop on this issue. Unfortunately, I was not able to get to the bottom of why the jobs were being added to the default queue instead of their rating queue, but I did work around it by doing the following:

::Rating::RateCDR.set(queue: 'rating').perform_async(cdr_id)

By setting the queue just before submitting the job, it seems to have done the trick.

@mperham, not sure if you think this might be wiki-worthy? Let me know if you feel it's valuable enough, and I'll add something to the wiki.

ahacop commented 2 years ago

I think the test below may demonstrate the behavior.

require "helper"

class MergeTest < LiveTest
  class SomeJob
    include Faktory::Job
    faktory_options queue: :some_q, retry: 1, reserve_for: 2, custom: {unique_for: 3}
    def perform(*)
    end
  end

  describe "MergeTest" do
    before do
      require "faktory/testing"
      Faktory::Testing.fake!
    end

    after do
      Faktory::Testing.disable!
      Faktory::Queues.clear_all
    end

    it "raises" do
      threads = []
      threads << Thread.new { 100_000.times { SomeJob.perform_async("example-arg") } }
      threads << Thread.new { 100_000.times { SomeJob.perform_async("example-arg") } }

      err = assert_raises(RuntimeError) { threads.each(&:join) }

      assert_equal "can't add a new key into hash during iteration", err.message
      refute_equal 200_000, Faktory::Queues["some_q"].size
      refute_equal 0, Faktory::Queues["default"].size
    end
  end
end

ahacop commented 2 years ago

ClassMethods.perform_async is called, which calls set with get_faktory_options.

https://github.com/contribsys/faktory_worker_ruby/blob/96f67b77346782e518f97952beb955f6754189bd/lib/faktory/job.rb#L114-L116

set then calls deep_merge with get_faktory_options and options, and on this path options is that same get_faktory_options hash. https://github.com/contribsys/faktory_worker_ruby/blob/96f67b77346782e518f97952beb955f6754189bd/lib/faktory/job.rb#L110-L112

Because both arguments refer to the same hash, deep_merge iterates over it through the first reference while mutating it through the second, which raises the RuntimeError demonstrated in the test above. https://github.com/contribsys/faktory_worker_ruby/blob/96f67b77346782e518f97952beb955f6754189bd/lib/faktory/job.rb#L201-L204
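Ruby's refusal to grow a hash while it is being iterated can be reproduced without Faktory. The sketch below uses a hypothetical helper, mutating_merge (not faktory_worker_ruby's deep_merge), that writes stringified keys into its second argument while iterating its first. With two distinct hashes it works; passing the same hash twice produces the same error message the test asserts.

```ruby
# Hypothetical merge helper, NOT the gem's code: it writes String keys into
# +other+ while iterating +base+, which is only safe if they are different objects.
def mutating_merge(base, other)
  base.each do |key, _value|
    # "queue" (String) is a new key next to :queue (Symbol), so this grows +other+.
    other[key.to_s] = other[key] if other.key?(key)
  end
  base.merge(other)
end

options = { queue: "rating", retry: 1 } # stands in for shared class-level options

# Distinct objects: iterating one hash while writing to a copy is fine.
merged = mutating_merge(options, options.dup)

# Same object on both sides (the aliasing case): Ruby raises instead of
# adding a key to a hash that is mid-iteration.
error = begin
  mutating_merge(options, options)
  nil
rescue RuntimeError => e
  e
end

puts error.message # prints "can't add a new key into hash during iteration"
```

The failed insert leaves options unchanged; the point is only that one object reached through two parameter names behaves very differently from two separate objects.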

but I did workaround it by doing the following:

::Rating::RateCDR.set(queue: 'rating').perform_async(cdr_id)

By setting the queue just before submitting the job, it seems to have done the trick.

Calling set explicitly first works because it bypasses ClassMethods.perform_async entirely: ClassMethods.set returns a Setter, and Setter.perform_async does not attempt to merge two references to get_faktory_options.

https://github.com/contribsys/faktory_worker_ruby/blob/96f67b77346782e518f97952beb955f6754189bd/lib/faktory/job.rb#L64-L66
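The difference between the two call paths comes down to object identity. The toy model below uses hypothetical names (ToyJob, OPTIONS, get_options) and is not the gem's code; its set simply returns the two hashes that a real merge step would receive, so each path can be checked for aliasing.

```ruby
# Toy model of the two call paths; names are hypothetical, not faktory_worker_ruby's.
class ToyJob
  OPTIONS = { "queue" => "rating" } # shared, class-level, mutable hash

  def self.get_options
    OPTIONS
  end

  # Instead of merging, return the pair of hashes the merge would see.
  def self.set(options)
    [get_options, options]
  end

  # Mirrors the path described above: perform_async hands the class options to set.
  def self.perform_async(*_args)
    set(get_options)
  end
end

base, override = ToyJob.perform_async(1)
aliased = base.equal?(override) # same object on both sides of the merge

base, override = ToyJob.set(queue: "rating")
fresh = base.equal?(override)   # a brand-new literal hash, so no aliasing
```

The explicit set(queue: 'rating') workaround avoids the problem because the call site builds a fresh hash, not because the queue value itself is special.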

One possible solution would be to change ClassMethods.perform_async and ClassMethods.perform_in to call set with an empty hash, as set is already pulling in the default options anyway. I'm happy to write up the PR, but would like @mperham to chime in with his thoughts on how to proceed.
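A sketch of that direction, assuming a deep merge that never mutates its inputs and a perform_async that passes an empty override hash. ToyUtils and FixedToyJob are hypothetical stand-ins, not the fix that landed on main; set returns the merged options where the real gem would return a Setter.

```ruby
# Hypothetical, non-mutating deep merge: both arguments are only read.
module ToyUtils
  def self.deep_merge(base, other)
    other = other.transform_keys(&:to_s) # non-bang: allocates a new hash
    base.merge(other) do |_key, old_val, new_val|
      # Recurse into nested hashes; otherwise the override wins.
      old_val.is_a?(Hash) && new_val.is_a?(Hash) ? deep_merge(old_val, new_val) : new_val
    end
  end
end

class FixedToyJob
  # Freezing is possible because nothing writes to the defaults anymore.
  OPTIONS = { "queue" => "some_q", "retry" => 1, "custom" => { "unique_for" => 3 } }.freeze

  # Returns merged options (a real Setter would carry these).
  def self.set(overrides)
    ToyUtils.deep_merge(OPTIONS, overrides)
  end

  # Empty overrides: set already starts from the class defaults.
  def self.perform_async(*_args)
    set({})
  end
end

# Two threads enqueueing concurrently, as in the test above (smaller counts).
threads = Array.new(2) do
  Thread.new { Array.new(10_000) { FixedToyJob.perform_async("example-arg")["queue"] } }
end
queues = threads.flat_map(&:value)

rating = FixedToyJob.set(queue: "rating")
custom = FixedToyJob.set(custom: { unique_for: 5 })
```

Since every call allocates its own result hash, concurrent callers never iterate and mutate the same object, and every job keeps the class's configured queue.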

gee-forr commented 2 years ago

Reopening based on feedback from @ahacop

mperham commented 2 years ago

Thank you, the fix is on main if you want to test it.

ahacop commented 2 years ago

@mperham Thank you for the quick fix!

I think you may want to also change:

https://github.com/contribsys/faktory_worker_ruby/blob/96f67b77346782e518f97952beb955f6754189bd/lib/faktory/job.rb#L120-L123

mperham commented 1 year ago

Where’s the pull request?

On Thu, Jun 9, 2022 at 12:37 Ara Hacopian wrote:

One fix would be to change line 204 of lib/faktory/job.rb (https://github.com/contribsys/faktory_worker_ruby/blob/96f67b77346782e518f97952beb955f6754189bd/lib/faktory/job.rb#L204) to:

other_hash = other_hash.transform_keys(&:to_s)
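The suggested change swaps an in-place key rewrite for a copying one. Plain Ruby (2.5 or later, where both methods exist) shows the difference; shared below is a hypothetical stand-in for the class-level options hash.

```ruby
# Stand-in for a hash shared by every caller (hypothetical).
shared = { queue: "some_q", retry: 1 }

# transform_keys returns a new hash and leaves the receiver alone.
copy = shared.transform_keys(&:to_s)
untouched = shared.keys # still Symbol keys

# transform_keys! rewrites the receiver itself and returns that same object,
# so anyone else holding a reference to +shared+ sees the change.
same_object = shared.transform_keys!(&:to_s).equal?(shared)
```

With the non-bang form, reassigning the local other_hash only changes what that local name points to; the shared options object is never touched.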

ahacop commented 1 year ago

@mperham You fixed it, so no need for a PR. See: https://github.com/contribsys/faktory_worker_ruby/issues/67#issuecomment-1152466693, https://github.com/contribsys/faktory_worker_ruby/commit/cc7d839d5297e1e7fb6ea9b757b30043c86bb16a and https://github.com/contribsys/faktory_worker_ruby/commit/6a580598844ee8db13c83f13d641f638816b1b43.