Open colesbury opened 2 weeks ago
This issue brings similar concerns as a previous discussion that arose a little while ago in scikit-learn https://github.com/scikit-learn/scikit-learn/issues/30007#issuecomment-2400219854
Overall, there is an unknown behaviour regarding global and thread-local fixtures represented via @pytest.fixture(scope='function')
. While the main purpose of fixtures is to recycle state across test functions, it completely makes sense to use it to share state across threads, but there's also the question about thread-local fixtures. A potential solution would be to let such fixtures return factory functions that get called on the test function, this would ensure that each thread gets its own instance of the thread-local state.
import pytest
import asyncio
@pytest.fixture
def thread_local_fixture():
def local_loop():
loop = asyncio.new_event_loop()
return loop
return local_loop
def test_loop(thread_local_fixture):
loop = thread_local_fixture()
...
Also, as @ngoldbaum mentioned over https://github.com/scikit-learn/scikit-learn/issues/30007#issuecomment-2400253116, pytest is not thread-safe, and there are particular motivations to keep it that way.
I don't understand your comment. Function scope fixtures are not reused by pytest, except when running with pytest-run-parallel.
For example, if I write a test like:
import pytest
COUNTER = 0
@pytest.fixture
def my_fixture():
global COUNTER
COUNTER += 1
return COUNTER
@pytest.mark.parametrize('execution_number', range(5))
def test_my_fixture(my_fixture, execution_number):
print(f"test_my_fixture called with {my_fixture} {execution_number}")
Each invocation of test_my_fixture
gets a different my_fixture
.
Similarly, if I use pytest-repeat
, and run a test multiple times, then each invocation gets another fixture.
If a test case wanted a fixture to be reused, it would use some other scope like "module" or "session".
This is because we implement pytest_itemcollected
, which means that there's only one invocation of the test function, which we change to spawn n threads and do n invocations in each thread.
We would need to instead implement either pytest_generate_tests
to generate parametrized tests or pytest_collection_modifyitems
to add new items. However, I'm not yet clear on how we would exactly do it since we need to change the collected function once for each thread and then once more for each iteration in each thread.
I'll try to figure it out.
We would need to instead implement either
pytest_generate_tests
to generate parametrized tests orpytest_collection_modifyitems
to add new items.
Parametrized tests probably aren't very useful, right? I thought a big part of the point of the current design was to use threading.Barrier
within a single function, ensuring that all threads exercise the same code at (to the extent possible) the same moment. Separate test cases don't seem likely to execute in a synchronized fashion like that.
Between the added docs at https://py-free-threading.github.io/porting/#fixing-thread-unsafe-tests and the discussion spread over a couple of issues like https://github.com/scikit-learn/scikit-learn/issues/30007, it feels like the actual design here is fairly tricky and there's no clear way of making everything magically robust to thread-safety issues in the tests themselves and also exercising thread-safety issues in the functionality under test well.
It'd be great to discuss and document the current design with its pros and cons in enough detail to be able to understand these subtleties. I certainly hadn't grasped the issues with fixtures and pytest.raises
& co from reading the README or implementation. And perhaps there's a lower-fidelity mode possible that will only catch a subset of issues that the current implementation can catch, but is more robust with an unmodified test suite?
TL;DR I've been looking into this in the last few days and it seems like it's doable to have a more robust version of the plugin. It's hard though, and it'll require a decision on our side, whether we want to take that additional maintenance burden that'll come with a more robust solution. IMHO we should, since I expect this plugin to become increasingly popular as more projects want to test multi-threaded code.
Longer version:
Parametrized tests probably aren't very useful, right? I thought a big part of the point of the current design was to use threading.Barrier within a single function, ensuring that all threads exercise the same code at (to the extent possible) the same moment. Separate test cases don't seem likely to execute in a synchronized fashion like that.
What I meant by using the term "parametrization" was that a single test item is broken down into more and the test report contains n_workers * n_iterations
items rather than just one and doing everything under the hood. I didn't necessarily mean a call to @pytest.mark.parametrize
. We can still use this approach of breaking down into more than one items for a single test while also ensuring that everything runs at almost the same moment (more on that in my proposed solution below).
making everything magically robust to thread-safety issues in the tests themselves and also exercising thread-safety issues in the functionality under test well.
There's certainly challenges and not everything will be thread-safe. Some of the thread-safety issues that'll arise will have to be handled in user testing code. However, this specific issue (and other similar issues related to fixtures, skips, etc.) have more to do with the current design of the plugin, and the fact that pytest thinks it's only one test that is running while the reality is that the test function is invoked multiple times.
It'd be great to discuss and document the current design with its pros and cons in enough detail to be able to understand these subtleties.
Agreed.
And perhaps there's a lower-fidelity mode possible that will only catch a subset of issues that the current implementation can catch, but is more robust with an unmodified test suite?
Not sure if that'll be needed in case we do a rebust enough redesign so that some of the most common issues that we have now go away.
A possible solution for how to implement this, as I said in my previous comment, would be to generate n_threads * n_iterations
items instead of one. That would ensure, for example, that a fixture gets called for each invocation of the test function in all threads. Fixtures will be calculated in the main thread as well, so this does not create any thread-safety issues. Here's some code on how this could look:
def thread_func(func, barrier, *args, **kwargs):
barrier.wait()
func(*args, **kwargs)
def wrap_func(thread):
def func(*args, **kwargs):
thread._args = (*thread._args, *args)
thread._kwargs = {**thread._kwargs, **kwargs}
thread.start()
return func
@pytest.hookimpl
def pytest_collection_modifyitems(session, config, items: list[pytest.Function]):
newitems = []
for item in items:
if n_workers == 1: # Getting this is left out for consiceness
<do nothing>
barrier = threading.Barrier(n_workers)
for i in range(n_workers):
thread = threading.Thread(target=thread_func, args=(item.obj, barrier))
# Change the test function from under pytest's nose to call thread.start, which will run the test function
# in a new thread
newitem = pytest.Function.from_parent(item, name=<name for subitem>, callobj=wrap_func(thread), ...)
newitem.stash["thread"] = thread
newitems.append(newitem)
items[:] = newitems
We also need to reimplement the test loop to make sure all of the threads have terminated before doing anything else after the tests have run:
@pytest.hookimpl
def pytest_runtestloop(session) -> bool:
try:
default_pytest_runtestloop(session) # Whatever this does
finally:
for item in session.items:
if item.stash["thread"] is not None:
item.stash["thread"].join()
return True
This code works for more than one threads. We can probably extend it to do more than one iterations in each subitem as well. The biggest challenge is related to how to communicate exceptions/skip etc. to the main thread (it currently relies on nonlocal
state inside the wrapper), but it seems that that is doable as well, probably by using some kind of communication queue between subthreads and the main thread.
Thoughts?
how to communicate exceptions/skip etc. to the main thread
This can be solved if the last thread id waits for an event to complete and handles everything related to every other thread completion, this would imply an exit barrier as well. This way, it is ensured that all the previous "test cases" start running without blocking and end simultaneously, however, the issue regarding skips, exceptions would be flagged from the first thread to report.
def thread_func(func, barrier, is_last, evt, *args, **kwargs):
barrier.wait()
try:
func(*args, **kwargs)
except:
# Exception handling goes here, maybe store values in a shared registry
# that enters as a parameter
...
barrier.wait()
if is_last:
evt.set()
def wrap_func(thread, is_last, evt):
def func(*args, **kwargs):
thread._args = (*thread._args, *args)
thread._kwargs = {**thread._kwargs, **kwargs}
thread.start()
if is_last:
evt.wait()
# Handle information stored in registry
return func
@pytest.hookimpl
def pytest_collection_modifyitems(session, config, items: list[pytest.Function]):
newitems = []
for item in items:
if n_workers == 1: # Getting this is left out for consiceness
<do nothing>
barrier = threading.Barrier(n_workers)
evt = threading.Event()
for i in range(n_workers):
is_last = i == n_workers - 1
thread = threading.Thread(target=thread_func, args=(item.obj, barrier, is_last, evt))
# Change the test function from under pytest's nose to call thread.start, which will run the test function
# in a new thread
newitem = pytest.Function.from_parent(item, name=<name for subitem>, callobj=wrap_func(thread, is_last, evt), ...)
newitem.stash["thread"] = thread
newitems.append(newitem)
items[:] = newitems
If I understand correctly, the default pytest behavior for fixtures is that a new fixture is created for each test case.
https://docs.pytest.org/en/6.2.x/reference.html#pytest-fixture-api
When running a test case multiple times in parallel, I don't think we should reuse fixtures across threads (at least not function scope fixtures).
For example, see
test_async.py
from the pybind11 test suite:https://github.com/pybind/pybind11/blob/f46f5be4fa4d24c4e5382d0251315f361ce97424/tests/test_async.py#L14-L18
The tests will fail when run in parallel because the event loop is shared across threads.