sbtinstruments / aiomqtt

The idiomatic asyncio MQTT client, wrapped around paho-mqtt
https://sbtinstruments.github.io/aiomqtt
BSD 3-Clause "New" or "Revised" License

How to queue on_message at highest priority? #206

Closed: thalesmaoa closed this issue 1 year ago

thalesmaoa commented 1 year ago

I'm trying to translate RS485 Modbus <-> MQTT.

The problem is that when a command arrives (on/off/change speed), it waits until the entire task list has executed before the message is handled. This takes around two seconds. Is there a way to process it immediately?

In other words, I want to hold the Modbus request tasks, tasks = [asyncio.create_task(x.read_values()) for x in _callDrivers.values()], and execute async for message in messages: immediately. Is there a way to give messages a higher priority?

I thought it was related to #64, but after some tests, it doesn't seem to be.

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import asyncio

import asyncio_mqtt as aiomqtt

# Define MQTT broker address and port
broker_address = "10.10.10.10"
broker_port = 1883

# Define MQTT client instance
client = aiomqtt.Client(
    hostname=broker_address,  # The only non-optional parameter
    port=broker_port,
    username="admin",
    password="password!"
)

# Handle message
async def handle_message(msg):
    print("world")

# Connect, reconnect, subscribe and check messages
async def mqttConnect():
    interval = 5  # Seconds
    while True:
        try:
            async with client as _client:
                async with _client.messages() as messages:
                    await _client.subscribe("topic/tst01")
                    await _client.subscribe("topic/tst02")
                    await _client.subscribe("topic/tst03")  
                    async for message in messages:
                        print("Hello")
                        await handle_message(message)
        except aiomqtt.MqttError:
            print(f'Connection lost; Reconnecting in {interval} seconds ...')
            await asyncio.sleep(interval)

# Check modbus data
async def checkModbus():
    # Check forever
    while True:
        # _callDrivers (defined elsewhere) holds the 64 Modbus driver objects
        tasks = [asyncio.create_task(x.read_values()) for x in _callDrivers.values()]
        await asyncio.gather(*tasks)

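async def main():
    # Keep references to the tasks so they are not garbage-collected mid-execution
    mqtt_task = asyncio.create_task(mqttConnect(), name="MQTT")
    modbus_task = asyncio.create_task(checkModbus(), name="Check Modbus")
    # Both coroutines loop forever, so this waits indefinitely
    await asyncio.gather(mqtt_task, modbus_task)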

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    except KeyboardInterrupt:
        print("Program exiting...")

empicano commented 1 year ago

Hi there, thanks for opening the issue!

I have to admit that I don't really understand your question/problem. I'm not familiar with RS485 Modbus, either, and I don't know what _callDrivers.values() does. Could you try to simplify your example and make it independent of RS485 Modbus (if that's possible), so it's easier for me to understand?

thalesmaoa commented 1 year ago

Hi @empicano, I appreciate your reply. Basically, I noticed that the library uses a queue, correct?

Let's suppose that I'm publishing N messages. They are queued. But I'm constantly refilling this queue, so it always has 10 messages in the pile. Suppose that a message arrives from a subscription. It is queued at the end of the pile. From my understanding, all 10 of those messages must be processed before the subscribed message is handled (the on_message callback).

I want to run "the callback" instantly: as soon as a message arrives, it should go to the top of the queue, and the other tasks should run later.

I also noticed that I was doing a lot of things wrong, but I'm not entirely sure whether the library works as I described. Thanks

frederikaalund commented 1 year ago

Hi thalesmaoa, let me try to have a look :)

I don't quite get the overall issue. Like empicano asked, is the RS485 Modbus protocol an important aspect of this issue or can we ignore that part? It's been a while since I've worked with Modbus.

In any case, let me try to answer some of your more specific questions.

Let's suppose that I'm publishing N messages. They are queued. But I'm constantly refilling this queue, so it always has 10 messages in the pile. Suppose that a message arrives from a subscription. It is queued at the end of the pile. From my understanding, all 10 of those messages must be processed before the subscribed message is handled (the on_message callback).

You're correct. asyncio-mqtt queues up incoming messages in a FIFO (first in, first out) queue.

I want to run "the callback" instantly: as soon as a message arrives, it should go to the top of the queue, and the other tasks should run later.

Not possible with the current FIFO-based asyncio-mqtt implementation, sorry! If you want a LIFO (last in, first out) instead of a FIFO, you must change asyncio-mqtt itself. It's actually not that bad: change line 614 from:

messages: asyncio.Queue[Message] = asyncio.Queue(maxsize=queue_maxsize)

to something like:

messages: asyncio.LifoQueue[Message] = asyncio.LifoQueue(maxsize=queue_maxsize)

That should do it.
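To see what that one-line swap changes, here is a minimal standalone sketch using only the standard library (no MQTT involved). The same three items are put into an asyncio.Queue and an asyncio.LifoQueue and come out in opposite orders. The strings are placeholders standing in for queued messages.

```python
import asyncio


def drain(queue: asyncio.Queue) -> list:
    """Remove and return every item currently in the queue, in retrieval order."""
    items = []
    while not queue.empty():
        items.append(queue.get_nowait())
    return items


async def compare_queues() -> tuple:
    fifo: asyncio.Queue = asyncio.Queue()
    lifo: asyncio.LifoQueue = asyncio.LifoQueue()
    # Placeholder "messages" arriving in this order
    for msg in ["first", "second", "third"]:
        fifo.put_nowait(msg)
        lifo.put_nowait(msg)
    return drain(fifo), drain(lifo)


fifo_order, lifo_order = asyncio.run(compare_queues())
print(fifo_order)  # ['first', 'second', 'third']
print(lifo_order)  # ['third', 'second', 'first']
```

Note that a LIFO reverses the order of every queued message, not only the urgent ones.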

We (asyncio-mqtt) could make this configurable via, e.g., a queue_cls keyword argument in Client.__init__.

@thalesmaoa, try with that manual change first and let me know if it works for your use case. If it does, we (asyncio-mqtt) will add it to the official release. :)

empicano commented 1 year ago

We (asyncio-mqtt) could make this configurable

If we're on that we should probably also think about integrating PriorityQueue, so that we have all three of the asyncio queues. That's more complicated though, because the priority logic would need to be passed to asyncio-mqtt somehow.

frederikaalund commented 1 year ago

If we're on that we should probably also think about integrating PriorityQueue, so that we have all three of the asyncio queues.

Yeah, I agree. 👍

That's more complicated though, because the priority logic would need to be passed to asyncio-mqtt somehow.

Yes, because the message priority is up to the user. They would have to make a function like message_priority(message: Message) -> int. At that point, they might as well just wrap asyncio.PriorityQueue and call message_priority inside that wrapper.

This way, we (asyncio-mqtt) only have to support a single queue interface. I hope it makes sense. :)
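A minimal sketch of that wrapper idea, assuming a user-written message_priority(message) -> int function (hypothetical, as in the comment above). To stay runnable without a broker, it uses a hypothetical FakeMessage dataclass in place of the library's message class. A running counter is added as a tie-breaker so that messages with equal priority keep their arrival order and the message objects themselves never have to be compared (whether the real message class supports ordering is not assumed here).

```python
import asyncio
import itertools
from dataclasses import dataclass


@dataclass
class FakeMessage:
    """Hypothetical stand-in for an MQTT message (topic + payload only)."""
    topic: str
    payload: bytes


def message_priority(message: FakeMessage) -> int:
    """Hypothetical user-defined priority: lower numbers are handled first."""
    if message.topic.startswith("commands/"):
        return 0
    return 10


class MessagePriorityQueue(asyncio.PriorityQueue):
    """PriorityQueue that computes each entry's priority from the message itself."""

    def __init__(self, maxsize: int = 0):
        super().__init__(maxsize)
        # Monotonic counter: breaks ties so equal-priority messages stay FIFO
        self._counter = itertools.count()

    def _put(self, item):
        super()._put((message_priority(item), next(self._counter), item))

    def _get(self):
        # Strip the (priority, counter) prefix and hand back the bare message
        return super()._get()[2]


async def demo() -> list:
    queue = MessagePriorityQueue()
    for msg in [
        FakeMessage("sensors/temp", b"21.5"),
        FakeMessage("sensors/temp", b"21.6"),
        FakeMessage("commands/fan", b"on"),
    ]:
        queue.put_nowait(msg)
    return [queue.get_nowait().payload for _ in range(queue.qsize())]


order = asyncio.run(demo())
print(order)  # [b'on', b'21.5', b'21.6']
```

The command message jumps ahead, while the two sensor readings keep their original relative order.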

empicano commented 1 year ago

Good idea, that's very elegant!

thalesmaoa commented 1 year ago

integrating PriorityQueue

That was exactly what I was trying to say. Perfect.

messages: asyncio.LifoQueue[Message] = asyncio.LifoQueue(maxsize=queue_maxsize)

This alone wouldn't solve it: FIFO is fine for publishing, but not for subscriptions. Also, some subscriptions are fine with FIFO, while others aren't.

It is good to know that I'm not crazy.

empicano commented 1 year ago

I just implemented this. Here's the relevant documentation. It's not yet released, but you can install it straight from GitHub via pip install git+https://github.com/sbtinstruments/asyncio-mqtt.

I also added a note on #64 (the other issue you commented on) here under "important". @thalesmaoa do you think the documentation is clear like this or is there anything that could still be improved?

frederikaalund commented 1 year ago

I just implemented this. Here's the relevant documentation. It's not yet released, but you can install it straight from GitHub via pip install git+https://github.com/sbtinstruments/asyncio-mqtt.

Looks great. :) I never thought about this use case before now, but user-configurable FIFO/LIFO/Priority does make a lot of sense. I like the PriorityQueue example. 👍 Just curious: Why override _get/_put with the _ prefix and not just get/put itself?

I also added a note on https://github.com/sbtinstruments/asyncio-mqtt/issues/64 (the other issue you commented on) here under "important".

Excellent, cheers.

From said docs:

Note that this only makes sense if your message handling is I/O-bound. If it’s CPU-bound, you should spawn multiple processes instead.

Good note! 👍

empicano commented 1 year ago

Why override _get/_put with the _ prefix and not just get/put itself?

That's how LifoQueue and PriorityQueue are subclassed from Queue in the source. This way, put_nowait (what we're actually using for asyncio-mqtt) also does the right thing 😊 Do you think overriding the public functions directly would be simpler?
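The hook mechanism described here can be checked with a small standard-library sketch: a hypothetical RecordingQueue overrides only _put, and both the synchronous put_nowait and the awaitable put end up calling it. Keep in mind that _put/_get are undocumented CPython internals (the same ones LifoQueue and PriorityQueue override), so this relies on implementation details rather than a public API.

```python
import asyncio


class RecordingQueue(asyncio.Queue):
    """Queue subclass that records every item passing through the _put hook."""

    def __init__(self, maxsize: int = 0):
        super().__init__(maxsize)
        self.seen = []

    def _put(self, item):
        self.seen.append(item)
        super()._put(item)


async def main() -> list:
    queue = RecordingQueue()
    queue.put_nowait("via put_nowait")  # synchronous entry point
    await queue.put("via put")  # awaitable entry point
    return queue.seen


seen = asyncio.run(main())
print(seen)  # ['via put_nowait', 'via put']
```

Overriding put instead would miss put_nowait, which is why hooking the underscore methods covers both paths.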

thalesmaoa commented 1 year ago

Hi @empicano, that was fast! 😊

I've created a sample program to test it and see whether I could follow the docs:

import asyncio
import time
import asyncio_mqtt as aiomqtt

client = aiomqtt.Client(hostname='10.10.10.10', username="admin", password="admin")

class PriorityQueue(asyncio.PriorityQueue):
    def _put(self, item):
        priority = 1 
        super()._put((priority, item))

    def _get(self):
        return super()._get()[1]

async def someTask():
    count = 10
    while True:
        asyncio.create_task(client.publish("test/send", count))
        print(f"test/send: {count}")
        count += 1
        time.sleep(1)  # Simulate some blocking process
        # Do a short async sleep so the already created publish tasks can run
        await asyncio.sleep(.001)

async def mqttProcess():
    async with client.messages(queue_class=PriorityQueue) as messages:
    # async with client.messages() as messages:
        await client.subscribe("test/send")
        async for message in messages:
            print(f"{message.topic}: {message.payload}")

# run the async function in a loop
async def main():
    # Create MQTT connection
    await client.connect()
    asyncio.create_task(mqttProcess(), name="MQTT")
    # Fill the queue with 10 publishes
    # It will only process the subscription after emptying this buffer
    for x in range(10):
        asyncio.create_task(client.publish("test/send", x))
    # Simulate some blocking processing
    asyncio.create_task(someTask(), name="Some task")
    # Stay here for ever
    while True:
        await asyncio.sleep(1000)

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
except KeyboardInterrupt:
    print("Program exiting...")

I publish from outside the program and check how fast the message is processed.

Without the priority queue, I notice that all the previously created tasks run first; in other words, 20 tasks.

After adding the priority queue, the message is processed after 4 or 5 tasks. It works! However, I'm still confused about the priority. You defined a random number to illustrate it. I've tested with 1 and with 999, and it seems to be the same, at least for this sample.
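A side note on the test program: time.sleep(1) blocks the whole event loop, so during that second nothing else runs, including the client's network handling, regardless of which queue class is used. The following standard-library sketch (a hedged illustration, not part of asyncio-mqtt; the function names and the 0.3 s duration are arbitrary) measures how long another coroutine is held up by a blocking call compared with the same call offloaded to a worker thread via asyncio.to_thread (Python 3.9+).

```python
import asyncio
import time


async def probe_delay(blocker) -> float:
    """Start `blocker`, then measure how long this coroutine waits to run again."""
    loop = asyncio.get_running_loop()
    blocker_task = asyncio.create_task(blocker())
    start = loop.time()
    await asyncio.sleep(0)  # yield once so blocker_task gets to start first
    delay = loop.time() - start
    await blocker_task
    return delay


async def blocking_work() -> None:
    time.sleep(0.3)  # blocks the entire event loop, like time.sleep(1) above


async def offloaded_work() -> None:
    await asyncio.to_thread(time.sleep, 0.3)  # sleeps in a worker thread instead


blocked = asyncio.run(probe_delay(blocking_work))
offloaded = asyncio.run(probe_delay(offloaded_work))
print(f"blocked: {blocked:.2f}s, offloaded: {offloaded:.2f}s")
```

With the blocking version the probe only resumes after the full 0.3 s; with the offloaded version it resumes almost immediately.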

Regarding #64, I noticed that the repositories are outdated, so I decided to stick with 3.10. But at least for me, the documentation is clear.

empicano commented 1 year ago

Great, thanks for checking!

Take care not to post any important credentials. For testing you can use the public mosquitto broker via aiomqtt.Client("test.mosquitto.org"). You don't need a password for that one.

The PriorityQueue retrieves entries in priority order going from lowest to highest. Instead of setting the same priority for all of them, you need to assign a low priority to the messages you want to process early and a higher one to those that can wait. Hope that helps 😉
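A small standard-library sketch of that ordering rule (the labels are placeholders for messages). Entries come out lowest priority number first. When every entry has the same priority, whether 1 or 999, the number no longer decides anything: ties fall through to comparing the next tuple element, i.e. the items themselves, which is consistent with 1 and 999 behaving identically in the test above.

```python
import asyncio


async def retrieval_order(entries) -> list:
    """Put (priority, label) tuples into a PriorityQueue; return labels in retrieval order."""
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    for entry in entries:
        queue.put_nowait(entry)
    return [queue.get_nowait()[1] for _ in range(queue.qsize())]


# Distinct priorities: the lowest number comes out first
mixed = asyncio.run(retrieval_order([(2, "humidity"), (1, "temperature"), (3, "other")]))
# Same priority everywhere: ordering falls back to comparing the labels themselves
same = asyncio.run(retrieval_order([(999, "b"), (999, "c"), (999, "a")]))
print(mixed)  # ['temperature', 'humidity', 'other']
print(same)   # ['a', 'b', 'c']
```

Because ties compare the items, a constant priority only works if the queued objects are themselves comparable; adding an insertion counter as a second tuple element avoids that requirement.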

thalesmaoa commented 1 year ago

Thanks @empicano, I completely missed that. I've edited my post. Regarding the priority, I'm confused. Can I define the priority in _put based on the subscription? Can you give me an example?

empicano commented 1 year ago

Yes, that's it, the item that gets passed to _put is the message. Based on that you can assign it a priority. Let's say we have sensors measuring temperature and humidity, but we want to prioritize handling the temperature messages. Then we could do something like this:

import asyncio
import asyncio_mqtt as aiomqtt

class PriorityQueue(asyncio.PriorityQueue):
    def _put(self, item):
        priority = 3
        if item.topic.matches("temperature/#"):
            priority = 1
        if item.topic.matches("humidity/#"):
            priority = 2
        print(f"Assigned priority {priority} to message: {item.payload}")
        super()._put((priority, item))

    def _get(self):
        return super()._get()[1]

async def main():
    async with aiomqtt.Client("test.mosquitto.org") as client:
        async with client.messages(queue_class=PriorityQueue) as messages:
            await client.subscribe("temperature/#")
            await client.subscribe("humidity/#")

            # Publish some test messages
            await client.publish("humidity/inside", 0.45)
            await client.publish("temperature/inside", 22.0)
            await client.publish("humidity/outside", 0.38)
            await client.publish("temperature/outside", 25.0)

            await asyncio.sleep(1)  # Wait for messages to arrive and be queued

            async for message in messages:
                print(f"Processed message: {message.payload}")

asyncio.run(main())

When you run this, you'll see that the temperature values are always printed before the humidity values, even though we sent them in a different order.

Of course you don't necessarily have to decide the priority based on the topic, you could also use the content of the message or anything else.
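A hedged sketch of content-based priority: the queue class has the same _put/_get shape as the example above, but ranks messages by their JSON payload. The "urgent" field, the payload_priority helper and the FakeMessage stand-in are all hypothetical; with the real client, the class would be passed as queue_class as in the example above. An insertion counter is included so that equal priorities keep arrival order.

```python
import asyncio
import itertools
import json
from dataclasses import dataclass


@dataclass
class FakeMessage:
    """Hypothetical stand-in for an MQTT message; only .payload is used here."""
    payload: bytes


def payload_priority(payload: bytes) -> int:
    """Hypothetical rule: JSON with "urgent": true first, other JSON next, non-JSON last."""
    try:
        data = json.loads(payload)
    except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
        return 2
    if isinstance(data, dict) and data.get("urgent") is True:
        return 0
    return 1


class PayloadPriorityQueue(asyncio.PriorityQueue):
    def __init__(self, maxsize: int = 0):
        super().__init__(maxsize)
        self._counter = itertools.count()  # tie-breaker keeps equal priorities FIFO

    def _put(self, item):
        super()._put((payload_priority(item.payload), next(self._counter), item))

    def _get(self):
        return super()._get()[2]  # return only the message


async def demo() -> list:
    queue = PayloadPriorityQueue()
    for payload in [b"raw text", b'{"value": 3}', b'{"urgent": true, "cmd": "stop"}']:
        queue.put_nowait(FakeMessage(payload))
    return [queue.get_nowait().payload for _ in range(queue.qsize())]


order = asyncio.run(demo())
print(order)
```

The urgent JSON message is retrieved first, then the ordinary JSON one, and the non-JSON payload last.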

Does that make it clearer? 😊

thalesmaoa commented 1 year ago

Crystal clear! Thanks!