sbtinstruments / aiomqtt

The idiomatic asyncio MQTT client, wrapped around paho-mqtt
https://sbtinstruments.github.io/aiomqtt
BSD 3-Clause "New" or "Revised" License
392 stars, 71 forks

No convenient way to get message without getting locked into a for loop #298

Closed: skewty closed this issue 1 month ago

skewty commented 1 month ago

Use case: I need to listen to MQTT messages until I get a signal from an external, non-MQTT system to stop.

If I use async for over client.messages, I am stuck in the for loop until the client disconnects or a message arrives. Neither is likely to happen for hours or days in my scenario.

I can't call client.disconnect() because it isn't implemented. Even if it were (and I think it should be), calling client.disconnect() seems wrong and excessive: it generates an MqttError, after all, and there is no error here. I want a graceful shutdown.

So what I have in mind is something like:

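import asyncio
from aiomqtt import Client, Message, MqttCodeError, MqttError
from paho.mqtt.enums import MQTTErrorCode  # available in paho-mqtt 2.0 and later

# Note: this proposal relies on private attributes of Client
# (_client, _loop, _queue, _disconnected).
class DesiredClient(Client):
    def disconnect(self) -> None:
        # Ask the underlying paho client to disconnect; raise on a non-success code
        rc = self._client.disconnect()
        if rc != MQTTErrorCode.MQTT_ERR_SUCCESS:
            raise MqttCodeError(rc)

    async def get_message(self) -> Message:
        # Wait for either the next queued message or a disconnect, whichever comes first
        task = self._loop.create_task(self._queue.get())
        try:
            done, _ = await asyncio.wait((task, self._disconnected), return_when=asyncio.FIRST_COMPLETED)
        except asyncio.CancelledError:
            task.cancel()
            raise

        if task in done:
            return task.result()

        # The disconnect won the race: drop the pending queue read and report it
        task.cancel()
        raise MqttError("Disconnected while waiting for message")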

I also looked into using the raw iterator outside the Client class, but that seems wrong and is ugly:

# save iterator and use it again next time 
some_other_class.stored_iterator = aiter(aiomqtt_client.messages)
# of course this isn't "safe" as it doesn't watch for client disconnect and can get stuck
# and _disconnected (what I should watch to be safe) is protected / unavailable
next_message: Message = await anext(some_other_class.stored_iterator)

frederikaalund commented 1 month ago

Hi skewty, thank you for opening this issue. 👍 Let me have a look.

Use case: I need to listen to MQTT messages until I get a signal from an external, non-MQTT system to stop.

Sounds like a reasonable use case.

Have you considered using task cancellation? It seems ideal for what you want to achieve. Something like this:

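import asyncio
from aiomqtt import Client

async def process_messages(client: Client) -> None:
    async for message in client.messages:
        ...

async def main(your_signal_arrives: asyncio.Future) -> None:
    # your_signal_arrives stands in for whatever awaitable your external system completes
    async with Client(...) as client, asyncio.TaskGroup() as tg:
        task = tg.create_task(process_messages(client))  # This schedules the task to run in the background
        await your_signal_arrives
        task.cancel()  # This stops the infinite loop inside process_messages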

You can do something similar even without asyncio.TaskGroup (if you don't have Python 3.11 or later).
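
As a rough sketch of that variant, the same pattern can be written with asyncio.create_task. The helper name run_until_signal and the stop event are hypothetical and only for illustration; the worker is assumed to be a coroutine such as process_messages(client) from the snippet above.

```python
import asyncio
import contextlib
from collections.abc import Coroutine
from typing import Any


async def run_until_signal(worker: Coroutine[Any, Any, None], stop: asyncio.Event) -> None:
    """Run `worker` in the background and cancel it once `stop` is set."""
    task = asyncio.create_task(worker)
    try:
        await stop.wait()
    finally:
        task.cancel()
        # Wait for the worker to unwind; its CancelledError is expected here.
        with contextlib.suppress(asyncio.CancelledError):
            await task
```

Inside the async with Client(...) as client: block, this would be called as await run_until_signal(process_messages(client), stop), with your external system calling stop.set(). Note that in this simple form, an exception raised by the worker only surfaces once the stop signal arrives and the task is awaited.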

Let me know if that helps you or not. :)

skewty commented 1 month ago

The log / logging messages are less useful but it works. Thank you.