sbtinstruments / aiomqtt

The idiomatic asyncio MQTT client, wrapped around paho-mqtt
https://sbtinstruments.github.io/aiomqtt
BSD 3-Clause "New" or "Revised" License
393 stars 71 forks source link

Negative inflight messages counter #258

Closed harrandt closed 5 months ago

harrandt commented 7 months ago

Python 3.9, Windows 10 aiomqtt 1.2.1 paho-mqtt 1.6.1

The client._client._inflight_messages counter of the paho client is negative and decreasing with QoS 2 for publish and subscribe on the same topic. Other combinations of the QoS levels will not show the issue.

The log seems to indicate that the published messages are transfered successfully:

DEBUG:__main__:Sending PUBLISH (d0, q2, r0, m6), 'b'test'', ... (1 bytes)
DEBUG:__main__:Received PUBREC (Mid: 6)
DEBUG:__main__:Sending PUBREL (Mid: 6)
DEBUG:__main__:Received PUBCOMP (Mid: 6)
DEBUG:__main__:Received PUBLISH (d0, q2, r0, m5), 'test', ...  (1 bytes)
DEBUG:__main__:Sending PUBREC (Mid: 5)
DEBUG:__main__:Received PUBREL (Mid: 5)
DEBUG:__main__:Sending PUBCOMP (Mid: 5)
DEBUG:__main__:received payload 4, mid: 5
DEBUG:__main__: _out_messages 0, _in_messages 0, _inflight_messages -5

Here is a MCVE

import asyncio
import sys
import aiomqtt
import os

#Change to the "Selector" event loop if platform is Windows
if sys.platform.lower() == "win32" or os.name.lower() == "nt":
    from asyncio import set_event_loop_policy, WindowsSelectorEventLoopPolicy
    set_event_loop_policy(WindowsSelectorEventLoopPolicy())

import logging

logging.basicConfig(
    level=logging.DEBUG,
    stream=sys.stdout,
)
logger = logging.getLogger(__name__)

MQTT_TOPIC = "test"
MQTT_PUBLISH_QOS = 2
MQTT_SUBSCRIBE_QOS = 2

async def publisher(client):

    n = 0

    while True:

        await client.publish(MQTT_TOPIC, n, MQTT_PUBLISH_QOS)
        n += 1

        await asyncio.sleep(1.0)

async def main():

    background_tasks = set()

    client = aiomqtt.Client(
        hostname="localhost",
        client_id="client",
        max_inflight_messages=20,
        max_queued_messages=0,
        keepalive=5,
        logger=logger
    )

    async with client:

        task = asyncio.create_task(publisher(client))
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

        async with client.messages() as messages:

            await client.subscribe(MQTT_TOPIC, MQTT_SUBSCRIBE_QOS)

            async for message in messages:

                if message.topic.matches(MQTT_TOPIC):

                    n = int(message.payload.decode("utf-8"))

                    logger.debug(f"received payload {n}, mid: {message.mid}")

                    logger.debug(f" _out_messages {len(client._client._out_messages)}, _in_messages {len(client._client._in_messages)}, _inflight_messages {client._client._inflight_messages}")

if __name__ == "__main__":

    try:
        asyncio.run(main())

    except KeyboardInterrupt:
        sys.exit(0)
empicano commented 5 months ago

Hi there,

Thanks for this issue and sorry for the late reply! Let's try to get to the bottom of this 👍

As far as I know, aiomqtt doesn't touch the _inflight_messages counter. The only time we're doing something related is when we set paho-mqtt's limit on inflight messages with max_inflight_messages_set(). Did you already try to reproduce a negative counter using paho-mqtt directly?

paho-mqtt decrements the _inflight_messages counter in two places:

harrandt commented 5 months ago

Good point, thanks. I re-wrote the example to use only paho and the issue is also visible then.

I created a ticket in the paho tracker. https://github.com/eclipse/paho.mqtt.python/issues/805.

empicano commented 5 months ago

Thanks for reporting back!