stompgem / stomp

A ruby gem for sending and receiving messages from a Stomp protocol compliant message queue. Includes: failover logic, ssl support.
http://stomp.github.com
Apache License 2.0
152 stars 80 forks source link

Extra unwanted subscriber created by stomp1.3.4 with activemq on a msg with null header #113

Closed bhuvangu closed 8 years ago

bhuvangu commented 9 years ago

We recently upgraded stomp from 1.1.8 to 1.3.4 with activemq 5.5.1 on a Redhat 6 system.

We have producer and consumer both in ruby stomp

What we observe is that when we send a msg from activemq web console to a topic with empty header the subscriber number increments by one.
Which is not desirable behavior. Is this a stomp bug ?

        **What can be done to avoid it... any patch will help ?**

Upgrading activemq is currently out of scope. Attached are the consumer and producer files

Reader.rb

require 'rubygems'
require 'stomp'
active_mq = Stomp::Client.new("failover:(stomp://localhost:61613)")
def reader(active_mq_broker, topic)
  begin
     active_mq_broker.subscribe(topic, {}) do |msg|
     puts "message: #{msg}"
     $stdout.flush
 end
 active_mq_broker.join()
 active_mq_broker.unsubscribe(topic)
 rescue Exception => e
     puts "#{topic}: #{e}"
     $stdout.flush
 end
 end
  t = Thread.new do reader(active_mq,"/topic/testTopic")
 end
 t.join()

Producer.rb

require 'rubygems'
require 'stomp'
@active_mq_broker_scan = Stomp::Client.open("failover:(stomp://localhost:61613)")
def send_message
  headers = {}
  message = "Scan Log Message"
  puts " message : #{message}"
  $stdout.flush
  @active_mq_broker_scan.publish("/topic/testTopic", message, headers)
end

def run
begin
  while true
    send_message
    sleep(1)
  end
rescue Exception => e
  puts "activemq_scanlog: #{e}"
  e.backtrace.join("\n")
end
end

t= Thread.new do
run
end

t.join()

EDIT

in logs we do see:

 cur_sslfalsemax_hbrlck_fails0max_reconnect_attempts0hostspasscodehostlocalhostport61613sslfalseloginconnect_timeout0connect_headerscur_recondelay0.01fast_hbs_adjust0.0randomizefalselogger#<Stomp::NullLogger:0x7f962a52b490>cur_hostlocalhosthbserfalsecur_parseto5connread_timeout0dmhfalsecur_port61613start_timeout0stompconnfalsemax_reconnect_delay30.0cur_conattempts0sslctx_newparmtcp_nodelaytruecur_loginuse_exponential_back_offtruecur_failureStomp::Error::ProtocolErrorEmptyHeaderValueinitial_reconnect_delay0.01reliabletruecur_passcodemax_hbread_fails0back_off_multiplier2openstattrueclosed_checktrueparse_timeout5es_oldrecv: receive failed: Stomp::Error::ProtocolErrorEmptyHeaderValuegot a msg in scan log <Stomp::Message headers={"priority"=>"4", "content-length"=>"11", "expires"=>"0", "message-id"=>"ID:cnndcnrm003.nndc.as.org-48000-1438866697976-1:242:-1:1:1533", "entity_type"=>"test", "system_name"=>"test", "subscription"=>"17c6e847711841e4e05ec2e7ab4584c3fa3a6b87", "content-type"=>"text/plain; charset=UTF-8", "entity_id"=>"23365", "timestamp"=>"1438869559233", "change_type"=>"scan", "destination"=>"/topic/testTopic"} body='see headers' command='MESSAGE' >**/topic/testTopiccur_sslfalsemax_hbrlck_fails0max_reconnect_attempts0hostspasscodehostlocalhostport61613sslfalseloginconnect_timeout0connect_headerscur_recondelay0.01fast_hbs_adjust0.0randomizefalselogger#<Stomp::NullLogger:0x7f962a52b490>cur_hostlocalhosthbserfalsecur_parseto5connread_timeout0dmhfalsecur_port61613start_timeout0stompconnfalsemax_reconnect_delay30.0cur_conattempts0sslctx_newparmtcp_nodelaytruecur_loginuse_exponential_back_offtruecur_failureStomp::Error::ProtocolErrorEmptyHeaderValueinitial_reconnect_delay0.01reliabletruecur_passcodemax_hbread_fails0back_off_multiplier2openstattrueclosed_checktrueparse_timeout5es_oldrecv: receive failed: Stomp::Error::ProtocolErrorEmptyHeaderValuegot a msg in scan log <Stomp::Message headers={"priority"=>"4", "content-length"=>"11", "expires"=>"0", "message-id"=>"ID:cnndcnrm003.nndc.as.org-48000-1438866697976-1:242:-1:1:1533", "entity_type"=>"test", "system_name"=>"test", "subscription"=>"17c6e847711841e4e05ec2e7ab4584c3fa3a6b87", "content-type"=>"text/plain; charset=UTF-8", "entity_id"=>"23365", "timestamp"=>"1438869559233", "change_type"=>"scan", "destination"=>"/topic/testTopic"} body='see headers' command='MESSAGE' 
