If the connection is lost to the broker, moat-mqtt doesn't retry and crashes. The error message is the same with or without "auto_reconnect" being set.
sample:
import logging
import anyio
from moat.mqtt.client import open_mqttclient, ClientException
async def uptime_coro():
async with open_mqttclient(config=config) as C:
try:
await C.connect("mqtt://k")
will wait until the broker disappears
await anyio.sleep_forever()
except ClientException as ce:
logger.error("Client exception: %r", ce)
if name == "main":
formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter)
anyio.run(uptime_coro)
When the broker disappears, the connection times out. The message is the same with auto-reconnect true or false.
error message:
(.env) ➜ mqtt /Users/fredrick867/projects/viewer/.env/bin/python /Users/fredrick867/projects/viewer/testclient.py
[2022-12-19 20:54:11,495] {core.py:136} INFO - Finished processing state new exit callbacks.
[2022-12-19 20:54:11,495] {core.py:130} INFO - Finished processing state connected enter callbacks.
[2022-12-19 21:09:47,326] {handler.py:525} WARNING - ClientProtocolHandler Unhandled exception in reader coro
Traceback (most recent call last):
File "/usr/local/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/selector_events.py", line 862, in _read_ready__data_received
data = self._sock.recv(self.max_size)
TimeoutError: [Errno 60] Operation timed out
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/mqtt/protocol/handler.py", line 469, in _reader_loop
fixed_header = await MQTTFixedHeader.from_stream(self.stream)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/mqtt/packet.py", line 105, in from_stream
int1 = (await read_or_raise(reader, 1))[0]
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/codecs.py", line 61, in read_or_raise
data = await reader.read(n)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/adapters.py", line 129, in read
data = await self._rstream.receive_exactly(n)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/anyio/streams/buffered.py", line 72, in receive_exactly
chunk = await self.receive_stream.receive(remaining)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 1274, in receive
raise self._protocol.exception
anyio.BrokenResourceError
[2022-12-19 21:09:47,337] {core.py:136} INFO - Finished processing state connected exit callbacks.
[2022-12-19 21:09:47,337] {core.py:130} INFO - Finished processing state disconnected enter callbacks.
Traceback (most recent call last):
File "/usr/local/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/selector_events.py", line 862, in _read_ready__data_received
data = self._sock.recv(self.max_size)
TimeoutError: [Errno 60] Operation timed out
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/fredrick867/projects/viewer/testclient.py", line 38, in
anyio.run(uptime_coro)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, *backend_options)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/usr/local/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/local/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
return future.result()
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(args)
File "/Users/fredrick867/projects/viewer/testclient.py", line 24, in uptime_coro
async with open_mqttclient(config=config) as C:
File "/usr/local/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/contextlib.py", line 217, in aexit
await self.gen.athrow(typ, value, traceback)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/client.py", line 158, in open_mqttclient
async with anyio.create_task_group() as tg:
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 662, in aexit
raise exceptions[0]
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/mqtt/protocol/handler.py", line 469, in _reader_loop
fixed_header = await MQTTFixedHeader.from_stream(self.stream)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/mqtt/packet.py", line 105, in from_stream
int1 = (await read_or_raise(reader, 1))[0]
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/codecs.py", line 61, in read_or_raise
data = await reader.read(n)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/adapters.py", line 129, in read
data = await self._rstream.receive_exactly(n)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/anyio/streams/buffered.py", line 72, in receive_exactly
chunk = await self.receive_stream.receive(remaining)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 1274, in receive
raise self._protocol.exception
anyio.BrokenResourceError
If the connection is lost to the broker, moat-mqtt doesn't retry and crashes. The error message is the same with or without "auto_reconnect" being set.
sample: import logging import anyio
from moat.mqtt.client import open_mqttclient, ClientException
config = { "keep_alive": 10, "ping_delay": 1, "default_qos": 0, "default_retain": False, "auto_reconnect": True, "reconnect_max_interval": 10, "reconnect_retries": 3, "codec": "noop", }
logger = logging.getLogger(name)
async def uptime_coro(): async with open_mqttclient(config=config) as C: try: await C.connect("mqtt://k")
will wait until the broker disappears
if name == "main": formatter = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s" logging.basicConfig(level=logging.INFO, format=formatter) anyio.run(uptime_coro)
When the broker disappears, the connection times out. The message is the same with auto-reconnect true or false.
error message: (.env) ➜ mqtt /Users/fredrick867/projects/viewer/.env/bin/python /Users/fredrick867/projects/viewer/testclient.py [2022-12-19 20:54:11,495] {core.py:136} INFO - Finished processing state new exit callbacks. [2022-12-19 20:54:11,495] {core.py:130} INFO - Finished processing state connected enter callbacks. [2022-12-19 21:09:47,326] {handler.py:525} WARNING - ClientProtocolHandler Unhandled exception in reader coro Traceback (most recent call last): File "/usr/local/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/selector_events.py", line 862, in _read_ready__data_received data = self._sock.recv(self.max_size) TimeoutError: [Errno 60] Operation timed out
The above exception was the direct cause of the following exception:
Traceback (most recent call last): File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/mqtt/protocol/handler.py", line 469, in _reader_loop fixed_header = await MQTTFixedHeader.from_stream(self.stream) File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/mqtt/packet.py", line 105, in from_stream int1 = (await read_or_raise(reader, 1))[0] File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/codecs.py", line 61, in read_or_raise data = await reader.read(n) File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/adapters.py", line 129, in read data = await self._rstream.receive_exactly(n) File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/anyio/streams/buffered.py", line 72, in receive_exactly chunk = await self.receive_stream.receive(remaining) File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 1274, in receive raise self._protocol.exception anyio.BrokenResourceError [2022-12-19 21:09:47,337] {core.py:136} INFO - Finished processing state connected exit callbacks. [2022-12-19 21:09:47,337] {core.py:130} INFO - Finished processing state disconnected enter callbacks. Traceback (most recent call last): File "/usr/local/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/selector_events.py", line 862, in _read_ready__data_received data = self._sock.recv(self.max_size) TimeoutError: [Errno 60] Operation timed out
The above exception was the direct cause of the following exception:
Traceback (most recent call last): File "/Users/fredrick867/projects/viewer/testclient.py", line 38, in
anyio.run(uptime_coro)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, *backend_options)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/usr/local/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/local/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
return future.result()
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(args)
File "/Users/fredrick867/projects/viewer/testclient.py", line 24, in uptime_coro
async with open_mqttclient(config=config) as C:
File "/usr/local/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/contextlib.py", line 217, in aexit
await self.gen.athrow(typ, value, traceback)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/client.py", line 158, in open_mqttclient
async with anyio.create_task_group() as tg:
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 662, in aexit
raise exceptions[0]
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/mqtt/protocol/handler.py", line 469, in _reader_loop
fixed_header = await MQTTFixedHeader.from_stream(self.stream)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/mqtt/packet.py", line 105, in from_stream
int1 = (await read_or_raise(reader, 1))[0]
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/codecs.py", line 61, in read_or_raise
data = await reader.read(n)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/moat/mqtt/adapters.py", line 129, in read
data = await self._rstream.receive_exactly(n)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/anyio/streams/buffered.py", line 72, in receive_exactly
chunk = await self.receive_stream.receive(remaining)
File "/Users/fredrick867/projects/viewer/.env/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 1274, in receive
raise self._protocol.exception
anyio.BrokenResourceError