Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KeyboardInterrupt is not processed correctly when using multiprocessing.Queue #2989

Open
1 task done
zoranbosnjak opened this issue Jul 23, 2024 · 0 comments
Open
1 task done
Labels

Comments

@zoranbosnjak
Copy link

Is there an existing issue for this?

  • I have searched the existing issues

Describe the bug

I am trying to use multiprocessing.Queue from my_process (see the sample code). The problem is that sanic won't stop on KeyboardInterrupt (even on repeated CTRL-C).

It looks like a bug, but I am not sure. I have tryed with several variations, without success. Please advice how to properly implement the intended communication and how to stop the process.

Code snippet

from multiprocessing import Queue                                                                          
from queue import Empty                                                                                    
import time                                                                                                
import asyncio                                                                                             
from sanic import Request, Sanic, json                                                                     
                                                                                                           
def my_process(q):                                                                                         
    try:                                                                                                   
        while True:                                                                                        
            q.put(time.monotonic())                                                                        
            time.sleep(1)                                                                                  
    except KeyboardInterrupt:                                                                              
        pass                                                                                               
                                                                                                           
app = Sanic("TestApp")                                                                                     
                                                                                                           
@app.get("/")                                                                                              
async def handler(request: Request):                                                                       
    return json({"foo": "bar"})                                                                            
                                                                                                           
async def reader() -> None:                                                                                
    while True:                                                                                            
        print('got', app.shared_ctx.queue.get())                                                           
                                                                                                           
@app.before_server_start                                                                                   
async def on_before_server_start(app) -> None:                                                             
    print('startup...')                                                                                    
    loop = asyncio.get_running_loop()                                                                      
    loop.create_task(reader(), name='reader')                                                              
                                                                                                           
@app.before_server_stop                                                                                    
async def on_before_server_stop(app) -> None:                                                              
    print('cleanup...')                                                                                    
    tasks = []                                                                                             
    for task in asyncio.all_tasks():                                                                       
        if task.get_name() in ['reader']:                                                                  
            tasks.append(task)                                                                             
    for task in tasks:                                                                                     
        task.cancel() 

@app.main_process_start                                                                                    
async def main_process_start(app):                                                                         
    app.shared_ctx.queue = Queue()                                                                         
                                                                                                           
@app.main_process_ready                                                                                    
async def ready(app: Sanic, _):                                                                            
    app.manager.manage("MyProcess", my_process, {"q": app.shared_ctx.queue})                               
                                                                                                           
if __name__ == '__main__':                                                                                 
    app.run(debug=True)  

Expected Behavior

Clean shutdown on the first CTRL-C.

How do you run Sanic?

Sanic CLI

Operating System

Linux

Sanic Version

Sanic 23.12.2; Routing 23.12.0

Additional context

No response

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant