stompgem / stomp

A ruby gem for sending and receiving messages from a Stomp protocol compliant message queue. Includes: failover logic, ssl support.
http://stomp.github.com
Apache License 2.0

Uncaught exception thrown when AMQ closes connection #63

Closed PaulGale closed 11 years ago

PaulGale commented 11 years ago

Using ActiveMQ 5.8.0 with 1.2.11 of the gem.

There appears to be a bug in utils.rb, line 165. What if message.nil? is true and reliable is true as well? You'll get "undefined method 'command' for nil:NilClass".

In my case the broker decided to close the connection owing to an inactivity timeout kicking in (2000ms) (log output can be provided if interested).
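A minimal sketch of the guard being asked for, assuming a listener loop that receives either a message object or nil on EOF. The `Message` struct and the `drain` method are hypothetical illustrations, not the gem's code in utils.rb:

```ruby
# Hypothetical stand-in for a listener loop; this is not the gem's code.
# `receive` is any callable returning a message object, or nil on EOF.
Message = Struct.new(:command, :headers, :body)

def drain(receive, reliable)
  seen = []
  loop do
    message = receive.call
    if message.nil?
      # EOF: the broker closed the socket. Do not call message.command.
      seen << :reconnect if reliable  # a real client would fail over here
      break
    end
    seen << message.command
  end
  seen
end

frames = [Message.new('MESSAGE', {}, 'hello'), nil]
puts drain(-> { frames.shift }, true).inspect
```

The point is only that the nil check comes before any method is called on the message, whether or not reliable is set.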

Thanks, Paul

gmallard commented 11 years ago

Paul - Looking at utils.rb, the code for a Stomp::Client, that seems like a valid complaint.

Sure, show me the logs. And your AMQ (I assume) configs. How do you get a server to timeout for inactivity in 2 seconds? I am not sure how I could test any proposed fix.

Is your connection at protocol level 1.0? If that is correct, I advise that you change to 1.1 or preferably 1.2 and use appropriate heartbeats.

Or if you have a local fix, send it to me please.

This recent rash of activity around Stomp::Client connections is refreshing. Just be aware there are limitations to that connection type. There have been since before my first commit, and some are likely to remain. Most large production gem users are inclined to a Stomp::Connection interface.

Thanks, Guy

PaulGale commented 11 years ago

Guy,

I am using JRuby 1.7.3 on RHEL 6.1.

The problem really surfaced when I turned on heartbeating and multi-host support in order to test failover. Perhaps, therefore, the issue should be about heartbeating itself. With heartbeating turned off the consumer is able to subscribe and receive messages as normal.

However, I have seen other cases today where the broker throws an exception for whatever reason, then closes its side of the connection, causing the error to manifest in utils.rb at line 165.

In the general case though, I would have thought that the correct response by the client whenever the broker decides to close the connection would be to fail over to the next host in the list of hosts. It doesn't seem reasonable for the client to second-guess why the broker chose to close the connection. Therefore the client should loop forever until some broker answers.
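The "loop until some broker answers" behavior can be sketched as follows. This is an illustration only: `connect_with_failover` and the `connect` callable are hypothetical and are not part of the stomp gem.

```ruby
# Hypothetical helper: `connect` is any callable that returns a connection
# or raises on failure. It is not part of the stomp gem.
def connect_with_failover(hosts, connect, delay: 0, max_attempts: nil)
  attempts = 0
  hosts.cycle do |host|          # round-robin over the host list, forever
    attempts += 1
    begin
      return connect.call(host)
    rescue StandardError
      # Give up only if the caller asked for a limit; otherwise keep going.
      raise if max_attempts && attempts >= max_attempts
      sleep(delay) if delay > 0
    end
  end
end

# Usage: the first host is down, the second answers.
connect = lambda do |host|
  raise IOError, "#{host} is down" if host == 'queue02'
  "connected to #{host}"
end
puts connect_with_failover(%w[queue02 queue03], connect)
```

Leaving `max_attempts` unset gives the loop-forever behavior described above; a limit is included only so the sketch can be exercised safely.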

I am using STOMP 1.2 to connect with bi-directional heartbeats set to 2000ms (which seems reasonable to me). The client code is shown below (contained inside a rakefile). Of the two hosts listed, the first (queue02) was offline during the entirety of the testing:

task :consume do
  begin
    hash = { :hosts => [
        {:login    => 'system',
         :passcode => 'password',
         :host     => 'inf-dc4lab-queue02.ove.local',
         :port     => 61613},
        {:login    => 'system',
         :passcode => 'password',
         :host     => 'inf-dc4lab-queue03.ove.local',
         :port     => 61613},
      ],
      :connect_headers => {'accept-version' => '1.2', 'host' => 'localhost', 'client-id' => 'test', 'heart-beat' => '2000,2000'},
      :logger    => Slogger::new,
      :autoflush => true,
      :reliable  => true,
      :parse_timeout => 5,
      :initial_reconnect_delay => 2,
    }

    c = Stomp::Client.new(hash)

    Kernel.at_exit do
      puts 'Client disconnecting...'
      c.close
    end

    c.subscribe('/topic/Vicki.Event', :ack => 'client', :'activemq.subscriptionName' => 'test_sub2') do |msg|
      puts msg.command
      puts msg.headers

      c.acknowledge(msg)
    end

    c.join
  rescue => e
    puts e.message
    puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
  end
end

Here is the corresponding stomp.log. Note that the broker sends three KEEPALIVE frames before closing the connection (see the activemq.log below). Yes, I know that the STOMP protocol documentation states that one only needs to send a NULL character to satisfy the heartbeat requirement but something appears to be going wrong here:

2013/07/30 18:11:37.410 | TRACE | StompIO                        | Received: 
CONNECT
content-type:text/plain; charset=UTF-8
heart-beat:2000,2000
host:localhost
accept-version:1.2
content-length:0
passcode:*****
client-id:test
login:system

2013/07/30 18:11:37.410 | DEBUG | StompInactivityMonitor         | Stomp Inactivity Monitor read check: 2000, write check: 2000
2013/07/30 18:11:37.411 | DEBUG | ProtocolConverter              | Stomp Connect heartbeat conf RW[2000,2000]
2013/07/30 18:11:37.415 | TRACE | StompIO                        | Sending: 
CONNECTED
heart-beat:2000,2000
session:test
server:ActiveMQ/5.8.0
version:1.2

2013/07/30 18:11:37.466 | TRACE | StompIO                        | Received: 
SUBSCRIBE
content-type:text/plain; charset=UTF-8
activemq.subscriptionName:test_sub2
ack:client
destination:/topic/Vicki.Event
id:6e7ae348fede8a38346e1bf55baebf3cef4529ad
content-length:0

2013/07/30 18:11:41.413 | TRACE | StompIO                        | Sending: 
KEEPALIVE

2013/07/30 18:11:43.411 | TRACE | StompIO                        | Sending: 
KEEPALIVE

2013/07/30 18:11:45.412 | TRACE | StompIO                        | Sending: 
KEEPALIVE

Here is the corresponding snippet of the activemq.log file. Note that the following block is repeated three times as the STOMP client makes three attempts to connect and subscribe until it's eventually cut off by the broker:

INFO   | jvm 1    | 2013/07/30 18:31:19.712 | DEBUG | TransportConnection            | Setting up new connection id: ID:inf-dc4lab-queue03.ove.local-54891-1375221526044-3:11, address: tcp://10.103.36.138:52868, info: ConnectionInfo {commandId = 0, responseRequired = true, connectionId = ID:inf-dc4lab-queue03.ove.local-54891-1375221526044-3:11, clientId = test, clientIp = null, userName = system, password = *****, brokerPath = null, brokerMasterConnector = false, manageable = false, clientMaster = true, faultTolerant = false, failoverReconnect = false} | ActiveMQ NIO Worker 13
INFO   | jvm 1    | 2013/07/30 18:31:19.812 | DEBUG | DurableTopicSubscription       | Activating DurableTopicSubscription-test:test_sub2, id=ID:inf-dc4lab-queue03.ove.local-54891-1375221526044-3:11:-1:1, active=false, destinations=1, total=0, pending=0, dispatched=0, inflight=0, prefetchExtension=0 | ActiveMQ NIO Worker 13
INFO   | jvm 1    | 2013/07/30 18:31:23.719 | DEBUG | AbstractInactivityMonitor      | 2000 ms elapsed since last read check. | ActiveMQ InactivityMonitor ReadCheckTimer
INFO   | jvm 1    | 2013/07/30 18:31:23.719 | DEBUG | AbstractInactivityMonitor      | WriteChecker 2000 ms elapsed since last write check. | ActiveMQ InactivityMonitor WriteCheckTimer
INFO   | jvm 1    | 2013/07/30 18:31:23.719 | DEBUG | AbstractInactivityMonitor      | Running WriteCheck[tcp://10.103.36.138:52868] | ActiveMQ InactivityMonitor Worker
INFO   | jvm 1    | 2013/07/30 18:31:25.723 | DEBUG | AbstractInactivityMonitor      | 2000 ms elapsed since last read check. | ActiveMQ InactivityMonitor ReadCheckTimer
INFO   | jvm 1    | 2013/07/30 18:31:25.723 | DEBUG | AbstractInactivityMonitor      | WriteChecker 2000 ms elapsed since last write check. | ActiveMQ InactivityMonitor WriteCheckTimer
INFO   | jvm 1    | 2013/07/30 18:31:25.723 | DEBUG | AbstractInactivityMonitor      | Running WriteCheck[tcp://10.103.36.138:52868] | ActiveMQ InactivityMonitor Worker
INFO   | jvm 1    | 2013/07/30 18:31:27.726 | DEBUG | AbstractInactivityMonitor      | 2000 ms elapsed since last read check. | ActiveMQ InactivityMonitor ReadCheckTimer
INFO   | jvm 1    | 2013/07/30 18:31:27.726 | DEBUG | AbstractInactivityMonitor      | No message received since last read check for tcp:///10.103.36.138:52868@61613. Throwing InactivityIOException. | ActiveMQ InactivityMonitor ReadCheckTimer
INFO   | jvm 1    | 2013/07/30 18:31:27.726 | DEBUG | AbstractInactivityMonitor      | Running ReadCheck[tcp://10.103.36.138:52868] | ActiveMQ InactivityMonitor Worker
INFO   | jvm 1    | 2013/07/30 18:31:27.726 | DEBUG | ThreadPoolUtils                | Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@47c5e6ed[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 2] is shutdown: true and terminated: false took: 0.000 seconds. | ActiveMQ InactivityMonitor Worker
INFO   | jvm 1    | 2013/07/30 18:31:27.726 | DEBUG | Transport                      | Transport Connection to: tcp://10.103.36.138:52868 failed: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too (>2000) long: tcp://10.103.36.138:52868 | ActiveMQ InactivityMonitor Worker
INFO   | jvm 1    | 2013/07/30 18:31:27.726 | org.apache.activemq.transport.InactivityIOException: Channel was inactive for too (>2000) long: tcp://10.103.36.138:52868
INFO   | jvm 1    | 2013/07/30 18:31:27.726 |   at org.apache.activemq.transport.AbstractInactivityMonitor$4.run(AbstractInactivityMonitor.java:233)
INFO   | jvm 1    | 2013/07/30 18:31:27.726 |   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
INFO   | jvm 1    | 2013/07/30 18:31:27.726 |   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
INFO   | jvm 1    | 2013/07/30 18:31:27.726 |   at java.lang.Thread.run(Thread.java:722)
INFO   | jvm 1    | 2013/07/30 18:31:27.726 | DEBUG | TransportConnection            | Stopping connection: tcp://10.103.36.138:52868 | ActiveMQ BrokerService[inf-dc4lab-queue03.ove.local] Task-11
INFO   | jvm 1    | 2013/07/30 18:31:27.726 | DEBUG | TcpTransport                   | Stopping transport tcp:///10.103.36.138:52868@61613 | ActiveMQ BrokerService[inf-dc4lab-queue03.ove.local] Task-11
INFO   | jvm 1    | 2013/07/30 18:31:27.726 | DEBUG | TcpTransport                   | Closed socket Socket[unconnected] | ActiveMQ BrokerService[inf-dc4lab-queue03.ove.local] Task-11
INFO   | jvm 1    | 2013/07/30 18:31:27.726 | DEBUG | TransportConnection            | Stopped transport: null | ActiveMQ BrokerService[inf-dc4lab-queue03.ove.local] Task-11
INFO   | jvm 1    | 2013/07/30 18:31:27.726 | DEBUG | TransportConnection            | Cleaning up connection resources: null | ActiveMQ BrokerService[inf-dc4lab-queue03.ove.local] Task-11
INFO   | jvm 1    | 2013/07/30 18:31:27.726 | DEBUG | TransportConnection            | remove connection id: ID:inf-dc4lab-queue03.ove.local-54891-1375221526044-3:11 | ActiveMQ BrokerService[inf-dc4lab-queue03.ove.local] Task-11
INFO   | jvm 1    | 2013/07/30 18:31:27.726 | DEBUG | DurableTopicSubscription       | Deactivating keepActive=true, DurableTopicSubscription-test:test_sub2, id=ID:inf-dc4lab-queue03.ove.local-54891-1375221526044-3:11:-1:1, active=true, destinations=1, total=0, pending=0, dispatched=0, inflight=0, prefetchExtension=0 | ActiveMQ BrokerService[inf-dc4lab-queue03.ove.local] Task-11
INFO   | jvm 1    | 2013/07/30 18:31:27.726 | DEBUG | TransportConnection            | Connection Stopped: null | ActiveMQ BrokerService[inf-dc4lab-queue03.ove.local] Task-11

And here is the command-line output from the consume rake task. Note that the timestamps don't match the logs above, as I had to re-create the output just now because the original was lost. The flow is identical though. Note that I un-commented the various p debug statements contained in the gem. According to the output shown below, it would appear that the gem is doing the right thing; evidently ActiveMQ disagrees:

** Invoke consume (first_time)
** Execute consume
I, [2013-07-30T21:33:38.972000 #31703]  INFO -- : Logger initialization complete.
D, [2013-07-30T21:33:38.976000 #31703] DEBUG -- : Connecting: Host: inf-dc4lab-queue02.ove.local, Port: 61613, Login: system
D, [2013-07-30T21:33:38.993000 #31703] DEBUG -- : Connect Fail Host: inf-dc4lab-queue02.ove.local, Port: 61613, Login: system
D, [2013-07-30T21:33:40.995000 #31703] DEBUG -- : Connecting: Host: inf-dc4lab-queue03.ove.local, Port: 61613, Login: system
["debug_01", "1.0", nil]
["wiredataout_02:", "CONNECT\n"]
["debug_01", "1.0", nil]
["wiredataout_02:", "accept-version:1.2\n"]
["debug_01", "1.0", nil]
["wiredataout_02:", "host:localhost\n"]
["debug_01", "1.0", nil]
["wiredataout_02:", "client-id:test\n"]
["debug_01", "1.0", nil]
["wiredataout_02:", "heart-beat:2000,2000\n"]
["debug_01", "1.0", nil]
["wiredataout_02:", "login:system\n"]
["debug_01", "1.0", nil]
["wiredataout_02:", "passcode:password\n"]
["debug_01", "1.0", nil]
["wiredataout_02:", "content-length:0\n"]
["debug_01", "1.0", nil]
["wiredataout_02:", "content-type:text/plain; charset=UTF-8\n"]
["debug_01", "1.0", nil]
["wiredataout_02:", "\n"]
["wiredatain_01", "CONNECTED\n"]
["wiredatain_02", "heart-beat:2000,2000\n"]
["wiredatain_02", "session:test\n"]
["wiredatain_02", "server:ActiveMQ/5.8.0\n"]
["wiredatain_02", "version:1.2\n"]
["wiredatain_02", "\n"]
D, [2013-07-30T21:33:41.029000 #31703] DEBUG -- : Connected: Host: inf-dc4lab-queue03.ove.local, Port: 61613, Login: system
D, [2013-07-30T21:33:41.031000 #31703] DEBUG -- : Subscribe Parms Host: inf-dc4lab-queue03.ove.local, Port: 61613, Login: system
D, [2013-07-30T21:33:41.032000 #31703] DEBUG -- : Subscribe Headers {:ack=>"client", :"activemq.subscriptionName"=>"test_sub2", :id=>"6e7ae348fede8a38346e1bf55baebf3cef4529ad", :destination=>"/topic/Vicki.Event"}
["debug_01", "1.2", nil]
["wiredataout_02:", "SUBSCRIBE\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "ack:client\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "activemq.subscriptionName:test_sub2\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "id:6e7ae348fede8a38346e1bf55baebf3cef4529ad\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "destination:/topic/Vicki.Event\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "content-length:0\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "content-type:text/plain; charset=UTF-8\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "\n"]
D, [2013-07-30T21:33:43.030000 #31703] DEBUG -- : HeartBeat Fire Parms Host: inf-dc4lab-queue03.ove.local, Port: 61613, Login: system
D, [2013-07-30T21:33:43.030000 #31703] DEBUG -- : HeartBeat Fire Parms Host: inf-dc4lab-queue03.ove.local, Port: 61613, Login: system
D, [2013-07-30T21:33:43.032000 #31703] DEBUG -- : HeartBeat Fire Send/Receive receive_fire, time = 1375234423.03
D, [2013-07-30T21:33:43.033000 #31703] DEBUG -- : HeartBeat Fire Send/Receive send_fire, time = 1375234423.029
D, [2013-07-30T21:33:45.033000 #31703] DEBUG -- : HeartBeat Fire Parms Host: inf-dc4lab-queue03.ove.local, Port: 61613, Login: system
D, [2013-07-30T21:33:45.033000 #31703] DEBUG -- : HeartBeat Fire Parms Host: inf-dc4lab-queue03.ove.local, Port: 61613, Login: system
D, [2013-07-30T21:33:45.034000 #31703] DEBUG -- : HeartBeat Fire Send/Receive send_fire, time = 1375234425.033
D, [2013-07-30T21:33:45.034000 #31703] DEBUG -- : HeartBeat Fire Send/Receive receive_fire, time = 1375234425.033
D, [2013-07-30T21:33:45.034000 #31703] DEBUG -- : HeartBeat Fire Parms Host: inf-dc4lab-queue03.ove.local, Port: 61613, Login: system
D, [2013-07-30T21:33:45.035000 #31703] DEBUG -- : HeartBeat Fire Send/Receive send_heartbeat, time = 1375234425.033
D, [2013-07-30T21:33:47.035000 #31703] DEBUG -- : HeartBeat Fire Parms Host: inf-dc4lab-queue03.ove.local, Port: 61613, Login: system
D, [2013-07-30T21:33:47.035000 #31703] DEBUG -- : HeartBeat Fire Send/Receive receive_fire, time = 1375234427.034
D, [2013-07-30T21:33:47.036000 #31703] DEBUG -- : HeartBeat Fire Parms Host: inf-dc4lab-queue03.ove.local, Port: 61613, Login: system
D, [2013-07-30T21:33:47.036000 #31703] DEBUG -- : HeartBeat Fire Send/Receive send_fire, time = 1375234427.036
D, [2013-07-30T21:33:47.037000 #31703] DEBUG -- : HeartBeat Fire Parms Host: inf-dc4lab-queue03.ove.local, Port: 61613, Login: system
D, [2013-07-30T21:33:47.038000 #31703] DEBUG -- : HeartBeat Fire Send/Receive send_heartbeat, time = 1375234427.036
D, [2013-07-30T21:33:49.036000 #31703] DEBUG -- : HeartBeat Fire Parms Host: inf-dc4lab-queue03.ove.local, Port: 61613, Login: system
D, [2013-07-30T21:33:49.037000 #31703] DEBUG -- : HeartBeat Fire Send/Receive receive_fire, time = 1375234429.036
D, [2013-07-30T21:33:49.038000 #31703] DEBUG -- : HeartBeat Fire Parms Host: inf-dc4lab-queue03.ove.local, Port: 61613, Login: system
D, [2013-07-30T21:33:49.039000 #31703] DEBUG -- : HeartBeat Fire Send/Receive send_fire, time = 1375234429.038
D, [2013-07-30T21:33:51.021000 #31703] DEBUG -- : Miscellaneous Error Host: inf-dc4lab-queue03.ove.local, Port: 61613, Login: system
D, [2013-07-30T21:33:51.022000 #31703] DEBUG -- : Miscellaneous Error String es_recv: connection.receive returning EOF as nil - resetting connection.

D, [2013-07-30T21:33:51.023000 #31703] DEBUG -- : Connecting: Host: inf-dc4lab-queue02.ove.local, Port: 61613, Login: system
D, [2013-07-30T21:33:51.027000 #31703] DEBUG -- : Connect Fail Host: inf-dc4lab-queue02.ove.local, Port: 61613, Login: system
D, [2013-07-30T21:33:53.028000 #31703] DEBUG -- : Connecting: Host: inf-dc4lab-queue03.ove.local, Port: 61613, Login: system
["debug_01", "1.2", nil]
["wiredataout_02:", "CONNECT\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "accept-version:1.2\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "host:localhost\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "client-id:test\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "heart-beat:2000,2000\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "login:system\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "passcode:password\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "content-length:0\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "content-type:text/plain; charset=UTF-8\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "\n"]
["wiredatain_01", "CONNECTED\n"]
["wiredatain_02", "heart-beat:2000,2000\n"]
["wiredatain_02", "session:test\n"]
["wiredatain_02", "server:ActiveMQ/5.8.0\n"]
["wiredatain_02", "version:1.2\n"]
["wiredatain_02", "\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "SUBSCRIBE\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "ack:client\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "activemq.subscriptionName:test_sub2\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "id:6e7ae348fede8a38346e1bf55baebf3cef4529ad\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "destination:/topic/Vicki.Event\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "content-length:0\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "content-type:text/plain; charset=UTF-8\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "\n"]
D, [2013-07-30T21:33:53.053000 #31703] DEBUG -- : Connected: Host: inf-dc4lab-queue03.ove.local, Port: 61613, Login: system
D, [2013-07-30T21:33:55.050000 #31703] DEBUG -- : HeartBeat Fire Parms Host: inf-dc4lab-queue03.ove.local, Port: 61613, Login: system
D, [2013-07-30T21:33:55.051000 #31703] DEBUG -- : HeartBeat Fire Parms Host: inf-dc4lab-queue03.ove.local, Port: 61613, Login: system
D, [2013-07-30T21:33:55.051000 #31703] DEBUG -- : HeartBeat Fire Send/Receive send_fire, time = 1375234435.05
D, [2013-07-30T21:33:55.051000 #31703] DEBUG -- : HeartBeat Fire Send/Receive receive_fire, time = 1375234435.05
D, [2013-07-30T21:33:57.052000 #31703] DEBUG -- : HeartBeat Fire Parms Host: inf-dc4lab-queue03.ove.local, Port: 61613, Login: system
D, [2013-07-30T21:33:57.052000 #31703] DEBUG -- : HeartBeat Fire Parms Host: inf-dc4lab-queue03.ove.local, Port: 61613, Login: system
D, [2013-07-30T21:33:57.052000 #31703] DEBUG -- : HeartBeat Fire Send/Receive send_fire, time = 1375234437.051
D, [2013-07-30T21:33:57.052000 #31703] DEBUG -- : HeartBeat Fire Send/Receive receive_fire, time = 1375234437.052
D, [2013-07-30T21:33:57.053000 #31703] DEBUG -- : HeartBeat Fire Parms Host: inf-dc4lab-queue03.ove.local, Port: 61613, Login: system
D, [2013-07-30T21:33:57.053000 #31703] DEBUG -- : HeartBeat Fire Send/Receive send_heartbeat, time = 1375234437.051
D, [2013-07-30T21:33:59.053000 #31703] DEBUG -- : HeartBeat Fire Parms Host: inf-dc4lab-queue03.ove.local, Port: 61613, Login: system
D, [2013-07-30T21:33:59.054000 #31703] DEBUG -- : HeartBeat Fire Send/Receive receive_fire, time = 1375234439.053
D, [2013-07-30T21:33:59.054000 #31703] DEBUG -- : HeartBeat Fire Parms Host: inf-dc4lab-queue03.ove.local, Port: 61613, Login: system
D, [2013-07-30T21:33:59.055000 #31703] DEBUG -- : HeartBeat Fire Send/Receive send_fire, time = 1375234439.054
D, [2013-07-30T21:34:01.046000 #31703] DEBUG -- : Receive Parms Host: inf-dc4lab-queue03.ove.local, Port: 61613, Login: system
D, [2013-07-30T21:34:01.046000 #31703] DEBUG -- : Receive Result 
undefined method `command' for nil:NilClass
Backtrace:
    /home/tworker/work/stomp/lib/client/utils.rb:165:in `start_listeners'
Client disconnecting...
["debug_01", "1.2", nil]
["wiredataout_02:", "DISCONNECT\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "content-length:0\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "content-type:text/plain; charset=UTF-8\n"]
["debug_01", "1.2", nil]
["wiredataout_02:", "\n"]
D, [2013-07-30T21:34:01.051000 #31703] DEBUG -- : Disconnected Host: inf-dc4lab-queue03.ove.local, Port: 61613, Login: system

Concerning my broker config, I'll only bother with a few snippets, as the whole thing is large and would contribute more noise than anything. Yes, I know the stomp transport connector is stomp+nio but that affects nothing from the client perspective; it's only the port and hostname that matter:

<transportConnectors>
  <transportConnector name="stomp+nio"
                      uri="stomp+nio://0.0.0.0:61613?transport.closeAsync=false&amp;transport.trace=true"/>
</transportConnectors>

<destinationPolicy>
 <policyMap>
   <policyEntries>
     <policyEntry topic=">"
                  memoryLimit="10mb"
                  expireMessagesPeriod="3600000"
                  gcInactiveDestinations="true"
                  inactiveTimoutBeforeGC="432000000"
                  advisoryWhenFull="true"
                  advisoryForFastProducers="true"
                  advisoryForSlowConsumers="true"
                  advisoryForDiscardingMessages="true">
       <deadLetterStrategy>
         <individualDeadLetterStrategy topicPrefix="DLQ."
                                       processExpired="false"
                                       useQueueForTopicMessages="true"
                                       destinationPerDurableSubscriber="false"/>
       </deadLetterStrategy>
     </policyEntry>
  </policyEntries>
 </policyMap>
</destinationPolicy>

gmallard commented 11 years ago

Paul - Thank you for the documentation.

I will absorb it in detail and consider the correct course of action. I think something can be done to assist in your situation. I am just not sure exactly what yet.

For now, a couple of thoughts in no particular order.

First, please review comments and see the notes directory in the install for information about connect parameters:

:max_hbread_fails => 0,
:max_hbrlck_fails => 0,

Shown above are the default values. With those default values the heartbeat threads will never initiate a fail over. I am reasonably certain that you will need some positive value for these parameters. I will suggest that you set those values to 2 initially.
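A sketch of what that suggestion looks like in a connect hash. Only the two `:max_hb*` keys are new; the other keys mirror options already shown in this thread, and the comments reflect my reading of the gem's notes rather than a definitive description:

```ruby
# Suggested starting values from the comment above. The comments describe
# the parameters as I understand the gem's notes; treat them as assumptions.
heartbeat_failover = {
  :max_hbread_fails => 2,  # heartbeat read failures tolerated before retry
  :max_hbrlck_fails => 2,  # read-lock obtain failures tolerated before retry
}

# Abbreviated copy of the connect hash posted earlier in the thread.
base = {
  :reliable        => true,
  :connect_headers => { 'accept-version' => '1.2',
                        'host'           => 'localhost',
                        'heart-beat'     => '2000,2000' },
}

params = base.merge(heartbeat_failover)
puts params.inspect
```

The merged hash would then be passed to Stomp::Client.new in place of the original one.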

Second, I do think 2000,2000 for heartbeats is probably too low. My advice to other folks has been:

a) Make heartbeats as long as you can and still achieve your fail over requirements.
b) Do not use the same value for heartbeat read and write.

One large gem user is using 29500,30500 to good effect. Point b) is really for a pure Ruby environment, and meant to 'help' Ruby multi-threading. It may or may not apply to JRuby, I just do not know.

Point a) is really about busy systems. With lower heartbeat values, missed beats are much more likely. And additionally server and client 'miss tolerances' are smaller. The spec says:

because of timing inaccuracies, the receiver SHOULD be tolerant and take into account an error margin

AMQ is, for whatever reason, the least tolerant of the brokers I test with.

Please think about both above points, and perhaps run another test or two.

I sincerely hope that +nio is not part of this. It was virtually unusable prior to 5.8. See AMQ JIRA for details.

My current thinking is to just loop in that listener thread if a message is nil (essentially an EOF), and to let one of the heartbeat threads initiate the fail over.

I will run some experiments here, and give this some more thought. I have a pretty robust set of fail over tests here, but they are all Stomp::Connection based. And truthfully I have never run them with JRuby.

There was no hope at all of any of this working until recently. That is because JRuby's IO#ready? does not behave at all like it does with a standard Ruby release. I think I finally coded around that correctly, but this:

http://jira.codehaus.org/browse/JRUBY-5617

is amusing. Over two years old, and still unresolved.

Nitpick: heartbeat data is EOL, not a NULL.

gmallard commented 11 years ago

Well, the good news here is that I can recreate this with AMQ using JRuby or a standard Ruby. Thanks to:

a) Your example code AND
b) My recollection of issue #50

Remembering #50 was pretty crucial to the recreation.

No ETA on a gem fix. And no actual promises on a gem fix. This is AMQ specific at present. I am not observing this behavior from Apollo or RabbitMQ.

I will look at it in more detail, and document results.

PaulGale commented 11 years ago

So I have fixed the issue and have it working locally. Quite an easy fix once I realized what was going on.

I isolated the problem by creating a connection configured with just read heart beats of 5000ms. On every occasion the broker would reset the connection after just a couple of heart beats. The furthest I saw it go was 13 consecutive heart beats. There were a number of bugs in both the _start_send_ticker and _start_receive_ticker methods. The culprit? The sleep Kernel call. It always slept for the same amount of time. This is never the right way to implement interval-based network protocol messaging. The problem is that the remainder of the code in the while loop takes a finite amount of time to execute, even if it's only slight, before going back to sleep. So, instead of sending a heart beat at 5000, 10000, 15000, 20000ms, they're sent at 5000, 10010, 15020, 20030ms and so on. As more heart beats are sent, the send time drifts further and further from the interval boundary until it exceeds the tolerance of the broker, which then closes the connection. The broker is perfectly entitled to do that. The gem should be coded to fully expect this behavior from the broker. One should not attempt to 'work around' this type of behavior.
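The drift is easy to reproduce on paper. The sketch below uses a simulated millisecond clock (no real sleeping) and an assumed 10ms cost for the loop body; both method names are hypothetical and neither is gem code:

```ruby
# Simulated clock in milliseconds; no real sleeping is done.
# `work_ms` is an assumed per-iteration cost of the loop body.
def fixed_sleep_times(interval_ms, work_ms, beats)
  now = 0
  (1..beats).map do
    now += interval_ms          # always sleep the full interval
    sent_at = now
    now += work_ms              # loop body runs after the send
    sent_at
  end
end

def deadline_times(interval_ms, work_ms, beats)
  now = 0
  (1..beats).map do |n|
    deadline = n * interval_ms  # absolute boundary for beat n
    now = [now, deadline].max   # sleep only until that boundary
    sent_at = now
    now += work_ms
    sent_at
  end
end

p fixed_sleep_times(5000, 10, 4)
p deadline_times(5000, 10, 4)
```

With a fixed sleep the send times come out as 5000, 10010, 15020, 20030; scheduling against absolute deadlines keeps them on the 5000ms boundaries as long as the loop body is shorter than the interval.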

Once I made the relevant changes everything worked smoothly. There were no broker initiated connection closures etc.

Having to set the heart beat interval to a value like 20000ms to somehow get around the problem is a bad idea. It almost completely defeats the point of heart beating. In addition, the need to have these parameters

:max_hbread_fails => 2,
:max_hbrlck_fails => 2,

with carefully chosen values, seems like a hack. The notes suggest that a client should expect, and be tolerant of, 'network blips' to avoid false positives that might otherwise result in an attempt to fail over. This is false. It would seem that the majority of the 'false positives' originate from the faulty heart beating logic itself and not from what the socket is or is not doing.

To promote rapid failover heart beat times must be kept as low as possible. Having the timeout set as high as was recommended is absurd. In a web application context we can be dealing with many thousands of STOMP messages per second. Waiting a minute or so for failover to complete is a lifetime to have to wait.

def _start_send_ticker()
  sleeptime = @hbsend_interval / 1000000.0 # Sleep time secs
  reconn = false
  adjust = 0

  @st = Thread.new {
    while true do
      sleep(sleeptime - adjust.to_i)
      start = Time.now
      adjust = 0
      next unless @socket

      if @logger && @logger.respond_to?(:on_hbfire)
        @logger.on_hbfire(log_params, "send_fire", start)
      end

      @transmit_semaphore.synchronize do
        begin
          @socket.puts
          @socket.flush
          @hb_sent = true     # Reset if necessary
          @hbsend_count += 1
        rescue Exception => sendex
          @hb_sent = false # Set the warning flag

          if @logger && @logger.respond_to?(:on_hbwrite_fail)
            @logger.on_hbwrite_fail(log_params, {"ticker_interval" => sleeptime, "exception" => sendex})
          end

          raise if @hbser

          if @reliable
            reconn = true
            break
          end
        end
      end

      if reconn
        # Attempt a fail over reconnect.  This is 'safe' here because
        # this thread no longer holds the @transmit_semaphore lock.
        @rt.kill if @rt   # Kill the receiver thread if one exists
        _reconn_prep_hb() # Drive reconnection logic
        Thread.exit       # This sender thread is done
      end

      Thread.pass
      adjust = Time.now - start
    end
  }
end

All the chicanery of having to calculate the delta and know the last read time @lr is gone. It's not needed. The effective sleep time is adjusted on each iteration. Because the adjust time is converted to an integer it is effectively rounded down, which is fine; the heart beat can get there early, it just can't be late.
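One detail worth checking before relying on the snippet above: `Time.now - start` yields a Float in seconds, so `adjust.to_i` is 0 whenever the loop body takes less than a second, and the subtraction then changes nothing. A small sketch of the difference, where `next_sleep` is a hypothetical helper and the 10ms figure is an assumption:

```ruby
# Hypothetical helper, not gem code: seconds to sleep before the next beat,
# never negative.
def next_sleep(sleeptime, adjust)
  [sleeptime - adjust, 0.0].max
end

adjust    = 0.010                   # assume 10 ms spent in the loop body
truncated = 2.0 - adjust.to_i       # to_i truncates 0.010 to 0
kept      = next_sleep(2.0, adjust) # Float arithmetic keeps the 10 ms

puts "with to_i: #{truncated}, with float: #{kept}"
```

Keeping the adjustment as a Float, and clamping at zero, preserves the intent of waking slightly early rather than late.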

The _start_receive_ticker method, however, still has issues in that a producer with only write heart beats configured will fail over; it just won't fail back when the original broker becomes the master once more. I'll have to investigate that some more.

gmallard commented 11 years ago

Comments:

One:

Accepted that you see multiple reconnects in the following environment only:

a) JRuby and
b) AMQ broker

Noted: this behavior does not occur with:

a) Any native RUBY_ENGINE and AMQ
b) Apollo and either JRuby or any native RUBY_ENGINE
c) RabbitMQ and either JRuby or any native RUBY_ENGINE

Two:

Your suggested replacement for _start_send_ticker() is not acceptable. The behavior of that replacement puts a heartbeat on the wire when:

a) A heartbeat is not required, and
b) Therefore SHOULD not be sent

An example showing that is trivial.
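The objection can be illustrated with a small predicate. Under the STOMP heart-beating rules any data written to the socket counts as activity, so a heartbeat is only needed when nothing else has been sent within the agreed interval. The method name and the idea of tracking a `last_write` timestamp are assumptions for illustration, not the gem's implementation:

```ruby
# Hypothetical predicate; `last_write` and `now` are seconds on the same
# clock, `interval` is the agreed send interval in seconds.
def heartbeat_due?(last_write, now, interval)
  (now - last_write) >= interval
end

# A frame was written 0.5s ago: no heartbeat needed yet.
puts heartbeat_due?(1.5, 2.0, 2.0)
# Nothing written for a full interval: a heartbeat is needed.
puts heartbeat_due?(0.0, 2.0, 2.0)
```

A ticker that sends unconditionally on every wake-up skips this check, which is the behavior being rejected here.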

PaulGale commented 11 years ago

So what is your proposed fix for your comment 1? Ignore JRuby with ActiveMQ!?

Are you disputing that the sleep time should be adjusted on each iteration of the ticker?

Conduct the following experiment: add an else clause at line 136/137 of heartbeats.rb that matches the

if delta > sleeptime

and in it add

puts "delta(#{delta}) <= sleeptime(#{sleeptime}): going around again"

Then start a client that connects with heart beats configured and subscribes, then leave it to sit idle. Ideally you should never see the new puts statement if the client is completely idle and there are no messages waiting for it. Even with the above constraints in place the puts statement is output. If you're tailing ActiveMQ's log this shows up as a skipped heart beat. The broker then closes the connection, which is expected.

gmallard commented 11 years ago

I currently think there is a workable fix for this, and will push when I have one. The above is not it.

I do not believe the addition of an 'adjust' time is the entire story, and perhaps not a necessary condition at all. I will however use that idea in the final solution. Because the idea is basically sound.

JRuby has been ignored in the past. The 1.5 and 1.6 series socket IO was horribly flawed, and I did give up on those releases. 1.7.x seems much better, showing consistent and repeatable behavior at least, and I am willing to work with it. But under no circumstances will I potentially break functioning production Ruby environments to accommodate JRuby.

gmallard commented 11 years ago

Reintroducing some tolerance logic that was removed in d922fa helps some.

Introducing adjustment logic similar to the above helps some.

Using both of those helps fairly significantly, but seems not a complete solution. Using 2 second send beats, I get 3-4 fail overs in 12 hour tests.

Using both of the above and a very ugly hack which subtracts an additional 100ms from the sender sleep time seems to show some promise. Two tests have been running for about 16 hours with no restarts. I am going to continue watching those tests, and think about this some more.

gmallard commented 11 years ago

Closing per the above commit. Please see additional heartbeat notes for guidance.

Last tests here were with heartbeats at 2000ms, a send adjustment of 0.05 seconds, and ran for approx. 60 hours with no fail overs.

PaulGale commented 11 years ago

Following my request, ActiveMQ now implements a heart beat grace-period multiplier. It defaults to 1.0 for backward compatibility. To match Apollo's behavior it should be set to 1.5. It is configured as a transport option:

"?transport.hbGracePeriodMultiplier=1.0"

Just thought you might want to know. I have back-ported the fix in my local build of 5.8.0.

https://issues.apache.org/jira/browse/AMQ-4674
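To see what the multiplier buys, here is a sketch under an assumed model in which the broker tolerates silence of up to the heartbeat interval times the multiplier. This illustrates the arithmetic only; it is not ActiveMQ's implementation, and both method names are hypothetical:

```ruby
# Assumed model of the read check: silence is tolerated for up to
# interval_ms * multiplier. Illustration only, not ActiveMQ's code.
def read_timeout_ms(interval_ms, multiplier)
  (interval_ms * multiplier).round
end

def connection_alive?(silence_ms, interval_ms, multiplier)
  silence_ms <= read_timeout_ms(interval_ms, multiplier)
end

# A heartbeat arriving 30ms late on a 2000ms interval:
puts connection_alive?(2030, 2000, 1.0)
puts connection_alive?(2030, 2000, 1.5)
```

With a multiplier of 1.0 a beat that is 30ms late already exceeds the window; with 1.5 the window widens to 3000ms.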

gmallard commented 11 years ago

Thank you, very good information.

Seems to me Apollo sidestepped fast client HB send issues by default. Out of the box header is:

"heart-beat"=>"100,10000"

Thinking about this issue has me considering if the gem should (perhaps optionally) try to squelch Nagle's algorithm. However, when I first looked at this issue I tried that, with no difference in outcome.
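The effect of the Apollo default quoted above can be worked out from the STOMP 1.1/1.2 negotiation rule: each direction uses the larger of what the sender can do and what the receiver wants, and a zero on either side disables that direction. The sketch assumes `100,10000` is the value in Apollo's CONNECTED frame; the method names are hypothetical:

```ruby
# Parses an "x,y" heart-beat header value (milliseconds).
def parse_heart_beat(value)
  value.split(',').map { |n| Integer(n) }
end

# Returns [client_send_ms, client_receive_ms]; 0 means no heartbeats
# in that direction.
def negotiate(client_header, server_header)
  cx, cy = parse_heart_beat(client_header)
  sx, sy = parse_heart_beat(server_header)
  send_ms = (cx.zero? || sy.zero?) ? 0 : [cx, sy].max
  recv_ms = (sx.zero? || cy.zero?) ? 0 : [sx, cy].max
  [send_ms, recv_ms]
end

p negotiate('2000,2000', '2000,2000')  # the ActiveMQ exchange in this thread
p negotiate('2000,2000', '100,10000')  # assumed Apollo default
```

Against the assumed Apollo header, a client asking for 2000,2000 would only be expected to send every 10000ms, which would explain why fast client send timing is less of an issue there.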

PaulGale commented 11 years ago

I thought that Nagle's algorithm was circumvented by use of the 'autoflush' flag on the socket, no?

gmallard commented 11 years ago

The autoflush flag in the gem is actually fairly recent.

Note that the gem has always called 'flush' when sending heartbeats.

But my understanding is that calling 'flush', even from a C program, only flushes the library's implementation buffers. It does not control what the TCP stack implementation actually forces onto the wire because of the flush. In effect, with Nagle on, TCP provides another level of buffering, specifically for 'small' packets.

Admittedly, I could be way off base on this ... and I do not have the patience for Wireshark tonight ... or probably any time soon.

Regardless, in your situation, there was absolutely no behavioral change with 'Nagle OFF' hard coded in my early tests.

Trying to suppress Nagle in the gem is pure speculation at this point. I would need to see an example of when it actually accomplished something.
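For anyone who wants to repeat the 'Nagle OFF' experiment, this is roughly what it looks like on a plain Ruby TCP socket using the standard socket library. The `disable_nagle` helper is hypothetical, and how (or whether) such a call would be wired into the gem is not something this sketch claims:

```ruby
require 'socket'

# Sets TCP_NODELAY on a TCP socket (disabling Nagle's algorithm) and
# returns true if the option reads back as enabled.
def disable_nagle(sock)
  sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
  sock.getsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY).int != 0
end
```

Note that this is separate from autoflush and flush, which only affect Ruby's own IO buffering as discussed above.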

gmallard commented 11 years ago

Docs:

http://www.unixguide.net/network/socketfaq/2.11.shtml

Again, just thinking.

PaulGale commented 11 years ago

I was wondering why you're re-opening this issue. It's not clear to me.

gmallard commented 11 years ago

Sorry, just meant to comment, not reopen.