ruby-shoryuken / shoryuken

A super efficient Amazon SQS thread-based message processor for Ruby. This project is in MAINTENANCE MODE. Reach out to me via the Slack link in the description if you want to become a new maintainer.

Concurrency not working as expected #624

Closed: dinesh-rdk closed this issue 4 years ago.

dinesh-rdk commented 4 years ago

Hi @phstc, thank you for this wonderful library. I have been using RabbitMQ for queuing messages and am now thinking of switching to SQS.

I tested locally with a mock server, as suggested in https://github.com/phstc/shoryuken/wiki/Using-a-local-mock-SQS-server

I have a lot of queues (67), and I want them all to be independent (i.e., I don't want any queue to wait for another queue to be processed).

So I configured Shoryuken as follows:

groups:
  group1:
    concurrency: 50
    queues:
      - Queue1
  group2:
    concurrency: 50
    queues:
      - Queue2
  group3:
    concurrency: 20
    queues:
      - Queue3
  group4:
    concurrency: 20
    queues:
      - Queue4
  group5:
    concurrency: 20
    queues:
      - Queue5
  group6:
    concurrency: 20
    queues:
      - Queue6
  group7:
    concurrency: 2
    queues:
      - Queue7
  group8:
    concurrency: 2
    queues:
      - Queue8
  group9:
    concurrency: 2
    queues:
      - Queue9
  group10:
    concurrency: 2
    queues:
      - Queue10
  group11:
    concurrency: 2
    queues:
      - Queue11
  group12:
    concurrency: 2
    queues:
      - Queue12
  group13:
    concurrency: 2
    queues:
      - Queue13
  group14:
    concurrency: 2
    queues:
      - Queue14
  group15:
    concurrency: 2
    queues:
      - Queue15
  group16:
    concurrency: 2
    queues:
      - Queue16
  group17:
    concurrency: 2
    queues:
      - Queue17
  group18:
    concurrency: 2
    queues:
      - Queue18
  group19:
    concurrency: 2
    queues:
      - Queue19
  group20:
    concurrency: 8
    queues:
      - Queue20
  group21:
    concurrency: 2
    queues:
      - Queue21
  group22:
    concurrency: 4
    queues:
      - Queue22
  group23:
    concurrency: 4
    queues:
      - Queue23
  group24:
    concurrency: 2
    queues:
      - Queue24
  group25:
    concurrency: 4
    queues:
      - Queue25
  group26:
    concurrency: 2
    queues:
      - Queue26
  group27:
    concurrency: 2
    queues:
      - Queue27
  group28:
    concurrency: 2
    queues:
      - Queue28
  group29:
    concurrency: 2
    queues:
      - Queue29
  group30:
    concurrency: 2
    queues:
      - Queue30
  group31:
    concurrency: 2
    queues:
      - Queue31
  group32:
    concurrency: 2
    queues:
      - Queue32
  group33:
    concurrency: 2
    queues:
      - Queue33
  group34:
    concurrency: 2
    queues:
      - Queue34
  group35:
    concurrency: 2
    queues:
      - Queue35
  group36:
    concurrency: 2
    queues:
      - Queue36
  group37:
    concurrency: 2
    queues:
      - Queue37
  group38:
    concurrency: 2
    queues:
      - Queue38
  group39:
    concurrency: 2
    queues:
      - Queue39
  group40:
    concurrency: 2
    queues:
      - Queue40
  group41:
    concurrency: 2
    queues:
      - Queue41
  group42:
    concurrency: 2
    queues:
      - Queue42
  group43:
    concurrency: 2
    queues:
      - Queue43
  group44:
    concurrency: 2
    queues:
      - Queue44
  group45:
    concurrency: 2
    queues:
      - Queue45
  group46:
    concurrency: 12
    queues:
      - Queue46
  group47:
    concurrency: 2
    queues:
      - Queue47
  group48:
    concurrency: 2
    queues:
      - Queue48
  group49:
    concurrency: 2
    queues:
      - Queue49
  group50:
    concurrency: 4
    queues:
      - Queue50
  group51:
    concurrency: 2
    queues:
      - Queue51
  group52:
    concurrency: 2
    queues:
      - Queue52
  group53:
    concurrency: 4
    queues:
      - Queue53
  group54:
    concurrency: 2
    queues:
      - Queue54
  group55:
    concurrency: 2
    queues:
      - Queue55
  group56:
    concurrency: 2
    queues:
      - Queue56
  group57:
    concurrency: 6
    queues:
      - Queue57
  group58:
    concurrency: 2
    queues:
      - Queue58
  group59:
    concurrency: 2
    queues:
      - Queue59
  group60:
    concurrency: 2
    queues:
      - Queue60
  group61:
    concurrency: 2
    queues:
      - Queue61
  group62:
    concurrency: 2
    queues:
      - Queue62
  group63:
    concurrency: 4
    queues:
      - Queue63
  group64:
    concurrency: 4
    queues:
      - Queue64
  group65:
    concurrency: 8
    queues:
      - Queue65
  group66:
    concurrency: 4
    queues:
      - Queue66
  group67:
    concurrency: 2
    queues:
      - Queue67

The delay defaults to 0, so I expected Shoryuken to poll each queue in SQS every second.

But the mock server logs show only ~20 requests per second on average.

127.0.0.1 - - [07/Aug/2020 10:43:32] "POST /123456789012/development_Queue27 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:32] "POST /123456789012/development_Queue19 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:33] "POST /123456789012/development_Queue29 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:33] "POST /123456789012/development_Queue44 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:33] "POST /123456789012/development_Queue46 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:33] "POST /123456789012/development_Queue63 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:33] "POST /123456789012/development_Queue48 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:33] "POST /123456789012/development_Queue1 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:33] "POST /123456789012/development_Queue37 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:33] "POST /123456789012/development_Queue55 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:33] "POST /123456789012/development_Queue3 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:33] "POST /123456789012/development_Queue38 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:33] "POST /123456789012/development_Queue23 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:33] "POST /123456789012/development_Queue7 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:33] "POST /123456789012/development_Queue40 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:33] "POST /123456789012/development_Queue10 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:33] "POST /123456789012/development_Queue12 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:33] "POST /123456789012/development_Queue9 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:33] "POST /123456789012/development_Queue11 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:33] "POST /123456789012/development_Queue26 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:33] "POST /123456789012/development_Queue22 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:34] "POST /123456789012/development_Queue24 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:34] "POST /123456789012/development_Queue17 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:34] "POST /123456789012/development_Queue21 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:34] "POST /123456789012/development_Queue5 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:34] "POST /123456789012/development_Queue56 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:34] "POST /123456789012/development_Queue4 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:34] "POST /123456789012/development_Queue34 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:34] "POST /123456789012/development_Queue20 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:34] "POST /123456789012/development_Queue13 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:34] "POST /123456789012/development_Queue49 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:34] "POST /123456789012/development_Queue2 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:34] "POST /123456789012/development_Queue53 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:34] "POST /123456789012/development_Queue43 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:34] "POST /123456789012/development_Queue35 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:34] "POST /123456789012/development_Queue6 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:34] "POST /123456789012/development_Queue32 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:34] "POST /123456789012/development_Queue47 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:34] "POST /123456789012/development_Queue66 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:34] "POST /123456789012/development_Queue59 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:34] "POST /123456789012/development_Queue36 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:34] "POST /123456789012/development_Queue33 HTTP/1.0" 200 -
127.0.0.1 - - [07/Aug/2020 10:43:35] "POST /123456789012/development_Queue25 HTTP/1.0" 200 -
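To check the ~20 requests per second figure, here is a minimal sketch that buckets mock-server log lines like the ones above by their one-second timestamp and lists which queues were polled. It assumes exactly the log format shown above; requests_per_second and polled_queues are hypothetical helpers, not part of Shoryuken or the mock server.

```ruby
# Matches the bracketed timestamp in lines such as:
#   127.0.0.1 - - [07/Aug/2020 10:43:33] "POST /123456789012/development_Queue1 HTTP/1.0" 200 -
LOG_TIMESTAMP = /\[(\d{2}\/\w{3}\/\d{4} \d{2}:\d{2}:\d{2})\]/

# Hypothetical helper: count requests per one-second timestamp.
def requests_per_second(lines)
  counts = Hash.new(0)
  lines.each do |line|
    match = LOG_TIMESTAMP.match(line)
    counts[match[1]] += 1 if match # skip lines without a timestamp
  end
  counts
end

# Hypothetical helper: distinct queue names (last URL path segment), in first-seen order.
def polled_queues(lines)
  lines.map { |line| line[%r{POST /\d+/(\S+) HTTP}, 1] }.compact.uniq
end
```

Applied to the excerpt above, the counts are 2, 19, 21 and 1 requests for 10:43:32 through 10:43:35, and the 43 lines hit 43 distinct queues with no repeats. If the polls were spread evenly, each of the 67 queues would be polled only about once every 3 to 4 seconds (67 / 20 ≈ 3.4) rather than every second.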

Because of this, SQS seems very slow compared to RabbitMQ. Can you please guide me on what to change here?

dinesh-rdk commented 4 years ago

And this is my initializer file, config/initializers/shoryuken.rb:

# Create SQS queues
require 'aws-sdk-sqs'
require 'shoryuken'

# We inline the jobs in development & testing
# Start mock server by "moto_server sqs -p 4576 -H localhost"
if Rails.env.development? || Rails.env.test?
  client = Aws::SQS::Client.new(
    region: ENV["AWS_REGION"],
    access_key_id: ENV["AWS_ACCESS_KEY_ID"],
    secret_access_key: ENV["AWS_SECRET_ACCESS_KEY"],
    endpoint: 'http://localhost:4576',
    verify_checksums: false
  )
  Shoryuken.configure_client do |config|
    config.sqs_client = client
  end
  Shoryuken.configure_server do |config|
    config.sqs_client = client
  end
else
  client = Aws::SQS::Client.new(
    region: ENV['AWS_REGION'],
    access_key_id: ENV['AWS_ACCESS_KEY_ID'],
    secret_access_key: ENV['AWS_SECRET_ACCESS_KEY']
  )
  Shoryuken.sqs_client = client
end

# We have environment prepended queues in ActiveJob
all_queues = <AllQueues[]>
available_queues = client.list_queues.queue_urls.map{ |q| q.split('/').last }
queues_to_create = all_queues - available_queues
queues_to_create.each do |queue|
  begin
    puts "Creating queue #{queue}"
    client.create_queue(
      queue_name: queue,
      attributes: {
        "VisibilityTimeout" => "3600", # 3600 => 60 minutes
      }
    )
  rescue => exception
    # Include the error message so failures are not silently swallowed
    puts "Error in creating queue #{queue}: #{exception.message}"
  end
end

# After creating queues, make sure to subscribe with shoryuken
# RAILS_CACHE_CLASSES=true shoryuken -R -C ./shoryuken.yml

# By default, Shoryuken will make requests against SQS for getting the updated queue visibility timeout, so if you change the visibility timeout in SQS, Shoryuken will automatically update it.
# If you want to reduce the number of requests against SQS (therefore reducing your quota usage), you can disable this behavior by:
Shoryuken.cache_visibility_timeout = true
# Active Job allows you to prefix the queue names of all jobs. Shoryuken supports this behavior natively. By default, though, queue names defined in the config file (or passed to the CLI) are not prefixed in the same way. To have Shoryuken honor Active Job prefixes, you must enable that option explicitly.
Shoryuken.active_job_queue_name_prefixing = true

phstc commented 4 years ago

Hi

concurrency is the number of threads that Shoryuken will start per group. Check this. You are probably getting some I/O wait.

I understand you probably have some specific needs, but allocating threads per group (i.e., per queue) is often not the best use of resources.

Concurrency works, but you need to check whether your server/container can handle that much I/O given the number of threads.
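Because concurrency is set per group, the total thread count is the sum over all groups. A minimal sketch that totals a groups: config using Ruby's standard YAML library; total_threads is a hypothetical helper and assumes every group sets concurrency explicitly.

```ruby
require 'yaml'

# Hypothetical helper: sum the per-group `concurrency` values of a
# Shoryuken-style `groups:` config. Assumes every group sets `concurrency`.
def total_threads(yaml_text)
  config = YAML.safe_load(yaml_text)
  config.fetch('groups').values.sum { |group| Integer(group.fetch('concurrency')) }
end

config = <<~YAML
  groups:
    group1:
      concurrency: 20
      queues:
        - Queue1
    group2:
      concurrency: 20
      queues:
        - Queue2
    group3:
      concurrency: 20
      queues:
        - Queue3
YAML

puts total_threads(config) # prints 60
```

Summed the same way, the 67-group configuration at the top of this thread comes to 344 threads, which is a lot of threads for one process to keep busy without I/O wait.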

phstc commented 4 years ago

BTW, Shoryuken does batch fetching, so a single request can fetch up to 10 messages, depending on the number of available threads.
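A minimal sketch of the batch-sizing rule described above, to show why many small groups mean many small fetches: each receive request asks for no more messages than the group has idle threads, capped at 10 (SQS ReceiveMessage accepts MaxNumberOfMessages from 1 to 10). batch_size and requests_to_fill are hypothetical helpers that model the described behavior, not Shoryuken's actual code.

```ruby
# SQS ReceiveMessage accepts MaxNumberOfMessages between 1 and 10.
SQS_MAX_BATCH = 10

# Hypothetical model: request as many messages as there are idle threads,
# capped at the SQS per-request maximum.
def batch_size(idle_threads)
  return 0 if idle_threads <= 0 # no idle threads: nothing to fetch right now
  [idle_threads, SQS_MAX_BATCH].min
end

# Receive requests needed to give every idle thread one message, assuming
# the queue has enough messages and every request returns a full batch.
def requests_to_fill(idle_threads)
  idle_threads.fdiv(SQS_MAX_BATCH).ceil
end

[2, 4, 20, 50].each do |threads|
  puts "concurrency #{threads}: up to #{batch_size(threads)} messages/request, " \
       "#{requests_to_fill(threads)} request(s) to fill all threads"
end
```

Under this model, a group with concurrency 2 can never receive more than 2 messages per request, whereas a larger shared group can use full 10-message batches.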

dinesh-rdk commented 4 years ago

Hi @phstc, thank you for your explanation. I will look into modifying the way I use it :)

dinesh-rdk commented 4 years ago

Hi @phstc

Just for testing purposes, I modified my code to:

class Queue1Job < ActiveJob::Base
  queue_as 'Queue1'

  def perform(message)
    if message.nil?
      10.times { |i| Queue2Job.perform_later(i) }
    else
      sleep(2)
      Queue2Job.perform_later(message)
    end
  end
end

class Queue2Job < ActiveJob::Base
  queue_as 'Queue2'

  def perform(message)
    sleep(2)
    Queue3Job.perform_later(message)
  end
end

class Queue3Job < ActiveJob::Base
  queue_as 'Queue3'

  def perform(message)
    sleep(2)
    Queue1Job.perform_later(message)
  end
end

And my shoryuken.yml config is:

groups:
  group1:
    concurrency: 20
    queues:
      - Queue1
  group2:
    concurrency: 20
    queues:
      - Queue2
  group3:
    concurrency: 20
    queues:
      - Queue3

After running Queue1Job.perform_later(nil) from the Rails console, messages started being enqueued and the jobs were executed.

After running for a minute, how many requests per second should Shoryuken send to the mock SQS server?

phstc commented 4 years ago

Hi @dinesh-rdk

Every time you call perform_later, it should enqueue a message to SQS.

I also believe a total concurrency of 60 might be a lot of threads, depending on the type of processing you are doing. You may want to check htop to see whether you are getting some I/O wait.
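A rough back-of-the-envelope for the three-job test above, under stated assumptions: SQS round trips are negligible next to the 2-second sleep, and each job ends with exactly one perform_later (one enqueue). Ten messages circulate from Queue1 to Queue2 to Queue3 and back, so each is re-enqueued about once every 2 seconds, giving roughly 10 / 2 = 5 enqueue requests per second; polling requests come on top of that and depend on how Shoryuken fetches. Since each message is handled by one job at a time, only about 10 of the 60 threads can be busy at once. The helpers below are hypothetical.

```ruby
# Hypothetical steady-state model for the Queue1Job -> Queue2Job -> Queue3Job loop.
# Assumes SQS latency is negligible and each job enqueues exactly one message.
def enqueues_per_second(messages_in_flight:, seconds_per_job:)
  messages_in_flight / seconds_per_job.to_f
end

# Roughly one job runs per circulating message, so busy threads are capped by
# the message count rather than by the configured concurrency.
def busy_threads(messages_in_flight:, total_concurrency:)
  [messages_in_flight, total_concurrency].min
end

puts enqueues_per_second(messages_in_flight: 10, seconds_per_job: 2) # prints 5.0
puts busy_threads(messages_in_flight: 10, total_concurrency: 60)     # prints 10
```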