Closed celesteking closed 13 years ago
To get feedback on ERROR frames sent from the broker, you'll need to set up a handler:
client.on_error do |error_frame|
  STDERR.puts "Error from broker: #{error_frame.body}"
end
Neither the STOMP 1.0 nor the STOMP 1.1 protocol provides an automated means of relating an ERROR frame sent by the broker to a frame sent by the client, so any ERROR frame handling has to be done manually. The only real exception to this is the CONNECT / CONNECTED frame exchange. If the broker responds to a CONNECT frame with anything other than CONNECTED, it's considered a protocol error, and that should raise an exception in the OnStomp::Client#connect method.
Try setting up an error handler and let me know if the error frame comes through.
Thanks for the feedback.
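As a rough sketch of what "manual" handling could look like: if the broker happens to echo a receipt-id header on its ERROR frames (the ActiveMQ trace in this issue does, but the protocol does not require it), the client can keep its own table of outstanding receipts and match errors against it. `ReceiptTracker` below is a hypothetical helper, not part of OnStomp, and the broker responses are simulated so the snippet runs on its own.

```ruby
# Hypothetical helper (not an OnStomp class): remembers which receipt ids are
# outstanding and matches RECEIPT / ERROR frames against them.
class ReceiptTracker
  attr_reader :failures

  def initialize
    @mutex    = Mutex.new # callbacks may arrive on a different thread
    @pending  = {}        # receipt id => description of the frame sent
    @failures = []        # [description, error message] pairs
  end

  # Call when sending a frame that carries a `receipt` header.
  def expect(receipt_id, description)
    @mutex.synchronize { @pending[receipt_id.to_s] = description }
  end

  # Call from a RECEIPT callback with the frame's `receipt-id` header.
  def receipt(receipt_id)
    @mutex.synchronize { @pending.delete(receipt_id.to_s) }
  end

  # Call from an ERROR handler. `receipt_id` may be nil, since the broker
  # is not obliged to say which frame caused the error.
  def error(receipt_id, message)
    @mutex.synchronize do
      description = receipt_id && @pending.delete(receipt_id.to_s)
      @failures << [description || :uncorrelated, message]
    end
  end

  def pending?
    @mutex.synchronize { !@pending.empty? }
  end
end

tracker = ReceiptTracker.new
tracker.expect('1', 'SEND /queue/testo')
tracker.expect('2', 'SEND /queue/testo')

# Simulated broker responses, mirroring the trace from this issue.
tracker.receipt('2')
tracker.error('1', 'User tester is not authorized to write to: queue://testo')

tracker.failures.each do |description, message|
  puts "#{description} failed: #{message}"
end
```

In a real client, `tracker.error` would presumably be called from the on_error handler with the frame's receipt-id header and body, and `tracker.receipt` from the receipt callbacks; how those values are read from an ERROR frame is an assumption here.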
Thanks for explaining. Well, this sucks then. Uh, at least ActiveMQ sends a receipt-id that can be used to correlate the answer to the request. But I wonder why this gem doesn't provide synchronous communication. I tried to use receipts, but it didn't wait for them; it just carried on through the code, called client.disconnect and terminated the application.
client.connect
scope = client.with_receipt do |r|
  puts "Got my receipt!"
end
scope.send '/queue/testo', 'walks in to the room'
client.send '/queue/testo', 'Did you get this?' do |r|
  puts "Got my receipt: #{r['receipt-id']}"
end
client.on_error do |error_frame, client|
  STDERR.puts "Error from broker: #{error_frame.body}"
end
puts 'ready to finish'
client.disconnect
pp Thread.list
Thread.list.each { |th| th.join if th != Thread.current }
with output:
/usr/bin/ruby -e $stdout.sync=true;$stderr.sync=true;load($0=ARGV.shift) /on/mq/first.rb
ready to finish
[#<Thread:0x7f7da09a4370 run>]
I thought scope.send would send the message synchronously, but it didn't wait for it. What's the right usage there?
Also, Thread.list shows only main thread, so I assume Client#disconnect kills all workers, even if there are "waiters" for receipt.
It is a little tricky to deal with. You are correct: all of the IO is non-blocking and done on a separate thread, so none of the methods will block except Client#disconnect.
Client#disconnect does terminate the worker thread, but not by outright killing it. It tells the IO thread to write all of its contents, and then send a DISCONNECT frame.
A quick way to get your RECEIPT response would be to add a receipt header to your disconnect call:
client.disconnect :receipt => "rcpt-disconnect"
(You could also use a with_receipt block.)
This will still flush the write buffer, but the connection will not be closed until a RECEIPT frame comes back with receipt-id: rcpt-disconnect.
I'm sorry for the confusion. I should probably make it more explicit in the docs that this library is almost entirely asynchronous, which requires a different approach to developing clients. However, for simple use cases where you have producers and consumers, it's pretty straightforward. By the time client.disconnect returns, every frame you've written will have been sent. On the consumer side, if your consumers subscribe and loop forever, you don't have to do anything more; eventually the IO thread will read and process the data sent to it while the main thread loops.
For mixed operations and situations where you want to ensure your frames were all properly receipted, you'll need to approach things differently since everything is asynchronous. Another simple (and more correct) modification to your code would be:
client.connect
receipt_count = 0
scope = client.with_receipt do |r|
  puts "Got my receipt!"
  receipt_count += 1
end
scope.send '/queue/testo', 'walks in to the room'
client.send '/queue/testo', 'Did you get this?' do |r|
  puts "Got my receipt: #{r['receipt-id']}"
  receipt_count += 1
end
client.on_error do |error_frame, client|
  STDERR.puts "Error from broker: #{error_frame.body}"
end
Thread.pass while receipt_count < 2
puts 'ready to finish'
client.disconnect
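One caveat with a bare `Thread.pass while ...` loop is that it spins forever if a receipt never arrives, for example when the broker answers with an ERROR frame instead. A minimal sketch of a bounded wait follows; `wait_until` is a hypothetical helper, not something OnStomp provides, and a plain thread stands in for the IO thread so the snippet runs without a broker.

```ruby
# Hypothetical helper: poll a condition until it holds or a deadline passes.
# Returns true if the condition became true, false on timeout.
def wait_until(timeout, interval = 0.01)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  until yield
    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    sleep interval
  end
  true
end

receipt_count = 0

# Stand-in for the IO thread delivering a single RECEIPT; the second one
# never arrives (say, because the broker sent an ERROR instead).
fake_io = Thread.new do
  sleep 0.05
  receipt_count += 1
end

if wait_until(0.5) { receipt_count >= 2 }
  puts 'all receipts arrived'
else
  puts "timed out with #{receipt_count} of 2 receipts"
end
fake_io.join
```

Swapping this in for the `Thread.pass` line would let the script report the missing receipt and still reach client.disconnect, at the cost of picking a timeout.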
It's a different sort of approach, and I admit it's not everyone's cup of tea. You might like the stomp gem's approach more. I'm not sure if you can make frames and their receipts synchronous, but with the last version I looked at, all of the client frame methods (#send, #subscribe, etc.) were blocking at least.
I guess my docs need more work than I thought :)
One thing to note in the above code is that you don't have to worry about using mutex synchronization when updating receipt_count, as both of those events are dispatched from the same thread and will be dispatched consecutively, never concurrently. Right now there are a few instances where you'd have to be more careful: calling client.send ... triggers the before_transmitting and before_send events within the same thread that client.send was called in, whereas after_transmitting and on_send are called within the IO processing thread. One of the changes I'm working on is putting all event dispatch into its own thread, so variables can be safely modified in any event callback without having to worry about crap like that.
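For the cases where callbacks may fire on different threads, one option is to stop sharing a bare counter and use a small latch built on Ruby's standard Mutex and ConditionVariable. The `CountdownLatch` class below is a hypothetical sketch, not part of OnStomp, and two plain threads stand in for callbacks running on separate threads.

```ruby
# Hypothetical helper (not an OnStomp class): a countdown latch that is safe
# to decrement from any thread and to wait on with a timeout.
class CountdownLatch
  def initialize(count)
    @count = count
    @mutex = Mutex.new
    @cond  = ConditionVariable.new
  end

  # Safe to call from any thread, e.g. from inside an event callback.
  def count_down
    @mutex.synchronize do
      @count -= 1 if @count > 0
      @cond.broadcast if @count.zero?
    end
  end

  # Blocks until the count reaches zero or `timeout` seconds elapse.
  # Returns true if the count reached zero, false on timeout.
  def wait(timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      while @count > 0
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return false if remaining <= 0
        @cond.wait(@mutex, remaining)
      end
      true
    end
  end
end

latch = CountdownLatch.new(2)

# Two stand-in threads play the role of callbacks firing on different threads.
workers = 2.times.map { Thread.new { sleep 0.02; latch.count_down } }

puts(latch.wait(1) ? 'both callbacks ran' : 'timed out')
workers.each(&:join)
```

Unlike a polling loop, the waiting thread sleeps until it is signalled, and the mutex makes the decrement safe regardless of which thread dispatches the event.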
If you have any other questions or comments, feel free to shoot me a message. And, of course, if you find any other bugs feel free to open another issue here.
Thanks again for your input. It's given me an idea for more specialized error handling.
Never seen such a helpful open-source developer! ;) Huge thanks for the help.
No problem, glad to help. If there's more interest in synchronous behavior in the future, I may implement that functionality. However, I'm not sure how much use this library is seeing at the moment, so it's difficult for me to commit to future features that don't just serve myself :)
It doesn't handle errors returned from the broker. The thing finishes successfully when it should alert like hell. Also, I wonder why the broker sends that error, as all access params were configured correctly. tcpdump attached.
Here's the code:
client = OnStomp.connect('stomp://xxx:6163', opts)
scope = client.with_receipt do |r| puts "Got my receipt!" end
scope.send '/queue/testo', 'walks in to the room'
01:13:09.987833 IP 192.168.88.102.46657 > xxx.6163: S 945257220:945257220(0) win 5840 <mss 1460,sackOK,timestamp 439263393 0,nop,wscale 7> E..<..@.@.._..Xf.V.Q.A..8W{.................... ............
01:13:10.133726 IP xxx.6163 > 192.168.88.102.46657: S 1950693974:1950693974(0) ack 945257221 win 5840 <mss 1460,nop,nop,sackOK> E..0..@.5....V.Q..Xf...AtE:V8W{.p....;..........
01:13:10.133763 IP 192.168.88.102.46657 > xxx.6163: . ack 1 win 5840 E..(..@.@..r..Xf.V.Q.A..8W{.tE:WP.......
01:13:10.134141 IP 192.168.88.102.46657 > xxx.6163: P 1:100(99) ack 1 win 5840 E.....@.@.....Xf.V.Q.A..8W{.tE:WP....H..CONNECT login:tester heart-beat:0,0 passcode:sdfsdf host:broker.yyy accept-version:1.0,1.1
.
01:13:10.279916 IP xxx.6163 > 192.168.88.102.46657: . ack 100 win 5840 E..(v.@.5.@@.V.Q..Xf...AtE:W8W{hP............J
01:13:10.286294 IP xxx.6163 > 192.168.88.102.46657: P 1:67(66) ack 100 win 5840 E..jv.@.5.?..V.Q..Xf...AtE:W8W{hP...=...CONNECTED session:ID:broker.yyy-49796-1303594934541-4:58
.
01:13:10.286309 IP 192.168.88.102.46657 > xxx.6163: . ack 67 win 5840 E..(..@.@..p..Xf.V.Q.A..8W{htE:.P.......
01:13:10.387027 IP 192.168.88.102.46657 > xxx.6163: P 100:180(80) ack 67 win 5840 E..x..@.@.....Xf.V.Q.A..8W{htE:.P....T..SEND destination:/queue/testo receipt:1 content-length:20
walks in to the room.
01:13:10.535728 IP xxx.6163 > 192.168.88.102.46657: . 67:1527(1460) ack 180 win 5840 E...v.@.5.:..V.Q..Xf...AtE:.8W{.P....3..ERROR message:User tester is not authorized to write to: queue://testo receipt-id:1
java.lang.SecurityException: User tester is not authorized to write to: queue://testo
    at org.apache.activemq.security.AuthorizationBroker.send(AuthorizationBroker.java:187)
    at org.apache.activemq.broker.MutableBrokerFilter.send(MutableBrokerFilter.java:135)
    at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:460)
    at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:663)
    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:309)
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:185)
    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
    at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:81)
    at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:140)
    at org.apache.activemq.transport.stomp.ProtocolConverter.onStompSend(ProtocolConverter.java:253)
    at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:178)
    at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:70)
    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
    at org.apache.activemq.transport.tcp.TcpTrans
01:13:10.535775 IP 192.168.88.102.46657 > xxx.6163: . ack 1527 win 8760 E..(..@.@..n..Xf.V.Q.A..8W{.tE@MP."8....
01:13:10.535784 IP xxx.6163 > 192.168.88.102.46657: P 1527:1651(124) ack 180 win 5840 E...v.@.5.?..V.Q..Xf...AtE@M8W{.P.......)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:201)
    at java.lang.Thread.run(Thread.java:662)