ruby-shoryuken / shoryuken

A super efficient Amazon SQS thread based message processor for Ruby

auto_visibility_timeout uses queue in options instead of receiving queue #52

Closed kookster closed 9 years ago

kookster commented 9 years ago

In Processor.auto_visibility_timeout, there is a queue passed in as the first arg, but this is not used for the visibility_timeout_heartbeat:

    def auto_visibility_timeout(queue, sqs_msg, worker_class)
      if worker_class.auto_visibility_timeout?
        timer = every(worker_class.visibility_timeout_heartbeat) do

Instead, when it gets the value from Worker.extended_visibility_timeout, it uses the queue from the shoryuken options to get the visibility_timeout:

      def extended_visibility_timeout
        Shoryuken::Client.visibility_timeout(get_shoryuken_options['queue'])
      end

However, the actual receiving queue, passed in to auto_visibility_timeout, may well not match the queue from get_shoryuken_options.
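
To illustrate the mismatch, here is a minimal self-contained sketch. It does not call Shoryuken itself: `QUEUE_TIMEOUTS`, `FakeWorker`, and both lookup methods are hypothetical stand-ins, and the timeout values are made up for the example.

```ruby
# Hypothetical per-queue visibility timeouts in seconds (made-up values).
QUEUE_TIMEOUTS = { 'p1' => 30, 'p2' => 120 }.freeze

# Stand-in for a worker class that registers a single queue in its options.
class FakeWorker
  def self.get_shoryuken_options
    { 'queue' => 'p1' }
  end
end

# Mirrors the behaviour described above: the receiving queue is ignored
# and the queue from the worker options is used instead.
def timeout_from_options(worker_class)
  QUEUE_TIMEOUTS.fetch(worker_class.get_shoryuken_options['queue'])
end

# Alternative: look the timeout up from the queue the message arrived on.
def timeout_from_receiving_queue(queue)
  QUEUE_TIMEOUTS.fetch(queue)
end

receiving_queue = 'p2'
puts timeout_from_options(FakeWorker)              # timeout configured for 'p1'
puts timeout_from_receiving_queue(receiving_queue) # timeout configured for 'p2'
```

When the message arrives on a queue other than the one named in the worker options, the two lookups return different timeouts, which is the problem reported here.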

phstc commented 9 years ago

@kookster hm... in which cases the queue will not match?

kookster commented 9 years ago

Thanks @phstc - here's my scenario: I have different priority tasks, but they are all processed by the same task worker.

When I drop messages, they go on one of 4 queues depending on the task and who requested it: p1, p2, p3, and p4. All messages on any of these queues are processed using the same TaskWorker class. As a result, any given message being processed by that one TaskWorker may be any one of those 4 queues.

The correct current queue is passed in to the auto_visibility_timeout method, but then isn't used when determining the visibility_timeout in the heartbeat logic.

kookster commented 9 years ago

I should mention I have written the code to use the queue passed in to auto_visibility_timeout to determine the timeout, and I'm happy to provide it as a PR.

phstc commented 9 years ago

gotcha!

Could you send a PR with the changes you made? :heart:

kookster commented 9 years ago

awesome, yes, working on it now - one quick question - the timeout right now is hard coded to always extend by 5 seconds.

Some of my jobs (transcoding and manipulating > hour long audio files) can take up to an hour, so always extending by 5 seconds at a time seems like it could result in many extension messages.

What do you think about making the extension a function of the original timeout? e.g. if the original timeout is ~ 2 minutes, then 5 seconds makes sense, if it is 1 hour, then maybe extend by 1 minute at a time?

phstc commented 9 years ago

The timeout isn't hardcoded to 5 seconds. The extended timeout is the queue's default visibility timeout, and the heartbeat interval is 5 seconds less than that, just to give enough time to update the message visibility before SQS returns the message to the queue.

So if your queue visibility timeout is 60 seconds, when a job reaches 55 seconds and is still not done, shoryuken will extend it for 60 more seconds.
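
The arithmetic described above can be sketched as follows. This is only an illustration of the explanation in this comment, not Shoryuken's code: `HEARTBEAT_MARGIN`, `heartbeat_interval`, and `extensions_needed` are hypothetical names, and the sketch assumes the heartbeat fires at a fixed interval for as long as the job is still running.

```ruby
# Safety margin in seconds, taken from the explanation above.
HEARTBEAT_MARGIN = 5

# Seconds between heartbeats for a queue with the given visibility timeout.
def heartbeat_interval(visibility_timeout)
  interval = visibility_timeout - HEARTBEAT_MARGIN
  raise ArgumentError, 'visibility timeout must exceed the margin' unless interval.positive?

  interval
end

# Number of heartbeats that fire strictly before a job of job_seconds
# finishes (assumption: one extension request per heartbeat).
def extensions_needed(job_seconds, visibility_timeout)
  return 0 if job_seconds <= 0

  (job_seconds - 1) / heartbeat_interval(visibility_timeout)
end

puts heartbeat_interval(60)         # 55, matching the example above
puts extensions_needed(3600, 60)    # one-hour job on a 60-second queue
puts extensions_needed(3600, 3600)  # same job on a one-hour queue
```

Under these assumptions the number of extension requests scales with the job length divided by the queue's visibility timeout, so a queue with a longer timeout needs far fewer extensions for the same job.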

phstc commented 9 years ago

https://github.com/phstc/shoryuken/wiki/Worker-options#auto_visibility_timeout

kookster commented 9 years ago

I totally read that wrong, never mind!

phstc commented 9 years ago

> As a result, any given message being processed by that one TaskWorker may be any one of those 4 queues.

How do you process messages from multiple queues via TaskWorker? A worker is defined to listen for messages from just one queue.

You can have multiple workers for the same queue, but not multiple queues for the same worker.

    def extended_visibility_timeout
      Shoryuken::Client.visibility_timeout(get_shoryuken_options['queue'])
    end

Assuming that one worker can have only one queue: get_shoryuken_options['queue'] should be the same queue as the queue passed in auto_visibility_timeout.

kookster commented 9 years ago

The processor is listening to the queue, right?

And it looks up which worker to use (fetch_worker) based on the shoryuken_class, so I can have the TaskWorker used to process a message so long as I specify shoryuken_class.

Am I missing something?
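
The lookup described above can be sketched roughly like this. It is a self-contained illustration, not Shoryuken's `fetch_worker`: `resolve_worker` is a hypothetical helper, messages are modelled as plain hashes, and the attribute shape is assumed to match the `shoryuken_class` message attribute with a `string_value` key.

```ruby
# Stand-in worker; one class handles messages from several queues.
class TaskWorker
  def perform(queue, body)
    "#{self.class} handled #{body.inspect} from #{queue}"
  end
end

# Hypothetical lookup: pick the worker class named in the message
# attributes, regardless of which queue delivered the message.
def resolve_worker(message_attributes)
  class_name = message_attributes.fetch('shoryuken_class').fetch(:string_value)
  Object.const_get(class_name).new
end

attributes = {
  'shoryuken_class' => { string_value: 'TaskWorker', data_type: 'String' }
}

# The same worker class ends up processing messages from different queues.
%w[p1 p2].each do |queue|
  puts resolve_worker(attributes).perform(queue, 'job')
end
```

Because the worker is chosen from the message attribute rather than from the queue, the queue in the worker's own options need not be the queue the message was received on.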

kookster commented 9 years ago

yes, if I was only using perform_async, it always passes in the queue for the worker.

However, I should mention I have my own publish method to do this, based on perform_async, which looks like this:

    def self.shoryuken_publish(queue, message, options)
      options[:message_attributes] ||= {}
      options[:message_attributes]['shoryuken_class'] = {
        string_value: self.to_s,
        data_type: 'String'
      }
      logger.error("\nshoryuken_publish: #{shoryuken_q(queue)}, m: #{message.inspect}, o: #{options.inspect}\n")
      Shoryuken::Client.send_message(shoryuken_q(queue), message, options)
    end

    def self.shoryuken_q(queue)
      "#{SystemInformation.env}_#{queue}"
    end

kookster commented 9 years ago

This is no longer an issue, the aws-sdk-2 upgrade PR #39 took care of this in this commit:

https://github.com/phstc/shoryuken/commit/74732c695c2cb3ca83863199d076f32f87e27bb0

Now it uses the queue to look up the timeout:

https://github.com/phstc/shoryuken/blob/master/lib/shoryuken/processor.rb#L40