sbtinstruments / aiomqtt

The idiomatic asyncio MQTT client, wrapped around paho-mqtt
https://sbtinstruments.github.io/aiomqtt
BSD 3-Clause "New" or "Revised" License

[WIP] transition to anyio #302

Status: open. smurfix opened this pull request 1 month ago

smurfix commented 1 month ago

As per https://github.com/sbtinstruments/aiomqtt/discussions/226 here's a draft of an anyio-ified aiomqtt. It abuses paho-mqtt 2.0 by way of subclassing its Client class.

No more threads. No more issues with locking. Trio compatibility.

Tests pass (with one failure on asyncio), but there's still a lot to do: checking robustness, handling reconnections, getting typing back up to standard, finishing the transition to paho's callback API v2, supporting paho-mqtt 2.1, general clean-up, supporting more MQTTv5 features, and whatnot.

I need this code in order to replace my moat-mqtt package (which is an hbmqtt port to anyio that shows its age). No guarantees, but I plan to keep poking at this.

smurfix commented 1 month ago

Now includes a subscription method that lets you do "local" message processing:

```python
async with Client("test.mosquitto.org") as client:
    async with client.subscription("foo/bar") as msgs:
        async for m in msgs:
            await process(m)  # process() stands for your own message handler
```
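A minimal sketch of how a `subscription()` context manager like this could work internally, in plain asyncio with an in-memory stand-in for the client. `FakeClient`, `dispatch()` and `_iterate()` are hypothetical names, and matching is exact-topic only; this is not the PR's implementation. What it illustrates is the ordering: the local queue is attached before subscribing, and leaving the block unsubscribes and detaches it.

```python
import asyncio
from contextlib import asynccontextmanager


async def _iterate(queue: asyncio.Queue):
    # Expose a queue as an async iterator so callers can use `async for`.
    while True:
        yield await queue.get()


class FakeClient:
    """Hypothetical in-memory stand-in for an MQTT client (no network)."""

    def __init__(self) -> None:
        self.subscribed: set = set()
        self._queues: dict = {}

    async def subscribe(self, topic: str) -> None:
        self.subscribed.add(topic)

    async def unsubscribe(self, topic: str) -> None:
        self.subscribed.discard(topic)

    def dispatch(self, topic: str, payload: bytes) -> None:
        # Called for every incoming message; exact topic match only, for brevity.
        queue = self._queues.get(topic)
        if queue is not None:
            queue.put_nowait(payload)

    @asynccontextmanager
    async def subscription(self, topic: str):
        queue: asyncio.Queue = asyncio.Queue()
        # Attach the local queue *before* subscribing, so early messages aren't lost.
        self._queues[topic] = queue
        try:
            await self.subscribe(topic)
            yield _iterate(queue)
        finally:
            # Leaving the block unsubscribes and detaches the queue.
            await self.unsubscribe(topic)
            del self._queues[topic]


async def demo():
    client = FakeClient()
    received = []
    async with client.subscription("foo/bar") as msgs:
        client.dispatch("foo/bar", b"hello")
        client.dispatch("other/topic", b"ignored")  # no local queue: dropped
        async for m in msgs:
            received.append(m)
            break
    return received, client.subscribed


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ([b'hello'], set())
```

The caller never touches subscribe/unsubscribe directly; the context manager owns both, which is the property the thread discusses further down.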
empicano commented 1 month ago

Hi there, very cool to see so much work on this! 👍

This seems to be the go-ahead for our yearly discussion on this subject 😄 For reference, we had a PR in the past (#152) to switch to anyio that we ultimately decided to abandon. In discussion #44, people were reluctant to switch to anyio, especially if the changes would impact the interface.

All that being said, we focus a lot on keeping aiomqtt as small and maintainable as possible. I can see an internal switch to anyio if it works towards that goal. In the best case, this would be non-breaking. Trio support would be a nice extra, but wouldn't make the switch worth it to me if it increases maintenance cost. I'm also still on the fence about whether it's worth adding another dependency when we get e.g. task groups "for free" once we drop Python < 3.11.

@frederikaalund, @JonathanPlasse, what are your thoughts on this? 🙂


Quoting @smurfix: "Now includes a subscription method that lets you do 'local' message processing"

We had something similar to this in the past with unfiltered_messages and filtered_messages. People got confused about how it worked, among other things by the fact that it created multiple queues, so we decided to change it to what we have now.

agronholm commented 1 month ago

Is it absolutely necessary to depend on the paho-mqtt client? Can't we instead create a sans-io implementation of the MQTT v5 / v3.1 protocols?
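To make "sans-io" concrete: the protocol code only turns bytes into events and back, and whatever drives it (asyncio, trio, or a thread) does the socket I/O. A minimal sketch of the first step such a core needs, splitting a byte stream into MQTT control packets using the fixed header: packet type and flags in the first byte, then the Remaining Length encoded as a variable byte integer of up to four bytes, seven bits each, with the high bit meaning "more bytes follow". `PacketSplitter` is a hypothetical name, and packet bodies are not decoded.

```python
from typing import List, Optional, Tuple

# (packet_type, flags, body) for one complete MQTT control packet
Packet = Tuple[int, int, bytes]


class PacketSplitter:
    """Sans-io framing: feed() takes raw bytes and returns complete packets.

    No sockets and no event loop, so any I/O framework can drive it.
    """

    def __init__(self) -> None:
        self._buf = bytearray()

    def feed(self, data: bytes) -> List[Packet]:
        self._buf += data
        packets: List[Packet] = []
        while True:
            header = self._parse_fixed_header()
            if header is None:
                break  # fixed header incomplete: wait for more bytes
            first, header_len, remaining = header
            end = header_len + remaining
            if len(self._buf) < end:
                break  # body incomplete: wait for more bytes
            body = bytes(self._buf[header_len:end])
            del self._buf[:end]
            packets.append((first >> 4, first & 0x0F, body))
        return packets

    def _parse_fixed_header(self) -> Optional[Tuple[int, int, int]]:
        if not self._buf:
            return None
        multiplier, value = 1, 0
        # Remaining Length: 1-4 bytes, 7 bits each, high bit = "more bytes follow"
        for i in range(1, 5):
            if i >= len(self._buf):
                return None
            byte = self._buf[i]
            value += (byte & 0x7F) * multiplier
            if (byte & 0x80) == 0:
                return self._buf[0], i + 1, value
            multiplier *= 128
        raise ValueError("malformed Remaining Length")
```

Because `feed()` accepts arbitrary chunks, the same object works unchanged whether the bytes come from anyio, asyncio or a blocking socket.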

smurfix commented 1 month ago

@agronholm Paho-MQTT at least has a working MQTTv5 implementation and complete data classes for all the fiddly bits. Reimplementing all that is not exactly trivial, so the most direct way to working, thread-free, async-only code was to anyio-ify it by way of a subclass. I'm the first person to admit that this subclass is way more intrusive than any sane programmer should be comfortable with, but you've got to start somewhere.

I do plan to build a reasonable sans-io MQTT core eventually. Ideally I'd then be able to teach the current paho.mqtt.Client class to use that (should be a net reduction in code size …). Assuming that its maintainers are amenable to this, which I'm a bit skeptical about, but we'll see. In the meantime I've submitted a somewhat less intrusive clean-up patch to them.

https://github.com/eclipse/paho.mqtt.python/pull/845

smurfix commented 1 month ago

@empicano asyncio task groups may have the same name as anyio task groups, but not the same features. For instance, they don't have cancel scopes: to stop an asyncio task group, you need to cancel the task the group runs in, which is not the same thing. There are other differences, which @agronholm is way more qualified than I am to educate people about. ;-)

As to changing the interface, my patch manages not to do that. Admittedly it does this by way of somewhat-invasively subclassing the Paho client but frankly the Paho code could do with a bit of improvement. :-/

As for filtered_messages: IMHO it's far more confusing, or rather detrimental to modularity, to require a central "async with client.messages()" loop to do the message dispatching. You can't do a "this task deals with temperatures and that task deals with humidity" pattern that way. It also requires more CPU, since that central loop has to dispatch every message itself.

IMHO it's a question of designing the right interface. The old async with filtered_messages() pattern required the caller to do the subscribe/loop/unsubscribe dance instead of having the context manager handle it, which I can't see a good reason for. It also risked losing messages, because the filter was attached after subscribing instead of before.

NB: 9c6a7f4 documents the new async with client.subscription() interface. Personally I'd much rather use my example code than the equivalent global-queue example above it, which is more code and less modular, esp. once you add queue-full error handling.
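For contrast, a rough sketch of the central-queue pattern with per-task fan-out: one loop sees every message, routes it by topic prefix to bounded per-task queues, and needs an explicit policy for full queues (here: drop and count). All names are hypothetical, and incoming messages are simulated as a list of `(topic, payload)` tuples instead of a real `client.messages` stream.

```python
import asyncio


async def central_dispatch(messages, queues, dropped):
    # One loop sees every message and has to route it itself.
    for topic, payload in messages:
        queue = queues.get(topic.split("/", 1)[0])
        if queue is None:
            continue
        try:
            queue.put_nowait(payload)
        except asyncio.QueueFull:
            dropped[topic] = dropped.get(topic, 0) + 1  # overflow policy: drop


async def consumer(queue, out):
    # Per-topic task: "this task deals with temperatures", etc.
    while True:
        out.append(await queue.get())
        queue.task_done()


async def main():
    queues = {"temperature": asyncio.Queue(maxsize=2), "humidity": asyncio.Queue(maxsize=2)}
    results = {name: [] for name in queues}
    dropped = {}
    incoming = [("temperature/in", 20), ("humidity/in", 40),
                ("temperature/out", 5), ("temperature/in", 21)]
    await central_dispatch(incoming, queues, dropped)
    tasks = [asyncio.create_task(consumer(q, results[n])) for n, q in queues.items()]
    for q in queues.values():
        await q.join()
    for t in tasks:
        t.cancel()
    return results, dropped
```

The third temperature message overflows the two-slot queue and is dropped; deciding what should happen in that case is exactly the extra code the central-queue approach forces on the caller.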

agronholm commented 1 month ago

Fair enough.

empicano commented 1 month ago

I hope Frederik will still comment here; he can frankly review this better than I can. I'll say that it'd be pretty neat to get rid of the threads and clean up the code. I'm just concerned about maintainability when adding this much code, as we're already stretched thin. I agree that a sans-io "backend" would be really cool to see in aiomqtt 👍


On the interface topic:

Quoting @smurfix: "You can't do a 'this task deals with temperatures and that task deals with humidity' pattern that way."

Maybe I misunderstand you here, but doesn't this example in the documentation do exactly that? The flexibility of this approach was in fact one of the main reasons I pushed for the change.

Generally, I want this library to serve the most basic use case (which I think is a single task and queue) as well as possible, with the flexibility to cover as much as possible beyond that. I'd like to avoid having multiple queues by default, because when we still had filtered_messages, we heard multiple times in issues and discussions that this led to confusion and problems.

Apart from that, I'd expect a fixed number of tasks working a single priority queue to fit better in most use cases where we need to process messages concurrently, e.g. because the number of messages coming over different topics is often skewed. What do you think about that?
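A minimal sketch of that alternative: a fixed pool of worker tasks sharing one `asyncio.PriorityQueue`. The topic-to-priority table is a hypothetical example. Each entry carries a running sequence number, so equal-priority messages keep their arrival order and payloads are never compared with each other.

```python
import asyncio
import itertools

PRIORITY = {"alarm": 0, "temperature": 1, "humidity": 2}  # hypothetical: lower = more urgent


async def worker(queue, handled):
    while True:
        _priority, _seq, topic, payload = await queue.get()
        handled.append((topic, payload))
        queue.task_done()


async def main(num_workers=2):
    queue = asyncio.PriorityQueue()
    counter = itertools.count()  # tie-breaker: preserves arrival order
    handled = []
    # Enqueue before starting the workers so this demo's ordering is deterministic.
    incoming = [("humidity", 40), ("temperature", 20), ("alarm", "smoke"), ("humidity", 41)]
    for topic, payload in incoming:
        queue.put_nowait((PRIORITY[topic], next(counter), topic, payload))
    workers = [asyncio.create_task(worker(queue, handled)) for _ in range(num_workers)]
    await queue.join()
    for w in workers:
        w.cancel()
    return handled
```

The pool size stays fixed no matter how many topics there are, so a burst on one busy topic is absorbed by all workers instead of backing up a single per-topic queue.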

Quoting @smurfix: "IMHO it's a question of designing the right interface. The old async with filtered_messages() pattern required the caller to do the subscribe/loop/unsubscribe dance instead of having the context manager handle it, which I can't see a good reason for. It also risked losing messages, because the filter was attached after subscribing instead of beforehand."

In my opinion, subscriptions and routing should be separate. I admit that context managers for subscriptions are elegant, but most of the time we either don't need to unsubscribe (clean session) or don't want to unsubscribe (persistent session) before disconnection. I want this library to support as many use cases as possible, and dynamic un/subscriptions are part of that. We can still have those via task cancellation with the proposed changes, but I'd like to find a simpler solution.

Let me propose an alternative idea to improve the message handling interface:

```python
import asyncio

import aiomqtt

# Router, routers= and client.route() are the API proposed in #304.
router = aiomqtt.Router()

@router.match("humidity/+/inside/#")
async def handle(message, client, *args):  # automatically extract wildcards into *args
    print(message.payload)

async def main():
    async with aiomqtt.Client("test.mosquitto.org", routers=[router]) as client:
        await client.subscribe("humidity/#")
        async for message in client.messages:
            await client.route(message)

asyncio.run(main())
```

We can then process messages concurrently, e.g. like this:

```python
import asyncio

import aiomqtt

# Router, routers= and client.route() are the API proposed in #304.
router = aiomqtt.Router()

@router.match("humidity/+/inside/#")
async def handle(message, client, *args):  # automatically extract wildcards into *args
    print(message.payload)

async def work(client):
    async for message in client._messages():
        await client.route(message)

async def main():
    async with aiomqtt.Client("test.mosquitto.org", routers=[router]) as client:
        await client.subscribe("humidity/#")
        async with asyncio.TaskGroup() as tg:  # Python 3.11+
            # Two workers pull from the same message stream concurrently
            tg.create_task(work(client))
            tg.create_task(work(client))

asyncio.run(main())
```

(.messages is currently broken; for now we use ._messages() so that we can use it multiple times.)

I opened #304 with a proof of concept. I'd be glad to hear what you think!
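The router proposal depends on matching each incoming topic against MQTT topic filters and passing the wildcard values to the handler as `*args`. A minimal sketch of that matching step, following the MQTT rules that `+` matches exactly one level and a trailing `#` matches all remaining levels, including none. `match_filter` is hypothetical (not the code in #304); returning the `#` remainder as a single `/`-joined string is an assumption, and the special handling of `$`-prefixed topics is left out.

```python
from typing import Optional, Tuple


def match_filter(topic_filter: str, topic: str) -> Optional[Tuple[str, ...]]:
    """Return the wildcard values if `topic` matches `topic_filter`, else None.

    Hypothetical helper; ignores the special rules for topics starting with "$".
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    captured = []
    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard (only valid as the last level): matches the rest,
            # possibly nothing; returned as one "/"-joined string (an assumption).
            captured.append("/".join(topic_levels[i:]))
            return tuple(captured)
        if i >= len(topic_levels):
            return None  # topic has fewer levels than the filter
        if level == "+":
            captured.append(topic_levels[i])  # single-level wildcard
        elif level != topic_levels[i]:
            return None
    # Without "#", the topic must not have extra levels.
    return tuple(captured) if len(topic_levels) == len(filter_levels) else None
```

A router built this way runs this check against every registered filter for every incoming message.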

smurfix commented 1 month ago

Well, the downside of separating subscription and routing is, as your example demonstrates, that the router needs to match every message against all your wildcard patterns. My code uses the fact that the server already did that work.
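One way a client can reuse the broker's matching is MQTT v5's Subscription Identifier: the client attaches an identifier to each SUBSCRIBE, and the broker includes the identifiers of matching subscriptions in the PUBLISH packets it forwards, so dispatch becomes a dictionary lookup. Whether the PR relies on this mechanism isn't stated here; the sketch below is a hypothetical, broker-free illustration, with each message reduced to its list of subscription identifiers plus a payload.

```python
import asyncio
import itertools


class IdDispatcher:
    """Hypothetical: route messages by MQTT v5 Subscription Identifier."""

    def __init__(self) -> None:
        self._next_id = itertools.count(1)  # valid identifiers: 1 .. 268435455
        self.filters: dict = {}
        self._queues: dict = {}

    def register(self, topic_filter: str):
        # Pick an identifier to send with SUBSCRIBE, and a queue to read from.
        sub_id = next(self._next_id)
        self.filters[sub_id] = topic_filter
        self._queues[sub_id] = asyncio.Queue()
        return sub_id, self._queues[sub_id]

    def dispatch(self, subscription_ids, payload) -> None:
        # The broker already matched the topic: no filter matching here,
        # just one lookup per identifier it reported.
        for sub_id in subscription_ids:
            queue = self._queues.get(sub_id)
            if queue is not None:
                queue.put_nowait(payload)


async def demo():
    d = IdDispatcher()
    temp_id, temp_q = d.register("temperature/#")
    all_id, all_q = d.register("#")
    d.dispatch([temp_id, all_id], 21)  # overlapping subscriptions: broker may report both ids
    d.dispatch([all_id], 40)
    return temp_id, all_id, [temp_q.get_nowait()], [all_q.get_nowait(), all_q.get_nowait()]
```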

As to sans-io: in an ideal world that'd be part of Paho and thus not add to aiomqtt's maintenance burden. We'll see.