hivemq / hivemq-mqtt-client

HiveMQ MQTT Client is an MQTT 5.0 and MQTT 3.1.1 compatible and feature-rich high-performance Java client library with different API flavours and backpressure support
https://hivemq.github.io/hivemq-mqtt-client/
Apache License 2.0

Mqtt5PubRecException: PUBREC contained an Error Code #484

Closed: digrec closed this issue 8 months ago

digrec commented 3 years ago

Hi everyone,

we are using the HiveMQ MQTT Client on our Android mobile devices quite extensively, and we noticed that a significant number of publishes from clients are failing with:

Mqtt5PubRecException: PUBREC contained an Error Code.

Almost always, the reason code found in the Mqtt5PublishResult is Mqtt5PubRecReasonCode.PACKET_IDENTIFIER_IN_USE (code 145), and the PUBLISH packet ID in question is somehow always 1.

The PACKET_IDENTIFIER_IN_USE reason code suggests a mismatch of the session state between the client and the broker. We have tried to debug this error multiple times to figure out what goes wrong, or what client/broker configuration would prevent it. While we reduced the message loss, we are still facing many failed publishes, and therefore messages get stuck on devices. After some time, or an intentional client reset, the issue disappears, only to reappear later.
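For context, this is roughly how we pull the reason code out of a failed publish result (a simplified sketch, not our exact production code; the handler name onPublishResult is just what we call it):

import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubRecException
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult

// Called for every Mqtt5PublishResult emitted by the Rx publish flow.
private fun onPublishResult(result: Mqtt5PublishResult) {
    val error = result.error.orElse(null) ?: return // no error: the publish was acknowledged
    if (error is Mqtt5PubRecException) {
        // e.g. PACKET_IDENTIFIER_IN_USE (145) for the failing clients described above
        val reason = error.mqttMessage.reasonCode
        Timber.e("PUBREC error ${reason.code} $reason for topic ${result.publish.topic}")
    }
}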

What we found suspicious is that when this error code is logged on EMQX and on the device, the PUBLISH packet IDs used by the affected client are in fact not increasing as expected; they are simply stuck at 1. The EMQX logs show the same PACKET_IDENTIFIER_IN_USE error code, over and over, for the same client, which is then unable to publish its messages.

To compare this unexpected behaviour with another broker, we tried the HiveMQ broker too, and noticed the same thing: the PUBLISH packet ID was still stuck at 1, even with the cleanSession = true flag. See the packet identifier for the [c9198f90544d1892|io.workerbase.devicemanagement] client in this HiveMQ trace:

2021-04-16 14:54:06,403 - [c9198f90544d1892|io.workerbase.devicemanagement] - Received PUBLISH message (packet identifier: 1, topic: workerbase/skeeper/ping, QoS: 2, retain: false, duplicate delivery: false, user properties: [name: publishId, value: a859f16e-6336-4b1c-82ec-9e65092fe611], [name: publisherId, value: 39d2f6fb-1a6b-442b-9abf-0773f05da9b1])
2021-04-16 14:54:06,408 - [hamid-hivemq] - Sent PUBLISH message (topic: workerbase/skeeper/ping, QoS: 0, retain: false, duplicate delivery: false, user properties: [name: publishId, value: a859f16e-6336-4b1c-82ec-9e65092fe611], [name: publisherId, value: 39d2f6fb-1a6b-442b-9abf-0773f05da9b1])
2021-04-16 14:54:06,598 - [hamid-hivemq] - Received PUBLISH message (packet identifier: 51865, topic: workerbase/skeeper/ping, QoS: 2, retain: false, duplicate delivery: false)
2021-04-16 14:54:06,600 - [hamid-hivemq] - Sent PUBLISH message (topic: workerbase/skeeper/ping, QoS: 0, retain: false, duplicate delivery: false)
2021-04-16 14:54:21,410 - [c9198f90544d1892|io.workerbase.devicemanagement] - Received PUBLISH message (packet identifier: 1, topic: workerbase/skeeper/ping, QoS: 2, retain: false, duplicate delivery: false, user properties: [name: publishId, value: 53387eba-ad87-46aa-89c0-6f1cb5985b1d], [name: publisherId, value: 39d2f6fb-1a6b-442b-9abf-0773f05da9b1])
2021-04-16 14:54:21,415 - [hamid-hivemq] - Sent PUBLISH message (topic: workerbase/skeeper/ping, QoS: 0, retain: false, duplicate delivery: false, user properties: [name: publishId, value: 53387eba-ad87-46aa-89c0-6f1cb5985b1d], [name: publisherId, value: 39d2f6fb-1a6b-442b-9abf-0773f05da9b1])
2021-04-16 14:54:36,419 - [c9198f90544d1892|io.workerbase.devicemanagement] - Received PUBLISH message (packet identifier: 1, topic: workerbase/skeeper/ping, QoS: 2, retain: false, duplicate delivery: false, user properties: [name: publishId, value: ff3ec9ab-7cd4-4bd3-a228-12d8c8c37ca4], [name: publisherId, value: 39d2f6fb-1a6b-442b-9abf-0773f05da9b1])
2021-04-16 14:54:36,425 - [hamid-hivemq] - Sent PUBLISH message (topic: workerbase/skeeper/ping, QoS: 0, retain: false, duplicate delivery: false, user properties: [name: publishId, value: ff3ec9ab-7cd4-4bd3-a228-12d8c8c37ca4], [name: publisherId, value: 39d2f6fb-1a6b-442b-9abf-0773f05da9b1])

Why does the packet ID never increment or change, even with clean session? Is that the reason for PACKET_IDENTIFIER_IN_USE errors? We tried other MQTT clients to reproduce this but they do increment their PUBLISH packet IDs as expected.

The setup

Our goal is to not lose any messages while the device is offline, so we use the following setup:

MQTT client creation and configuration

private var client: Mqtt5RxClient? = null

override suspend fun connect(config: SkeeperConfig) {
    Timber.d("connect: $config")

    if (client == null) {
        client = buildClient(config)
    }

    val sub = client?.connectWith()
        ?.keepAlive(5)
        ?.cleanStart(false)
        ?.sessionExpiryInterval(60 * 60 * 1)
        ?.applyConnect()
        ?.subscribeOn(Schedulers.io())
        ?.subscribe(
            this::onConnected,
            this::onConnectError
        )
    sub?.let { compositeDisposable.add(it) }
}

private fun buildClient(config: SkeeperConfig): Mqtt5RxClient {
    Timber.d("buildClient: ${config.userId}")

    val clientBuilder = Mqtt5Client.builder()
        .identifier(config.userId) // userId or deviceId
        .serverHost(config.serverHost)
        .serverPort(config.serverPort)

    if (!config.serverBasepath.isNullOrEmpty()) {
        clientBuilder
            .webSocketConfig()
            .serverPath(config.serverBasepath)
            .applyWebSocketConfig()
    }

    if (config.useSsl) {
        clientBuilder.sslWithDefaultConfig()
    }

    if (!config.userToken.isNullOrEmpty()) {
        clientBuilder.simpleAuth()
            .username(config.userToken)
            .password(config.userToken.toByteArray())
            .applySimpleAuth()
    }

    return clientBuilder
        .automaticReconnect()
        .initialDelay(3, TimeUnit.SECONDS)
        .maxDelay(10, TimeUnit.SECONDS)
        .applyAutomaticReconnect()
        .addConnectedListener { context -> onClientConnected(context) }
        .addDisconnectedListener { context -> onClientDisconnected(context) }
        .buildRx()
}

private fun onClientDisconnected(context: MqttClientDisconnectedContext) {
    val clientId = context.clientConfig.getClientId()
    val cause = context.cause
    val attempts = context.reconnector.attempts

    Timber.e(cause, "onClientDisconnected")
    Timber.d(
        "onClientDisconnected: clientId = $clientId, " +
                "cause = $cause, source = ${context.source}, attempts = $attempts"
    )

    checkIfNotAuthorizedError(cause, clientId)

    if (isInitiatedDisconnect) {
        // TODO: remove custom auto-reconnect stopper when this ticket is released
        //  https://github.com/hivemq/hivemq-mqtt-client/issues/306

        // Current auto-reconnect feature delays transition of the HiveMq client into
        // disconnected state until the next auto-reconnect fail where we stop it manually.
        Timber.d("isInitiatedDisconnect: stop auto-reconnect")
        context.reconnector.reconnect(false)

        service.onMqttDisconnected(SkeeperStateException(cause.toString()))
        onDisconnectedAndStopped()
    } else {
        context.reconnector.republishIfSessionExpired(true)
        service.onMqttDisconnected(SkeeperStateException(cause.toString()))
    }
}
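The publish path itself is not shown above; condensed, the QoS 2 publishes are issued roughly like this (publishPing is a simplified stand-in for our real publisher, the topic matches the trace above, and onPublishResult is the handler sketched earlier):

import com.hivemq.client.mqtt.datatypes.MqttQos
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish
import io.reactivex.Flowable

private fun publishPing(payload: ByteArray) {
    val c = client ?: return // client is built and connected in connect() above

    val publish = Mqtt5Publish.builder()
        .topic("workerbase/skeeper/ping")
        .qos(MqttQos.EXACTLY_ONCE) // QoS 2 so no message is lost while offline
        .payload(payload)
        .build()

    val sub = c.publish(Flowable.just(publish))
        .subscribeOn(Schedulers.io())
        .subscribe(this::onPublishResult, { e -> Timber.e(e, "publish failed") })
    compositeDisposable.add(sub)
}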
SgtSilvio commented 3 years ago

Sorry for the delay, I will answer here in the next few days; unfortunately I did not find the time in the last weeks.

SgtSilvio commented 3 years ago

Hi @digrec Thank you for the detailed description.

Why does the packet ID never increment or change

The MQTT specification does not specify that packet ids must be incremented by 1; instead the sender must ensure that a packet identifier is never used twice concurrently. Quoting the MQTT specification:

Each time a Client sends a new [...] PUBLISH (where QoS > 0) MQTT Control Packet it MUST assign it a non-zero Packet Identifier that is currently unused [MQTT-2.2.1-3].

The Packet Identifier becomes available for reuse after the sender has processed the corresponding acknowledgement packet, defined as follows. In the case of a QoS 1 PUBLISH, this is the corresponding PUBACK; in the case of QoS 2 PUBLISH it is PUBCOMP or a PUBREC with a Reason Code of 128 or greater.

(https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901026)

This means it is possible that the same packet identifier is used every time, when each PUBLISH is fully acknowledged before the next PUBLISH is sent. (In the HiveMQ log you provided this is very likely the case, as messages are sent at a 15 s interval.)
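To illustrate (this is a toy allocator, not the client's actual code): if the sender always hands out the lowest packet identifier that is not in flight, and every PUBLISH is fully acknowledged before the next one is sent, identifier 1 is released and reused every single time, exactly as in your trace:

// Toy example of a lowest-free-identifier allocator.
class PacketIdAllocator(private val maxId: Int = 65535) {
    private val inUse = sortedSetOf<Int>()

    fun allocate(): Int {
        var id = 1
        while (id in inUse) id++ // take the lowest identifier not currently in flight
        require(id <= maxId) { "no free packet identifier" }
        inUse += id
        return id
    }

    // Called once the PUBLISH is fully acknowledged (PUBCOMP, or a PUBREC with an error code).
    fun release(id: Int) {
        inUse -= id
    }
}

fun main() {
    val ids = PacketIdAllocator()
    repeat(3) {
        val id = ids.allocate() // always 1, because the previous PUBLISH was already acknowledged
        println("PUBLISH with packet identifier $id")
        ids.release(id)
    }
}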

Is that the reason for PACKET_IDENTIFIER_IN_USE errors?

If the broker works as expected, using the same packet identifier after the previous PUBLISH is fully acknowledged should not lead to this error. It seems that this is a bug in EMQX, which wrongly thinks that the packet identifier is still in use even though it is not. This is also a violation of the MQTT specification.

To compare this unexpected behaviour with another broker, we tried HiveMQ Broker too. And noticed the same issue, the PUBLISH Packet ID was still stuck at 1

As described above, using the same packet identifier is not an issue. Do you see the Mqtt5PubRecException also with HiveMQ, or only with EMQX? If the latter is the case, this confirms the assumption that this is a bug in EMQX.

Feel free to ask any questions about my explanations. And again, sorry for the delay; I needed to find the time to check the packet identifier logic again, so as not to make false claims that this is an EMQX bug.

digrec commented 3 years ago

Hi @SgtSilvio, thanks for your time and looking into this issue.

In the meantime, after posting the question, I read the source code of the HiveMQ client, specifically MqttOutgoingQosHandler and its packetIdentifiers field, to understand the packet ID logic, and came to the same conclusion: the packet IDs are released and reused again as soon as their currently assigned publish message gets acknowledged. Which matches your explanation perfectly.

To see this in action, I produced a lot of small ping publishes per second to watch how the packet ID changes, actually increasing at some point, then decreasing again, etc. Then, totally coincidentally, when redeploying the connected app, I managed to reproduce the Mqtt5PubRecException: the redeployed app connected again with the same credentials and used the same ping topic.
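The flood test looked roughly like this (a sketch; the 100 ms interval and the topic are just the values I used, client is the connected Mqtt5RxClient from the setup above, onPublishResult is the handler sketched earlier):

import io.reactivex.Flowable

private fun startPingFlood(client: Mqtt5RxClient) {
    // Several pings per second keep more than one PUBLISH in flight,
    // so higher packet identifiers start to show up in the broker log.
    val sub = client.publish(
        Flowable.interval(100, TimeUnit.MILLISECONDS)
            .map { i ->
                Mqtt5Publish.builder()
                    .topic("workerbase/skeeper/ping")
                    .qos(MqttQos.EXACTLY_ONCE)
                    .payload("ping $i".toByteArray())
                    .build()
            }
    )
        .subscribeOn(Schedulers.io())
        .subscribe(this::onPublishResult, { e -> Timber.e(e, "ping flood failed") })
    compositeDisposable.add(sub)
}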

So if I stop the app process while the app is publishing and the packet ID is higher than 1 (more than one packet is in flight and not yet acked, as I understand it), then restart the client and connect, the first publish with packetIdentifier=1 fails with a result printed like this:


onPublishResult: MqttQos2Result{publish=MqttPublish{topic=workerbase/skeeper/ping, payload=7byte, qos=EXACTLY_ONCE, retain=false, userProperties=[(publishId, e2b76470-a16b-480e-930b-873bea99adaa), (publisherId, 7206a73e-c4f2-4f72-aa9f-8482c37c629b)]}, error=com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubRecException: PUBREC contained an Error Code, pubRec=MqttPubRec{packetIdentifier=1}}

and the reason code on this result object is:

onPublishResult: PubRec.Reason = 145 PACKET_IDENTIFIER_IN_USE

Does this have to do with the fact that we use a session? If the client restarts while there are publishes in progress, it is started and reconnected again with a session, but has lost track of the previous session state and the previously unacked packet IDs, so it starts from packet ID 1 again, and the broker replies with the Mqtt5PubRecException because the client violates the protocol?

By client restart I mean that the existing Mqtt5RxClient instance is stopped and released, and a new client instance is built and connected with the same credentials and the same topic, because, afaik, there is no other way to force a reconnect of the same client instance. We do that when the HiveMQ client is not reconnecting fast enough, for example after entering WiFi range again once the device was outside of the WiFi for some time, or when users see the "not connected" bar in the app and tap reconnect, which restarts the MQTT client to try to establish the connection as fast as possible.
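The restart itself is roughly this (a sketch; error handling is stripped, isInitiatedDisconnect is the flag from the disconnected listener above):

private suspend fun restartClient(config: SkeeperConfig) {
    isInitiatedDisconnect = true
    // Tear down the old instance; blockingAwait is simplified here, the real code is async.
    client?.disconnect()?.onErrorComplete()?.blockingAwait()
    client = null
    isInitiatedDisconnect = false

    // Build a fresh Mqtt5RxClient with the same credentials and connect again.
    connect(config)
}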

In April, I was also able to reproduce this on our test installation of the HiveMQ broker. I don't have access to it anymore, so I tried with the public HiveMQ broker, but for some reason I can't reproduce it there. Same setup, same code, with a session and client restarts. I guess it's some difference in broker configs?

Should the session in this case be cleared from the client side, after resetting the client, to prevent packet-identifier-in-use errors?

If so, is connecting twice, first with clean session = true and then with clean session = false, the only way to do that? As described here: https://stackoverflow.com/questions/36034433/mqtt-clean-session
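In other words, something like this (an untested sketch of the idea from that answer, translated to MQTT 5 semantics, i.e. cleanStart plus sessionExpiryInterval instead of MQTT 3's cleanSession; client is a non-null Mqtt5RxClient here):

// 1. Connect with a clean start and expiry 0 so the broker wipes the old session state,
// 2. disconnect,
// 3. connect again with cleanStart(false) to begin a fresh persistent session.
val sub = client.connectWith()
    .cleanStart(true)
    .sessionExpiryInterval(0)
    .applyConnect()
    .ignoreElement()
    .andThen(client.disconnect())
    .andThen(
        client.connectWith()
            .cleanStart(false)
            .sessionExpiryInterval(3600)
            .applyConnect()
            .ignoreElement()
    )
    .subscribeOn(Schedulers.io())
    .subscribe({ Timber.d("session cleared, reconnected with a fresh session") },
        { e -> Timber.e(e, "session reset failed") })
compositeDisposable.add(sub)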

But by clearing the client session we would also lose the messages that were queued on the broker, for the client's subscriptions, while the client was not connected, right?

In the meantime we also updated our EMQX broker, and are still looking into whether EMQX is somehow causing this, but first I want to make sure that client restarts are not the cause.

kushwiz commented 2 years ago

We are facing this issue as well. HiveMQ as client and EMQX 4.4 as broker.

pglombardo commented 8 months ago

Can someone in this thread confirm whether you are still seeing this issue? We've had quite a few releases since this was filed. I assume the same on the EMQX side.

Additionally, we haven't seen any reports of this issue with any broker other than EMQX. Maybe bad broker behaviour, or some bad interaction/protocol assumption between the two?

digrec commented 8 months ago

Hi, thanks for bringing this up! We are not experiencing this issue anymore; we use the latest HiveMQ client and EMQX broker. Feel free to close this ticket if no one else objects. Or should I close it?

pglombardo commented 8 months ago

Thanks @digrec for the update. I'll close this out but if anyone still has a remaining issue, feel free to add a comment or file a new issue. Thanks all!