emqx / CocoaMQTT

MQTT 5.0 client library for iOS and macOS written in Swift
https://www.emqx.com/en
Other
1.57k stars 411 forks source link

Can't receive retained message #560

Closed zzsqwq closed 8 months ago

zzsqwq commented 8 months ago

Hello, I am recently using the CocoaMQTT library to subscribe to and publish topics. The specific usage is as follows, but I noticed that I cannot receive any retained messages, but using MQTTX can be received normally.

In my example, there is a retained message on qyun/robot/QYUN-Q03-0001/info/ip. I should receive the relevant message after subscribing to the topic when the App starts, and print log didReceiveMessage is retained message, but no message was received. But I can be sure that the topic is successfully subscribed, because if I send a message to the topic after the app is started, it can be received normally, but the message received is not a retained message.

What could possibly be the problem? Thanks.

Retained message:

CleanShot 2023-10-26 at 16 18 21@2x

Code:

import Foundation
import CocoaMQTT
import os
import Combine

class MQTTService {
    // Add logger
    private let logger = Logger(subsystem: "com.demo", category: "MQTTService")

    private var robots: [String] = []

    static let shared = MQTTService()

    private var mqtt: CocoaMQTT5?

    private var pendingSubscriptions: [String] = []

    private var pendingUnsubscriptions: [String] = []

    var messageReceived = PassthroughSubject<(topic: String, message: String), Never>()

    private init(clientID: String = "CocoaMQTT-" + String(ProcessInfo().processIdentifier),
                 host: String = "broker-cn.emqx.io",
                 port: UInt16 = 1883) {
        logger.log("clientID is \(clientID), host is \(host), port is \(port)")
        mqtt = CocoaMQTT5(clientID: clientID, host: host, port: port)
        mqtt?.willMessage = CocoaMQTT5Message(topic: "/lastwill", string: "dieout")
        mqtt?.keepAlive = 60
        mqtt?.autoReconnect = true
        mqtt?.delegate = self
        self.connect()
    }

    func register(robotId: String) {
        robots.append(robotId)
        if(isConnected()) {
            self.subscribe(topic: "qyun/robot/\(robotId)/info/ip")
            // other topics
        } else {
            pendingSubscriptions.append("qyun/robot/\(robotId)/info/ip")
           // other topics
        }
        self.logger.info("robotId \(robotId) registered.")
        // TODO: maps, histories, timing
    }

    func unregister(robotId: String) {
        if let index = robots.firstIndex(of: robotId) {
            robots.remove(at: index)
            if(isConnected()) {
                self.unsubscribe(topic: "qyun/robot/\(robotId)/info/ip")
                // other topics
            } else {
                pendingUnsubscriptions.append("qyun/robot/\(robotId)/info/ip")
               // other topics
            }
        } else {
            logger.warning("robotId \(robotId) not found in robots map.")
        }
        logger.info("robotId \(robotId) unregistered.")

    }

    func connect() {
        let ret = mqtt?.connect()
        // if ret == 0, connect success
        logger.log("connect result is \(String(describing: ret))")
    }

    func subscribe(topic: String) {
        mqtt?.subscribe(topic)
    }

    func unsubscribe(topic: String) {
        mqtt?.unsubscribe(topic)
    }

    // some other functions...
}

extension MQTTService: CocoaMQTT5Delegate {

    /// 连接成功
    /// The CONNACK packet is the packet sent by the Server in response to a CONNECT packet received from a Client. The Server MUST send a CONNACK with a 0x00 (Success) Reason Code before sending any Packet other than AUTH [MQTT-3.2.0-1]. The Server MUST NOT send more than one CONNACK in a Network Connection [MQTT-3.2.0-2].
    /// If the Client does not receive a CONNACK packet from the Server within a reasonable amount of time, the Client SHOULD close the Network Connection. A "reasonable" amount of time depends on the type of application and the communications infrastructure.
    func mqtt5(_ mqtt5: CocoaMQTT5, didConnectAck ack: CocoaMQTTCONNACKReasonCode, connAckData: MqttDecodeConnAck?) {
        logger.info("didConnectAck ack is \(String(describing: ack))")
        logger.log("didConnectAck connAckData is \(String(describing: connAckData))")
        for topic in pendingSubscriptions {
            self.subscribe(topic: topic)
        }
        for topic in pendingUnsubscriptions {
            self.unsubscribe(topic: topic)
        }
    }

    /// 自己作为接受端接收到的 PUBLISH 报文的信息
    /// A PUBLISH packet is sent from a Client to a Server or from a Server to a Client to transport an Application Message.
    func mqtt5(_ mqtt5: CocoaMQTT5, didReceiveMessage message: CocoaMQTT5Message, id: UInt16, publishData: MqttDecodePublish?) {
        logger.log("didReceiveMessage topic is \(message.topic), string is \(message.string ?? "null")")
        logger.log("didReceiveMessage publishData is \(String(describing: publishData))")
        if message.retained {
            logger.log("didReceiveMessage is retained message")
        }
        // 通过 topic 获取机器人 ID
        // let robotId = message.topic.split(separator: "/")[2]
        // 通过机器人 ID 获取机器人本体
        messageReceived.send((topic: message.topic, message: message.string ?? ""))
        // TODO: 通知机器人本体处理消息
    }

    // some other functions...
}
leeway1208 commented 8 months ago

Hello. you can try to set properties in subscribe. Here are some examples.

  let topic = MqttSubscription(topic: "chat/room/animals/client/+")
            let subscriptions : [MqttSubscription] = [topic]
            topic.retainHandling = CocoaRetainHandlingOption.sendOnSubscribe
            mqtt5.subscribe(subscriptions)
zzsqwq commented 8 months ago

Base on this article, I think the retainHandling setting in the subscription options should be set to default to .sendOnSubscribe instead of .none to maintain compatibility with the MQTT3.1.1 default behavior.