jondot / sneakers

A fast background processing framework for Ruby and RabbitMQ
https://github.com/jondot/sneakers
MIT License

Lost message with multiprocess? #51

Closed wangli3274 closed 10 years ago

wangli3274 commented 10 years ago

First, run the command [WORKERS=TradeWorker rake sneakers:run]. The worker code is:

class TradeWorker
    QUEUENAME = "TradeWorker"
    include Sneakers::Worker
    from_queue QUEUENAME,
        # :workers => 1, # number of processes, default is 4
        :durable => true,
        :ack => true,
        :threads => 50, # default is 10
        :prefetch => 50, # A good practice is to set up a prefetch policy against RabbitMQ of at least the amount of threads involved
        :start_worker_delay => 0 # default is 0.2
        # :exchange => QUEUENAME
        # :timeout_job_after => 10

    def work(msg)
        t = Time.now.to_f
        a,b = msg.split(":")
        out = {
            :id => a,
            :push => b,
            :pull => t,
            :cost_time => t - b.to_f
        }
        Rails.logger.info out

        # Trade::Trade.tb_trade_create(id) unless id.blank?
        ack! # signals that processing is finished
    end 
end

Then I published 10000 messages to the queue using:

10000.times do |n|
     Sneakers.publish("#{n}:#{Time.now.to_f}", :to_queue => TradeWorker::QUEUENAME)
end

The lost-message problem happened!
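To pin down which messages went missing, the ids logged by the worker can be compared against the ids that were published. The following is a minimal sketch, assuming the ids have already been extracted from the Rails log into an array of strings; `missing_ids` and `duplicate_ids` are hypothetical helper names, not part of Sneakers.

```ruby
require "set"

# Hypothetical helper: given the ids the worker logged and the number of
# messages published (ids 0...expected), return the ids that never arrived.
def missing_ids(received_ids, expected)
  seen = received_ids.map { |id| Integer(id) }.to_set
  (0...expected).reject { |id| seen.include?(id) }
end

# Hypothetical helper: ids that were logged more than once (redeliveries).
def duplicate_ids(received_ids)
  received_ids.map { |id| Integer(id) }
              .group_by { |id| id }
              .select { |_, hits| hits.size > 1 }
              .keys
              .sort
end

# Example with a tiny, made-up log: 6 messages published, 2 never seen.
logged = %w[0 1 3 3 4]
p missing_ids(logged, 6)
p duplicate_ids(logged)
```

Knowing whether the missing ids are scattered or clustered (for example, all at the start of the run) may help distinguish a publishing problem from a consuming problem.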

There is no problem when the 10000 messages are published to the queue with Bunny directly:

conn = Bunny.new # ("amqp://guest:guest@localhost:5672")
conn.start
channel = conn.create_channel
queue = channel.queue(TradeWorker::QUEUENAME, :durable => true)
10000.times do |n|
    # Sneakers.publish("#{n}:#{Time.now.to_f}", :to_queue => TradeWorker::QUEUENAME)
    queue.publish("#{n}:#{Time.now.to_f}", :routing_key => TradeWorker::QUEUENAME, :persistent => true)
end
channel.close

There is also no problem when workers is set to 1 in Sneakers.configure.

I then changed the gem source [~/.rvm/gems/ruby-2.0.0-p451/gems/sneakers-0.0.7/lib/sneakers/queue.rb] to:

def subscribe(worker)
    @bunny = Bunny.new(@opts[:amqp], :vhost => @opts[:vhost], :heartbeat => @opts[:heartbeat])
    @bunny.start

    @channel = @bunny.create_channel
    @channel.prefetch(@opts[:prefetch])

    @exchange = @channel.exchange(@opts[:exchange],
                                  :type => @opts[:exchange_type],
                                  :durable => @opts[:durable])

    handler = @handler_klass.new(@channel)

    queue = @channel.queue(@name, :durable => @opts[:durable])
    queue.bind(@exchange, :routing_key => @name)

    @consumer = queue.subscribe(:block => false, :ack => @opts[:ack]) do | hdr, props, msg |
      handler.acknowledge(hdr.delivery_tag) # only acknowledge the message; do not run the worker
      # worker.do_work(hdr, props, msg, handler)
    end
    nil
end

The lost-message problem still happened!

jondot commented 10 years ago

Hi, I'm not aware of any messages getting lost, either in testing or in production, whether in my own deployment or in others running Sneakers. Are you sure you aren't bumping into something related to networking?

kamal-github commented 8 years ago

Hey, I am also facing the same issue. After receiving 4 messages, I can see the fifth message in the queue in the RabbitMQ management plugin, but Sneakers is not consuming it.

require 'sneakers'

Sneakers.configure heartbeat: 5,
  amqp: ENV['RABBITMQ'] || 'amqp://*****:****@localhost:5672/',
  workers: 1,
  timeout_job_after: 60,
  threads: 5,
  prefetch: 5,
  after_fork: Proc.new { ActiveRecord::Base.establish_connection },
  durable: true

Sneakers.logger.level = Logger::INFO

michaelklishin commented 8 years ago

@kamal-github you can inspect channels, their QoS prefetch value, and how many messages are pending acknowledgement in the management UI. See RabbitMQ tutorial 2 if something isn't clear.
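The same numbers shown in the management UI are also exposed by the management plugin's HTTP API as JSON. The sketch below assumes the JSON for one queue has already been fetched (for example from the `/api/queues` endpoint) and only interprets it; `queue_summary` and `diagnose` are hypothetical helpers, and the rules in `diagnose` are a rough rule of thumb rather than anything defined by RabbitMQ or Sneakers.

```ruby
require "json"

# Pick out the fields of interest from one queue object as returned by
# the RabbitMQ management HTTP API.
def queue_summary(json_text)
  q = JSON.parse(json_text)
  {
    name: q["name"],
    ready: q.fetch("messages_ready", 0),
    unacked: q.fetch("messages_unacknowledged", 0),
    consumers: q.fetch("consumers", 0)
  }
end

# Hypothetical rule of thumb. Assumes every consumer uses the same
# per-consumer prefetch value; a prefetch of 0 means "unlimited".
def diagnose(summary, prefetch)
  return :no_consumers if summary[:consumers].zero?
  if prefetch > 0 && summary[:unacked] >= prefetch * summary[:consumers]
    return :prefetch_window_full # broker waits for acks before sending more
  end
  return :ready_but_undelivered if summary[:ready] > 0
  :idle
end

# Made-up sample: one message ready, nothing pending acknowledgement.
sample = '{"name":"TradeWorker","messages_ready":1,' \
         '"messages_unacknowledged":0,"consumers":1}'
puts diagnose(queue_summary(sample), 5)
```

A result of `:prefetch_window_full` would point at missing acknowledgements, while `:ready_but_undelivered` suggests looking at the consumer or its connection instead.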

kamal-github commented 8 years ago

@michaelklishin Is there any limit such that, if no acknowledgements reach RabbitMQ, either Sneakers stops consuming or RabbitMQ refuses to deliver more messages to Sneakers?

Also, I checked the management UI: the channel has prefetch value = 5 and pending acknowledgements = 0.

akram71093 commented 3 years ago

@kamal-github Hello, did you solve this issue? I am facing the same issue: two messages are always not received by Sneakers but are available in RabbitMQ. With 1 worker, the first two messages are not received, the third one is received, then the next two are not received, and so on.
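A "one in three" pattern like this is what round-robin dispatch would produce if three consumers were subscribed to the same queue, since RabbitMQ distributes deliveries among the consumers of a queue in turn. Nothing in this thread confirms that this is the cause; the sketch below is only a simulation of that assumption, with made-up consumer names, and does not talk to RabbitMQ.

```ruby
# Simulate round-robin dispatch of messages among the consumers of one
# queue. This is a model of the assumed behaviour, not broker code.
def round_robin(messages, consumer_names)
  deliveries = Hash.new { |hash, name| hash[name] = [] }
  messages.each_with_index do |msg, index|
    target = consumer_names[index % consumer_names.size]
    deliveries[target] << msg
  end
  deliveries
end

# Assumption: two leftover consumers (hypothetical names) share the queue
# with the single Sneakers worker.
consumers = %w[stale-1 stale-2 sneakers]
result = round_robin((1..6).to_a, consumers)
p result["sneakers"]
```

If this were the situation, the consumer count for the queue in the management UI would be higher than the number of workers configured.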