vert-x3 / vertx-mqtt

Vert.x MQTT
Apache License 2.0
185 stars 88 forks source link

subscriber cant receive topic message #192

Open chenziyi2018 opened 3 years ago

chenziyi2018 commented 3 years ago

Questions

` @Slf4j public class MqttClientVerticle extends AbstractVerticle {

private MqttClient mqttClient;

\@Override
public void start() throws Exception {
    log.info("good we are in now");
    MqttClientOptions options = new MqttClientOptions()
            .setKeepAliveInterval(5)
            .setClientId("client_1111")
            .setAutoKeepAlive(true);
    mqttClient = MqttClient.create(vertx, options);
    mqttClient.publishHandler(handle -> {
        log.info("receive topic:[{}],payload:[{}]", handle.topicName(), handle.payload());
    });

    mqttClient.connect(1883, "localhost", handler -> {
        mqttClient.subscribe("/topic", 0);
    });
}
/**
 * Disconnects the MQTT client when the verticle stops; does nothing when
 * no client was ever created.
 */
@Override
public void stop() throws Exception {
    if (mqttClient == null) {
        return;
    }
    log.info("disconnect");
    mqttClient.disconnect();
}

}`

` @Slf4j public class MqtttServer extends AbstractVerticle { private static final String TOPIC_LIGHTS = "lights";

/**
 * Starts an MQTT server on 0.0.0.0:1883 and wires the per-endpoint
 * handlers (ping, subscribe, unsubscribe, publish, disconnect) for every
 * client that connects.
 *
 * <p>Start-up failures are reported through the logger at error level with
 * the cause attached, rather than at info level plus a raw stack trace.
 */
@Override
public void start() throws Exception {
    MqttServerOptions options = new MqttServerOptions().setPort(1883).setHost("0.0.0.0");
    MqttServer mqttServer = MqttServer.create(vertx, options);
    mqttServer.endpointHandler(endpoint -> {
        log.info("connected client {}", endpoint.clientIdentifier());
        endpoint.accept(false);
        endpoint.pingHandler(h -> {
            log.info("receive client [{}] ping message", endpoint.clientIdentifier());
        });
        handleSubscription(endpoint);
        handleUnsubscription(endpoint);
        publishHandler(endpoint);
        handleClientDisconnect(endpoint);
    })
            .listen(ar -> {
                if (ar.succeeded()) {
                    log.info("MQTT server is listening on port {}", ar.result().actualPort());
                } else {
                    // Pass the cause to the logger so the stack trace ends up in the log output.
                    log.error("Error on starting the server", ar.cause());
                }
            });
}

/**
 * Installs the SUBSCRIBE handling for one endpoint.
 *
 * <p>Every requested subscription is granted at the QoS the client asked
 * for, the SUBSCRIBE is acknowledged, and a {@code "hello"} message is then
 * published to the endpoint for each subscribed topic at that granted QoS.
 * The handlers for the client's replies to those outbound messages are
 * registered once per endpoint instead of on every SUBSCRIBE.
 *
 * @param endpoint the connected client endpoint to configure
 */
private static void handleSubscription(MqttEndpoint endpoint) {
    // Reply handlers for messages this server publishes to the client.
    // QoS 2: the client's "received" reply is answered with publishRelease.
    endpoint.publishAcknowledgeHandler(messageId -> {

        System.out.println("Received ack for message = " + messageId);

    }).publishReceivedHandler(endpoint::publishRelease).publishCompletionHandler(messageId -> {

        System.out.println("Received ack for message = " + messageId);
    });

    endpoint.subscribeHandler(subscribe -> {
        List<MqttTopicSubscription> subscriptions = subscribe.topicSubscriptions();

        List<MqttQoS> grantedQosLevels = new ArrayList<>(subscriptions.size());
        for (MqttTopicSubscription s : subscriptions) {
            grantedQosLevels.add(s.qualityOfService());
        }
        String topicNames = subscriptions.stream().map(MqttTopicSubscription::topicName).collect(Collectors.joining(","));
        log.info("{} subscribe :{}", endpoint.clientIdentifier(), topicNames);
        endpoint.subscribeAcknowledge(subscribe.messageId(), grantedQosLevels);

        for (MqttTopicSubscription topicSubscription : subscriptions) {
            // Deliver at the granted QoS; a subscriber must not receive a
            // message at a higher QoS than the one granted for the subscription.
            // NOTE(review): the subscription filter is reused as the topic name,
            // which would not be a valid topic for wildcard filters - confirm.
            endpoint.publish(topicSubscription.topicName(), Buffer.buffer("hello"),
                    topicSubscription.qualityOfService(), false, false);
        }
    });
}

/**
 * Installs the UNSUBSCRIBE handling for one endpoint: logs the topics the
 * client is leaving and acknowledges the request.
 *
 * @param endpoint the connected client endpoint to configure
 */
private static void handleUnsubscription(MqttEndpoint endpoint) {
    endpoint.unsubscribeHandler(request -> {
        String leftTopics = String.join(",", request.topics());
        log.info("{} unsubscribe :{}", endpoint.clientIdentifier(), leftTopics);
        endpoint.unsubscribeAcknowledge(request.messageId());
    });
}

/**
 * Installs the handling for messages published by the client on this
 * endpoint: logs the message and sends the reply required by its QoS.
 *
 * <p>QoS 1 messages are answered with {@code publishAcknowledge}. QoS 2
 * messages are answered with {@code publishReceived}; the client's
 * subsequent release is then answered with {@code publishComplete} by the
 * release handler. (Replying to an inbound QoS 2 message with
 * {@code publishRelease} is the sender-side step and does not match the
 * release handler registered here.)
 *
 * <p>NOTE: this handler only logs and acknowledges. It does not forward the
 * message to other endpoints that subscribed to the topic.
 *
 * @param endpoint the connected client endpoint to configure
 */
private static void publishHandler(MqttEndpoint endpoint) {
    endpoint.publishHandler(message -> {
        log.info("client [{}] publish message :{}, to topic:[{}]", endpoint.clientIdentifier(), message.payload(), message.topicName());
        if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
            endpoint.publishAcknowledge(message.messageId());
        } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
            endpoint.publishReceived(message.messageId());
        }
    }).publishReleaseHandler(endpoint::publishComplete);
}

/**
 * Sends the reply required by the QoS of a message published by the client.
 *
 * <p>QoS 1: messages on the {@code lights} topic are first passed to
 * {@code LightsController.handler}, then the message is acknowledged.
 * QoS 2: the message is answered with {@code publishReceived}, the
 * receiver-side first step of the exchange (the original code sent
 * {@code publishRelease}, which is the sender-side step).
 *
 * <p>NOTE(review): the {@code lights} dispatch only happens for QoS 1
 * messages - confirm that this is intended.
 *
 * @param message  the message published by the client
 * @param endpoint the endpoint the message arrived on
 */
private static void handleQoS(MqttPublishMessage message, MqttEndpoint endpoint) {
    if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
        String topicName = message.topicName();

        if (TOPIC_LIGHTS.equals(topicName)) {
            LightsController.handler(message);
        }
        endpoint.publishAcknowledge(message.messageId());

    } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
        endpoint.publishReceived(message.messageId());
    }
}

/**
 * Installs a disconnect handler that logs the identifier of the client
 * whose endpoint disconnected.
 *
 * @param endpoint the connected client endpoint to configure
 */
private static void handleClientDisconnect(MqttEndpoint endpoint) {
    endpoint.disconnectHandler(ignored -> log.info("{} disconnect", endpoint.clientIdentifier()));
}

}`

