Closed alcy closed 12 years ago
Try setting up your handler with the "on_connection_terminated" event, and see if that gives you the results you want. However, you might run into some race conditions if you re-establish the connection within that event (which is why I'm working on putting events into their own thread.)
If what you're after is just re-establishing the connection when it closes, you might want to give the failover extension a try. You can read more on enabling it at: http://mdvlrb.com/onstomp/file.UserNarrative.html#Failing_Over
I've been a bit remiss in updating this gem as I'm waiting for the STOMP 1.1 spec to finalize before going any further, I apologize for that.
If you have any more questions, feel free to post here or drop me a line.
One last note: I believe the event_thread branch is fairly stable; however, I don't know that I've got failover working properly there. So, if the failover extension on master doesn't work for you, you may have to roll your own failover functionality on the event_thread branch.
One more last note: I use the failover extension regularly in a production environment. However, it is still marked as experimental. If you find any bugs while using it, feel free to open some tickets :)
Great, thanks for the suggestions, will let you know how it goes. Yes, currently I am using the connection_on_closed handler, will see if _terminated is better suited.
on_connection_closed should trigger any time the connection closes, even when requested. So, if you weren't having any luck with that event, on_connection_terminated may not help either. If you're still having issues and can share the problematic code, that would be a great help.
Hello,
In the following code snippets, the register method is used to initialize the connection and then logstash ( the app with which I am trying to integrate onstomp ) carries out its stuff in other methods. The first snippet is trying to use failover and the second snippet the event interface.
Using failover:
def connect
begin
@client.connect
@logger.info("Connected to stomp server") if @client.connected?
rescue => e
@logger.debug("Failed in connect : #{e}")
sleep 2
retry
end
end
def register
require "onstomp"
require "onstomp/failover"
@client = OnStomp::Failover::Client.new "failover:(stomp://user:pass@host,stomp://user:pass@host)" # same host and user
connect # connect to stomp server
end # def register
def appmethod
# app logic - subscribe/send
end
Using event interface:
def connect
begin
@client.connect
@logger.info("Connected to stomp server") if @client.connected?
rescue => e
@logger.debug("Failed in connect : #{e}")
sleep 2
retry
end
end
public
def register
require "onstomp"
@client = OnStomp::Client.new "stomp://user:pass@host"
@client.on_connection_closed do | cl, cn, msg |
puts "try reconnecting"
connect
end
connect # connect to the stomp server
end # def register
def appmethod
# app logic - subscribe/send
end
The eventual exception in case of both failover and the event interface, when the broker shuts down is:
Exception `Errno::ECONNRESET' at org/jruby/RubyIO.java:2503 - Connection reset by peer - Connection reset by peer
Exception `Errno::ECONNRESET' at org/jruby/RubyIO.java:2503 - Connection reset by peer - Connection reset by peer
Errno::ECONNRESET: Connection reset by peer - Connection reset by peer
read_nonblock at org/jruby/RubyIO.java:2503
read_nonblock at /home/alcy/.rvm/gems/jruby-1.6.4/gems/onstomp-1.0.4/lib/onstomp/connections/base.rb:307
io_process_read at /home/alcy/.rvm/gems/jruby-1.6.4/gems/onstomp-1.0.4/lib/onstomp/connections/base.rb:210
io_process at /home/alcy/.rvm/gems/jruby-1.6.4/gems/onstomp-1.0.4/lib/onstomp/connections/base.rb:125
start at /home/alcy/.rvm/gems/jruby-1.6.4/gems/onstomp-1.0.4/lib/onstomp/components/threaded_processor.rb:28
call at org/jruby/RubyProc.java:274
call at org/jruby/RubyProc.java:233
Exception `SystemExit' at org/jruby/RubyThread.java:509 - Connection reset by peer - Connection reset by peer
In case of the event interface though, first there are debug messages saying "Connection refused" and the event handler then accordingly tries to connect again. It is able to successfully connect once I bring the broker back up again, but as soon as that happens ( after connection establishment ), immediately the app dies with the above ECONNRESET exception. This behavior is what I am finding tricky. In the broker logs ( I am using Apollo ), I can see during this time three connection attempts ( and three eventual disconnects ).
Thanks for the update.
I see you're running this with jRuby 1.6.4. What version of Apache Apollo are you using, and do you which STOMP protocol version gets negotiated? (If not, you can find out with puts @client.connection.version
in the on_connect
event.
I'll start playing around with this tonight and see if I can figure out why:
Once I reproduce this bug and fix it, I'll push a new bugfix release of OnStomp.
I'll also grab a copy of your fork of logstash and see if I can reproduce the bug there.
I am using Apollo 1.0-beta-5. tcpdump shows the negotiated version to be 1.1 ( client.connection.version complained about a missing 'version' method ). I will commit the changes to my logstash repo ( currently the on_closed interface is used only in my working directory ). Let me know if you need any help with setting up/testing logstash & onstomp. :)
Okay, I'm not able to reproduce the problem with the failover test code I have and Apollo 1.0-beta-5, so I'm going to move on to trying to reproduce the issue with logstash. I've cloned the repository, is there any particular test in the code that demonstrates this error?
I don't think currently there is a test written for it. But to reproduce, I use the input & output plugin separately and simply stop the broker once logstash is running and connection to broker has been established. I think it would be easier if you just did a gem install logstash and copy the onstomp input/output code to this installed gem. And then use a config like this:
For testing input code:
input {
onstomp {
type => "mq"
user => "admin"
password => "password"
destination => "/queue/testqueue"
host => "localhost"
}
}
output {
stdout { }
}
And for testing output:
input {
file {
type => "linux-syslog"
# Wildcards work, here :)
path => [ "/var/log/*.log", "/var/log/messages", "/var/log/syslog" ]
}
}
output {
onstomp {
type => "mq"
user => "admin"
password => "password"
destination => "/queue/testqueue"
host => "localhost"
}
}
And invoke logstash as 'logstash -f config -vvv', wait for connection to broker to get established and then shut down the broker.
I have managed to reproduce this with the given input config, apache apollo, your logstash fork, and onstomp 1.0.4:
D, [2011-10-13T07:54:52.242000 #65991] DEBUG -- inputs/onstomp.rb:33#connect: Failed in connect : Connection refused - Connection refused
I, [2011-10-13T07:54:54.437000 #65991] INFO -- inputs/onstomp.rb:31#connect: Connected to stomp server
Exception `Errno::ECONNRESET' at org/jruby/RubyIO.java:2503 - Connection reset by peer - Connection reset by peer
Exception `Errno::ECONNRESET' at org/jruby/RubyIO.java:2503 - Connection reset by peer - Connection reset by peer
Errno::ECONNRESET: Connection reset by peer - Connection reset by peer
read_nonblock at org/jruby/RubyIO.java:2503
read_nonblock at /Users/chaos/.rvm/gems/jruby-1.6.4/gems/onstomp-1.0.4/lib/onstomp/connections/base.rb:307
io_process_read at /Users/chaos/.rvm/gems/jruby-1.6.4/gems/onstomp-1.0.4/lib/onstomp/connections/base.rb:210
io_process at /Users/chaos/.rvm/gems/jruby-1.6.4/gems/onstomp-1.0.4/lib/onstomp/connections/base.rb:125
start at /Users/chaos/.rvm/gems/jruby-1.6.4/gems/onstomp-1.0.4/lib/onstomp/components/threaded_processor.rb:28
Exception `SystemExit' at org/jruby/RubyThread.java:509 - Connection reset by peer - Connection reset by peer
Now that I can reproduce it, I'll get on finding out what is going wrong.
Thanks for your time & patience. :)
No problem, I'm always up for hunting down and squashing bugs, especially ones that come up when using JRuby in 1.9 mode. With OnStomp 2.0, I plan on dropping support for Ruby < 1.9, so it's probably a good idea to make sure it actually works with all 1.9 implementations, especially JRuby :)
Also, given that Apollo is the first broker I know of that supports STOMP 1.1, it would be nice to know that I'm actually playing nice with real implementations instead of the dummy ones in my test suite :)
Heh yeah. Apollo is pretty cool & easy to setup as well.
Turns out, we're hitting on a race condition, which has been my biggest concern with the 1.0 branch. What's happening is your on_connection_closed
handler is re-connecting the client before the client has finished finalizing its connection due to the error that initially killed it. Your code (and my failover code does this, too) re-establishes the connection, and then lets OnStomp's connection code finish its work: it closes down the connection... again. That's why logstash reports that it re-established the connection, only to immediately throw a Connection Reset exception. I'm working on a quick fix for this now: force the event processing code to be invoked after the faulted connection is completely torn down. A more robust fix won't be available until I either handle events on their own thread, or throw in some mutex locks to prevent this kind of race condition.
Thanks for the help in isolating this, I'll close this out once a new version is released.
Thread synchronization FAIL
Calling Client#connect
creates a brand new connection object and a brand new processor thread. Multiple threads can end up trying to process the IO of a single connection, since the new connection can be created before the prior Processing thread had a chance to properly die.
Ah, I was also trying to do a client.close! on entering the on_closed callback, hoping to get rid of previous connection(s), my lack of knowledge working with threads not withstanding :) Looking forward to the fix !
I've committed a fix for this problem and pushed version 1.0.5 to rubygems.
As an aside, if you manage the reconnecting code yourself, you'll want to re-subscribe to the queues/topics in your reconnect handler :)
Also, as the changelog indicates, this fix is more of a band-aid than anything else and very nicely highlights the concerns I've been having with the interaction between event handling and exceptions across multiple threads.
Thanks again, for the fix, will give it a try ! Yep, will be tracking both branches and update the code accordingly. Looking forward to those. :)
I wrote up some thoughts on how I'd like to solve this and other issues: http://mathish.com/2012/01/08/io-unblock.html Feel free to give any feedback on the direction I'm taking.
Sure. I was also wondering if using celluloid ( a major architectural change I suppose ) should help in alleviating these issues.
On Wed, Jan 11, 2012 at 6:25 PM, Ian Eccles reply@reply.github.com wrote:
I wrote up some thoughts on how I'd like to solve this and other issues: http://mathish.com/2012/01/08/io-unblock.html Feel free to give any feedback on the direction I'm taking.
Reply to this email directly or view it on GitHub: https://github.com/meadvillerb/onstomp/issues/12#issuecomment-3446066
Is it not possible to catch connection reset exceptions ? I found some mention in your blog entry http://mathish.com/2011/04/24/events-on-a-train.html , which hints on this problem. Basically I was trying to use this gem in logstash ( https://github.com/logstash/logstash ) and ran across this.
So when a connection object exists and is connected to a broker, then if broker goes down, there is no way to rescue that and re-attempt a connection. Should I give the event_thread branch a try and/or other suggestions ?