python-trio / pytest-trio

Pytest plugin for trio
Other
54 stars 25 forks source link

pytest-trio exits with TrioInternalError on Python 3.7 when pytest-asyncio is installed #126

Closed seifertm closed 2 years ago

seifertm commented 2 years ago

I'm one of the maintainers of pytest-asyncio and I'm currently trying to address a reported incompatibilty between pytest-asyncio and pytest-trio v0.7.0.

The current release of pytest-asyncio (v0.18.2) contains a bug, which causes pytest to fail during the collection phase when the code base contains trio fixtures. I addressed the issue in a branch and came up with a corresponding test case:

from textwrap import dedent

def test_strict_mode_ignores_trio_fixtures(testdir):
    testdir.makepyfile(
        dedent(
            """\
        import pytest
        import pytest_asyncio
        import pytest_trio

        pytest_plugins = ["pytest_asyncio", "pytest_trio"]

        @pytest_trio.trio_fixture
        async def any_fixture():
            return True

        @pytest.mark.trio
        async def test_anything(any_fixture):
            pass
        """
        )
    )
    result = testdir.runpytest("--asyncio-mode=strict")
    result.assert_outcomes(passed=1)

The test case is successful on CPython 3.8 – 3.10, but it fails with Python 3.7. The reported error is a TrioInternalError. It's worth noting that the test also issues a RuntimeWarning and suggests to set host_uses_signal_set_wakeup_fd=True. However, I didn't find a way to configure this variable in pytest-trio.

This is the test output I'm seeing:

========== FAILURES ==========
________________________________________________________________________________________________ test_strict_mode_ignores_trio_fixtures ________________________________________________________________________________________________
/home/michael/pytest-asyncio/tests/trio/test_fixtures.py:25: in test_strict_mode_ignores_trio_fixtures
    result.assert_outcomes(passed=1)
E   AssertionError: assert {'errors': 0,...pped': 0, ...} == {'errors': 0,...pped': 0, ...}
E     Omitting 4 identical items, use -vv to show
E     Differing items:
E     {'failed': 1} != {'failed': 0}
E     {'passed': 0} != {'passed': 1}
E     Use -v to get the full diff
------------ Captured stdout call ------------------
============================= test session starts ==============================
platform linux -- Python 3.7.12, pytest-6.2.5, py-1.11.0, pluggy-1.0.0
rootdir: /tmp/pytest-of-michael/pytest-47/test_strict_mode_ignores_trio_fixtures0
plugins: asyncio-0.18.3.dev0+g929608e.d20220307, hypothesis-6.39.2, flaky-3.7.0, trio-0.7.0
asyncio: mode=strict
collected 1 item

test_strict_mode_ignores_trio_fixtures.py F                              [100%]

=================================== FAILURES ===================================
________________________________ test_anything _________________________________

