eclipse / paho.mqtt.java

Eclipse Paho Java MQTT client library. Paho is an Eclipse IoT project.
https://eclipse.org/paho
Other
2.11k stars 882 forks source link

MQTTv5 NPE at ClientState.send during MqttClient.publish to VerneMQ server #787

Open xranby opened 4 years ago

xranby commented 4 years ago

I am using the MQTTv5 Client connecting to a VerneMQ server. Connecting work ok, subscribing to a topic work ok, however I am hitting a NPE when i try to publish to a topic using the MQTTv5 client.

Similar code work fine using the v3 client.

Reproducer:


import java.security.NoSuchAlgorithmException;
import javax.net.ssl.*;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.client.IMqttDeliveryToken;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

public class Reproducer {

    public static void main(String args[]) throws MqttException, NoSuchAlgorithmException {
        MemoryPersistence persistence = new MemoryPersistence();

        SSLContext sslContext;
        sslContext = SSLContext.getDefault();

        MqttConnectionOptions options = new MqttConnectionOptions();
        options.setSocketFactory(sslContext.getSocketFactory());
        options.setCleanStart(false);
        options.setUserName("testUnit123");
        options.setPassword("123".getBytes());
        MqttClient client = new MqttClient("ssl://sto1-k8s-clstr01-mqtt.publicdns.zone:8888", "foobar b8:27:EB:58:DF:F2", persistence);

        client.connect(options);

        client.setCallback(new MqttCallback() {
            @Override
            public void disconnected(MqttDisconnectResponse arg0) {
            }

            @Override
            public void mqttErrorOccurred(MqttException arg0) {
            }

            @Override
            public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken arg0) {
            }

            @Override
            public void connectComplete(boolean arg0, String arg1) {
            }

            @Override
            public void authPacketArrived(int arg0, MqttProperties arg1) {
            }
        });

        MqttSubscription subscriptionTopic = new MqttSubscription("/acl");
        MqttSubscription[] subscriptionTopics = {subscriptionTopic};
        IMqttToken token = client.subscribe(subscriptionTopics);
        token.waitForCompletion();

        MqttMessage message = new MqttMessage("{\"units\": [{ \"id\": \"test_instrument\", \"type\": \"fhir\" }, { \"id\": \"Prov-id skrivare testUnit123 \", \"type\": \"labeler\" }, { \"id\": \"connection_test\", \"type\": \"ping\" }]}".getBytes());
        message.setQos(2);
        client.publish("/config", message);

        while (true) {

        }
    }
}
Exception in thread "main" java.lang.NullPointerException
    at org.eclipse.paho.mqttv5.client.internal.ClientState.send(ClientState.java:516)
    at org.eclipse.paho.mqttv5.client.internal.ClientComms.internalSend(ClientComms.java:156)
    at org.eclipse.paho.mqttv5.client.internal.ClientComms.sendNoWait(ClientComms.java:216)
    at org.eclipse.paho.mqttv5.client.MqttAsyncClient.publish(MqttAsyncClient.java:1514)
    at org.eclipse.paho.mqttv5.client.MqttClient.publish(MqttClient.java:564)
    at Reproducer.main(Reproducer.java:67)
xranby commented 4 years ago

triage: I have tested to reproduce this bug using the public available mqtt servers at https://test.mosquitto.org/

Improved reproducer with test log below:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.Security;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.net.ssl.*;
import org.bouncycastle.cert.X509CertificateHolder;
import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.bouncycastle.openssl.PEMParser;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.client.IMqttDeliveryToken;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;

public class PahoBug787Reproducer {

    static volatile boolean connectedComplete = false;

