stompgem / stomp

A ruby gem for sending and receiving messages from a Stomp protocol compliant message queue. Includes: failover logic, ssl support.
http://stomp.github.com
Apache License 2.0
152 stars 80 forks source link

Subscribers are not receiving all messages that we are publishing #170

Closed sudeeptarlekar closed 3 years ago

sudeeptarlekar commented 3 years ago

We are experiencing weird issue while publishing large number of messages to Stomp client as few of them are getting dropped. We are using ActiveMQ as broker and adding 2 subscribers to the ActiveMQ using gem and then publishing 20-25 messages and we are seeing subscriber in some cases dropping few messages.

We followed example/amqdurasub.rb with minor tweaks, use this gist for complete code.

Sample code we tweaked from example for testing

# Establish the client connection
cli = Stomp::Client.open(hash)
# SUBSCRIBE Headers
sh = { "activemq.subscriptionName" => "subname01" } # REF the 1st AMQ link above
# And the client subscribe
puts "*****#{Thread.current.object_id}******"

cli.subscribe(topic, sh) do |msg|
  file.puts "msg1: Count: #{msg.body}; Thread ID: #{Thread.current.object_id};"
end

hash[:connect_headers][:'client-id'] = 'dursubcli02'

cli2 = Stomp::Client.open(hash)
sh2 = { "activemq.subscriptionName" => "subname01" }

cli2.subscribe(topic, sh2) do |msg|
  file.puts "msg2: Count: #{msg.body}; Thread ID: #{Thread.current.object_id};"
end

25.times { |n| cli.publish('/topic/topicName', "#{n}") }

So as you can see in above example when we give same subscription name in header while subscribing to topic, each subscriber is not receiving all 25 messages. When give different subscription names in headers everything seems to work fine.

But, in theory as we are creating subscriptions from two different clients then those subscribers should receive correct number of messages.

Are we missing something here?

samba4vineti commented 3 years ago

To add to this thread, would appreciate if we can have an example of how multiple subscribers should be registered on a topic. Do we need to create separate STOMP client for each subscriber for the same topic or would just a subscription suffice ?

gmallard commented 3 years ago

Hi -

First: this is really a question for the AMQ developers. STOMP clients have nothing to do with how (or even if) durability is implemented.

Second - I can recreate what you are seeing. But ...... it is a race condition I think. At the end of your code (above) do a

sleep 10

And when the program exits, then look at the AMQ admin screens. With a suitable sleep, I get all messages received by all consumers.

As far as I can tell, all of this is working as designed.

Note that AMQ creates a durable sub. client if you do not specify one on CONNECT. You can see this in the admin screens.

samba4vineti commented 3 years ago

Hi,

Thanks a lot for your response and appreciate it much. FYI : sleep would not work in a production environment and hence looking for alternate solutions. Few Qs :

PaulGale commented 3 years ago

Hi Samba,

What broker are you connecting to? ActiveMQ? If you are then your client must set its prefetch value to 1 (the gem should really do this by default). If it's not set then the default value is 1000 for a queue and 32767 for a topic. The Stomp gem doesn't buffer messages dispatched from the broker the way that a Java implemented JMS based client library would.

On Mon, Sep 21, 2020 at 6:43 PM Samba Pedapalli notifications@github.com wrote:

Hi,

Thanks a lot for your response and appreciate it much. FYI : sleep would not work in a production environment and hence looking for alternate solutions. Few Qs :

  • Could you throw some light on whether these are two separate threads that are reading from their individual sockets ?
  • When we have 2 subscribers on a topic, do we need to create separate Stomp::Client connections OR create one Stomp:Client and subscriber the second Subscriber on the Client ? Would appreciate if you can share a sample code ?

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/stompgem/stomp/issues/170#issuecomment-696417574, or unsubscribe https://github.com/notifications/unsubscribe-auth/AAASDN2K64FEMQV52DLTD2TSG7JILANCNFSM4RSL7MAA .

samba4vineti commented 3 years ago

Thanks Paul. Yes we are using ActiveMQ. Thanks a lot for calling out the pre-fetch size.

PaulGale commented 3 years ago

So by only setting the prefetch to 1 (and no code change in the gem itself) did that fix your problem?

On Wed, Sep 23, 2020 at 2:06 AM Samba Pedapalli notifications@github.com wrote:

Thanks Paul. Yes we are using ActiveMQ. Thanks a lot for calling out the pre-fetch size.

— You are receiving this because you commented. Reply to this email directly, view it on GitHub https://github.com/stompgem/stomp/issues/170#issuecomment-697153676, or unsubscribe https://github.com/notifications/unsubscribe-auth/AAASDN2YYEBDWUO63RQU7UTSHGF7RANCNFSM4RSL7MAA .

samba4vineti commented 3 years ago

Paul, havent tested the change yet. Made the change to our larger codebase and is in the build pipeline. Havent run the above code either. Will confirm once we test the above unit test or have an opportunity to run the prod codebase.

sudeeptarlekar commented 3 years ago

We tried passing activemq.prefetchSize as the header value while subscribing to the topic and found that after adding the value we are able to receive correct number of messages from both client.

Thanks for helping us out here @PaulGale

gmallard commented 3 years ago

Thanks much Paul. Great suggestion.

Regarding making this the default:

I would entertain a modification where the clients asks for this to be the default. Say yet another option in the Connect hash.

I do not want it across the board. There is no sense spamming Rabbit MQ (e.g.) with AMQ specific headers.

@sudeeptarlekar - With no objections I will close this issue in the next few days.