Open 8db34305-10a6-4126-913e-5e0749ad6209 opened 2 years ago
Currently the only way to use a class other than threading.Thread
with concurrent.futures.ThreadPoolExecutor
is to extend ThreadPoolExecutor
and override the private method _adjust_thread_count()
.
For example, suppose I have a class that applies some custom logic when running code in a new thread:
class MyThread(threading.Thread):
def run(self):
with some_important_context():
super().run()
Using this class with ThreadPoolExecutor
requires me to write the following code:
class MyThreadPoolExecutor(ThreadPoolExecutor):
def _adjust_thread_count(self):
# if idle threads are available, don't spin new threads
if self._idle_semaphore.acquire(timeout=0):
return
# When the executor gets lost, the weakref callback will wake up
# the worker threads.
def weakref_cb(_, q=self._work_queue):
q.put(None)
num_threads = len(self._threads)
if num_threads < self._max_workers:
thread_name = '%s_%d' % (self._thread_name_prefix or self,
num_threads)
t = MyThread(name=thread_name, target=_worker,
args=(weakref.ref(self, weakref_cb),
self._work_queue,
self._initializer,
self._initargs))
t.start()
self._threads.add(t)
_threads_queues[t] = self._work_queue
with MyThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(pow, 323, 1235)
print(future.result())
That's a bummer, because now I have to maintain this method if there are upstream fixes/changes. I also can't count on it existing in the future, since it's a private method.
In other words, ThreadPoolExecutor
is not composable, and extending it to use a custom Thread
class is neither safe nor maintainable.
Here's what I'd like to be able to do instead:
with ThreadPoolExecutor(max_workers=1, thread_class=MyThread) as executor:
future = executor.submit(pow, 323, 1235)
print(future.result())
Can you apply some custom logic by specifying the initializer?
Can you apply some custom logic by specifying the initializer?
Not sure that I follow; how would I use a context manager with the initializer?
Of course this is just one example. In the threading
docs, the second sentence in the description of the Thread.run
method (https://docs.python.org/3/library/threading.html#threading.Thread.run) is:
You may override this method in a subclass.
But one cannot use a subclass of Thread
with ThreadPoolExecutor
without the gymnastics indicated earlier. That's the real issue here, I think.
You can call __enter in the initializer. Seems there is no easy way to call __exit at shutting down the thread, perhaps something like per-thread atexit could help. As an example, you can save the context manager in thread locals and manually repeatedly submit a function that calls __exit__ and blocks the thread on some Barrier before calling ThreadPoolExecutor.shutdown(). It is complicated, so we may add some helpers to support context managers.
What are examples of your some_important_context()? Is it important to call some code before destroying the thread?
The problem with allowing the user to specify the Thread subclass is that in general using Thread is an implementation detail. ThreadPoolExecutor could be implemented using the low-level _thread module instead. Or in future it can need to create a special Thread subclass, and user-specified Thread subclass can be not compatible with it. It is safer to limit ways in which the user can affect execution. The initializer parameter was added to address cases similar to your.
Note also that ThreadPoolExecutor and ProcessPoolExecutor have almost identical interface. If we add some feature in ThreadPoolExecutor we will have a pressure to add the same feature in ProcessPoolExecutor to solve similar problems.
What are examples of your some_important_context()? Is it important to call some code before destroying the thread?
To be honest, pulled a context manager example out of thin air to illustrate the point. But sure, I want to allocate a resource before the thread runs, and release it when the work is done.
The problem with allowing the user to specify the Thread subclass is that in general using Thread is an implementation detail. ThreadPoolExecutor could be implemented using the low-level _thread module instead. Or in future it can need to create a special Thread subclass, and user-specified Thread subclass can be not compatible with it.
This is surprising to hear, since I imagine that there are many potential users of this library that are evolving from direct wrangling of Thread objects, where custom Thread subclasses are commonplace. This was certainly the scenario that prompted me to post.
Ultimately it's up to the maintainers what direction the library will go. Are there specific plans to adopt an alternative implementation that is orthogonal to threading.Thread
? Or are there reasons to think that it is likely? I would submit that maintaining smooth interoperability between this library and the threading
library would be a welcome constraint, absent specific drivers to the contrary.
It also occurs to me that even if concurrent.futures
adopted an alternative Thread class, it would still be preferable to allow for composition in the manner that was originally proposed.
I was looking for some way to be able to add a thread finalizer, a piece of code to be called when the thread pool shuts down and all threads need cleaning up. Glad I came across this issue, since the example of using a Thread subclass with a custom run (wrapped in a context manager) would fill my needs completely.
What are examples of your some_important_context()? Is it important to call some code before destroying the thread?
I currently have a use case with a web-framework that has persistent DB connections - i.e. they span multiple HTTP requests. This also applies in the context of running a command-line script with said framework where database connections are opened.
We are calling external APIs (IO heavy), so using a ThreadPoolExecutor makes sense. However, since those "DB connection pools" are thread locals, we need to ensure that database connections are closed again to not leak resources.
The current workaround is to submit a job/function to the pool and have it close the database connections, which adds overhead since database connections are now opened and closed within the same thread that could have been re-used.
Using a context manager, we would be able to wrap the super().run(...)
and close the database connections when the thread exits (succesfully or because of an Exception, even). This comes very close to having an atexit
for individual threads.
Furthermore I like the idea of being able to provide the class as a context manager kwarg, but would also not be opposed to a class property specifying the Thread class to use - both would greatly enhance composability and are a cleaner solution than adding a finalizer option (similar to the initializer
kwarg)
I guess if you are asking for initialization and finalization of thread-specific data in a thread pool -- you need exactly these things (or a context manager). A custom thread class reveals too many implementation details. I personally prefer an explicit initializer/finalizer based approach.
As in most cases, there are many different ways to achieve the same result in Python. I'm not sure that I follow the assertion that using a custom Thread
class exposes too many implementation details. I do think that there is value in "minimizing surprise" for users of the standard library. In this case it seems likely that, based on their experience with the threading
library, users will expect to be able to apply a similar approach here.
I think overriding the "target" function would be better. Providing some "thread" or "process" class is too fragile. If the executors require custom bits on those objects you need to be careful to implement and not squash them. Overriding the "target" function would also satisfy OP requirement without the need for a class / additional object. And if someone needs an object they can provide one that implements __call__
and not have to worry about conflicts.
The API would be something like this.
ThreadPoolExecutor
and ProcessPoolExecutor
both accept a new argument worker_fn
. This function is used to wrap the underlining worker. It has the following definition:
def worker_fn(worker, initargs):
"""
Wraps the worker function provided by the executor.
The worker argument is an opaque callable provided by the executor. It will perform the
actual work of the Thread / Process.
Arguments:
worker (callable): opaque ; must be called
initargs (mixed): whatever was provided as initargs to the executor
"""
# The following is just an example user implementation of "worker_fn"
with some_context():
db = connect.open()
worker()
db.close()
As for the library side. A naive implementation would be like (a bunch of stuff is left out).
class ProcessPoolExecutor():
def _spawn_process(self):
worker = partial(
_process_worker,
self._result_queue,
self._initializer,
self._initargs,
self._max_tasks_per_child
)
if self._worker_fn:
p = self._mp_context.Process(
target=self._worker_fn,
args=(worker, self._initargs),
)
else:
p = self._mp_context.Process(target=worker)
p.start()
self._processes[p.pid] = p
Baring this, I'd be happy with a finalizer. My use case is SharedMemory. According to the docs you're supposed to ".close()" the SharedMemory when done. That's a little hard to do when you can't guarantee some final function. You either override something in the standard library or .submit
a bunch of times with the finalizer function and hope it hits all processes.
Is there any update on this issue? I have also encountered this issue when multithreading db connections. Each thread in the pool maintains a connection opened in the initializer, however once the thread pool is closed the connections remain open while the main thread performs a wait and repeat loop, opening more connections with each loop until the db refuses to accept more connections, crashing the program.
I have temporarily bypassed the issue by subclassing the threadpoolexecutor as described by @erickpeirson, Thank you for your insight.
Note: these values reflect the state of the issue at the time it was migrated and might not reflect the current state.
Show more details
GitHub fields: ```python assignee = None closed_at = None created_at =
labels = ['type-feature', 'library', '3.11']
title = 'concurrent.future.ThreadPoolExecutor should parameterize class used for threads'
updated_at =
user = 'https://github.com/erickpeirson'
```
bugs.python.org fields:
```python
activity =
actor = 'asvetlov'
assignee = 'none'
closed = False
closed_date = None
closer = None
components = ['Library (Lib)']
creation =
creator = 'erickpeirson'
dependencies = []
files = []
hgrepos = []
issue_num = 45339
keywords = []
message_count = 8.0
messages = ['403007', '403054', '403142', '403145', '403186', '403187', '408562', '412238']
nosy_count = 7.0
nosy_names = ['bquinlan', 'pitrou', 'asvetlov', 'serhiy.storchaka', 'yselivanov', 'Sergei Maertens', 'erickpeirson']
pr_nums = ['28640']
priority = 'normal'
resolution = None
stage = None
status = 'open'
superseder = None
type = 'enhancement'
url = 'https://bugs.python.org/issue45339'
versions = ['Python 3.11']
```