sbtinstruments / aiomqtt

The idiomatic asyncio MQTT client, wrapped around paho-mqtt
https://sbtinstruments.github.io/aiomqtt
BSD 3-Clause "New" or "Revised" License

More than one subscription, routing it to a specific handler #261

skinkie opened this issue 6 months ago

skinkie commented 6 months ago

The example reads as follows:

async with Client("test.mosquitto.org") as client:
    async with client.messages() as messages:
        await client.subscribe("humidity/#")
        async for message in messages:
            print(message.payload)

If a user also wants to handle temperature:

async with Client("test.mosquitto.org") as client:
    async with client.messages() as messages:
        await client.subscribe("humidity/#")
        await client.subscribe("temperature/#")
        async for message in messages:
            # message.topic is an aiomqtt Topic object, so convert it to str first
            if str(message.topic).startswith("humidity"):
                print(message.payload)
            elif str(message.topic).startswith("temperature"):
                print(message.payload)

Is there any way to do this more elegantly? Maybe even in a concurrent way?

Pseudocode:

async with Client("test.mosquitto.org") as client:
    async with client.messages() as messages:
        await client.subscribe("humidity/#")
        async for message in messages:
            print(message.payload)

    async with client.messages() as messages:
        await client.subscribe("temperature/#")
        async for message in messages:
        print(message.payload)

empicano commented 6 months ago

Hi there Stefan,

I'll respond to your question about handling incoming messages concurrently. We can also happily discuss how to make aiomqtt more elegant if you have specific ideas!

The second admonition in the "Message queue" section of our docs shows a small example of how to handle each message in a separate coroutine. However, as far as I understand your pseudocode, you'd like to have one coroutine per subscription instead of per message.

We'll need to dive a bit into asyncio for this 😋 The idea is to implement a "distributor" that sorts the incoming messages into different asyncio queues, which are then processed concurrently (but sequentially inside a subscription). Here's a minimal working example:

import asyncio
import aiomqtt

async def fast_producer(client: aiomqtt.Client):
    while True:
        await asyncio.sleep(0.2)
        await client.publish("fast", "fast")

async def fast_consumer():
    while True:
        message = await fast_queue.get()
        print(f"Fast consumer received: {message.payload}")

async def slow_producer(client: aiomqtt.Client):
    while True:
        await asyncio.sleep(2)
        await client.publish("slow", "slow")

async def slow_consumer():
    while True:
        message = await slow_queue.get()
        print(f"Slow consumer received: {message.payload}")

fast_queue = asyncio.Queue()
slow_queue = asyncio.Queue()

async def distributor(client: aiomqtt.Client):
    async with client.messages() as messages:
        await client.subscribe("fast")
        await client.subscribe("slow")
        # Sort messages into the appropriate queues
        async for message in messages:
            if message.topic.matches("fast"):
                fast_queue.put_nowait(message)
            elif message.topic.matches("slow"):
                slow_queue.put_nowait(message)

async def main():
    async with aiomqtt.Client("test.mosquitto.org") as client:
        # Use a task group to manage and await all tasks
        async with asyncio.TaskGroup() as tg:
            tg.create_task(fast_producer(client))
            tg.create_task(fast_consumer())
            tg.create_task(slow_producer(client))
            tg.create_task(slow_consumer())
            tg.create_task(distributor(client))

asyncio.run(main())

Does that make sense? 🙂 Issue #250 is similar and may provide more context.

skinkie commented 6 months ago

@empicano What I would love to avoid is the if/elif construction, in favor of a more functional approach (for example, something like aiogram's filters/decorators). If I create the distributor you showed in the example, I still end up with the second variant. For this specific project (a Telegram bot) I don't really need extremely high-performance coroutines. In a different project, I discussed with the author of FlashMQ how best to handle very large loads (far bigger than the regular burst of retained messages); his suggestion was to create separate subscriptions, sharding the load. I wonder if there is a way to avoid 'multiple clients' and have this handled under the hood.
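A small step away from the if/elif chain, without changing aiomqtt, is to let a dictionary do the routing inside the distributor. In this sketch, `distribute()`, `demo()` and the `routes`/`unrouted` queues are hypothetical names; routing uses only the first topic level (so `+`/`#` wildcards are not understood), and `str(message.topic)` is used on the assumption that `str()` of an aiomqtt `Topic` returns the topic string:

```python
import asyncio
from types import SimpleNamespace


async def distribute(messages, routes: dict, unrouted: asyncio.Queue) -> None:
    # Look up the queue by the first topic level ("humidity/kitchen" -> "humidity"),
    # which is what the startswith() checks in the question amount to
    async for message in messages:
        first_level = str(message.topic).split("/", 1)[0]
        routes.get(first_level, unrouted).put_nowait(message)


async def demo() -> dict:
    routes = {"humidity": asyncio.Queue(), "temperature": asyncio.Queue()}
    unrouted = asyncio.Queue()

    async def source():
        # Hypothetical stand-in for the client's message iterator
        topics = ["humidity/kitchen", "temperature/kitchen", "humidity/attic", "pressure/roof"]
        for topic in topics:
            yield SimpleNamespace(topic=topic, payload=b"")

    await distribute(source(), routes, unrouted)
    sizes = {name: queue.qsize() for name, queue in routes.items()}
    sizes["unrouted"] = unrouted.qsize()
    return sizes
```

Adding a consumer then means adding a dictionary entry rather than another `elif` branch; each consumer awaits its own queue, as in the fast/slow example.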

empicano commented 6 months ago

We previously had Client.filtered_messages (in fact, it's still in the code, but now deprecated). It turned out to be more complex and less understandable than it should be. Simple if statements are easy and flexible. You can read a bit more about this design decision in this PR.

I agree with you however that aiomqtt is not yet as elegant as it could be.

skinkie commented 5 months ago

@empicano was there a change?

empicano commented 5 months ago

Hi Stefan,

I should have left a comment before closing, sorry about that. I'm currently weeding out issues to get an overview of what to work on.

Do you have specific changes to aiomqtt that you still want to discuss?

skinkie commented 5 months ago

If you say "just solve it with if/else", that is fine with me. But I wish there were some way of routing based on the same semantics that are allowed when making subscriptions (the + and # wildcards). Ideally a bit broader than that, so that a single subscription could be used as well, with the matching happening inside the application. It might be even more elegant if the actual subscription were not required at all, with the annotations figuring out the 'best' subscription(s).

empicano commented 5 months ago

You probably know this, but the Topic.matches method can match wildcards as well, e.g. Topic.matches("foo/+/bar/#").
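For reference, the wildcard rules themselves are small. The function below is a hypothetical, simplified reimplementation of MQTT topic-filter matching, not aiomqtt's code: `+` matches exactly one level, and `#`, valid only as the last level, matches the parent level and everything below it. It ignores details such as `$`-prefixed topics and `$share/...` filters:

```python
def topic_matches(topic: str, topic_filter: str) -> bool:
    """Return True if `topic` matches the MQTT topic filter `topic_filter`."""
    topic_levels = topic.split("/")
    filter_levels = topic_filter.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            # "#" must be the last level; it matches everything from here on,
            # including the parent level itself ("a/#" matches "a")
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False  # the topic is shorter than the filter
        if level != "+" and level != topic_levels[i]:
            return False  # a literal level differs
    # Without "#", topic and filter must have the same number of levels
    return len(topic_levels) == len(filter_levels)


print(topic_matches("foo/x/bar/baz", "foo/+/bar/#"))  # True
print(topic_matches("foo/x/bar", "foo/+/bar/#"))      # True: "#" also matches the parent level
print(topic_matches("foo/x/y/bar", "foo/+/bar/#"))    # False: "+" matches exactly one level
```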

I'm happy to discuss possible improvements to the interface. What matters most to me is that it's intuitive. Could you provide some (pseudo-)code of what your ideal interface would look like?

skinkie commented 5 months ago

You probably know this, but the Topic.matches method can match wildcards as well, e.g. Topic.matches("foo/+/bar/#").

I did not know this :-) I literally reimplement all of that manually, so this alone would already be an improvement.

I'm happy to discuss possible improvements to the interface. What matters most to me is that it's intuitive. Could you provide some (pseudo-)code of what your ideal interface would look like?

@subscription(topic="/temperature/#")
async def handleTemperature(message: Message):
    print(message.payload)

@subscription(topic="/humidity/{room}/#")  # Analogous to aiohttp's @routes.post('/{room}/something.xml')
async def handleHumidity(message: Message, room: str):
    print(message.payload)

With the above, it becomes clear that creating two subscriptions would be best from the server's perspective, and that a single subscription to '/#' would be wasteful.
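The decorator interface proposed above could be prototyped as a thin layer over a single message loop. Everything in this sketch is hypothetical (`Router`, `subscription()`, `filters()`, `dispatch()`) and not part of aiomqtt. Each `{name}` placeholder becomes a `+` wildcard in the filter sent to the broker and is passed to the handler as a keyword argument; `filters()` yields one filter per handler, i.e. the two targeted subscriptions rather than one on `#`. To keep it short, handlers receive only the payload, the leading `/` from the pseudocode is dropped, and `#` is handled only as the last level:

```python
import asyncio
import re

PLACEHOLDER = re.compile(r"\{(\w+)\}")


class Router:
    """Hypothetical decorator-based router; not an aiomqtt API."""

    def __init__(self):
        self._routes = []  # (topic filter for the broker, pattern levels, handler)

    def subscription(self, topic: str):
        def register(handler):
            levels = topic.split("/")
            # "{room}" becomes "+" in the filter that is sent to the broker
            broker_filter = "/".join(
                "+" if PLACEHOLDER.fullmatch(level) else level for level in levels
            )
            self._routes.append((broker_filter, levels, handler))
            return handler

        return register

    def filters(self) -> set:
        # One topic filter per handler: these are the subscriptions to make
        return {broker_filter for broker_filter, _, _ in self._routes}

    @staticmethod
    def _match(levels, topic_levels):
        # Return the captured placeholder values, or None if there is no match
        params = {}
        for i, level in enumerate(levels):
            if level == "#":
                return params
            if i >= len(topic_levels):
                return None
            placeholder = PLACEHOLDER.fullmatch(level)
            if placeholder:
                params[placeholder.group(1)] = topic_levels[i]
            elif level != "+" and level != topic_levels[i]:
                return None
        return params if len(levels) == len(topic_levels) else None

    async def dispatch(self, topic: str, payload: bytes) -> int:
        # Await every matching handler in registration order; return the count
        called = 0
        for _, levels, handler in self._routes:
            params = self._match(levels, topic.split("/"))
            if params is not None:
                await handler(payload, **params)
                called += 1
        return called


router = Router()
received = []


@router.subscription("temperature/#")
async def handle_temperature(payload):
    received.append(("temperature", payload))


@router.subscription("humidity/{room}/#")
async def handle_humidity(payload, room):
    received.append(("humidity", room, payload))
```

In a real client, `filters()` would drive the `subscribe()` calls and the message loop would call `dispatch()` for each message. Because `dispatch()` awaits handlers one after another, a slow handler delays the rest; feeding per-handler queues, as in the distributor example, would restore concurrency. The sketch makes no attempt to merge overlapping filters into fewer subscriptions.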

empicano commented 5 months ago

Interesting, this seems similar to how fastapi-mqtt is designed, and to many web frameworks in general.

Some thoughts on this:

I guess one option would be to pass a handler when calling subscribe and assign the messages to the correct handler under the hood, instead of letting the user do that with if/else (pseudocode):

async def handle_temperature(message):
    print(message.payload)

async def handle_humidity(message, room):
    print(message.payload)

async with Client("test.mosquitto.org") as client:
    await client.subscribe("temperature/#", handle_temperature)
    await client.subscribe("humidity/{room}/#", handle_humidity)
    # Block here or do something else such that we don't exit the context manager

However, this would be a big change to the interface, and I'm not sure it's worth it. For example, more intricate ways of prioritizing messages and handling them concurrently would be more difficult to achieve. I believe that one of aiomqtt's strengths is its flexibility. The if/else design is simple, and I'm reluctant to move away from it because it's so intuitive and flexible.

@frederikaalund, what do you think about this? 😊

smurfix commented 1 month ago

My PR https://github.com/sbtinstruments/aiomqtt/pull/302 has code that does this:

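import anyio
from aiomqtt import Client

async def handle_temperature(msg):
    print(msg.payload)

async def get_temperature(client):
    # client.subscription() is the API added by this PR
    async with client.subscription("temperature/#") as sub:
        async for msg in sub:
            await handle_temperature(msg)

async def main():
    async with Client("test.mosquitto.org") as client, anyio.create_task_group() as tg:
        tg.start_soon(get_temperature, client)
        tg.start_soon(get_humidity, client)  # get_humidity() is analogous, for "humidity/#"

anyio.run(main)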

The "old" approach (subscribe, then iterate over client.messages) still works. I'm using subscription identifiers, so no expensive topic comparisons are required.

skinkie commented 1 month ago

@smurfix How does it work? Does it make a new connection for every subscription? Does it do any deduplication? For example, if a subscription to # is made as well as one to something/#, is only one upstream subscription created?

smurfix commented 1 month ago

The MQTT protocol (version 5) has a nice feature where you can send a code (a subscription identifier) along with each subscription; the server adds the codes of all matching subscriptions to the PUBLISH packet that it sends to you. My patch uses a unique code for each async with client.subscription(...) block; then I simply go through the list of codes in each incoming PUBLISH and forward the message to all queues that want it. Traditional subscribe requests get a static code of 1.

This of course assumes that no user of aiomqtt uses this subscription feature directly … which is moderately unlikely, since the server might not support it and the Connect Ack handler didn't save whether it does or not.

If the server doesn't support these codes I have a fallback implementation that dispatches on the message topic in the client.
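The dispatch described above can be modeled without a broker. This is a hypothetical sketch, not the code from the PR: `Dispatcher`, `add_subscription()` and `on_publish()` are illustrative names, each subscription gets a fresh identifier starting at 2 (1 being the static code for plain subscribe requests), an incoming message carries the list of identifiers the server attached, and the client appends it to exactly those inboxes (plain lists standing in for asyncio queues). When the server does not support identifiers, it falls back to comparing the topic with each filter, using a simplified matcher:

```python
import itertools


def topic_matches(topic: str, topic_filter: str) -> bool:
    # Simplified matcher for the fallback: "+" is one level, a trailing "#" the rest
    t, f = topic.split("/"), topic_filter.split("/")
    for i, level in enumerate(f):
        if level == "#":
            return True
        if i >= len(t) or (level != "+" and level != t[i]):
            return False
    return len(t) == len(f)


class Dispatcher:
    """Hypothetical model of subscription-identifier dispatch; not the PR's code."""

    def __init__(self, server_supports_ids: bool = True):
        self.server_supports_ids = server_supports_ids
        self._next_id = itertools.count(2)  # 1 stays reserved for plain subscribe()
        self.subscriptions = {}  # identifier -> (topic filter, inbox)

    def add_subscription(self, topic_filter: str) -> int:
        sub_id = next(self._next_id)
        self.subscriptions[sub_id] = (topic_filter, [])
        return sub_id

    def on_publish(self, topic: str, payload: bytes, sub_ids: list) -> None:
        if self.server_supports_ids:
            # The server already matched: forward to the identifiers it listed
            targets = [i for i in sub_ids if i in self.subscriptions]
        else:
            # Fallback: compare the topic with every subscription's filter
            targets = [i for i, (flt, _) in self.subscriptions.items()
                       if topic_matches(topic, flt)]
        for sub_id in targets:
            self.subscriptions[sub_id][1].append((topic, payload))


# Two overlapping subscriptions
d = Dispatcher()
all_sites = d.add_subscription("temperature/#")
one_site = d.add_subscription("temperature/somewhere")
# A single PUBLISH tagged with both identifiers reaches both inboxes once
d.on_publish("temperature/somewhere", b"21", [all_sites, one_site])
```

With identifiers, the client never evaluates wildcards itself; the topic comparisons only come back on the fallback path.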

skinkie commented 1 month ago

But if I have two 'processes' that use the data from the same subscription, that subscription is not 'reused' from the MQTT server's perspective? There is a separate subscription each time client.subscription("temperature/#") is created?

smurfix commented 1 month ago

a subscription for each time client.subscription("temperature/#") is created

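That part doesn't work, because you can't ask the MQTT server for two (non-shared) subscriptions with the same topic filter: per the MQTT protocol specification, a SUBSCRIBE whose filter is identical to an existing subscription's filter replaces that subscription.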

I thought about implementing this (simply attaching more than one queue to the topic), but if you try, you run into interesting problems (Does the second task want retained messages? Does the server even send them when you re-SUBSCRIBE? If so, should the existing task get them too? Which QoS should be used, and what about the other properties you can attach to a subscription?) which I don't have good answers for. Thus, for now the code doesn't allow duplicate subscriptions.
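For illustration, the "attach more than one queue to the topic" idea amounts to reference-counted fan-out: the first local subscriber to a filter causes a SUBSCRIBE, later ones only add an inbox, and the UNSUBSCRIBE goes out when the last one leaves. `FanOut` and its methods are hypothetical, lists stand in for queues, and delivery is keyed by filter, which presumes the client already knows which subscription a message arrived for. The sketch deliberately ignores the open questions listed above (retained messages, QoS, other subscription properties):

```python
class FanOut:
    """Hypothetical sketch: several local inboxes sharing one broker subscription."""

    def __init__(self):
        self.inboxes = {}       # topic filter -> list of local inboxes
        self.broker_calls = []  # what would be sent to the broker, in order

    def attach(self, topic_filter: str) -> list:
        if topic_filter not in self.inboxes:
            # First local subscriber: the only SUBSCRIBE the broker sees
            self.inboxes[topic_filter] = []
            self.broker_calls.append(("SUBSCRIBE", topic_filter))
        inbox = []
        self.inboxes[topic_filter].append(inbox)
        return inbox

    def detach(self, topic_filter: str, inbox: list) -> None:
        # Compare by identity: two empty inboxes are equal but not the same object
        remaining = [i for i in self.inboxes[topic_filter] if i is not inbox]
        if remaining:
            self.inboxes[topic_filter] = remaining
        else:
            # Last local subscriber left: drop the broker subscription
            del self.inboxes[topic_filter]
            self.broker_calls.append(("UNSUBSCRIBE", topic_filter))

    def deliver(self, topic_filter: str, payload: bytes) -> None:
        # Copy a message that arrived for `topic_filter` into every local inbox
        for inbox in self.inboxes.get(topic_filter, []):
            inbox.append(payload)


hub = FanOut()
db_inbox = hub.attach("temperature/#")      # e.g. storing readings
stream_inbox = hub.attach("temperature/#")  # e.g. streaming to a client
hub.deliver("temperature/#", b"21")
hub.detach("temperature/#", db_inbox)
hub.detach("temperature/#", stream_inbox)
```

The second `attach()` sends nothing to the broker, so it would never see the retained messages that the first SUBSCRIBE triggered, which is exactly the kind of question raised above.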

What you can do is have one task ask for $share/foo/temperature/# and the other ask for $share/bar/temperature/# (or use the unshared topic). Messages to temperature/basement/heating will then be delivered to both.

Assuming, of course, that you use MQTT5 and that the server supports shared subscriptions.
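Under MQTT 5, a shared-subscription filter has the form `$share/<ShareName>/<filter>`. The broker delivers a matching message once to one member of each share group, independently of any non-shared subscriptions, which is why two different group names both receive the message. The hypothetical helpers below (`split_shared`, `simple_match`, `copies_received`) count the copies a single client would get, under the assumptions stated in the comments; the matcher handles only exact filters and a trailing `#`:

```python
def split_shared(topic_filter: str):
    """Split "$share/<ShareName>/<filter>"; ShareName is None if not shared."""
    if topic_filter.startswith("$share/"):
        _, share_name, plain_filter = topic_filter.split("/", 2)
        return share_name, plain_filter
    return None, topic_filter


def simple_match(topic: str, topic_filter: str) -> bool:
    # Minimal matcher for this example: an exact filter or "prefix/#"
    if topic_filter.endswith("/#"):
        prefix = topic_filter[:-2]
        return topic == prefix or topic.startswith(prefix + "/")
    return topic == topic_filter


def copies_received(filters, topic: str) -> int:
    # One copy per matching share group plus one for all matching non-shared
    # subscriptions, assuming the broker merges overlapping non-shared
    # subscriptions into one PUBLISH and this client is alone in each group
    groups, unshared = set(), False
    for topic_filter in filters:
        share_name, plain_filter = split_shared(topic_filter)
        if simple_match(topic, plain_filter):
            if share_name is None:
                unshared = True
            else:
                groups.add(share_name)
    return len(groups) + int(unshared)


print(split_shared("$share/foo/temperature/#"))  # ('foo', 'temperature/#')
print(copies_received(["$share/foo/temperature/#", "$share/bar/temperature/#"],
                      "temperature/basement/heating"))  # 2
```

Mixing one shared and one non-shared filter for the same topic also yields two copies, which is the situation discussed at the end of this thread.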

skinkie commented 1 month ago

I just wonder how it will work in the more interesting cases. I think the combination of 'temperature/#' (stored in a database) and 'temperature/site_no_1' (streamed to a client) needs explaining. Will there be one subscription or two?

smurfix commented 1 month ago

@skinkie Well if one task calls async with client.subscription("temperature/#") and the other does async with client.subscription("temperature/somewhere") then obviously that's two subscriptions.

A reasonable server will send us messages to "temperature/somewhere" exactly once, tagged with both subscription IDs.

skinkie commented 1 month ago

@smurfix I was not aware that a single message could carry several subscription IDs. In that case this is a very clean implementation. My rationale was that the subscribing client was responsible for deduplicating the inter-process communication. Now you have basically pushed this to the server by using subscription IDs.

smurfix commented 1 month ago

Or rather I have asked the server to please tell us about the result of its dispatching effort.

There's no deduplication involved, at least unless you also consider shared subscriptions. Those are separate from unshared subscriptions, so when both shared and unshared subscriptions match a message, the server is free to send the same message twice; mosquitto does that.

You can't handle the resulting ambiguity without using subscription IDs.
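A small model shows the ambiguity. Suppose a client holds a non-shared subscription to `temperature/#` (identifier 2) and a shared one to `$share/foo/temperature/#` (identifier 3), and the broker sends the message twice, each copy tagged with one identifier. In the hypothetical sketch below (`route_by_ids`, `route_by_topic`), routing on identifiers gives each subscriber exactly one copy, while routing on the topic alone cannot tell the copies apart and hands both copies to both subscribers:

```python
def route_by_ids(copies, inboxes):
    # Each copy goes only to the subscriptions whose identifiers it carries
    for _topic, payload, sub_ids in copies:
        for sub_id in sub_ids:
            inboxes[sub_id].append(payload)


def route_by_topic(copies, inboxes, filters):
    # Without identifiers, every copy goes to every subscription whose filter matches
    for topic, payload, _sub_ids in copies:
        for sub_id, topic_filter in filters.items():
            prefix = topic_filter[:-2]  # this example only uses "prefix/#" filters
            if topic.startswith(prefix + "/"):
                inboxes[sub_id].append(payload)


# Identifier 2: "temperature/#"; identifier 3: "$share/foo/temperature/#"
# (shown here with the "$share/foo/" part stripped)
filters = {2: "temperature/#", 3: "temperature/#"}
# The broker sends the same message twice, each copy tagged for one subscription
copies = [("temperature/basement", b"19", [2]), ("temperature/basement", b"19", [3])]

by_ids = {2: [], 3: []}
route_by_ids(copies, by_ids)

by_topic = {2: [], 3: []}
route_by_topic(copies, by_topic, filters)
```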