abhishek538 commented 9 years ago

And while tracing it back we found out that, the exception was being thrown from stomp/lib/stomp/message.rb line 89, this is the stack trace we got when we tried to catch the exception:

/usr/lib64/ruby/gems/1.8/gems/stomp-1.3.4/lib/stomp/message.rb:96:in  `initialize'
/usr/lib64/ruby/gems/1.8/gems/stomp-1.3.4/lib/connection/netio.rb:87:in `new'
/usr/lib64/ruby/gems/1.8/gems/stomp-1.3.4/lib/connection/netio.rb:87:in  `_receive'
/usr/lib/ruby/1.8/timeout.rb:67:in `timeout'
/usr/lib64/ruby/gems/1.8/gems/stomp-1.3.4/lib/connection/netio.rb:38:in `_receive'
/usr/lib64/ruby/gems/1.8/gems/stomp-1.3.4/lib/connection/netio.rb:16:in `synchronize'
/usr/lib64/ruby/gems/1.8/gems/stomp-1.3.4/lib/connection/netio.rb:16:in `_receive'
/usr/lib64/ruby/gems/1.8/gems/stomp-1.3.4/lib/connection/utils.rb:241:in  `__old_receive'
/usr/lib64/ruby/gems/1.8/gems/stomp-1.3.4/lib/stomp/connection.rb:448:in `receive'
/usr/lib64/ruby/gems/1.8/gems/stomp-1.3.4/lib/client/utils.rb:176:in `start_listeners'
/usr/lib64/ruby/gems/1.8/gems/stomp-1.3.4/lib/client/utils.rb:175:in `loop'
/usr/lib64/ruby/gems/1.8/gems/stomp-1.3.4/lib/client/utils.rb:175:in `start_listeners'
/usr/lib64/ruby/gems/1.8/gems/stomp-1.3.4/lib/client/utils.rb:174:in `start'
/usr/lib64/ruby/gems/1.8/gems/stomp-1.3.4/lib/client/utils.rb:174:in `start_listeners'
/usr/lib64/ruby/gems/1.8/gems/stomp-1.3.4/lib/stomp/client.rb:93:in `initialize'
/usr/lib/ruby/1.8/timeout.rb:53:in `timeout'
/usr/lib/ruby/1.8/timeout.rb:101:in `timeout'
/usr/lib64/ruby/gems/1.8/gems/stomp-1.3.4/lib/stomp/client.rb:90:in `initialize'
Reader.rb:4:in `new'Reader.rb:4

On further investigation it was found that when the method "_reconn_prep()" is called in method "__old_receive()" in "stomp/lib/connection/utils.rb" line 253, one more subscriber to that topic gets created , as when we tried to comment that method call, there was no increase in no. of subscribers when a message is sent from the activemq admin console.

gmallard commented 9 years ago

I can not recreate anything like this.

I doubt actions through the AMQ console can affect what you see from the stomp gem - other that the reader gets an additional message.

Pastebin the following:

abhishek538 commented 9 years ago

Initially AMQ Web console, [ AMQ version : 5.5.1 ]

screen1

After sending a message with body as "Stomp New Message" from AMQ Web console -

screen2

The error we get at the reader: cur_hostlocalhostsslctx_newparmparse_timeout5use_exponential_back_offtruecur_parseto5dmhfalsecur_port61613connect_timeout0back_off_multiplier2cur_conattempts0tcp_nodelaytruemax_hbread_fails0cur_loginmax_reconnect_attempts0hostsloginhostlocalhostport61613passcodesslfalselogger#Stomp::NullLogger:0x7f343ebc9688cur_failureStomp::Error::ProtocolErrorEmptyHeaderValuemax_hbrlck_fails0reliabletruecur_passcodeclosed_checktruerandomizefalseopenstattruestart_timeout0fast_hbs_adjust0.0cur_sslfalsehbserfalseconnread_timeout0max_reconnect_delay30.0initial_reconnect_delay0.01connect_headerscur_recondelay0.01stompconnfalsees_oldrecv: receive failed: Stomp::Error::ProtocolErrorEmptyHeaderValue

And the following stomp debug logs:

2015-08-10 06:09:30,962 | TRACE | Sending: 
MESSAGE
message-id:ID:ip-10-199-81-186-44325-1439201019020-2:1:1:1:1
type:
destination:/topic/testTopic
timestamp:1439201370952
expires:0
subscription:8a3cf4f15c6e2ede02a0c634e95739d4a0ddea95
priority:0
correlation-id:

Stomp New message | org.apache.activemq.transport.stomp.StompTransportFilter | BrokerService[MyMQ] Task-42
2015-08-10 06:09:30,966 | TRACE | Received: 
CONNECT
content-type:text/plain; charset=UTF-8
content-length:0

 | org.apache.activemq.transport.stomp.StompTransportFilter | ActiveMQ NIO Worker
2015-08-10 06:09:30,967 | TRACE | Sending: 
CONNECTED
session:ID:ip-10-199-81-186-44325-1439201019020-1:11

 | org.apache.activemq.transport.stomp.StompTransportFilter | ActiveMQ NIO Worker
2015-08-10 06:09:30,969 | TRACE | Received: 
SUBSCRIBE
content-type:text/plain; charset=UTF-8
destination:/topic/testTopic
id:8a3cf4f15c6e2ede02a0c634e95739d4a0ddea95
content-length:0

 | org.apache.activemq.transport.stomp.StompTransportFilter | ActiveMQ NIO Worker
gmallard commented 9 years ago

I still can not recreate either:

1) Consumer count of two 2) Any exception stacktrace at all

I need, apparently, a lot more details as to how to recreate. Including a stop/start of AMQ for failover purposes, since that seems in this mix somewhere.

The stomp log you sent seems to indicate events are the reverse of what you describe verbally. Why is that?

This may have something to do with your exact AMQ configs. Is it possible to post your activemq.xml file ?

At this point, I sincerely doubt that it is a gem bug.

abhishek538 commented 9 years ago

We tried to recreate it on a new machine, and were able to reproduce it again. Environment : OS : Red Hat Enterprise Linux Server release 6.2 (Santiago) Ruby : ruby 1.8.7 (2011-06-30 patchlevel 352) [x86_64-linux] Stomp : stomp (1.3.4) ActiveMQ : 5.5.1

Below are the detailed steps to reproduce the issue:

[1] Install jdk
    java version : "1.6.0_29"
    Url: http://download.oracle.com/otn/java/jdk/6u29-b11/jdk-6u29-linux-x64-rpm.bin
[2] Install stomp version 1.3.4
    gem install stomp -v 1.3.4
[3] Install activemq version 5.5.1
    Url : http://activemq.apache.org/activemq-551-release.html
[4] Start activemq, goto activemq installation directory [ a new transport connector needs to be added in the activemq.xml for stomp i.e., <transportConnector name="stomp" uri="stomp://0.0.0.0:61613"/> ]
./activemq start
[5] Create a ruby reader for a topic and start it. [ as written in the first post "Reader.rb"]
[6] Now, go to ActiveMQ web console click on the topic name and then hit send on the topic page.
[7] Now, look for the number of consumers for this topic and the exception at the reader.

In our case, The Exception we got at the reader :

cur_recondelay0.01max_hbread_fails0cur_hostlocalhostuse_exponential_back_offtruelogger#<Stomp::NullLogger:0x7fd1c580e118>hostsloginport61613passcodesslfalsehostlocalhostcur_parseto5max_hbrlck_fails0dmhfalsecur_port61613closed_checktrueback_off_multiplier2cur_conattempts0tcp_nodelaytruefast_hbs_adjust0.0reliabletruecur_loginhbserfalsemax_reconnect_attempts0cur_failureStomp::Error::ProtocolErrorEmptyHeaderValueconnread_timeout0initial_reconnect_delay0.01cur_passcodestompconnfalserandomizefalseopenstattruesslctx_newparmparse_timeout5connect_headerscur_sslfalsestart_timeout0connect_timeout0max_reconnect_delay30.0es_oldrecv: receive failed: Stomp::Error::ProtocolErrorEmptyHeaderValue

This time also no. of consumers of topic "testTopic" got increased by "1", every time we sent a message from AMQ web console with default header values.

gmallard commented 9 years ago

Aha. Thanks. I can recreate this now.

I agree that the gem should not behave as it does. And I will try to figure out how to change that behavior.

Having said that, please note:

For that protocol level, header names and values MUST have at least one character. AMQ is violating that part of the spec, and the gem detects that.

And example of what AMQ sends:

2015-08-11 22:47:09,579 [0:0:0:0:1:60922] TRACE StompTransportFilter - Sending: * MESSAGE message-id:ID:tjjackson-36367-1439347470127-3:1:1:1:1 type: destination:/topic/testTopic timestamp:1439347629559 expires:0 subscription:8a3cf4f15c6e2ede02a0c634e95739d4a0ddea95 priority:0 correlation-id: asdasd*

gmallard commented 9 years ago

I will push a fix to the dev branch in the next day or two, and ask you to test a gem build from that. We can discuss this in more detail later.

I would like to make a couple of comments.

I strongly recommend that you upgrade ActiveMQ. The release you are running is ..... well, quite old. But the main reason I recommend this is so you can begin to use STOMP specification levels 1.1 or 1.2 in your application code. Level 1.2 would be recommended.

See:

http://stomp.github.io/

for details.

I would also recommend that you update your Ruby system. To at least 1.9.3, but preferably to the latest 2.x. I recommend this strictly for performance. There is lot of benefit in moving off of 1.8.7. Some of my tests run in half the time with 1.9.x+.

I realize those upgrades might be difficult with RHEL. Difficult both politically and technically. But they are worth it.

psharpNumerex commented 8 years ago

Has there been any more thought on this? We are seeing something similar when we get a frame with an extra newline at the beginning (e.g. "\nMESSAGE\n....). This causes the command to be invalid and stomp tries to reconnect which results in a 'ghost' consumer.

We're definitely trying to track down why AMQ is doing this and have commented out the _reconn_prep()in utils, but it would be great if the gem handled it as well.

Thanks!

gmallard commented 8 years ago

I have a fix for this, which will be in the next code drop.

In the meantime, if you are willing to temporarily modify your gem installation, add the following line of code to 'lib/connection/utils.rb'. Immediately after the 'def _reconn_prep()' add:

close_socket()

If you do that, please provide feedback.

psharpNumerex commented 8 years ago

What's the timeframe on the code drop? If it's reasonably soon we might wait and avoid a post deploy, on-server modification.

gmallard commented 8 years ago

Soon I hope. I am looking for feedback regarding PR#117, and then will cut a new gem release.

Just to be clear, the "fix" is for the original problem described here, and not necessarily related to extra newlines at the beginning of a frame.

I should have asked this before, but ...... better late than never. Is this JRuby ??

psharpNumerex commented 8 years ago

PR 116 took care of the newline, so no worries there.

Nope, just vanilla ruby 2.2

gmallard commented 8 years ago

There is a fix at the HEAD of the dev branch (fix for the original problem described here). Try that if you like.

Note: using the AMQ admin console to send messages if going to continue to be problematic. Why?

Because AMQ is "out of spec" with the headers in the MESSAGE frame sent to the gem/client.

For a 1.0 connection, header keys and values MUST contain at least one character. See the BNF for 1.0 here:

http://stomp.github.io/stomp-specification-1.0.html

The gem detects violations of that part of the specification, and raises:

Stomp::Error::ProtocolErrorEmptyHeaderValue

This should be reported on the AMQ Jira bug tracker I think.

gmallard commented 8 years ago

Fixed (hopefully) in gem version 1.3.5.

Please try that version.

Let me know if there are difficulties.

Please close this issue if all is OK.

Regards, Guy

gmallard commented 8 years ago

Fixed: https://github.com/stompgem/stomp/commit/157eeb2ab326b4bb18f5325a92a38a526ceb57cd

Use gem version 1.3.5.