wialon / gmqtt

Python MQTT v5.0 async client
MIT License

Client remains active after disconnect() #79

Open stefangotz opened 4 years ago

stefangotz commented 4 years ago

Hi!

My use case is to use gmqtt with an MQTT broker over an unreliable link, so the broker can be unavailable for quite a while or the MQTT TCP connection can be in weird and wonderful states. Furthermore, I need to completely shut down and discard a gmqtt client, along with all its resources and tasks, if it is unable to connect to such a broker.

One particular slightly strange connection state can be simulated with SSH port forwarding. On Linux and friends, the command ssh -L 2000:localhost:1 localhost opens local port 2000 and connects it to the closed port 1, simulating a case where you can connect to port 2000, but can't actually get data through.
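Where SSH is not at hand, a comparable "you can connect, but nothing comes back" endpoint can be sketched with plain asyncio. This is a minimal sketch under the assumption that a listener which accepts and immediately closes is close enough to the SSH forward; the real SSH setup may reset the connection instead of closing it cleanly (the two logs below show both cases). The helper names are hypothetical.

```python
import asyncio


async def _accept_then_close(reader: asyncio.StreamReader,
                             writer: asyncio.StreamWriter) -> None:
    # Accept the TCP connection, then close it without sending a single byte,
    # so a client can connect but never gets a CONNACK back.
    writer.close()
    await writer.wait_closed()


async def start_dead_end_server(host: str = "127.0.0.1",
                                port: int = 0) -> asyncio.AbstractServer:
    """Listen on host:port (0 = pick a free port) and drop every connection."""
    return await asyncio.start_server(_accept_then_close, host, port)


async def demo() -> bytes:
    server = await start_dead_end_server()
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    data = await reader.read(100)  # b"" once the server side has closed
    writer.close()
    await writer.wait_closed()
    server.close()
    await server.wait_closed()
    return data


if __name__ == "__main__":
    print(asyncio.run(demo()))  # b''
```

Pointing a client at the chosen port (for gmqtt, something like await client.connect("127.0.0.1", port)) is assumed to exercise the same "connected, but no data" path as ssh -L.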

I've been using code roughly similar to the simplified attachment test.py.txt to make this work with v0.4.5. The attached code produces the following output with v0.4.5:

INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
WARNING:gmqtt.mqtt.protocol:[EXC: CONN LOST]
Traceback (most recent call last):
  File "/opt/Python-3.7/lib/python3.7/asyncio/selector_events.py", line 804, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
DEBUG:root:_on_disconnect(exc='None')
DEBUG:root:wait for connect() returned, done: 1, pending: 1
DEBUG:root:Client.connect() completed: 0
DEBUG:root:_DISC.wait() completed: 1
DEBUG:root:_DISC.is_set(): 1
DEBUG:root:Cancelling pending tasks
DEBUG:root:Cancelled pending tasks
DEBUG:root:Wait for disconnect()
WARNING:gmqtt.mqtt.protocol:[TRYING WRITE TO CLOSED SOCKET]
DEBUG:root:disconnect() returned
DEBUG:root:waiting for 20 seconds
DEBUG:root:waited for 20 seconds; exiting _main()

So while client.connect() doesn't return, that's OK, because I can see from the call to on_disconnect() that there is no connection. After I call client.disconnect(), the client is no longer active, which is what I want.
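The attachment test.py.txt is not reproduced in this thread. Judging only from its log lines ("wait for connect() returned, done: 1, pending: 1", "Cancelling pending tasks"), it races connect() against a disconnect event and cancels whatever is left over. A hypothetical reconstruction of that pattern, with made-up helper names and stand-in coroutines instead of a real gmqtt client:

```python
import asyncio
from typing import Awaitable, Callable


async def connect_or_give_up(connect: Callable[[], Awaitable[None]],
                             disconnected: asyncio.Event,
                             timeout: float = 10.0) -> bool:
    """Race connect() against a disconnect signal; True only if connect() won.

    Hypothetical helper: in the real program, connect would wrap something
    like client.connect(host, port), and disconnected would be set from the
    client's on_disconnect call-back.
    """
    connect_task = asyncio.ensure_future(connect())
    disc_task = asyncio.ensure_future(disconnected.wait())
    done, pending = await asyncio.wait({connect_task, disc_task}, timeout=timeout,
                                       return_when=asyncio.FIRST_COMPLETED)
    # Cancel whatever is still running so nothing outlives this call.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return (connect_task in done and not connect_task.cancelled()
            and connect_task.exception() is None)


async def _never_connects() -> None:
    await asyncio.sleep(3600)  # stands in for a connect() that never completes


async def _connects_at_once() -> None:
    return None


async def demo() -> bool:
    disconnected = asyncio.Event()
    # Simulate on_disconnect firing shortly after the connection attempt.
    asyncio.get_running_loop().call_later(0.05, disconnected.set)
    return await connect_or_give_up(_never_connects, disconnected, timeout=1.0)


async def demo_ok() -> bool:
    return await connect_or_give_up(_connects_at_once, asyncio.Event(), timeout=1.0)
```

The catch discussed in the rest of this issue is that cancelling the task awaiting connect() only cancels the caller's wait; it does not by itself guarantee that the client's own internal tasks stop.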

With v0.5, the output is as follows:

INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
DEBUG:gmqtt.mqtt.protocol:[RECV EMPTY] Connection will be reset automatically.
INFO:gmqtt.mqtt.protocol:[CONN CLOSE NORMALLY]
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
DEBUG:root:_on_disconnect(exc='None')
WARNING:gmqtt.mqtt.protocol:[TRYING WRITE TO CLOSED SOCKET]
DEBUG:root:wait for connect() returned, done: 1, pending: 1
DEBUG:root:Client.connect() completed: 0
DEBUG:root:_DISC.wait() completed: 1
DEBUG:root:_DISC.is_set(): 1
DEBUG:root:Cancelling pending tasks
DEBUG:root:Cancelled pending tasks
DEBUG:root:Wait for disconnect()
WARNING:gmqtt.mqtt.protocol:[TRYING WRITE TO CLOSED SOCKET]
DEBUG:root:disconnect() returned
DEBUG:root:waiting for 20 seconds
INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
WARNING:gmqtt.mqtt.protocol:[EXC: CONN LOST]
Traceback (most recent call last):
  File "/opt/Python-3.7/lib/python3.7/asyncio/selector_events.py", line 804, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
DEBUG:root:_on_disconnect(exc='None')
WARNING:gmqtt.mqtt.protocol:[TRYING WRITE TO CLOSED SOCKET]
INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
WARNING:gmqtt.mqtt.protocol:[EXC: CONN LOST]
Traceback (most recent call last):
  File "/opt/Python-3.7/lib/python3.7/asyncio/selector_events.py", line 804, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
DEBUG:root:_on_disconnect(exc='None')
WARNING:gmqtt.mqtt.protocol:[TRYING WRITE TO CLOSED SOCKET]
INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
WARNING:gmqtt.mqtt.protocol:[EXC: CONN LOST]
Traceback (most recent call last):
  File "/opt/Python-3.7/lib/python3.7/asyncio/selector_events.py", line 804, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
DEBUG:root:_on_disconnect(exc='None')
WARNING:gmqtt.mqtt.protocol:[TRYING WRITE TO CLOSED SOCKET]
DEBUG:root:waited for 20 seconds; exiting _main()
ERROR:asyncio:Exception in callback MqttPackageHandler._handle_exception_in_future(<Task cancell...lient.py:174>>)
handle: <Handle MqttPackageHandler._handle_exception_in_future(<Task cancell...lient.py:174>>)>
Traceback (most recent call last):
  File "/opt/Python-3.7/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "/home/veefil/.local/lib/python3.7/site-packages/gmqtt/mqtt/handler.py", line 208, in _handle_exception_in_future
    if not future.exception():
concurrent.futures._base.CancelledError

This indicates that, even after client.disconnect() has been called, the gmqtt client still has some asynchronous call-backs running, and, at least with the regular API, I couldn't figure out how to stop them.
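One way to see what is still alive after await client.disconnect() is to list the event loop's pending tasks. The sketch below uses only standard asyncio calls; the stray task is a stand-in, and no assumption is made about what gmqtt's internal tasks are called. Cancelling everything it finds is a blunt last resort, not a substitute for a clean shutdown in the library.

```python
import asyncio
from typing import List, Tuple


def leftover_tasks() -> List[asyncio.Task]:
    """Tasks still pending on the running loop, excluding the caller's own task."""
    current = asyncio.current_task()
    return [t for t in asyncio.all_tasks() if t is not current and not t.done()]


async def demo() -> Tuple[int, int]:
    # Stands in for a reconnect task that a client might leave behind.
    stray = asyncio.ensure_future(asyncio.sleep(3600))
    await asyncio.sleep(0)
    before = len(leftover_tasks())
    for task in leftover_tasks():
        print("still running:", task)
        task.cancel()
    await asyncio.gather(stray, return_exceptions=True)
    return before, len(leftover_tasks())
```

Called right after disconnect() returns, a non-empty list would point at the call-backs that keep firing in the v0.5 log.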

I'm not sure whether that behavior is intentional or whether I missed something, so I just wanted to let you know that v0.5.0 doesn't quite seem to serve the particular use case I'm after. Are there any plans to change this behavior again in the future?

Lenka42 commented 4 years ago

@stefangotz Thanks for letting us know. The setup is unusual indeed; we haven't tested scenarios like this. I will try to figure out what went wrong in the latest version.

honglei commented 4 years ago

Env: Win7 64-bit / Python 3.7.5 / Tornado 6 / gmqtt 0.5.1

import socket
import asyncio
import logging

from tornado.ioloop import IOLoop,PeriodicCallback

import gmqtt
class GMQTT(object):
    def __init__(self):
        self.log =  logging.getLogger(__name__)
        self.log.setLevel(logging.DEBUG)
        self.mqtt_broker_host = "localhost"
        self.mqtt_broker_port = 1883
        self.dst_topic = "a/b"

        self.period_callback = None

        self.loop = IOLoop.instance()  # on Tornado 6 this wraps an asyncio loop (self.loop.asyncio_loop)

        mqtt_client = gmqtt.Client(client_id="xxxx" )
        mqtt_client.on_connect = self.on_connect
        mqtt_client.on_message = self.on_message
        mqtt_client.on_disconnect = self.on_disconnect
        #mqtt_client.on_subscribe = self.on_subscribe
        self.mqtt_client = mqtt_client
        self.index = 0

    async def period_check(self):
        '''
            send msg periodically
        '''
        if self.mqtt_client.is_connected:
            self.mqtt_client.publish(self.dst_topic, f"Hello:{self.index}".encode() )
            self.index +=1

    def on_message(self, client, topic, payload, qos, properties):
        self.log.info(f'MQTT.Recv {client._client_id}] TOPIC: {topic} PAYLOAD: {payload} QOS: {qos} PROPERTIES: {properties}'
                         )    
    def on_connect(self, client, flags, rc, properties):
        self.log.info('[MQTT.CONNECTED {}]'.format(client._client_id))    
        self.mqtt_client.subscribe( self.dst_topic )

    def on_disconnect(self, client, packet, exc=None):
        self.log.info('[MQTT.DISCONNECTED {}]'.format(client._client_id))   

    async def unreg(self):
        if self.mqtt_client.is_connected:
            self.mqtt_client.unsubscribe( self.dst_topic )
            await self.mqtt_client.disconnect()

    async def reg(self):
        await self.mqtt_client.connect(self.mqtt_broker_host)

        if not self.period_callback :
            self.period_callback = PeriodicCallback(self.period_check,
                                                    callback_time=1000) #Unit:milliseconds
        if not self.period_callback.is_running():
            self.period_callback.start()        

if __name__ == "__main__":
    c = GMQTT()
    c.loop.run_sync(c.reg)
    c.loop.call_later(10, c.unreg)
    c.loop.start()

Result:

INFO:__main__:[MQTT.CONNECTED xxxx]
INFO:__main__:MQTT.Recv xxxx] TOPIC: a/b PAYLOAD: b'Hello:0' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}
INFO:__main__:MQTT.Recv xxxx] TOPIC: a/b PAYLOAD: b'Hello:1' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}
INFO:__main__:MQTT.Recv xxxx] TOPIC: a/b PAYLOAD: b'Hello:2' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}
INFO:__main__:MQTT.Recv xxxx] TOPIC: a/b PAYLOAD: b'Hello:3' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}
INFO:__main__:MQTT.Recv xxxx] TOPIC: a/b PAYLOAD: b'Hello:4' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}
INFO:__main__:MQTT.Recv xxxx] TOPIC: a/b PAYLOAD: b'Hello:5' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}
INFO:__main__:MQTT.Recv xxxx] TOPIC: a/b PAYLOAD: b'Hello:6' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}
INFO:__main__:MQTT.Recv xxxx] TOPIC: a/b PAYLOAD: b'Hello:7' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}
INFO:__main__:MQTT.Recv xxxx] TOPIC: a/b PAYLOAD: b'Hello:8' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}
INFO:__main__:[MQTT.DISCONNECTED xxxx]
WARNING:gmqtt.mqtt.protocol:[TRYING WRITE TO CLOSED SOCKET]
INFO:__main__:[MQTT.CONNECTED xxxx]
INFO:__main__:MQTT.Recv xxxx] TOPIC: a/b PAYLOAD: b'Hello:10' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}
INFO:__main__:MQTT.Recv xxxx] TOPIC: a/b PAYLOAD: b'Hello:11' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}
INFO:__main__:MQTT.Recv xxxx] TOPIC: a/b PAYLOAD: b'Hello:12' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}
INFO:__main__:MQTT.Recv xxxx] TOPIC: a/b PAYLOAD: b'Hello:13' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}
INFO:__main__:MQTT.Recv xxxx] TOPIC: a/b PAYLOAD: b'Hello:14' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}
INFO:__main__:MQTT.Recv xxxx] TOPIC: a/b PAYLOAD: b'Hello:15' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}
INFO:__main__:MQTT.Recv xxxx] TOPIC: a/b PAYLOAD: b'Hello:16' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}
INFO:__main__:MQTT.Recv xxxx] TOPIC: a/b PAYLOAD: b'Hello:17' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}
INFO:__main__:MQTT.Recv xxxx] TOPIC: a/b PAYLOAD: b'Hello:18' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}
INFO:__main__:MQTT.Recv xxxx] TOPIC: a/b PAYLOAD: b'Hello:19' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}
INFO:__main__:MQTT.Recv xxxx] TOPIC: a/b PAYLOAD: b'Hello:20' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}
INFO:__main__:MQTT.Recv xxxx] TOPIC: a/b PAYLOAD: b'Hello:21' QOS: 0 PROPERTIES: {'dup': 0, 'retain': 0}

honglei commented 4 years ago

The cause of this problem is that a CONNACK message is received after disconnect(), which makes the client connect again!

Stack trace:

  File "F:\Python\backend (1)\tests\test\gmqtt_test\util\gmqtt_client.py", line 68, in <module>
    c.loop.start()

  File "t:\ProgramData\Anaconda3\envs\py37win\lib\site-packages\tornado\platform\asyncio.py", line 148, in start
    self.asyncio_loop.run_forever()

  File "t:\ProgramData\Anaconda3\envs\py37win\lib\asyncio\base_events.py", line 534, in run_forever
    self._run_once()

  File "t:\ProgramData\Anaconda3\envs\py37win\lib\asyncio\base_events.py", line 1771, in _run_once
    handle._run()

  File "t:\ProgramData\Anaconda3\envs\py37win\lib\asyncio\events.py", line 88, in _run
    self._context.run(self._callback, *self._args)

  File "t:\ProgramData\Anaconda3\envs\py37win\lib\site-packages\gmqtt\mqtt\protocol.py", line 184, in _read_loop
    parsed_size = self._read_packet(buf)

  File "t:\ProgramData\Anaconda3\envs\py37win\lib\site-packages\gmqtt\mqtt\protocol.py", line 172, in _read_packet
    self._connection.put_package((command, packet))

  File "t:\ProgramData\Anaconda3\envs\py37win\lib\site-packages\gmqtt\mqtt\connection.py", line 37, in put_package
    self._handler(*pkg)

  File "t:\ProgramData\Anaconda3\envs\py37win\lib\site-packages\gmqtt\mqtt\handler.py", line 354, in __call__
    result = self._handle_packet(cmd, packet)

  File "t:\ProgramData\Anaconda3\envs\py37win\lib\site-packages\gmqtt\mqtt\handler.py", line 207, in _handle_packet
    handler(cmd, packet)

  File "t:\ProgramData\Anaconda3\envs\py37win\lib\site-packages\gmqtt\mqtt\handler.py", line 275, in _handle_connack_packet
    self.on_connect(self, flags, result, self.properties)

  File "F:\Python\backend (1)\tests\test\gmqtt_test\util\gmqtt_client.py", line 44, in on_connect
    self.log.info('[MQTT.CONNECTED {}]'.format(client._client_id))

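The trace shows the CONNACK handler calling on_connect even though disconnect() had already been called. One way to picture a fix is a connection state that, once closed by an explicit disconnect(), refuses to be revived by a late CONNACK. This is a hypothetical sketch: State and ConnectionState are made-up names, and it is not necessarily how gmqtt 0.5.2 fixed the issue.

```python
import enum


class State(enum.Enum):
    CONNECTING = "connecting"
    CONNECTED = "connected"
    CLOSED = "closed"  # entered via an explicit disconnect(); never left again


class ConnectionState:
    """Hypothetical sketch, not gmqtt's code: ignore CONNACKs after disconnect()."""

    def __init__(self) -> None:
        self.state = State.CONNECTING
        self.on_connect_calls = 0

    def disconnect(self) -> None:
        self.state = State.CLOSED

    def handle_connack(self) -> bool:
        """Return True if the CONNACK was accepted and on_connect would fire."""
        if self.state is State.CLOSED:
            return False  # late CONNACK for a connection the app already closed
        self.state = State.CONNECTED
        self.on_connect_calls += 1
        return True


conn = ConnectionState()
conn.handle_connack()  # normal connect: on_connect fires
conn.disconnect()      # application calls disconnect()
conn.handle_connack()  # late CONNACK: ignored, state stays CLOSED
```

The design choice is that CLOSED is a terminal state, so reconnect logic has to check it before acting on any packet.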

Mixser commented 4 years ago

Hi @honglei

Thanks for your investigation. We found an error in our reconnection logic and will think about the best way to fix it. We will fix this behavior in the next version (expected tomorrow or so).

Lenka42 commented 4 years ago

@honglei please check version 0.5.2.

honglei commented 4 years ago

@Lenka42 Version 0.5.2 solved this problem, thanks!

Lenka42 commented 4 years ago

@stefangotz does the new version also solve the problem in your setup? 🙂

xandrade commented 4 years ago

Could you please share your code improvement?

stefangotz commented 4 years ago

@Lenka42 I've tried 0.5.4 and it is better, but unfortunately still not as good as 0.4.5 as far as the API is concerned.

When I run the attached sample program with 0.5.4, the output is as follows:

INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
DEBUG:gmqtt.mqtt.protocol:[RECV EMPTY] Connection will be reset automatically.
INFO:gmqtt.mqtt.protocol:[CONN CLOSE NORMALLY]
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
DEBUG:root:_on_disconnect(exc='None')
WARNING:gmqtt.mqtt.protocol:[TRYING WRITE TO CLOSED SOCKET]
DEBUG:root:wait for connect() returned, done: 1, pending: 1
DEBUG:root:Client.connect() completed: 0
DEBUG:root:_DISC.wait() completed: 1
DEBUG:root:_DISC.is_set(): 1
DEBUG:root:Cancelling pending tasks
DEBUG:root:Cancelled pending tasks
DEBUG:root:Wait for disconnect()
WARNING:gmqtt.mqtt.protocol:[TRYING WRITE TO CLOSED SOCKET]
DEBUG:root:disconnect() returned
DEBUG:root:waiting for 20 seconds
INFO:gmqtt.mqtt.protocol:[CONNECTION MADE]
WARNING:gmqtt.mqtt.protocol:[EXC: CONN LOST]
Traceback (most recent call last):
  File "/opt/Python-3.7/lib/python3.7/asyncio/selector_events.py", line 804, in _read_ready__data_received
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
DEBUG:gmqtt.mqtt.handler:[CMD 0xe0] b''
DEBUG:root:_on_disconnect(exc='None')
DEBUG:root:waited for 20 seconds; exiting _main()

This is an improvement over 0.5 in that the Client instance appears to become inactive after the second call-back to _on_disconnect(). However, the application sees the following confusing sequence:

  1. application calls connect()
  2. application receives on_disconnect() call-back
  3. connect() returns
  4. application receives another on_disconnect() call-back a few seconds later

At least personally, I find that confusing for application developers and fairly difficult to handle cleanly in application code, because one can never be quite sure when a connection is really closed and when it is safe to tear down all connection state.

What do you think? Could this be improved so that gmqtt's API makes it crystal clear to applications when a connection is truly closed and no further call-backs from gmqtt are to be expected?
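Until the library gives such a guarantee, an application can at least enforce its own "closed means closed" rule: wrap each call-back so that, once the application has decided to discard the client, further call-backs (such as the second on_disconnect() seen with 0.5.4) are counted and ignored. CallbackGate is a hypothetical application-side helper; it hides late call-backs but does not stop whatever gmqtt tasks are still running.

```python
import functools
from typing import Any, Callable


class CallbackGate:
    """Drop client call-backs once the application has declared the client closed."""

    def __init__(self) -> None:
        self.closed = False
        self.dropped = 0

    def wrap(self, callback: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(callback)
        def gated(*args: Any, **kwargs: Any) -> Any:
            if self.closed:
                self.dropped += 1  # late call-back after the app tore down its state
                return None
            return callback(*args, **kwargs)
        return gated

    def close(self) -> None:
        self.closed = True


events = []
gate = CallbackGate()
on_disconnect = gate.wrap(lambda client, packet, exc=None: events.append("disconnect"))
on_disconnect(None, None)  # first on_disconnect: delivered
gate.close()               # application discards the client
on_disconnect(None, None)  # second on_disconnect: dropped
```

In practice this would be wired up as client.on_disconnect = gate.wrap(handler), with gate.close() called right before the application tears down its connection state.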

madhukar01 commented 4 years ago

I updated the library from version 0.5.4 to 0.6.2 and I have stumbled upon a similar problem.

The mosquitto log says:

Client xyz has exceeded timeout, disconnecting.
New connection from <ipaddr> on port 1883

Because of this, I am getting [TRYING WRITE TO CLOSED SOCKET].

My client has a keepalive of 60 seconds and is not subscribed to any topics.
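For reference, the MQTT 3.1.1 and 5.0 specifications let the broker drop a client that has sent no control packet within one and a half times its Keep Alive period, which is presumably what mosquitto's "exceeded timeout" message reports. With a keepalive of 60 seconds that works out to 1.5 * 60 = 90 seconds of silence; a keepalive of 0 turns the mechanism off. A small helper (hypothetical name) makes the arithmetic explicit:

```python
def broker_timeout_seconds(keepalive: int) -> float:
    """Seconds of client silence after which an MQTT broker may drop the client.

    Per the MQTT 3.1.1 / 5.0 specs: 1.5 x Keep Alive; 0 disables the check.
    """
    if keepalive < 0:
        raise ValueError("keepalive must be non-negative")
    if keepalive == 0:
        return float("inf")
    return 1.5 * keepalive


print(broker_timeout_seconds(60))  # 90.0
```

So if no PINGREQ (or other packet) reaches the broker for 90 seconds, the disconnect in the log is expected broker behavior, and the question becomes why the client stopped sending.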

madhukar01 commented 4 years ago

It is a single python thread which never disconnects, and I am sure there is no other thread/client with the same client ID.

Lenka42 commented 4 years ago

@madhukar01 sorry, I need more details on what's going on in your case.

straga commented 4 years ago

Hello. I get the same result. After await client.disconnect(), my on_disconnect callback fires, but the client still tries to connect again.

Steps to reproduce: just restart the MQTT broker. The log then shows:

gmqtt.mqtt.protocol: [RECV EMPTY] Connection will be reset automatically.
gmqtt.mqtt.protocol: [CONN CLOSE NORMALLY]
gmqtt.mqtt.handler: [CMD 0xe0] b''

My on_disconnect callback fires and, for example, deletes the client from my list:

models.mqtt_connect: MQTT: Disconnected
mqtt.protocol: [TRYING WRITE TO CLOSED SOCKET]
gmqtt.mqtt.protocol: [CONNECTION MADE]
gmqtt.mqtt.handler: [CMD 0x20] b'\x00\x00\x03"\x00\n'
mqtt.handler: [CONNACK] flags: 0x0, result: 0x0

Then my on_connect callback fires, even though I already deleted the client in the on_disconnect callback:

models.mqtt_connect: MQTT: On Connected

Result: a phantom client stays connected to the MQTT broker.
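As a defensive measure on the application side (it does not address the root cause), a registry can treat any on_connect for a client it has already removed as a phantom and disconnect it again. This is a hypothetical sketch: ClientRegistry and _FakeClient are made-up names, the call-back takes the same positional arguments as the on_connect handler in the repro earlier in this thread, and disconnect() is assumed to be a coroutine as in that repro.

```python
import asyncio
from typing import Any, Callable, Dict, List, Set


class ClientRegistry:
    """Hypothetical app-side registry that disconnects 'phantom' clients."""

    def __init__(self) -> None:
        self.clients: Dict[str, Any] = {}
        self.phantoms: List[str] = []
        self._tasks: Set[asyncio.Task] = set()  # keep references until done

    def add(self, client_id: str, client: Any) -> None:
        self.clients[client_id] = client

    def remove(self, client_id: str) -> None:
        self.clients.pop(client_id, None)

    def make_on_connect(self, client_id: str) -> Callable[..., None]:
        def on_connect(client: Any, flags: int, rc: int, properties: Any) -> None:
            if self.clients.get(client_id) is not client:
                # Connected although the application already dropped it.
                self.phantoms.append(client_id)
                task = asyncio.ensure_future(client.disconnect())
                self._tasks.add(task)
                task.add_done_callback(self._tasks.discard)
        return on_connect


class _FakeClient:
    def __init__(self) -> None:
        self.disconnected = False

    async def disconnect(self) -> None:
        self.disconnected = True


async def demo() -> tuple:
    registry = ClientRegistry()
    client = _FakeClient()
    registry.add("xyz", client)
    on_connect = registry.make_on_connect("xyz")
    registry.remove("xyz")        # the application dropped it in on_disconnect
    on_connect(client, 0, 0, {})  # ...but a late CONNACK still triggers on_connect
    await asyncio.sleep(0)        # let the scheduled disconnect() run
    return registry.phantoms, client.disconnected
```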

Mixser commented 4 years ago

Hi @straga

Please check out our new version (0.6.6); we have improved the disconnect behavior.