njouanin / hbmqtt

MQTT client/broker using Python asynchronous I/O
MIT License

Client not closing connections on connect failure (rejection) #188

Open d21d3q opened 5 years ago

d21d3q commented 5 years ago

description

When a connection is rejected by the broker, the client opens a new socket without closing the previous one.

steps to reproduce/observe

Run the example client_publish.py with this broker URL:

yield from C.connect('mqtt://nobody:nopassword@test.mosquitto.org:1883/')

The broker rejects the connection:

Connection rejected with code '5'
Connection failed: ConnectException('Connection rejected by broker')

(RabbitMQ, by contrast, rejects connections with code 4.)
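For reference, these numeric codes are the return code carried in the broker's CONNACK packet. Below is a minimal, standalone decoder for an MQTT 3.1.1 CONNACK, written only to illustrate the codes; it is not hbmqtt's own code, and the names `CONNACK_RETURN_CODES`, `parse_connack` and `describe` are hypothetical.

```python
from typing import Tuple

# CONNACK return codes defined by MQTT 3.1.1 (section 3.2.2.3)
CONNACK_RETURN_CODES = {
    0: "Connection accepted",
    1: "Unacceptable protocol version",
    2: "Identifier rejected",
    3: "Server unavailable",
    4: "Bad user name or password",
    5: "Not authorized",
}


def parse_connack(packet: bytes) -> Tuple[bool, int]:
    """Return (session_present, return_code) from a raw 4-byte CONNACK."""
    # Fixed header: packet type 2 (0x20), remaining length 2
    if len(packet) != 4 or packet[0] != 0x20 or packet[1] != 0x02:
        raise ValueError("not a well-formed MQTT 3.1.1 CONNACK")
    session_present = bool(packet[2] & 0x01)  # bit 0 of the acknowledge flags
    return session_present, packet[3]


def describe(return_code: int) -> str:
    """Human-readable meaning of a CONNACK return code."""
    return CONNACK_RETURN_CODES.get(return_code, f"Reserved code {return_code}")
```

Read this way, mosquitto's code 5 means "not authorized" and RabbitMQ's code 4 means "bad user name or password"; both are rejections, so both should lead to the socket being closed.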

Here is a one-liner that runs the example and watches the number of open sockets grow over time:

python client_publish.py > /dev/null 2>&1 & CLIENT_PUBLISH_PID=$! && watch -n 1 "lsof -p $CLIENT_PUBLISH_PID | grep TCP | wc -l"
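A rough in-process equivalent of that one-liner, assuming Linux and its /proc filesystem; `count_open_sockets` is a hypothetical helper, not part of hbmqtt. It counts every socket descriptor (TCP, UDP and Unix-domain), so absolute numbers can differ from the lsof output, but a leak shows up the same way: as a count that keeps growing.

```python
import os


def count_open_sockets(pid="self"):
    """Count a process's file descriptors that refer to sockets (Linux only).

    Each entry in /proc/<pid>/fd is a symlink; socket descriptors resolve
    to strings of the form 'socket:[inode]'.
    """
    fd_dir = f"/proc/{pid}/fd"
    count = 0
    for name in os.listdir(fd_dir):
        try:
            target = os.readlink(os.path.join(fd_dir, name))
        except OSError:
            # The descriptor was closed between listdir() and readlink()
            continue
        if target.startswith("socket:"):
            count += 1
    return count
```

Calling this periodically (for example from a small asyncio task that logs it every second) makes the leak visible without an external `watch`.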

This becomes a problem (a ticking time bomb) when using the library with infinite reconnection (reconnect_retries set to -1). After some time (depending on reconnect_max_interval), the whole program crashes because it hits the limit on open files.
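The shape of a fix is to close the transport on every failed attempt before sleeping and retrying. The following is a minimal sketch using plain asyncio streams, not a patch against hbmqtt: `try_connect`, `connect_with_retries` and `ConnectRejected` are hypothetical names, and `retries`/`max_interval` only mirror the roles of reconnect_retries and reconnect_max_interval (a negative `retries` means retry forever). The capped exponential backoff is an assumption, not necessarily what hbmqtt does.

```python
import asyncio


class ConnectRejected(Exception):
    """The broker answered CONNECT with a non-zero CONNACK return code."""


async def try_connect(host, port, connect_packet):
    """Send CONNECT on a fresh TCP connection and check the CONNACK.

    On any failure the writer is closed before the exception propagates,
    so a rejected or broken attempt never leaves its socket open.
    """
    reader, writer = await asyncio.open_connection(host, port)
    try:
        writer.write(connect_packet)
        await writer.drain()
        connack = await reader.readexactly(4)
        # Byte 0 must be the CONNACK type (0x20); byte 3 is the return code
        if connack[0] != 0x20 or connack[3] != 0:
            raise ConnectRejected(f"CONNACK return code {connack[3]}")
        return reader, writer
    except BaseException:
        writer.close()  # release the socket even when rejected or cancelled
        raise


async def connect_with_retries(host, port, connect_packet,
                               retries=-1, max_interval=10.0):
    """Retry try_connect with capped exponential backoff (retries < 0: forever)."""
    attempt = 0
    while True:
        try:
            return await try_connect(host, port, connect_packet)
        except (OSError, asyncio.IncompleteReadError, ConnectRejected):
            attempt += 1
            if 0 <= retries < attempt:
                raise
            await asyncio.sleep(min(2 ** attempt, max_interval))
```

Because `try_connect` closes the writer before re-raising, the number of open sockets does not grow with the number of attempts, even with `retries=-1`.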

I have the time and resources (and the need :) ) to fix it, but I would need some guidance, especially here. I previously asked in #175 why this was commented out. What was the intended direction for implementing the disconnect routine? Was that experimental code? Was it not working? Or was it planned for inclusion in the future?

d21d3q commented 5 years ago

A similar problem occurs when the connection is closed by the server:

Steps to reproduce: run a broker (mosquitto) locally, then run the client_publish.py example with the following test_coro:

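import asyncio

from hbmqtt.client import MQTTClient


@asyncio.coroutine
def test_coro():
    C = MQTTClient()
    yield from C.connect('mqtt://localhost:1883/')

    # Publish once per second until the process is killed
    while True:
        yield from C.publish('a/b', b'TEST MESSAGE WITH QOS_0')
        yield from asyncio.sleep(1)

    # Never reached while the loop above runs forever
    yield from C.disconnect()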

Then stop the broker. When running with the one-liner that counts open files:

python client_publish.py > /dev/null 2>&1 & CLIENT_PUBLISH_PID=$! && watch -n 1 "lsof -p $CLIENT_PUBLISH_PID | grep TCP | wc -l"

After the broker is stopped, the connection should be closed, but it is not. When the lost connection is handled, the closed connection only lets the disconnect waiter continue; handler.stop() is never called, since it was moved here.
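The general rule this points to is that teardown belongs in one place that runs however the read loop ends. A hypothetical sketch with plain asyncio streams (`read_loop`, `on_data` and the `disconnected` event are illustrative names, not hbmqtt's API): EOF, a connection reset, or cancellation all reach the `finally`, which both closes the socket and wakes whatever is waiting for the disconnect.

```python
import asyncio


async def read_loop(reader, writer, on_data, disconnected):
    """Read until the peer goes away, then always clean up.

    Whatever ends the loop (EOF, reset, cancellation), the writer is closed
    and `disconnected` is set, so a disconnect waiter is released *and*
    the socket is released.
    """
    try:
        while True:
            data = await reader.read(4096)
            if not data:  # EOF: the broker closed the connection
                break
            on_data(data)
    except (ConnectionError, asyncio.IncompleteReadError):
        pass  # treated like EOF; cleanup happens below
    finally:
        writer.close()
        disconnected.set()
```

Putting the equivalent of handler.stop() in such a `finally`, rather than only on the explicit disconnect() path, is one way to make sure the socket is closed when the server goes away.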

rharder commented 4 years ago

Is that what's going on here? I have no way to detect when a connection to a broker goes down. Here's some sample code. If I kill the broker process while the code is awaiting deliver_message(), it just hangs:

import asyncio
import sys
import traceback

from hbmqtt.client import MQTTClient
from hbmqtt.mqtt.constants import QOS_1

async def run():
    try:
        url = "mqtt://localhost"
        mqtt = MQTTClient()
        await mqtt.connect(url)
        await mqtt.subscribe([("#", QOS_1)])

        print("Awaiting message... (if you kill mqtt broker here, deliver_message never returns)")
        msg = await mqtt.deliver_message()
        print("Message:", msg)

        await mqtt.disconnect()

    except Exception as ex:
        print(type(ex), ex)
        traceback.print_tb(sys.exc_info()[2])

if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(run())

Output looks like this:

Awaiting message... (if you kill mqtt broker here, deliver_message never returns)
ClientProtocolHandler Unhandled exception in reader coro: IncompleteReadError('0 bytes read on a total of 1 expected bytes')
Disconnected from broker
MQTT connection failed: ConnectionRefusedError(10061, "Connect call failed ('10.8.32.10', 1883)")
Reconnection attempt failed: ConnectException(ConnectionRefusedError(10061, "Connect call failed ('10.8.32.10', 1883)"))
MQTT connection failed: ConnectionRefusedError(10061, "Connect call failed ('10.8.32.10', 1883)")
Reconnection attempt failed: ConnectException(ConnectionRefusedError(10061, "Connect call failed ('10.8.32.10', 1883)"))
MQTT connection failed: ConnectionRefusedError(10061, "Connect call failed ('10.8.32.10', 1883)")
Reconnection attempt failed: ConnectException(ConnectionRefusedError(10061, "Connect call failed ('10.8.32.10', 1883)"))
Maximum number of connection attempts reached. Reconnection aborted

And it hangs forever.

-Rob
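Until the client surfaces the disconnect, a caller can avoid hanging forever by racing the receive against a disconnect signal. This sketch assumes some monitoring code sets an `asyncio.Event` when the broker goes away; hbmqtt is not assumed to provide such an event, and `wait_unless_disconnected` is a hypothetical helper. The `receive` coroutine stands in for `mqtt.deliver_message()`.

```python
import asyncio


async def wait_unless_disconnected(receive, disconnected):
    """Await the `receive` coroutine unless `disconnected` is set first."""
    receive_task = asyncio.ensure_future(receive)
    disconnect_task = asyncio.ensure_future(disconnected.wait())
    done, pending = await asyncio.wait(
        {receive_task, disconnect_task}, return_when=asyncio.FIRST_COMPLETED
    )
    # Cancel the loser and let it finish cancelling before moving on
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    if receive_task in done:
        return receive_task.result()
    raise ConnectionError("broker disconnected while waiting for a message")
```

If no disconnect signal is available at all, wrapping the call in `asyncio.wait_for(..., timeout)` at least bounds the wait, at the cost of not distinguishing "no message yet" from "broker gone".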

dfontenot commented 3 years ago

I'm running into a similar issue with my setup. I am running hbmqtt 0.9.6 in Docker with a health check that runs nc -zv 127.0.0.1 1883. For each of these checks I get a stack trace from hbmqtt, which I would not expect unless I had turned on debug logging.

As can be seen, the number of open connections keeps rising because of the health check (no other clients connected during this test):

[2021-06-06 04:39:47,805] :: INFO - Listener 'my-tcp': 290 connections acquired
[2021-06-06 04:39:47,805] :: INFO - Connection from 127.0.0.1:38403 on listener 'my-tcp'
[2021-06-06 04:39:47,805] :: ERROR - Task exception was never retrieved
future: <Task finished name='Task-1992' coro=<Broker.stream_connected() done, defined at /usr/lib/python3.8/site-packages/hbmqtt/broker.py:342> exception=IncompleteReadError('0 bytes read on a total of 1 expected bytes')>
Traceback (most recent call last):
  File "/usr/lib/python3.8/site-packages/hbmqtt/broker.py", line 344, in stream_connected
    yield from self.client_connected(listener_name, StreamReaderAdapter(reader), StreamWriterAdapter(writer))
  File "/usr/lib/python3.8/site-packages/hbmqtt/broker.py", line 359, in client_connected
    handler, client_session = yield from BrokerProtocolHandler.init_from_connect(reader, writer, self.plugins_manager, loop=self._loop)
  File "/usr/lib/python3.8/site-packages/hbmqtt/mqtt/protocol/broker_handler.py", line 129, in init_from_connect
    connect = yield from ConnectPacket.from_stream(reader)
  File "/usr/lib/python3.8/site-packages/hbmqtt/mqtt/packet.py", line 225, in from_stream
    fixed_header = yield from cls.FIXED_HEADER.from_stream(reader)
  File "/usr/lib/python3.8/site-packages/hbmqtt/mqtt/packet.py", line 104, in from_stream
    byte1 = yield from read_or_raise(reader, 1)
  File "/usr/lib/python3.8/site-packages/hbmqtt/codecs.py", line 52, in read_or_raise
    data = yield from reader.read(n)
  File "/usr/lib/python3.8/site-packages/hbmqtt/adapters.py", line 145, in read
    data = yield from self._reader.readexactly(n)
  File "/usr/lib/python3.8/asyncio/streams.py", line 721, in readexactly
    raise exceptions.IncompleteReadError(incomplete, n)
asyncio.exceptions.IncompleteReadError: 0 bytes read on a total of 1 expected bytes
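On the broker side, a probe like `nc -zv 127.0.0.1 1883` connects and closes without sending a byte, so the very first read ends in exactly the IncompleteReadError shown above. Below is a hypothetical handler sketch in plain asyncio (not hbmqtt's `Broker.client_connected`) that treats this as a normal disconnect, logs it at debug level, and always closes the writer; the status strings it returns exist only to make the sketch easy to test.

```python
import asyncio
import logging

log = logging.getLogger("broker-sketch")


async def handle_client(reader, writer):
    """Serve one TCP connection; a silent probe is not an error."""
    peer = writer.get_extra_info("peername")
    try:
        try:
            first_byte = await reader.readexactly(1)
        except (asyncio.IncompleteReadError, ConnectionError):
            # e.g. `nc -z`: connected and closed without sending CONNECT
            log.debug("%s closed the connection before sending CONNECT", peer)
            return "closed_early"
        # Upper nibble of the first fixed-header byte is the packet type (1 = CONNECT)
        if first_byte[0] >> 4 != 1:
            log.warning("%s: first packet is not CONNECT", peer)
            return "not_connect"
        # A real broker would parse the rest of CONNECT and serve the session here
        return "connect"
    finally:
        writer.close()  # runs on every path, so no connection is left open
```

The same `finally` block is where a per-listener connection counter would be released, which would keep a number like "290 connections acquired" from climbing with each health check.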