emqx / CocoaMQTT

MQTT 5.0 client library for iOS and macOS written in Swift
https://www.emqx.com/en
Other
1.58k stars 413 forks source link

Messages published with `retained` flag do not get delivered on subscription #458

Closed rafaelnobrekz closed 2 years ago

rafaelnobrekz commented 2 years ago

Hello! I'm having an issue regarding the retained flag. Hope you can shed some light on what might be wrong.

I suspect it to be a subscription issue (either my code, or a library bug), because when I try to send the publishing through the MQTTX app on the Mac, the result is the same with either a retained message and regular message. However when I subscribe to the topic in MQTTX with Retain handling option set to 0 it works as expected - I get the latest message right after subscribing.

I publish like this:

let message = CocoaMQTT5Message(
            topic: "kinzoo/connection/5448",
            payload: [UInt8](message.payload ?? Data()),
            qos: .qos1,
            retained: true
        )

        if self.client.publish(message, retained: true, properties: .init()) == -1 {
            Logger.log("Publish did fail \(message)")
        }

And I subscribe like this:

let subscription = MqttSubscription(topic: "kinzoo/connection/5448", qos: .qos1)
        subscription.noLocal = true
        self.client.subscribe([subscription])

I thought I had done something wrong on the subscription and went over all the code to find the subscription options which are turned off by default. But I still do not get retained messages from the broker!

I have this logging set up:

client.didSubscribeTopics = { _, successful, failed, _ in
            Logger.log("did subscribe to \(successful.count) topics, \(failed.count) failed")
        }
        client.didReceiveMessage = { [weak self] client, message, _, _ in
            Logger.log("Did receive\(message.retained ? " retained " : " ")message \(message)")
        }
        client.didPublishMessage = { client, message, _ in
            Logger.log("Did publish\(message.retained ? " retained " : " ")message \(message) with payload \(message.payload)")
        }

And I get this output when publishing:

CocoaMQTT(debug): SEND: PUBLISH(id: 1, topic: kinzoo/connection/5448, payload: [123, 34, 115, 116, 97, 116, 117, 115, 34, 58, ...])
CocoaMQTT(debug): ==========================MQTT 5.0==========================
CocoaMQTT(debug): packetFixedHeaderType 49
CocoaMQTT(debug): fixedHeader [48]
CocoaMQTT(debug): remainingLen(len: len) [141, 1]
CocoaMQTT(debug): variableHeader [0, 22, 107, 105, 110, 122, 111, 111, 47, 99, 111, 110, 110, 101, 99, 116, 105, 111, 110, 47, 53, 52, 52, 56, 0]
CocoaMQTT(debug): properties []
CocoaMQTT(debug): payload [123, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, 111, 110, 108, 105, 110, 101, 34, 44, 34, 101, 120, 112, 111, 115, 101, 67, 108, 105, 101, 110, 116, 83, 116, 97, 116, 117, 115, 34, 58, 116, 114, 117, 101, 44, 34, 115, 99, 104, 101, 109, 97, 86, 101, 114, 115, 105, 111, 110, 34, 58, 49, 44, 34, 100, 101, 118, 105, 99, 101, 84, 111, 107, 101, 110, 34, 58, 34, 67, 66, 48, 54, 68, 56, 69, 67, 45, 51, 68, 55, 52, 45, 52, 51, 51, 51, 45, 56, 69, 68, 69, 45, 53, 54, 53, 53, 65, 55, 52, 67, 53, 48, 53, 49, 34, 125]
CocoaMQTT(debug): =============================================================
2022-06-09 22:08:05.205230-0300 MyApp[40788:20628095] [CocoaMQTT] Did publish retained message CocoaMQTT5Message(topic: kinzoo/connection/5448, qos: qos1, payload: [123, 34, 115, 116, 97, 116, 117, 115, 34, 58, ...]) with payload [123, 34, 115, 116, 97, 116, 117, 115, 34, 58, 34, 111, 110, 108, 105, 110, 101, 34, 44, 34, 101, 120, 112, 111, 115, 101, 67, 108, 105, 101, 110, 116, 83, 116, 97, 116, 117, 115, 34, 58, 116, 114, 117, 101, 44, 34, 115, 99, 104, 101, 109, 97, 86, 101, 114, 115, 105, 111, 110, 34, 58, 49, 44, 34, 100, 101, 118, 105, 99, 101, 84, 111, 107, 101, 110, 34, 58, 34, 67, 66, 48, 54, 68, 56, 69, 67, 45, 51, 68, 55, 52, 45, 52, 51, 51, 51, 45, 56, 69, 68, 69, 45, 53, 54, 53, 53, 65, 55, 52, 67, 53, 48, 53, 49, 34, 125]

