njouanin / hbmqtt

MQTT client/broker using Python asynchronous I/O
MIT License

Properly cancelling task during pending MQTT subscription #94

Open vlcinsky opened 6 years ago

vlcinsky commented 6 years ago

I am trying to create a task that consumes data from MQTT topics and does some processing. Since this is a long-running process, it runs in a while True loop, and I want to cancel the task cleanly by calling task.cancel().

However, calling cancel() results in an exception being raised: RuntimeError('Event loop is closed').

Here is the code which is adapted from project examples:

import logging
import asyncio

from hbmqtt.client import MQTTClient, ClientException
from hbmqtt.mqtt.constants import QOS_1, QOS_2

logger = logging.getLogger(__file__)
logging.basicConfig(level=logging.DEBUG)

async def reader_coro(mqtt_url, loop):
    C = MQTTClient(loop=loop)
    try:
        await C.connect(mqtt_url)
        await C.subscribe([
            ('$SYS/broker/uptime', QOS_1),
            ('$SYS/broker/load/#', QOS_2),
        ])
        while True:
            message = await C.deliver_message()
            packet = message.publish_packet
            topic_name = packet.variable_header.topic_name
            data = packet.payload.data
            print(f"TOPIC: {topic_name} MESSAGE: {data}")
    except asyncio.CancelledError:
        await C.unsubscribe(['$SYS/broker/uptime', '$SYS/broker/load/#'])
        await C.disconnect()
        logger.debug("DISCONNECTED======================")
    except ClientException as ce:
        logger.exception("Client exception: %s" % ce)

if __name__ == '__main__':
    mqtt_url = 'mqtt://test.mosquitto.org/'
    loop = asyncio.get_event_loop()
    reader = loop.create_task(reader_coro(mqtt_url, loop))
    try:
        loop.run_until_complete(reader)
    except KeyboardInterrupt:
        logger.debug("KEYBOARD INTERRUPT==================")
        reader.cancel()
        logger.debug("READER CANCELLED==================")
        loop.run_until_complete(reader)
        logger.debug("FINISHED LOOP==================")
    finally:
        loop.close()

When run, after Ctrl-C it prints out:

DEBUG:hbmqtt.client:Waiting message delivery
DEBUG:hbmqtt.mqtt.protocol.handler:0 message(s) available for delivery
^CDEBUG:sub.py:KEYBOARD INTERRUPT==================
DEBUG:sub.py:READER CANCELLED==================
DEBUG:hbmqtt.client.plugins.packet_logger_plugin:hbmqtt/IyM^`78qi35Ns?ke -out-> UnsubscribePacket(ts=2017-10-29 00:11:05.438820, fixed=MQTTFixedHeader(length=42, flags=0x2), variable=PacketIdVariableHeader(packet_id=2), payload=<hbmqtt.mqtt.unsubscribe.UnubscribePayload object at 0x7f40d879ea48>)
DEBUG:hbmqtt.client.plugins.packet_logger_plugin:hbmqtt/IyM^`78qi35Ns?ke <-in-- UnsubackPacket(ts=2017-10-29 00:11:05.476021, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=PacketIdVariableHeader(packet_id=2), payload=None)
DEBUG:hbmqtt.mqtt.protocol.handler:Stopping 0 puback waiters
DEBUG:hbmqtt.mqtt.protocol.handler:Stopping 0 pucomp waiters
DEBUG:hbmqtt.mqtt.protocol.handler:Stopping 0 purec waiters
DEBUG:hbmqtt.mqtt.protocol.handler:Stopping 0 purel waiters
DEBUG:hbmqtt.mqtt.protocol.handler:waiting for tasks to be stopped
DEBUG:hbmqtt.client.plugins.packet_logger_plugin:hbmqtt/IyM^`78qi35Ns?ke -out-> DisconnectPacket(ts=2017-10-29 00:11:05.477688, fixed=MQTTFixedHeader(length=0, flags=0x0), variable=None, payload=None)
DEBUG:hbmqtt.mqtt.protocol.handler:Task cancelled, reader loop ending
DEBUG:hbmqtt.mqtt.protocol.handler:Broker closed connection
DEBUG:hbmqtt.mqtt.protocol.handler:hbmqtt/IyM^`78qi35Ns?ke Reader coro stopped
DEBUG:hbmqtt.mqtt.protocol.handler:Stopping 0 puback waiters
DEBUG:hbmqtt.mqtt.protocol.handler:Stopping 0 pucomp waiters
DEBUG:hbmqtt.mqtt.protocol.handler:Stopping 0 purec waiters
DEBUG:hbmqtt.mqtt.protocol.handler:Stopping 0 purel waiters
DEBUG:hbmqtt.mqtt.protocol.handler:waiting for tasks to be stopped
DEBUG:hbmqtt.mqtt.protocol.handler:closing writer
DEBUG:transitions.core:Initiating transition from state connected to state disconnected...
DEBUG:transitions.core:Exiting state connected. Processing callbacks...
INFO:transitions.core:Exited state connected
DEBUG:transitions.core:Entering state disconnected. Processing callbacks...
INFO:transitions.core:Entered state disconnected
DEBUG:sub.py:DISCONNECTED======================
DEBUG:sub.py:FINISHED LOOP==================
Exception ignored in: <generator object ProtocolHandler.mqtt_deliver_next_message at 0x7f40d87807d8>
Traceback (most recent call last):
  File "/home/javl/devel/tis/dpo-tis-pns-pump/.tox/py36/lib/python3.6/site-packages/hbmqtt/mqtt/protocol/handler.py", line 466, in mqtt_deliver_next_message
    message = yield from self.session.delivered_message_queue.get()
  File "/home/javl/.pyenv/versions/3.6.1/lib/python3.6/asyncio/queues.py", line 169, in get
    getter.cancel()  # Just in case getter is not done yet.
  File "/home/javl/.pyenv/versions/3.6.1/lib/python3.6/asyncio/base_events.py", line 573, in call_soon
    self._check_closed()
  File "/home/javl/.pyenv/versions/3.6.1/lib/python3.6/asyncio/base_events.py", line 357, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
ERROR:asyncio:Task was destroyed but it is pending!
task: <Task pending coro=<ProtocolHandler.mqtt_deliver_next_message() done, defined at /home/javl/devel/tis/dpo-tis-pns-pump/.tox/py36/lib/python3.6/site-packages/hbmqtt/mqtt/protocol/handler.py:461> wait_for=<Future cancelled> cb=[_wait.<locals>._on_completion() at /home/javl/.pyenv/versions/3.6.1/lib/python3.6/asyncio/tasks.py:374]>
$

Is there a proper way of stopping a pending subscription without such problems?
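
The final "Task was destroyed but it is pending!" line suggests that a task running mqtt_deliver_next_message was still pending when loop.close() was called. One possible workaround, sketched here with stand-in coroutines rather than hbmqtt itself, is to cancel and await whatever tasks remain before closing the loop. The names inner, reader and shutdown are hypothetical, and asyncio.all_tasks() assumes Python 3.7 or later (on 3.6 the equivalent is asyncio.Task.all_tasks()).

```python
import asyncio


async def inner():
    """Hypothetical stand-in for a library-internal task that waits forever."""
    await asyncio.Event().wait()


async def reader():
    """Hypothetical stand-in for the reader: spawns a helper and returns."""
    # The helper task is deliberately left pending, as in the traceback above.
    asyncio.ensure_future(inner())
    await asyncio.sleep(0)


def shutdown(loop):
    """Cancel every task still pending on `loop`, await them, then close it."""
    pending = list(asyncio.all_tasks(loop))
    for task in pending:
        task.cancel()
    if pending:
        # return_exceptions=True collects each CancelledError instead of raising.
        loop.run_until_complete(
            asyncio.gather(*pending, return_exceptions=True)
        )
    loop.close()
    return len(pending)
```

Calling shutdown(loop) in the finally block instead of a bare loop.close() gives leftover tasks a chance to finish their cancellation while the loop is still usable.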

hongquan commented 6 years ago

You should use signal handlers, as described at https://docs.python.org/3.6/library/asyncio-eventloop.html#set-signal-handlers-for-sigint-and-sigterm, to handle Ctrl+C and cancel the tasks properly.
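
A minimal sketch of that suggestion, using a stand-in worker instead of the MQTT reader. The names worker, install_cancel_handlers and main are hypothetical. loop.add_signal_handler() is only available on Unix, and asyncio.get_running_loop() assumes Python 3.7 or later.

```python
import asyncio
import signal


async def worker(log):
    """Hypothetical stand-in for the MQTT reader loop; runs until cancelled."""
    try:
        while True:
            await asyncio.sleep(0.1)
    except asyncio.CancelledError:
        # Unsubscribe/disconnect calls would go here.
        log.append("cleanup")
        raise


def install_cancel_handlers(loop, task,
                            signals=(signal.SIGINT, signal.SIGTERM)):
    """Make each signal cancel `task` instead of raising KeyboardInterrupt."""
    for sig in signals:
        loop.add_signal_handler(sig, task.cancel)


async def main(log):
    loop = asyncio.get_running_loop()
    task = asyncio.ensure_future(worker(log))
    install_cancel_handlers(loop, task)
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
```

With this arrangement the cancellation is delivered inside the running loop, so the cleanup code in the worker runs before the loop is stopped or closed.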

litnimax commented 6 years ago

The problem is that it cannot be cancelled. Please have a look. This is what I do at init:

        for signame in ('SIGINT', 'SIGTERM'):
            self.loop.add_signal_handler(getattr(signal, signame),
                lambda: asyncio.ensure_future(self.stop()))

And here is my stop function:

async def stop(self):
        logger.info('Stopping mqttrpc...')
        # Check subscriptions
        await self.unsubscribe(self.subscriptions)
        await self.disconnect()
        tasks = [task for task in asyncio.Task.all_tasks() if task is not
                    asyncio.tasks.Task.current_task()]
        list(map(lambda task: task.cancel(), tasks))
        results = await asyncio.gather(*tasks, return_exceptions=True)
        logger.debug('Finished cancelling tasks, result: {}'.format(results))

And this hangs forever:

^C2018-03-17 13:56:24,098 - INFO - mqtt_rpc - Stopping mqttrpc...
2018-03-17 13:56:24,098 - WARNING - hbmqtt.client - Client not connected, waiting for it
2018-03-17 13:56:24,947 - WARNING - hbmqtt.client - MQTT connection failed: OSError("Multiple exceptions: [Errno 61] Connect call failed ('::1', 1883), [Errno 61] Connect call failed ('127.0.0.1', 1883)",)
2018-03-17 13:56:24,947 - DEBUG - transitions.core - Initiating transition from state disconnected to state disconnected...
2018-03-17 13:56:24,947 - DEBUG - transitions.core - Exiting state disconnected. Processing callbacks...
2018-03-17 13:56:24,947 - INFO - transitions.core - Exited state disconnected
2018-03-17 13:56:24,947 - DEBUG - transitions.core - Entering state disconnected. Processing callbacks...
2018-03-17 13:56:24,947 - INFO - transitions.core - Entered state
2018-03-17 13:55:48,808 - WARNING - hbmqtt.client - Reconnection attempt failed: ConnectException(OSError("Multiple exceptions: [Errno 61] Connect call failed ('::1', 1883), [Errno 61] Connect call failed ('127.0.0.1', 1883)",),)
2018-03-17 13:55:50,814 - WARNING - hbmqtt.client - MQTT connection failed: OSError("Multiple exceptions: [Errno 61] Connect call failed ('::1', 1883), [Errno 61] Connect call failed ('127.0.0.1', 1883)",)
2018-03-17 13:55:50,814 - DEBUG - transitions.core - Initiating transition from state disconnected to state disconnected...
2018-03-17 13:55:50,814 - DEBUG - transitions.core - Exiting state disconnected. Processing callbacks...
2018-03-17 13:55:50,814 - INFO - transitions.core - Exited state disconnected
2018-03-17 13:55:50,814 - DEBUG - transitions.core - Entering state disconnected. Processing callbacks...
2018-03-17 13:55:50,815 - INFO - transitions.core - Entered state disconnected
2018-03-17 13:55:50,815 - WARNING - hbmqtt.client - Reconnection attempt failed: ConnectException(OSError("Multiple exceptions: [Errno 61] Connect call failed ('::1', 1883), [Errno 61] Connect call failed ('127.0.0.1', 1883)",),)

When I press CTRL+C I see immediately:

2018-03-17 13:56:24,098 - WARNING - hbmqtt.client - Client not connected, waiting for it

And it will not exit. Please advise.

litnimax commented 6 years ago

As far as I understand, the problem is with the @mqtt_connected decorator:

def mqtt_connected(func):
    """
        MQTTClient coroutines decorator which will wait until connection before calling the decorated method.
        :param func: coroutine to be called once connected
        :return: coroutine result
    """
    @asyncio.coroutine
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        if not self._connected_state.is_set():
            base_logger.warning("Client not connected, waiting for it")
            yield from self._connected_state.wait()
        return (yield from func(self, *args, **kwargs))
    return wrapper

I think there should be a check for whether the task was cancelled.
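
One way to keep such a wrapper from waiting indefinitely is to bound the wait with asyncio.wait_for(). This swaps in a timeout rather than a cancellation check, and it is only a sketch: connected_within and FakeClient are hypothetical names, not part of hbmqtt, and only the _connected_state attribute is taken from the decorator above.

```python
import asyncio
from functools import wraps


def connected_within(timeout):
    """Hypothetical decorator: wait at most `timeout` seconds for a connection."""
    def decorator(func):
        @wraps(func)
        async def wrapper(self, *args, **kwargs):
            if not self._connected_state.is_set():
                # Raises asyncio.TimeoutError if the event is not set in time.
                await asyncio.wait_for(self._connected_state.wait(), timeout)
            return await func(self, *args, **kwargs)
        return wrapper
    return decorator


class FakeClient:
    """Hypothetical stand-in client exposing only `_connected_state`."""

    def __init__(self):
        self._connected_state = asyncio.Event()

    @connected_within(0.05)
    async def unsubscribe(self, topics):
        return list(topics)
```

A caller such as stop() could then catch asyncio.TimeoutError and carry on with cancelling the remaining tasks.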

litnimax commented 6 years ago

I modified the stop function and now it works for me.

    async def stop(self):
        logger.info('Stopping mqttrpc...')
        # Check subscriptions
        if self._connected_state.is_set():        
            await self.unsubscribe(self.subscriptions)
            await self.disconnect()
        tasks = [task for task in asyncio.Task.all_tasks() if task is not
                    asyncio.tasks.Task.current_task()]
        list(map(lambda task: task.cancel(), tasks))
        results = await asyncio.gather(*tasks, return_exceptions=True)
        logger.debug('Finished cancelling tasks, result: {}'.format(results))

Just check if _connected_state.is_set() before trying to unsubscribe or disconnect.
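
The same guard can be tried without a broker. The sketch below uses a hypothetical StubClient whose unsubscribe() and disconnect() simply wait on _connected_state, imitating the behaviour seen in the log; it is not the real hbmqtt API. It also uses asyncio.all_tasks() and asyncio.current_task() (Python 3.7 or later), because asyncio.Task.all_tasks() and asyncio.Task.current_task() were removed in Python 3.9.

```python
import asyncio


class StubClient:
    """Hypothetical stand-in for an hbmqtt-based client."""

    def __init__(self):
        self._connected_state = asyncio.Event()
        self.calls = []

    async def unsubscribe(self, topics):
        await self._connected_state.wait()  # blocks while disconnected
        self.calls.append("unsubscribe")

    async def disconnect(self):
        await self._connected_state.wait()
        self.calls.append("disconnect")

    async def stop(self):
        # Only talk to the broker when a connection actually exists.
        if self._connected_state.is_set():
            await self.unsubscribe([])
            await self.disconnect()
        current = asyncio.current_task()
        tasks = [t for t in asyncio.all_tasks() if t is not current]
        for task in tasks:
            task.cancel()
        return await asyncio.gather(*tasks, return_exceptions=True)


async def demo(connected):
    client = StubClient()
    if connected:
        client._connected_state.set()
    # A long-running background task that stop() is expected to cancel.
    background = asyncio.ensure_future(asyncio.sleep(3600))
    results = await client.stop()
    return client.calls, background.cancelled(), len(results)
```

Running asyncio.run(demo(connected=False)) returns without hanging, because stop() skips the broker calls and goes straight to cancelling the background task.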