njouanin / hbmqtt

MQTT client/broker using Python asynchronous I/O
MIT License

Not synchronized disconnection cleanup #175

Open d21d3q opened 5 years ago

d21d3q commented 5 years ago

I am observing this bug on a long-running client that publishes approximately once per second. The following trace log includes timestamps (from Kibana, newest entries first):

March 11th 2019, 02:45:43.670 INFO:transitions.core:Entered state connected
March 11th 2019, 02:45:43.668 INFO:transitions.core:Exited state disconnected
March 11th 2019, 02:45:42.736 TimeoutError: [Errno 110] Connection timed out
March 11th 2019, 02:45:42.735 File "asyncio/selector_events.py", line 714, in _read_ready
March 11th 2019, 02:45:42.734 File "asyncio/streams.py", line 464, in _wait_for_data
March 11th 2019, 02:45:42.733 File "asyncio/streams.py", line 674, in readexactly
March 11th 2019, 02:45:42.732 File "site-packages/hbmqtt/adapters.py", line 146, in read
March 11th 2019, 02:45:42.731 File "site-packages/hbmqtt/codecs.py", line 52, in read_or_raise
March 11th 2019, 02:45:42.730 File "site-packages/hbmqtt/mqtt/packet.py", line 104, in from_stream
March 11th 2019, 02:45:42.729 File "asyncio/streams.py", line 217, in _drain_helper
March 11th 2019, 02:45:42.729 File "asyncio/streams.py", line 339, in drain
March 11th 2019, 02:45:42.728 File "site-packages/hbmqtt/adapters.py", line 168, in drain
March 11th 2019, 02:45:42.727 File "site-packages/hbmqtt/mqtt/packet.py", line 203, in to_stream
March 11th 2019, 02:45:42.726 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 446, in _send_packet
March 11th 2019, 02:45:42.725 File "site-packages/hbmqtt/mqtt/protocol/client_handler.py", line 158, in mqtt_ping
March 11th 2019, 02:45:42.724 File "asyncio/streams.py", line 329, in drain
March 11th 2019, 02:45:42.723 File "site-packages/hbmqtt/adapters.py", line 168, in drain
March 11th 2019, 02:45:42.722 File "site-packages/hbmqtt/mqtt/packet.py", line 203, in to_stream
March 11th 2019, 02:45:42.721 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 446, in _send_packet
March 11th 2019, 02:45:42.721 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 255, in _handle_qos1_message_flow
March 11th 2019, 02:45:42.720 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 202, in _handle_message_flow
March 11th 2019, 02:45:42.719 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 187, in mqtt_publish
March 11th 2019, 02:45:42.718 File "site-packages/hbmqtt/client.py", line 276, in publish
March 11th 2019, 02:45:42.717 File "site-packages/hbmqtt/client.py", line 72, in wrapper
March 11th 2019, 02:45:42.715 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 374, in _reader_loop
March 11th 2019, 02:45:42.715 File "asyncio/tasks.py", line 358, in wait_for
March 11th 2019, 02:45:42.714 File "asyncio/streams.py", line 329, in drain
March 11th 2019, 02:45:42.714 File "site-packages/hbmqtt/adapters.py", line 168, in drain
March 11th 2019, 02:45:42.713 File "site-packages/hbmqtt/mqtt/packet.py", line 203, in to_stream
March 11th 2019, 02:45:42.712 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 255, in _handle_qos1_message_flow
March 11th 2019, 02:45:42.712 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 446, in _send_packet
March 11th 2019, 02:45:42.711 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 187, in mqtt_publish
March 11th 2019, 02:45:42.711 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 202, in _handle_message_flow
March 11th 2019, 02:45:42.710 File "site-packages/hbmqtt/client.py", line 276, in publish
March 11th 2019, 02:45:42.709 File "asyncio/tasks.py", line 358, in wait_for
March 11th 2019, 02:45:42.709 File "site-packages/hbmqtt/client.py", line 72, in wrapper
March 11th 2019, 02:45:42.708 File "my_app/mqtt.py", line 66, in _publish_readout
March 11th 2019, 02:45:42.708 File "asyncio/tasks.py", line 358, in wait_for
March 11th 2019, 02:45:42.707 File "my_app/mqtt.py", line 66, in _publish_readout
March 11th 2019, 02:45:42.706 Traceback (most recent call last):
March 11th 2019, 02:45:42.705 future: <Task finished coro=<ClientProtocolHandler.mqtt_ping() done, defined at site-packages/hbmqtt/mqtt/protocol/client_handler.py:155> exception=TimeoutError(110, 'Connection timed out')>
March 11th 2019, 02:45:42.704 ERROR:asyncio:Task exception was never retrieved
March 11th 2019, 02:45:41.506 WARNING:hbmqtt.client:Client not connected, waiting for it
March 11th 2019, 02:45:41.491 AttributeError: 'NoneType' object has no attribute 'write'
March 11th 2019, 02:45:41.490 File "site-packages/hbmqtt/mqtt/packet.py", line 202, in to_stream
March 11th 2019, 02:45:41.489 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 446, in _send_packet
March 11th 2019, 02:45:41.488 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 255, in _handle_qos1_message_flow
March 11th 2019, 02:45:41.487 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 202, in _handle_message_flow
March 11th 2019, 02:45:41.486 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 187, in mqtt_publish
March 11th 2019, 02:45:41.485 File "site-packages/hbmqtt/client.py", line 276, in publish
March 11th 2019, 02:45:41.484 File "site-packages/hbmqtt/client.py", line 72, in wrapper
March 11th 2019, 02:45:41.483 File "asyncio/tasks.py", line 358, in wait_for
March 11th 2019, 02:45:41.481 File "my_app/mqtt.py", line 66, in _publish_readout
March 11th 2019, 02:45:41.480 Traceback (most recent call last):
March 11th 2019, 02:45:41.478 future: <Task finished coro=<MqttHandler._publish_readout() done, defined at my_app/mqtt.py:62> exception=AttributeError("'NoneType' object has no attribute 'write'",)>
March 11th 2019, 02:45:41.477 ERROR:asyncio:Task exception was never retrieved
March 11th 2019, 02:45:41.476 AttributeError: 'NoneType' object has no attribute 'write'
March 11th 2019, 02:45:41.475 File "site-packages/hbmqtt/mqtt/packet.py", line 202, in to_stream
March 11th 2019, 02:45:41.474 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 446, in _send_packet
March 11th 2019, 02:45:41.473 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 255, in _handle_qos1_message_flow
March 11th 2019, 02:45:41.472 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 202, in _handle_message_flow
March 11th 2019, 02:45:41.471 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 187, in mqtt_publish
March 11th 2019, 02:45:41.470 File "site-packages/hbmqtt/client.py", line 276, in publish
March 11th 2019, 02:45:41.469 File "site-packages/hbmqtt/client.py", line 72, in wrapper
March 11th 2019, 02:45:41.468 File "asyncio/tasks.py", line 358, in wait_for
March 11th 2019, 02:45:41.467 File "my_app/mqtt.py", line 66, in _publish_readout
March 11th 2019, 02:45:41.466 Traceback (most recent call last):
March 11th 2019, 02:45:41.465 future: <Task finished coro=<MqttHandler._publish_readout() done, defined at my_app/mqtt.py:62> exception=AttributeError("'NoneType' object has no attribute 'write'",)>
March 11th 2019, 02:45:41.464 ERROR:asyncio:Task exception was never retrieved
March 11th 2019, 02:45:41.463 AttributeError: 'NoneType' object has no attribute 'write'
March 11th 2019, 02:45:41.462 File "site-packages/hbmqtt/mqtt/packet.py", line 202, in to_stream
March 11th 2019, 02:45:41.461 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 446, in _send_packet
March 11th 2019, 02:45:41.460 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 255, in _handle_qos1_message_flow
March 11th 2019, 02:45:41.459 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 202, in _handle_message_flow
March 11th 2019, 02:45:41.458 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 187, in mqtt_publish
March 11th 2019, 02:45:41.457 File "site-packages/hbmqtt/client.py", line 276, in publish
March 11th 2019, 02:45:41.456 File "site-packages/hbmqtt/client.py", line 72, in wrapper
March 11th 2019, 02:45:41.456 File "asyncio/tasks.py", line 358, in wait_for
March 11th 2019, 02:45:41.455 File "my_app/mqtt.py", line 66, in _publish_readout
March 11th 2019, 02:45:41.454 Traceback (most recent call last):
March 11th 2019, 02:45:41.453 future: <Task finished coro=<MqttHandler._publish_readout() done, defined at my_app/mqtt.py:62> exception=AttributeError("'NoneType' object has no attribute 'write'",)>
March 11th 2019, 02:45:41.453 ERROR:asyncio:Task exception was never retrieved
March 11th 2019, 02:45:41.452 WARNING:hbmqtt.mqtt.protocol.handler:Unhandled exception: 'NoneType' object has no attribute 'write'
March 11th 2019, 02:45:41.451 TimeoutError: [Errno 110] Connection timed out
March 11th 2019, 02:45:41.450 File "asyncio/selector_events.py", line 714, in _read_ready
March 11th 2019, 02:45:41.450 File "asyncio/streams.py", line 464, in _wait_for_data
March 11th 2019, 02:45:41.449 File "asyncio/streams.py", line 674, in readexactly
March 11th 2019, 02:45:41.448 File "site-packages/hbmqtt/codecs.py", line 52, in read_or_raise
March 11th 2019, 02:45:41.448 File "site-packages/hbmqtt/adapters.py", line 146, in read
March 11th 2019, 02:45:41.447 File "site-packages/hbmqtt/mqtt/packet.py", line 104, in from_stream
March 11th 2019, 02:45:41.445 File "asyncio/streams.py", line 217, in _drain_helper
March 11th 2019, 02:45:41.444 File "asyncio/streams.py", line 339, in drain
March 11th 2019, 02:45:41.443 File "site-packages/hbmqtt/adapters.py", line 168, in drain
March 11th 2019, 02:45:41.442 File "site-packages/hbmqtt/mqtt/packet.py", line 203, in to_stream
March 11th 2019, 02:45:41.441 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 446, in _send_packet
March 11th 2019, 02:45:41.440 File "site-packages/hbmqtt/mqtt/protocol/client_handler.py", line 158, in mqtt_ping
March 11th 2019, 02:45:41.439 File "asyncio/streams.py", line 329, in drain
March 11th 2019, 02:45:41.438 File "site-packages/hbmqtt/adapters.py", line 168, in drain
March 11th 2019, 02:45:41.437 File "site-packages/hbmqtt/mqtt/packet.py", line 203, in to_stream
March 11th 2019, 02:45:41.435 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 446, in _send_packet
March 11th 2019, 02:45:41.434 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 255, in _handle_qos1_message_flow
March 11th 2019, 02:45:41.433 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 202, in _handle_message_flow
March 11th 2019, 02:45:41.433 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 187, in mqtt_publish
March 11th 2019, 02:45:41.432 File "site-packages/hbmqtt/client.py", line 276, in publish
March 11th 2019, 02:45:41.431 File "site-packages/hbmqtt/client.py", line 72, in wrapper
March 11th 2019, 02:45:41.430 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 374, in _reader_loop
March 11th 2019, 02:45:41.430 File "asyncio/tasks.py", line 358, in wait_for
March 11th 2019, 02:45:41.429 File "asyncio/streams.py", line 329, in drain
March 11th 2019, 02:45:41.428 File "site-packages/hbmqtt/adapters.py", line 168, in drain
March 11th 2019, 02:45:41.428 File "site-packages/hbmqtt/mqtt/packet.py", line 203, in to_stream
March 11th 2019, 02:45:41.427 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 446, in _send_packet
March 11th 2019, 02:45:41.426 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 255, in _handle_qos1_message_flow
March 11th 2019, 02:45:41.425 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 202, in _handle_message_flow
March 11th 2019, 02:45:41.425 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 187, in mqtt_publish
March 11th 2019, 02:45:41.424 File "site-packages/hbmqtt/client.py", line 276, in publish
March 11th 2019, 02:45:41.423 File "site-packages/hbmqtt/client.py", line 72, in wrapper
March 11th 2019, 02:45:41.422 File "asyncio/tasks.py", line 358, in wait_for
March 11th 2019, 02:45:41.422 File "my_app/mqtt.py", line 66, in _publish_readout
March 11th 2019, 02:45:41.421 File "asyncio/tasks.py", line 358, in wait_for
March 11th 2019, 02:45:41.420 File "my_app/mqtt.py", line 66, in _publish_readout
March 11th 2019, 02:45:41.420 Traceback (most recent call last):
March 11th 2019, 02:45:41.419 future: <Task finished coro=<MqttHandler._publish_readout() done, defined at my_app/mqtt.py:62> exception=TimeoutError(110, 'Connection timed out')>
March 11th 2019, 02:45:41.418 ERROR:asyncio:Task exception was never retrieved
March 11th 2019, 02:45:41.417 WARNING:hbmqtt.mqtt.protocol.handler:Unhandled exception: 'NoneType' object has no attribute 'write'
March 11th 2019, 02:45:41.416 TimeoutError: [Errno 110] Connection timed out
March 11th 2019, 02:45:41.416 File "asyncio/selector_events.py", line 714, in _read_ready
March 11th 2019, 02:45:41.415 File "asyncio/streams.py", line 464, in _wait_for_data
March 11th 2019, 02:45:41.414 File "asyncio/streams.py", line 674, in readexactly
March 11th 2019, 02:45:41.413 File "site-packages/hbmqtt/codecs.py", line 52, in read_or_raise
March 11th 2019, 02:45:41.413 File "site-packages/hbmqtt/adapters.py", line 146, in read
March 11th 2019, 02:45:41.412 File "site-packages/hbmqtt/mqtt/packet.py", line 104, in from_stream
March 11th 2019, 02:45:41.411 File "asyncio/streams.py", line 217, in _drain_helper
March 11th 2019, 02:45:41.410 File "asyncio/streams.py", line 339, in drain
March 11th 2019, 02:45:41.410 File "site-packages/hbmqtt/adapters.py", line 168, in drain
March 11th 2019, 02:45:41.409 File "site-packages/hbmqtt/mqtt/packet.py", line 203, in to_stream
March 11th 2019, 02:45:41.408 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 446, in _send_packet
March 11th 2019, 02:45:41.407 File "site-packages/hbmqtt/mqtt/protocol/client_handler.py", line 158, in mqtt_ping
March 11th 2019, 02:45:41.406 File "asyncio/streams.py", line 329, in drain
March 11th 2019, 02:45:41.405 File "site-packages/hbmqtt/mqtt/packet.py", line 203, in to_stream
March 11th 2019, 02:45:41.405 File "site-packages/hbmqtt/adapters.py", line 168, in drain
March 11th 2019, 02:45:41.404 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 446, in _send_packet
March 11th 2019, 02:45:41.403 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 255, in _handle_qos1_message_flow
March 11th 2019, 02:45:41.402 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 202, in _handle_message_flow
March 11th 2019, 02:45:41.401 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 187, in mqtt_publish
March 11th 2019, 02:45:41.400 File "site-packages/hbmqtt/client.py", line 276, in publish
March 11th 2019, 02:45:41.399 File "site-packages/hbmqtt/client.py", line 72, in wrapper
March 11th 2019, 02:45:41.398 File "asyncio/tasks.py", line 358, in wait_for
March 11th 2019, 02:45:41.397 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 374, in _reader_loop
March 11th 2019, 02:45:41.396 File "asyncio/streams.py", line 329, in drain
March 11th 2019, 02:45:41.395 File "site-packages/hbmqtt/adapters.py", line 168, in drain
March 11th 2019, 02:45:41.394 File "site-packages/hbmqtt/mqtt/packet.py", line 203, in to_stream
March 11th 2019, 02:45:41.393 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 446, in _send_packet
March 11th 2019, 02:45:41.392 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 255, in _handle_qos1_message_flow
March 11th 2019, 02:45:41.391 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 202, in _handle_message_flow
March 11th 2019, 02:45:41.391 File "site-packages/hbmqtt/mqtt/protocol/handler.py", line 187, in mqtt_publish
March 11th 2019, 02:45:41.389 File "site-packages/hbmqtt/client.py", line 276, in publish
March 11th 2019, 02:45:41.388 File "site-packages/hbmqtt/client.py", line 72, in wrapper
March 11th 2019, 02:45:41.387 File "asyncio/tasks.py", line 358, in wait_for
March 11th 2019, 02:45:41.385 File "my_app/mqtt.py", line 66, in _publish_readout
March 11th 2019, 02:45:41.383 Traceback (most recent call last):
March 11th 2019, 02:45:41.381 future: <Task finished coro=<MqttHandler._publish_readout() done, defined at my_app/mqtt.py:62> exception=TimeoutError(110, 'Connection timed out')>
March 11th 2019, 02:45:41.379 ERROR:asyncio:Task exception was never retrieved
March 11th 2019, 02:45:41.361 WARNING:hbmqtt.mqtt.protocol.handler:Unhandled exception: 'NoneType' object has no attribute 'write'
March 11th 2019, 02:45:41.359 INFO:transitions.core:Entered state disconnected
March 11th 2019, 02:45:41.356 INFO:transitions.core:Exited state connected
March 11th 2019, 02:45:41.354 WARNING:hbmqtt.client:Disconnected from broker
March 11th 2019, 02:45:41.353 WARNING:hbmqtt.mqtt.protocol.handler:Unhandled exception: [Errno 110] Connection timed out
March 11th 2019, 02:45:41.351 WARNING:hbmqtt.mqtt.protocol.handler:ClientProtocolHandler Unhandled exception in reader coro: TimeoutError(110, 'Connection timed out')
March 11th 2019, 02:45:41.350 WARNING:hbmqtt.mqtt.protocol.handler:Unhandled exception: [Errno 110] Connection timed out
March 11th 2019, 02:45:41.348 WARNING:hbmqtt.mqtt.protocol.handler:Unhandled exception: [Errno 110] Connection timed out
March 11th 2019, 02:45:41.347 TimeoutError: [Errno 110] Connection timed out
March 11th 2019, 02:45:41.346 File "asyncio/selector_events.py", line 714, in _read_ready
March 11th 2019, 02:45:41.344 Traceback (most recent call last):
March 11th 2019, 02:45:41.343 transport: <_SelectorSocketTransport fd=10 read=polling write=<polling, bufsize=73553>>
March 11th 2019, 02:45:41.341 protocol: <asyncio.streams.StreamReaderProtocol object at 0x74a0f570>
March 11th 2019, 02:45:41.340 ERROR:asyncio:Fatal read error on socket transport
March 11th 2019, 02:45:39.728 WARNING:hbmqtt.mqtt.protocol.handler:Unhandled exception:
March 11th 2019, 02:45:37.854 WARNING:hbmqtt.mqtt.protocol.handler:Unhandled exception:
March 11th 2019, 02:45:35.962 WARNING:hbmqtt.mqtt.protocol.handler:Unhandled exception:
March 11th 2019, 02:45:34.033 WARNING:hbmqtt.mqtt.protocol.handler:Unhandled exception:
March 11th 2019, 02:45:32.133 WARNING:hbmqtt.mqtt.protocol.handler:Unhandled exception:

This line is interesting:

AttributeError: 'NoneType' object has no attribute 'write'

I guess the error happens in . Since the error occurs around disconnection time, it would mean that the detach method (which can set the writer to None) is being called from the handler between the check that the client is connected and the write of the packet in to_stream.
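The suspected race can be reproduced in isolation. The sketch below is only an illustration, assuming a hypothetical FakeHandler class (it is not hbmqtt's actual handler): send() checks the writer, yields to the event loop once, and by the time it resumes, detach() has already cleared the writer.

```python
import asyncio


class FakeHandler:
    """Hypothetical stand-in for a protocol handler; not hbmqtt's real class."""

    def __init__(self):
        self.writer = object()  # any non-None value means "attached"

    def detach(self):
        # Disconnection cleanup: the writer is dropped.
        self.writer = None

    async def send(self):
        # 1. The connection check passes while the writer is still set.
        if self.writer is None:
            raise ConnectionResetError("not connected")
        # 2. Suspension point: other tasks may run here.
        await asyncio.sleep(0)
        # 3. Report whether the writer survived the suspension.
        return self.writer is not None


async def demo():
    handler = FakeHandler()
    task = asyncio.ensure_future(handler.send())
    await asyncio.sleep(0)  # let send() run up to its suspension point
    handler.detach()        # cleanup runs between the check and the write
    return await task
```

Running asyncio.run(demo()) returns False here: the check succeeded, yet the writer was gone when the coroutine resumed, which is the situation that would produce the AttributeError above.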

I also noticed a commented-out yield from self._handler.stop(). I guess it is meant to do some cleanup?

The question is what should happen to a packet that is interrupted by disconnection. Should the coroutine be cancelled and rescheduled to run again on connect? Or should the publish task raise something, so that the user can handle retransmission on their own? Since the disconnect occurs before the write, the packet could be rescheduled automatically (the broker doesn't know about it yet).

That would require keeping track of all pending packets (pings, publishes, etc.).
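One possible shape for that bookkeeping is sketched below. PendingOutbox and its methods are hypothetical names, not part of hbmqtt, and a plain list stands in for the stream writer: packets submitted while there is no writer are queued, then replayed in order once a writer is available again.

```python
from collections import deque


class PendingOutbox:
    """Hypothetical bookkeeping for packets not yet written to the socket."""

    def __init__(self):
        self._pending = deque()

    def submit(self, packet, writer):
        """Write the packet now if a writer exists, otherwise keep it for later."""
        if writer is None:
            self._pending.append(packet)
            return False
        writer.append(packet)  # a plain list stands in for the real writer
        return True

    def flush(self, writer):
        """Replay everything queued while disconnected, in original order."""
        sent = 0
        while self._pending:
            writer.append(self._pending.popleft())
            sent += 1
        return sent
```

On reconnect, the client would call flush() with the new writer before accepting fresh publishes. Whether queued pings should be replayed at all is an open design question that this sketch does not settle.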

d21d3q commented 5 years ago

Maybe we could catch AttributeError along with ConnectionResetError, or check/assert that self.writer exists before writing and raise ConnectionResetError if it does not (since the writer can be None only when the connection is closed)?
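The second option could look roughly like the sketch below. GuardedWriter and send_or_report are hypothetical names, not hbmqtt code, and io.BytesIO stands in for the real stream writer. The guard takes a single snapshot of the writer, so a concurrent detach cannot turn the write into an AttributeError.

```python
import io


class GuardedWriter:
    """Hypothetical wrapper around a writer that may be detached (set to None)."""

    def __init__(self, writer=None):
        self.writer = writer

    def write(self, data):
        writer = self.writer  # snapshot once, then use only the local name
        if writer is None:
            # A missing writer is reported as a closed connection.
            raise ConnectionResetError("writer detached: connection is closed")
        writer.write(data)


def send_or_report(guarded, data):
    """Callers handle one exception type for every 'connection gone' case."""
    try:
        guarded.write(data)
    except ConnectionResetError:
        return "disconnected"
    return "sent"
```

With this shape, existing code that already handles ConnectionResetError would cover the detached-writer case without needing a separate except AttributeError clause.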