eclipse / paho.mqtt.java

Eclipse Paho Java MQTT client library. Paho is an Eclipse IoT project.
https://eclipse.org/paho
Other
2.14k stars 887 forks source link

Why subscribe does not work #923

Closed kosuodhmwa closed 2 years ago

kosuodhmwa commented 2 years ago

Version: Most current release one on 2022-04-13

PROBLEM: I publish the value 'TEST-TEST-TEST' to the topic on my remote MQTT server. That works - MQTT Explorer show me the Topic "a/b/c" with its value "TEST-TEST-TEST"...

BUT: The "Subscribe" action does not work... none of the overriden methods in MqttCallback object will be called. NONE. -> What's the problem here? Do i need a separate "connect" / "disconnect" for each case: "publish to topic" AND "subscribe to topic"

VERY VERY STRANGE the whole thing...

CODE HERE:

`package test.mqtt.lora.swisscom;

import java.util.concurrent.Flow.Subscriber; import java.util.logging.Level; import java.util.logging.Logger;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;

public class LoRaMqtt {

private final String topic;

private final String message;

private final String host;

private final int port;

private final String clientId;

private final String username;

private final String password;

private final boolean retain;

private static final String methodPrefix = "tcp://";

private final boolean clean;

private MqttConnectOptions mqttConnectOptions = null;

private MqttClient client = null;

private String url = null;

private static String tmpDir = null;

private MqttDefaultFilePersistence mqttDefaultFilePersistence = null;

static {
    Exception exception = null;
    try {

        System.out.println("Static constructor called ... ");
        LoRaMqtt.tmpDir = System.getProperty("java.io.tmpdir");

    } catch (final Exception clException) {
        exception = clException;
    } finally {
        if (exception != null) {
            System.out.println(exception.getMessage());
            throw new RuntimeException(exception);
        }
        // ...
    }

}

private LoRaMqtt() {

    System.out.println("Constructor called ... ");

    this.topic = "a/b/c";

    this.message = "TEST-TEST-TEST";

    this.host = "...";

    this.port = 1883;

    this.clientId = "test-client-id-123456";

    this.username = "...";

    this.password = "...";

    this.retain = false;

    this.clean = true;

    init();
}

private void init() {

    Exception exception = null;
    try {
        System.out.println("init() method called ... ");
        this.mqttConnectOptions = new MqttConnectOptions();

        ////////////////
        this.mqttConnectOptions.setUserName(this.username);
        this.mqttConnectOptions.setPassword(this.password.toCharArray());
        this.mqttConnectOptions.setCleanSession(this.clean);
        ////////////////

        this.mqttConnectOptions.setCleanSession(Boolean.valueOf(this.clean));
        this.url = LoRaMqtt.methodPrefix + this.host + ":" + this.port;
        this.mqttDefaultFilePersistence = new MqttDefaultFilePersistence(LoRaMqtt.tmpDir);
        System.out.println("MQTT URL: " + this.url);
        System.out.println("MQTT Client ID: " + this.clientId);
        this.client = new MqttClient(this.url, this.clientId, this.mqttDefaultFilePersistence);

        this.client.connect(this.mqttConnectOptions);

        publishTopic(this.client);

        Thread.sleep(3000);

        subscribeToTopic(this.client);

        Thread.sleep(3000);

        System.out.println("ENDED SUCCESSFUL ... ");

    } catch (final Exception clException) {
        exception = clException;
    } finally {
        try {
            if (this.client != null) {
                this.client.disconnect();
            }
        } catch (final Exception clException) {
            final Runtime rt = Runtime.getRuntime();
            rt.gc();
        }

        if (exception != null) {
            Logger.getLogger(Subscriber.class.getName()).log(Level.SEVERE, "EXCEPTION: ", exception);
            throw new RuntimeException(exception);
        } else {
            // ...
        }
    }
}

private void subscribeToTopic(final MqttClient mqttClient) {
    Exception exception = null;

    try {
        final MqttCallback mqttCallback = createCallback();
        mqttClient.setCallback(mqttCallback);
        System.out.println("MQTT callback created ... ");
        mqttClient.subscribe(this.topic);
        System.out.println("Subscribed to topic: " + this.topic);

    } catch (final Exception clException) {
        exception = clException;
    } finally {

        if (exception != null) {
            Logger.getLogger(Subscriber.class.getName()).log(Level.SEVERE, "EXCEPTION: ", exception);
            throw new RuntimeException(exception);
        } else {
            // ...
        }
    }
}

private MqttCallback createCallback() {
    return new MqttCallback() {

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            System.out.println("------------ Message arrived ... ");
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            System.out.println("------------ Delivery complete ... ");
        }

        @Override
        public void connectionLost(Throwable cause) {
            System.out.println("------------ Connection lost ... ");
        }
    };
}

@SuppressWarnings("unused")
private void publishTopic(final MqttClient mqttClient) {

    Exception exception = null;

    try {

        ////////////
        final MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setRetained(this.retain);
        mqttMessage.setPayload(this.message.getBytes());
        ////////////

        // Use threading yes / no
        if (false) {
            final MqttConnectionRunnable mqttConnectionRunnable = MqttConnectionRunnable.createInstance(this.client,
                    this.mqttConnectOptions);
            final Thread myThread = new Thread(mqttConnectionRunnable);
            myThread.start();

            final MqttClient myNewMqttClient = mqttConnectionRunnable.getMqttClient();

            while (true) {
                if (myNewMqttClient != null) {
                    break;
                }
            }
        } else {

            this.client.publish(this.topic, mqttMessage);
        }

    } catch (final Exception clException) {
        exception = clException;
        throw new RuntimeException(exception);
    } finally {

        if (exception != null) {
            Logger.getLogger(Subscriber.class.getName()).log(Level.SEVERE, "EXCEPTION: ", exception);
            throw new RuntimeException(exception);
        } else {
            // ...
        }
    }
}

public static LoRaMqtt createInstance() {
    LoRaMqtt swisscomLoRaMqtt = null;

    Exception exception = null;
    try {
        swisscomLoRaMqtt = new LoRaMqtt();
    } catch (final Exception clException) {
        exception = clException;
    } finally {
        if (exception != null) {
            System.out.println(exception.getMessage());
            throw new RuntimeException(exception);
        }
    }

    return swisscomLoRaMqtt;
}

}`

kosuodhmwa commented 2 years ago

Code formatting does not work correctly on GitHub with Java code.

Why?

->

image

That problem seems to persist since years...

@admin

kosuodhmwa commented 2 years ago

To use for both PUBLISH & SUBSCRIBE 2 seprate connection, also with 2 SEPARATE (!!) client-ids, does also not seem to work:

` this.clientId = "test-client-id-123456"; System.out.println("MQTT client ID for publish: " + this.clientId); this.client = new MqttClient(this.url, this.clientId, this.mqttDefaultFilePersistence); this.client.connect(this.mqttConnectOptions);
publishTopic(this.client);
Thread.sleep(3000); this.client.disconnect();

        this.clientId = "test-client-id-654321";
        System.out.println("MQTT client ID for subscribe: " + this.clientId);
        this.client = new MqttClient(this.url, this.clientId, this.mqttDefaultFilePersistence);
        this.client.connect(this.mqttConnectOptions);                               
        subscribeToTopic(this.client);
        Thread.sleep(3000);
        this.client.disconnect();

`

kosuodhmwa commented 2 years ago

Why!?

kosuodhmwa commented 2 years ago

Got it!! ;-)