tncowart opened 12 months ago
Thanks for the note and the code. This is an interesting change.
I'm cautious about changing anything involving multiprocessing because it's a platform-compatibility minefield and we know the current setup works (see e4b2a09abce83544812f67366ac44f86e24a57d1 for one non-obvious bugfix involving the code you're changing).
I'd like to understand the issue better before committing to a change. In my experience on Linux with pystan you could always abort the main process with a keyboard interrupt quickly. If there were other processes still running, they didn't interfere with aborting the main process. Perhaps you can give me a fuller description of the problem or help me understand why Python isn't cleaning up properly?
What I'm trying to do in my application is build a Stan model and then create many sets of samples with different "data" parameters for the model. Since each sample set is independent, this can be parallelized across multiple processes.
The code below, adapted from the pystan documentation, illustrates what I'm doing. With httpstan 4.10.1 (the version pystan currently uses; I don't see any changes in httpstan 4.11 that would fix this) the script just hangs after printing "done!". You can ctrl-c a bunch to get out of it, but even then there will be dozens of "zombie" processes left that you have to kill somehow. These zombie processes are the ones the httpstan process pool creates but never shuts down.
My changes shut down the pools properly when the httpstan aiohttp app shuts down, leaving no zombie processes. That lets the parent pystan processes exit, which in turn lets the main Python process exit.
import asyncio
import concurrent.futures
import multiprocessing as mp
from random import randint

import httpstan
import stan

schools_code = """
data {
  int<lower=0> J;               // number of schools
  array[J] real y;              // estimated treatment effects
  array[J] real<lower=0> sigma; // standard error of effect estimates
}
parameters {
  real mu;           // population treatment effect
  real<lower=0> tau; // standard deviation in treatment effects
  vector[J] eta;     // unscaled deviation from mu by school
}
transformed parameters {
  vector[J] theta = mu + tau * eta; // school treatment effects
}
model {
  target += normal_lpdf(eta | 0, 1);       // prior log-density
  target += normal_lpdf(y | theta, sigma); // log-likelihood
}
"""


async def build_model():
    model_name = httpstan.models.calculate_model_name(schools_code)
    async with stan.common.HttpstanClient() as client:
        response = await client.get("/models")
        model_names = {model["name"] for model in response.json()["models"]}
        if model_name in model_names:
            print("Model is cached")
        else:
            # Pre-build the model so it gets cached and doesn't get built
            # repeatedly in each subprocess
            print("Building Model...")
            await client.post("/models", json={"program_code": schools_code})


def gen_data():
    data = []
    for _ in range(20):
        j = randint(1, 10)
        y = [randint(-10, 10) for _ in range(j)]
        sigma = [randint(1, 10) for _ in range(j)]
        data.append((j, y, sigma))
    return data


def run_stan(stan_args):
    J, y, sigma = stan_args
    schools_data = {"J": J, "y": y, "sigma": sigma}
    posterior = stan.build(schools_code, data=schools_data)
    fit = posterior.sample(num_chains=4, num_samples=1000)
    return fit


async def run():
    await build_model()
    stan_params = gen_data()
    results = []
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=mp.cpu_count())
    for samples in executor.map(run_stan, stan_params):
        results.append(samples)
    executor.shutdown(wait=False, cancel_futures=True)


if __name__ == "__main__":
    asyncio.run(run())
    print("done!")
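As an aside, the lingering workers are easy to observe in isolation with nothing but the standard library: a ProcessPoolExecutor's worker processes stay alive as children of the parent until shutdown() is called. A minimal sketch, independent of httpstan/pystan (the explicit fork context is only there to keep the snippet self-contained regardless of the platform's default start method):

```python
import concurrent.futures
import multiprocessing as mp

# Explicit fork context so the snippet doesn't re-import __main__
# (as the default spawn start method on macOS/Windows would).
pool = concurrent.futures.ProcessPoolExecutor(
    max_workers=2, mp_context=mp.get_context("fork")
)
pool.submit(abs, -1).result()  # force at least one worker process to start

workers_before = len(mp.active_children())  # workers still alive
pool.shutdown(wait=True)                    # join and reap the workers
workers_after = len(mp.active_children())   # workers gone
```

If shutdown() is never called, those child processes are the "zombies" that keep the parent from exiting.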
Thanks for the illustration. I'm still not entirely sure I understand why this isn't a problem with Python itself. Surely at some point Python is calling executor.shutdown(wait=False, cancel_futures=True), right?
Second quick query -- can't one just use a simpler change, something roughly like:
async def shutdown():
    httpstan.services_stub.executor.shutdown(...)

app.on_cleanup.append(shutdown)
Thanks for the illustration. I'm still not entirely sure I understand why this isn't a problem with Python itself. Surely at some point Python is calling executor.shutdown(wait=False, cancel_futures=True), right?
It's not. Python doesn't know when you're done using an executor unless you use it as a context manager, which isn't the case here. Therefore the executors must be shut down explicitly.
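To illustrate with a minimal standalone sketch (plain concurrent.futures, nothing httpstan-specific) — the first pool needs an explicit shutdown() call, the second gets one automatically from the context manager's __exit__:

```python
import concurrent.futures
import multiprocessing as mp

def square(x):
    return x * x

# No context manager: nothing ever calls shutdown() for us, so the worker
# processes linger until we shut the pool down explicitly. This is the call
# the httpstan pools never receive.
executor = concurrent.futures.ProcessPoolExecutor(
    max_workers=2, mp_context=mp.get_context("fork")
)
results = list(executor.map(square, range(5)))
executor.shutdown(wait=True)  # explicit shutdown required

# Context manager: __exit__ performs shutdown(wait=True) automatically.
with concurrent.futures.ProcessPoolExecutor(
    max_workers=2, mp_context=mp.get_context("fork")
) as pool:
    results2 = list(pool.map(square, range(5)))
```

(The fork context is just to keep the snippet runnable without a `__main__` guard; it's incidental to the point.)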
Second quick query -- can't one just use a simpler change, something roughly like:
async def shutdown():
    httpstan.services_stub.executor.shutdown(...)

app.on_cleanup.append(shutdown)
I just tried making only the following changes to app.py, using the example script I wrote above:
diff --git a/httpstan/app.py b/httpstan/app.py
index 65bdff4e..05cc84a5 100644
--- a/httpstan/app.py
+++ b/httpstan/app.py
@@ -8,6 +8,7 @@ import logging
 import aiohttp.web
 
 import httpstan.routes
+import httpstan.services_stub
 
 try:
     from uvloop import EventLoopPolicy
@@ -29,6 +30,10 @@ async def _warn_unfinished_operations(app: aiohttp.web.Application) -> None:
         logger.critical(f"Operation `{name}` cancelled before finishing.")
 
 
+async def shutdown_pool(app: aiohttp.web.Application):
+    httpstan.services_stub.executor.shutdown()
+
+
 def make_app() -> aiohttp.web.Application:
     """Assemble aiohttp Application.
 
@@ -42,4 +47,5 @@ def make_app() -> aiohttp.web.Application:
     # startup and shutdown tasks
     app["operations"] = {}
     app.on_cleanup.append(_warn_unfinished_operations)
+    app.on_cleanup.append(shutdown_pool)
     return app
And it doesn't work -- the script doesn't hang but the sampling never happens and there are a bunch of crash logs:
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/process.py", line 262, in _process_worker
r = call_item.fn(*call_item.args, **call_item.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/process.py", line 214, in _process_chunk
return [fn(*args) for args in chunk]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/process.py", line 214, in <listcomp>
return [fn(*args) for args in chunk]
^^^^^^^^^
File "/Users/tc325/Documents/CLEANSER1.0/httpstan_test.py", line 60, in run_stan
fit = posterior.sample(num_chains=4, num_samples=1000)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/tc325/Documents/CLEANSER1.0/venv/lib/python3.11/site-packages/stan/model.py", line 89, in sample
return self.hmc_nuts_diag_e_adapt(num_chains=num_chains, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/tc325/Documents/CLEANSER1.0/venv/lib/python3.11/site-packages/stan/model.py", line 108, in hmc_nuts_diag_e_adapt
return self._create_fit(function=function, num_chains=num_chains, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/tc325/Documents/CLEANSER1.0/venv/lib/python3.11/site-packages/stan/model.py", line 313, in _create_fit
return asyncio.run(go())
^^^^^^^^^^^^^^^^^
File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 190, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 650, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/Users/tc325/Documents/CLEANSER1.0/venv/lib/python3.11/site-packages/stan/model.py", line 236, in go
raise RuntimeError(message)
RuntimeError: Exception during call to services function: `RuntimeError('cannot schedule new futures after shutdown')`, traceback: `[' File "/Users/tc325/Documents/httpstan/httpstan/services_stub.py", line 112, in call\n future = asyncio.get_running_loop().run_in_executor(executor, lazy_function_wrapper_partial) # type: ignore\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n', ' File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 826, in run_in_executor\n executor.submit(func, *args), loop=self)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n', ' File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/process.py", line 754, in submit\n raise RuntimeError("cannot schedule new futures after shutdown")\n']`
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/tc325/Documents/CLEANSER1.0/httpstan_test.py", line 78, in <module>
asyncio.run(run())
File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 190, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 650, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "/Users/tc325/Documents/CLEANSER1.0/httpstan_test.py", line 72, in run
for samples in executor.map(run_stan, stan_params):
File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/process.py", line 588, in _chain_from_iterable_of_lists
for element in iterable:
File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/_base.py", line 618, in result_iterator
yield _result_or_cancel(fs.pop())
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/_base.py", line 318, in _result_or_cancel
return fut.result(timeout)
^^^^^^^^^^^^^^^^^^^
File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/_base.py", line 456, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
RuntimeError: Exception during call to services function: `RuntimeError('cannot schedule new futures after shutdown')`, traceback: `[' File "/Users/tc325/Documents/httpstan/httpstan/services_stub.py", line 112, in call\n future = asyncio.get_running_loop().run_in_executor(executor, lazy_function_wrapper_partial) # type: ignore\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n', ' File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 826, in run_in_executor\n executor.submit(func, *args), loop=self)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n', ' File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/process.py", line 754, in submit\n raise RuntimeError("cannot schedule new futures after shutdown")\n']`
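For what it's worth, the error itself is easy to reproduce in isolation: once a ProcessPoolExecutor has been shut down it refuses all new work, which is presumably what every sampling call after the first app cleanup runs into when there is only a single module-level executor. A minimal sketch (not httpstan code; the fork context just keeps it self-contained):

```python
import concurrent.futures
import multiprocessing as mp

pool = concurrent.futures.ProcessPoolExecutor(
    max_workers=1, mp_context=mp.get_context("fork")
)
assert pool.submit(abs, -2).result() == 2
pool.shutdown()  # what the on_cleanup hook does after the first app's lifecycle

# Any later request reaching the same module-level executor fails immediately:
try:
    pool.submit(abs, -3)
except RuntimeError as exc:
    message = str(exc)
```

So a one-off shutdown hook kills the pool for every subsequent request in the same process, matching the traceback above.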
I'd like to give this attention when I have some bandwidth available. I'm wary of adding anything to services_stub -- it's already a bit of a pain to understand what's going on.
I'd like to return to this once I figure out if we can build Apple Silicon wheels with the new GitHub runners. There are lots of people clamoring for them.
Thanks again for submitting such a careful patch. Sorry for the delay in reviewing this.
Some background: I'm using pystan in some multi-process code. Pystan works, but my script was hanging at the end. I found that httpstan was creating a bunch of processes and never ending them, which meant the pystan processes weren't ending, and so my process wasn't ending. I traced the issue to the code in services_stub.py that creates a ProcessPoolExecutor but never shuts down the pool.
Creating the ProcessPoolExecutor at the top level of services_stub.py is a problem because the pool can never be shut down properly. This can break code that uses the httpstan library: since the worker processes are never shut down, the calling process can hang on exit while waiting for all of its child processes to exit.
I think this change solves the issue that creating the pool at the module top-level was meant to solve, but does so in a way that allows the process pool to complete its lifecycle, thus playing well with others.