M-o-a-T / moat-mqtt

An async MQTT broker and client, plus DistKV integration
MIT License
21 stars 8 forks source link

Broker exits when subscriber disconnected #14

Open arthur-tacca opened 2 years ago

arthur-tacca commented 2 years ago

When a subscriber disconnects from the broker, the following error appears and the broker exits completely. This is distmqtt==0.35.2, anyio==3.6.1 running on Python 3.8 on Windows. I'm not sure which are expected exceptions and which are actual errors. Maybe it's that one right at the end: anyio.DeprecatedAwaitable seems to be an internal API, maybe it was exposed on an older version of anyio by accident and not recent versions?

Traceback (most recent call last):
  File "C:\Users\arthur\.conda\envs\py38\lib\asyncio\windows_events.py", line 457, in finish_recv
    return ov.getresult()
OSError: [WinError 64] The specified network name is no longer available

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\arthur\.conda\envs\py38\lib\asyncio\proactor_events.py", line 280, in _loop_reading
    data = fut.result()
  File "C:\Users\arthur\.conda\envs\py38\lib\asyncio\windows_events.py", line 812, in _poll
    value = callback(transferred, key, ov)
  File "C:\Users\arthur\.conda\envs\py38\lib\asyncio\windows_events.py", line 461, in finish_recv
    raise ConnectionResetError(*exc.args)
ConnectionResetError: [WinError 64] The specified network name is no longer available

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncscope\__init__.py", line 467, in main_scope
    yield s
  File "C:\Users\arthur\.conda\envs\py38\lib\contextlib.py", line 662, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "C:\Users\arthur\.conda\envs\py38\lib\contextlib.py", line 189, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\distmqtt\broker.py", line 178, in create_broker
    tg.cancel_scope.cancel()
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\anyio\_backends\_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\distmqtt\mqtt\protocol\handler.py", line 472, in _reader_loop
    fixed_header = await MQTTFixedHeader.from_stream(self.stream)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\distmqtt\mqtt\packet.py", line 110, in from_stream
    int1 = (await read_or_raise(reader, 1))[0]
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\distmqtt\codecs.py", line 59, in read_or_raise
    data = await reader.read(n)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\distmqtt\adapters.py", line 128, in read
    data = await self._rstream.receive_exactly(n)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\anyio\streams\buffered.py", line 72, in receive_exactly
    chunk = await self.receive_stream.receive(remaining)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\anyio\_backends\_asyncio.py", line 1274, in receive
    raise self._protocol.exception
anyio.BrokenResourceError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncscope\__init__.py", line 207, in _ctx
    yield self
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncscope\__init__.py", line 437, in _ctx
    yield s
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncscope\__init__.py", line 470, in main_scope
    s.cancel()
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncscope\__init__.py", line 346, in cancel
    return anyio.DeprecatedAwaitable(self.cancel)
AttributeError: module 'anyio' has no attribute 'DeprecatedAwaitable'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\arthur\.conda\envs\py38\lib\runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "C:\Users\arthur\.conda\envs\py38\lib\runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "C:\Users\arthur\.conda\envs\py38\Scripts\distmqtt.exe\__main__.py", line 7, in <module>
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncclick\core.py", line 1157, in __call__
    return anyio.run(self._main, main, args, kwargs, **opts)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\anyio\_core\_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\anyio\_backends\_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "C:\Users\arthur\.conda\envs\py38\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Users\arthur\.conda\envs\py38\lib\asyncio\base_events.py", line 616, in run_until_complete
    return future.result()
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\anyio\_backends\_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncclick\core.py", line 1160, in _main
    return await main(*args, **kwargs)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncclick\core.py", line 1076, in main
    rv = await self.invoke(ctx)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncclick\core.py", line 1434, in invoke
    return await ctx.invoke(self.callback, **ctx.params)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncclick\core.py", line 780, in invoke
    rv = await rv
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\distmqtt\scripts\broker_script.py", line 66, in main
    await anyio.sleep(99999)
  File "C:\Users\arthur\.conda\envs\py38\lib\contextlib.py", line 679, in __aexit__
    raise exc_details[1]
  File "C:\Users\arthur\.conda\envs\py38\lib\contextlib.py", line 662, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "C:\Users\arthur\.conda\envs\py38\lib\contextlib.py", line 189, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncscope\__init__.py", line 470, in main_scope
    s.cancel()
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncscope\__init__.py", line 452, in __aexit__
    return await self._ctx_.__aexit__(*tb)  # pylint:disable=no-member  # YES IT HAS
  File "C:\Users\arthur\.conda\envs\py38\lib\contextlib.py", line 189, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncscope\__init__.py", line 439, in _ctx
    del self._scopes[s._name]
  File "C:\Users\arthur\.conda\envs\py38\lib\contextlib.py", line 189, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncscope\__init__.py", line 211, in _ctx
    await self.cancel_immediate()
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncscope\__init__.py", line 336, in cancel_immediate
    self.cancel()
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncscope\__init__.py", line 346, in cancel
    return anyio.DeprecatedAwaitable(self.cancel)
