sanic-org / sanic

Accelerate your web app development | Build fast. Run fast.
https://sanic.dev
MIT License

KeyboardInterrupt is not processed correctly when using multiprocessing.Queue #2989

Open zoranbosnjak opened 3 months ago

zoranbosnjak commented 3 months ago

Is there an existing issue for this?

Describe the bug

I am trying to use multiprocessing.Queue from my_process (see the sample code). The problem is that Sanic does not stop on KeyboardInterrupt, even after repeated CTRL-C.

It looks like a bug, but I am not sure. I have tried several variations without success. Please advise on how to properly implement the intended communication and how to stop the process cleanly.

Code snippet

from multiprocessing import Queue
from queue import Empty
import time
import asyncio
from sanic import Request, Sanic, json

def my_process(q):
    try:
        while True:
            q.put(time.monotonic())
            time.sleep(1)
    except KeyboardInterrupt:
        pass

app = Sanic("TestApp")

@app.get("/")
async def handler(request: Request):
    return json({"foo": "bar"})

async def reader() -> None:
    while True:
        print('got', app.shared_ctx.queue.get())

@app.before_server_start
async def on_before_server_start(app) -> None:
    print('startup...')
    loop = asyncio.get_running_loop()
    loop.create_task(reader(), name='reader')

@app.before_server_stop
async def on_before_server_stop(app) -> None:
    print('cleanup...')
    tasks = []
    for task in asyncio.all_tasks():
        if task.get_name() in ['reader']:
            tasks.append(task)
    for task in tasks:
        task.cancel()

@app.main_process_start
async def main_process_start(app):
    app.shared_ctx.queue = Queue()

@app.main_process_ready
async def ready(app: Sanic, _):
    app.manager.manage("MyProcess", my_process, {"q": app.shared_ctx.queue})

if __name__ == '__main__':
    app.run(debug=True)
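
One thing worth noting about the snippet above: reader() calls multiprocessing.Queue.get() directly inside a coroutine. That call blocks the calling thread, so while it waits the worker's event loop cannot run anything else, including the task.cancel() issued in on_before_server_stop. Whether this is the actual reason Sanic does not stop here is an assumption, not something confirmed in this issue. The following is a minimal stdlib-only sketch of a reader that polls the queue in a helper thread instead. The names poll_queue, handle, and demo are hypothetical and used only for illustration, and asyncio.to_thread requires Python 3.9 or later.

```python
import asyncio
from multiprocessing import Queue
from queue import Empty


def poll_queue(q, timeout: float = 0.1):
    """Blocking get with a short timeout; returns None if nothing arrived."""
    try:
        return q.get(timeout=timeout)
    except Empty:
        return None


async def reader(q, handle) -> None:
    """Hypothetical variant of reader() that keeps the event loop free."""
    while True:
        # The blocking get runs in a worker thread, so the event loop stays
        # responsive and this task can be cancelled at the await point.
        item = await asyncio.to_thread(poll_queue, q)
        if item is not None:
            handle(item)


async def demo() -> list:
    """Feed three values through the queue, then cancel the reader task."""
    q = Queue()
    received = []
    task = asyncio.create_task(reader(q, received.append), name="reader")
    for value in (1, 2, 3):
        q.put(value)
    # Wait (up to about 5 seconds) until all values have been handled.
    for _ in range(50):
        if len(received) == 3:
            break
        await asyncio.sleep(0.1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return received
```

In the Sanic app, the same idea would mean replacing the direct app.shared_ctx.queue.get() call in reader() with an awaited, timed poll. The short timeout is a design choice: it bounds how long the helper thread stays blocked after the task has been cancelled. One caveat of this sketch is that an item fetched by the thread at the very moment of cancellation would be dropped.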

Expected Behavior

Clean shutdown on the first CTRL-C.

How do you run Sanic?

Sanic CLI

Operating System

Linux

Sanic Version

Sanic 23.12.2; Routing 23.12.0

Additional context

No response