wialon / gmqtt

Python MQTT v5.0 async client
MIT License
400 stars 52 forks source link

blocking the loop when processing messages for a long time #146

Closed YraganTron closed 1 year ago

YraganTron commented 1 year ago

I have the following example, which simulates a long message-processing time. When more than 2 messages per second are received, problems start: we do not have time to process the messages, and eventually the loop is completely blocked. Any call to the /test endpoint is doomed to fail. I would like to know how to proceed in such a situation. Maybe some kind of restriction on the number of messages being processed could be added to the config?

import uvicorn
import time

from fastapi import FastAPI
from gmqtt import Client

from app.config import settings

def create_app():
    """Build the FastAPI application and attach the MQTT client to it.

    A gmqtt ``Client`` is created and handed to ``Test``, whose
    ``init_app`` registers the startup/shutdown handlers on the app.
    A no-op ``GET /test`` endpoint is also registered.

    Returns:
        The configured ``FastAPI`` instance.
    """
    application = FastAPI()
    mqtt_client = Client(client_id='blabla')
    lifecycle = Test(client=mqtt_client)
    lifecycle.init_app(application)

    async def test():
        # No-op endpoint: responds with an empty body.
        return None

    # Same effect as decorating ``test`` with ``@application.get('/test')``.
    application.get('/test')(test)

    return application

async def _handle_mqtt_msg(message):
    """Print *message* to standard output.

    Args:
        message: Any printable object.
    """
    print(message)

def on_connect(client, flags, rc, properties):
    """Subscribe to the vehicle telemetry topic once the client is connected.

    Args:
        client: Object exposing ``subscribe(topic, qos=...)``.
        flags: Unused.
        rc: Unused.
        properties: Unused.
    """
    # ``+`` is a single-level wildcard in the topic filter.
    telemetry_filter = 'vehicles/v1/+/events/telemetry'
    client.subscribe(telemetry_filter, qos=1)

async def on_message(client, topic, payload, qos, properties):
    """Print the payload, then stall for 0.4 s to imitate slow processing.

    NOTE: ``time.sleep`` is a blocking call and this coroutine never awaits,
    so no other task on the event loop can run while it sleeps.

    Args:
        client: Unused.
        topic: Unused.
        payload: Message body; printed as-is.
        qos: Unused.
        properties: Unused.
    """
    simulated_work_seconds = 0.4
    print(payload)
    time.sleep(simulated_work_seconds)

async def publish_some_messages(client):
    """Publish one ``'test'`` payload to the ``'test'`` topic, then print ``1``.

    NOTE(review): assumes ``client.publish`` returns an awaitable and accepts
    ``topic`` / ``payload`` keyword arguments - confirm against the client
    library version in use.

    Args:
        client: Object exposing an awaitable ``publish(topic=..., payload=...)``.
    """
    outgoing = {'topic': 'test', 'payload': 'test'}
    await client.publish(**outgoing)
    print(1)

class Test:
    """Tie a client's connect/disconnect lifecycle to an application."""

    def __init__(self, client):
        """Remember the client the lifecycle handlers will operate on."""
        self.client = client

    def init_app(self, app):
        """Register startup and shutdown handlers on *app*.

        On startup the module-level ``on_connect`` and ``on_message``
        callbacks are assigned to the client and the client connects to
        ``settings.mqtt_host`` / ``settings.mqtt_port``. On shutdown the
        client disconnects.

        Args:
            app: Object exposing an ``on_event(name)`` decorator factory.
        """
        async def startup():
            mqtt = self.client
            mqtt.on_connect = on_connect
            mqtt.on_message = on_message
            await mqtt.connect(host=settings.mqtt_host, port=settings.mqtt_port)

        async def shutdown():
            await self.client.disconnect()

        # Same effect as decorating the handlers with ``@app.on_event(...)``.
        app.on_event('startup')(startup)
        app.on_event('shutdown')(shutdown)

if __name__ == '__main__':
    port = settings.port
    # The import string must name something that exists in the target module.
    # The only application factory defined in this script is ``create_app``;
    # nothing called ``create_web_app`` is defined here. Because the target is
    # a factory rather than an app instance, uvicorn is told so explicitly.
    # NOTE(review): assumes this file is importable as ``app.app`` - confirm.
    uvicorn.run(
        'app.app:create_app',
        port=port,
        factory=True,
        access_log=False,
    )

I could do something like this to avoid blocking the loop, but the problem is that the delay when calling any endpoint still remains. Likewise, if I want to stop the web server, it will keep working for some time on the messages it has not yet had time to process.

import asyncio

import uvicorn
import time

from fastapi import FastAPI
from gmqtt import Client

from app.config import settings

