python / cpython

The Python programming language
https://www.python.org
Other
62.33k stars 29.94k forks source link

Asyncio loop.sock_sendall() fails on Windows when sockets are shared across threads #122240

Open NoahStapp opened 1 month ago

NoahStapp commented 1 month ago

Bug report

Bug description:

asyncio's `loop.sock_sendall()` method raises `OSError: [WinError 87] The parameter is incorrect` on Windows when it is given a socket that was created in a different thread. This error only appears when using the default `ProactorEventLoop` on Windows.

Minimum reproducible example:

import asyncio
import threading
import socket

# Hand-off list: create_socket() appends a socket, use_socket() pops it.
socks = list()

async def create_socket():
    """Connect a socket, send on it from this thread's loop, then hand it off.

    The socket is connected while still blocking, then switched to
    non-blocking mode (``settimeout(0.0)``) before being given to
    ``loop.sock_sendall()``. After the send it is appended to ``socks`` so
    that ``use_socket()`` can reuse it from another thread's event loop.
    """
    s = socket.socket()
    s.connect(("www.python.org", 80))
    s.settimeout(0.0)
    # get_running_loop() is the recommended way to reach the current loop
    # from inside a coroutine; get_event_loop() has policy-dependent behavior.
    loop = asyncio.get_running_loop()
    print(f"{threading.current_thread().name}: {await asyncio.wait_for(loop.sock_sendall(s, bytes('hello', 'utf-8')), timeout=5)}")

    socks.append(s)

async def use_socket():
    """Wait for the socket from ``create_socket()`` and send on it from this loop.

    On Windows' default ProactorEventLoop, this send is where the reported
    ``OSError: [WinError 87]`` is raised: the socket is already associated
    with the other thread's completion port.
    """
    # Poll cooperatively. A bare ``while ...: pass`` spin burns CPU and never
    # yields to this event loop.
    while not socks:
        await asyncio.sleep(0.01)
    s = socks.pop()
    loop = asyncio.get_running_loop()
    print(f"{threading.current_thread().name}: {await asyncio.wait_for(loop.sock_sendall(s, bytes('hello', 'utf-8')), timeout=5)}")

def wrapper(func):
    """Thread target: run the coroutine function *func* on a fresh event loop."""
    coroutine = func()
    asyncio.run(coroutine)

# One thread creates (and first sends on) the socket; the other reuses it.
t1 = threading.Thread(target=wrapper, args=(create_socket,))
t2 = threading.Thread(target=wrapper, args=(use_socket,))

for worker in (t1, t2):
    worker.start()

Error stacktrace:

  File "C:\Python312\Lib\threading.py", line 1073, in _bootstrap_inner
    self.run()
  File "C:\Python312\Lib\threading.py", line 1010, in run
    self._target(*self._args, **self._kwargs)
  File "C:\cygwin\home\Administrator\mongo-python-driver\windows_test.py", line 27, in wrapper
    asyncio.run(func())
  File "C:\Python312\Lib\asyncio\runners.py", line 194, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "C:\Python312\Lib\asyncio\runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Python312\Lib\asyncio\base_events.py", line 687, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "C:\cygwin\home\Administrator\mongo-python-driver\windows_test.py", line 23, in use_socket
    print(f"{threading.current_thread().name}: {await asyncio.wait_for(loop.sock_sendall(s, bytes('hello', 'utf-8')), timeout=5)}")
                                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Python312\Lib\asyncio\tasks.py", line 520, in wait_for
    return await fut
           ^^^^^^^^^
  File "C:\Python312\Lib\asyncio\proactor_events.py", line 721, in sock_sendall
    return await self._proactor.send(sock, data)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Python312\Lib\asyncio\windows_events.py", line 539, in send
    self._register_with_iocp(conn)
  File "C:\Python312\Lib\asyncio\windows_events.py", line 709, in _register_with_iocp
    _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)
OSError: [WinError 87] The parameter is incorrect

CPython versions tested on:

3.8, 3.11, 3.12

Operating systems tested on:

Linux, macOS, Windows

graingert commented 1 month ago

FYI, this is also reproducible on a single thread, with two successive `asyncio.run()` calls:

import asyncio
import threading
import socket

# create_socket() appends a socket here; use_socket() later pops it.
socks = list()

async def create_socket():
    """Connect a socket, send on it from the current loop, then stash it.

    The socket is connected while blocking, then made non-blocking
    (``settimeout(0.0)``) before ``loop.sock_sendall()``. It is appended to
    ``socks`` so that ``use_socket()`` can reuse it from the next
    ``asyncio.run()`` call, i.e. from a different event loop.
    """
    s = socket.socket()
    s.connect(("www.python.org", 80))
    s.settimeout(0.0)
    # Preferred over get_event_loop() inside coroutines (see asyncio docs).
    loop = asyncio.get_running_loop()
    print(f"{threading.current_thread().name}: {await asyncio.wait_for(loop.sock_sendall(s, bytes('hello', 'utf-8')), timeout=5)}")

    socks.append(s)

