Closed: marxin closed this issue 4 years ago
What you observe is the expected behaviour. There are two issues with your code.
Firstly, you are cancelling an already terminated future. Cancelling an already finished future is basically a no-op.
In your code you schedule 4 jobs with a 1 second timeout each. Then you wait 3 seconds (enough for all jobs to time out) and then you cancel them.
You can see the futures are in the correct state:
<ProcessFuture at 0x7fc92b5a4d90 state=finished raised TimeoutError>
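To see this point in isolation, here is a minimal sketch using the standard library's concurrent.futures (pebble's futures follow the same Future semantics): cancelling an already finished future returns False and leaves its state and result intact.

```python
import concurrent.futures
import time

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(lambda: 42)
    time.sleep(0.5)          # give the task time to complete
    print(future.cancel())   # False: a finished future cannot be cancelled
    print(future.done())     # True
    print(future.result())   # 42: the result is still available
```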
Secondly, you are launching a new process via the subprocess module inside the pool worker. This is the best way to leak those processes. As the pool worker process is terminated after 1 second, its child is left hanging there. The Pool is in fact tracking only its own processes and not any other resource a worker function creates. It is the developer's duty to handle any persistent resource a worker initializes. Moreover, it is considered a bad pattern to nest multiple levels of processes, as it can easily lead to process leaks.
There is no value in using subprocess within a pool of processes. The subprocess module in fact already allows interrupting a running command. If you want to execute multiple commands asynchronously via the subprocess module, the best way is to either control them in the main loop or to use multithreading.
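As a minimal sketch of that point (assuming a POSIX `sleep` binary is available as a stand-in for a real command): `subprocess.run` accepts a `timeout` argument, kills the child when it expires, and raises `TimeoutExpired`.

```python
import subprocess

try:
    # timeout= kills the child and raises if the command runs too long
    subprocess.run(["sleep", "5"], timeout=1)
except subprocess.TimeoutExpired:
    print("command timed out")
```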
What you observe is the expected behaviour. There are two issues with your code.
Thank you for the helpful answer.
Firstly, you are cancelling an already terminated future. Cancelling an already finished future is basically a no-op. In your code you schedule 4 jobs with a 1 second timeout each. Then you wait 3 seconds (enough for all jobs to time out) and then you cancel them. You can see the futures are in the correct state:
<ProcessFuture at 0x7fc92b5a4d90 state=finished raised TimeoutError>
That's what I expected: .cancel is not needed.
Secondly, you are launching a new process via the subprocess module inside the pool worker. This is the best way to leak those processes. As the pool worker process is terminated after 1 second, its child is left hanging there. The Pool is in fact tracking only its own processes and not any other resource a worker function creates. It is the developer's duty to handle any persistent resource a worker initializes. Moreover, it is considered a bad pattern to nest multiple levels of processes, as it can easily lead to process leaks.
Well, it seems to me a quite natural approach that is commonly used on Unix systems.
My Pebble usage is about running parallel tasks (for which I chose ProcessPool), where some of these tasks need to execute an external program (that's why I use subprocess). Moreover, the ProcessPool selection was based on the fact that I need an immediate cancel option for futures. Am I right that a ThreadPool future can't be easily canceled?
There is no value in using subprocess within a pool of processes. The subprocess module in fact already allows interrupting a running command. If you want to execute multiple commands asynchronously via the subprocess module, the best way is to either control them in the main loop or to use multithreading.
Ok, so is the solution to my task the usage of p = subprocess.Popen with p.terminate() being called from a termination signal handler? Or should I use os.killpg(os.getpid(), signal.SIGTERM) from such a signal handler?
I'm all ears for a proper solution for my workload. Thanks!
Am I right that a ThreadPool future can't be easily canceled?
It cannot, as explained for instance here: https://github.com/noxdafox/pebble/issues/23#issuecomment-372102938.
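A quick way to see this with the standard library (which has the same Future semantics): once a worker thread has picked up the task, cancel() returns False, because there is no safe way to interrupt a running thread.

```python
import concurrent.futures
import time

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(time.sleep, 2)
    time.sleep(0.2)          # let the worker start executing the task
    print(future.cancel())   # False: a running thread cannot be interrupted
    print(future.running())  # True: it keeps running until completion
```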
You can apply two strategies here which do not require the use of external modules such as pebble.
The first would be a polling routine in your main loop. As subprocess.Popen returns an object which can be checked for completion, you can simply use it to check whether your command has finished or timed out.
Some pseudocode as example:
```python
import subprocess
import time

TIMEOUT = 600
processes = []

start = time.time()
now = start

for command in command_list:
    processes.append(subprocess.Popen(command, ...))

while processes and now - start < TIMEOUT:
    for process in processes[:]:
        if process.poll() is not None:
            # process has completed its job on time
            processes.remove(process)
            process_result(process)
    time.sleep(0.2)
    now = time.time()

if len(processes) > 0:
    print("Processes %s still pending, terminating them" % processes)
    for process in processes:
        process.terminate()
```
The second would rely on a thread pool and handle the timeout in the worker itself:
```python
import concurrent.futures
import subprocess
from subprocess import TimeoutExpired

TIMEOUT = 600
futures = []

def worker_function(command, timeout):
    process = subprocess.Popen(command, ...)
    try:
        return process.communicate(timeout=timeout)
    except TimeoutExpired:
        # kill the child so it does not outlive the worker
        process.terminate()
        raise

with concurrent.futures.ThreadPoolExecutor() as pool:
    for command in command_list:
        futures.append(pool.submit(worker_function, command, TIMEOUT))

for future in futures:
    try:
        process_result(future.result())
    except TimeoutExpired:
        print("Timeout while processing a command")
```
In both cases the logic would be a bit more complicated if you'd like to have different timeouts for each process, but not by much: just replace the processes list with a dictionary containing the desired timeout as well.
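A minimal sketch of that variant for the polling approach, assuming POSIX `sleep` commands as stand-ins for real jobs: the list becomes a dictionary mapping each running process to its own deadline.

```python
import subprocess
import time

# hypothetical per-command timeouts, in seconds
command_timeouts = {("sleep", "1"): 5, ("sleep", "30"): 1}

deadlines = {}
for command, timeout in command_timeouts.items():
    process = subprocess.Popen(list(command))
    deadlines[process] = time.time() + timeout

while deadlines:
    for process in list(deadlines):
        if process.poll() is not None:            # finished on time
            del deadlines[process]
        elif time.time() > deadlines[process]:    # overdue: terminate it
            process.terminate()
            process.wait()
            del deadlines[process]
            print("terminated an overdue process")
    time.sleep(0.2)

print("all processes handled")
```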
The first approach is pretty lightweight, as it does not require any additional resources to run your commands. Nevertheless, its logic is a bit more complex. The second approach requires spawning a few threads, which is an additional burden on your workstation. Its logic is a bit cleaner though.
Thank you very much for your answer.
You can apply two strategies here which do not require the use of external modules such as pebble.
What a pity, I really like the API provided by pebble. But let me describe my use-case and how the suggested solutions do not fit it precisely.
My workload:
The first would be a polling routine in your main loop. As subprocess.Popen returns an object which can be checked for completion, you can simply use it to check whether your command has finished or timed out.
This does not cover my Python jobs that do need to run a sub-process.
Some pseudocode as example:
The second would rely on a thread pool and handle the timeout in the worker itself:
This is problematic, as I want the capability to terminate running jobs (and threads can't be terminated).
In both cases the logic would be a bit more complicated if you'd like to have different timeouts for each process, but not by much: just replace the processes list with a dictionary containing the desired timeout as well.
Different timeouts are not an issue for me; I'll use only one for all of the jobs.
The first approach is pretty lightweight, as it does not require any additional resources to run your commands. Nevertheless, its logic is a bit more complex. The second approach requires spawning a few threads, which is an additional burden on your workstation. Its logic is a bit cleaner though.
I see, but I still don't see a scenario covering all my needs. Thanks.
I am afraid there is no solution to your problem which can be handled with a single framework.
I would suggest you use pebble.ProcessPool for running your Python workloads concurrently, and use the subprocess.Popen polling technique to control the external programs. You can then handle the termination of the two pools together once the stop signal is triggered.
Be wary that mixing subprocess and multiprocessing in the same program is considered dangerous, especially in Unix environments, due to the way fork handles child process creation.
Example:
https://bugs.python.org/issue25829
I would suggest you use pebble.ProcessPool for running your Python workloads concurrently, and use the subprocess.Popen polling technique to control the external programs.
Do you mean doing subprocess.Popen in a pebble.ProcessPool context? If so, should I rather register a signal handler that calls os.kill(pid_from_popen), or is it better to send the PID to the main process, which then does the termination?
You can then handle the termination of the two pools together once the stop signal is triggered.
Be wary that mixing subprocess and multiprocessing in the same program is considered dangerous, especially in Unix environments, due to the way fork handles child process creation. Example: https://bugs.python.org/issue25829
Good to know!
I've tried a little demo:
```python
#!/usr/bin/env python3
from pebble import ProcessPool
import os
import subprocess
import time
import signal
import datetime

popen_pid = None

def gt():
    return str(datetime.datetime.now())

def signal_handler(signum, frame):
    global popen_pid
    os.kill(popen_pid, signum)
    print('%s: killing: %d' % (gt(), popen_pid))

def run():
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    global popen_pid
    try:
        p = subprocess.Popen('md5sum /dev/random', shell=True)
        popen_pid = p.pid
        print('%s: create PID: %d' % (gt(), popen_pid))
        p.communicate()
    except Exception as e:
        print(e)

with ProcessPool(max_workers=4) as pool:
    print('%s New ProcessPool:' % gt())
    futures = []
    for i in range(4):
        futures.append(pool.schedule(run, timeout=1))
    print('%s sleep(3):' % gt())
    time.sleep(3)
    print('%s after sleep:' % gt())
    for f in futures:
        print(f)

print('%s: at the end' % gt())
```
And I see:
2020-04-22 14:11:08.530282 New ProcessPool:
2020-04-22 14:11:08.535995 sleep(3):
2020-04-22 14:11:08.539071: create PID: 24516
2020-04-22 14:11:08.539189: create PID: 24515
2020-04-22 14:11:08.540193: create PID: 24518
2020-04-22 14:11:08.540494: create PID: 24517
2020-04-22 14:11:09.537216: killing: 24515
2020-04-22 14:11:11.539053 after sleep:
<ProcessFuture at 0x7f09dd71e250 state=finished raised TimeoutError>
<ProcessFuture at 0x7f09dd771fd0 state=running>
<ProcessFuture at 0x7f09dd771f10 state=running>
<ProcessFuture at 0x7f09dd771dc0 state=running>
2020-04-22 14:11:12.641871: killing: 24516
2020-04-22 14:11:15.644228: killing: 24517
2020-04-22 14:11:18.648170: killing: 24518
2020-04-22 14:11:21.761921: at the end
As seen, it takes a long time to terminate a Process. It's likely related to this join:
https://github.com/noxdafox/pebble/blob/161b900419cdf8cc1407b584a05f8f5e2e576787/pebble/common.py#L149
Am I doing something wrong in the signal handler? Thanks.
print('%s: killing: %d' % (gt(), popen_pid))
Calling sys.exit(1) right after that print works for me. Hope it's the proper fix.
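For reference, a sketch of that change: the worker's signal handler forwards the signal to the Popen child and then exits the worker itself, so the pool does not keep waiting on the worker (this reuses the popen_pid and signal_handler names from the demo above).

```python
import os
import signal
import sys

popen_pid = None  # set by the worker after subprocess.Popen

def signal_handler(signum, frame):
    # forward the signal to the external child process,
    # then exit the worker so the pool can reap it promptly
    os.kill(popen_pid, signum)
    sys.exit(1)
```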
And there's a working example with a multiprocessing.Queue:
```python
#!/usr/bin/env python3
from pebble import ProcessPool
from multiprocessing import Manager
import os
import subprocess
import time
import signal
import datetime

def gt():
    return str(datetime.datetime.now())

def run(queue):
    try:
        p = subprocess.Popen('md5sum /dev/random', shell=True)
        popen_pid = p.pid
        queue.put(popen_pid)
        print('%s: create PID: %d' % (gt(), popen_pid))
        p.communicate()
    except Exception as e:
        print(e)

with ProcessPool(max_workers=4) as pool:
    m = Manager()
    pid_queue = m.Queue()
    print('%s New ProcessPool:' % gt())
    futures = []
    for i in range(4):
        futures.append(pool.schedule(run, [pid_queue], timeout=1))
    print('%s sleep 2:' % gt())
    time.sleep(2)
    print('%s after sleep:' % gt())
    for f in futures:
        print(f)
    while not pid_queue.empty():
        pid = pid_queue.get()
        print('%s: killing: %d' % (gt(), pid))
        os.kill(pid, signal.SIGTERM)

print('%s: at the end' % gt())
```
$ 2020-04-22 14:42:07.930466 New ProcessPool:
2020-04-22 14:42:07.935961 sleep 2:
2020-04-22 14:42:07.939869: create PID: 27305
2020-04-22 14:42:07.962820: create PID: 27307
2020-04-22 14:42:07.983714: create PID: 27310
2020-04-22 14:42:07.983889: create PID: 27312
2020-04-22 14:42:09.937035 after sleep:
<ProcessFuture at 0x7fa5c28c96a0 state=finished raised TimeoutError>
<ProcessFuture at 0x7fa5c28c9790 state=finished raised TimeoutError>
<ProcessFuture at 0x7fa5c28c9850 state=finished raised TimeoutError>
<ProcessFuture at 0x7fa5c28c9910 state=finished raised TimeoutError>
2020-04-22 14:42:09.947932: killing: 27305
2020-04-22 14:42:09.948127: killing: 27307
2020-04-22 14:42:09.948299: killing: 27310
2020-04-22 14:42:09.948461: killing: 27312
2020-04-22 14:42:10.049040: at the end
I hope we can close the issue. Thanks for the brainstorming!
I meant to suggest using pebble.ProcessPool for the jobs which are to be solved natively in Python, and the subprocess.Popen polling loop for those which are external programs.
They would act as two separate "pools" which you can terminate according to your needs.
And there's a working example with a multiprocessing.Queue:
So this totally solved a similar issue I was having. Thank you :-)
I can't cancel a running pebble.ProcessFuture after a timeout is reached.