def create_app():
    """Build the FastAPI application and attach the MQTT client to it.

    A gmqtt ``Client`` is created and handed to ``Test``, whose
    ``init_app`` registers the startup/shutdown handlers on the app.
    A no-op ``GET /test`` endpoint is also registered.

    Returns:
        The configured ``FastAPI`` instance.
    """
    application = FastAPI()
    mqtt_client = Client(client_id='blabla')
    lifecycle = Test(client=mqtt_client)
    lifecycle.init_app(application)

    async def test():
        # No-op endpoint: responds with an empty body.
        return None

    # Same effect as decorating ``test`` with ``@application.get('/test')``.
    application.get('/test')(test)

    return application

async def _handle_mqtt_msg(message):
    """Print *message* to standard output.

    Args:
        message: Any printable object.
    """
    print(message)

def on_connect(client, flags, rc, properties):
    """Subscribe to the vehicle telemetry topic once the client is connected.

    Args:
        client: Object exposing ``subscribe(topic, qos=...)``.
        flags: Unused.
        rc: Unused.
        properties: Unused.
    """
    # ``+`` is a single-level wildcard in the topic filter.
    telemetry_filter = 'vehicles/v1/+/events/telemetry'
    client.subscribe(telemetry_filter, qos=1)

async def _on_message(client, topic, payload, qos, properties):
    """Print the payload, then stall for 0.4 s to imitate slow processing.

    NOTE: ``time.sleep`` is a blocking call and this coroutine never awaits,
    so no other task on the event loop can run while it sleeps - even when
    this coroutine is itself running as a separate task.

    Args:
        client: Unused.
        topic: Unused.
        payload: Message body; printed as-is.
        qos: Unused.
        properties: Unused.
    """
    simulated_work_seconds = 0.4
    print(payload)
    time.sleep(simulated_work_seconds)

# Strong references to handler tasks that are still running. The event loop
# keeps only weak references to tasks, so a task that nothing else references
# may be garbage-collected before it finishes.
_background_tasks = set()


async def on_message(client, topic, payload, qos, properties):
    """Schedule ``_on_message`` as a separate task and return immediately.

    The task is kept in ``_background_tasks`` until it completes so that it
    cannot disappear mid-execution. Scheduling does not make the handler
    non-blocking by itself: ``_on_message`` calls ``time.sleep``, which still
    occupies the event loop while it runs.

    Args:
        client: Passed through to ``_on_message``.
        topic: Passed through to ``_on_message``.
        payload: Passed through to ``_on_message``.
        qos: Passed through to ``_on_message``.
        properties: Passed through to ``_on_message``.
    """
    loop = asyncio.get_running_loop()
    task = loop.create_task(_on_message(client, topic, payload, qos, properties))
    _background_tasks.add(task)
    # Release the reference once the task is done so the set does not grow.
    task.add_done_callback(_background_tasks.discard)

async def publish_some_messages(client):
    """Publish one ``'test'`` payload to the ``'test'`` topic, then print ``1``.

    NOTE(review): assumes ``client.publish`` returns an awaitable and accepts
    ``topic`` / ``payload`` keyword arguments - confirm against the client
    library version in use.

    Args:
        client: Object exposing an awaitable ``publish(topic=..., payload=...)``.
    """
    outgoing = {'topic': 'test', 'payload': 'test'}
    await client.publish(**outgoing)
    print(1)

class Test:
    """Tie a client's connect/disconnect lifecycle to an application."""

    def __init__(self, client):
        """Remember the client the lifecycle handlers will operate on."""
        self.client = client

    def init_app(self, app):
        """Register startup and shutdown handlers on *app*.

        On startup the module-level ``on_connect`` and ``on_message``
        callbacks are assigned to the client and the client connects to
        ``settings.mqtt_host`` / ``settings.mqtt_port``. On shutdown the
        client disconnects.

        Args:
            app: Object exposing an ``on_event(name)`` decorator factory.
        """
        async def startup():
            mqtt = self.client
            mqtt.on_connect = on_connect
            mqtt.on_message = on_message
            await mqtt.connect(host=settings.mqtt_host, port=settings.mqtt_port)

        async def shutdown():
            await self.client.disconnect()

        # Same effect as decorating the handlers with ``@app.on_event(...)``.
        app.on_event('startup')(startup)
        app.on_event('shutdown')(shutdown)

if __name__ == '__main__':
    port = settings.port
    # The import string must name something that exists in the target module.
    # The only application factory defined in this script is ``create_app``;
    # nothing called ``create_web_app`` is defined here. Because the target is
    # a factory rather than an app instance, uvicorn is told so explicitly.
    # NOTE(review): assumes this file is importable as ``app.app`` - confirm.
    uvicorn.run(
        'app.app:create_app',
        port=port,
        factory=True,
        access_log=False,
    )
Lenka42 commented 1 year ago

@YraganTron if I understood you correctly, even though you use an async callback, the code inside it is entirely synchronous, so the loop has no chance to switch context and process other tasks. Sorry, there are no miracles. What I could advise: run the FastAPI server in a different process, and try to make the callback as asynchronous as possible, for example by using an async DB driver.