Closed: GoogleCodeExporter closed this issue 9 years ago.
It also happens on 0.5-SNAPSHOT with 4 instances of
tools_scripts/subscriber_client (groovy subscriber_client.groovy localhost)
while the publisher tools_scripts/publisher_client
(groovy publisher_client.groovy localhost) publishes 1000 QoS0 messages.
Note that on the first run it does work: the publisher pushes the 1000 messages
in 382ms, but the subscribers take about 18800ms to receive all 1000 notifications.
Original comment by selva.an...@gmail.com
on 4 Feb 2014 at 10:23
The exception logged is:
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.sendPublish(ProtocolProcessor.java:310)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.sendPublish(ProtocolProcessor.java:287)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.publish2Subscribers(ProtocolProcessor.java:278)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:226)
at org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:138)
at org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:30)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:113)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
Original comment by selva.an...@gmail.com
on 4 Feb 2014 at 10:25
Fixed with commit
http://code.google.com/p/moquette-mqtt/source/detail?r=4ff6b2aeae2156a1b508694e9171e2b08162af40
Original comment by selva.an...@gmail.com
on 8 Feb 2014 at 7:02
We still see this issue in the following situation:
1. Have two or more subscribers to the same topic.
2. Publish a message to this topic.
3. Unless the publisher and all the subscribers request QoS 2, only the first
subscriber that connected gets the message body.
I think the problem may be the repeated use of the message ByteBuffer in the
for-loop over subscriptions in ProtocolProcessor.publish2Subscribers. Some
preliminary testing indicated that adding a message.rewind() inside the loop helps
matters.
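For illustration, a minimal runnable sketch of the idea, not the actual broker code: deliver() is a hypothetical stand-in for ProtocolProcessor.sendPublish(). Without the rewind inside the loop, only the first subscriber would see the payload, because reading the buffer advances its position.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    // Sketch only: deliver() is a hypothetical stand-in for sendPublish().
    public class RewindSketch {
        static void deliver(String clientId, ByteBuffer payload) {
            byte[] copy = new byte[payload.remaining()];
            payload.get(copy);                           // reading advances the buffer position
            System.out.println(clientId + " <- " + new String(copy, StandardCharsets.UTF_8));
        }

        public static void main(String[] args) {
            ByteBuffer message = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
            for (String clientId : Arrays.asList("sub-000-000", "sub-000-001")) {
                message.rewind();                        // the suggested fix: reset the position each iteration
                deliver(clientId, message);
            }
        }
    }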
Original comment by yoto...@gmail.com
on 12 Feb 2014 at 12:40
Original comment by selva.an...@gmail.com
on 12 Feb 2014 at 7:41
Fixed with the rewind you suggested, and integrated some unit testing to verify:
http://code.google.com/p/moquette-mqtt/source/detail?r=e73a2a8b9eaecd96a4feadd45c988686e73692c9
Thanks
Andrea
Original comment by selva.an...@gmail.com
on 15 Feb 2014 at 11:07
I believe there is a fundamental issue with re-using the same ByteBuffer when
publishing a message to multiple subscribers. We are seeing some
non-deterministic effects in experiments with multiple subscribers. After the
ProtocolProcessor puts the OutputMessagingEvent in the RingBuffer, the
ProtocolProcessor can immediately continue the loop and start processing the
publish for the next subscriber (using the same message ByteBuffer). Meanwhile,
another thread processes the new ValueEvent in the RingBuffer and writes it
to the NettyChannel. This second thread advances the ByteBuffer position
while the first thread is still using the same buffer to publish to the other subscribers.
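A minimal sketch of that interleaving, outside the broker and with hypothetical names: the single-thread executor stands in for the Disruptor consumer that writes to the NettyChannel, and the loop stands in for publish2Subscribers re-using one ByteBuffer. Depending on timing, the publish loop may observe pos=7 instead of pos=0, just like the pos=34 seen at timestamp 42812 in the log below.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    // Sketch only: the writer thread mimics the RingBuffer consumer,
    // the loop mimics the publish loop re-using one ByteBuffer.
    public class SharedBufferRace {
        public static void main(String[] args) throws InterruptedException {
            ByteBuffer message = ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8));
            ExecutorService writer = Executors.newSingleThreadExecutor();

            for (int subscriber = 0; subscriber < 4; subscriber++) {
                message.rewind();                        // the rewind-based fix
                writer.submit(() -> {
                    byte[] out = new byte[message.remaining()];
                    message.get(out);                    // advances the position on the writer thread
                });
                // This thread keeps looping and may now see the position already advanced.
                System.out.println("publish loop sees " + message);
            }
            writer.shutdown();
            writer.awaitTermination(1, TimeUnit.SECONDS);
        }
    }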
In the log snippet below, you can see the interleaved threads operating on the
same ByteBuffer, such that the position of the ByteBuffer gets advanced in the
middle of ProtocolProcessor.publish2Subscribers(), at timestamp 42812. (The line
numbers will not match up with the source in the repo because I have added
quite a bit of instrumentation.)
42807 [pool-1-thread-1] DEBUG ProtocolProcessor:257 - Broker republishing to client <sub-000-000> topic <000> qos <EXACTLY_ONCE>, active true
42807 [pool-1-thread-1] DEBUG ProtocolProcessor:260 - content <java.nio.HeapByteBuffer[pos=0 lim=34 cap=34]>
42807 [pool-1-thread-1] DEBUG ProtocolProcessor:278 - content <java.nio.HeapByteBuffer[pos=0 lim=34 cap=34]>
42808 [pool-1-thread-1] DEBUG ProtocolProcessor:282 - content <java.nio.HeapByteBuffer[pos=0 lim=34 cap=34]>
42808 [pool-1-thread-1] DEBUG ProtocolProcessor:286 - content <java.nio.HeapByteBuffer[pos=0 lim=34 cap=34]>
42808 [pool-1-thread-1] DEBUG ProtocolProcessor:292 - content <java.nio.HeapByteBuffer[pos=0 lim=34 cap=34]>
42809 [pool-1-thread-1] DEBUG ProtocolProcessor:308 - sendPublish invoked clientId <sub-000-000> on topic <000> QoS EXACTLY_ONCE ratained false messageID 1
42809 [pool-1-thread-1] INFO ProtocolProcessor:315 - send publish message to <sub-000-000> on topic <000>
42809 [pool-1-thread-1] DEBUG ProtocolProcessor:317 - content <java.nio.HeapByteBuffer[pos=0 lim=34 cap=34]>
42809 [pool-1-thread-1] DEBUG ProtocolProcessor:326 - clientIDs are {pub-000-000=ConnectionDescriptor{m_clientID=pub-000-000, m_cleanSession=true}, pub-000-001=ConnectionDescriptor{m_clientID=pub-000-001, m_cleanSession=true}, sub-000-000=ConnectionDescriptor{m_clientID=sub-000-000, m_cleanSession=true}, sub-000-001=ConnectionDescriptor{m_clientID=sub-000-001, m_cleanSession=true}}
42810 [pool-1-thread-1] DEBUG ProtocolProcessor:330 - Session for clientId sub-000-000 is org.dna.mqtt.moquette.server.netty.NettyChannel@46edf730
42810 [pool-1-thread-1] DEBUG ProtocolProcessor:506 - disruptorPublish publishing event on output org.dna.mqtt.moquette.server.netty.NettyChannel@46edf730|PublishMessage(3|000|java.nio.HeapByteBuffer[pos=0 lim=34 cap=34])
42810 [pool-2-thread-1] DEBUG ProtocolProcessor:520 - about to write message to channel
42810 [pool-1-thread-1] DEBUG ProtocolProcessor:257 - Broker republishing to client <sub-000-001> topic <000> qos <EXACTLY_ONCE>, active true
42811 [pool-2-thread-1] DEBUG NettyChannel:74 - NettyChannel about to writeAndFlush PublishMessage(3|000|java.nio.HeapByteBuffer[pos=0 lim=34 cap=34])
42811 [pool-1-thread-1] DEBUG ProtocolProcessor:260 - content <java.nio.HeapByteBuffer[pos=0 lim=34 cap=34]>
42811 [pool-2-thread-1] DEBUG NettyChannel:78 - NettyChannel writeAndFlush complete
42811 [pool-1-thread-1] DEBUG ProtocolProcessor:278 - content <java.nio.HeapByteBuffer[pos=0 lim=34 cap=34]>
42812 [pool-1-thread-1] DEBUG ProtocolProcessor:282 - content <java.nio.HeapByteBuffer[pos=34 lim=34 cap=34]>
Original comment by yoto...@gmail.com
on 19 Feb 2014 at 2:22
Original comment by selva.an...@gmail.com
on 19 Feb 2014 at 7:28
Original comment by selva.an...@gmail.com
on 19 Feb 2014 at 7:29
Fixed with
http://code.google.com/p/moquette-mqtt/source/detail?r=6be1f99cf9f071897fa597eabda5460a460b24aa,
which replaces the rewind with a duplicate of the buffer (copying only the
buffer's pointers, not its content).
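For anyone verifying the behaviour, a minimal sketch (not the broker code) of what the duplicate buys over the rewind: the duplicate shares the payload bytes but keeps its own position and limit, so the writer thread no longer disturbs the buffer the publish loop keeps iterating with.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // Sketch only: duplicate() shares the bytes but not the position/limit.
    public class DuplicateSketch {
        public static void main(String[] args) {
            ByteBuffer message = ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8));

            ByteBuffer perSubscriber = message.duplicate();   // copies only the pointers, not the bytes
            byte[] out = new byte[perSubscriber.remaining()];
            perSubscriber.get(out);                           // advances only the duplicate's position

            System.out.println("duplicate " + perSubscriber); // pos=7 lim=7 cap=7
            System.out.println("original  " + message);       // pos=0 lim=7 cap=7
        }
    }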
@yotommy could you check the fix, please?
Original comment by selva.an...@gmail.com
on 22 Feb 2014 at 7:17
My early testing shows that your fix looks good so far. Thanks for the quick
response.
Original comment by yoto...@gmail.com
on 23 Feb 2014 at 10:52
Original issue reported on code.google.com by erwan.daubert
on 29 Jan 2014 at 2:42