Open 0f1b1614-ca01-498d-b542-2f1dc86a128b opened 11 years ago
There is a race condition in multiprocessing.Pool._terminate_pool that can result in workers being restarted during shutdown (process shutdown or pool.terminate()).
worker_handler._state = TERMINATE # <~~~~ race from here
task_handler._state = TERMINATE
debug('helping task handler/workers to finish')
cls._help_stuff_finish(inqueue, task_handler, len(pool))
assert result_handler.is_alive() or len(cache) == 0
result_handler._state = TERMINATE
outqueue.put(None) # sentinel
# We must wait for the worker handler to exit before terminating
# workers because we don't want workers to be restarted behind our back.
debug('joining worker handler')
worker_handler.join() # <~~~~~ race to here
At any point between setting worker_handler._state = TERMINATE and joining the worker handler, if the intervening code causes a worker to exit then it is possible for the worker handler to fail to notice that it has been shutdown and so attempt to restart the worker:
@staticmethod
def _handle_workers(pool):
thread = threading.current_thread()
# Keep maintaining workers until the cache gets drained, unless the pool
# is terminated.
while thread._state == RUN or (pool._cache and thread._state != TERMINATE):
# <~~~~~~ race here
pool._maintain_pool()
time.sleep(0.1)
# send sentinel to stop workers
pool._taskqueue.put(None)
util.debug('worker handler exiting')
We noticed this initially because in the absence of the fix to bpo-14881 a ThreadPool trying to restart a worker fails and hangs the process. In the presence of the fix to bpo-14881 there is no immediate issue, but trying to restart a worker process/thread on pool shutdown is clearly unwanted and could result in bad things happening e.g. at process shutdown.
To trigger the race with ThreadPool, it is enough just to pause the _handle_workers thread after checking its state and before calling _maintain_pool:
import multiprocessing.pool
import time
class ThreadPool(multiprocessing.pool.ThreadPool):
def _maintain_pool(self):
time.sleep(1)
super(ThreadPool, self)._maintain_pool()
def _repopulate_pool(self):
assert self._state == multiprocessing.pool.RUN
super(ThreadPool, self)._repopulate_pool()
pool = ThreadPool(4)
pool.map(lambda x: x, range(5))
pool.terminate()
pool.join()
Exception in thread Thread-5:
Traceback (most recent call last):
File ".../cpython/Lib/threading.py", line 657, in _bootstrap_inner
self.run()
File ".../cpython/Lib/threading.py", line 605, in run
self._target(*self._args, **self._kwargs)
File ".../cpython/Lib/multiprocessing/pool.py", line 358, in _handle_workers
pool._maintain_pool()
File ".../bug.py", line 6, in _maintain_pool
super(ThreadPool, self)._maintain_pool()
File ".../cpython/Lib/multiprocessing/pool.py", line 232, in _maintain_pool
self._repopulate_pool()
File ".../bug.py", line 8, in _repopulate_pool
assert self._state == multiprocessing.pool.RUN
AssertionError
In this case, the race occurs when ThreadPool._help_stuff_finish puts sentinels on inqueue to make the workers finish.
It is also possible to trigger the bug with multiprocessing.pool.Pool:
import multiprocessing.pool
import time
class Pool(multiprocessing.pool.Pool):
def _maintain_pool(self):
time.sleep(2)
super(Pool, self)._maintain_pool()
def _repopulate_pool(self):
assert self._state == multiprocessing.pool.RUN
super(Pool, self)._repopulate_pool()
@staticmethod
def _handle_tasks(taskqueue, put, outqueue, pool):
time.sleep(1)
_real_handle_tasks(taskqueue, put, outqueue, pool)
_real_handle_tasks = multiprocessing.pool.Pool._handle_tasks
multiprocessing.pool.Pool._handle_tasks = Pool._handle_tasks
pool = Pool(4)
pool.map(str, range(10))
pool.map_async(str, range(10))
pool.terminate()
pool.join()
In this case, the race occurs when _handle_tasks checks thread._state, breaks out of its first loop, and sends sentinels to the workers.
The terminate/join can be omitted, in which case the bug will occur at gc or process shutdown when the pool's atexit handler runs. The bug is avoided if terminate is replaced with close, and we are using this workaround.
Suggested patch: https://bitbucket.org/ecatmur/cpython/compare/19096-multiprocessing-race..#diff
Move the worker_handler.join() to immediately after setting the worker handler thread state to TERMINATE. This is a safe change as nothing in the moved-over code affects the worker handler thread, except by terminating workers which is precisely what we don't want to happen. In addition, this is near-equivalent behaviour to current close() + join(), which is well-tested.
Also: write tests; and modify Pool.__init__ to refer to its static methods using self rather than class name, to make them overridable for testing purposes.
Note: these values reflect the state of the issue at the time it was migrated and might not reflect the current state.
Show more details
GitHub fields: ```python assignee = None closed_at = None created_at =
labels = ['type-bug', 'library']
title = 'multiprocessing.Pool._terminate_pool restarts workers during shutdown'
updated_at =
user = 'https://bugs.python.org/ecatmur'
```
bugs.python.org fields:
```python
activity =
actor = 'ecatmur'
assignee = 'none'
closed = False
closed_date = None
closer = None
components = ['Library (Lib)']
creation =
creator = 'ecatmur'
dependencies = []
files = []
hgrepos = ['211']
issue_num = 19096
keywords = []
message_count = 2.0
messages = ['198432', '198434']
nosy_count = 2.0
nosy_names = ['sbt', 'ecatmur']
pr_nums = []
priority = 'normal'
resolution = None
stage = None
status = 'open'
superseder = None
type = 'behavior'
url = 'https://bugs.python.org/issue19096'
versions = ['Python 2.7', 'Python 3.5']
```