moquette-io / moquette

Java MQTT lightweight broker
http://moquette-io.github.io/moquette/
Apache License 2.0

ProtocolProcessor.processConnect() processes closed sessions. #101

Closed runafter closed 9 years ago

runafter commented 9 years ago

When the server is too busy, some ConnectMessage events cannot be processed before Constants.DEFAULT_CONNECT_TIMEOUT (10 seconds) expires. In that case, IdleStateHandler and MoquetteIdleTimoutHandler close the channel. But ProtocolProcessor.processConnect() still processes every CONNECT message, whether its channel is still connected or not.

Here are my logs and test code.

20:27:48,052 [main] INFO  Server startServer 95  - Persistent store file: C:\Users\runafter\git\runafter\moquette\broker\moquette_store.mapdb
20:27:48,345 [main] DEBUG SubscriptionsStore init 102  - init invoked
20:27:48,348 [main] DEBUG SubscriptionsStore init 107  - Reloading all stored subscriptions...subscription tree before 

20:27:48,348 [main] DEBUG SubscriptionsStore init 115  - Finished loading. Subscription tree after 

20:27:48,349 [main] INFO  SimpleMessaging processInit 250  - Starting without ACL definition
20:27:48,350 [main] DEBUG ProtocolProcessor init 137  - subscription tree on init 

20:27:48,595 [main] INFO  NettyAcceptor initFactory 152  - Server binded host: 0.0.0.0, port: 1883
20:27:48,598 [main] INFO  NettyAcceptor initFactory 152  - Server binded host: 0.0.0.0, port: 8080
20:27:48,692 [nioEventLoopGroup-3-1] DEBUG SimpleMessaging disruptorPublish 116  - disruptorPublish publishing event org.eclipse.moquette.spi.impl.events.LostConnectionEvent@2adf16cd
20:27:48,693 [pool-1-thread-1] INFO  SimpleMessaging onEvent 158  - onEvent processing messaging event from input ringbuffer org.eclipse.moquette.spi.impl.events.LostConnectionEvent@2adf16cd
20:27:48,694 [nioEventLoopGroup-3-2] DEBUG SimpleMessaging disruptorPublish 116  - disruptorPublish publishing event ProtocolEvent wrapping CONNECT
20:27:48,694 [nioEventLoopGroup-3-4] DEBUG SimpleMessaging disruptorPublish 116  - disruptorPublish publishing event ProtocolEvent wrapping CONNECT
20:27:48,694 [nioEventLoopGroup-3-3] DEBUG SimpleMessaging disruptorPublish 116  - disruptorPublish publishing event ProtocolEvent wrapping CONNECT
20:27:48,694 [pool-1-thread-1] INFO  SimpleMessaging onEvent 158  - onEvent processing messaging event from input ringbuffer ProtocolEvent wrapping CONNECT
20:27:48,694 [nioEventLoopGroup-3-1] DEBUG SimpleMessaging disruptorPublish 116  - disruptorPublish publishing event org.eclipse.moquette.spi.impl.events.LostConnectionEvent@21e85538
20:27:48,695 [nioEventLoopGroup-3-5] DEBUG SimpleMessaging disruptorPublish 116  - disruptorPublish publishing event ProtocolEvent wrapping CONNECT
20:27:48,696 [nioEventLoopGroup-3-2] DEBUG SimpleMessaging disruptorPublish 116  - disruptorPublish publishing event org.eclipse.moquette.spi.impl.events.LostConnectionEvent@76fd5428
20:27:48,696 [nioEventLoopGroup-3-4] DEBUG SimpleMessaging disruptorPublish 116  - disruptorPublish publishing event org.eclipse.moquette.spi.impl.events.LostConnectionEvent@76fd5428
20:27:48,696 [nioEventLoopGroup-3-3] DEBUG SimpleMessaging disruptorPublish 116  - disruptorPublish publishing event org.eclipse.moquette.spi.impl.events.LostConnectionEvent@7cd0e73c
20:27:48,697 [nioEventLoopGroup-3-4] DEBUG SimpleMessaging disruptorPublish 116  - disruptorPublish publishing event org.eclipse.moquette.spi.impl.events.LostConnectionEvent@4ce3c61
20:27:48,697 [nioEventLoopGroup-3-2] DEBUG SimpleMessaging disruptorPublish 116  - disruptorPublish publishing event org.eclipse.moquette.spi.impl.events.LostConnectionEvent@229198ea
20:27:48,697 [nioEventLoopGroup-3-3] DEBUG SimpleMessaging disruptorPublish 116  - disruptorPublish publishing event org.eclipse.moquette.spi.impl.events.LostConnectionEvent@305aecd9
20:27:48,698 [nioEventLoopGroup-3-5] DEBUG SimpleMessaging disruptorPublish 116  - disruptorPublish publishing event org.eclipse.moquette.spi.impl.events.LostConnectionEvent@8664e41
20:27:48,698 [nioEventLoopGroup-3-5] DEBUG SimpleMessaging disruptorPublish 116  - disruptorPublish publishing event org.eclipse.moquette.spi.impl.events.LostConnectionEvent@63643de1
20:27:48,699 [pool-1-thread-1] DEBUG ProtocolProcessor processConnect 160  - CONNECT for client <client1>
20:27:48,699 [nioEventLoopGroup-3-7] DEBUG SimpleMessaging disruptorPublish 116  - disruptorPublish publishing event ProtocolEvent wrapping CONNECT
20:27:48,699 [nioEventLoopGroup-3-8] DEBUG SimpleMessaging disruptorPublish 116  - disruptorPublish publishing event ProtocolEvent wrapping CONNECT
20:27:48,699 [nioEventLoopGroup-3-6] DEBUG SimpleMessaging disruptorPublish 116  - disruptorPublish publishing event ProtocolEvent wrapping CONNECT
20:27:48,699 [pool-1-thread-1] DEBUG ProtocolProcessor processConnect 224  - Connect with keepAlive 60 s
20:27:48,700 [pool-1-thread-1] DEBUG ProtocolProcessor processConnect 229  - Connect create session <session [clientID: client1]org.eclipse.moquette.server.netty.NettyChannel@4b658b53>
20:27:48,700 [pool-1-thread-1] DEBUG SubscriptionsStore activate 209  - Activating subscriptions for clientID <client1>
20:27:48,700 [nioEventLoopGroup-3-7] DEBUG SimpleMessaging disruptorPublish 116  - disruptorPublish publishing event org.eclipse.moquette.spi.impl.events.LostConnectionEvent@779e1cd0
20:27:48,701 [nioEventLoopGroup-3-7] DEBUG SimpleMessaging disruptorPublish 116  - disruptorPublish publishing event org.eclipse.moquette.spi.impl.events.LostConnectionEvent@2d75d90a
20:27:48,702 [nioEventLoopGroup-3-8] DEBUG SimpleMessaging disruptorPublish 116  - disruptorPublish publishing event org.eclipse.moquette.spi.impl.events.LostConnectionEvent@1f20a0ab
20:27:48,702 [nioEventLoopGroup-3-6] DEBUG SimpleMessaging disruptorPublish 116  - disruptorPublish publishing event org.eclipse.moquette.spi.impl.events.LostConnectionEvent@17144a84
20:27:48,702 [nioEventLoopGroup-3-8] DEBUG SimpleMessaging disruptorPublish 116  - disruptorPublish publishing event org.eclipse.moquette.spi.impl.events.LostConnectionEvent@35282bf2
20:27:48,702 [nioEventLoopGroup-3-6] DEBUG SimpleMessaging disruptorPublish 116  - disruptorPublish publishing event org.eclipse.moquette.spi.impl.events.LostConnectionEvent@605d3821
20:27:48,748 [pool-1-thread-1] INFO  ProtocolProcessor cleanSession 304  - cleaning old saved subscriptions for client <client1>
20:27:48,777 [pool-1-thread-1] INFO  ProtocolProcessor processConnect 260  - Create persistent session for clientID <client1>
20:27:48,812 [pool-1-thread-1] INFO  ProtocolProcessor processConnect 262  - Connected client ID <client1> with clean session true
20:27:48,812 [pool-1-thread-1] INFO  SimpleMessaging onEvent 158  - onEvent processing messaging event from input ringbuffer ProtocolEvent wrapping CONNECT
20:27:48,817 [pool-1-thread-1] DEBUG ProtocolProcessor processConnect 160  - CONNECT for client <client4>
20:27:48,817 [pool-1-thread-1] DEBUG ProtocolProcessor processConnect 224  - Connect with keepAlive 60 s
20:27:48,817 [pool-1-thread-1] DEBUG ProtocolProcessor processConnect 229  - Connect create session <session [clientID: client4]org.eclipse.moquette.server.netty.NettyChannel@5da1fbed>
20:27:48,817 [pool-1-thread-1] DEBUG SubscriptionsStore activate 209  - Activating subscriptions for clientID <client4>
20:27:48,841 [main] ERROR ServerIntegrationConnectionTest waitForCompletion 101  - token.waitForCompletion() client[null] Connection lost (32109) - java.io.EOFException
20:27:48,841 [main] ERROR ServerIntegrationConnectionTest waitForCompletion 102  - Throwable : 
Connection lost (32109) - java.io.EOFException
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:146)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readByte(DataInputStream.java:267)
    at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:65)
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:107)
    ... 1 more
20:27:48,843 [main] ERROR ServerIntegrationConnectionTest waitForCompletion 101  - token.waitForCompletion() client[null] Connection lost (32109) - java.io.EOFException
20:27:48,843 [main] ERROR ServerIntegrationConnectionTest waitForCompletion 102  - Throwable : 
Connection lost (32109) - java.io.EOFException
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:146)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readByte(DataInputStream.java:267)
    at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:65)
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:107)
    ... 1 more
20:27:48,843 [pool-1-thread-1] INFO  ProtocolProcessor cleanSession 304  - cleaning old saved subscriptions for client <client4>
20:27:48,843 [main] ERROR ServerIntegrationConnectionTest waitForCompletion 101  - token.waitForCompletion() client[null] Connection lost (32109) - java.io.EOFException
20:27:48,844 [main] ERROR ServerIntegrationConnectionTest waitForCompletion 102  - Throwable : 
Connection lost (32109) - java.io.EOFException
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:146)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readByte(DataInputStream.java:267)
    at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:65)
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:107)
    ... 1 more
20:27:48,844 [main] ERROR ServerIntegrationConnectionTest waitForCompletion 101  - token.waitForCompletion() client[null] Connection lost (32109) - java.io.EOFException
20:27:48,844 [main] ERROR ServerIntegrationConnectionTest waitForCompletion 102  - Throwable : 
Connection lost (32109) - java.io.EOFException
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:146)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readByte(DataInputStream.java:267)
    at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:65)
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:107)
    ... 1 more
20:27:48,857 [main] INFO  Server stopServer 112  - Server stopping...
20:27:48,870 [pool-1-thread-1] INFO  ProtocolProcessor processConnect 260  - Create persistent session for clientID <client4>
20:27:48,902 [pool-1-thread-1] INFO  ProtocolProcessor processConnect 262  - Connected client ID <client4> with clean session true
20:27:48,902 [pool-1-thread-1] INFO  SimpleMessaging onEvent 158  - onEvent processing messaging event from input ringbuffer ProtocolEvent wrapping CONNECT
20:27:48,908 [pool-1-thread-1] DEBUG ProtocolProcessor processConnect 160  - CONNECT for client <client2>
20:27:48,908 [pool-1-thread-1] DEBUG ProtocolProcessor processConnect 224  - Connect with keepAlive 60 s
20:27:48,908 [pool-1-thread-1] DEBUG ProtocolProcessor processConnect 229  - Connect create session <session [clientID: client2]org.eclipse.moquette.server.netty.NettyChannel@11f55379>
20:27:48,908 [pool-1-thread-1] DEBUG SubscriptionsStore activate 209  - Activating subscriptions for clientID <client2>
20:27:48,934 [pool-1-thread-1] INFO  ProtocolProcessor cleanSession 304  - cleaning old saved subscriptions for client <client2>
20:27:48,967 [pool-1-thread-1] INFO  ProtocolProcessor processConnect 260  - Create persistent session for clientID <client2>
20:27:48,994 [pool-1-thread-1] INFO  ProtocolProcessor processConnect 262  - Connected client ID <client2> with clean session true
20:27:48,994 [pool-1-thread-1] INFO  SimpleMessaging onEvent 158  - onEvent processing messaging event from input ringbuffer org.eclipse.moquette.spi.impl.events.LostConnectionEvent@21e85538
20:27:48,994 [pool-1-thread-1] INFO  SimpleMessaging onEvent 158  - onEvent processing messaging event from input ringbuffer ProtocolEvent wrapping CONNECT
20:27:48,999 [pool-1-thread-1] DEBUG ProtocolProcessor processConnect 160  - CONNECT for client <client3>
20:27:48,999 [pool-1-thread-1] DEBUG ProtocolProcessor processConnect 224  - Connect with keepAlive 60 s
20:27:48,999 [pool-1-thread-1] DEBUG ProtocolProcessor processConnect 229  - Connect create session <session [clientID: client3]org.eclipse.moquette.server.netty.NettyChannel@68f2ee49>
20:27:48,999 [pool-1-thread-1] DEBUG SubscriptionsStore activate 209  - Activating subscriptions for clientID <client3>
20:27:49,019 [pool-1-thread-1] INFO  ProtocolProcessor cleanSession 304  - cleaning old saved subscriptions for client <client3>
20:27:49,050 [pool-1-thread-1] INFO  ProtocolProcessor processConnect 260  - Create persistent session for clientID <client3>
20:27:49,061 [main] INFO  NettyAcceptor close 298  - Msg read: 14, msg wrote: 0
20:27:49,061 [main] INFO  NettyAcceptor close 301  - Bytes read: 465, bytes wrote: 0
20:27:49,061 [main] DEBUG SimpleMessaging disruptorPublish 116  - disruptorPublish publishing event org.eclipse.moquette.spi.impl.events.StopEvent@4ec60971
20:27:49,061 [main] DEBUG SimpleMessaging stop 141  - waiting 10 sec to m_stopLatch
20:27:49,069 [pool-1-thread-1] INFO  ProtocolProcessor processConnect 262  - Connected client ID <client3> with clean session true
20:27:49,069 [pool-1-thread-1] INFO  SimpleMessaging onEvent 158  - onEvent processing messaging event from input ringbuffer org.eclipse.moquette.spi.impl.events.LostConnectionEvent@76fd5428
20:27:49,070 [pool-1-thread-1] INFO  SimpleMessaging onEvent 158  - onEvent processing messaging event from input ringbuffer org.eclipse.moquette.spi.impl.events.LostConnectionEvent@76fd5428
20:27:49,070 [pool-1-thread-1] INFO  SimpleMessaging onEvent 158  - onEvent processing messaging event from input ringbuffer org.eclipse.moquette.spi.impl.events.LostConnectionEvent@7cd0e73c
20:27:49,070 [pool-1-thread-1] INFO  SimpleMessaging onEvent 158  - onEvent processing messaging event from input ringbuffer org.eclipse.moquette.spi.impl.events.LostConnectionEvent@4ce3c61
20:27:49,070 [pool-1-thread-1] INFO  SimpleMessaging onEvent 158  - onEvent processing messaging event from input ringbuffer org.eclipse.moquette.spi.impl.events.LostConnectionEvent@229198ea
20:27:49,070 [pool-1-thread-1] INFO  SimpleMessaging onEvent 158  - onEvent processing messaging event from input ringbuffer org.eclipse.moquette.spi.impl.events.LostConnectionEvent@305aecd9
20:27:49,070 [pool-1-thread-1] INFO  SimpleMessaging onEvent 158  - onEvent processing messaging event from input ringbuffer org.eclipse.moquette.spi.impl.events.LostConnectionEvent@8664e41
20:27:49,070 [pool-1-thread-1] INFO  SimpleMessaging onEvent 158  - onEvent processing messaging event from input ringbuffer org.eclipse.moquette.spi.impl.events.LostConnectionEvent@63643de1
20:27:49,070 [pool-1-thread-1] INFO  SimpleMessaging onEvent 158  - onEvent processing messaging event from input ringbuffer ProtocolEvent wrapping CONNECT
20:27:49,075 [pool-1-thread-1] DEBUG ProtocolProcessor processConnect 160  - CONNECT for client <client4>
20:27:49,075 [pool-1-thread-1] INFO  ProtocolProcessor processConnect 207  - Found an existing connection with same client ID <client4>, forcing to close
20:27:49,075 [pool-1-thread-1] INFO  ProtocolProcessor cleanSession 304  - cleaning old saved subscriptions for client <client4>
20:27:49,094 [pool-1-thread-1] DEBUG ProtocolProcessor processConnect 217  - Existing connection with same client ID <client4>, forced to close
20:27:49,094 [pool-1-thread-1] DEBUG ProtocolProcessor processConnect 224  - Connect with keepAlive 60 s
20:27:49,094 [pool-1-thread-1] DEBUG ProtocolProcessor processConnect 229  - Connect create session <session [clientID: client4]org.eclipse.moquette.server.netty.NettyChannel@7dcf7147>
20:27:49,095 [pool-1-thread-1] DEBUG SubscriptionsStore activate 209  - Activating subscriptions for clientID <client4>
20:27:49,114 [pool-1-thread-1] INFO  ProtocolProcessor cleanSession 304  - cleaning old saved subscriptions for client <client4>
20:27:49,133 [pool-1-thread-1] INFO  ProtocolProcessor processConnect 260  - Create persistent session for clientID <client4>
20:27:49,159 [pool-1-thread-1] INFO  ProtocolProcessor processConnect 262  - Connected client ID <client4> with clean session true
20:27:49,159 [pool-1-thread-1] INFO  SimpleMessaging onEvent 158  - onEvent processing messaging event from input ringbuffer ProtocolEvent wrapping CONNECT
20:27:49,164 [pool-1-thread-1] DEBUG ProtocolProcessor processConnect 160  - CONNECT for client <client2>
20:27:49,164 [pool-1-thread-1] INFO  ProtocolProcessor processConnect 207  - Found an existing connection with same client ID <client2>, forcing to close
20:27:49,164 [pool-1-thread-1] INFO  ProtocolProcessor cleanSession 304  - cleaning old saved subscriptions for client <client2>
20:27:49,183 [pool-1-thread-1] DEBUG ProtocolProcessor processConnect 217  - Existing connection with same client ID <client2>, forced to close
20:27:49,183 [pool-1-thread-1] DEBUG ProtocolProcessor processConnect 224  - Connect with keepAlive 60 s
20:27:49,183 [pool-1-thread-1] DEBUG ProtocolProcessor processConnect 229  - Connect create session <session [clientID: client2]org.eclipse.moquette.server.netty.NettyChannel@31a48a84>
20:27:49,184 [pool-1-thread-1] DEBUG SubscriptionsStore activate 209  - Activating subscriptions for clientID <client2>
20:27:49,208 [pool-1-thread-1] INFO  ProtocolProcessor cleanSession 304  - cleaning old saved subscriptions for client <client2>
20:27:49,234 [pool-1-thread-1] INFO  ProtocolProcessor processConnect 260  - Create persistent session for clientID <client2>
20:27:49,253 [pool-1-thread-1] INFO  ProtocolProcessor processConnect 262  - Connected client ID <client2> with clean session true
20:27:49,254 [pool-1-thread-1] INFO  SimpleMessaging onEvent 158  - onEvent processing messaging event from input ringbuffer ProtocolEvent wrapping CONNECT
20:27:49,259 [pool-1-thread-1] DEBUG ProtocolProcessor processConnect 160  - CONNECT for client <client1>
20:27:49,259 [pool-1-thread-1] INFO  ProtocolProcessor processConnect 207  - Found an existing connection with same client ID <client1>, forcing to close
20:27:49,259 [pool-1-thread-1] INFO  ProtocolProcessor cleanSession 304  - cleaning old saved subscriptions for client <client1>
20:27:49,279 [pool-1-thread-1] DEBUG ProtocolProcessor processConnect 217  - Existing connection with same client ID <client1>, forced to close
20:27:49,279 [pool-1-thread-1] DEBUG ProtocolProcessor processConnect 224  - Connect with keepAlive 60 s
20:27:49,279 [pool-1-thread-1] DEBUG ProtocolProcessor processConnect 229  - Connect create session <session [clientID: client1]org.eclipse.moquette.server.netty.NettyChannel@4995afcf>
20:27:49,279 [pool-1-thread-1] DEBUG SubscriptionsStore activate 209  - Activating subscriptions for clientID <client1>
20:27:49,304 [pool-1-thread-1] INFO  ProtocolProcessor cleanSession 304  - cleaning old saved subscriptions for client <client1>
20:27:49,336 [pool-1-thread-1] INFO  ProtocolProcessor processConnect 260  - Create persistent session for clientID <client1>
20:27:49,386 [pool-1-thread-1] INFO  ProtocolProcessor processConnect 262  - Connected client ID <client1> with clean session true
20:27:49,387 [pool-1-thread-1] INFO  SimpleMessaging onEvent 158  - onEvent processing messaging event from input ringbuffer org.eclipse.moquette.spi.impl.events.LostConnectionEvent@779e1cd0
20:27:49,387 [pool-1-thread-1] INFO  SimpleMessaging onEvent 158  - onEvent processing messaging event from input ringbuffer org.eclipse.moquette.spi.impl.events.LostConnectionEvent@2d75d90a
20:27:49,388 [pool-1-thread-1] INFO  SimpleMessaging onEvent 158  - onEvent processing messaging event from input ringbuffer org.eclipse.moquette.spi.impl.events.LostConnectionEvent@1f20a0ab
20:27:49,388 [pool-1-thread-1] INFO  SimpleMessaging onEvent 158  - onEvent processing messaging event from input ringbuffer org.eclipse.moquette.spi.impl.events.LostConnectionEvent@17144a84
20:27:49,388 [pool-1-thread-1] INFO  SimpleMessaging onEvent 158  - onEvent processing messaging event from input ringbuffer org.eclipse.moquette.spi.impl.events.LostConnectionEvent@35282bf2
20:27:49,389 [pool-1-thread-1] INFO  SimpleMessaging onEvent 158  - onEvent processing messaging event from input ringbuffer org.eclipse.moquette.spi.impl.events.LostConnectionEvent@605d3821
20:27:49,389 [pool-1-thread-1] INFO  SimpleMessaging onEvent 158  - onEvent processing messaging event from input ringbuffer org.eclipse.moquette.spi.impl.events.StopEvent@4ec60971
20:27:49,390 [pool-1-thread-1] DEBUG SimpleMessaging processStop 298  - processStop invoked
20:27:49,390 [pool-1-thread-1] DEBUG SimpleMessaging processStop 303  - interceptor firer stopped
20:27:49,396 [pool-1-thread-1] DEBUG SimpleMessaging processStop 306  - subscription tree 

20:27:49,396 [main] DEBUG SimpleMessaging stop 143  - after m_stopLatch
20:27:49,396 [main] INFO  Server stopServer 115  - Server stopped
    @Test
    public void shouldNotConnectedConnectionDelayLagerThanConnectionTimeout() throws MqttException, IOException {
        long connectionDelay = 5L;
        long connectionTimeout = 1L;
        startServer(connectionDelay, connectionTimeout);

        IMqttToken token1 = connect("client1");
        IMqttToken token2 = connect("client2");
        IMqttToken token3 = connect("client3");
        IMqttToken token4 = connect("client4");

        sleep(200L);

        waitForCompletion(token1);
        waitForCompletion(token2);
        waitForCompletion(token3);
        waitForCompletion(token4);

        verify(mockInterceptHandler, Mockito.never()).onConnect(any(InterceptConnectMessage.class));
    }

The whole test code is here: https://github.com/runafter/moquette/commit/2f510ed61167b0e4573147345360afea37403d91

runafter commented 9 years ago

This bug can be reproduced by sending a massive number of CONNECT messages from over 10K clients.

This is how a CONNECT message is processed:

  1. A CONNECT message is received from a client.
  2. A ProtocolEvent is fired.
  3. SimpleMessaging.disruptorPublish() puts the ProtocolEvent into SimpleMessaging.m_ringBuffer.
  4. SimpleMessaging.m_executor processes the ProtocolEvents in SimpleMessaging.m_ringBuffer sequentially.
  5. ProtocolProcessor.processConnect() processes the CONNECT message with its ServerChannel.
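
The steps above can be sketched roughly as follows. This is only an illustration, not Moquette's code: a single-threaded `ExecutorService` stands in for the ring buffer and its consumer, and `ConnectEvent` and `processSequentially` are hypothetical names introduced here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleThreadPipelineSketch {

    /** Hypothetical stand-in for a ProtocolEvent wrapping a CONNECT message. */
    public static final class ConnectEvent {
        final String clientId;

        ConnectEvent(String clientId) {
            this.clientId = clientId;
        }
    }

    /**
     * Publishes one event per client ID to a single worker thread and
     * returns the client IDs in the order the worker processed them.
     */
    public static List<String> processSequentially(List<String> clientIds) {
        // Assumption: a single-threaded executor models the ring buffer consumer.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<String> processed = Collections.synchronizedList(new ArrayList<String>());
        for (String id : clientIds) {
            final ConnectEvent event = new ConnectEvent(id); // step 2: event is fired
            // Step 3: publish. Steps 4 and 5 run later, one by one, on the worker.
            executor.execute(() -> processed.add(event.clientId));
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) {
        System.out.println(processSequentially(Arrays.asList("client1", "client2", "client3")));
    }
}
```

Because there is exactly one consumer thread, every event waits for all events published before it, which is the source of the latency discussed in this issue.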

Usually this process causes no problems. But when a massive number of CONNECT messages is received, it can become an issue.

Let's suppose that 1000 CONNECT messages are received. They are put into SimpleMessaging.m_ringBuffer, and SimpleMessaging.m_executor then processes them one by one. If each message takes 100 ms to process, 1000 CONNECT messages take 100 seconds in total. So in the worst case, the last CONNECT message is processed only after 100 seconds.

However, if the CONNECT on a ServerChannel is not processed within 10 seconds, the channel is closed by MoquetteIdleTimoutHandler (https://github.com/runafter/moquette/blob/master/broker/src/main/java/org/eclipse/moquette/server/netty/MoquetteIdleTimoutHandler.java#L31).
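
The arithmetic can be checked with a small calculation. This is a simplified model, assuming all messages arrive at time zero and each one costs exactly the same; the class and method names are hypothetical, and real timings will differ.

```java
public class ConnectBacklogEstimate {

    /** Time at which the message at 0-based queue position {@code index} finishes. */
    public static long completionTimeMillis(int index, long perMessageMillis) {
        return (index + 1L) * perMessageMillis;
    }

    /**
     * Number of messages whose processing starts before the timeout fires.
     * Message i starts at i * perMessageMillis, so the count is
     * ceil(timeoutMillis / perMessageMillis), capped at the queue size.
     */
    public static long startedBeforeTimeout(int totalMessages, long perMessageMillis, long timeoutMillis) {
        long fitting = (timeoutMillis + perMessageMillis - 1) / perMessageMillis;
        return Math.min(totalMessages, fitting);
    }

    public static void main(String[] args) {
        // 1000 queued CONNECTs at 100 ms each: the last one completes after 100 s.
        System.out.println(completionTimeMillis(999, 100)); // prints 100000
        // With a 10 s timeout, only the first 100 start before their channel is closed.
        System.out.println(startedBeforeTimeout(1000, 100, 10_000)); // prints 100
    }
}
```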

Whether or not the ServerChannel has been closed, its CONNECT message is still processed by ProtocolProcessor.processConnect().

In my opinion, a CONNECT message whose ServerChannel is already closed does not need to be processed. But it is processed anyway.
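
One way to express that idea is to check the channel state before doing any session work. The sketch below is hypothetical: `ChannelView`, `PendingConnect`, and `processOpenOnly` are names made up for illustration and are not Moquette's ServerChannel API.

```java
import java.util.ArrayList;
import java.util.List;

public class SkipClosedChannelSketch {

    /** Hypothetical minimal view of a server-side channel (an assumption, not Moquette's API). */
    public interface ChannelView {
        boolean isOpen();
    }

    /** Hypothetical queued CONNECT event paired with the channel it arrived on. */
    public static final class PendingConnect {
        final String clientId;
        final ChannelView channel;

        public PendingConnect(String clientId, ChannelView channel) {
            this.clientId = clientId;
            this.channel = channel;
        }
    }

    /** Returns the client IDs whose CONNECT is still worth processing. */
    public static List<String> processOpenOnly(List<PendingConnect> queue) {
        List<String> connected = new ArrayList<>();
        for (PendingConnect event : queue) {
            if (!event.channel.isOpen()) {
                // The client already timed out: skip the session setup entirely.
                continue;
            }
            connected.add(event.clientId);
        }
        return connected;
    }

    public static void main(String[] args) {
        List<PendingConnect> queue = new ArrayList<>();
        queue.add(new PendingConnect("client1", () -> true));
        queue.add(new PendingConnect("client2", () -> false)); // channel closed by timeout
        System.out.println(processOpenOnly(queue)); // prints [client1]
    }
}
```

A check like this only avoids wasted work on dead channels; it does not remove the queueing delay itself.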

This degrades performance.

So if more than 100 CONNECT messages are received per second and each takes 100 ms to process, about 112 messages are processed and the rest fail.

andsel commented 9 years ago

Hi @runafter, your analysis is correct, but we should talk a bit about the fix. This issue is also related to #102 and is due to an architectural flaw (in my opinion). The ring buffer does a great job of keeping the ProtocolProcessor single-threaded and simple to hack on, with no concerns about concurrent access to shared data structures, but it produces these latency-related bugs. I think I have to rework this aspect of the broker.

andsel commented 9 years ago

Hi @runafter, I changed the way Moquette works, so there is no longer a ring buffer and a single-threaded processor. This means that latency errors shouldn't happen anymore and this bug is fixed. Andrea

diegovisentin commented 9 years ago

As far as I know, the LMAX Disruptor was built to address the latency of inter-thread communication, but now you are removing it because it seems to produce latency-related bugs. So I'm a bit confused... Anyway, you are the master, therefore: 1) I will redo all our stress tests to evaluate the new implementation 2) maybe you need to change the main-page description of Moquette ;-)

andsel commented 9 years ago

Hi Diego, you are right, the Disruptor is for low-latency inter-thread communication, but the structure (a single-threaded ProtocolProcessor) was limiting the threading. In front we have Netty, which uses one thread per core and never blocks, so it's time to move Moquette to use the full power of Netty. Now the protocol processor is executed in Netty's threads and uses persistent data structures (the subscription tree is always copied upon modification). In this picture the ring buffer was just a removable step (I like that LMAX stuff). That's the motivation. Let me know of new bugs :-) Andrea
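
The copy-upon-modification idea can be sketched as below. This is a simplified illustration, assuming a flat set of topic strings in place of the real subscription tree; the class and method names are hypothetical and do not come from Moquette.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

public class CopyOnWriteSubscriptionsSketch {

    // The current immutable snapshot; readers never see a partially updated set.
    private final AtomicReference<Set<String>> topics =
            new AtomicReference<>(Collections.<String>emptySet());

    /** Adds a topic by copying the current set and swapping the copy in atomically. */
    public void subscribe(String topic) {
        Set<String> current;
        Set<String> updated;
        do {
            current = topics.get();
            Set<String> copy = new HashSet<>(current);
            copy.add(topic);
            updated = Collections.unmodifiableSet(copy);
            // Retry if another thread replaced the snapshot in the meantime.
        } while (!topics.compareAndSet(current, updated));
    }

    /** Returns an immutable snapshot; later subscribe() calls do not change it. */
    public Set<String> snapshot() {
        return topics.get();
    }

    public static void main(String[] args) {
        CopyOnWriteSubscriptionsSketch store = new CopyOnWriteSubscriptionsSketch();
        Set<String> before = store.snapshot();
        store.subscribe("sensors/temperature");
        System.out.println(before.isEmpty());                                  // prints true
        System.out.println(store.snapshot().contains("sensors/temperature")); // prints true
    }
}
```

Readers on any thread can walk a snapshot without locks, at the cost of copying on every write.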

runafter commented 9 years ago

Hi @andsel, thanks for your work. These changes should improve performance when processing MQTT connections.