njouanin / hbmqtt

MQTT client/broker using Python asynchronous I/O
MIT License

asyncio Warning: BrokerProtocolHandler Unhandled exception in reader coro: IncompleteReadError #119

Open nicola-lunghi opened 6 years ago

nicola-lunghi commented 6 years ago

I get a lot of warnings like this on Ubuntu 17.10:

[2018-03-30 13:05:45,416] :: WARNING - BrokerProtocolHandler Unhandled exception in reader coro: IncompleteReadError('0 bytes read on a total of 1 expected bytes',)

Here are the versions:

 python3 --version
Python 3.6.3
 pip3 freeze | grep hbmqtt
hbmqtt==0.9.2
hbmqtt
[2018-03-30 13:05:35,165] :: INFO - Exited state new
[2018-03-30 13:05:35,165] :: INFO - Entered state starting
[2018-03-30 13:05:35,166] :: INFO - Listener 'default' bind to 0.0.0.0:1883 (max_connections=-1)
[2018-03-30 13:05:35,166] :: INFO - Exited state starting
[2018-03-30 13:05:35,166] :: INFO - Entered state started
[2018-03-30 13:05:45,413] :: INFO - Listener 'default': 1 connections acquired
[2018-03-30 13:05:45,413] :: INFO - Connection from 127.0.0.1:49098 on listener 'default'
[2018-03-30 13:05:45,414] :: INFO - Exited state new
[2018-03-30 13:05:45,414] :: INFO - Entered state connected
[2018-03-30 13:05:45,416] :: WARNING - BrokerProtocolHandler Unhandled exception in reader coro: IncompleteReadError('0 bytes read on a total of 1 expected bytes',)
[2018-03-30 13:05:45,416] :: INFO - Exited state connected
[2018-03-30 13:05:45,417] :: INFO - Entered state disconnected
[2018-03-30 13:05:45,417] :: INFO - Listener 'default': 0 connections acquired
hbmqtt_pub --url "mqtt://localhost:1883" -t "test/sensor1" -m "ciao"
[2018-03-30 13:05:45,409] :: INFO - hbmqtt_pub/13323-nicklang-laptop Connecting to broker
[2018-03-30 13:05:45,415] :: INFO - Exited state new
[2018-03-30 13:05:45,415] :: INFO - Entered state connected
[2018-03-30 13:05:45,415] :: INFO - hbmqtt_pub/13323-nicklang-laptop Publishing to 'test/sensor1'
[2018-03-30 13:05:45,416] :: INFO - Exited state connected
[2018-03-30 13:05:45,416] :: INFO - Entered state disconnected
[2018-03-30 13:05:45,416] :: INFO - hbmqtt_pub/13323-nicklang-laptop Disconnected from broker

The broker works fine, but I get this warning for every message sent.

romancardenas commented 6 years ago

I experience the same issue on macOS 10.13.3. As you said, the broker works fine, and messages are received by subscribed processes.

The hbmqtt version experiencing this issue is 0.9.2

Kriechi commented 6 years ago

I'm getting the same error, with macOS 10.13.4 as the client and a Linux server, both running hbmqtt 0.9.2.

joopy commented 6 years ago

Getting the same error; looking for a solution.

ivan-antonyuck commented 6 years ago

I'm experiencing the same issue when reading from the broker (emqtt) in a loop. The problem arises after about 1700-1800 messages of consecutive reads. Subscriber code:

from hbmqtt.client import MQTTClient, QOS_0
import asyncio
loop = asyncio.get_event_loop()

@asyncio.coroutine
def consume():
    C = MQTTClient()
    yield from C.connect('mqtt://192.168.0.2/')
    yield from C.subscribe([
        ('a/b',QOS_0)
    ])
    try:
        while 1:
            yield from C.deliver_message()
    finally:
        yield from C.unsubscribe(['a/b'])
        yield from C.disconnect()

loop.run_until_complete(consume())

producer code:

from hbmqtt.client import MQTTClient
import asyncio
import simplejson

loop = asyncio.get_event_loop()
payload = {"key": "value"}

async def send_one():
    C = MQTTClient()
    await C.connect('mqtt://192.168.0.2:1883/')

    try:
        while 1:
            await C.publish('a/b', simplejson.dumps(payload).encode('utf-8'))
            # MQTT QoS - ?
    finally:
        # Wait for all pending messages to be delivered or expire.
        await C.disconnect()

loop.run_until_complete(send_one())

Traceback:

1780
1781
Disconnected from broker
Client not connected, waiting for it
Traceback (most recent call last):
  File "/Users/antonyuck/work/projects/mqtest/mqtt/bench_receiver.py", line 33, in <module>
    loop.run_until_complete(consume())
  File "/usr/local/Cellar/python/3.6.4_4/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py", line 467, in run_until_complete
    return future.result()
  File "/Users/antonyuck/work/projects/mqtest/mqtt/bench_receiver.py", line 25, in consume
    yield from C.deliver_message()
  File "/Users/antonyuck/work/projects/mqtest/venv/lib/python3.6/site-packages/hbmqtt/client.py", line 337, in deliver_message
    raise deliver_task.exception()
  File "/Users/antonyuck/work/projects/mqtest/venv/lib/python3.6/site-packages/hbmqtt/mqtt/protocol/handler.py", line 466, in mqtt_deliver_next_message
    message = yield from self.session.delivered_message_queue.get()
AttributeError: 'NoneType' object has no attribute 'delivered_message_queue'

System info:

 python --version
Python 3.6.4
 pip freeze | grep hbmqtt
hbmqtt==0.9.2

hbmqtt's client.py is patched to match the master branch:

# from websockets.handshake import InvalidHandshake
from websockets.exceptions import InvalidHandshake

arnulfojr commented 6 years ago

I'm having the same issue in 0.9.4; in my case I get nothing from the broker...

broker_1  | BrokerProtocolHandler Unhandled exception in reader coro: IncompleteReadError('0 bytes read on a total of 1 expected bytes')
broker_1  | BrokerProtocolHandler Unhandled exception in reader coro: IncompleteReadError('0 bytes read on a total of 1 expected bytes')
broker_1  | BrokerProtocolHandler Unhandled exception in reader coro: IncompleteReadError('0 bytes read on a total of 1 expected bytes')
hbmqtt_pub --url mqtt://broker -t tests/1 -m '1234'
[2018-09-09 14:43:56,257] :: INFO - hbmqtt_pub/44-c75c1dc0d095 Connecting to broker
[2018-09-09 14:43:56,264] :: INFO - Exited state new
[2018-09-09 14:43:56,264] :: INFO - Entered state connected
[2018-09-09 14:43:56,264] :: INFO - hbmqtt_pub/44-c75c1dc0d095 Publishing to 'tests/1'
[2018-09-09 14:43:56,265] :: INFO - Exited state connected
[2018-09-09 14:43:56,265] :: INFO - Entered state disconnected
[2018-09-09 14:43:56,266] :: INFO - hbmqtt_pub/44-c75c1dc0d095 Disconnected from broker

joopy commented 6 years ago

I 'fixed' it by not using hbmqtt anymore. I'm now using mosquitto.

Simon-L commented 6 years ago

I traced this error back and found that 0.9.4 actually works fine if you use the "taboo" broker from the samples (python3 broker_taboo.py). Use it along with the taboo publish and subscribe samples.

From the Quick Start page in the documentation, I had assumed that hbmqtt would work interchangeably with all the hbmqtt_* scripts. It turns out it's a bit more picky than that!

arnulfojr commented 6 years ago

@Simon-L so we need to use the taboo thingy?

I switched to RabbitMQ with the MQTT plugin and Paho's MQTT client, but originally I was looking for an asyncio implementation. Is that working for you, then?

arnulfojr commented 6 years ago

I discovered that the client is experiencing the same problem as well...

Merwenus commented 6 years ago

Same problem for me in Home Assistant. Any fix yet?

2018-09-24 17:21:41 WARNING (MainThread) [hbmqtt.mqtt.protocol.handler] BrokerProtocolHandler Unhandled exception in reader coro: IncompleteReadError('0 bytes read on a total of 1 expected bytes',)

JohannesBauer97 commented 6 years ago

Same for me: BrokerProtocolHandler Unhandled exception in reader coro: IncompleteReadError('0 bytes read on a total of 1 expected bytes',)

anthonyray commented 6 years ago

Same issue here!

ramonpoca commented 6 years ago

I've traced the error a bit. Added a logging.exception that shows:

ERROR:hbmqtt.mqtt.protocol.handler:0 bytes read on a total of 1 expected bytes
Traceback (most recent call last):
  File "/usr/local/miniconda3/lib/python3.6/site-packages/hbmqtt/mqtt/protocol/handler.py", line 373, in _reader_loop
    keepalive_timeout, loop=self._loop)
  File "/usr/local/miniconda3/lib/python3.6/asyncio/tasks.py", line 358, in wait_for
    return fut.result()
  File "/usr/local/miniconda3/lib/python3.6/site-packages/hbmqtt/mqtt/packet.py", line 104, in from_stream
    byte1 = yield from read_or_raise(reader, 1)
  File "/usr/local/miniconda3/lib/python3.6/site-packages/hbmqtt/codecs.py", line 52, in read_or_raise
    data = yield from reader.read(n)
  File "/usr/local/miniconda3/lib/python3.6/site-packages/hbmqtt/adapters.py", line 146, in read
    data = yield from self._reader.readexactly(n)
  File "/usr/local/miniconda3/lib/python3.6/asyncio/streams.py", line 672, in readexactly
    raise IncompleteReadError(incomplete, n)
asyncio.streams.IncompleteReadError: 0 bytes read on a total of 1 expected bytes
WARNING:hbmqtt.mqtt.protocol.handler:ClientProtocolHandler Unhandled exception in reader coro: IncompleteReadError('0 bytes read on a total of 1 expected bytes',)

ramonpoca commented 6 years ago

Continuing the tracing, I captured the failure with Wireshark; the hex dump follows. After the last c0 00 packet sent by hbmqtt, the server responds with a TCP RST (reset). The server is mosquitto 1.5.3 on macOS (Homebrew).

00000000  10 23 00 04 4d 51 54 54  04 02 00 09 00 17 68 62   .#..MQTT ......hb
00000010  6d 71 74 74 2f 47 38 6a  34 38 42 46 50 6d 4e 6f   mqtt/G8j 48BFPmNo
00000020  60 30 45 30 7a                                     `0E0z
    00000000  20 02 00 00                                         ...
00000025  82 06 00 01 00 01 23 02                            ......#. 
    00000004  90 03 00 01 02                                     .....
0000002D  c0 00                                              ..
    00000009  d0 00                                              ..
    0000000B  34 24 00 11 2f 66 69 72  65 62 61 73 65 2f 63 6f   4$../fir ebase/co
    0000001B  6e 74 72 6f 6c 00 01 7b  22 72 65 6c 6f 61 64 22   ntrol..{ "reload"
    0000002B  3a 20 33 37 30 7d                                  : 370}
0000002F  50 02 00 01                                        P...
    00000031  62 02 00 01                                        b...
00000033  70 02 00 01                                        p...
00000037  c0 00                                              ..
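
As a reading aid for the capture above, here is a minimal sketch that names each packet from its first byte, using the control packet type numbers from the MQTT 3.1.1 specification. It assumes the usual Wireshark stream layout, in which the unindented lines are bytes sent by the client (hbmqtt) and the indented lines come from the server. `packet_type` and `connect_keep_alive` are hypothetical helper names, not part of hbmqtt.

```python
# MQTT 3.1.1 control packet types, keyed by the high nibble of the first byte.
PACKET_TYPES = {
    1: "CONNECT", 2: "CONNACK", 3: "PUBLISH", 4: "PUBACK", 5: "PUBREC",
    6: "PUBREL", 7: "PUBCOMP", 8: "SUBSCRIBE", 9: "SUBACK",
    10: "UNSUBSCRIBE", 11: "UNSUBACK", 12: "PINGREQ", 13: "PINGRESP",
    14: "DISCONNECT",
}


def packet_type(first_byte: int) -> str:
    """Return the packet name encoded in the top four bits of the fixed header."""
    return PACKET_TYPES.get(first_byte >> 4, "RESERVED")


def connect_keep_alive(packet: bytes) -> int:
    """Return the keep alive (seconds) from a CONNECT packet.

    Assumes the remaining-length field fits in one byte, as in the capture.
    Layout: fixed header (1), remaining length (1), protocol name length (2),
    protocol name, protocol level (1), connect flags (1), keep alive (2).
    """
    name_len = int.from_bytes(packet[2:4], "big")
    offset = 4 + name_len + 2
    return int.from_bytes(packet[offset:offset + 2], "big")


# First 14 bytes of the CONNECT packet from the capture.
connect_head = bytes.fromhex("10 23 00 04 4d 51 54 54 04 02 00 09 00 17")
keep_alive = connect_keep_alive(connect_head)

# First byte of each client-side packet in the capture, in order.
client_packets = [packet_type(b) for b in (0x10, 0x82, 0xC0, 0x50, 0x70, 0xC0)]
# First byte of each server-side packet in the capture, in order.
server_packets = [packet_type(b) for b in (0x20, 0x90, 0xD0, 0x34, 0x62)]
```

Read this way, the last thing the client sends before the reset is a PINGREQ (c0 00), not a DISCONNECT (which would be e0 00), and the CONNECT packet asks for a keep alive of 9 seconds.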

And a corresponding mosquitto log for the same error:

1539722788: New connection from ::1 on port 1883.
1539722788: New client connected from ::1 as hbmqtt/1hs_mtXV?>y4xqRT (c1, k9).
1539722788: No will message specified.
1539722788: Sending CONNACK to hbmqtt/1hs_mtXV?>y4xqRT (0, 0)
1539722788: Received SUBSCRIBE from hbmqtt/1hs_mtXV?>y4xqRT
1539722788:     # (QoS 2)
1539722788: hbmqtt/1hs_mtXV?>y4xqRT 2 #
1539722788: Sending SUBACK to hbmqtt/1hs_mtXV?>y4xqRT
1539722797: Received PINGREQ from hbmqtt/1hs_mtXV?>y4xqRT
1539722797: Sending PINGRESP to hbmqtt/1hs_mtXV?>y4xqRT
1539722806: Received PINGREQ from hbmqtt/1hs_mtXV?>y4xqRT
1539722806: Sending PINGRESP to hbmqtt/1hs_mtXV?>y4xqRT
1539722813: Received PUBLISH from gsSAIvf`EKT5YY\VKc_aeIeiwb3AH@[a;tmx]P:6X0fAlf<7[h4\ekEuZ1fH:ThV (d0, q2, r0, m7, '/firebase/control', ... (15 bytes))
1539722813: Sending PUBREC to gsSAIvf`EKT5YY\VKc_aeIeiwb3AH@[a;tmx]P:6X0fAlf<7[h4\ekEuZ1fH:ThV (Mid: 7)
1539722813: Received PUBREL from gsSAIvf`EKT5YY\VKc_aeIeiwb3AH@[a;tmx]P:6X0fAlf<7[h4\ekEuZ1fH:ThV (Mid: 7)
1539722813: Sending PUBCOMP to gsSAIvf`EKT5YY\VKc_aeIeiwb3AH@[a;tmx]P:6X0fAlf<7[h4\ekEuZ1fH:ThV (Mid: 7)
1539722813: Sending PUBLISH to hbmqtt/1hs_mtXV?>y4xqRT (d0, q2, r0, m1, '/firebase/control', ... (15 bytes))
1539722813: Received PUBREC from hbmqtt/1hs_mtXV?>y4xqRT (Mid: 1)
1539722813: Sending PUBREL to hbmqtt/1hs_mtXV?>y4xqRT (Mid: 1)
1539722813: Received PUBCOMP from hbmqtt/1hs_mtXV?>y4xqRT (Mid: 1)
1539722826: Client hbmqtt/1hs_mtXV?>y4xqRT has exceeded timeout, disconnecting.
1539722826: Socket error on client hbmqtt/1hs_mtXV?>y4xqRT, disconnecting.

ramonpoca commented 6 years ago

OK, I think I've found out:

ohminy commented 5 years ago

How can I change the keepalive time in HBMQTT?
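
One possible answer, as a hedged sketch rather than a confirmed recipe: as far as I know, hbmqtt's `MQTTClient` accepts a `config` dict, and its `keep_alive` (seconds) and `ping_delay` keys control this behaviour. Treat both key names as assumptions and check them against the client defaults in your installed version. `build_client_config` and `connect_with_keep_alive` are hypothetical helper names introduced only for this example.

```python
def build_client_config(keep_alive: int = 60, ping_delay: int = 1) -> dict:
    """Build a client config dict.

    Assumption: hbmqtt reads the 'keep_alive' and 'ping_delay' keys
    from the dict passed as MQTTClient(config=...).
    """
    return {"keep_alive": keep_alive, "ping_delay": ping_delay}


async def connect_with_keep_alive(url: str, keep_alive: int = 60):
    """Connect a client that asks the broker for the given keep alive."""
    # Imported lazily so the config helper works without hbmqtt installed.
    from hbmqtt.client import MQTTClient

    client = MQTTClient(config=build_client_config(keep_alive))
    await client.connect(url)
    return client


config = build_client_config(keep_alive=60)
```

With the defaults left alone, the capture earlier in this thread shows the client asking for a keep alive of only 9 seconds, so a longer value gives a slow or busy client more room before the broker times it out.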

DallasO commented 5 years ago

I'm seeing this warning from my MQTT client, but it isn't affecting anything - messages are still making it through.

I'm using eclipse/paho.mqtt.python and tried both their sync and async methods, and the keepalive already defaults to 60 seconds. So the only other thing I could try is @ramonpoca's second point, but I'm not sure how to change that.

skewty commented 5 years ago

@ramonpoca I don't think that's it.

I get that error running the broker as such:

import asyncio
import logging
from hbmqtt.broker import Broker

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('broker')

def main():
    loop = asyncio.get_event_loop()
    config = {
        'listeners': {
            'default': {
                'type': 'tcp',
            },
            'combox': {
                'bind': '127.0.0.1:1883',
            }
        }
    }
    broker = Broker(config=config, plugin_namespace='invalid.namespace')
    loop.create_task(broker.start())
    try:
        logger.info('Broker listening on {}'.format(config['listeners']['combox']['bind']))
        loop.run_forever()
    except KeyboardInterrupt:
        pass

if __name__ == '__main__':
    main()

It occurs after just one call to publish() from a system test we have where two clients subscribe to a common stream and publish a single, short message to each other and disconnect (test takes around 1 second).

EDIT

A refactor caused our clients to stop calling disconnect() before exiting. So it seems the warning is printed when a client doesn't exit cleanly (call disconnect(), possibly send a will, etc.).
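
To illustrate why an unclean exit produces this exact message, here is a minimal standard-library sketch. It assumes, as the traceback posted earlier in this thread suggests, that the reader loop ends up in `StreamReader.readexactly(1)` while waiting for the first byte of the next packet. Feeding EOF into a bare `StreamReader` stands in for a peer that closes the socket without sending anything more; `read_after_peer_closed` is a hypothetical name.

```python
import asyncio


async def read_after_peer_closed() -> asyncio.IncompleteReadError:
    """Wait for one byte on a stream whose peer has already gone away."""
    reader = asyncio.StreamReader()
    reader.feed_eof()  # simulate the connection being closed by the peer
    try:
        await reader.readexactly(1)
    except asyncio.IncompleteReadError as exc:
        return exc
    raise AssertionError("readexactly() should not succeed after EOF")


error = asyncio.run(read_after_peer_closed())
print(error)
```

The exception carries `partial` (the bytes received before EOF, empty here) and `expected` (1), which is where the "0 bytes read on a total of 1 expected bytes" wording comes from.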

Perhaps it should be a debug level logger call. Is it really a warning level thing for an IoT device with poor cellular signal to have a connection failure and drop out?
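
Until the library changes the level, a workaround on the application side could look like the sketch below. It uses only the standard `logging` module and relies on the logger name `hbmqtt.mqtt.protocol.handler` and the message text exactly as they appear in the logs posted in this thread. `DemoteIncompleteRead` is a hypothetical class name, and this is one possible approach, not something hbmqtt provides.

```python
import logging

# Logger name as it appears in the log lines quoted in this thread.
HBMQTT_HANDLER_LOGGER = "hbmqtt.mqtt.protocol.handler"


class DemoteIncompleteRead(logging.Filter):
    """Relabel the 'reader coro: IncompleteReadError' warning as DEBUG."""

    def filter(self, record: logging.LogRecord) -> bool:
        message = record.getMessage()
        if (record.levelno == logging.WARNING
                and "Unhandled exception in reader coro" in message
                and "IncompleteReadError" in message):
            record.levelno = logging.DEBUG
            record.levelname = "DEBUG"
            # Emit the demoted record only if this logger would emit DEBUG.
            return logging.getLogger(record.name).isEnabledFor(logging.DEBUG)
        return True  # every other record passes through unchanged


handler_logger = logging.getLogger(HBMQTT_HANDLER_LOGGER)
handler_logger.addFilter(DemoteIncompleteRead())
```

The filter is attached to the logger itself rather than to an output handler, so it runs before any handler sees the record. Other warnings from the same module are left alone.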

caseybrichardson commented 4 years ago

Just thought I'd mention that I'm also experiencing this issue. I have a couple of async Python applications and found this project. It integrated really smoothly, but now I'm having issues with this framework when using persistent sessions. I'm only using the client, but several applications all begin to hit this issue after running for a while and after a few connects/disconnects. The only solution I've found is to restart the application with a new client_id, which is something I'd like to avoid, as persistent sessions are required for data persistence reasons.

26labs commented 4 years ago

My trial of hbmqtt as a broker seems to reflect what others have seen: use the broker-taboo config plugin, and then messages sent from MQTTBox or my own Python code on macOS (10.15.3) are seen and stored by the broker. My own Python code, following the examples, also gets messages registered with the broker. So right now hbmqtt seems to be a good choice if one needs a 100% Python setup for MQTT broker and clients. There is an issue mentioned on the Home Assistant site about memory consumption after 20k messages, which is a concern. I will test further. I have been using mosquitto in my project for 5 years for its "industrial strength" and small footprint, but I have always wanted a Python option (even if it means volume-based restart cycles) to make it easier to set up demos on new machines.