async def use_socket():
    """Pop the socket left by ``create_socket()`` and send on it again.

    Meant to run under a second ``asyncio.run()`` call, i.e. on a different
    event loop than the one that first sent on the socket.

    Raises:
        IndexError: if ``socks`` is empty.
    """
    s = socks.pop()
    # Preferred over get_event_loop() inside coroutines (see asyncio docs).
    loop = asyncio.get_running_loop()
    print(f"{threading.current_thread().name}: {await asyncio.wait_for(loop.sock_sendall(s, bytes('hello', 'utf-8')), timeout=5)}")

# Two successive asyncio.run() calls create two separate event loops on the
# same thread; the second reuses the socket the first one sent on.
for step in (create_socket, use_socket):
    asyncio.run(step())
graingert commented 1 month ago

A smaller reproducer that doesn't depend on an external server:

import asyncio
import sys
import socket

async def sock_sendall(sock, data):
    """Send all of *data* on *sock* via the currently running event loop."""
    running_loop = asyncio.get_running_loop()
    return await running_loop.sock_sendall(sock, data)

def main():
    """Send one byte on the same socket from two consecutive event loops.

    On Windows' ProactorEventLoop, the second send is expected to fail with
    ``OSError: [WinError 87]``.
    """
    left, right = socket.socketpair()
    with left, right:
        left.setblocking(False)
        for _attempt in range(2):
            asyncio.run(sock_sendall(left, b"\x00"))

if __name__ == "__main__":
    raise SystemExit(main())
ShaneHarvey commented 1 month ago

It looks like the problem is that when a `ProactorEventLoop` first encounters a new file (here, a socket), it records it in `self._registered` and calls `CreateIoCompletionPort`:

        # To get notifications of finished ops on this objects sent to the
        # completion port, were must register the handle.
        if obj not in self._registered:
            self._registered.add(obj)
            _overlapped.CreateIoCompletionPort(obj.fileno(), self._iocp, 0, 0)

https://github.com/python/cpython/blob/v3.13.0b4/Lib/asyncio/windows_events.py#L706-L710

Once a handle is registered, Windows will not allow `CreateIoCompletionPort` to be called on it again with a different completion port. For example, here's an even smaller example that reproduces the `The parameter is incorrect` error:

import _overlapped
import _winapi
import sys
import socket

def main():
    """Show that a socket cannot be associated with a second completion port.

    ``s1`` is associated with ``iocp1`` and then with ``iocp2``; the second
    association is the call that raises
    ``OSError: [WinError 87] The parameter is incorrect``.
    Both completion-port handles are closed on the way out, even while that
    error propagates.
    """
    s1, s2 = socket.socketpair()
    ports = []  # IOCP handles created below; closed in the finally clause
    with s1, s2:
        s1.setblocking(False)
        try:
            iocp1 = _overlapped.CreateIoCompletionPort(_overlapped.INVALID_HANDLE_VALUE, _winapi.NULL, 0, _winapi.INFINITE)
            ports.append(iocp1)
            _overlapped.CreateIoCompletionPort(s1.fileno(), iocp1, 0, 0)
            iocp2 = _overlapped.CreateIoCompletionPort(_overlapped.INVALID_HANDLE_VALUE, _winapi.NULL, 0, _winapi.INFINITE)
            ports.append(iocp2)
            # Fails: s1 is already associated with iocp1.
            _overlapped.CreateIoCompletionPort(s1.fileno(), iocp2, 0, 0)
        finally:
            for port in ports:
                _winapi.CloseHandle(port)

if __name__ == "__main__":
    sys.exit(main())

The only way to fix this would be to clear both the Windows state and the `ProactorEventLoop` state. Starting with Windows 8.1, it's possible to remove a file's completion-port association by calling `NtSetInformationFile` with `FileReplaceCompletionInformation`:

import _overlapped
import _winapi
import sys
import socket
import ctypes
from ctypes.wintypes import HANDLE

# FILE_INFORMATION_CLASS value passed to NtSetInformationFile; see:
# https://learn.microsoft.com/en-us/windows-hardware/drivers/ddi/ntifs/nf-ntifs-ntsetinformationfile
FileReplaceCompletionInformation = 0x3D  # == 61

# See: https://learn.microsoft.com/en-us/windows-hardware/drivers/ddi/ntifs/ns-ntifs-_file_completion_information
class FileCompletionInformation(ctypes.Structure):
    """ctypes mirror of FILE_COMPLETION_INFORMATION: a port handle and a key."""

    _fields_ = [("Port", HANDLE), ("Key", ctypes.c_void_p)]