runner = Runner(clock=SystemClock(offset=100594.58619596818), instruments={'_all': {}}, io_manager=EpollIOManager(_epoll=<selec..._autojump_threshold=inf, is_guest=False, guest_tick_scheduled=False, ki_pending=False, waiting_for_idle=SortedDict({}))
async_fn = functools.partial(<function _trio_test_runner_factory.<locals>._bootstrap_fixtures_and_run_test at 0x7fd38a2a7290>, any_fixture=<pytest_trio.plugin.TrioFixture object at 0x7fd389b72090>)
args = (), host_uses_signal_set_wakeup_fd = False

    def unrolled_run(runner, async_fn, args, host_uses_signal_set_wakeup_fd=False):
        locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True
        __tracebackhide__ = True

        try:
            if not host_uses_signal_set_wakeup_fd:
>               runner.entry_queue.wakeup.wakeup_on_signals()

/home/michael/pytest-asyncio/.tox/py37/lib/python3.7/site-packages/trio/_core/_run.py:2048: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <trio._core._wakeup_socketpair.WakeupSocketpair object at 0x7fd389b72550>

    def wakeup_on_signals(self):
        assert self.old_wakeup_fd is None
        if not is_main_thread():
            return
        fd = self.write_sock.fileno()
        self.old_wakeup_fd = signal.set_wakeup_fd(fd, warn_on_full_buffer=False)
        if self.old_wakeup_fd != -1:
            warnings.warn(
                RuntimeWarning(
>                   "It looks like Trio's signal handling code might have "
                    "collided with another library you're using. If you're "
                    "running Trio in guest mode, then this might mean you "
                    "should set host_uses_signal_set_wakeup_fd=True. "
                    "Otherwise, file a bug on Trio and we'll help you figure "
                    "out what's going on."
                )
            )
E           RuntimeWarning: It looks like Trio's signal handling code might have collided with another library you're using. If you're running Trio in guest mode, then this might mean you should set host_uses_signal_set_wakeup_fd=True. Otherwise, file a bug on Trio and we'll help you figure out what's going on.

/home/michael/pytest-asyncio/.tox/py37/lib/python3.7/site-packages/trio/_core/_wakeup_socketpair.py:58: RuntimeWarning

The above exception was the direct cause of the following exception:

runner = Runner(clock=SystemClock(offset=100594.58619596818), instruments={'_all': {}}, io_manager=EpollIOManager(_epoll=<selec..._autojump_threshold=inf, is_guest=False, guest_tick_scheduled=False, ki_pending=False, waiting_for_idle=SortedDict({}))
async_fn = functools.partial(<function _trio_test_runner_factory.<locals>._bootstrap_fixtures_and_run_test at 0x7fd38a2a7290>, any_fixture=<pytest_trio.plugin.TrioFixture object at 0x7fd389b72090>)
args = (), host_uses_signal_set_wakeup_fd = False

    def unrolled_run(runner, async_fn, args, host_uses_signal_set_wakeup_fd=False):
        locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True
        __tracebackhide__ = True

        try:
            if not host_uses_signal_set_wakeup_fd:
                runner.entry_queue.wakeup.wakeup_on_signals()

            if "before_run" in runner.instruments:
                runner.instruments.call("before_run")
            runner.clock.start_clock()
            runner.init_task = runner.spawn_impl(
                runner.init, (async_fn, args), None, "<init>", system_task=True
            )

            # You know how people talk about "event loops"? This 'while' loop right
            # here is our event loop:
            while runner.tasks:
                if runner.runq:
                    timeout = 0
                else:
                    deadline = runner.deadlines.next_deadline()
                    timeout = runner.clock.deadline_to_sleep_time(deadline)
                timeout = min(max(0, timeout), _MAX_TIMEOUT)

                idle_primed = None
                if runner.waiting_for_idle:
                    cushion, _ = runner.waiting_for_idle.keys()[0]
                    if cushion < timeout:
                        timeout = cushion
                        idle_primed = IdlePrimedTypes.WAITING_FOR_IDLE
                # We use 'elif' here because if there are tasks in
                # wait_all_tasks_blocked, then those tasks will wake up without
                # jumping the clock, so we don't need to autojump.
                elif runner.clock_autojump_threshold < timeout:
                    timeout = runner.clock_autojump_threshold
                    idle_primed = IdlePrimedTypes.AUTOJUMP_CLOCK

                if "before_io_wait" in runner.instruments:
                    runner.instruments.call("before_io_wait", timeout)

                # Driver will call io_manager.get_events(timeout) and pass it back
                # in through the yield
                events = yield timeout
                runner.io_manager.process_events(events)

                if "after_io_wait" in runner.instruments:
                    runner.instruments.call("after_io_wait", timeout)

                # Process cancellations due to deadline expiry
                now = runner.clock.current_time()
                if runner.deadlines.expire(now):
                    idle_primed = None

                # idle_primed != None means: if the IO wait hit the timeout, and
                # still nothing is happening, then we should start waking up
                # wait_all_tasks_blocked tasks or autojump the clock. But there
                # are some subtleties in defining "nothing is happening".
                #
                # 'not runner.runq' means that no tasks are currently runnable.
                # 'not events' means that the last IO wait call hit its full
                # timeout. These are very similar, and if idle_primed != None and
                # we're running in regular mode then they always go together. But,
                # in *guest* mode, they can happen independently, even when
                # idle_primed=True:
                #
                # - runner.runq=empty and events=True: the host loop adjusted a
                #   deadline and that forced an IO wakeup before the timeout expired,
                #   even though no actual tasks were scheduled.
                #
                # - runner.runq=nonempty and events=False: the IO wait hit its
                #   timeout, but then some code in the host thread rescheduled a task
                #   before we got here.
                #
                # So we need to check both.
                if idle_primed is not None and not runner.runq and not events:
                    if idle_primed is IdlePrimedTypes.WAITING_FOR_IDLE:
                        while runner.waiting_for_idle:
                            key, task = runner.waiting_for_idle.peekitem(0)
                            if key[0] == cushion:
                                del runner.waiting_for_idle[key]
                                runner.reschedule(task)
                            else:
                                break
                    else:
                        assert idle_primed is IdlePrimedTypes.AUTOJUMP_CLOCK
                        runner.clock._autojump()

                # Process all runnable tasks, but only the ones that are already
                # runnable now. Anything that becomes runnable during this cycle
                # needs to wait until the next pass. This avoids various
                # starvation issues by ensuring that there's never an unbounded
                # delay between successive checks for I/O.
                #
                # Also, we randomize the order of each batch to avoid assumptions
                # about scheduling order sneaking in. In the long run, I suspect
                # we'll either (a) use strict FIFO ordering and document that for
                # predictability/determinism, or (b) implement a more
                # sophisticated scheduler (e.g. some variant of fair queueing),
                # for better behavior under load. For now, this is the worst of
                # both worlds - but it keeps our options open. (If we do decide to
                # go all in on deterministic scheduling, then there are other
                # things that will probably need to change too, like the deadlines
                # tie-breaker and the non-deterministic ordering of
                # task._notify_queues.)
                batch = list(runner.runq)
                runner.runq.clear()
                if _ALLOW_DETERMINISTIC_SCHEDULING:
                    # We're running under Hypothesis, and pytest-trio has patched
                    # this in to make the scheduler deterministic and avoid flaky
                    # tests. It's not worth the (small) performance cost in normal
                    # operation, since we'll shuffle the list and _r is only
                    # seeded for tests.
                    batch.sort(key=lambda t: t._counter)
                    _r.shuffle(batch)
                else:
                    # 50% chance of reversing the batch, this way each task
                    # can appear before/after any other task.
                    if _r.random() < 0.5:
                        batch.reverse()
                while batch:
                    task = batch.pop()
                    GLOBAL_RUN_CONTEXT.task = task

                    if "before_task_step" in runner.instruments:
                        runner.instruments.call("before_task_step", task)

                    next_send_fn = task._next_send_fn
                    next_send = task._next_send
                    task._next_send_fn = task._next_send = None
                    final_outcome = None
                    try:
                        # We used to unwrap the Outcome object here and send/throw
                        # its contents in directly, but it turns out that .throw()
                        # is buggy, at least before CPython 3.9:
                        #   https://bugs.python.org/issue29587
                        #   https://bugs.python.org/issue29590
                        # So now we send in the Outcome object and unwrap it on the
                        # other side.
                        msg = task.context.run(next_send_fn, next_send)
                    except StopIteration as stop_iteration:
                        final_outcome = Value(stop_iteration.value)
                    except BaseException as task_exc:
                        # Store for later, removing uninteresting top frames: 1
                        # frame we always remove, because it's this function
                        # catching it, and then in addition we remove however many
                        # more Context.run adds.
                        tb = task_exc.__traceback__.tb_next
                        for _ in range(CONTEXT_RUN_TB_FRAMES):
                            tb = tb.tb_next
                        final_outcome = Error(task_exc.with_traceback(tb))
                        # Remove local refs so that e.g. cancelled coroutine locals
                        # are not kept alive by this frame until another exception
                        # comes along.
                        del tb

                    if final_outcome is not None:
                        # We can't call this directly inside the except: blocks
                        # above, because then the exceptions end up attaching
                        # themselves to other exceptions as __context__ in
                        # unwanted ways.
                        runner.task_exited(task, final_outcome)
                        # final_outcome may contain a traceback ref. It's not as
                        # crucial compared to the above, but this will allow more
                        # prompt release of resources in coroutine locals.
                        final_outcome = None
                    else:
                        task._schedule_points += 1
                        if msg is CancelShieldedCheckpoint:
                            runner.reschedule(task)
                        elif type(msg) is WaitTaskRescheduled:
                            task._cancel_points += 1
                            task._abort_func = msg.abort_func
                            # KI is "outside" all cancel scopes, so check for it
                            # before checking for regular cancellation:
                            if runner.ki_pending and task is runner.main_task:
                                task._attempt_delivery_of_pending_ki()
                            task._attempt_delivery_of_any_pending_cancel()
                        elif type(msg) is PermanentlyDetachCoroutineObject:
                            # Pretend the task just exited with the given outcome
                            runner.task_exited(task, msg.final_outcome)
                        else:
                            exc = TypeError(
                                "trio.run received unrecognized yield message {!r}. "
                                "Are you trying to use a library written for some "
                                "other framework like asyncio? That won't work "
                                "without some kind of compatibility shim.".format(msg)
                            )
                            # The foreign library probably doesn't adhere to our
                            # protocol of unwrapping whatever outcome gets sent in.
                            # Instead, we'll arrange to throw `exc` in directly,
                            # which works for at least asyncio and curio.
                            runner.reschedule(task, exc)
                            task._next_send_fn = task.coro.throw
                        # prevent long-lived reference
                        # TODO: develop test for this deletion
                        del msg

                    if "after_task_step" in runner.instruments:
                        runner.instruments.call("after_task_step", task)
                    del GLOBAL_RUN_CONTEXT.task
                    # prevent long-lived references
                    # TODO: develop test for these deletions
                    del task, next_send, next_send_fn

        except GeneratorExit:
            # The run-loop generator has been garbage collected without finishing
            warnings.warn(
                RuntimeWarning(
                    "Trio guest run got abandoned without properly finishing... "
                    "weird stuff might happen"
                )
            )
        except TrioInternalError:
            raise
        except BaseException as exc:
>           raise TrioInternalError("internal error in Trio - please file a bug!") from exc
E           trio.TrioInternalError: internal error in Trio - please file a bug!

/home/michael/pytest-asyncio/.tox/py37/lib/python3.7/site-packages/trio/_core/_run.py:2258: TrioInternalError

You will need to use a Git version of pytest-asyncio to reproduce the issue (see this branch). Run tox -e py37 in that branch to reproduce the issue.

I'm currently trying to figure out whether I'm using pytest-trio incorrectly or whether pytest-trio or pytest-asyncio have to make adjustments to work together nicely. I'm also puzzled why the test only fails on Python 3.7. Do you have any ideas?

pquentin commented 2 years ago

cc @smurfix @oremanj @njsmith

njsmith commented 2 years ago

The error message means that someone else used signal.set_wakeup_fd to register a signal-delivery fd before starting trio. This is weird, because there should only be a signal-delivery fd set while an async loop is running. So... assuming asyncio is the one who registered this library, my guess is either:

If it is an asyncio bug, I'm not sure what to do. Python 3.7 asyncio isn't going to get fixed. Maybe there's some workaround you can do, like forcibly calling signal.set_wakeup_fd(-1) to clear out the stale state? Or just tell people they should upgrade to 3.8+?

seifertm commented 2 years ago

Thanks for the clarification!

I noticed that I always ran that test as part of our test suite. I then tried running it stand-alone and it completes successfully. I assume the issue is caused by our test setup and we have to figure that out on pytest-asyncio side.

It's unclear to me why the error only appears with Python 3.7, though.

Thanks!

njsmith commented 2 years ago

It's unclear to me why the error only appears with Python 3.7, though.

My (unconfirmed) guess is that it's because in 3.7, asyncio doesn't cleanup the set_signal_fd when the loop exits, and then they fixed that in 3.8.

pquentin commented 2 years ago

Thanks @njsmith! I think the fix was https://github.com/python/cpython/pull/11135

seifertm commented 2 years ago

Thanks for your support!

From what I can tell, the problem was related to the use of asyncio.subprocess.create_subprocess_exec in our test suite. The underlying implementation changed quite a bit from CPython 3.7 to 3.8, which is probably why recent Python versions behave differently.

Btw: We just released pytest-asyncio-0.18.3 which fixes compatibility with pytest-trio :)