Closed (wppurking closed this 6 years ago)
Hi @wppurking. Thanks for the report! This behavior makes sense as we don't ack/verify that messages we send to NSQ are received by NSQ. So if you've got a backup in the queue and terminate the client prior to sending all messages, those messages will be lost.
I'm going to open a PR with an update to the README as well as add a code sample for those who want to wait for all messages to be sent prior to terminating.
Let me know if that makes sense, and thanks again for writing this up.
Just added this note in the README: https://github.com/wistia/nsq-ruby/commit/c6ff6e506888067d3bdb3726799264bb9e7dfe4e
My preference is to leave this up to the user rather than blocking if there are unsent messages. I suppose we could expose the internal queue so someone could check that it's empty before calling terminate, but I saw the note in your gist that you were seeing zero messages in the queue and still having messages dropped (presumably because it takes time from when you write a message to the socket to when it arrives in NSQ).
Going to close this for now. Let me know if you think we should be handling this case differently.
@bschwartz Thanks for explaining the details. I noticed the internal queue design and its limits. Let me draw some pictures to explain my thoughts.
:stop_write_loop in the SizedQueue
This is the situation "If you're writing messages faster than they can be sent to NSQ, calling #terminate will lose messages".
In this situation we need to do something like:
    require 'nsq'

    producer = Nsq::Producer.new(nsqlookupd: '127.0.0.1:4161', topic: 'st')
    # Wait for the nsqlookupd lookup to complete.
    sleep 1
    1.upto(10000) { producer.write("hello world!") }
    # Something like this could ensure every message in the SizedQueue is
    # written to nsqd. If the producer keeps producing messages the loop
    # never ends, but that is outside this discussion.
    # NOTE: queue_size is fake code, not a real nsq-ruby method; it stands
    # in for checking SizedQueue#size.
    while producer.queue_size > 0
      sleep 0.3
    end
    producer.terminate
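On the "loop won't stop" concern: one option is to bound the wait with a deadline. Below is a minimal sketch in plain Ruby. `wait_until` is a hypothetical helper, not part of nsq-ruby, and a standard `Queue` stands in for the producer's backlog.

```ruby
# Hypothetical helper: polls the block until it returns true or the
# timeout (in seconds) elapses. Returns true if the condition was met,
# false if the deadline passed first.
def wait_until(timeout:, interval: 0.01)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  until yield
    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    sleep interval
  end
  true
end

# Stand-in backlog, drained by a background thread.
backlog = Queue.new
50.times { |i| backlog.push(i) }
consumer = Thread.new { 50.times { backlog.pop; sleep 0.001 } }

drained   = wait_until(timeout: 2) { backlog.empty? }   # condition is eventually met
timed_out = wait_until(timeout: 0.05) { false }         # condition is never met
consumer.join
```

With a deadline the caller can decide what to do when the backlog never empties, instead of blocking forever.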
You explain this well in the README, but I am curious about the situation below.
:stop_write_loop is the last message in the SizedQueue
If the last message in the SizedQueue is :stop_write_loop, then by the time #write_loop breaks, all the other messages should already have been written to nsqd through the TCP socket. I checked this with a monkey patch:
    class Nsq::Connection
      def write_to_socket(raw)
        # Uncomment to print each command and count how many reach the socket.
        # puts raw
        @socket.write(raw)
      end
    end
If I send 1000000 messages to nsqd, 1000000 PUB commands are written to the socket.
On the other hand, when we call producer.terminate, it blocks internally in #stop_write_loop, which ensures that all messages in the SizedQueue are written to the socket.
"presumably because it takes time from when you write a message to the socket to when it arrives in NSQ". I had the same guess but couldn't confirm it, so I changed the code as below:
    require 'nsq'

    producer = Nsq::Producer.new(nsqlookupd: ['127.0.0.1:4161'], topic: 'st')
    # Wait for the nsqlookupd lookup to complete.
    sleep 1
    # q is the queue object returned by write; keep it to check its size.
    q = producer.write("hello world!")
    1.upto(1000000 - 1) { producer.write("hello world!") }
    # Wait until every message in the SizedQueue has been written, then terminate.
    while q.size > 0
      sleep 0.3 # change this from 300ms (0.3) to 1ms (0.001) to see the loss again
    end
    producer.terminate
This time no messages go missing, but when I change the sleep time from 300ms to 1ms, messages are lost again.
(This code is no different from what nsq-ruby already does in #stop_write_loop; in this situation it just gives the socket some more time to write data.)
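For reference, the sentinel-style drain described above can be sketched in plain Ruby. This is only an illustration: `drain_with_sentinel` and the `:stop` symbol are made-up names, a `StringIO` stands in for the socket, and none of it is copied from nsq-ruby.

```ruby
require 'stringio'

# Hypothetical stand-in for a write loop: pops items until it sees the
# sentinel, writing everything that was queued ahead of it.
def drain_with_sentinel(queue, io, sentinel: :stop)
  writer = Thread.new do
    loop do
      item = queue.pop            # blocks until an item is available
      break if item == sentinel   # the sentinel ends the loop
      io.write(item)
    end
  end
  queue.push(sentinel)            # lands behind every pending item
  writer.join                     # returns once the items ahead of it are written
end

queue = Queue.new
1000.times { |i| queue.push("PUB #{i}\n") }
io = StringIO.new
drain_with_sentinel(queue, io)
written = io.string.lines.size
puts written
```

The drain only guarantees that every item was handed to `io.write`. It says nothing about whether the other end has received the data, which is the gap discussed here.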
I think this situation exists because we don't ack/verify the messages we send to nsqd, so network speed affects whether a message sent to nsqd actually arrives. (The data may be written to the socket successfully but still be on its way to nsqd 😃 )
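A toy simulation of what waiting for acks could look like, in plain Ruby. It assumes the client counts one acknowledgement per message sent (nsqd answers a successful PUB with an OK response). `AckTracker` is a hypothetical class, a `Queue` plays the wire, and a thread plays nsqd; nothing here is nsq-ruby API.

```ruby
# Hypothetical helper: tracks messages sent but not yet acknowledged.
class AckTracker
  attr_reader :pending

  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @pending = 0
  end

  def sent
    @mutex.synchronize { @pending += 1 }
  end

  def acked
    @mutex.synchronize do
      @pending -= 1
      @cond.broadcast if @pending.zero?
    end
  end

  # Blocks until every sent message is acked or the timeout (seconds) passes.
  # Returns true if everything was acked, false on timeout.
  def wait_until_drained(timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize do
      while @pending > 0
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return false if remaining <= 0
        @cond.wait(@mutex, remaining)
      end
      true
    end
  end
end

tracker = AckTracker.new
wire = Queue.new                  # stand-in for the network
server = Thread.new do            # stand-in for nsqd
  until wire.pop == :close
    sleep 0.001                   # pretend network latency
    tracker.acked                 # pretend an OK response came back
  end
end

100.times do |i|
  tracker.sent                    # count before sending so acks never run ahead
  wire.push("PUB #{i}")
end
drained = tracker.wait_until_drained(5)
wire.push(:close)                 # only close once every ack has arrived
server.join
```

Here closing is gated on acknowledgements rather than on the local queue being empty, so latency on the wire no longer matters.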
Maybe we could expose #size (not the queue itself, since we can't control how users use the queue object) and let the user handle it.
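A rough sketch of that idea, using a made-up `SketchProducer` class rather than the real `Nsq::Producer`: the backlog size is readable, but the queue object itself never leaves the class.

```ruby
# Hypothetical producer-like class, for illustration only.
class SketchProducer
  def initialize(max_queue = 10_000)
    @write_queue = SizedQueue.new(max_queue)
  end

  # Queue a message for sending. Returns nil so the queue does not leak.
  def write(message)
    @write_queue.push(message)
    nil
  end

  # Number of messages still waiting to be written.
  def size
    @write_queue.size
  end
end

producer = SketchProducer.new
3.times { |i| producer.write("msg #{i}") }
puts producer.size
```

Callers can then poll `size` before terminating without being able to push to or pop from the queue directly.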
Issue
Running the producer code below and using nsqadmin to look up the topic Depth shows that some messages are lost.
Ex: writing 10000 messages to nsqd results in only 9989 being received; some messages are lost at random.
If I increase the message count from 10000 to 1000000, the issue becomes more obvious.
Debug
I used a monkey patch to debug it, as in this gist
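Adding sleep 1 after stop_read_loop makes the message count correct.
Adding @socket.read after stop_read_loop makes the message count correct.
It seems that socket.close does not work correctly? Some messages were not successfully received by nsqd. I am not very familiar with nsqd and have only played with it for a few days. Is something wrong on the nsqd side, or is there something we can do on the nsq-ruby side?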
is not work correct? Some message was not success receive by nsqd. I am not very familiar with nsqd, just play with it some days, is there something not correct on the nsqd side? or is something we can do on the nsq-ruby side?misc