maydemirx / moquette-mqtt

Automatically exported from code.google.com/p/moquette-mqtt
Apache License 2.0

Multiple subscribers but only one receives the message (others receive an empty message instead) #30

Closed: GoogleCodeExporter closed this issue 8 years ago

GoogleCodeExporter commented 8 years ago
What steps will reproduce the problem?
1. Start the server.
2. Start multiple clients (using Paho).
3. Start a publisher which sends multiple messages on the same topic (a minimal Paho sketch follows the list).
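
A minimal sketch of such clients with the Eclipse Paho Java (mqttv3) client; the broker URL, topic name and client ids are arbitrary placeholders, not values taken from this report:

    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttMessage;

    // Minimal Paho sketch of the repro: run several subscriber processes,
    // then one publisher. Broker address, topic and client ids are placeholders.
    public class ReproSketch {

        static void subscriber(String id) throws Exception {
            MqttClient client = new MqttClient("tcp://localhost:1883", id);
            client.setCallback(new MqttCallback() {
                public void connectionLost(Throwable cause) { }
                public void deliveryComplete(IMqttDeliveryToken token) { }
                public void messageArrived(String topic, MqttMessage msg) {
                    // an empty payload here is the symptom described in this issue
                    System.out.println(id + " got " + msg.getPayload().length + " bytes on " + topic);
                }
            });
            client.connect();
            client.subscribe("test/topic", 0);
        }

        static void publisher() throws Exception {
            MqttClient client = new MqttClient("tcp://localhost:1883", "pub-000");
            client.connect();
            for (int i = 0; i < 1000; i++) {
                client.publish("test/topic", new MqttMessage("hello".getBytes()));
            }
            client.disconnect();
        }
    }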

What is the expected output? What do you see instead?
Each subscriber should receive all of the published data. Instead, often only one of the subscribers receives the data and the others receive an empty payload.

What version of the product are you using? On what operating system?
0.4
linux

Please provide any additional information below.

Original issue reported on code.google.com by erwan.daubert on 29 Jan 2014 at 2:42

GoogleCodeExporter commented 8 years ago
It also happens on 0.5-SNAPSHOT with 4 instances of tools_scripts/subscriber_client
(groovy subscriber_client.groovy localhost) while the publisher
tools_scripts/publisher_client (groovy publisher_client.groovy localhost)
publishes 1000 QoS0 messages.
Note that on the first run it works: the publisher sends the 1000 messages in 382 ms,
but the subscribers receive the 1000 notifications only after about 18800 ms.

Original comment by selva.an...@gmail.com on 4 Feb 2014 at 10:23

GoogleCodeExporter commented 8 years ago
The exception logged is:
    at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.sendPublish(ProtocolProcessor.java:310)
    at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.sendPublish(ProtocolProcessor.java:287)
    at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.publish2Subscribers(ProtocolProcessor.java:278)
    at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:226)
    at org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:138)
    at org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:30)
    at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:113)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)

Original comment by selva.an...@gmail.com on 4 Feb 2014 at 10:25

GoogleCodeExporter commented 8 years ago
Fixed with commit
http://code.google.com/p/moquette-mqtt/source/detail?r=4ff6b2aeae2156a1b508694e9171e2b08162af40

Original comment by selva.an...@gmail.com on 8 Feb 2014 at 7:02

GoogleCodeExporter commented 8 years ago
We still see this issue in the following situation:

1. Have two or more subscribers to the same topic.
2. Publish a message to this topic.
3. Unless the publisher and all subscribers request QoS 2, only the first subscriber that connected gets the message body.

I think the problem may be the repeated use of the message ByteBuffer in the
for-loop over subscriptions in ProtocolProcessor.publish2Subscribers. Some
preliminary testing indicated that adding a message.rewind() in the loop helps
matters; a sketch of the idea follows.
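
A minimal sketch of that workaround (Subscription and sendPublish here are placeholder names, not the actual moquette signatures):

    import java.nio.ByteBuffer;
    import java.util.List;

    // Sketch of the suggested workaround: rewind the shared message buffer at
    // the top of the subscription loop so every subscriber reads from position 0.
    // Subscription and sendPublish are placeholders, not the moquette API.
    class RewindSketch {
        interface Subscription { String clientId(); }

        void publish2Subscribers(List<Subscription> subscriptions, ByteBuffer message) {
            for (Subscription sub : subscriptions) {
                message.rewind();          // the previous send left the position at the limit
                sendPublish(sub, message);
            }
        }

        void sendPublish(Subscription sub, ByteBuffer payload) {
            byte[] bytes = new byte[payload.remaining()];
            payload.get(bytes);            // reading advances the buffer's position
            System.out.println("send " + bytes.length + " bytes to " + sub.clientId());
        }
    }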

Original comment by yoto...@gmail.com on 12 Feb 2014 at 12:40

GoogleCodeExporter commented 8 years ago

Original comment by selva.an...@gmail.com on 12 Feb 2014 at 7:41

GoogleCodeExporter commented 8 years ago
Fixed with the rewind you suggested and integrated some unit testing to verify:
http://code.google.com/p/moquette-mqtt/source/detail?r=e73a2a8b9eaecd96a4feadd45c988686e73692c9

Thanks

 Andrea

Original comment by selva.an...@gmail.com on 15 Feb 2014 at 11:07

GoogleCodeExporter commented 8 years ago
I believe there is a fundamental issue with re-using the same ByteBuffer when
publishing a message to multiple subscribers. We are seeing some
non-deterministic effects in experiments with multiple subscribers. After the
ProtocolProcessor puts the OutputMessagingEvent in the RingBuffer, it can
immediately continue the loop and start processing the publish for the next
subscriber (using the same message ByteBuffer). However, another thread will
process the new ValueEvent in the RingBuffer and write it to the NettyChannel.
This second thread causes the ByteBuffer position to be advanced while the
first thread is still using the buffer to publish to other subscribers.

In the log snippet below, you can see the interleaved threads operating on the
same ByteBuffer, such that the position of the ByteBuffer gets advanced in the
middle of ProtocolProcessor.publish2Subscribers(), at timestamp 42812. (The line
numbers will not match up with the source in the repo because I have added
quite a bit of instrumentation.)

42807 [pool-1-thread-1] DEBUG ProtocolProcessor:257  - Broker republishing to client <sub-000-000> topic <000> qos <EXACTLY_ONCE>, active true
42807 [pool-1-thread-1] DEBUG ProtocolProcessor:260  - content <java.nio.HeapByteBuffer[pos=0 lim=34 cap=34]>
42807 [pool-1-thread-1] DEBUG ProtocolProcessor:278  - content <java.nio.HeapByteBuffer[pos=0 lim=34 cap=34]>
42808 [pool-1-thread-1] DEBUG ProtocolProcessor:282  - content <java.nio.HeapByteBuffer[pos=0 lim=34 cap=34]>
42808 [pool-1-thread-1] DEBUG ProtocolProcessor:286  - content <java.nio.HeapByteBuffer[pos=0 lim=34 cap=34]>
42808 [pool-1-thread-1] DEBUG ProtocolProcessor:292  - content <java.nio.HeapByteBuffer[pos=0 lim=34 cap=34]>
42809 [pool-1-thread-1] DEBUG ProtocolProcessor:308  - sendPublish invoked clientId <sub-000-000> on topic <000> QoS EXACTLY_ONCE ratained false messageID 1
42809 [pool-1-thread-1] INFO  ProtocolProcessor:315  - send publish message to <sub-000-000> on topic <000>
42809 [pool-1-thread-1] DEBUG ProtocolProcessor:317  - content <java.nio.HeapByteBuffer[pos=0 lim=34 cap=34]>
42809 [pool-1-thread-1] DEBUG ProtocolProcessor:326  - clientIDs are {pub-000-000=ConnectionDescriptor{m_clientID=pub-000-000, m_cleanSession=true}, pub-000-001=ConnectionDescriptor{m_clientID=pub-000-001, m_cleanSession=true}, sub-000-000=ConnectionDescriptor{m_clientID=sub-000-000, m_cleanSession=true}, sub-000-001=ConnectionDescriptor{m_clientID=sub-000-001, m_cleanSession=true}}
42810 [pool-1-thread-1] DEBUG ProtocolProcessor:330  - Session for clientId sub-000-000 is org.dna.mqtt.moquette.server.netty.NettyChannel@46edf730
42810 [pool-1-thread-1] DEBUG ProtocolProcessor:506  - disruptorPublish publishing event on output org.dna.mqtt.moquette.server.netty.NettyChannel@46edf730|PublishMessage(3|000|java.nio.HeapByteBuffer[pos=0 lim=34 cap=34])
42810 [pool-2-thread-1] DEBUG ProtocolProcessor:520  - about to write message to channel
42810 [pool-1-thread-1] DEBUG ProtocolProcessor:257  - Broker republishing to client <sub-000-001> topic <000> qos <EXACTLY_ONCE>, active true
42811 [pool-2-thread-1] DEBUG NettyChannel:74  - NettyChannel about to writeAndFlush PublishMessage(3|000|java.nio.HeapByteBuffer[pos=0 lim=34 cap=34])
42811 [pool-1-thread-1] DEBUG ProtocolProcessor:260  - content <java.nio.HeapByteBuffer[pos=0 lim=34 cap=34]>
42811 [pool-2-thread-1] DEBUG NettyChannel:78  - NettyChannel writeAndFlush complete
42811 [pool-1-thread-1] DEBUG ProtocolProcessor:278  - content <java.nio.HeapByteBuffer[pos=0 lim=34 cap=34]>
42812 [pool-1-thread-1] DEBUG ProtocolProcessor:282  - content <java.nio.HeapByteBuffer[pos=34 lim=34 cap=34]>
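
Below is a minimal standalone sketch (not the moquette code; the class and names are made up for illustration) of the interleaving described above: the publishing loop hands the same ByteBuffer to an output executor and keeps reusing it, so the output thread's read can advance the shared position even though the loop rewinds the buffer.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Minimal sketch of the race: the same ByteBuffer is enqueued for an
    // output thread and immediately reused for the next subscriber. The
    // output thread's get() advances the shared position, so a later
    // subscriber may be sent a buffer with pos == lim, i.e. an empty payload,
    // even though the loop calls rewind() each iteration.
    public class SharedBufferRace {
        public static void main(String[] args) throws Exception {
            ExecutorService outputThread = Executors.newSingleThreadExecutor();
            ByteBuffer message = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));

            for (String subscriber : List.of("sub-000-000", "sub-000-001")) {
                message.rewind();                  // rewind in the loop, as in the earlier fix
                final ByteBuffer shared = message; // same object handed to the other thread
                outputThread.submit(() -> {
                    byte[] payload = new byte[shared.remaining()];
                    shared.get(payload);           // advances the shared position concurrently
                    System.out.println(subscriber + " got " + payload.length + " bytes");
                });
                // depending on scheduling, this thread's next rewind() races with
                // the read above, or the read sees an already-drained buffer
            }
            outputThread.shutdown();
        }
    }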

Original comment by yoto...@gmail.com on 19 Feb 2014 at 2:22

GoogleCodeExporter commented 8 years ago

Original comment by selva.an...@gmail.com on 19 Feb 2014 at 7:28

GoogleCodeExporter commented 8 years ago

Original comment by selva.an...@gmail.com on 19 Feb 2014 at 7:29

GoogleCodeExporter commented 8 years ago
Fixed with
http://code.google.com/p/moquette-mqtt/source/detail?r=6be1f99cf9f071897fa597eabda5460a460b24aa:
the rewind was replaced with a duplicate of the buffer (which copies only the
buffer's pointers, not its contents).
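
For illustration, a sketch of that approach (again with placeholder names, not the code from the commit):

    import java.nio.ByteBuffer;
    import java.util.List;

    // Sketch of the duplicate-based fix: ByteBuffer.duplicate() returns a new
    // buffer that shares the backing bytes but keeps its own position, limit
    // and mark, so the output thread advancing its copy cannot disturb the
    // buffer still being handed to the remaining subscribers.
    // Subscription is a placeholder name, not the moquette API.
    class DuplicateSketch {
        interface Subscription { void send(ByteBuffer payload); }

        void publish2Subscribers(List<Subscription> subscriptions, ByteBuffer message) {
            for (Subscription sub : subscriptions) {
                // independent pointers per subscriber; the payload bytes are not copied
                sub.send(message.duplicate());
            }
        }
    }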

@yotommy, could you please check the fix?

Original comment by selva.an...@gmail.com on 22 Feb 2014 at 7:17

GoogleCodeExporter commented 8 years ago
My early testing shows that your fix looks good so far. Thanks for the quick 
response.

Original comment by yoto...@gmail.com on 23 Feb 2014 at 10:52