# See: https://learn.microsoft.com/en-us/windows-hardware/drivers/ddi/wdm/ns-wdm-_io_status_block
class PointerOrStatus(ctypes.Union):
    """The Status/Pointer union that begins an IO_STATUS_BLOCK."""

    _fields_ = [
        ("Status", ctypes.c_long),
        ("Pointer", ctypes.c_void_p),
    ]

class IoStatusBlock(ctypes.Structure):
    """ctypes mirror of IO_STATUS_BLOCK (output buffer for NtSetInformationFile)."""

    # Promote the union's Status/Pointer fields to attributes of this struct.
    _anonymous_ = ("u",)
    _fields_ = [("u", PointerOrStatus), ("Information", ctypes.c_void_p)]

def clear_iocp(file):
    """Remove *file*'s I/O completion port association (Windows 8.1+).

    Calls ``NtSetInformationFile`` with ``FileReplaceCompletionInformation``
    and a NULL port/key. Afterwards ``CreateIoCompletionPort`` can associate
    the handle with a different port.

    Args:
        file: object whose ``fileno()`` returns the OS handle (a socket in
            this script).

    Raises:
        OSError: if ``NtSetInformationFile`` returns a failure NTSTATUS.
    """
    # Clear association, FileReplaceCompletionInformation was added in Windows 8.1:
    # __kernel_entry NTSYSCALLAPI NTSTATUS NtSetInformationFile(
    #   [in]  HANDLE                 FileHandle,
    #   [out] PIO_STATUS_BLOCK       IoStatusBlock,
    #   [in]  PVOID                  FileInformation,
    #   [in]  ULONG                  Length,
    #   [in]  FILE_INFORMATION_CLASS FileInformationClass
    # );
    ntdll = ctypes.windll.ntdll
    file_info = FileCompletionInformation(None, None)
    out = IoStatusBlock()
    status = ntdll.NtSetInformationFile(
        # Wrap as HANDLE: without argtypes, a bare int is passed as a C int.
        HANDLE(file.fileno()),
        ctypes.byref(out),
        # FileInformation is a PVOID: pass a pointer, not the struct by value.
        ctypes.byref(file_info),
        ctypes.sizeof(file_info),
        FileReplaceCompletionInformation,
    )
    # NTSTATUS is a signed 32-bit value (ctypes' default c_int restype);
    # negative values fail NT_SUCCESS.
    if status < 0:
        raise ctypes.WinError(ntdll.RtlNtStatusToDosError(status))

def main():
    """Re-associate a socket with a second completion port after clearing it.

    ``s1`` is associated with ``iocp1``, its association is cleared with
    ``clear_iocp()``, and it is then associated with ``iocp2`` without error.
    Both completion-port handles are closed on the way out.
    """
    s1, s2 = socket.socketpair()
    ports = []  # IOCP handles created below; closed in the finally clause
    with s1, s2:
        s1.setblocking(False)
        try:
            iocp1 = _overlapped.CreateIoCompletionPort(_overlapped.INVALID_HANDLE_VALUE, _winapi.NULL, 0, _winapi.INFINITE)
            ports.append(iocp1)
            _overlapped.CreateIoCompletionPort(s1.fileno(), iocp1, 0, 0)
            clear_iocp(s1)
            iocp2 = _overlapped.CreateIoCompletionPort(_overlapped.INVALID_HANDLE_VALUE, _winapi.NULL, 0, _winapi.INFINITE)
            ports.append(iocp2)
            _overlapped.CreateIoCompletionPort(s1.fileno(), iocp2, 0, 0)
        finally:
            for port in ports:
                _winapi.CloseHandle(port)

if __name__ == "__main__":
    sys.exit(main())

How to integrate this into the asyncio API is an interesting problem.

ShaneHarvey commented 1 month ago

Putting it all together, this works around the error:

# clear_iocp et al. from above 

async def sock_sendall(sock, data):
    """Await ``sock_sendall`` of *data* on *sock* using the running loop."""
    current = asyncio.get_running_loop()
    result = await current.sock_sendall(sock, data)
    return result

def main():
    """Send on one socket from two event loops, clearing IOCP state in between.

    The first send runs on an explicitly created ProactorEventLoop, so the
    loop whose proactor registered ``s1`` is the one whose ``_registered`` set
    is cleaned up. (``asyncio.run()`` always creates its own loop and ignores
    one installed with ``set_event_loop()``.) That loop is then closed, and a
    fresh loop from ``asyncio.run()`` performs the second send.
    """
    s1, s2 = socket.socketpair()
    with s1, s2:
        s1.setblocking(False)
        loop = asyncio.ProactorEventLoop()
        try:
            loop.run_until_complete(sock_sendall(s1, b"\x00"))

            # Discard state: forget the registration in this loop's proactor
            # (private attribute) and drop the OS-level port association.
            loop._proactor._registered.discard(s1)
            clear_iocp(s1)
        finally:
            loop.close()

        asyncio.run(sock_sendall(s1, b"\x00"))

if __name__ == "__main__":
    sys.exit(main())