stompgem / stomp

A Ruby gem for sending and receiving messages from a STOMP-protocol-compliant message queue. Includes failover logic and SSL support.
http://stomp.github.com
Apache License 2.0

Unexpected ACK received for message-id #149

Closed: retgoat closed this issue 6 years ago

retgoat commented 6 years ago

Hello, sometimes my consumer gets ACK errors. After some googling I found that I need to pass the ACK headers to the acknowledge method.

I am doing that, but the error still occurs. Could you please advise? Here is the code:

# consumer
  def run
    id = SecureRandom.uuid
    @logger.info "Starting #{self.class.name} with client ID: #{id} at #{Time.now}"
    @client = AsyncHandler::ActiveMQ::Client.new(logger: @logger)
    @logger.info "Connected to #{@client.connection_frame.headers['server']}"\
                 " server with STOMP #{@client.connection_frame.headers['version']}"

    @client.subscribe("/queue/#{@queue}", { 'id' => id, 'ack' => 'client' } ) do |message|
      body = ::Marshal::load(message.body)
      @logger.info "[#{Thread.current.object_id}-#{@queue}-#{id}] got message: #{message.headers['message-id']}, body: #{body}"
      execute(body)
      @client.acknowledge(message, message.headers)
    end
  rescue => e
    @logger.error "Error in message processing! #{e.message} #{e.backtrace.join("\n")}"
  end

Here is the error message

/Users/retgoat/.rvm/gems/ruby-2.4.1@nebenan/gems/rake-12.2.1/lib/rake/trace_output.rb:20:in `write': Unexpected ACK received for message-id [ID:MacBookPro-Roman.local-58984-1519877225506-3:9789:-1:1:14] (Stomp::Error::ProtocolException)

Thanks a lot!

gmallard commented 6 years ago

Hi - several points:

Number 1:

Yes:

@client.acknowledge(message, message.headers)

should work with the gem.

I am assuming that your client is compatible with the gem. Your code does not show an instance of Stomp::Client, so that is hard to tell. If you can, show me your 'require's.
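For reference, the STOMP specifications require different headers on an ACK frame depending on the negotiated protocol version: "message-id" in 1.0, "message-id" plus "subscription" in 1.1, and "id" (copied from the MESSAGE frame's "ack" header) in 1.2. The sketch below is a hypothetical helper, not part of the stomp gem, that picks just those headers out of a received message's headers. It only illustrates what the broker expects; it makes no claim about how the gem's acknowledge method builds the frame internally.

```ruby
# Hypothetical helper (NOT part of the stomp gem): return the minimal ACK
# frame headers the STOMP spec requires for a given protocol version,
# taken from the headers of the received MESSAGE frame.
def minimal_ack_headers(protocol_version, message_headers)
  case protocol_version
  when "1.0"
    # STOMP 1.0: message-id identifies the message being acknowledged.
    { "message-id" => message_headers.fetch("message-id") }
  when "1.1"
    # STOMP 1.1: message-id plus the subscription it arrived on.
    {
      "message-id"   => message_headers.fetch("message-id"),
      "subscription" => message_headers.fetch("subscription")
    }
  when "1.2"
    # STOMP 1.2: the ACK's id must match the MESSAGE frame's ack header.
    { "id" => message_headers.fetch("ack") }
  else
    raise ArgumentError, "unsupported STOMP version: #{protocol_version}"
  end
end

# Header values modeled on the ACK frame in the ActiveMQ trace later in this thread.
headers = {
  "message-id"   => "ID:MacBookPro-Roman.local-52351-1521179472444-3:132:-1:1:179",
  "subscription" => "b36c5538-d508-40a6-8e3a-f37edf588753",
  "ack"          => "ID:MacBookPro-Roman.local-52351-1521179472444-229:15",
  "destination"  => "/queue/heavy"
}

p minimal_ack_headers("1.2", headers)
```

Passing the whole message.headers hash, as in the code above, sends these headers along with many extra ones; the broker only relies on the version-specific ones.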

Number 2:

Given the exception you show (Stomp::Error::ProtocolException), your broker has likely returned an ERROR frame, and the body of that frame contained a Java stack trace from the broker.

You should modify your broker's configuration to produce detailed stomp logs. Then inspect those logs and examine the stacktrace.

This error makes me think that something other than the ACKs might be wrong.

retgoat commented 6 years ago

Here are the messages from the ActiveMQ log:

2018-03-14 14:13:41,897 | WARN  | Exception occurred processing: ACK -> org.apache.activemq.transport.stomp.ProtocolException: Unexpected ACK received for message-id [ID:MacBookPro-Roman.local-50431-1521011395844-3:3:-1:1:2] | org.apache.activemq.transport.stomp.ProtocolConverter | ActiveMQ Transport: tcp:///0:0:0:0:0:0:0:1:50489@61613
2018-03-14 14:13:42,269 | WARN  | Exception occurred processing: ACK -> org.apache.activemq.transport.stomp.ProtocolException: Unexpected ACK received for message-id [ID:MacBookPro-Roman.local-50431-1521011395844-3:3:-1:1:4] | org.apache.activemq.transport.stomp.ProtocolConverter | ActiveMQ Transport: tcp:///0:0:0:0:0:0:0:1:50489@61613
2018-03-14 14:13:42,594 | WARN  | Exception occurred processing: ACK -> org.apache.activemq.transport.stomp.ProtocolException: Unexpected ACK received for message-id [ID:MacBookPro-Roman.local-50431-1521011395844-3:3:-1:1:6] | org.apache.activemq.transport.stomp.ProtocolConverter | ActiveMQ Transport: tcp:///0:0:0:0:0:0:0:1:50489@61613
2018-03-14 14:13:42,956 | WARN  | Exception occurred processing: ACK -> org.apache.activemq.transport.stomp.ProtocolException: Unexpected ACK received for message-id [ID:MacBookPro-Roman.local-50431-1521011395844-3:3:-1:1:8] | org.apache.activemq.transport.stomp.ProtocolConverter | ActiveMQ Transport: tcp:///0:0:0:0:0:0:0:1:50489@61613
2018-03-14 14:13:43,329 | WARN  | Exception occurred processing: ACK -> org.apache.activemq.transport.stomp.ProtocolException: Unexpected ACK received for message-id [ID:MacBookPro-Roman.local-50431-1521011395844-3:3:-1:1:10] | org.apache.activemq.transport.stomp.ProtocolConverter | ActiveMQ Transport: tcp:///0:0:0:0:0:0:0:1:50489@61613

Here is the client initialization code:

class AsyncHandler::ActiveMQ::Client
  delegate :publish, :join, :acknowledge, :subscribe, :connection_frame, :uuid,  :close, to: :@client

  def initialize(config: {}, logger: Logger.new(STDOUT))
    config_hash = config.empty? ? Rails.configuration.x.active_mq : config
    @client = ::Stomp::Client.new(config_hash.merge(logger: logger))
  rescue => e
    logger.error("#{e.message} #{e.backtrace.join("\n")}")
  end
end

And the worker code:

class AsyncHandler::ActiveMQ::Workers::BaseWorker

  def initialize(queue:, logger:)
    @queue = queue
    @logger = logger
  end

  def run
    id = SecureRandom.uuid
    @logger.info "Starting #{self.class.name} with client ID: #{id} at #{Time.now}"
    @client = AsyncHandler::ActiveMQ::Client.new(logger: @logger)
    @logger.info "Connected to #{@client.connection_frame.headers['server']}"\
                 " server with STOMP #{@client.connection_frame.headers['version']}"

    @client.subscribe("/queue/#{@queue}", { 'id' => id, 'ack' => 'client' } ) do |message|
      body = ::Marshal::load(message.body)
      @logger.info "[#{@queue}-#{id}] got message: #{message.headers['message-id']}, body: #{body}"
      execute(body)
      @client.acknowledge(message, message.headers)
    end
  rescue => e
    @logger.error "Error in message processing! #{e.message} #{e.backtrace.join("\n")}"
  end

  def close
    @client.close
  end

  private

  def execute(hash)
    ActiveRecord::Base.connection.initialize_shards(Octopus.config)
    klass = hash['class'].classify.constantize
    method = hash['method'].to_sym
    args = hash['args']
    if args.dup.flatten.empty?
      klass.send(method)
    else
      klass.send(method, *args)
    end
  end
end

Thanks for assistance!

retgoat commented 6 years ago

One more addition: it seems this is normal behavior for client ack mode. Please see https://markmail.org/thread/4am76tb6ilzqzgul#query:+page:1+mid:7vqxye63a5ikl3gm+state:results I tried client-individual ack mode as well, but again with no luck. Any ideas? Thanks!

gmallard commented 6 years ago

My guess is that your code is sending ACKs out of order (with :ackmode => "client"). That is the only way I can recreate that AMQ message here.

You need to figure out why that is happening, and stop doing it.
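Why order matters: the STOMP spec treats an ACK in "client" mode as cumulative (it acknowledges that message and every earlier unacknowledged message on the subscription), while in "client-individual" mode each ACK covers exactly one message. The sketch below is a simplified, hypothetical model of that bookkeeping for a single subscription. It is not ActiveMQ's implementation; the error text merely mirrors the broker's message.

```ruby
# Simplified model of broker-side ACK bookkeeping for ONE subscription.
# Illustrative only: it captures the STOMP ack-mode semantics, not ActiveMQ's code.
class PendingAcks
  class UnexpectedAck < StandardError; end

  MODES = %w[client client-individual].freeze

  def initialize(mode)
    raise ArgumentError, "unknown ack mode: #{mode}" unless MODES.include?(mode)

    @mode = mode
    @pending = [] # delivered but not yet acknowledged, in delivery order
  end

  def deliver(message_id)
    @pending << message_id
  end

  def ack(message_id)
    index = @pending.index(message_id)
    if index.nil?
      raise UnexpectedAck, "Unexpected ACK received for message-id [#{message_id}]"
    end

    if @mode == "client"
      @pending.shift(index + 1) # cumulative: this message and all earlier ones
    else
      @pending.delete_at(index) # client-individual: only this message
    end
  end

  def pending
    @pending.dup
  end
end

# Two messages delivered, then ACKed out of order (m2 before m1).
cumulative = PendingAcks.new("client")
%w[m1 m2].each { |id| cumulative.deliver(id) }
cumulative.ack("m2") # also acknowledges m1
begin
  cumulative.ack("m1") # m1 is no longer pending
rescue PendingAcks::UnexpectedAck => e
  puts "client mode: #{e.message}"
end

individual = PendingAcks.new("client-individual")
%w[m1 m2].each { |id| individual.deliver(id) }
individual.ack("m2")
individual.ack("m1") # fine: each message is acknowledged on its own
puts "client-individual pending: #{individual.pending.inspect}"
```

In this model the same out-of-order sequence fails only in "client" mode, which matches the observation that ACK order should not matter with "client-individual".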

Get some additional debugging information.

Suggestion 1:

More STOMP logging. Here is how (this is also explained on the AMQ web site).

Modify your activemq.xml configuration file to add STOMP tracing; note the trace=true part of this example:

<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;trace=true"/>

Also modify the log4j.properties file that AMQ uses. Add lines like the following:

log4j.logger.org.apache.activemq.transport.stomp=TRACE, stomp
log4j.appender.stomp=org.apache.log4j.RollingFileAppender
log4j.appender.stomp.file=${activemq.base}/data/stomp.log
log4j.appender.stomp.maxFileSize=4096KB
log4j.appender.stomp.maxBackupIndex=5
log4j.appender.stomp.append=true
log4j.appender.stomp.layout=org.apache.log4j.PatternLayout
log4j.appender.stomp.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
log4j.additivity.org.apache.activemq.transport.stomp=false

When problems occur, look in the stomp.log file for clues. This is a great debugging tool.

Suggestion 2:

Add detection of ERROR frames when you are processing messages. You really should do this all the time. Something like this example:

c.subscribe("/queue/aq", :ack => "client", :id => "subscription-id") {|m|
  puts "Msg: #{m.inspect}"
  raise "ERROR detected" if m.command == "ERROR"
  # Normal processing follows .....
}

I cannot recreate this at all using "client-individual". That seems to indicate that there is some logic in your code that is not well understood. With "client-individual", ACK order should not matter.

retgoat commented 6 years ago

@gmallard thanks a lot! I found that the behavior above only reproduces when I start more than one consumer per queue, so I think this is some kind of race condition.

Here is the trace from ActiveMQ:

2018-03-16 15:06:28,834 [0:1:55838@61613] WARN  ProtocolConverter              - Exception occurred processing: ACK -> org.apache.activemq.transport.stomp.ProtocolException: Unexpected ACK received for message-id [ID:MacBookPro-Roman.local-52351-1521179472444-3:132:-1:1:179]
2018-03-16 15:06:28,834 [0:1:55838@61613] DEBUG ProtocolConverter              - Exception detail
org.apache.activemq.transport.stomp.ProtocolException: Unexpected ACK received for message-id [ID:MacBookPro-Roman.local-52351-1521179472444-3:132:-1:1:179]
    at org.apache.activemq.transport.stomp.ProtocolConverter.onStompAck(ProtocolConverter.java:475)[activemq-stomp-5.15.2.jar:5.15.2]
    at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:250)[activemq-stomp-5.15.2.jar:5.15.2]
    at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:85)[activemq-stomp-5.15.2.jar:5.15.2]
    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)[activemq-client-5.15.2.jar:5.15.2]
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)[activemq-client-5.15.2.jar:5.15.2]
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)[activemq-client-5.15.2.jar:5.15.2]
    at java.lang.Thread.run(Thread.java:745)[:1.8.0_112]
2018-03-16 15:06:28,834 [0:1:55838@61613] TRACE ProtocolConverter              - Command that caused the error: ACK
content-length:0
expires:0
destination:/queue/heavy
ack:ID:MacBookPro-Roman.local-52351-1521179472444-229:15
subscription:b36c5538-d508-40a6-8e3a-f37edf588753
priority:4
message-id:ID:MacBookPro-Roman.local-52351-1521179472444-3:132:-1:1:179
content-type:text/plain; charset=UTF-8
id:ID:MacBookPro-Roman.local-52351-1521179472444-229:15
timestamp:1521187580962

Anyway, thanks for your help!

It would be great, and very much appreciated, if you could suggest something for this situation.

I'll close the ticket once I find a solution.

gmallard commented 6 years ago

If all of the following are true:

1) :ackmode => "client"
2) SUBSCRIBE on a given queue is called multiple times

then multiple threads (the SUBSCRIBE callbacks, which run as separate threads) are receiving from the same queue simultaneously. If the callback logic includes sending ACKs, you are definitely in a race condition.

I am thinking you should:

Call this method from the SUBSCRIBE callback.

With ActiveMQ you should be able to examine the 'message-id' and determine whether a subsequent message has already been ACK'd. Alternatively, you could add a user header when the messages are originally written to track this (example: :msgseq => #####), and examine that data.
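A minimal sketch of that check, assuming producers set a numeric user header named "msgseq" (the example name above) and that each subscription receives messages in increasing msgseq order. The class AckGate and its method ack_if_newer are hypothetical names for illustration, not stomp gem APIs. In "client" mode, skipping the ACK for an older message is safe because a later cumulative ACK already covered it. A Mutex keeps the check and the ACK itself atomic, since the SUBSCRIBE callbacks run on separate threads.

```ruby
# Hypothetical helper (NOT part of the stomp gem). Tracks, per subscription,
# the highest "msgseq" already ACKed, and only sends an ACK for newer messages.
class AckGate
  def initialize
    @highest_acked = Hash.new(-1) # subscription id => highest msgseq ACKed
    @lock = Mutex.new             # callbacks may run on separate threads
  end

  # Runs the given block (which should send the ACK) only when msgseq is newer
  # than anything already ACKed on this subscription. The block runs while the
  # lock is held, so an older ACK can never be sent after a newer one.
  # Returns true if the block ran, false if the ACK was skipped.
  def ack_if_newer(subscription_id, msgseq)
    @lock.synchronize do
      return false if msgseq <= @highest_acked[subscription_id]

      yield
      @highest_acked[subscription_id] = msgseq
      true
    end
  end
end

# Stand-alone demo: the "ACK" is just recorded in an array.
gate = AckGate.new
sent = []
gate.ack_if_newer("sub-1", 2) { sent << 2 }
gate.ack_if_newer("sub-1", 1) { sent << 1 } # skipped: already covered by 2
p sent
```

Inside a SUBSCRIBE callback this might be called as gate.ack_if_newer(message.headers['subscription'], Integer(message.headers['msgseq'])) { @client.acknowledge(message, message.headers) }; header values arrive as strings, hence the Integer conversion.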

As I said above, if you use :ackmode => "client-individual" this should not be required. Examine the stomp.log file to make sure that your SUBSCRIBEs use the correct ackmode. Note: in the scenario you describe, do not use different ackmodes for the different SUBSCRIBEs. That is just asking for trouble.

Out of curiosity: is this running on JRuby or standard Ruby?

retgoat commented 6 years ago

Thanks @gmallard, I came to the same conclusion. I solved the issue by forking separate processes, like this:

module AsyncHandler::ActiveMQ
  class Processor

    def self.run(concurrency: 2, daemon: false, debug: false)
      concurrency.times do |i|
        fork do
          self.new(concurrency: concurrency, daemon: daemon, stat: i, debug: debug).start
        end
      end
      Process.waitall
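    rescue Interrupt
      # @logger and shut_down exist only on instances, not on the class.
      # Each forked child rescues Interrupt in #start and shuts itself down,
      # so the parent just waits for the children and exits.
      Process.waitall
      exit(0)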
    end

    def initialize(concurrency: 2, daemon: false, stat:, debug: false)
      @concurrency     = concurrency.to_i
      @daemon          = daemon
      @stat            = stat
      @debug           = debug
      @logger          = Logger.new("#{Rails.root}/log/active_mq.log")
      @pidfile         = "#{Rails.root}/tmp/pids/active_mq_#{@stat}.pid"
      @workers_started = nil
    end

    def start
      @logger.info "Starting #{@concurrency} workers #{@daemon ? 'in daemon mode' : ''} of each of #{workers.map { |w| w.class.name }.join(", ")}"
      check_pid
      daemonize if @daemon
      write_pid
      trap_signals
      redirect_output
      start_workers

      sleep(1) until @quit

      shut_down
    rescue Interrupt
      @logger.info "Shutting down"
      shut_down
      exit(0)
    end

    private

    # Some non important stuff here

    def workers
      @workers ||= [AsyncHandler::ActiveMQ::Workers::DefaultQueueWorker,
                    AsyncHandler::ActiveMQ::Workers::HeavyQueueWorker].map { |w| w.new(logger: @logger, debug: @debug) }
    end

    def start_workers
      workers.each { |w| w.run }
      @workers_started = true
    end
  end
end

Now everything works perfectly! Many thanks!