AttributeError: module 'anyio' has no attribute 'DeprecatedAwaitable'
arthur-tacca commented 2 years ago

Looks like that final exception is a bug in asyncscope which was fixed in a recent revision. But even manually installing that newer version, I still get an exception when the subscriber disconnects:

[2022-06-28 16:32:40,706] :: INFO - Finished processing state stopping exit callbacks.
[2022-06-28 16:32:40,706] :: INFO - Finished processing state stopped enter callbacks.
Traceback (most recent call last):
  File "C:\Users\arthur\.conda\envs\py38\lib\asyncio\windows_events.py", line 457, in finish_recv
    return ov.getresult()
OSError: [WinError 64] The specified network name is no longer available

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\arthur\.conda\envs\py38\lib\asyncio\proactor_events.py", line 280, in _loop_reading
    data = fut.result()
  File "C:\Users\arthur\.conda\envs\py38\lib\asyncio\windows_events.py", line 812, in _poll
    value = callback(transferred, key, ov)
  File "C:\Users\arthur\.conda\envs\py38\lib\asyncio\windows_events.py", line 461, in finish_recv
    raise ConnectionResetError(*exc.args)
ConnectionResetError: [WinError 64] The specified network name is no longer available

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\arthur\.conda\envs\py38\lib\runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "C:\Users\arthur\.conda\envs\py38\lib\runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "C:\Users\arthur\.conda\envs\py38\Scripts\distmqtt.exe\__main__.py", line 7, in <module>
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncclick\core.py", line 1157, in __call__
    return anyio.run(self._main, main, args, kwargs, **opts)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\anyio\_core\_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\anyio\_backends\_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "C:\Users\arthur\.conda\envs\py38\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Users\arthur\.conda\envs\py38\lib\asyncio\base_events.py", line 616, in run_until_complete
    return future.result()
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\anyio\_backends\_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncclick\core.py", line 1160, in _main
    return await main(*args, **kwargs)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncclick\core.py", line 1076, in main
    rv = await self.invoke(ctx)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncclick\core.py", line 1434, in invoke
    return await ctx.invoke(self.callback, **ctx.params)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncclick\core.py", line 780, in invoke
    rv = await rv
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\distmqtt\scripts\broker_script.py", line 66, in main
    await anyio.sleep(99999)
  File "C:\Users\arthur\.conda\envs\py38\lib\contextlib.py", line 679, in __aexit__
    raise exc_details[1]
  File "C:\Users\arthur\.conda\envs\py38\lib\contextlib.py", line 189, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\asyncscope\__init__.py", line 466, in main_scope
    yield s
  File "C:\Users\arthur\.conda\envs\py38\lib\contextlib.py", line 662, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "C:\Users\arthur\.conda\envs\py38\lib\contextlib.py", line 189, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\distmqtt\broker.py", line 178, in create_broker
    tg.cancel_scope.cancel()
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\anyio\_backends\_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\distmqtt\mqtt\protocol\handler.py", line 472, in _reader_loop
    fixed_header = await MQTTFixedHeader.from_stream(self.stream)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\distmqtt\mqtt\packet.py", line 110, in from_stream
    int1 = (await read_or_raise(reader, 1))[0]
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\distmqtt\codecs.py", line 59, in read_or_raise
    data = await reader.read(n)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\distmqtt\adapters.py", line 128, in read
    data = await self._rstream.receive_exactly(n)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\anyio\streams\buffered.py", line 72, in receive_exactly
    chunk = await self.receive_stream.receive(remaining)
  File "C:\Users\arthur\.conda\envs\py38\lib\site-packages\anyio\_backends\_asyncio.py", line 1274, in receive
    raise self._protocol.exception
anyio.BrokenResourceError