    public static void main(String args[]) throws NoSuchAlgorithmException, InterruptedException, MqttException {

        {   // Test 1 block tcp using mosquito.org
            System.out.println("test 1 tcp using mosquito.org");

            MqttConnectionOptions tcpOptions = new MqttConnectionOptions();
            tcpOptions.setCleanStart(false);
            MqttClient tcpClient = new MqttClient("tcp://test.mosquitto.org:1883", "foobar b8:27:EB:58:DF:F2", new MemoryPersistence());
            tryReproducePahoBug787("tcp client", tcpClient, tcpOptions);
        }

        {   // Test 2 block BouncyCastle ssl using mosquito.org pem
            System.out.println("test 2 ssl using mosquito.org pem");
            /* Step 0 initialize keyManagerFactory */
            Security.addProvider(new BouncyCastleProvider());
            // load client certificate
            String mosquittoOrgPemCertificate = "-----BEGIN CERTIFICATE-----\n"
                    + "MIIEAzCCAuugAwIBAgIUBY1hlCGvdj4NhBXkZ/uLUZNILAwwDQYJKoZIhvcNAQEL\n"
                    + "BQAwgZAxCzAJBgNVBAYTAkdCMRcwFQYDVQQIDA5Vbml0ZWQgS2luZ2RvbTEOMAwG\n"
                    + "A1UEBwwFRGVyYnkxEjAQBgNVBAoMCU1vc3F1aXR0bzELMAkGA1UECwwCQ0ExFjAU\n"
                    + "BgNVBAMMDW1vc3F1aXR0by5vcmcxHzAdBgkqhkiG9w0BCQEWEHJvZ2VyQGF0Y2hv\n"
                    + "by5vcmcwHhcNMjAwNjA5MTEwNjM5WhcNMzAwNjA3MTEwNjM5WjCBkDELMAkGA1UE\n"
                    + "BhMCR0IxFzAVBgNVBAgMDlVuaXRlZCBLaW5nZG9tMQ4wDAYDVQQHDAVEZXJieTES\n"
                    + "MBAGA1UECgwJTW9zcXVpdHRvMQswCQYDVQQLDAJDQTEWMBQGA1UEAwwNbW9zcXVp\n"
                    + "dHRvLm9yZzEfMB0GCSqGSIb3DQEJARYQcm9nZXJAYXRjaG9vLm9yZzCCASIwDQYJ\n"
                    + "KoZIhvcNAQEBBQADggEPADCCAQoCggEBAME0HKmIzfTOwkKLT3THHe+ObdizamPg\n"
                    + "UZmD64Tf3zJdNeYGYn4CEXbyP6fy3tWc8S2boW6dzrH8SdFf9uo320GJA9B7U1FW\n"
                    + "Te3xda/Lm3JFfaHjkWw7jBwcauQZjpGINHapHRlpiCZsquAthOgxW9SgDgYlGzEA\n"
                    + "s06pkEFiMw+qDfLo/sxFKB6vQlFekMeCymjLCbNwPJyqyhFmPWwio/PDMruBTzPH\n"
                    + "3cioBnrJWKXc3OjXdLGFJOfj7pP0j/dr2LH72eSvv3PQQFl90CZPFhrCUcRHSSxo\n"
                    + "E6yjGOdnz7f6PveLIB574kQORwt8ePn0yidrTC1ictikED3nHYhMUOUCAwEAAaNT\n"
                    + "MFEwHQYDVR0OBBYEFPVV6xBUFPiGKDyo5V3+Hbh4N9YSMB8GA1UdIwQYMBaAFPVV\n"
                    + "6xBUFPiGKDyo5V3+Hbh4N9YSMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZIhvcNAQEL\n"
                    + "BQADggEBAGa9kS21N70ThM6/Hj9D7mbVxKLBjVWe2TPsGfbl3rEDfZ+OKRZ2j6AC\n"
                    + "6r7jb4TZO3dzF2p6dgbrlU71Y/4K0TdzIjRj3cQ3KSm41JvUQ0hZ/c04iGDg/xWf\n"
                    + "+pp58nfPAYwuerruPNWmlStWAXf0UTqRtg4hQDWBuUFDJTuWuuBvEXudz74eh/wK\n"
                    + "sMwfu1HFvjy5Z0iMDU8PUDepjVolOCue9ashlS4EB5IECdSR2TItnAIiIwimx839\n"
                    + "LdUdRudafMu5T5Xma182OC0/u/xRlEm+tvKGGmfFcN0piqVl8OrSPBgIlb+1IKJE\n"
                    + "m/XriWr/Cq4h/JfB7NTsezVslgkBaoU=\n"
                    + "-----END CERTIFICATE-----\n"
                    + "";
            InputStream pemCertInputStream = new ByteArrayInputStream(mosquittoOrgPemCertificate.getBytes());
            try {
                PEMParser reader = new PEMParser(new InputStreamReader(pemCertInputStream));
                X509Certificate cert = new JcaX509CertificateConverter().setProvider("BC").getCertificate((X509CertificateHolder) reader.readObject());
                reader.close();

                // client key and certificates are sent to server so it can authenticate us
                String password = "123456";
                KeyStore ks;
                ks = KeyStore.getInstance(KeyStore.getDefaultType());

                ks.load(null, null);
                ks.setCertificateEntry("certificate", cert);
                java.security.cert.Certificate[] certArray = {(java.security.cert.Certificate) cert};

                KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
                kmf.init(ks, password.toCharArray());

                TrustManagerFactory tmf;
                tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
                tmf.init(ks);

                /* Step 1 initialize SSL context */
                SSLContext sslContext;
                sslContext = SSLContext.getInstance("SSL");
                sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());

                MqttConnectionOptions sslOptions = new MqttConnectionOptions();
                sslOptions.setCleanStart(false);
                sslOptions.setSocketFactory(sslContext.getSocketFactory());
                MqttClient sslClient = new MqttClient("ssl://test.mosquitto.org:8883", "foobar b8:27:EB:58:DF:F2", new MemoryPersistence());
                tryReproducePahoBug787("BouncyCastle ssl PEM client", sslClient, sslOptions);
            } catch (IOException ex) {
                Logger.getLogger(PahoBug787Reproducer.class.getName()).log(Level.SEVERE, null, ex);
            } catch (NoSuchAlgorithmException ex) {
                Logger.getLogger(PahoBug787Reproducer.class.getName()).log(Level.SEVERE, null, ex);
            } catch (CertificateException ex) {
                Logger.getLogger(PahoBug787Reproducer.class.getName()).log(Level.SEVERE, null, ex);
            } catch (KeyStoreException ex) {
                Logger.getLogger(PahoBug787Reproducer.class.getName()).log(Level.SEVERE, null, ex);
            } catch (KeyManagementException ex) {
                Logger.getLogger(PahoBug787Reproducer.class.getName()).log(Level.SEVERE, null, ex);
            } catch (UnrecoverableKeyException ex) {
                Logger.getLogger(PahoBug787Reproducer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }

        {   // Test 3 block default ssl to VerneMQ MQTT server
            System.out.println("test 3 default ssl to VerneMQ MQTT server");

            SSLContext sslContext = SSLContext.getDefault();

            MqttConnectionOptions sslOptions = new MqttConnectionOptions();
            sslOptions.setUserName("testUnit123");
            sslOptions.setPassword("123".getBytes());
            sslOptions.setCleanStart(false);
            sslOptions.setSocketFactory(sslContext.getSocketFactory());
            MqttClient sslClient = new MqttClient("ssl://sto1-k8s-clstr01-mqtt.publicdns.zone:8888", "foobar b8:27:EB:58:DF:F2", new MemoryPersistence());
            tryReproducePahoBug787("default ssl client to VerneMQ MQTT server", sslClient, sslOptions);

        }

        // testing using 
        System.out.println("done");
    }

    static void tryReproducePahoBug787(String testDescription, MqttClient client, MqttConnectionOptions options) throws InterruptedException {
        connectedComplete = false;
        try {
            client.setCallback(new MqttCallback() {
                @Override
                public void disconnected(MqttDisconnectResponse arg0) {
                    System.out.println(testDescription + " MqttCallback disconnected " + arg0);
                }

                @Override
                public void mqttErrorOccurred(MqttException arg0) {
                    System.out.println(testDescription + " MqttCallback mqttErrorOccurred " + arg0);
                }

                @Override
                public void messageArrived(String arg0, MqttMessage arg1) throws Exception {
                    System.out.println(testDescription + " MqttCallback messageArrived " + arg0 + " " + arg1);
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken arg0) {
                    try {
                        System.out.println(testDescription + " MqttCallback deliveryComplete " + arg0.getMessage().toString());
                    } catch (MqttException ex) {
                        System.out.println(testDescription + " MqttCallback excaption reason " + ex.getReasonCode());
                        System.out.println(testDescription + " MqttCallback excaption msg " + ex.getMessage());
                        System.out.println(testDescription + " MqttCallback excaption loc " + ex.getLocalizedMessage());
                        System.out.println(testDescription + " MqttCallback excaption cause " + ex.getCause());
                        System.out.println(testDescription + " MqttCallback excaption excep " + ex);
                        ex.printStackTrace();
                    }
                }

                @Override
                public void connectComplete(boolean arg0, String arg1) {
                    System.out.println(testDescription + " MqttCallback connectComplete " + arg0 + " " + arg1);
                    connectedComplete = true;
                }

                @Override
                public void authPacketArrived(int arg0, MqttProperties arg1) {
                    System.out.println(testDescription + " MqttCallback authPacketArrived " + arg0 + " " + arg1);
                }
            });

            System.out.println(testDescription + " connecting");
            client.connect(options);

            while (connectedComplete == false) {
                System.out.println(testDescription + " waiting for connection to complete");
                Thread.sleep(1000);
            }

            System.out.println(testDescription + " subscribing");
            MqttSubscription subscriptionTopic = new MqttSubscription("/acl");
            MqttSubscription[] subscriptionTopics = {subscriptionTopic};
            IMqttToken token = client.subscribe(subscriptionTopics);
            token.waitForCompletion();

            System.out.println(testDescription + " publishing");
            MqttMessage message = new MqttMessage("{\"units\": [{ \"id\": \"test_instrument\", \"type\": \"fhir\" }, { \"id\": \"Prov-id skrivare testUnit123 \", \"type\": \"labeler\" }, { \"id\": \"connection_test\", \"type\": \"ping\" }]}".getBytes());
            message.setQos(2);
            client.publish("/acl", message);

            Thread.sleep(1000); // Wait for published message to return on the subscribed topic

            client.disconnect();

        } catch (MqttException me) {
            System.out.println(testDescription + " exception reason " + me.getReasonCode());
            System.out.println(testDescription + " exception msg " + me.getMessage());
            System.out.println(testDescription + " exception loc " + me.getLocalizedMessage());
            System.out.println(testDescription + " exception cause " + me.getCause());
            System.out.println(testDescription + " exception excep " + me);
            me.printStackTrace();

            try {
                client.disconnect();
            } catch (MqttException ex) {
            }
        }
    }
}
java PahoBug787Reproducer 
test 1 tcp using mosquito.org
tcp client connecting
tcp client MqttCallback connectComplete false tcp://test.mosquitto.org:1883
tcp client waiting for connection to complete
tcp client subscribing
tcp client publishing
tcp client MqttCallback messageArrived /acl {"units": [{ "id": "test_instrument", "type": "fhir" }, { "id": "Prov-id skrivare testUnit123 ", "type": "labeler" }, { "id": "connection_test", "type": "ping" }]}
test 2 ssl using mosquito.org pem
BouncyCastle ssl PEM client connecting
BouncyCastle ssl PEM client waiting for connection to complete
BouncyCastle ssl PEM client MqttCallback connectComplete false ssl://test.mosquitto.org:8883
BouncyCastle ssl PEM client subscribing
BouncyCastle ssl PEM client publishing
BouncyCastle ssl PEM client MqttCallback messageArrived /acl {"units": [{ "id": "test_instrument", "type": "fhir" }, { "id": "Prov-id skrivare testUnit123 ", "type": "labeler" }, { "id": "connection_test", "type": "ping" }]}
test 3 default ssl to VerneMQ MQTT server
default ssl client to VerneMQ MQTT server connecting
default ssl client to VerneMQ MQTT server MqttCallback connectComplete false ssl://mqtt.foobar.com:8888
default ssl client to VerneMQ MQTT server waiting for connection to complete
default ssl client to VerneMQ MQTT server subscribing
default ssl client to VerneMQ MQTT server publishing
Exception in thread "main" java.lang.NullPointerException
    at org.eclipse.paho.mqttv5.client.internal.ClientState.send(ClientState.java:516)
    at org.eclipse.paho.mqttv5.client.internal.ClientComms.internalSend(ClientComms.java:156)
    at org.eclipse.paho.mqttv5.client.internal.ClientComms.sendNoWait(ClientComms.java:216)
    at org.eclipse.paho.mqttv5.client.MqttAsyncClient.publish(MqttAsyncClient.java:1514)
    at org.eclipse.paho.mqttv5.client.MqttClient.publish(MqttClient.java:564)
    at PahoBug787Reproducer.tryReproducePahoBug787(PahoBug787Reproducer.java:209)
    at PahoBug787Reproducer.main(PahoBug787Reproducer.java:139)
rdasgupt commented 4 years ago

I couldn't recreate the problem using your sample code:

tcp client connecting tcp client MqttCallback connectComplete false tcp://test.mosquitto.org:1883 tcp client subscribing tcp client publishing tcp client MqttCallback messageArrived /acl {"units": [{ "id": "test_instrument", "type": "fhir" }, { "id": "Prov-id skrivare testUnit123 ", "type": "labeler" }, { "id": "connection_test", "type": "ping" }]} test 2 ssl using mosquito.org pem BouncyCastle ssl PEM client connecting BouncyCastle ssl PEM client MqttCallback connectComplete false ssl://test.mosquitto.org:8883 BouncyCastle ssl PEM client subscribing BouncyCastle ssl PEM client publishing BouncyCastle ssl PEM client MqttCallback messageArrived /acl {"units": [{ "id": "test_instrument", "type": "fhir" }, { "id": "Prov-id skrivare testUnit123 ", "type": "labeler" }, { "id": "connection_test", "type": "ping" }]} test 3 default ssl to VerneMQ MQTT server default ssl client to VerneMQ MQTT server connecting default ssl client to VerneMQ MQTT server exception reason 32103 default ssl client to VerneMQ MQTT server exception msg Unable to connect to server default ssl client to VerneMQ MQTT server exception loc Unable to connect to server default ssl client to VerneMQ MQTT server exception cause java.net.ConnectException: Connection refused (Connection refused) default ssl client to VerneMQ MQTT server exception excep Unable to connect to server (32103) - java.net.ConnectException: Connection refused (Connection refused)

Test 3 failed, because mqtt.foobar.com:8888 is not available in my test env.

I will try to connect to other brokers and check.

xranby commented 4 years ago

@rdasgupt I have received clearance to share a VerneMQ server url with login username and password that are able to reproduce and troubleshoot this bug. The reproducers mentioned above has been updated to reproduce the bug using the following server and user credentials.

        {
            SSLContext sslContext = SSLContext.getDefault();

            MqttConnectionOptions sslOptions = new MqttConnectionOptions();
            sslOptions.setUserName("testUnit123");
            sslOptions.setPassword("123".getBytes());
            sslOptions.setCleanStart(false);
            sslOptions.setSocketFactory(sslContext.getSocketFactory());
            MqttClient sslClient = new MqttClient("ssl://sto1-k8s-clstr01-mqtt.publicdns.zone:8888", "foobar b8:27:EB:58:DF:F2", new MemoryPersistence());
            tryReproducePahoBug787("VerneMQ", sslClient, sslOptions);

        }
rdasgupt commented 4 years ago

@xranby Thanks for providing access details. I could recreate the issue and have a fix. We are planning to release Paho Java client library with MQTTv5 support by July 3, 2020. This release will include this fix as well as many other fixes that we have found in our testing.

xranby commented 4 years ago

Thank you, good luck with the release, i am happy to test any release with your fix implemented.

xranby commented 4 years ago

Please update if a fix for this issue have been pushed to the testing or release branch of Paho