fusesource / mqtt-client

A Java MQTT Client
Apache License 2.0
1.27k stars 369 forks source link

Out of memory exception on Android while trying to reconnect #51

Open zaur opened 8 years ago

zaur commented 8 years ago

This is my code:

    private void setupMqttClient() {
        mqtt = new MQTT();
        mqtt.setKeepAlive((short) 20);

        try {
            mqtt.setHost("", 1883);
        } catch (URISyntaxException e) {

        BlockingConnection bc = mqtt.blockingConnection();
        try {

        } catch (Exception e) {

I launch the app and client successfully connects. Then I switch wifi off and on and wait until the client reconnects. After repeating this 2-3 times mqttclient starts opening and closing connections like crazy until the app crashes with OutOfMemoryError:

java.lang.OutOfMemoryError: Failed to allocate a 65548 byte allocation with 59328 free bytes and 57KB until OOM
    at org.fusesource.hawtdispatch.util.BufferPool.create(BufferPool.java:35)
    at org.fusesource.hawtdispatch.util.BufferPool.create(BufferPool.java:25)
    at org.fusesource.hawtdispatch.util.ThreadLocalPool.checkout(ThreadLocalPool.java:78)
    at org.fusesource.hawtdispatch.transport.AbstractProtocolCodec.allocateNextWriteBuffer(AbstractProtocolCodec.java:153)
    at org.fusesource.hawtdispatch.transport.AbstractProtocolCodec.write(AbstractProtocolCodec.java:137)
    at org.fusesource.hawtdispatch.transport.TcpTransport.offer(TcpTransport.java:649)
    at org.fusesource.mqtt.client.CallbackConnection$LoginHandler.onSuccess(CallbackConnection.java:388)
    at org.fusesource.mqtt.client.CallbackConnection$LoginHandler.onSuccess(CallbackConnection.java:319)
    at org.fusesource.mqtt.client.CallbackConnection$5.onTransportConnected(CallbackConnection.java:295)
    at org.fusesource.hawtdispatch.transport.TcpTransport.onConnected(TcpTransport.java:601)
    at org.fusesource.hawtdispatch.transport.TcpTransport$2$1$1.run(TcpTransport.java:490)
    at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)

and here is what happens in mosquitto logs:

1445865487: Client client_1 already connected, closing old connection.
1445865487: Client client_1 disconnected.
1445865487: New client connected from as client_1 (c1, k20).
1445865487: Client client_1 already connected, closing old connection.
1445865487: Client client_1 disconnected.
1445865487: New client connected from as client_1 (c1, k20).
1445865487: Client client_1 already connected, closing old connection.
1445865487: Client client_1 disconnected.
1445865487: New client connected from as client_1 (c1, k20).
1445865487: Client client_1 already connected, closing old connection.
1445865487: Client client_1 disconnected.
1445865487: New client connected from as client_1 (c1, k20).
1445865487: Client client_1 already connected, closing old connection.
1445865487: Client client_1 disconnected.
1445865487: New client connected from as client_1 (c1, k20).
1445865487: Client client_1 already connected, closing old connection.
1445865487: Client client_1 disconnected.
1445865487: New client connected from as client_1 (c1, k20).
1445865487: Client client_1 already connected, closing old connection.
1445865487: Client client_1 disconnected.
1445865487: New client connected from as client_1 (c1, k20).
1445865487: Client client_1 already connected, closing old connection.
1445865487: Client client_1 disconnected.
1445865487: New client connected from as client_1 (c1, k20).
TheNitek commented 8 years ago

Probably a duplicate of https://github.com/fusesource/mqtt-client/issues/48

Rayman230510 commented 7 years ago

i merge the commit to my code,still OOM,how can i solve this problem? can you help me?

ouyangzn commented 7 years ago

Maybe you are trying to connect to the server repeated while it is already connected to server. I had meet this problem, and i used a flag to sign the connection status, like this:

private void connect() {
    // 连接服务器
    if (isConnected()) {
    mCallbackConnection = mMQTT.callbackConnection();
    mCallbackConnection.listener(new ExtendedListener() {
        @Override public void onConnected() {
          mConnected = true;

        @Override public void onDisconnected() {
          mConnected = false;
private boolean isConnected() {
    return mConnected;

Now it's ok, no OOM. But i got another problem ,somtimes it will crash:

Exception in thread "hawtdispatch-DEFAULT-2" java.lang.NullPointerException
    at org.fusesource.mqtt.client.CallbackConnection$7.run(CallbackConnection.java:450)

public void onSessionEstablished(Transport transport) {
    if(mqtt.getKeepAlive()>0) {
            heartBeatMonitor = new HeartBeatMonitor();
            heartBeatMonitor.setWriteInterval((mqtt.getKeepAlive() * 1000) / 2);
            heartBeatMonitor.suspendRead(); // to match the suspended state of the transport.
            heartBeatMonitor.setOnKeepAlive(new Task() {
                public void run() {
                    // Don't care if the offer is rejected, just means we have data outbound.
                    if(!disconnected && pingedAt==0) {
                        MQTTFrame encoded = new PINGREQ().encode();
                        //   -------------this.transport == null----------------
                        if(CallbackConnection.this.transport.offer(encoded)) {
Rayman230510 commented 7 years ago

The commit #48 & #51,This line has been changed , like this: if(CallbackConnection.this.transport != null && CallbackConnection.this.transport.offer(encoded)) and delete this: //handleSessionFailure(new ProtocolException("Ping TimeOut").fillInStackTrace()); and i have tried this ,there is no some bad effects, i hope this will help for your NullPointException.


ouyangzn commented 7 years ago

@zhlmgithub I don't know why delete this line: java handleSessionFailure(new ProtocolException("Ping TimeOut").fillInStackTrace()); It seems there will not reconnect to server when heartbeat timeout if delete this line.

void handleSessionFailure(Throwable error) {
    // Socket failure, should we try to reconnect?
    if (!disconnected && (mqtt.reconnectAttemptsMax < 0
        || reconnects < mqtt.reconnectAttemptsMax)) {

      mqtt.tracer.debug("Reconnecting transport");
      // Cleanup the previous transport.
      if (heartBeatMonitor != null) {
        heartBeatMonitor = null;
      final Transport t = transport;
      transport = null;

      if (t != null) {
        t.stop(new Task() {
          @Override public void run() {
      } else {
    } else {
      // nope.