Closed ramisra closed 5 years ago
Hi @ratikesh9 could you please share your initialization hash and code? Thanks!
I would have to see:
Writing a robust producer application which uses failover is challenging.
Note that the stomp specification says:
The reliability semantics of the message are also server specific and will depend on the destination value being used and the other message headers such as the transaction header or other server specific message headers.
Hello @gmallard many thanks for your assistance.
We have quite a sophisticated workflow. We have a wrapper around the ActiveMQ producer and consumer. Let me show you.
The wrapper reads the current delayed backend defined in the config file and then implements the domain-specific logic:
class AsyncHandler::Backend::ActiveMQ < AsyncHandler::Backend::Base
  def exec
    msg = ::Marshal::dump({ class: @klass.to_s, method: @method.to_sym, args: @args }.with_indifferent_access)
    queue = @opts[:queue] || "default"
    client.publish("/queue/#{queue}", msg)
  end
  def client
    @client ||= AsyncHandler::Client::ActiveMQ.new(logger: Rails.logger)
  end
end
Here is the producer code:
class AsyncHandler::Client::ActiveMQ
  delegate :publish, :join, :acknowledge, :subscribe, to: :@client
  def initialize(config: {}, logger: Logger.new(STDOUT))
    config_hash = config.empty? ? Rails.configuration.x.active_mq : config
    @client = ::Stomp::Client.new(config_hash.merge(logger: logger))
  end
end
The producer is initialized with the default config:
[1] pry(main)> Rails.configuration.x.active_mq
=> {
  :hosts => [
    [0] {
      :login => nil,
      :password => nil,
      :host => "localhost",
      :port => 61613
    }
  ],
  :reliable => true,
  :closed_check => false,
  :connect_timeout => 5,
  :connread_timeout => 3,
  :connect_headers => {
    :host => "localhost",
    :"accept-version" => "1.2"
  }
}
We also have a base consumer:
class BaseWorker
  def initialize(queue:, logger:)
    @queue = queue
    @logger = logger
  end
  def run
    @logger.info "Starting #{self.class.name} at #{Time.now}"
    client = AsyncHandler::Client::ActiveMQ.new(logger: @logger)
    client.subscribe("/queue/#{@queue}", { ack: self.class.name }) do |message|
      body = ::Marshal::load(message.body)
      @logger.info "[#{@queue}] got message: #{message.headers['message-id']}, body: #{body}"
      execute(body)
      # client.acknowledge(message.headers['ack'])
    end
  rescue => e
    # Double quotes are needed here so that "\n" is a real newline.
    @logger.error "Error in message processing! #{e.message} #{e.backtrace.join("\n")}"
  end
  private
  def execute(hash)
    klass = hash['class'].classify.constantize
    method = hash['method'].to_sym
    args = hash['args']
    klass.send(method, args)
  end
end
It reads a new message, unmarshals it, and executes the given method.
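As an illustration of that round trip, here is a minimal sketch using only Ruby's standard library. The `Greeter` class and the job values are hypothetical stand-ins, and a plain Hash with string keys replaces the Rails `with_indifferent_access` hash used in the code above.

```ruby
# Hypothetical stand-in for a job target; not part of the original application.
class Greeter
  def self.greet(args)
    "Hello, #{args.first}!"
  end
end

# Producer side: serialize the job description into a binary string.
job = { "class" => "Greeter", "method" => :greet, "args" => ["world"] }
payload = Marshal.dump(job)

# Consumer side: restore the hash and dispatch to the named class method.
restored = Marshal.load(payload)
klass = Object.const_get(restored["class"])
result = klass.public_send(restored["method"], restored["args"])

puts result
```

Note that `Marshal.load` can instantiate arbitrary objects, so it should only be used on payloads from a trusted producer.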
We also have a simple class Foo for testing:
class Foo
  def self.bar(pld)
    File.open("foo.log", "a") do |f|
      f.write "Foo class triggered with payload #{pld}\n"
    end
  end
end
If I generate, say, 10 messages, everything works fine:
10.times{|i| Foo.run_async(:bar, {queue: 'default'}, "Some payload #{i}") }
[default] got message: ID:MacBookPro-Roman.local-58984-1519877225506-3:3:-1:1:1, body: {"class"=>"Foo", "method"=>:bar, "args"=>["Some payload 0"]}
[default] got message: ID:MacBookPro-Roman.local-58984-1519877225506-3:4:-1:1:1, body: {"class"=>"Foo", "method"=>:bar, "args"=>["Some payload 1"]}
[default] got message: ID:MacBookPro-Roman.local-58984-1519877225506-3:5:-1:1:1, body: {"class"=>"Foo", "method"=>:bar, "args"=>["Some payload 2"]}
[default] got message: ID:MacBookPro-Roman.local-58984-1519877225506-3:6:-1:1:1, body: {"class"=>"Foo", "method"=>:bar, "args"=>["Some payload 3"]}
[default] got message: ID:MacBookPro-Roman.local-58984-1519877225506-3:7:-1:1:1, body: {"class"=>"Foo", "method"=>:bar, "args"=>["Some payload 4"]}
[default] got message: ID:MacBookPro-Roman.local-58984-1519877225506-3:8:-1:1:1, body: {"class"=>"Foo", "method"=>:bar, "args"=>["Some payload 5"]}
[default] got message: ID:MacBookPro-Roman.local-58984-1519877225506-3:9:-1:1:1, body: {"class"=>"Foo", "method"=>:bar, "args"=>["Some payload 6"]}
[default] got message: ID:MacBookPro-Roman.local-58984-1519877225506-3:10:-1:1:1, body: {"class"=>"Foo", "method"=>:bar, "args"=>["Some payload 7"]}
[default] got message: ID:MacBookPro-Roman.local-58984-1519877225506-3:11:-1:1:1, body: {"class"=>"Foo", "method"=>:bar, "args"=>["Some payload 8"]}
[default] got message: ID:MacBookPro-Roman.local-58984-1519877225506-3:12:-1:1:1, body: {"class"=>"Foo", "method"=>:bar, "args"=>["Some payload 9"]}
But if I fire 1000 messages:
[3] pry(main)> 1000.times{|i| Foo.run_async(:bar, {queue: 'default'}, "Some payload #{i}") }
connect to localhost failed: getaddrinfo: nodename nor servname provided, or not known will retry(#0) in 0.01
connect to localhost failed: getaddrinfo: nodename nor servname provided, or not known will retry(#1) in 0.02
connect to localhost failed: getaddrinfo: nodename nor servname provided, or not known will retry(#2) in 0.04
connect to localhost failed: getaddrinfo: nodename nor servname provided, or not known will retry(#3) in 0.08
connect to localhost failed: getaddrinfo: nodename nor servname provided, or not known will retry(#4) in 0.16
connect to localhost failed: getaddrinfo: nodename nor servname provided, or not known will retry(#5) in 0.32
connect to localhost failed: getaddrinfo: nodename nor servname provided, or not known will retry(#6) in 0.64
^C/Users/retgoat/.rvm/gems/ruby-2.4.1@nebenan/gems/stomp-1.4.4/lib/stomp/client.rb:103: warning: constant ::TimeoutError is deprecated
Interrupt:
from /Users/retgoat/.rvm/gems/ruby-2.4.1@nebenan/gems/stomp-1.4.4/lib/connection/utils.rb:150:in `sleep'
[4] pry(main)>
Please note: if I interrupt it with Ctrl+C, I see the warning: constant ::TimeoutError is deprecated
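The retry delays in the log above double on every attempt (0.01, 0.02, 0.04, ...). A minimal sketch that reproduces that sequence, assuming an initial delay of 0.01 seconds and a multiplier of 2; both values are inferred only from the log lines, not taken from the library's source:

```ruby
# Assumed values, inferred only from the log lines above.
initial_delay = 0.01
multiplier = 2

# Delay before retry number `attempt` grows geometrically.
delays = (0..6).map { |attempt| (initial_delay * multiplier**attempt).round(2) }

delays.each_with_index do |delay, attempt|
  puts "retry(##{attempt}) in #{delay}"
end
```

With this kind of backoff the client keeps waiting longer between attempts, which is why the console appears to hang until it is interrupted.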
The last messages in the log are:
...
[default] got message: ID:MacBookPro-Roman.local-58984-1519877225506-3:237:-1:1:1, body: {"class"=>"Foo", "method"=>:bar, "args"=>["Some payload 224"]}
[default] got message: ID:MacBookPro-Roman.local-58984-1519877225506-3:238:-1:1:1, body: {"class"=>"Foo", "method"=>:bar, "args"=>["Some payload 225"]}
I'm using ActiveMQ v5.15.2 from Homebrew with the default configuration.
Please find the logs and activemq config in the gist https://gist.github.com/retgoat/e77834f8722f2e796f1f2784bf9392e0
Thanks a lot!
Thanks for all the documentation. It is good background.
It is obvious that I will not be able to recreate the complexity of your environment.
I cannot recreate what you are seeing. I just pushed 100,000 messages at several different brokers, with elapsed times from roughly 67 to 134 seconds, depending on the broker:
AMQ:
I, [2018-03-03T13:28:19.364612 #19992] INFO -- : Main end, message count 100000, elapsed time 133.6205415725708sec
Apollo:
I, [2018-03-03T13:32:21.693268 #20230] INFO -- : Main end, message count 100000, elapsed time 66.69967222213745sec
Artemis:
I, [2018-03-03T13:45:23.339450 #20777] INFO -- : Main end, message count 100000, elapsed time 66.59979200363159sec
My AMQ environment clearly needs some tuning.
A couple of questions, in no particular order:
Can you recreate this outside of a Rails environment?
Can you recreate this if you just shut your consumer off?
Can you try to recreate this with :reliable => false? I would be interested to see if we can get a stack trace for the very original failure.
Is your publisher 'in transaction'? And coded with retry logic if a reconnect actually occurs?
I really suggest that you try to understand the very original failure. That might give a clue as to how to alleviate this problem.
I am not surprised you are seeing dropped messages. It seems your situation is:
In this case any messages that are 'in flight' at the time of connection close are bound to be lost. At least it seems that way to me.
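To make the retry-logic question above concrete, here is a minimal sketch of a bounded publish retry. `FlakyClient` and `publish_with_retry` are hypothetical names used only for illustration; they are not part of the stomp gem, and real code would rescue whatever errors the actual client raises.

```ruby
# Hypothetical client that fails a fixed number of times before succeeding.
# It stands in for a real connection; it is not the stomp gem's API.
class FlakyClient
  attr_reader :sent

  def initialize(failures:)
    @failures = failures
    @sent = []
  end

  def publish(destination, body)
    if @failures > 0
      @failures -= 1
      raise IOError, "connection lost"
    end
    @sent << [destination, body]
  end
end

# Retry a publish a bounded number of times, re-raising the last error
# if every attempt fails so the caller knows the message was not sent.
def publish_with_retry(client, destination, body, attempts: 3)
  tries = 0
  begin
    tries += 1
    client.publish(destination, body)
  rescue IOError
    retry if tries < attempts
    raise
  end
end

client = FlakyClient.new(failures: 2)
publish_with_retry(client, "/queue/default", "payload")
puts client.sent.length
```

A retry like this can deliver a message twice if the failure happens after the broker has already accepted it, so consumers may need to tolerate duplicates.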
Try to think about some simple code that recreates this, and that you could share with me.
Thanks, Guy
Hey @gmallard!
Many thanks for your help. I found the issue and it was on my side. I did not close the producer's socket after sending a message and simply left it open.
Open sockets are limited to 256 on macOS (apparently by default, since I didn't change it).
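The limit in question is the per-process cap on open file descriptors, which sockets count against. A minimal sketch for checking it from Ruby, assuming a Unix-like system where `Process.getrlimit` is available; the values printed depend on the machine and shell settings:

```ruby
# Soft and hard limits on open file descriptors for the current process.
soft_limit, hard_limit = Process.getrlimit(Process::RLIMIT_NOFILE)

puts "soft limit: #{soft_limit}"
puts "hard limit: #{hard_limit}"
```

If each published message opens a new connection that is never closed, the process runs out of descriptors once the soft limit is reached.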
So, I changed the code to the following and all works like a charm!
# client
class AsyncHandler::Backend::ActiveMQ < AsyncHandler::Backend::Base
  def exec
    msg = ::Marshal::dump({ class: @klass.to_s, method: @method.to_sym, args: @args }.with_indifferent_access)
    queue = @opts[:queue] || "default"
    client.publish("/queue/#{queue}", msg)
    client.close # this line was missing
  end
  def client
    @client ||= AsyncHandler::ActiveMQ::Client.new(logger: Rails.logger)
  end
end
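One possible refinement of the fix above is to close the client in an `ensure` clause, so the socket is released even when publishing raises. This is a minimal sketch with a hypothetical `RecordingClient` standing in for the wrapped client; it is not the code from the thread.

```ruby
# Hypothetical stand-in for the wrapped client; it only records calls.
class RecordingClient
  attr_reader :closed

  def initialize(fail_on_publish: false)
    @fail_on_publish = fail_on_publish
    @closed = false
  end

  def publish(_destination, _body)
    raise IOError, "publish failed" if @fail_on_publish
  end

  def close
    @closed = true
  end
end

# Publish one message and always close the client afterwards,
# even when publish raises.
def publish_and_close(client, destination, body)
  client.publish(destination, body)
ensure
  client.close
end

ok = RecordingClient.new
publish_and_close(ok, "/queue/default", "payload")

failing = RecordingClient.new(fail_on_publish: true)
begin
  publish_and_close(failing, "/queue/default", "payload")
rescue IOError
  # The error still propagates, but the client has been closed.
end

puts ok.closed
puts failing.closed
```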
# base worker
class AsyncHandler::ActiveMQ::Workers::BaseWorker
  def initialize(queue:, logger:)
    @queue = queue
    @logger = logger
  end
  def run
    id = SecureRandom.uuid
    @logger.info "Starting #{self.class.name} with client ID: #{id} at #{Time.now}"
    client = AsyncHandler::ActiveMQ::Client.new(logger: @logger)
    @logger.info "Connected to #{client.connection_frame.headers['server']}"\
      " server with STOMP #{client.connection_frame.headers['version']}"
    client.subscribe("/queue/#{@queue}", { 'id' => id, 'ack' => 'client' } ) do |message|
      body = ::Marshal::load(message.body)
      @logger.info "[#{Thread.current.object_id}-#{@queue}] got message: #{message.headers['message-id']}, body: #{body}"
      execute(body)
      client.acknowledge(message, message.headers)
    end
  rescue => e
    # Double quotes are needed here so that "\n" is a real newline.
    @logger.error "Error in message processing! #{e.message} #{e.backtrace.join("\n")}"
  end
  private
  def execute(hash)
    klass = hash['class'].classify.constantize
    method = hash['method'].to_sym
    args = hash['args']
    klass.send(method, args)
  end
end
Now all messages are processed once with 4 workers running:
[1] pry(main)> 4.times{ AsyncHandler::ActiveMQ::Workers::DefaultQueueWorker.new.run }
Starting AsyncHandler::ActiveMQ::Workers::DefaultQueueWorker with client ID: 06820380-8e6c-42cd-808d-2d2616f2115e at 2018-03-05 12:13:40 +0700
Connected to ActiveMQ/5.15.2 server with STOMP 1.2
Starting AsyncHandler::ActiveMQ::Workers::DefaultQueueWorker with client ID: 9226b137-a2bf-4316-b41e-bd1c39b6a771 at 2018-03-05 12:13:40 +0700
Connected to ActiveMQ/5.15.2 server with STOMP 1.2
Starting AsyncHandler::ActiveMQ::Workers::DefaultQueueWorker with client ID: f4ecd9f6-2266-4d51-8d51-3639a6028b67 at 2018-03-05 12:13:40 +0700
Connected to ActiveMQ/5.15.2 server with STOMP 1.2
Starting AsyncHandler::ActiveMQ::Workers::DefaultQueueWorker with client ID: e2bf2e85-6613-4786-9272-c118207238b5 at 2018-03-05 12:13:40 +0700
Connected to ActiveMQ/5.15.2 server with STOMP 1.2
1000.times{|i|Foo.run_async(:bar, {queue: 'default'}, "#{i} of 10") }
....
[70110519215960-default] got message: ID:MacBookPro-Roman.local-58984-1519877225506-3:7457:-1:1:1, body: {"class"=>"Foo", "method"=>:bar, "args"=>["990 of 10"]}
[70110544378640-default] got message: ID:MacBookPro-Roman.local-58984-1519877225506-3:7458:-1:1:1, body: {"class"=>"Foo", "method"=>:bar, "args"=>["991 of 10"]}
[70110556154480-default] got message: ID:MacBookPro-Roman.local-58984-1519877225506-3:7459:-1:1:1, body: {"class"=>"Foo", "method"=>:bar, "args"=>["992 of 10"]}
[70110462346820-default] got message: ID:MacBookPro-Roman.local-58984-1519877225506-3:7460:-1:1:1, body: {"class"=>"Foo", "method"=>:bar, "args"=>["993 of 10"]}
[70110519215960-default] got message: ID:MacBookPro-Roman.local-58984-1519877225506-3:7461:-1:1:1, body: {"class"=>"Foo", "method"=>:bar, "args"=>["994 of 10"]}
[70110544378640-default] got message: ID:MacBookPro-Roman.local-58984-1519877225506-3:7462:-1:1:1, body: {"class"=>"Foo", "method"=>:bar, "args"=>["995 of 10"]}
[70110556154480-default] got message: ID:MacBookPro-Roman.local-58984-1519877225506-3:7463:-1:1:1, body: {"class"=>"Foo", "method"=>:bar, "args"=>["996 of 10"]}
[70110462346820-default] got message: ID:MacBookPro-Roman.local-58984-1519877225506-3:7464:-1:1:1, body: {"class"=>"Foo", "method"=>:bar, "args"=>["997 of 10"]}
[70110519215960-default] got message: ID:MacBookPro-Roman.local-58984-1519877225506-3:7465:-1:1:1, body: {"class"=>"Foo", "method"=>:bar, "args"=>["998 of 10"]}
Thanks again!
If you are satisfied with the current status, can you please close this issue?
@gmallard I would love to, but I can't. The ticket was opened by another person, @ratikesh9.
@ratikesh9 - please either:
I will leave this open for a while longer .....
Closing due to lack of response from OP.
We are publishing 1000 messages/second using the above library, and some of the messages are getting dropped. Is this an issue in the library, or am I missing some parameter tweak?