kyuupichan / aiorpcX

Generic async RPC implementation, including JSON-RPC
MIT License

`ignore_after` cannot be cancelled sometimes (race?) #44

Open SomberNight opened 2 years ago

SomberNight commented 2 years ago

(re git tag 0.22.1)

Sometimes `ignore_after` ignores cancellation, which means that if it is used in a task that is part of a `TaskGroup`, the group's join never finishes. Consider the following example, which I would expect to print `taskgroup exited.` after <1 second:

import asyncio
from aiorpcx import TaskGroup, TaskTimeout, ignore_after

async def raise_exc():
    await asyncio.sleep(0.5)
    raise Exception("asd")

async def f():
    event = asyncio.Event()
    while True:
        # poll here; with the option of waking up if event is triggered:
        async with ignore_after(0.001):  # note: the lower the value, the higher chance race is triggered
            async with TaskGroup() as group:
                await group.spawn(event.wait())
        # (imagine doing some useful work here...)

async def main():
    async with TaskGroup() as group:
        await group.spawn(f)
        await group.spawn(raise_exc)
    print(f"taskgroup exited.")

asyncio.run(main())

The same example always finishes on aiorpcx 0.18.7. Note that increasing the timeout constant of `ignore_after` seems to decrease the chance that the issue occurs. Try running the example with e.g. 0.05 seconds instead: with that value, on my machine it prints `taskgroup exited.` around half the time.

SomberNight commented 2 years ago

I believe this is related to TimeoutAfter explicitly suppressing CancelledError, but I don't yet understand why timing seemingly matters. https://github.com/kyuupichan/aiorpcX/blob/e55950fed903b63e82b87c7b6ca93ab50e18ce08/aiorpcx/curio.py#L373-L377

SomberNight commented 2 years ago

OK, so the issue is that `TimeoutAfter` internally uses `task.cancel()` (i.e. a `CancelledError`) to trigger the timeout, which is sometimes indistinguishable from a cancellation coming from the outside.
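To illustrate why the two sources are hard to tell apart, here is a minimal sketch using plain asyncio (no aiorpcX). The names `worker` and `main` are made up for the illustration. Two `task.cancel()` calls made in the same event loop iteration, one standing in for the timeout callback and one for an outside caller, reach the coroutine as a single `CancelledError` with no indication of who asked.

```python
import asyncio


async def worker(seen):
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # Only the exception type arrives here; nothing says who called cancel().
        seen.append("CancelledError")
        raise


async def main():
    seen = []
    task = asyncio.ensure_future(worker(seen))
    await asyncio.sleep(0)   # let the worker reach its await
    task.cancel()            # stands in for the timeout callback
    task.cancel()            # stands in for an outside caller, same loop iteration
    await asyncio.wait([task])
    return seen, task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Any bookkeeping that is supposed to separate the two cases therefore has to live outside the exception itself, for example in a flag set next to the `cancel()` call.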

https://github.com/kyuupichan/aiorpcX/blob/e55950fed903b63e82b87c7b6ca93ab50e18ce08/aiorpcx/curio.py#L318-L323
https://github.com/kyuupichan/aiorpcX/blob/e55950fed903b63e82b87c7b6ca93ab50e18ce08/aiorpcx/curio.py#L373-L384

`TimeoutAfter.__aexit__` tries to distinguish where the cancellation is coming from (internal or external) based on the value of `timed_out_deadline`. The issue is that there can be a race where an internal and an external cancellation both arrive at around the same time, in which case `__aexit__` treats it as an internal cancellation (a timeout), due to line 378, and suppresses the `CancelledError`.

In particular, in my example in the OP, 0.001 sec is a divisor of 0.5 sec, so when the external cancellation comes from `TaskGroup.cancel_remaining`, `_set_new_deadline.timeout_task` sets `task._timed_out` at around the same time.
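The race can be forced deterministically with a simplified model. The sketch below is not aiorpcX's code: `NaiveTimeout`, `fire`, `victim` and `main` are hypothetical names, and the class only mimics the "flag set by the timer decides whether to suppress" logic described above. The timer is fired by hand and an external `cancel()` follows in the same loop iteration.

```python
import asyncio


class NaiveTimeout:
    """Hypothetical stand-in for a timeout context manager (not aiorpcX code)."""

    def __init__(self, delay):
        self._delay = delay
        self._timed_out = False

    async def __aenter__(self):
        self._task = asyncio.current_task()
        loop = asyncio.get_running_loop()
        self._handle = loop.call_later(self._delay, self.fire)
        return self

    def fire(self):
        # Internal cancellation: the only tool available is task.cancel().
        self._task.cancel()
        self._timed_out = True

    async def __aexit__(self, exc_type, exc_value, traceback):
        self._handle.cancel()
        # Treat any CancelledError as "ours" once the timer has fired.
        return exc_type is asyncio.CancelledError and self._timed_out


async def victim(holder):
    async with NaiveTimeout(60) as cm:
        holder.append(cm)
        await asyncio.sleep(60)
    return "kept running"


async def main():
    holder = []
    task = asyncio.ensure_future(victim(holder))
    await asyncio.sleep(0)      # let victim enter the block and start sleeping
    holder[0].fire()            # the timeout expires ...
    task.cancel()               # ... and an outside caller cancels, same iteration
    await asyncio.wait([task])
    return task.cancelled(), task.result()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In this model the external `cancel()` is swallowed: the task finishes normally instead of ending up cancelled, which is the behaviour that would keep a surrounding group waiting when the task loops.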


To further support the above explanation, observe that the following extremely hackish patch (patch1) would fix the issue (for the example in OP):

diff --git a/aiorpcx/curio.py b/aiorpcx/curio.py
index 296023e..40825a7 100755
--- a/aiorpcx/curio.py
+++ b/aiorpcx/curio.py
@@ -259,6 +259,7 @@ class TaskGroup:
         '''Cancel the passed set of tasks.  Wait for them to complete.'''
         for task in tasks:
             task.cancel()
+            task._really_cancel = True

         if tasks:
             def pop_task(task):
@@ -372,6 +373,8 @@ class TimeoutAfter:

     async def __aexit__(self, exc_type, exc_value, traceback):
         timed_out_deadline, uncaught = _unset_task_deadline(self._task)
+        if getattr(self._task, "_really_cancel", False):
+            return False
         if exc_type not in (CancelledError, TaskTimeout,
                             TimeoutCancellationError):
             return False

SomberNight commented 2 years ago

Here is a more generic, albeit even more hackish, patch (patch2):

diff --git a/aiorpcx/curio.py b/aiorpcx/curio.py
index 296023e..ac8b814 100755
--- a/aiorpcx/curio.py
+++ b/aiorpcx/curio.py
@@ -318,8 +318,14 @@ class UncaughtTimeoutError(Exception):
 def _set_new_deadline(task, deadline):
     def timeout_task():
         # Unfortunately task.cancel is all we can do with asyncio
-        task.cancel()
+        task._orig_cancel()
         task._timed_out = deadline
+    def mycancel():
+        task._orig_cancel()
+        task._really_cancel = True
+    if not hasattr(task, "_orig_cancel"):
+        task._orig_cancel = task.cancel
+        task.cancel = mycancel
     task._deadline_handle = task._loop.call_at(deadline, timeout_task)

@@ -372,6 +378,8 @@ class TimeoutAfter:

     async def __aexit__(self, exc_type, exc_value, traceback):
         timed_out_deadline, uncaught = _unset_task_deadline(self._task)
+        if getattr(self._task, "_really_cancel", False):
+            return False
         if exc_type not in (CancelledError, TaskTimeout,
                             TimeoutCancellationError):
             return False

SomberNight commented 2 years ago

The async-timeout library (developed by aiohttp and asyncio authors), which provides functionality identical to `timeout_after`, has the same issue: https://github.com/aio-libs/async-timeout/issues/229

example2

```python
import asyncio
from aiorpcx import TaskGroup
from async_timeout import timeout

async def raise_exc():
    await asyncio.sleep(0.5)
    raise Exception("asd")

async def f():
    event = asyncio.Event()
    while True:
        try:
            async with timeout(0.001):
                async with TaskGroup() as group:
                    await group.spawn(event.wait())
        except asyncio.TimeoutError:
            pass

async def main():
    async with TaskGroup() as group:
        await group.spawn(f)
        await group.spawn(raise_exc)
    print(f"taskgroup exited.")

asyncio.run(main())
```

As mentioned in that thread, even `asyncio.wait_for` might have the same issue (https://bugs.python.org/issue42130). Some workarounds are proposed there, but AFAICT they don't seem to cover all cases.

Also see https://bugs.python.org/issue45098 https://github.com/Traktormaster/wait-for2


I intend to monkey-patch aiorpcx in electrum as follows:

diff --git a/electrum/util.py b/electrum/util.py
index 1dc02ca6eb..f73447eca7 100644
--- a/electrum/util.py
+++ b/electrum/util.py
@@ -1246,6 +1246,37 @@ class OldTaskGroup(aiorpcx.TaskGroup):
             if self.completed:
                 self.completed.result()

+# We monkey-patch aiorpcx TimeoutAfter (used by timeout_after and ignore_after API),
+# to fix a timing issue present in asyncio as a whole re timing out tasks.
+# To see the issue we are trying to fix, consider example:
+#     async def outer_task():
+#         async with timeout_after(0.1):
+#             await inner_task()
+# When the 0.1 sec timeout expires, inner_task will get cancelled by timeout_after (=internal cancellation).
+# If around the same time (in terms of event loop iterations) another coroutine
+# cancels outer_task (=external cancellation), there will be a race.
+# Both cancellations work by propagating a CancelledError out to timeout_after, which then
+# needs to decide (in TimeoutAfter.__aexit__) whether it's due to an internal or external cancellation.
+# AFAICT asyncio provides no reliable way of distinguishing between the two.
+# This patch tries to always give priority to external cancellations.
+# see https://github.com/kyuupichan/aiorpcX/issues/44
+# see https://github.com/aio-libs/async-timeout/issues/229
+# see https://bugs.python.org/issue42130 and https://bugs.python.org/issue45098
+def _aiorpcx_monkeypatched_set_new_deadline(task, deadline):
+    def timeout_task():
+        task._orig_cancel()
+        task._timed_out = None if getattr(task, "_externally_cancelled", False) else deadline
+    def mycancel(*args, **kwargs):
+        task._orig_cancel(*args, **kwargs)
+        task._externally_cancelled = True
+        task._timed_out = None
+    if not hasattr(task, "_orig_cancel"):
+        task._orig_cancel = task.cancel
+        task.cancel = mycancel
+    task._deadline_handle = task._loop.call_at(deadline, timeout_task)
+
+aiorpcx.curio._set_new_deadline = _aiorpcx_monkeypatched_set_new_deadline
+

 class NetworkJobOnDefaultServer(Logger, ABC):
     """An abstract base class for a job that runs on the main network

I think aiorpcx itself could adopt a similar hack - I don't see a better way atm.
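The idea of giving priority to external cancellations can be sketched in isolation, without aiorpcX. Everything here is an assumption made for illustration: `install_cancel_marker`, `ExternalFirstTimeout`, `fire`, `victim` and `run_case` are hypothetical names, and the class is a toy model rather than a drop-in replacement for `TimeoutAfter`. As in the patch above, `task.cancel` is wrapped on the task instance so that calls from outside leave a marker, while the timer uses the original method.

```python
import asyncio


def install_cancel_marker(task):
    """Wrap task.cancel on this one instance so outside calls are recorded."""
    if hasattr(task, "_orig_cancel"):
        return
    task._orig_cancel = task.cancel

    def marking_cancel(*args, **kwargs):
        task._externally_cancelled = True
        return task._orig_cancel(*args, **kwargs)

    task.cancel = marking_cancel


class ExternalFirstTimeout:
    """Hypothetical toy timeout that never swallows an external cancel."""

    def __init__(self, delay):
        self._delay = delay
        self._timed_out = False

    async def __aenter__(self):
        self._task = asyncio.current_task()
        install_cancel_marker(self._task)
        loop = asyncio.get_running_loop()
        self._handle = loop.call_later(self._delay, self.fire)
        return self

    def fire(self):
        # Use the unwrapped method so the timer does not set the marker.
        self._task._orig_cancel()
        self._timed_out = True

    async def __aexit__(self, exc_type, exc_value, traceback):
        self._handle.cancel()
        if getattr(self._task, "_externally_cancelled", False):
            return False  # external cancellation wins: let it propagate
        return exc_type is asyncio.CancelledError and self._timed_out


async def victim(holder):
    async with ExternalFirstTimeout(60) as cm:
        holder.append(cm)
        await asyncio.sleep(60)
    return "resumed after timeout"


async def run_case(external):
    holder = []
    task = asyncio.ensure_future(victim(holder))
    await asyncio.sleep(0)      # let victim enter the block and start sleeping
    holder[0].fire()            # the timeout expires
    if external:
        task.cancel()           # outside caller cancels in the same iteration
    await asyncio.wait([task])
    return "cancelled" if task.cancelled() else task.result()


if __name__ == "__main__":
    print(asyncio.run(run_case(external=True)))
    print(asyncio.run(run_case(external=False)))
```

The trade-off is the one noted in this thread: the marker relies on replacing a method on the task object, and a pure timeout that happens together with an external cancel is reported as a cancellation rather than a timeout.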

SomberNight commented 2 years ago

If you want, I could open a PR with some variant of the above. I can only see dirty ways to fix this though, such as monkey-patching asyncio.Task.cancel.

SomberNight commented 2 years ago

I've now tested with `asyncio.timeout` on CPython 3.11.0rc1, and it behaves as expected: the issue does not reproduce.

example snippet with `asyncio.timeout`

```python
import asyncio
from aiorpcx import TaskGroup

async def raise_exc():
    await asyncio.sleep(0.5)
    raise Exception("asd")

async def f():
    event = asyncio.Event()
    while True:
        try:
            # note: requires python 3.11
            async with asyncio.timeout(0.001):
                await event.wait()
        except asyncio.TimeoutError:
            pass

async def main():
    async with TaskGroup() as group:
        await group.spawn(f)
        await group.spawn(raise_exc)
    print(f"taskgroup exited. {group.exception=}")

asyncio.run(main())
```
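A likely reason for the difference is that Python 3.11 added a per-task counter of cancellation requests (`Task.cancelling()` and `Task.uncancel()`), which lets a timeout notice requests other than its own. The sketch below is a toy model of that approach, not the stdlib implementation: `CountingTimeout`, `fire`, `victim` and `run_case` are hypothetical names, and the code only does something on Python 3.11 or newer.

```python
import asyncio

HAS_UNCANCEL = hasattr(asyncio.Task, "uncancel")   # True on Python 3.11+


class CountingTimeout:
    """Hypothetical toy timeout based on the cancel-request counter."""

    def __init__(self, delay):
        self._delay = delay
        self._expired = False

    async def __aenter__(self):
        self._task = asyncio.current_task()
        self._baseline = self._task.cancelling()   # requests pending on entry
        loop = asyncio.get_running_loop()
        self._handle = loop.call_later(self._delay, self.fire)
        return self

    def fire(self):
        self._task.cancel()
        self._expired = True

    async def __aexit__(self, exc_type, exc_value, traceback):
        self._handle.cancel()
        if self._expired and exc_type is asyncio.CancelledError:
            # Withdraw our own request; anything left above the baseline
            # was requested by someone else and must propagate.
            if self._task.uncancel() <= self._baseline:
                return True
        return False


async def victim(holder):
    async with CountingTimeout(60) as cm:
        holder.append(cm)
        await asyncio.sleep(60)
    return "resumed after timeout"


async def run_case(external):
    holder = []
    task = asyncio.ensure_future(victim(holder))
    await asyncio.sleep(0)      # let victim enter the block and start sleeping
    holder[0].fire()            # the timeout expires
    if external:
        task.cancel()           # outside caller cancels in the same iteration
    await asyncio.wait([task])
    return "cancelled" if task.cancelled() else task.result()


if __name__ == "__main__" and HAS_UNCANCEL:
    print(asyncio.run(run_case(external=True)))
    print(asyncio.run(run_case(external=False)))
```

With the counter, no monkey-patching of `task.cancel` is needed, but the approach is unavailable on Python versions before 3.11.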