` @Slf4j public class VertxServer extends AbstractVerticle {

/**
 * Starts an HTTP server on port 9091 with a static-file route and a
 * {@code GET /hello} route that publishes a message to the MQTT topic
 * {@code /topic} through a short-lived MQTT client.
 *
 * <p>The HTTP response is written only after the MQTT connect and publish
 * have completed, so "send success" is reported only when the publish
 * actually succeeded; a failed connect or publish yields a 500 response.
 */
@Override
public void start() throws Exception {
    HttpServer server = vertx.createHttpServer();
    Router router = Router.router(vertx);
    router.route("/static/*").handler(StaticHandler.create("templates"));
    router.get("/hello").handler(ctx -> {
        HttpServerResponse response = ctx.response();
        response.putHeader("content-type", "text/plain");

        MqttClientOptions options = new MqttClientOptions().setClientId("client_sender_111");
        MqttClient mqttClient = MqttClient.create(vertx, options);
        mqttClient.connect(1883, "localhost", h -> {
            if (h.failed()) {
                log.error("connect to MQTT server failed", h.cause());
                response.setStatusCode(500).end("send failed");
                return;
            }
            mqttClient.publish("/topic", Buffer.buffer("hello, how are you"), MqttQoS.AT_MOST_ONCE, false, false, s -> {
                log.info("send success? {}", s.succeeded());
                mqttClient.disconnect();
                // Answer the HTTP request with the real outcome of the publish.
                if (s.succeeded()) {
                    response.end("send success");
                } else {
                    response.setStatusCode(500).end("send failed");
                }
            });
        });
    });

    server.requestHandler(router).listen(9091, h -> {
        log.info("server start:{}", h.succeeded());
        if (h.failed()) {
            log.error("HTTP server failed to start", h.cause());
        }
    });
}

}`

description

When the client is initialized at project start, I can receive the "hello" message published by the MQTT server's subscribeHandler, but when I use the web route "/hello" to publish a message to the topic "/topic", the subscriber client "client_1111" can't receive the message ...

help me please ~ T_T ~

chenziyi2018 commented 3 years ago

Sorry, it's not a bug; I used the wrong tag.

vietj commented 3 years ago

is it a bug or a question ?

chenziyi2018 commented 3 years ago

is it a bug or a question ?

It's a question. I wrote a demo like this, but it does not work: the subscriber can't receive the published message.

I want to use Vert.x in my project, but there is too little development documentation and the samples are too simple. Can you give me some suggestions?

zhengchalei commented 3 years ago

I have the same problem,

// Server-side handler for messages published by a client on this endpoint:
// prints the message, then replies according to the message's QoS level.
endpoint.publishHandler { message ->
       // this is ok;
        println("publishHandler topic: ${message.topicName()}  payload: ${message.payload()}")
        if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
          // QoS 1: acknowledge the message.
          endpoint.publishAcknowledge(message.messageId())
        } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
          // QoS 2: reply with publishReceived; publishComplete follows in the release handler below.
          endpoint.publishReceived(message.messageId())
        }
      }.publishReleaseHandler { messageId -> endpoint.publishComplete(messageId) }

but other clients subscribed to the topic can't get the message. Do you need to manually forward the message to the other subscribed clients?

sunqb commented 3 years ago

How do I get messages from a subscription? I can't find a method to get messages from it.

sunqb commented 3 years ago

How do I get messages from a subscription? I can't find a method to get messages from it.

The method name is not good, I think. This is not a bug; I use 'publishHandler' to get messages from a subscription.

pigbayspy commented 3 years ago

Yeah, vertx-mqtt is not a fully featured MQTT broker. You need to implement the pub/sub message routing between clients yourself.