And this one when subscribing:

CocoaMQTT(debug): SEND: SUBSCRIBE(id: Optional(5), topics: kinzoo/connection/5448)  
CocoaMQTT(debug): ==========================MQTT 5.0==========================
CocoaMQTT(debug): packetFixedHeaderType 130
CocoaMQTT(debug): fixedHeader [128]
CocoaMQTT(debug): remainingLen(len: len) [28]
CocoaMQTT(debug): variableHeader [0, 5, 0]
CocoaMQTT(debug): properties []
CocoaMQTT(debug): payload [0, 22, 107, 105, 110, 122, 111, 111, 47, 99, 111, 110, 110, 101, 99, 116, 105, 111, 110, 47, 53, 52, 52, 56, 5]
CocoaMQTT(debug): RECV: SUBACK(id: 5)
2022-06-09 22:43:18.063959-0300 MyApp[596:100523] [CocoaMQTT] did subscribe to 1 topics, 0 failed

It does receive publishes made when the client is connected to the broker (tested on mosquitto and paho):

CocoaMQTT(debug): RECV: PUBLISH(id: 2, topic: , payload: [123, 34, 115, 116, 97, 116, 117, 115, 34, 58, ...])
CocoaMQTT(info): Received message: CocoaMQTT5Message(topic: kinzoo/connection/5448, qos: qos1, payload: [123, 34, 115, 116, 97, 116, 117, 115, 34, 58, ...])
2022-06-09 22:34:40.645346-0300 MyApp[585:94432] [CocoaMQTT] Did receive retained message CocoaMQTT5Message(topic: kinzoo/connection/5448, qos: qos1, payload: [123, 34, 115, 116, 97, 116, 117, 115, 34, 58, ...])

Yet, after reconnecting/subscribing again, I do not get the latest retained message. I don't understand if my client is failing to send a proper retained message to the broker, of if the client is failing to deliver the existing retained messages to me when I subscribe.

Any tips would be highly appreciated!

Best regards, Rafael

rafaelnobrekz commented 2 years ago

I edit a bit of the data as my testing evolved. Could it also be related to the broker ? From my latest tests, it seems I'm able to get it to behave as expected on broker.emqx.io as well as test.mosquitto.org, but not on our paho based server

rafaelnobrekz commented 2 years ago

As far as I understand the packet format, which is little to be honest, it all seems correct: packetFixedHeaderType 49 -> 48 for the 4 bits of PUBLISH, plus 0001 options which is the retain flag For subscription, the payload seem fine as it contains the topic name and the last byte is 5 -> 0101, meaning qos1, and the only option to avoid getting back our own publish when subscribing. I tested with this one disabled and it behaved the same.

leeway1208 commented 2 years ago

As far as I understand the packet format, which is little to be honest, it all seems correct: packetFixedHeaderType 49 -> 48 for the 4 bits of PUBLISH, plus 0001 options which is the retain flag For subscription, the payload seem fine as it contains the topic name and the last byte is 5 -> 0101, meaning qos1, and the only option to avoid getting back our own publish when subscribing. I tested with this one disabled and it behaved the same.

Hi , your problem is solved? 😄

leeway1208 commented 2 years ago

I edit a bit of the data as my testing evolved. Could it also be related to the broker ? From my latest tests, it seems I'm able to get it to behave as expected on broker.emqx.io as well as test.mosquitto.org, but not on our paho based server

My colleague said that if your broker does not support the retain attribute. You may not be able to retain the message.

rafaelnobrekz commented 2 years ago

Hi, the problem is not solved yet 😢 The broker is paho, it does support it. The Android team is able to use the feature on the same broker

leeway1208 commented 2 years ago

Hi, the problem is not solved yet 😢 The broker is paho, it does support it. The Android team is able to use the feature on the same broker

Could you provide a paho broker for us to test? thanks~~

rafaelnobrekz commented 2 years ago

I will get a hold of one ASAP, thank you 👍

rafaelnobrekz commented 2 years ago

@leeway1208 Here is our sample broker without TLS or authentication: redacted Correction: our broker is VerneMQ

rafaelnobrekz commented 2 years ago

We have pinpointed it to be an issue with retained will messages! We are able to publish retained messages on all 3 brokers tested: vernemq, mosquitto, emq We are able to publish retained will messages on mosquitto and emq, but not vernemq!

