sanic-org / sanic

Accelerate your web app development | Build fast. Run fast.
https://sanic.dev
MIT License
18.01k stars 1.54k forks source link

Sanic and subprocesses for CPU intensive work #1172

Closed kinware closed 6 years ago

kinware commented 6 years ago

I'm trying to get Sanic (and uvloop) to use a worker pool of sub-processes for CPU intensive requests. So far I've come up with the below code. The example "work" is first run once (to ensure the process pool works as intended). Then sanic is started and the user can trigger the "work" through HTTP. See below.

import asyncio
import concurrent
import os
import time
import uvloop

from sanic import Sanic, response, config

# Define a CPU bound worker function
def deep_thought(data):
    print("Working to answer: " + data)
    time.sleep(5)  # Actually not CPU bound in this example, but outcome should be identical
    return 42

# Enable extra asyncio debugging
os.environ['PYTHONASYNCIODEBUG'] = "1"

# Make sure uvloop is used as event loop by default
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# Instantiate an event loop object for main thread
loop = asyncio.get_event_loop()

# Create ProcessPoolExecutor
executor = concurrent.futures.ProcessPoolExecutor(max_workers=5)

# Define a coroutine that will call the CPU bound function in a sub process
async def ask_question():
    res = await loop.run_in_executor(
        executor,
        deep_thought,
        "What's the answer to life, the universe and everything? (no sanic)",
    )
    print('Answer (no sanic): ' + str(res))

# Create a task and run it in the the event loop
print('Asking question without sanic started')
task = loop.create_task(ask_question())
loop.run_until_complete(task)

# Create the Sanic app which should(?!?) use the same event loop as create above
config.Config.LOGO = None
app = Sanic()

# Endpoint to test server responsiveness
@app.route("/")
async def root(request):
    return response.json({"root": True})

# CPU bound endpoint
@app.route("/ask")
async def ask(request):
    res = await loop.run_in_executor(
        executor,
        deep_thought,
        "What's the answer to life, the universe and everything? (sanic)"
    )
    return response.json({"result": res})

# Run the Sanic app (which will start the event loop again)
app.run(host="0.0.0.0", port=12000, debug=True)

I run the above using sanic from master branch, uvloop 0.8.1 together with python 3.6.3. The first run of the worker function in a subprocess (before starting sanic) works fine. However, when initiating the work through sanic (http://localhost:12000/ask) it bombs with an exception:

$ python3.6 ./test_sanic_subprocess.py
Asking question without sanic started
Working to answer: What's the answer to life, the universe and everything? (no sanic)
Answer (no sanic): 42
[2018-03-23 10:24:16 +0100] [19475] [INFO] Goin' Fast @ http://0.0.0.0:12000
[2018-03-23 10:24:16 +0100] [19475] [INFO] Starting worker [19475]
[2018-03-23 10:24:19 +0100] - (sanic.access)[INFO][127.0.0.1:51520]: GET http://localhost:12000/  200 13
[2018-03-23 10:24:23 +0100] [19475] [ERROR] Traceback (most recent call last):
  File "/home/kinware/src/sanic/sanic/app.py", line 597, in handle_request
    response = await response
  File "/home/kinware/python/lib/python3.6/asyncio/coroutines.py", line 128, in throw
    return self.gen.throw(type, value, traceback)
  File "./test_subprocess_nosanic.py", line 59, in ask
    "What's the answer to life, the universe and everything? (sanic)"
RuntimeError: Task <Task pending coro=<Sanic.handle_request() running at /home/kinware/src/sanic/sanic/app.py:597> created at /home/kinware/src/sanic/sanic/server.py:304> got Future <Future pending cb=[_chain_future.<locals>._call_check_cancel()] created at ./test_subprocess_nosanic.py:59> attached to a different loop

Working to answer: What's the answer to life, the universe and everything? (sanic)
[2018-03-23 10:24:23 +0100] [19475] [ERROR] Exception occurred while handling uri: "http://localhost:12000/ask"
Traceback (most recent call last):
  File "/home/kinware/src/sanic/sanic/app.py", line 597, in handle_request
    response = await response
  File "/home/kinware/python/lib/python3.6/asyncio/coroutines.py", line 128, in throw
    return self.gen.throw(type, value, traceback)
  File "./test_subprocess_nosanic.py", line 59, in ask
    "What's the answer to life, the universe and everything? (sanic)"
RuntimeError: Task <Task pending coro=<Sanic.handle_request() running at /home/kinware/src/sanic/sanic/app.py:597> created at /home/kinware/src/sanic/sanic/server.py:304> got Future <Future pending cb=[_chain_future.<locals>._call_check_cancel()] created at ./test_subprocess_nosanic.py:59> attached to a different loop

[2018-03-23 10:24:23 +0100] - (sanic.access)[INFO][127.0.0.1:51520]: GET http://localhost:12000/ask  500 2977

It appears from the exceptions that there are multiple event loops in play here, but I'm not quite sure why/where these are created. Maybe I'm misunderstanding the python event loop concept? Also not sure what happens when the current process fork()s and the child worker processes inherits the event loop from the master process.

Any ideas what I'm doing wrong here?

If I can get it working I'd be happy to submit a PR for sanic/examples/ showing how to handle CPU intensive work in a subprocess in conjunction with the normal async processing sanic does so well.

kinware commented 6 years ago

Digging into the sanic I could see that there is a difference in how the loop is handled when launching sanic through Sanic.run() vs Sanic.create_server().

Sanic.run() uses

    run_async=False  # Implicit from default parameter of Sanic._helper()
    loop=None        # Implicit from default parameter of Sanic._helper()

which causes sanic.server.serve() to create a new loop by doing:

if not run_async:
    # create new event_loop after fork
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

Sanic.create_server() uses

    run_async=True         # Explicitly set in create_server()
    loop=get_event_loop()  # Explicitly set in create_server()

This does not create a new loop, but it also leaves it to the caller to run the actual event loop (as opposed to Sanic.run() which conveniently does run the loop).

So there are two ways to get rid of the exceptions.

One is to just allow Sanic to create it's on loop and update the code to use that loop. Easily accomplished by changing loop.run_in_executor() to asyncio.get_event_loop().run_in_executor() (or possibly request.app.loop.run_in_executor()).

The alternative is to run the event loop outside of sanic and switch from Sanic.run() to Sanic.create_server() which should correctly pick up the event loop as it uses get_event_loop() internally (instad of new_event_loop() as Sanic.app() does).

This solves the problem for me. Maybe someone else finds this useful too.

Closing issue.

zhugexinxin commented 1 year ago

request.app.loop.run_in_executor()

request.app.loop.run_in_executor(None,func)

I'm not very good at python, can the executor be set to None and let it read the default values?