MagicStack / uvloop

Ultra fast asyncio event loop.
Apache License 2.0
10.41k stars 544 forks source link

pipes from subprocess_exec do not have extra info 'pipe' (incompatible with vanilla asyncio) #595

Open jensbjorgensen opened 8 months ago

jensbjorgensen commented 8 months ago

I can see in the source that when a pipe transport is created via loop.create_write_pipe(...) we have:

        transp = WriteUnixTransport.new(self, proto, None, waiter)
        transp._add_extra_info('pipe', pipe)

However, no transp._add_extra_info(...) call is made on the WriteUnixTransport that is created inside the UVProcessTransport code. There are ways to work around this (don't depend on getting access to that pipe); however, this works fine in the asyncio loop implementation. I imagine it can be done without too much pain by wrapping the raw file descriptor created in the process transport code, but I'm not super fluent in Cython, so I didn't attempt a patch.

jensbjorgensen commented 8 months ago
import asyncio
import typing
import uvloop

class Test(asyncio.SubprocessProtocol):
    """Check that a subprocess stdin pipe transport exposes the 'pipe' extra.

    Each instance creates its own event loop from the given implementation,
    spawns ``cat`` with this object as the subprocess protocol, writes to the
    child's stdin through the object returned by ``get_extra_info('pipe')``
    and waits for the child to exit.  ``run()`` drives the loop and reports
    whether the check completed without raising.
    """

    def __init__(self, asyncio_impl):
        """Create a new loop and schedule the check on it.

        Args:
            asyncio_impl: Object providing ``new_event_loop()`` (for example
                the ``asyncio`` module).
        """
        self._loop = asyncio_impl.new_event_loop()
        # Pessimistic default: only a check that finished cleanly counts.
        self._ok = False
        # Created from self._loop so it is bound to the loop that awaits it.
        # An asyncio.Event built here would attach to the default loop on
        # Python < 3.10 and fail when awaited from a task on self._loop.
        self._exited = self._loop.create_future()
        self._task = self._loop.create_task(
            self.test_subprocess_pipe_extra_info())
        self._task.add_done_callback(self._task_done)

    def _task_done(self, task: asyncio.Task) -> None:
        """Record the outcome of the check task and stop the loop."""
        failed = not task.cancelled() and task.exception() is not None
        if failed:
            task.print_stack()
        # A cancelled task never finished the check, so it is not a success.
        self._ok = not failed and not task.cancelled()
        self._loop.stop()

    def run(self) -> bool:
        """Run the loop until the check finishes, then close the loop.

        Returns:
            True if the check coroutine completed without an exception,
            False otherwise.
        """
        try:
            self._loop.run_forever()
        finally:
            # Release the loop's resources even if run_forever() raised.
            self._loop.close()
        return self._ok

    async def test_subprocess_pipe_extra_info(self) -> None:
        """Write to the child's stdin via the 'pipe' extra and await exit.

        Raises:
            RuntimeError: If the stdin pipe transport has no 'pipe' extra.
        """
        proc_trans, _ = await self._loop.subprocess_exec(
            lambda: self, 'cat', stdin=asyncio.subprocess.PIPE)
        try:
            proc_stdin = proc_trans.get_pipe_transport(0)
            pipe = proc_stdin.get_extra_info('pipe')
            if pipe is None:
                # Fail with a message that names the actual problem instead
                # of an AttributeError on None.
                raise RuntimeError(
                    "stdin pipe transport has no 'pipe' extra info")
            pipe.write(b'hello, test')
            pipe.close()
            await self._exited
        finally:
            # Always release the process transport so a failed check does
            # not leave the child process and its pipes behind.
            proc_trans.close()

    def pipe_data_received(self, fd: int, data: bytes) -> None:
        """Print the data received from the child's pipe ``fd``."""
        print(f"pipe_data_received: {fd} {data!r}")

    def pipe_connection_lost(self, fd: int, exc: typing.Optional[Exception]) -> None:
        """Ignore pipe closure; nothing to clean up here."""
        pass

    def process_exited(self) -> None:
        """Wake the check coroutine once the child process has exited."""
        if not self._exited.done():
            self._exited.set_result(None)
# Run the same check once per event loop implementation and report the result.
for loop_name in ('asyncio', 'uvloop'):
    # The implementation module is looked up by its imported name.
    loop_module = globals()[loop_name]
    if not Test(loop_module).run():
        print(f'{loop_name} is broken it seems (see stack trace)')
        continue
    print(f'{loop_name} works as expected')