I am searching for any outstanding issues on vernemq's issues board about that. We're also doing one last round of tests by publishing a will message from our sample client in golang, to understand if that will is able to be retained correctly on vernemq.

rafaelnobrekz commented 2 years ago

Our latest testing with a golang client that connects to VerneMQ and sets a retained will message does work. So I assume there is some small protocol discrepancy in either verne parsing of the connect packet, or the bytes CocoaMQTT sends to it 🤔 OR that my code for setting the will is broken somehow. I'll post snippets for both

rafaelnobrekz commented 2 years ago

This is how I set the will message when connecting:

        let client = CocoaMQTT5(clientID: "not set", host: host, port: UInt16(port))
        client.enableSSL = false
        client.logLevel = .debug
        client.backgroundOnSocket = false
        client.keepAlive = 10
        client.clientID = UUID().uuidString
        client.username = nil
        client.password = nil
        client.willMessage = .init(
            topic: "kinzoo/connection/3377",
            payload: [UInt8]((Bool.random() ? "offline" : "online").data(using: .utf8)!),
            qos: .qos1,
            retained: true
        )
        client.autoReconnect = true
        Logger.log("Will connect to \(client.host):\(client.port) (\(client.enableSSL ? "SSL" : "no SSL")) with credentials \(String(describing: client.username)):\(String(describing: client.password))")
        _ = client.connect()
}

This is the output for the CONNECT packet sent from CocoaMQTT:

CocoaMQTT(info): Connected to redacted: 1883
CocoaMQTT(debug): SEND: CONNECT(id: 08618C85-9750-4A6C-9812-C29AB09C3A1F, username: nil, password: nil, keepAlive : 10, cleansess: true)
CocoaMQTT(debug): ==========================MQTT 5.0==========================
CocoaMQTT(debug): packetFixedHeaderType 16
CocoaMQTT(debug): fixedHeader [16]
CocoaMQTT(debug): remainingLen(len: len) [95]
CocoaMQTT(debug): variableHeader [0, 4, 77, 81, 84, 84, 5, 46, 0, 10, 0]
CocoaMQTT(debug): properties []
CocoaMQTT(debug): payload [0, 36, 48, 56, 54, 49, 56, 67, 56, 53, 45, 57, 55, 53, 48, 45, 52, 65, 54, 67, 45, 57, 56, 49, 50, 45, 67, 50, 57, 65, 66, 48, 57, 67, 51, 65, 49, 70, 12, 24, 0, 0, 0, 0, 2, 0, 0, 0, 0, 1, 1, 0, 22, 107, 105, 110, 122, 111, 111, 47, 99, 111, 110, 110, 101, 99, 116, 105, 111, 110, 47, 51, 51, 55, 55, 0, 7, 111, 102, 102, 108, 105, 110, 101]

This is how the golang sample client does the LWT posting:

    server := flag.String("server", "redacted:1883", "The full URL of the MQTT server to connect to")
    clientid := flag.String("clientid", "", "A clientid for the connection")
    username := flag.String("username", "api-server", "A username to authenticate to the MQTT server")
    password := flag.String("password", "==", "Password to match username")
    flag.Parse()

    conn, err := net.Dial("tcp", *server)
    if err != nil {
        log.Fatalf("Failed to connect to %s: %s", *server, err)
    }

    c := paho.NewClient(paho.ClientConfig{
        Conn: conn,
    })
    cp := &paho.Connect{
        KeepAlive:  30,
        ClientID:   *clientid,
        CleanStart: true,
        Username:   *username,
        Password:   []byte(*password),
        WillMessage: &paho.WillMessage{
            Retain:  true,
            QoS:     1,
            Topic:   "kinzoo/connection/25264",
            Payload: []byte("Willpower"),
        },
    }

    if *username != "" {
        cp.UsernameFlag = true
    }
    if *password != "" {
        cp.PasswordFlag = true
    }

    log.Println(cp.UsernameFlag, cp.PasswordFlag)

    ca, err := c.Connect(context.Background(), cp)

    fmt.Printf("Connected to %s\n", *server)
rafaelnobrekz commented 2 years ago
Screen Shot 2022-06-10 at 15 37 26 Screen Shot 2022-06-10 at 15 37 34

24, 0, 0, 0, 0, 2, 0, 0, 0, 0 The spec does not mention 0 as being a default value for Will Expiry Interval, could this broker consider 0 as a "discard immediatelly" command?

rafaelnobrekz commented 2 years ago

