uqfoundation / pathos

parallel graph management and execution in heterogeneous computing
http://pathos.rtfd.io

Subprocess killed by OOM killer leaves the script hanging forever #230

Open nicolaslegrand91 opened 2 years ago

nicolaslegrand91 commented 2 years ago

Issue

I recently ran into an issue where my script hung forever after processing all jobs (which I knew from the print output), with 0% CPU utilization.

I eventually understood that this happened because the OOM killer had killed one of the subprocesses for using too much RAM. A new subprocess was created immediately, but it does not appear to be used, and pathos hangs forever.

Possible mitigation

I suspect this is quite hard to fix properly, but would it at least be possible to detect this problem (a subprocess killed externally) and raise an exception in the main script, so that it does not hang forever?
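
For example, until pathos can detect this itself, one caller-side guard (a rough, untested sketch; the 60-second deadline and the error message are placeholders) would be to replace the blocking map() with apipe() plus a polling loop, so the driver raises instead of hanging:

import time
from pathos.multiprocessing import ProcessPool

def do_something(i):
    time.sleep(2)
    return i

with ProcessPool(2) as pool:
    futures = [pool.apipe(do_something, i) for i in range(5)]
    deadline = time.monotonic() + 60  # overall budget; tune for the real workload
    results = []
    for f in futures:
        while not f.ready():  # poll instead of blocking forever
            if time.monotonic() > deadline:
                raise RuntimeError('no result within the deadline; '
                                   'a worker may have been killed (e.g. by the OOM killer)')
            time.sleep(1)
        results.append(f.get())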

To reproduce the problem

If you want to try it yourself, you can reproduce the same behaviour with the following code snippet:

import time
from pathos.multiprocessing import ProcessPool

def do_something(i):
    print(i, 'entering')
    time.sleep(2)
    print(i, 'returning')
    return i

with ProcessPool(2) as pool:
    results = pool.map(
        do_something,
        range(5)
    )

During execution, you can use htop to kill one of the subprocesses (the last two lines of the screenshot). If you do, you will end up in the hanging state: no CPU use, but the Python script never returns.

[screenshot: htop in tree view showing the Python script and its two pathos worker subprocesses]

Note: type t in htop to enable tree view, then F4 to filter

ehoppmann commented 2 years ago

I've been having a lot of similar issues with some long-running jobs where I think a subprocess dies. It's hard to say that this is always the cause: I have seen OOM errors, I have also seen errors like _pickle.UnpicklingError: invalid load key, '\x00'., and sometimes the pool has hung without any error or printed exception.

It's easy to detect when this happens, e.g. by feeding the pool using apipe and detecting when no futures have completed in a while. So perhaps detecting this situation is easier than you suspect.

I've tried to write a watchdog based on that, and the watchdog does trigger, but when I try to recover by calling

pool.close()
pool.join()
pool.clear()

That also hangs.

Curious if anyone has found a way to recover from this sort of scenario.
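
An untested guess at a more forceful recovery path: terminate() instead of close() + join(), since terminate() is supposed to stop the workers without waiting for their outstanding tasks, followed by clear() to drop the cached pool before rebuilding it:

pool.terminate()  # stop workers without waiting for in-flight tasks
pool.join()       # reap the worker processes
pool.clear()      # drop the cached pool so a fresh one can be created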

FWIW, I tried to code up a minimal example that reflects how my code works, and I wasn't able to reproduce the hang by killing a subprocess manually the way the OP could. If anyone has a way to reproduce this situation reliably, I'd appreciate any pointers, as that could help in developing workarounds.

That code:

import time
import multiprocessing as mp

from pathos.multiprocessing import ProcessPool

MAX_FUTURES = 8

try:
    mp.set_start_method('forkserver')  # needed to avoid deadlocks on unix systems with some libraries e.g. xgboost
except RuntimeError:
    pass

def do_something(i):
    print(i, 'entering')
    time.sleep(5)
    print(i, 'returning')
    return i

pool = ProcessPool(2)
i = 0
loop_cnt = 0
last_progress = loop_cnt
futures = {}
while True:
    while len(futures) < MAX_FUTURES:
        futures[i] = pool.apipe(do_something, i)
        i += 1
    completed = {}
    for _i, future in futures.items():
        if future.ready():
            completed[_i] = future.get()
            last_progress = loop_cnt  # record the iteration of the last completed future
    for _i, result in completed.items():
        print(f'got result {result}')
        del futures[_i]
    if last_progress < (loop_cnt - 60 // 5):  # no completed futures for ~60 s (12 iterations of 5 s)
        print('pool hung for at least 1 minute')
    time.sleep(5)
    loop_cnt += 1

ehoppmann commented 2 years ago

I came up with a workaround that works great for my use case: whenever the pool dies, for whatever reason, I'm able to reliably resume from my last checkpoint. In case this is helpful to anyone else:

1) My main loop establishes a subprocess, which sets its own process group (os.setpgrp()) prior to creating the pathos pool.
2) The subprocess kicks off the actual pathos worker pool and submits work using pool.apipe as in my code example above, maintaining a limited count of outstanding futures.
3) When the while loop in the subprocess detects that no futures have completed for some time, it calls os.killpg(0, signal.SIGKILL), killing itself and the workers.
4) Whenever the subprocess terminates, the main loop checks the exit code to see whether it terminated due to a SIGKILL (p.exitcode == -signal.SIGKILL.value); if so, it re-establishes the subprocess and workers so that work resumes from the last checkpoint.
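
A rough sketch of that pattern (not the actual production code; do_something, the 60-second watchdog threshold, and the checkpoint handling are placeholders) might look like this:

import os
import signal
import time
import multiprocessing as mp

from pathos.multiprocessing import ProcessPool

MAX_FUTURES = 8
STALL_SECONDS = 60  # watchdog threshold (placeholder)

def do_something(i):
    time.sleep(5)
    return i

def worker_loop(start_i):
    os.setpgrp()               # 1) new process group for this subprocess and its workers
    pool = ProcessPool(2)      # 2) the actual pathos pool lives in this subprocess
    futures, i = {}, start_i
    last_progress = time.monotonic()
    while True:
        while len(futures) < MAX_FUTURES:
            futures[i] = pool.apipe(do_something, i)
            i += 1
        for _i, f in list(futures.items()):
            if f.ready():
                result = f.get()
                del futures[_i]
                last_progress = time.monotonic()
                # ... persist `result` to the checkpoint here ...
        if time.monotonic() - last_progress > STALL_SECONDS:
            os.killpg(0, signal.SIGKILL)  # 3) kill this subprocess and all its workers
        time.sleep(5)

if __name__ == '__main__':
    while True:
        p = mp.Process(target=worker_loop, args=(0,))  # start index would come from the checkpoint
        p.start()
        p.join()
        if p.exitcode == -signal.SIGKILL.value:        # 4) watchdog fired: rebuild and resume
            continue
        break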

showkeyjar commented 1 year ago

@ehoppmann would you please provide your code?