eclipse / paho.mqtt.java

Eclipse Paho Java MQTT client library. Paho is an Eclipse IoT project.
https://eclipse.org/paho

Connection lost (32109) - java.io.EOFException #673

Open llakhch opened 5 years ago

llakhch commented 5 years ago

library:

org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.1

Issue: the client never reconnects after the connection is lost, even though AutomaticReconnect is set to true.

2019-05-16 14:13:41,748 [MQTT Rec: paho3777994479346839] ERROR com.publish.util.ExceptionAssistService.exceptionStrategy(ExceptionAssistService.java:19) - main cause :: Connection lost (32109) - java.io.EOFException
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:189)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException
    at java.io.DataInputStream.readByte(DataInputStream.java:267)
    at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92)
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:136)
    ... 1 more
2019-05-16 14:13:41,769 [MQTT Rec: paho3777994479346839] ERROR com.publish.util.ExceptionAssistService.exceptionStrategy(ExceptionAssistService.java:35) - error code mqttexception :: 32109 ,whoHasCalledThis :: connectionLost
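For readers of the trace above: the root cause line is `DataInputStream.readByte`, which throws `EOFException` when the underlying stream ends before a byte can be read. On a socket stream that generally means the other side closed the connection. The sketch below reproduces only that JDK behaviour with an in-memory stream; `EofDemo` and `hitsEof` are hypothetical names, and no Paho code or broker is involved.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;

class EofDemo {
    /** Returns true if reading a single byte from the data hits end-of-stream. */
    public static boolean hitsEof(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            in.readByte();          // same call as in the stack trace
            return false;           // a byte was available
        } catch (EOFException e) {
            return true;            // stream ended before a byte could be read
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(hitsEof(new byte[0]));       // prints "true"
        System.out.println(hitsEof(new byte[] {0x30})); // prints "false"
    }
}
```

This only shows where the exception type comes from. It does not explain why the broker closed the connection or why the client did not reconnect afterwards.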

properties :

mqtt.connectiontimeout=60
mqtt.keepaliveinterval=30
mqtt.qos=0
mqtt.cleanSession=true
mqtt.msg.retained=false
mqtt.maxInflight=10000000
mqtt.ssl.enabled=true
mqtt.ssl.url=ssl://xxxxx.clearblade.com
mqtt.ssl.port=1884
mqtt.ssl.protocol=TLSv1.2
re.auth.code.list=4
conOpt.setAutomaticReconnect(true);

source code attached

`package com.publish.client;

import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;

import org.apache.log4j.Logger;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.publish.dto.ResponseDto;
import com.publish.main.PublisherApp;
import com.publish.service.ExternalApiService;
import com.publish.util.ExceptionAssistService;

@Service
public class MqttAsyncPublisher implements MqttCallback, IMqttActionListener, MqttCallbackExtended {

private static Logger logger = Logger.getLogger(MqttAsyncPublisher.class);

@Autowired
private ExternalApiService externalApiService;

@Autowired
private ExceptionAssistService exceptionAssistService;

private static final String EMPTY = "empty";
private static final String MQTT_IS_COMM_DEVICE_FORMAT_ENABLED = "mqtt.isCommDeviceFormatEnabled";
private static final String MQTT_IS_COMM_DEVICE_ERROR_ENABLED = "mqtt.isCommDeviceErrorEnabled";

private static final String MQTT_IS_MQTT_OPERATING = "mqtt.isMqttOperating";
private static final String MQTT_QOS = "mqtt.qos";
private static final String ENCODING = "UTF-8";
private static final String ERROR_CODE_LIST = "re.auth.code.list";
private static final String MQTT_PASSWORD = "mqtt.password";
private static final String MQTT_CONNECTIONTIMEOUT = "mqtt.connectiontimeout";
private static final String MQTT_KEEPALIVEINTERVAL = "mqtt.keepaliveinterval";
private static final String MQTT_CLEAN_SESSION = "mqtt.cleanSession";
private static final String MQTT_MAX_INFLIGHT = "mqtt.maxInflight";
private static final String MQTT_MSG_RETAINED = "mqtt.msg.retained";
private static final String IS_TLS_ENABLED = "mqtt.ssl.enabled";
private static final String TLS_PROTOCOL = "mqtt.ssl.protocol";
private static final String MQTT_TLS_PORT = "mqtt.ssl.port";
private static final String MQTT_TLS_URL = "mqtt.ssl.url";
private static final String PUBLISHER_NAME = "publisher.name";

private static final String MQTT_PORT = "mqtt.port";
private static final String MQTT_URL = "mqtt.url";

private ResponseDto responseDto = null;
private MqttConnectOptions conOpt = null;
private MqttAsyncClient client = null;
private IMqttToken connectToken;
private String[] errorCodeList = {};
private int mqttMaxInflight = 0;
private boolean isMqttOperating = false;
private boolean isCommDeviceFormatEnabled = false;
private boolean isCommDeviceErrorEnabled = false;
private int qos = 0;
private boolean msgRetained = false;
private static AtomicInteger atomicMessageCount = new AtomicInteger(0);

@PostConstruct
private void init() {
    logger.info("Initializing mqtt connection for broker");
    try {
        responseDto = externalApiService.getAuthFromClearBlade();
        if (responseDto == null) {
            logger.error("Not a valid auth.");
            return;
        }
        qos = Integer.parseInt(PublisherApp.prop.getProperty(MQTT_QOS));
        msgRetained = Boolean.parseBoolean(PublisherApp.prop.getProperty(MQTT_MSG_RETAINED));
        isMqttOperating = Boolean.parseBoolean(PublisherApp.prop.getProperty(MQTT_IS_MQTT_OPERATING));
        isCommDeviceFormatEnabled = Boolean
                .parseBoolean(PublisherApp.prop.getProperty(MQTT_IS_COMM_DEVICE_FORMAT_ENABLED));
        isCommDeviceErrorEnabled = Boolean
                .parseBoolean(PublisherApp.prop.getProperty(MQTT_IS_COMM_DEVICE_ERROR_ENABLED));
        errorCodeList = PublisherApp.prop.getProperty(ERROR_CODE_LIST).split("##");
        mqttMaxInflight = Integer.parseInt(PublisherApp.prop.getProperty(MQTT_MAX_INFLIGHT));
        conOpt = new MqttConnectOptions();
        conOpt.setUserName(responseDto.getUserToken());
        conOpt.setPassword(PublisherApp.prop.getProperty(MQTT_PASSWORD).toCharArray());
        conOpt.setConnectionTimeout(Integer.parseInt(PublisherApp.prop.getProperty(MQTT_CONNECTIONTIMEOUT)));
        conOpt.setKeepAliveInterval(Integer.parseInt(PublisherApp.prop.getProperty(MQTT_KEEPALIVEINTERVAL)));
        conOpt.setCleanSession(Boolean.parseBoolean(PublisherApp.prop.getProperty(MQTT_CLEAN_SESSION)));
        conOpt.setMaxInflight(mqttMaxInflight);
        conOpt.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
        conOpt.setAutomaticReconnect(true);
        if (Boolean.parseBoolean(PublisherApp.prop.getProperty(IS_TLS_ENABLED))) {
            logger.info("isTLSEnabled :: true");
            SSLContext sslContext = SSLContext.getInstance(PublisherApp.prop.getProperty(TLS_PROTOCOL));
            sslContext.init(null, null, new java.security.SecureRandom());
            SSLSocketFactory socketFactory = sslContext.getSocketFactory();
            conOpt.setSocketFactory(socketFactory);
            client = new MqttAsyncClient(
                    PublisherApp.prop.getProperty(MQTT_TLS_URL) + ":"
                            + PublisherApp.prop.getProperty(MQTT_TLS_PORT),
                    PublisherApp.prop.getProperty(PUBLISHER_NAME) + System.nanoTime(), new MemoryPersistence());
        } else {
            logger.info("isTLSEnabled :: false");
            client = new MqttAsyncClient(
                    PublisherApp.prop.getProperty(MQTT_URL) + ":" + PublisherApp.prop.getProperty(MQTT_PORT),
                    PublisherApp.prop.getProperty(PUBLISHER_NAME) + System.nanoTime(), new MemoryPersistence());
        }
        logger.info("Connection opt ::" + conOpt.toString());
        client.setCallback(this);
        connectToken = client.connect(conOpt, null, this);
    } catch (Exception ex) {
        logger.error("init :: ", ex);
    }
}

@PreDestroy
private void destroy() {
    try {
        if (isConnected()) {
            client.disconnect().waitForCompletion();
        }
    } catch (Exception ex) {
        logger.error("destroy exception :: ", ex);
    } finally {
        try {
            // client stays null if init() returned early, so guard before closing
            if (client != null) {
                client.close();
            }
        } catch (MqttException e) {
            logger.error("destroy final block exception :: ", e);
        }
    }
}

@Override
public void connectionLost(Throwable cause) {
    try {
        if (exceptionAssistService.exceptionStrategy("connectionLost", cause, errorCodeList))
            responseDto = externalApiService.getAuthFromClearBlade();
        if (responseDto == null) {
            logger.error("Not a valid auth.");
            return;
        }
        conOpt.setUserName(responseDto.getUserToken());
        logger.error("Connection opt at connectionLost ::" + conOpt.toString());
    } catch (Exception ex) {
        logger.error("connectionLost at connectionLost :: ", ex);
    }
}

@Override
public void onSuccess(IMqttToken asyncActionToken) {
    try {
        if (asyncActionToken != null && asyncActionToken.equals(connectToken))
            logger.info("client connected successfully");
    } catch (Exception ex) {
        logger.error("onSuccess :: ", ex);
    }
}

@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
    if (exceptionAssistService.exceptionStrategy("onFailure", exception, errorCodeList))
        responseDto = externalApiService.getAuthFromClearBlade();
}

public void publishMessage(String topic, String message) {
    try {
        if (isConnected()) {
            atomicMessageCount.getAndIncrement();
            // log once in-flight messages exceed 90% of the configured maximum
            if (mqttMaxInflight * 90 / 100 < client.getInFlightMessageCount())
                logger.error("message_count::" + atomicMessageCount.get() + " inflight_count :: "
                        + client.getInFlightMessageCount());
            client.publish(topic, message.getBytes(ENCODING), qos, msgRetained);
        } else {
            logger.error("client is not connected");
        }
    } catch (Exception ex) {
        logger.error("publishMessage :: ", ex);
    }
}

private boolean isConnected() {
    return (client != null) && (client.isConnected());
}   

public String getAccountId() {
    if (responseDto != null)
        return responseDto.getAccountId();
    return EMPTY;
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    logger.info("not needed for this client");
    return;
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
    // Nothing to do here for now; delivery tracking is not needed.
}

@Override
public void connectComplete(boolean reconnect, String serverURI) {
    logger.error("reconnect called :: " + reconnect + " ,is connected :: " + client.isConnected());
}

@Override
public String toString() {
    return responseDto.toString() + ", clientConnected ? " + client.isConnected() + ", errorCodeList="
            + Arrays.toString(errorCodeList) + ", mqttMaxInflight=" + mqttMaxInflight + ", msgInflightCount="
            + client.getInFlightMessageCount() + ", publishedMessageCount=" + atomicMessageCount.get()
            + ", current time=" + new Date() + ", isMqttOperating=" + isMqttOperating + ", qos=" + qos
            + ", isCommDeviceFormatEnabled=" + isCommDeviceFormatEnabled + ", isCommDeviceErrorEnabled="
            + isCommDeviceErrorEnabled + conOpt.toString();
}

} `

llakhch commented 5 years ago

Team, this is a production issue; any update would be appreciated.

Sujitrai88 commented 5 years ago

Hi team, I am also facing the same issue.

ArunSandhu commented 5 years ago

I get the same error. The callback gets called, but the client does not reconnect and Paho just seems to die.

siavashsoleymani commented 5 years ago

Hi, same error here! Paho does not reconnect and just stops consuming!

damaddin commented 5 years ago

Facing the same issue, we decided to reconnect manually in the connectionLost callback.
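The manual-reconnect workaround is not shown in the thread, so the following is only a sketch of one way it might look: a retry loop with capped exponential backoff. `ReconnectLoop` and `ConnectAction` are hypothetical names, not part of Paho. In a real client the action would wrap something like `client.connect(conOpt).waitForCompletion()`; here it is simulated so the example runs with the JDK alone. Whether it is safe to block inside `connectionLost` itself is not established in this thread, so the threading decision is left to the caller.

```java
class ReconnectLoop {
    /** Hypothetical stand-in for the real connect call. */
    interface ConnectAction {
        void connect() throws Exception;
    }

    /**
     * Runs the action up to maxAttempts times, doubling the delay between
     * attempts up to maxDelayMs. Returns the attempt number that succeeded,
     * or -1 if every attempt failed or the thread was interrupted.
     */
    public static int reconnect(ConnectAction action, int maxAttempts,
                                long initialDelayMs, long maxDelayMs) {
        long delay = initialDelayMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                action.connect();
                return attempt;
            } catch (Exception e) {
                if (attempt == maxAttempts) {
                    break; // no point sleeping after the final failure
                }
                try {
                    Thread.sleep(delay);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return -1;
                }
                delay = Math.min(delay * 2, maxDelayMs);
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated broker: the first two attempts fail, the third succeeds.
        int used = reconnect(() -> {
            if (++calls[0] < 3) {
                throw new java.io.IOException("broker unavailable (simulated)");
            }
        }, 5, 10, 100);
        System.out.println("connected after " + used + " attempt(s)"); // 3
    }
}
```

Capping the delay keeps a long outage from pushing the retry interval arbitrarily high, and bounding the attempts lets the caller decide what to do when the broker stays unreachable.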

vanrin commented 5 years ago

[Screenshot: Screen Shot 2019-08-16 at 12 14 35 AM]

nguyenvanquan7826 commented 4 years ago

I get the same error. For now I have to call connect again when the connection fails.

penggle commented 4 years ago

I ran into this problem in a high-concurrency consumer program with paho.mqtt.java version 1.2.2:

org.eclipse.paho.client.mqttv3.MqttException: Connection lost
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:190) [org.eclipse.paho.client.mqttv3-1.2.2.jar!/:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
Caused by: java.io.EOFException
    at java.io.DataInputStream.readByte(DataInputStream.java:267) ~[?:1.8.0_202]
    at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92) ~[org.eclipse.paho.client.mqttv3-1.2.2.jar!/:?]
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:137) ~[org.eclipse.paho.client.mqttv3-1.2.2.jar!/:?]

I think it's a bug! I modified the source of org.eclipse.paho.client.mqttv3.internal.CommsReceiver.java and resolved the problem as follows:

Original source code in v1.2.2: (image)

Modified source code based on v1.2.2: (image)

dsambugaro commented 4 years ago

Same issue here. The solution provided by @penggle works just fine. Thanks!

vivek11111994 commented 4 years ago

Any update on this issue?

lcarnevale commented 4 years ago

I have the same issue. I cannot subscribe to a topic after connection, and the application fails with:

2020-04-03 14:24:22.078 21100-21100/appname W/System.err: Connection lost (32109) - java.io.EOFException
2020-04-03 14:24:22.079 21100-21100/appname  W/System.err:     at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:146)
2020-04-03 14:24:22.079 21100-21100/appname  W/System.err:     at java.lang.Thread.run(Thread.java:764)
2020-04-03 14:24:22.079 21100-21100/appname  W/System.err: Caused by: java.io.EOFException
2020-04-03 14:24:22.079 21100-21100/appname  W/System.err:     at java.io.DataInputStream.readByte(DataInputStream.java:270)
2020-04-03 14:24:22.079 21100-21100/appname  W/System.err:     at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:65)
2020-04-03 14:24:22.079 21100-21100/appname  W/System.err:     at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:107)
2020-04-03 14:24:22.079 21100-21100/appname  W/System.err:  ... 1 more

rdasgupt commented 4 years ago

@penggle Thanks for your suggested code change. Synchronizing in.readMqttWireMessage() may not be a good idea, because the call can take a significant amount of time and holding a lock around it makes the process block.

I think we need to revisit readMqttWireMessage().

@lcarnevale @penggle Do you have a simple test case to reproduce this issue?
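The concern about synchronizing around a blocking read can be illustrated without any Paho code. In the sketch below, a reader thread takes a lock and then blocks, and a second thread cannot obtain the lock until the "read" completes. Everything here is an assumption for illustration: `BlockingReadUnderLock` is a hypothetical class, a `ReentrantLock` stands in for `synchronized` so the wait can be timed, and a `CountDownLatch` stands in for the blocking socket read. The actual change proposed earlier in the thread survives only as an image and is not reproduced.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class BlockingReadUnderLock {
    /** Returns {lockAcquiredDuringRead, lockAcquiredAfterRead}. */
    public static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        CountDownLatch readerHoldsLock = new CountDownLatch(1);
        CountDownLatch dataArrives = new CountDownLatch(1);

        Thread reader = new Thread(() -> {
            lock.lock();
            try {
                readerHoldsLock.countDown();
                dataArrives.await(); // stands in for a blocking socket read
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        });
        reader.setDaemon(true);
        reader.start();

        try {
            readerHoldsLock.await();
            // Another thread (e.g. one that wants to stop the receiver) tries the lock.
            boolean during = lock.tryLock(50, TimeUnit.MILLISECONDS);
            if (during) {
                lock.unlock();
            }
            dataArrives.countDown(); // the "read" finally returns
            reader.join();
            boolean after = lock.tryLock(1, TimeUnit.SECONDS);
            if (after) {
                lock.unlock();
            }
            return new boolean[] {during, after};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("acquired during read: " + result[0]); // false
        System.out.println("acquired after read:  " + result[1]); // true
    }
}
```

With a real socket the read may not return until the next packet arrives, so any thread needing the same lock would wait for that long.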

dbraun1991 commented 7 months ago

Hi there ( @rdasgupt , @lcarnevale , @penggle , @icraggs ),

As I am facing the same issue in a Testcontainers setup, I have published test code in the following repo:

Please follow the readme.

Best wishes ;-)