M-o-a-T / moat-mqtt

An async MQTT broker and client, plus DistKV integration
MIT License

MQTTClient doesn't close MQTTS (TLS) connections cleanly #7

Open mikenerone opened 4 years ago

mikenerone commented 4 years ago

Reproduction script (obviously, I had a broker set up listening on both of the URLs in this):

#! /usr/bin/env python3

import logging
import sys

import anyio
from distmqtt.client import MQTTClient

secure = 'secure' in sys.argv
backend = 'trio' if 'trio' in sys.argv else 'asyncio'

if secure:
    uri = 'mqtts://127.0.0.1:2883'
    config = dict(
        check_hostname=False,
        broker=dict(
            # Side note: Had to make a CA because there's no way to turn off verification
            cafile='ca-bundle.crt',
        )
    )
else:
    uri = 'mqtt://127.0.0.1:1883'
    config = {}

logging.basicConfig(level='DEBUG')
logging.getLogger().setLevel(logging.DEBUG)
logger = logging.getLogger(__name__)

async def main():
    async with anyio.create_task_group() as tg:
        client = MQTTClient(tg, config=config)
        await client.connect(uri)
        logger.warning('Connected')
        await client.disconnect()
        logger.warning('Disconnected')

anyio.run(main, backend=backend)

Running this without "secure" on the command line works fine, regardless of backend:

DEBUG:asyncio:Using selector: KqueueSelector
DEBUG:distmqtt.client:Using generated client ID : distmqtt-kvgxupwiq{fvwict
DEBUG:distmqtt.client.plugins:Loading plugins for namespace distmqtt.client.plugins
DEBUG:distmqtt.client:Connect to: mqtt://127.0.0.1:1883
DEBUG:distmqtt.mqtt.protocol.handler:< C ConnackPacket(ts=2020-08-07 12:15:24.627961, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=ConnackVariableHeader(session_parent=0x0, return_code=0x0), payload=None)
DEBUG:distmqtt.mqtt.protocol.handler:distmqtt-kvgxupwiq{fvwict Starting reader coro
DEBUG:distmqtt.mqtt.protocol.handler:Handler tasks started
DEBUG:distmqtt.mqtt.protocol.handler:Begin messages delivery retries
DEBUG:distmqtt.mqtt.protocol.handler:0 messages redelivered
DEBUG:distmqtt.mqtt.protocol.handler:0 messages not redelivered due to timeout
DEBUG:distmqtt.mqtt.protocol.handler:End messages delivery retries
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-kvgxupwiq{fvwict ready
DEBUG:transitions.core:Executed machine preparation callbacks before conditions.
DEBUG:transitions.core:Initiating transition from state new to state connected...
DEBUG:transitions.core:Executed callbacks before conditions.
DEBUG:transitions.core:Executed callback before transition.
DEBUG:transitions.core:Exiting state new. Processing callbacks...
INFO:transitions.core:Exited state new
DEBUG:transitions.core:Entering state connected. Processing callbacks...
INFO:transitions.core:Entered state connected
DEBUG:transitions.core:Executed callback after transition.
DEBUG:transitions.core:Executed machine finalize callbacks
DEBUG:distmqtt.client:connected to 127.0.0.1:1883
DEBUG:distmqtt.client:Wait for broker disconnection
WARNING:__main__:Connected
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-kvgxupwiq{fvwict stopping
DEBUG:distmqtt.mqtt.protocol.handler:waiting for reader <anyio._backends._asyncio.TaskGroup object at 0x102e958c0> to be stopped
WARNING:distmqtt.client:Disconnected from broker
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-kvgxupwiq{fvwict stopping
DEBUG:distmqtt.mqtt.protocol.handler:waiting for sender <anyio._backends._asyncio.CancelScope object at 0x102e5f7b0> to be stopped
WARNING:distmqtt.mqtt.protocol.handler:ClientProtocolHandler CANCEL
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-kvgxupwiq{fvwict coro stopped
DEBUG:distmqtt.mqtt.protocol.handler:closing writer
DEBUG:distmqtt.mqtt.protocol.handler:closed writer
DEBUG:transitions.core:Executed machine preparation callbacks before conditions.
DEBUG:transitions.core:Initiating transition from state connected to state disconnected...
DEBUG:transitions.core:Executed callbacks before conditions.
DEBUG:transitions.core:Executed callback before transition.
DEBUG:transitions.core:Exiting state connected. Processing callbacks...
INFO:transitions.core:Exited state connected
DEBUG:transitions.core:Entering state disconnected. Processing callbacks...
INFO:transitions.core:Entered state disconnected
DEBUG:transitions.core:Executed callback after transition.
DEBUG:transitions.core:Executed machine finalize callbacks
WARNING:__main__:Disconnected
DEBUG:distmqtt.mqtt.protocol.handler:closing writer
DEBUG:distmqtt.mqtt.protocol.handler:closed writer
DEBUG:transitions.core:Executed machine preparation callbacks before conditions.
DEBUG:transitions.core:Initiating transition from state disconnected to state disconnected...
DEBUG:transitions.core:Executed callbacks before conditions.
DEBUG:transitions.core:Executed callback before transition.
DEBUG:transitions.core:Exiting state disconnected. Processing callbacks...
INFO:transitions.core:Exited state disconnected
DEBUG:transitions.core:Entering state disconnected. Processing callbacks...
INFO:transitions.core:Entered state disconnected
DEBUG:transitions.core:Executed callback after transition.
DEBUG:transitions.core:Executed machine finalize callbacks

But running it with secure on the command line (enabling MQTTS and using asyncio) crashes the app with a stack trace:

DEBUG:asyncio:Using selector: KqueueSelector
DEBUG:distmqtt.client:Using generated client ID : distmqtt-elgwutjmmdeiv{fq
DEBUG:distmqtt.client.plugins:Loading plugins for namespace distmqtt.client.plugins
DEBUG:distmqtt.client:Connect to: mqtts://127.0.0.1:2883
DEBUG:distmqtt.mqtt.protocol.handler:< C ConnackPacket(ts=2020-08-07 12:17:18.235579, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=ConnackVariableHeader(session_parent=0x0, return_code=0x0), payload=None)
DEBUG:distmqtt.mqtt.protocol.handler:distmqtt-elgwutjmmdeiv{fq Starting reader coro
DEBUG:distmqtt.mqtt.protocol.handler:Handler tasks started
DEBUG:distmqtt.mqtt.protocol.handler:Begin messages delivery retries
DEBUG:distmqtt.mqtt.protocol.handler:0 messages redelivered
DEBUG:distmqtt.mqtt.protocol.handler:0 messages not redelivered due to timeout
DEBUG:distmqtt.mqtt.protocol.handler:End messages delivery retries
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-elgwutjmmdeiv{fq ready
DEBUG:transitions.core:Executed machine preparation callbacks before conditions.
DEBUG:transitions.core:Initiating transition from state new to state connected...
DEBUG:transitions.core:Executed callbacks before conditions.
DEBUG:transitions.core:Executed callback before transition.
DEBUG:transitions.core:Exiting state new. Processing callbacks...
INFO:transitions.core:Exited state new
DEBUG:transitions.core:Entering state connected. Processing callbacks...
INFO:transitions.core:Entered state connected
DEBUG:transitions.core:Executed callback after transition.
DEBUG:transitions.core:Executed machine finalize callbacks
DEBUG:distmqtt.client:connected to 127.0.0.1:2883
DEBUG:distmqtt.client:Wait for broker disconnection
WARNING:__main__:Connected
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-elgwutjmmdeiv{fq stopping
DEBUG:distmqtt.mqtt.protocol.handler:waiting for reader <anyio._backends._asyncio.TaskGroup object at 0x10ffeb800> to be stopped
WARNING:distmqtt.client:Disconnected from broker
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-elgwutjmmdeiv{fq stopping
DEBUG:distmqtt.mqtt.protocol.handler:waiting for sender <anyio._backends._asyncio.CancelScope object at 0x10ffcac10> to be stopped
WARNING:distmqtt.mqtt.protocol.handler:ClientProtocolHandler CANCEL
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-elgwutjmmdeiv{fq coro stopped
DEBUG:distmqtt.mqtt.protocol.handler:closing writer
DEBUG:distmqtt.mqtt.protocol.handler:closing writer
Traceback (most recent call last):
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 185, in unwrap_tls
    self._raw_socket = cast(ssl.SSLSocket, self._raw_socket).unwrap()
  File "/Users/mikenerone/.pyenv/versions/3.8.5/lib/python3.8/ssl.py", line 1285, in unwrap
    s = self._sslobj.shutdown()
ssl.SSLWantReadError: The operation did not complete (read) (_ssl.c:2745)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "./closetest.py", line 34, in main
    await client.disconnect()
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 266, in disconnect
    await self._handler.stop()
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
    await super().stop()
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
    await self.stream.close()
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
    await self._stream.close()
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
    await self._socket.unwrap_tls()
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
    await self._wait_readable()
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 633, in wait_socket_readable
    raise ClosedResourceError
anyio.exceptions.ClosedResourceError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "./closetest.py", line 37, in <module>
    anyio.run(main)
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/__init__.py", line 72, in run
    return asynclib.run(func, *args, **backend_options)  # type: ignore
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 114, in run
    raise exception
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 76, in wrapper
    retval = await func(*args)
  File "./closetest.py", line 35, in main
    logger.warning('Disconnected')
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 390, in __aexit__
    raise ExceptionGroup(exceptions)
anyio._backends._asyncio.ExceptionGroup: 2 exceptions were raised in the task group:
----------------------------
Traceback (most recent call last):

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 415, in _run_wrapped_task
    await func(*args)

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 724, in handle_connection_close
    await self._handler.stop()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
    await super().stop()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
    await self.stream.close()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
    await self._stream.close()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
    await self._socket.unwrap_tls()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
    await self._wait_readable()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 618, in wait_socket_readable
    raise ResourceBusyError('reading from') from None

anyio.exceptions.ResourceBusyError: Another task is already reading from this resource
----------------------------
Traceback (most recent call last):

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 185, in unwrap_tls
    self._raw_socket = cast(ssl.SSLSocket, self._raw_socket).unwrap()

  File "/Users/mikenerone/.pyenv/versions/3.8.5/lib/python3.8/ssl.py", line 1285, in unwrap
    s = self._sslobj.shutdown()

ssl.SSLWantReadError: The operation did not complete (read) (_ssl.c:2745)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):

  File "./closetest.py", line 34, in main
    await client.disconnect()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 266, in disconnect
    await self._handler.stop()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
    await super().stop()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
    await self.stream.close()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
    await self._stream.close()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
    await self._socket.unwrap_tls()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
    await self._wait_readable()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 633, in wait_socket_readable
    raise ClosedResourceError

anyio.exceptions.ClosedResourceError

Running it with "secure" and "trio" on the command line (enabling MQTTS and trio) also crashes with a stack trace, but a different one:

DEBUG:distmqtt.client:Using generated client ID : distmqtt-ai{epvdnqr{uqppo
DEBUG:distmqtt.client.plugins:Loading plugins for namespace distmqtt.client.plugins
DEBUG:distmqtt.client:Connect to: mqtts://127.0.0.1:2883
DEBUG:distmqtt.mqtt.protocol.handler:< C ConnackPacket(ts=2020-08-07 12:25:17.745781, fixed=MQTTFixedHeader(length=2, flags=0x0), variable=ConnackVariableHeader(session_parent=0x0, return_code=0x0), payload=None)
DEBUG:distmqtt.mqtt.protocol.handler:distmqtt-ai{epvdnqr{uqppo Starting reader coro
DEBUG:distmqtt.mqtt.protocol.handler:Handler tasks started
DEBUG:distmqtt.mqtt.protocol.handler:Begin messages delivery retries
DEBUG:distmqtt.mqtt.protocol.handler:0 messages redelivered
DEBUG:distmqtt.mqtt.protocol.handler:0 messages not redelivered due to timeout
DEBUG:distmqtt.mqtt.protocol.handler:End messages delivery retries
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-ai{epvdnqr{uqppo ready
DEBUG:transitions.core:Executed machine preparation callbacks before conditions.
DEBUG:transitions.core:Initiating transition from state new to state connected...
DEBUG:transitions.core:Executed callbacks before conditions.
DEBUG:transitions.core:Executed callback before transition.
DEBUG:transitions.core:Exiting state new. Processing callbacks...
INFO:transitions.core:Exited state new
DEBUG:transitions.core:Entering state connected. Processing callbacks...
INFO:transitions.core:Entered state connected
DEBUG:transitions.core:Executed callback after transition.
DEBUG:transitions.core:Executed machine finalize callbacks
DEBUG:distmqtt.client:connected to 127.0.0.1:2883
DEBUG:distmqtt.client:Wait for broker disconnection
WARNING:__main__:Connected
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-ai{epvdnqr{uqppo stopping
DEBUG:distmqtt.mqtt.protocol.handler:waiting for reader <anyio._backends._trio.TaskGroup object at 0x106fc3240> to be stopped
WARNING:distmqtt.client:Disconnected from broker
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-ai{epvdnqr{uqppo stopping
DEBUG:distmqtt.mqtt.protocol.handler:waiting for sender <anyio._backends._trio.CancelScope object at 0x106f91c40> to be stopped
WARNING:distmqtt.mqtt.protocol.handler:ClientProtocolHandler CANCEL
DEBUG:distmqtt.mqtt.protocol.handler:Client distmqtt-ai{epvdnqr{uqppo coro stopped
DEBUG:distmqtt.mqtt.protocol.handler:closing writer
DEBUG:distmqtt.mqtt.protocol.handler:closing writer
Traceback (most recent call last):
  File "./closetest.py", line 38, in <module>
    anyio.run(main, backend=backend)
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/__init__.py", line 72, in run
    return asynclib.run(func, *args, **backend_options)  # type: ignore
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_run.py", line 1896, in run
    raise runner.main_task_outcome.error
  File "./closetest.py", line 36, in main
    logger.warning('Disconnected')
  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 139, in __aexit__
    raise ExceptionGroup(exc.exceptions) from None
anyio._backends._trio.ExceptionGroup: 2 exceptions were raised in the task group:
----------------------------
Traceback (most recent call last):

  File "./closetest.py", line 35, in main
    await client.disconnect()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 266, in disconnect
    await self._handler.stop()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
    await super().stop()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
    await self.stream.close()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
    await self._stream.close()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
    await self._socket.unwrap_tls()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
    await self._wait_readable()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 213, in wait_socket_readable
    raise ResourceBusyError('reading from') from None

anyio.exceptions.ResourceBusyError: Another task is already reading from this resource
----------------------------
Traceback (most recent call last):

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 724, in handle_connection_close
    await self._handler.stop()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
    await super().stop()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
    await self.stream.close()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
    await self._stream.close()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
    await self._socket.unwrap_tls()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
    await self._wait_readable()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 211, in wait_socket_readable
    raise ClosedResourceError().with_traceback(exc.__traceback__) from None

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 209, in wait_socket_readable
    await wait_readable(sock)

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_generated_io_kqueue.py", line 37, in wait_readable
    return await GLOBAL_RUN_CONTEXT.runner.io_manager.wait_readable(fd)

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_io_kqueue.py", line 162, in wait_readable
    await self._wait_common(fd, select.KQ_FILTER_READ)

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_io_kqueue.py", line 158, in _wait_common
    await self.wait_kevent(fd, filter, abort)

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_io_kqueue.py", line 127, in wait_kevent
    return await _core.wait_task_rescheduled(abort)

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_traps.py", line 166, in wait_task_rescheduled
    return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()

  File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/outcome/_sync.py", line 111, in unwrap
    raise captured_error

anyio.exceptions.ClosedResourceError

Details of embedded exception 1:

  Traceback (most recent call last):
    File "./closetest.py", line 35, in main
      await client.disconnect()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 266, in disconnect
      await self._handler.stop()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
      await super().stop()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
      await self.stream.close()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
      await self._stream.close()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
      await self._socket.unwrap_tls()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
      await self._wait_readable()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 213, in wait_socket_readable
      raise ResourceBusyError('reading from') from None
  anyio.exceptions.ResourceBusyError: Another task is already reading from this resource

Details of embedded exception 2:

  Traceback (most recent call last):
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/client.py", line 724, in handle_connection_close
      await self._handler.stop()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/client_handler.py", line 31, in stop
      await super().stop()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/mqtt/protocol/handler.py", line 175, in stop
      await self.stream.close()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/distmqtt/adapters.py", line 123, in close
      await self._stream.close()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 216, in close
      await self._socket.unwrap_tls()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_networking.py", line 188, in unwrap_tls
      await self._wait_readable()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 211, in wait_socket_readable
      raise ClosedResourceError().with_traceback(exc.__traceback__) from None
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/anyio/_backends/_trio.py", line 209, in wait_socket_readable
      await wait_readable(sock)
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_generated_io_kqueue.py", line 37, in wait_readable
      return await GLOBAL_RUN_CONTEXT.runner.io_manager.wait_readable(fd)
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_io_kqueue.py", line 162, in wait_readable
      await self._wait_common(fd, select.KQ_FILTER_READ)
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_io_kqueue.py", line 158, in _wait_common
      await self.wait_kevent(fd, filter, abort)
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_io_kqueue.py", line 127, in wait_kevent
      return await _core.wait_task_rescheduled(abort)
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/trio/_core/_traps.py", line 166, in wait_task_rescheduled
      return (await _async_yield(WaitTaskRescheduled(abort_func))).unwrap()
    File "/Users/mikenerone/Library/Caches/pypoetry/virtualenvs/aiodxlclient-sf0sGL74-py3.8/lib/python3.8/site-packages/outcome/_sync.py", line 111, in unwrap
      raise captured_error
  anyio.exceptions.ClosedResourceError
mikenerone commented 4 years ago

Ok, the cause is that in MQTTClient.disconnect(), these two operations are effectively done in succession:

            await self._handler.mqtt_disconnect()
            ...
            await self._handler.stop()

The first call queues the disconnect packet for sending, but the send has not actually completed by the time stop() kills the handler's sender task. Here's a terrible workaround that actually works:

--- a/distmqtt/client.py
+++ b/distmqtt/client.py
@@ -262,6 +262,7 @@ class MQTTClient:
             if self._disconnect_task is not None:
                 await self._disconnect_task.cancel()
             await self._handler.mqtt_disconnect()
+            await anyio.sleep(0.1)
             self._connected_state.clear()
             await self._handler.stop()
             self.session.transitions.disconnect()

But I think this needs a proper fix, one that depends on a positive indicator that the disconnect packet has actually been sent. I'm not sure how you'd like to approach that (though I'm happy to help if you have a suggestion).
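To make the "positive indicator" idea concrete, here is a minimal sketch of the race and of one possible fix. It is not distmqtt code: the Sender class, its queue, and the wait flag are all hypothetical, and it uses plain asyncio rather than anyio so that it runs standalone. Each queued packet carries an event that the sender task sets once the packet has really been written, and the caller can wait on that event before stopping the sender.

```python
import asyncio


class Sender:
    """Toy stand-in for a protocol handler's sender task (hypothetical, not distmqtt)."""

    def __init__(self):
        self.queue = asyncio.Queue()
        self.written = []  # packets that actually reached the "wire"
        self._task = None

    def start(self):
        self._task = asyncio.ensure_future(self._run())

    async def _run(self):
        while True:
            packet, done = await self.queue.get()
            await asyncio.sleep(0.01)  # simulate a slow write (e.g. over TLS)
            self.written.append(packet)
            done.set()  # positive indicator: the packet has been written

    async def send(self, packet, wait=False):
        done = asyncio.Event()
        await self.queue.put((packet, done))
        if wait:
            await done.wait()  # block until the sender task has written it

    async def stop(self):
        # Cancel the sender task, dropping anything still queued.
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass


async def disconnect(wait):
    sender = Sender()
    sender.start()
    await sender.send("DISCONNECT", wait=wait)
    await sender.stop()
    return sender.written


def demo():
    racy = asyncio.run(disconnect(wait=False))  # stop() wins the race
    safe = asyncio.run(disconnect(wait=True))   # packet is written first
    return racy, safe
```

Calling demo() returns ([], ['DISCONNECT']): without waiting, the sender is cancelled before the queued packet is written, which mirrors the behaviour described above. Whether an event per packet, a flush method on the handler, or something else fits distmqtt best is a design question for the maintainers.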

For now, out of necessity, I'm going to work around the issue by monkey-patching ClientProtocolHandler.mqtt_disconnect() to add a short sleep before returning.
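Roughly, the monkey-patch could look like the sketch below. The helper add_delay_after and the FakeHandler class are made up for illustration; only the idea (wrap the async method and sleep before returning) comes from the description above. The sleep function is passed in as a parameter so the same wrapper works with either backend.

```python
import asyncio
import functools


def add_delay_after(cls, method_name, sleep, delay=0.1):
    """Wrap the async method cls.<method_name> so it sleeps before returning.

    `sleep` is any async sleep function, e.g. asyncio.sleep or anyio.sleep.
    Returns the original method so the patch can be undone later.
    """
    original = getattr(cls, method_name)

    @functools.wraps(original)
    async def wrapper(self, *args, **kwargs):
        result = await original(self, *args, **kwargs)
        await sleep(delay)  # crude: gives the sender task time to flush
        return result

    setattr(cls, method_name, wrapper)
    return original


class FakeHandler:
    """Hypothetical stand-in for ClientProtocolHandler."""

    async def mqtt_disconnect(self):
        return "queued"


def demo():
    add_delay_after(FakeHandler, "mqtt_disconnect", asyncio.sleep, delay=0.01)
    return asyncio.run(FakeHandler().mqtt_disconnect())
```

With distmqtt itself, the call would presumably be add_delay_after(ClientProtocolHandler, "mqtt_disconnect", anyio.sleep), assuming the class is importable from distmqtt.mqtt.protocol.client_handler as the traceback paths suggest. Like the diff above, this only hides the race behind a timer and does not remove it.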