cjavad opened this issue 4 months ago.
The current "mitigation" I've implemented looks something like this:
```python
def _ensure_internal_ssl_proto_state(self) -> bool:
    valid = True
    try:
        if not self.socket or not self.socket._writer:
            return valid
        # Private asyncio internals; their layout may change between Python versions.
        from asyncio.sslproto import _SSLProtocolTransport
        from asyncio.selector_events import _SelectorTransport

        transport = self.socket._writer.transport
        if not isinstance(transport, _SSLProtocolTransport):
            return valid
        inner_transport = transport._ssl_protocol._transport
        if not isinstance(inner_transport, _SelectorTransport):
            return valid

        # The socket-level transport has already failed (e.g. broken pipe),
        # but the outer SSL transport still reports itself as open.
        if inner_transport.is_closing() and not transport.is_closing():
            self.logger.warning(
                "The internal SSL protocol state is invalid, "
                "manually marking top level SSL Transport as closed.")
            valid = False
            self.socket._closed = True
            transport._force_close(None)
    except (ImportError, AttributeError):
        # Internals not available (different event loop or Python version).
        pass
    except Exception as e:
        self.logger.warning(
            "An exception occurred while ensuring the internal SSL protocol state",
            exc_info=e)
    return valid
```
I am not sure what is intended when handling this error. It seems like the error should propagate through the connection_lost callback on the protocol, but that protocol does not seem to be the SSLProtocol class. How the SSL transport should deal with, for instance, a broken pipe is beyond me at this time, hence an issue and not a pull request.
Hi Javad, thanks for the detailed bug report, and especially for providing a stand-alone repro. Unfortunately SSL support in asyncio is an esoteric area where I don't seem to have much luck finding experts. So it may be some time before we have a fix. If you come up with a PR, don't be shy, and ping me directly on the PR.
After some further investigation, the code is technically sane: connection_lost does end up marking the top-level transport as closed once the next context switch happens and the event loop gets to run the connection_lost callback. Simply adding an await asyncio.sleep(0) before my last prints resolves the issue. Something in the end library or application code must be interfering with this, or some other condition prevents connection_lost from working as intended.
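A minimal, self-contained model of why the extra await asyncio.sleep(0) helps. It assumes, as in CPython's selector_events, that a failing transport does not call connection_lost() directly but queues it with loop.call_soon(); the lambda below is a hypothetical stand-in for that queued callback:

```python
import asyncio


async def observe_scheduled_close() -> tuple[bool, bool]:
    loop = asyncio.get_running_loop()
    state = {"closed": False}

    # Hypothetical stand-in for transport._force_close(): the
    # connection_lost() step is queued on the event loop, not run right away.
    loop.call_soon(lambda: state.update(closed=True))

    before = state["closed"]  # The queued callback has not run yet.
    await asyncio.sleep(0)    # Yield to the event loop exactly once.
    after = state["closed"]   # Callbacks queued before the yield have now run.
    return before, after


if __name__ == "__main__":
    print(asyncio.run(observe_scheduled_close()))  # (False, True)
```

An await that completes without suspending (for example a write that never has to wait for the buffer to drain) does not yield either, which is why a tight write loop can keep missing the queued callback.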
Here is another example using the aiohttp library directly. The only difference between the issue occurring and everything working as intended is the await asyncio.sleep(0):
```python
import asyncio

import aiohttp
from aiohttp import web

from issue_118950 import server_ssl_context, client_ssl_context


async def websocket_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    async for msg in ws:
        print("Message received from client:", msg.data)
    return ws


async def server(host, port):
    with server_ssl_context(host) as context:
        app = web.Application()
        app.add_routes([web.get('/', websocket_handler)])
        runner = web.AppRunner(app)
        await runner.setup()
        site = web.TCPSite(runner, host, port, ssl_context=context)
        await site.start()


async def client(host, port):
    await asyncio.sleep(1)  # Wait for the server to start
    with client_ssl_context() as context:
        async with aiohttp.ClientSession() as session:
            async with session.ws_connect(f'wss://{host}:{port}', ssl=context) as ws:
                while True:
                    await ws.send_str("Hello, server!")
                    # Uncommenting this line fixes the issue.
                    # await asyncio.sleep(0)
                    try:
                        # Simulate a broken pipe by closing the raw socket under the SSL layer.
                        ws._writer.transport._ssl_protocol._transport._sock.close()
                    except AttributeError:
                        # The internals may already have been torn down.
                        pass


async def main(conn=('localhost', 8765)):
    await asyncio.gather(server(*conn), client(*conn))


if __name__ == '__main__':
    asyncio.run(main())
```
I can see a similar approach is used in asyncio/streams.py, with the comment:
```python
# Wait for protocol.connection_lost() call
# Raise connection closing error if any,
# ConnectionResetError otherwise
# Yield to the event loop so connection_lost() may be
# called. Without this, _drain_helper() would return
# immediately, and code that calls
#     write(...); await drain()
# in a loop would never call connection_lost(), so it
# would not see an error when the socket is closed.
await sleep(0)
```
So something similar to this might be required. However, this logic is only triggered when the top-level transport's is_closing() already returns True, so the transport would already have to be marked as closed before drain() yields and lets the callback that marks it as closed run. So either the SSL transport's is_closing() should take its inner transport into account, or this logic should also exist somewhere else.
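To make the first option concrete, here is a toy model of the check described above. The classes and drain_would_yield() are hypothetical illustrations, not asyncio's real code: a drain()-style guard only yields when the outer transport reports is_closing(). With the current behaviour the outer SSL transport hides the inner failure; delegating to the inner transport, as the patch_asyncio() helper later in this thread does, exposes it.

```python
class FakeSocketTransport:
    """Hypothetical stand-in for the inner socket transport."""

    def __init__(self) -> None:
        self._closing = False

    def is_closing(self) -> bool:
        return self._closing

    def fail_write(self) -> None:
        # Models a write hitting e.g. EPIPE: only the inner flag flips right away.
        self._closing = True


class FakeSSLTransport:
    """Hypothetical stand-in for the outer SSL transport."""

    def __init__(self, inner: FakeSocketTransport, delegate: bool) -> None:
        self._inner = inner
        self._closed = False  # Only set later, once connection_lost() runs.
        self._delegate = delegate

    def is_closing(self) -> bool:
        if self._delegate:
            # Proposed behaviour: also report closing if the inner transport is.
            return self._closed or self._inner.is_closing()
        # Current behaviour: only the outer flag is consulted.
        return self._closed


def drain_would_yield(transport) -> bool:
    # Mirrors the guard quoted from asyncio/streams.py: sleep(0) only runs
    # when the transport already reports is_closing().
    return transport.is_closing()


if __name__ == "__main__":
    for delegate in (False, True):
        inner = FakeSocketTransport()
        outer = FakeSSLTransport(inner, delegate=delegate)
        inner.fail_write()
        print(f"delegate={delegate}: drain yields -> {drain_would_yield(outer)}")
    # delegate=False: drain yields -> False
    # delegate=True: drain yields -> True
```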
Can you try to check if this issue only exists in asyncio or in uvloop too?
@cjavad
Hi @kumaraditya303, I should be able to test it.
Have you seen the suggested PR? IIRC you are on it as a reviewer.
Hi again @kumaraditya303, a similar issue also exists in uvloop: it does not correctly handle the underlying socket being closed. I've generalized my first example to work in all three cases (it loads libc.so.6 through ctypes, so it is Linux only).
```python
import asyncio
import contextlib
import ctypes
import logging
import ssl
import subprocess
import tempfile

import uvloop

# Load the C standard library (glibc), so the socket's file descriptor
# can be closed behind the transport's back.
libc = ctypes.CDLL("libc.so.6")


@contextlib.contextmanager
def server_ssl_context(host):
    # Generate a throwaway self-signed EC certificate for `host`.
    with tempfile.NamedTemporaryFile() as keyfile, tempfile.NamedTemporaryFile() as certfile:
        subprocess.run([
            'openssl', 'req', '-new', '-newkey', 'ec', '-pkeyopt', 'ec_paramgen_curve:prime256v1',
            '-keyout', keyfile.name, '-nodes', '-x509', '-days', '365', '-subj', f'/CN={host}',
            '-out', certfile.name,
        ], check=True, shell=False)
        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        context.load_cert_chain(certfile.name, keyfile.name)
        yield context


@contextlib.contextmanager
def client_ssl_context():
    # Accept the self-signed server certificate.
    context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    yield context


def patch_asyncio():
    # Implements https://github.com/python/cpython/pull/118960
    # without needing to modify the source code of the library.
    from asyncio.sslproto import _SSLProtocolTransport, SSLProtocol

    def _is_transport_closing(self) -> bool:
        return self._transport is not None and self._transport.is_closing()

    SSLProtocol._is_transport_closing = _is_transport_closing

    def is_closing(self) -> bool:
        return self._closed or self._ssl_protocol._is_transport_closing()

    _SSLProtocolTransport.is_closing = is_closing


async def client_handle(reader, writer):
    # The server side does nothing; only the client is exercised.
    ...


async def main(host, port):
    with server_ssl_context(host) as server_context, client_ssl_context() as client_context:
        await asyncio.start_server(client_handle, host, port, ssl=server_context)
        reader, writer = await asyncio.open_connection(host, port, ssl=client_context)
        transport = writer.transport
        sock = transport.get_extra_info('socket')
        # Simulate a broken pipe: close the fd without the transport noticing.
        result = libc.close(sock.fileno())
        print(f"{sock.fileno()=} {result=}")
        print('[Client] Sending: %r' % 'Hello, world!')
        while True:
            writer.write('Hello, world!'.encode())
            await writer.drain()


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    # Runs forever
    uvloop.run(main('localhost', 8443), debug=True)
    # Runs forever with logging discovering the issue
    # asyncio.run(main('localhost', 8443), debug=True)
    # Works as expected (raises an exception)
    # patch_asyncio()
    # asyncio.run(main('localhost', 8443), debug=True)
```
Assuming the previous file is saved as issue_118950.py, we can also write the same test with aiohttp, and the same fix applies there as well.
```python
import asyncio
import logging

import aiohttp
import uvloop
from aiohttp import web

from issue_118950 import server_ssl_context, client_ssl_context, patch_asyncio, libc


async def websocket_handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    async for msg in ws:
        print("Message received from client:", msg.data)
    return ws


async def server(host, port):
    with server_ssl_context(host) as context:
        app = web.Application()
        app.add_routes([web.get('/', websocket_handler)])
        runner = web.AppRunner(app)
        await runner.setup()
        site = web.TCPSite(runner, host, port, ssl_context=context)
        await site.start()


async def client(host, port):
    await asyncio.sleep(1)  # Wait for the server to start
    with client_ssl_context() as context:
        async with aiohttp.ClientSession() as session:
            async with session.ws_connect(f'wss://{host}:{port}', ssl=context) as ws:
                # Simulate a broken pipe: close the fd without the transport noticing.
                fd = ws._writer.transport.get_extra_info('socket').fileno()
                result = libc.close(fd)
                print(f"{fd=} {result=}")
                while True:
                    await ws.send_str("Hello, server!")
                    # Most likely the same issue, as uncommenting this
                    # makes the issue go away.
                    # await asyncio.sleep(0.0)


async def main(conn=('localhost', 8765)):
    await asyncio.gather(server(*conn), client(*conn))


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    # Runs forever
    uvloop.run(main(), debug=True)
    # Runs forever with logging discovering the issue
    # asyncio.run(main(), debug=True)
    # Works as expected (raises an exception)
    # patch_asyncio()
    # asyncio.run(main(), debug=True)
```
cc @fantix @1st1
Bug report
Bug description:
TL;DR
`_SSLProtocolTransport.is_closing` should match its inner `_SelectorTransport.is_closing`, indicating to the user that the transport is actually closed instead of silently logging an error.
Description
I've been using the aio-libs library `aiohttp` in production together with its WebSocket client implementation, and found an interesting issue that sometimes occurred on certain devices (specifically M-series MacBooks). The logs I've been seeing look something like this:
Digging deeper, the issue occurs when the connection has been lost due to an exception raised by `socket.send`. Normally this results in the transport's `is_closing()` function returning `True`. The issue occurs when using TLS, because the transport is then an `_SSLProtocolTransport`, which implements its own `is_closing` logic. When `_SelectorSocketTransport.write` gets an `OSError` such as `Broken pipe` (which is the issue I've experienced in the wild), the inner transport marks itself as closing, but when a library such as aiohttp checks its transport's `is_closing()`, it returns `False`, so the library silently assumes that it is still connected.
I've been able to recreate the flow by raising a different exception (by manually closing the socket), but as far as I can tell the error source and flow are the same in both cases.
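For contrast, here is a sketch of the non-TLS baseline described above, where a failed send immediately makes `is_closing()` return `True`. It uses `socket.socketpair()` instead of a real network connection and assumes Linux, where sending on an AF_UNIX socket whose peer is already closed fails with EPIPE (a broken pipe):

```python
import asyncio
import contextlib
import socket


async def plain_transport_closes_immediately() -> bool:
    # A connected pair of AF_UNIX sockets; no TLS involved.
    client_sock, peer_sock = socket.socketpair()
    _reader, writer = await asyncio.open_connection(sock=client_sock)

    # Close the peer so the next send() fails (EPIPE on Linux).
    peer_sock.close()

    # No await between the write and the check: the send error is handled
    # synchronously inside write(), which marks the transport as closing.
    writer.write(b"hello")
    closing = writer.transport.is_closing()

    writer.close()
    with contextlib.suppress(OSError):
        # wait_closed() may re-raise the original write error.
        await writer.wait_closed()
    return closing


if __name__ == "__main__":
    print(asyncio.run(plain_transport_closes_immediately()))  # True on Linux
```

Wrapping the same connection in TLS is what breaks this: the inner transport flips to closing, but the outer `_SSLProtocolTransport` keeps returning `False` until `connection_lost()` has had a chance to run.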
Full example (out of the box + SSL cert generation)
CPython versions tested on:
3.12
Operating systems tested on:
Linux, macOS
Linked PRs