noxdafox / pebble

Multi threading and processing eye-candy.
GNU Lesser General Public License v3.0
536 stars 52 forks source link

SIGTERM handling in pebble.ProcessPool #137

Open xiaodongsg opened 2 months ago

xiaodongsg commented 2 months ago

With the following code:

from pebble import ProcessPool
import concurrent.futures
import asyncio
import time

def blocking_sleep():
    """Block for two seconds in one-second steps, printing progress each step."""
    remaining = 2
    while remaining > 0:
        print('sleeping for 1 second')
        time.sleep(1)
        remaining -= 1
    print("finished")

async def main():
    """Run blocking_sleep in a pebble ProcessPool through the running event loop."""
    loop = asyncio.get_running_loop()
    with ProcessPool(max_workers=5) as pool:
        tasks = []
        # NOTE(review): the trailing None below is forwarded by run_in_executor
        # to pool.submit() — presumably it is pebble's `timeout` argument;
        # verify against the submit() signature of the pebble version in use.
        tasks.append(loop.run_in_executor(
            pool,
            blocking_sleep,
            None
            ))

        await asyncio.gather(*tasks, return_exceptions=False)

import signal
# default_int_handler raises KeyboardInterrupt; worker processes inherit this
# SIGTERM handler, and the traceback below shows a KeyboardInterrupt surfacing
# inside a pebble worker — presumably the cause of the crash (confirm).
signal.signal(signal.SIGTERM, signal.default_int_handler)   # <---- signal handler installed

asyncio.run(main())

With the signal handler installed, the above code crashes with:

Process pebble_pool_worker:
Traceback (most recent call last):
  File "some/path/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "some/path/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "some/path/lib/python3.11/site-packages/pebble/pool/process.py", line 427, in worker_process
    for task in worker_get_next_task(channel, params.max_tasks):
  File "some/path/lib/python3.11/site-packages/pebble/pool/process.py", line 443, in worker_get_next_task
    yield fetch_task(channel)
          ^^^^^^^^^^^^^^^^^^^
  File "some/path/lib/python3.11/site-packages/pebble/pool/process.py", line 455, in fetch_task
    while channel.poll():
          ^^^^^^^^^^^^^^
  File "some/path/lib/python3.11/site-packages/pebble/pool/channel.py", line 59, in unix_poll
    return bool(poll.poll(timeout))
                ^^^^^^^^^^^^^^^^^^
KeyboardInterrupt

The equivalent code using concurrent.futures.ProcessPoolExecutor works fine with the same setup:

from pebble import ProcessPool
import concurrent.futures
import asyncio
import time

def blocking_sleep():
    """Sleep one second at a time, twice, then announce completion."""
    count = 0
    while count < 2:
        print('sleeping for 1 second')
        time.sleep(1)
        count += 1
    print("finished")

async def main():
    """Submit blocking_sleep to a concurrent.futures ProcessPoolExecutor and await it."""
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        # A single future; gather() over it is equivalent to the list form.
        future = loop.run_in_executor(pool, blocking_sleep)
        await asyncio.gather(future, return_exceptions=False)

import signal
# Same SIGTERM handler as in the pebble example above; with the stdlib
# ProcessPoolExecutor the program runs to completion without crashing.
signal.signal(signal.SIGTERM, signal.default_int_handler)
asyncio.run(main())

Can you please help with this?