python / cpython

The Python programming language
https://www.python.org

asyncio.create_subprocess_exec does not respond properly to asyncio.CancelledError #103847

Open DarkArc opened 1 year ago

DarkArc commented 1 year ago

Bug report

asyncio programs that call proc = await asyncio.create_subprocess_exec(...) but are cancelled before they reach await proc.communicate() are not cancelled properly: the cancelled task never completes.

This can be observed in the following script (it may take a few runs to observe):

import asyncio
import functools
import signal

counter = 0

async def run_bash_sleep():
  global counter
  counter += 1
  local_counter = counter
  try:
    print(f"Started - {local_counter}")
    proc = await asyncio.create_subprocess_exec(
      'bash', '-c', 'sleep .001',
      stdout = asyncio.subprocess.PIPE,
      stderr = asyncio.subprocess.PIPE,
      start_new_session = True
    )

    print(f"Waiting - {local_counter}")
    stdout, stderr = await proc.communicate()
    print(f"Done - {local_counter}!")
  except asyncio.CancelledError:
    print(f"Canceled - {local_counter}!")

async def run_loop(loop):
  max_jobs = 8
  active_tasks = []
  while True:
    try:
      # Add jobs to the list of active jobs
      while len(active_tasks) < max_jobs:
        active_tasks.append(loop.create_task(run_bash_sleep()))

      # All tasks have finished, end the loop
      if len(active_tasks) == 0:
        break

      # Wait for a job to finish (or a 1 second timeout)
      done, pending = await asyncio.wait(
        active_tasks,
        timeout = 1,
        return_when = asyncio.FIRST_COMPLETED
      )

      print(f"Running jobs: {len(active_tasks)}")

      # Update the active jobs
      active_tasks = list(pending)
    except asyncio.CancelledError:
      max_jobs = 0

def stop_asyncio_loop(signame, loop):
  for task in asyncio.all_tasks(loop):
    task.cancel()

def main():
  loop = asyncio.new_event_loop()

  asyncio.set_event_loop(loop)

  for signame in {'SIGINT', 'SIGTERM'}:
    loop.add_signal_handler(
      getattr(signal, signame),
      functools.partial(stop_asyncio_loop, signame, loop)
    )

  loop.run_until_complete(loop.create_task(run_loop(loop)))

main()

When the signal handler cancels the tasks, any task that hasn't made it to await proc.communicate() will never complete.

A subsequent SIGTERM to the script can then actually terminate the task; however, I'd expect the first call to cancel() to disrupt the coroutine.

Your environment

DarkArc commented 1 year ago

Curiously this doesn't seem to affect Python 3.6.8 (running on 3.6 requires a change to the stop_asyncio_loop function, switching to asyncio.Task.all_tasks instead of asyncio.all_tasks).

Instead of an indefinite hang, a message is printed akin to Unknown child process pid 37535, will report returncode 255
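The change to stop_asyncio_loop mentioned above can be sketched as a small compatibility helper. This is only an illustration: all_tasks_compat is a hypothetical name, not something from the original script, and it assumes the only difference that matters between the versions is where all_tasks lives.

```python
import asyncio


def all_tasks_compat(loop):
    """Return the tasks known to `loop` (hypothetical helper, not in the original script)."""
    if hasattr(asyncio, "all_tasks"):
        # Python 3.7+: module-level function, returns only unfinished tasks.
        return asyncio.all_tasks(loop)
    # Python 3.6: classmethod on Task (removed in Python 3.9).
    return asyncio.Task.all_tasks(loop)


def stop_asyncio_loop(signame, loop):
    # Same behaviour as the handler in the report: cancel every task on the loop.
    for task in all_tasks_compat(loop):
        task.cancel()
```

Cancelling a task that has already finished is a no-op, so it does not matter that the 3.6 classmethod also returns completed tasks.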

gvanrossum commented 1 year ago

That looks like a nasty problem. Do you want to help by checking the logic of asyncio signals and subprocess creation? There might even be a simple fix.

DarkArc commented 1 year ago

Unfortunately, I don't have the bandwidth for that at the moment, either personally or professionally. :(

I took a quick look through the create_subprocess_exec logic before reporting the issue, and didn't see anything that jumped out to me, but that doesn't mean there's nothing there.

gvanrossum commented 1 year ago

I’m in a similar situation, so we’ll have to leave this open for a while.

graingert commented 4 weeks ago

Copying my analysis from: https://github.com/python/cpython/issues/125502#issuecomment-2413536175

The problem occurs when asyncio.runners._cancel_all_tasks runs at an inopportune instant, while the pipes are being connected.

This task gets cancelled:

https://github.com/python/cpython/blob/92af191a6a5f266b71373f5374ca0c9c522d62d9/Lib/asyncio/base_subprocess.py#L56

which means self._pending_calls is never run:

https://github.com/python/cpython/blob/92af191a6a5f266b71373f5374ca0c9c522d62d9/Lib/asyncio/base_subprocess.py#L199-L202

so when _try_finish appends self._call_connection_lost to self._pending_calls: https://github.com/python/cpython/blob/92af191a6a5f266b71373f5374ca0c9c522d62d9/Lib/asyncio/base_subprocess.py#L257

self._call_connection_lost is never called, which means self._exit_waiters are never woken: https://github.com/python/cpython/blob/92af191a6a5f266b71373f5374ca0c9c522d62d9/Lib/asyncio/base_subprocess.py#L259-L270
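The chain of events above can be reproduced without a real subprocess. The following is a toy model, not the actual asyncio code: ToyTransport, process_exited and demo are made-up names, and the class only mimics the queue-then-flush pattern described in the analysis (a _pending_calls list that is flushed at the end of the connect task).

```python
import asyncio


class ToyTransport:
    """Toy stand-in for the subprocess transport (NOT the real asyncio class)."""

    def __init__(self, loop):
        self._loop = loop
        self._pending_calls = []              # callbacks deferred until pipes are connected
        self._exit_waiter = loop.create_future()
        self._task = loop.create_task(self._connect_pipes())

    async def _connect_pipes(self):
        await asyncio.sleep(0)                # placeholder for pipe setup; cancellable
        # Only reached if the task was not cancelled: flush the deferred callbacks.
        for callback in self._pending_calls:
            self._loop.call_soon(callback)
        self._pending_calls = None

    def process_exited(self):
        # While the queue still exists, callbacks are deferred instead of scheduled.
        if self._pending_calls is not None:
            self._pending_calls.append(self._call_connection_lost)
        else:
            self._loop.call_soon(self._call_connection_lost)

    def _call_connection_lost(self):
        if not self._exit_waiter.done():
            self._exit_waiter.set_result(0)   # wakes whoever waits for the exit


async def demo(cancel_connect):
    loop = asyncio.get_running_loop()
    transport = ToyTransport(loop)
    if cancel_connect:
        transport._task.cancel()              # cancellation lands before the flush
    await asyncio.sleep(0.01)                 # let the connect task run or be cancelled
    transport.process_exited()
    try:
        # shield() keeps the timeout from cancelling the waiter itself.
        await asyncio.wait_for(asyncio.shield(transport._exit_waiter), timeout=0.1)
        return "woken"
    except asyncio.TimeoutError:
        return "stuck"
```

With this model, asyncio.run(demo(False)) returns "woken", while asyncio.run(demo(True)) returns "stuck": the callback sits in a queue that nothing will ever flush, which is the same shape as the hang in the real transport.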

Here's a demo that hangs every time for me:

import sys
import inspect
import asyncio
from subprocess import PIPE

async def run_sleep():
    proc = await asyncio.create_subprocess_exec(
        "sleep",
        "0.002",
        stdout=PIPE,
    )
    await proc.communicate()

async def amain():
    loop = asyncio.get_running_loop()
    task = asyncio.current_task(loop)
    coro = task.get_coro()

    called_cancel = False

    def cancel_eventually():
        my_coro = coro
        while inspect.iscoroutine(my_coro.cr_await):
            my_coro = my_coro.cr_await
        if my_coro.cr_code is loop._make_subprocess_transport.__code__:
            print("_cancel_all_tasks")
            tasks = asyncio.all_tasks()
            for task in tasks:
                task.cancel()
        else:
            loop.call_soon(cancel_eventually)

    loop.call_soon(cancel_eventually)
    await run_sleep()

def main():
    asyncio.run(amain())

if __name__ == "__main__":
    sys.exit(main())

graingert commented 4 weeks ago

@kumaraditya303 I think the fix is to use:

self._waiter = waiter
self._task = task = self._loop.create_task(self._connect_pipes())
task.add_done_callback(self._wake_waiter_and_call_pending_calls_or_close)

Then, in def _wake_waiter_and_call_pending_calls_or_close(self, task): ..., check the exception/done/cancelled state of task and do the right thing.

I've spent a little time fiddling around but can't get the tests to pass and not spray a bunch of errors!
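One possible reading of "do the right thing" is sketched below on a toy class. This is not a patch for base_subprocess.py: apart from the callback name taken from the comment above, everything here (ToyTransport, the closed flag, demo) is an assumption made for illustration, and the choice to flush pending calls on every outcome is a guess rather than the agreed fix.

```python
import asyncio


class ToyTransport:
    """Toy model of the proposed done-callback approach (NOT the real asyncio class)."""

    def __init__(self, loop, waiter):
        self._loop = loop
        self._waiter = waiter
        self._pending_calls = []
        self.closed = False                   # hypothetical flag standing in for close()
        self._task = task = loop.create_task(self._connect_pipes())
        task.add_done_callback(self._wake_waiter_and_call_pending_calls_or_close)

    async def _connect_pipes(self):
        await asyncio.sleep(0)                # placeholder for the real pipe setup

    def _wake_waiter_and_call_pending_calls_or_close(self, task):
        # Runs whether the task finished, failed, or was cancelled,
        # so the deferred callbacks are never stranded.
        pending, self._pending_calls = self._pending_calls, None
        for callback in pending:
            self._loop.call_soon(callback)

        # Check cancelled() first: task.exception() raises on a cancelled task.
        if task.cancelled():
            self.closed = True
            if not self._waiter.done():
                self._waiter.cancel()
        elif task.exception() is not None:
            self.closed = True
            if not self._waiter.done():
                self._waiter.set_exception(task.exception())
        elif not self._waiter.done():
            self._waiter.set_result(None)


async def demo(cancel_connect):
    loop = asyncio.get_running_loop()
    waiter = loop.create_future()
    transport = ToyTransport(loop, waiter)
    ran = []
    transport._pending_calls.append(lambda: ran.append("connection_lost"))
    if cancel_connect:
        transport._task.cancel()
    await asyncio.sleep(0.01)                 # give the done callback time to run
    return waiter.cancelled(), transport.closed, ran
```

In this model the deferred callback runs in both cases: asyncio.run(demo(True)) gives (True, True, ["connection_lost"]) and asyncio.run(demo(False)) gives (False, False, ["connection_lost"]).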

graingert commented 3 weeks ago

Are these the same issue?

https://github.com/python/cpython/issues/115787 https://github.com/python/cpython/issues/105288

savannahostrowski commented 3 weeks ago

@graingert Happy to take a look at this if no one else has started!

kumaraditya303 commented 3 weeks ago

> @kumaraditya303 I think the fix is to use:

I think the same, although I don't think it's that simple. We possibly need to rework and audit the whole cancellation path for subprocesses; I am pretty sure it is broken in more places than this.

1st1 commented 2 weeks ago

@graingert @kumaraditya303

> I think the fix is to use:
>
> self._waiter = waiter
> self._task = task = self._loop.create_task(self._connect_pipes())
> task.add_done_callback(self._wake_waiter_and_call_pending_calls_or_close)

I'm not so sure. This might fix the problem now, but IMO it would lead to code that's hard to maintain. Suppose the connect_pipes task later gains some other code that mustn't be cancelled: that code would then have to be moved into the callback as well.

In general, I think that separating logic between async/await code and done callbacks is a big antipattern.

Instead I'd think about shielding the connect_pipes/init code from cancellation, or handling it explicitly.
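As a rough illustration of the shielding idea, here is a minimal sketch using asyncio.shield. The names connect_pipes, start and demo are placeholders invented for this example and do not correspond to the real transport code; the sketch assumes the initialization step is short enough that waiting for it after a cancellation is acceptable.

```python
import asyncio


async def connect_pipes(log):
    # Placeholder for the init step that must not be interrupted half-way.
    await asyncio.sleep(0.01)
    log.append("pipes connected")


async def start(log):
    inner = asyncio.ensure_future(connect_pipes(log))
    try:
        # shield() absorbs cancellation of start(); `inner` keeps running.
        await asyncio.shield(inner)
    except asyncio.CancelledError:
        # Handle cancellation explicitly: let init finish, clean up, then propagate.
        await inner
        log.append("cleaned up")
        raise


async def demo():
    log = []
    task = asyncio.create_task(start(log))
    await asyncio.sleep(0)        # let start() reach the shielded await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log
```

Here asyncio.run(demo()) returns ["pipes connected", "cleaned up", "cancelled"]: the caller still observes the cancellation, but only after the init step has run to completion. Note that shield() does not help when something cancels the inner task directly, which is what happens when every task on the loop is cancelled, so a real fix would likely need more than this.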