njouanin / hbmqtt

MQTT client/broker using Python asynchronous I/O
MIT License
795 stars 188 forks source link

HBMQTT broker inside a thread #211

Open interkosmos opened 4 years ago

interkosmos commented 4 years ago

How can I run an HBMQTT broker instance inside a Python thread? The following minimal example doesn’t work:

import asyncio
from threading import Thread
from hbmqtt.broker import Broker

class ExampleThread(Thread):
    def __init__(self):
        super().__init__()
        self.daemon = True
        self.config = {
            'listeners': {
                'default': {
                    'max-connections': 50000,
                    'bind': 'localhost:1883',
                    'type': 'tcp',
                },
            },
            'auth': {
                'allow-anonymous': True,
            },
            'plugins': [ 'auth_anonymous' ],
            'topic-check': {
                'enabled': False
            }
        }
        self.loop = None
        self.broker = None

    async def broker_coroutine(self):
        self.broker = Broker(self.config, self.loop)
        await self.broker.start()
        return self.broker

    async def test_coroutine(self):
        while True:
            await asyncio.sleep(1)
            print('hey!')

    def run(self) -> None:
        print('running ...')
        self.loop.run_forever()
        self.loop.run_until_complete(self.broker.shutdown())
        self.loop.close()

    def start(self):
        print('starting thread ...')
        self.loop = asyncio.new_event_loop()
        print('starting server ...')
        try:
            start_server = asyncio.gather(self.broker_coroutine(),
                                          loop=self.loop)
            self.loop.run_until_complete(start_server)
            broker = start_server.result()[0]
        except KeyboardInterrupt as e:
            self.loop.close()
        except:
            print(traceback.format_exc())
            self.loop.close()

        super().__init__()

if __name__ == '__main__':
    thread = ExampleThread()
    thread.start()

This basic example crashes:

$ python3.7 ./mqtt.py
starting thread ...
starting server ...
Task was destroyed but it is pending!
task: <Task pending coro=<Broker._broadcast_loop() running at venv/lib/python3.7/site-packages/hbmqtt/broker.py:696> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x8027fd890>()]>>
Exception ignored in: <generator object Broker._broadcast_loop at 0x8027d1ed0>
Traceback (most recent call last):
  File "venv/lib/python3.7/site-packages/hbmqtt/broker.py", line 696, in _broadcast_loop
  File "/usr/local/lib/python3.7/asyncio/queues.py", line 161, in get
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 687, in call_soon
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 479, in _check_closed
RuntimeError: Event loop is closed

But running the coroutine test_coroutine() instead just works as intended. If asyncio.set_event_loop(self.loop) is added the thread starts but immediately finishes:

    def start(self):
        print('starting thread ...')
        self.loop = asyncio.new_event_loop()
        # Setting the event loop:
        asyncio.set_event_loop(self.loop)
        print('starting server ...')
        # [...]

Output:

$ python3.7 ./mqtt.py
starting thread ...
starting server ...
starting coroutine ...
FlorianLudwig commented 4 years ago

@interkosmos mixing threads with asyncio is generally not recommended.

If you need to (for example you have a not async written code or something cpu heavy) have a look at ProcessPoolExecutor or ThreadPoolExecutor. They allow you to run code in parallel to async code but keep the async loop in control.

interkosmos commented 4 years ago

You seem to be right. I couldn’t get it running inside a Thread class, but it works using:

    def run(self, loop):
        try:
            future = asyncio.gather(self.broker_coroutine(self.config, loop),
                                    loop=loop)
            loop.run_until_complete(future)
            loop.run_forever()
            loop.run_until_complete(self.broker.shutdown())
            loop.close()
        except KeyboardInterrupt:
            loop.close()
        except:
            loop.close()

    def start(self):
        loop = asyncio.new_event_loop()
        thread = Thread(target=lambda: self.run(loop), daemon=True)
        thread.start()

Therefore, it’s a Python and not an HBMQTT issue.