sbtinstruments / aiomqtt

The idiomatic asyncio MQTT client, wrapped around paho-mqtt
https://sbtinstruments.github.io/aiomqtt
BSD 3-Clause "New" or "Revised" License

Using aiomqtt with FastAPI routers - AttributeError: 'NoneType' object has no attribute 'publish' #230

Closed: tomliptrot closed this issue 1 year ago

tomliptrot commented 1 year ago

Hi,

I'm using aiomqtt in a FastAPI application. I can make it work as expected following your guide here. However, when I try to run it in a larger application that uses routers, I run into problems.

My code:

app.py

import fastapi

from router import router

app = fastapi.FastAPI()
app.include_router(router, prefix="/mqtt", tags=["mqtt"])

router.py

import asyncio
import contextlib

import aiomqtt
from fastapi import APIRouter

async def listen(client):
    async with client.messages() as messages:
        await client.subscribe("humidity/#")
        async for message in messages:
            print(message.payload)

client = None

@contextlib.asynccontextmanager
async def lifespan(app):
    global client
    async with aiomqtt.Client("test.mosquitto.org") as c:
        # Make client globally available
        client = c
        # Listen for MQTT messages in (unawaited) asyncio task
        loop = asyncio.get_event_loop()
        task = loop.create_task(listen(client))
        yield
        # Cancel the task
        task.cancel()
        # Wait for the task to be cancelled
        try:
            await task
        except asyncio.CancelledError:
            pass

router = APIRouter(lifespan=lifespan)

@router.get("/")
async def publish():
    await client.publish("humidity/outside", 0.38)
    await client.publish("humidity/outside", 0.38)

Running this and then calling the endpoint gives the following error:

  File "XXX/router.py", line 42, in publish
    await client.publish("humidity/outside", 0.38)
AttributeError: 'NoneType' object has no attribute 'publish'

There is clearly some issue with the global variable client: it is still None when the endpoint runs. Any advice on how to solve this would be much appreciated.

tomliptrot commented 1 year ago

On further investigation, this appears to be an issue with FastAPI. Lifespan events are not supported by routers. See https://github.com/tiangolo/fastapi/discussions/9664

Possible workarounds that occur to me:

  1. Do it all on the main app. This seems a bit messy to me.
  2. Use the supposedly deprecated startup event.

I've tried the second, but can't get that to work:

router.py

client = None

@router.on_event("startup")
async def startup_event():
    print("Starting MQTT client")
    global client
    async with aiomqtt.Client("test.mosquitto.org") as c:
        # Make client globally available
        client = c
        # Listen for MQTT messages in (unawaited) asyncio task
        loop = asyncio.get_event_loop()
        loop.create_task(listen(client))

I now get this error:

    raise MqttCodeError(info.rc, "Could not publish message")
aiomqtt.error.MqttCodeError: [code:4] The client is not currently connected.

Any ideas why this is happening?

empicano commented 1 year ago

Hi there!

It's good to know that the approach doesn't work for FastAPI routers. As you said, the best way forward would be to bring this up with the FastAPI team.

With regard to your workaround: the aiomqtt client connects inside the startup_event function, but the function also exits the context manager before it returns, which disconnects the client again.
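
To illustrate the point above, here is a minimal sketch that needs neither FastAPI nor a broker. FakeClient, holder, and connected_after_startup are hypothetical names made up for this example; FakeClient only tracks a connected flag and is not aiomqtt's real client.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for an MQTT client; it only tracks connection state."""

    def __init__(self):
        self.connected = False

    async def __aenter__(self):
        self.connected = True
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.connected = False


# Plays the role of the module-level `client` global in router.py.
holder = {"client": None, "connected_inside": None}


async def startup_event():
    # Same shape as the startup handler above: the client is stored globally,
    # but the `async with` block ends before the function returns.
    async with FakeClient() as c:
        holder["client"] = c
        holder["connected_inside"] = c.connected


asyncio.run(startup_event())

# The stored object is still reachable, but it has already been disconnected.
connected_after_startup = holder["client"].connected
print(holder["connected_inside"], connected_after_startup)
```

The stored reference survives the function, but the connection does not, which matches the "client is not currently connected" error reported above.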

The trick with our lifespan example is that we yield inside the context manager. This way, the client connects, then all the requests are handled, and only after the FastAPI application exits does the yield "return", so to speak, at which point we exit the aiomqtt context manager.
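
The ordering described here can be sketched with the standard library alone. In this example FakeClient, events, and main are hypothetical names, and the body of `async with lifespan()` stands in for the period in which the application serves requests.

```python
import asyncio
import contextlib


class FakeClient:
    """Hypothetical stand-in for an MQTT client; it only tracks connection state."""

    def __init__(self):
        self.connected = False

    async def __aenter__(self):
        self.connected = True
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.connected = False


events = []


@contextlib.asynccontextmanager
async def lifespan():
    async with FakeClient() as client:
        events.append("startup done")
        # Execution pauses here while the application runs,
        # so the client stays connected for the whole time.
        yield client
    # Only reached after the caller leaves its `async with` block.
    events.append("shutdown done")


async def main():
    async with lifespan() as client:
        # Stand-in for handling requests.
        events.append(f"request: connected={client.connected}")
    events.append(f"after exit: connected={client.connected}")


asyncio.run(main())
print(events)
```

The client is connected while the "request" runs and is only disconnected once the code resumes after the yield.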

I haven't worked with FastAPI in a bit. If there's a corresponding "shutdown" event, you could use the context manager's __aenter__ and __aexit__ methods directly (call __aenter__ during startup and __aexit__ during shutdown). We do not recommend this approach; it's a workaround and a bit tricky to get right. The best option would be for FastAPI to support lifespans for routers.

Let me know if that helps! 😊

tomliptrot commented 1 year ago

Thanks so much for your quick response. I've done what you suggested (called __aenter__ during startup and __aexit__ during shutdown) and it seems to be working, although I am getting errors when I shut down.

This is how I did it:

# router.py

@router.on_event("startup")
async def startup_event():
    print("Starting MQTT client")
    global client
    client = aiomqtt.Client("test.mosquitto.org")
    await client.__aenter__()
    print("MQTT client started")
    loop = asyncio.get_event_loop()
    loop.create_task(listen(client))

@router.on_event("shutdown")
async def shutdown_event():
    print("Stopping MQTT client")
    await client.__aexit__(None, None, None)
    print("MQTT client stopped")

It works fine when the app is running, but on shutdown I get this error:

Stopping MQTT client
Task exception was never retrieved
future: <Task finished name='Task-4' coro=<listen() done, defined at /Users/tomliptrot/tutorials/aiomqtt-test/router.py:55> exception=MqttError('Disconnected during message iteration')>
Traceback (most recent call last):
  File "XXX/aiomqtt-test/router.py", line 58, in listen
    async for message in messages:
  File "XXX/aiomqtt-test/.venv/lib/python3.10/site-packages/aiomqtt/client.py", line 684, in _generator
    raise MqttError(msg)
aiomqtt.error.MqttError: Disconnected during message iteration
MQTT client stopped

empicano commented 1 year ago

Looks good so far 👍 You need to keep a reference to the listener task you create and call cancel() on it during shutdown. The original lifespan example and this section in the docs show how.
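
As a rough, standard-library-only sketch of that pattern (listen here is a hypothetical stand-in that just sleeps, and log is an assumed name used to record what happens):

```python
import asyncio
import contextlib

log = []


async def listen():
    # Stand-in for the message loop: it waits until it is cancelled.
    try:
        while True:
            await asyncio.sleep(3600)
    except asyncio.CancelledError:
        log.append("listener cancelled")
        raise  # re-raise so the task is marked as cancelled


async def main():
    # "Startup": keep a reference to the task so it can be cancelled later.
    task = asyncio.create_task(listen())
    await asyncio.sleep(0)  # give the listener a chance to start

    # "Shutdown": request cancellation, then wait until it has taken effect.
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    log.append(f"task.cancelled()={task.cancelled()}")


asyncio.run(main())
print(log)
```

Awaiting the task after cancel() means the listener has finished unwinding before the client is disconnected, so no "Task exception was never retrieved" warning is left behind.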

tomliptrot commented 1 year ago

Like this, with another global variable?

client = None
task = None

@router.on_event("startup")
async def startup_event():
    print("Starting MQTT client")
    global client
    client = aiomqtt.Client("test.mosquitto.org")
    await client.__aenter__()
    print("MQTT client started")
    loop = asyncio.get_event_loop()
    global task
    task = loop.create_task(listen(client))

@router.on_event("shutdown")
async def shutdown_event():
    print("Stopping MQTT client")
    task.cancel()
    await client.__aexit__(None, None, None)
    print("MQTT client stopped")

Yep, that worked. No more errors.

When you say you don't recommend this approach, why is that? What are the risks/downsides?

empicano commented 1 year ago

Great 👍😎

Context managers ensure that the setup and teardown code of an external resource are each executed exactly once.

There are a few cases where this can go wrong when using __aenter__ and __aexit__ directly:

If an exception is raised inside the shutdown_event function before we have the chance to call __aexit__, the resource won't be properly closed and the client stays connected. This may not matter immediately in a simple use case, but it can become a problem once we deal with many connections. (That's also why we manage files with context managers in Python: we don't want to forget to close them and run out of resources.)

Similarly, if we have complex code, we might forget to call __aexit__ in some obscure execution path, call it multiple times, or call __aexit__ before __aenter__.
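
One way to reduce some of these risks, which was not suggested in this thread, is to let contextlib.AsyncExitStack from the standard library own the enter and exit calls. The sketch below is an assumption-laden illustration: FakeClient is a hypothetical stand-in that counts how often it is entered and exited, and startup_event and shutdown_event are plain coroutines rather than registered FastAPI handlers.

```python
import asyncio
import contextlib


class FakeClient:
    """Hypothetical stand-in that counts enter and exit calls."""

    def __init__(self):
        self.enter_count = 0
        self.exit_count = 0

    async def __aenter__(self):
        self.enter_count += 1
        return self

    async def __aexit__(self, exc_type, exc, tb):
        self.exit_count += 1


stack = contextlib.AsyncExitStack()
client = FakeClient()


async def startup_event():
    # The stack calls __aenter__ and remembers that __aexit__ is owed.
    await stack.enter_async_context(client)


async def shutdown_event():
    # aclose() runs the pending __aexit__ and then forgets it.
    await stack.aclose()


async def main():
    await startup_event()
    await shutdown_event()
    await shutdown_event()  # an accidental second call does nothing


asyncio.run(main())
print(client.enter_count, client.exit_count)
```

With this arrangement, a repeated shutdown call cannot exit the client twice, and calling shutdown before startup is a no-op. It does not help if shutdown_event raises before reaching aclose(), so any other teardown code would still need a try/finally around it.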

tomliptrot commented 1 year ago

That makes perfect sense. Thanks so much @empicano for solving my issue and for your prompt and incredibly clear explanation.

Let me know if I can do anything to help with this excellent library.

frederikaalund commented 1 year ago

Well done, @empicano! 😄👍