Eureka!! willMessage.willExpiryInterval = .max this fixed the issue for VerneMQ broker!! They are following the spec to the letter, and do not consider 0 as forever as the delay expiry. As a quick fix, maybe CocoaMQTT could default to .max instead of 0? The best would be not to send payload bytes when will delay and will expiry are set to 0

leeway1208 commented 2 years ago

Thanks for your support. I will fix it ASAP and then you can check it work or not. If there are still some problem, you can provide PR to us.

rafaelnobrekz commented 2 years ago

Great, having it default to max for now is a good thing. I was also successful in working around it by setting a manual .max value in my created will message. I think the best approach overall would be letting the default values be nil, and on the payload5 method, not outputting the 4 bytes for both delay and expiry properties, then the spec is followed, the wire packet is smaller, and retained messages are still dispatched as expected

rafaelnobrekz commented 2 years ago

I have also tested setting nil as the value, and the bytes are not emitted at all (which is better imo), and it works ok on the brokers I have tested with!

130db commented 1 week ago

This is still actual problem Retained message is not received on connection

    let socket = CocoaMQTTSocket()
    socket.enableSSL = true

    service = CocoaMQTT5(clientID: clientId, host: host, port: port, socket: socket)

    let connectProperties = MqttConnectProperties()
    connectProperties.sessionExpiryInterval = .max
    service?.connectProperties = connectProperties

    service?.connectProperties?.sessionExpiryInterval = .max
    service?.logLevel = .debug
    service?.enableSSL = true
    service?.autoReconnect = true
    service?.username = nil
    service?.password = nil
    service?.cleanSession = false
    service?.delegate = self

Everythig except retained message works.

rafaelnobrekz commented 1 week ago

@130db What's the library version you're using? What is the will message being posted like? (willExpiryInterval should be set to .max or nil) How is the subscription made? (MqttSubscription retainHandling should be set to .sendOnSubscribe on the latest version, .none before #499 was fixed)

130db commented 1 week ago

@130db What's the library version you're using? What is the will message being posted like? (willExpiryInterval should be set to .max or nil) How is the subscription made? (MqttSubscription retainHandling should be set to .sendOnSubscribe on the latest version, .none before #499 was fixed)

Client just listens for messages. This is for radio station app. App receive current song metadata via MQTT. Latest EMQX + CocoaMQTT 2.1.8 Minimums iOS deployment 16.0

EMQX is configured correctly. I am using it on Radio Nemiers web. Media player at the bottom of the page.

I have tried to use WebSocket, but it fails because of Starscream (not conforming to delegate)

Forgot to mention I do not use lastWill. It is read-only connection.

For tests however I use read/write connection with username and password

130db commented 1 week ago

For those stumble upon "missing retain messages" problem Thank you @rafaelnobrekz for direction!

final class MQTTService: ObservableObject {
    // ..

    func configure() {
        if service != nil {
            print("🤘 DEBUG: There were MQTT instance. Reseting")
            service = nil
        }

        self.host = "mqtt.example.com"
        self.port = 8881

        let clientId = "mqtt-poc-\(String(ProcessInfo().processIdentifier))"
        print("🤘 DEBUG: Client ID: \(clientId)")

        let socket = CocoaMQTTSocket()
        socket.enableSSL = true

        service = CocoaMQTT5(clientID: clientId, host: host, port: port, socket: socket)

        service?.connectProperties = MqttConnectProperties()
        service?.logLevel = .debug
        service?.enableSSL = true
        service?.autoReconnect = true
        service?.username = nil
        service?.password = nil
        service?.keepAlive = 60
        service?.delegate = self
    }

    func connect() {
        if let success = service?.connect(), success {
            print("🤘 DEBUG: Connected \(success)")
        } else {
            print("🤘 DEBUG: Connect failed")
        }
    }

    func subscribe(topic: String) {
        let subscription = MqttSubscription(topic: topic)
        subscription.retainHandling = .sendOnSubscribe
        service?.subscribe([subscription])

        // This will not return retained messages because of .retainHandling
        //service?.subscribe(topic)
    }
    // ..
}

extension MQTTService: CocoaMQTT5Delegate {
    // ..
    func mqtt5(_ mqtt5: CocoaMQTT5, didConnectAck ack: CocoaMQTTCONNACKReasonCode, connAckData: MqttDecodeConnAck?) {
        print("🤘 DEBUG: Connected")
        print("🤘 DEBUG: ACK \(ack.rawValue)")

        if ack == .success {
            subscribe(topic: "topic/sub/topic")
        }
    }
    // ..
}