Open rafa-be opened 2 weeks ago
One way to prevent the processor thread from doing any computation is to set up a signal handler that waits on a condition variable:
import multiprocessing
import os
import signal
import time


def process_2(condition):
    def signal_handler(sig, frame):
        if sig == signal.SIGUSR1:
            print("Process 2 received signal, wait on condition variable.")
            # Blocks the main thread of Process 2 until Process 1 notifies.
            with condition:
                condition.wait()
            print("Process 2 stopped waiting on condition variable")

    signal.signal(signal.SIGUSR1, signal_handler)

    while True:
        print("Process 2 is doing a computation...")
        time.sleep(1)


if __name__ == '__main__':
    condition = multiprocessing.Condition()

    p2 = multiprocessing.Process(target=process_2, args=(condition,))
    p2.start()

    time.sleep(5)

    print(f"Process 1 sending SIGUSR1 to Process 2 (PID {p2.pid})")
    os.kill(p2.pid, signal.SIGUSR1)

    time.sleep(5)  # Wait a bit before notifying the condition variable

    print("Process 1 notifying the condition variable.")
    with condition:
        condition.notify()

    # Wait for Process 2 to finish
    p2.join()
Process 2 is doing a computation...
Process 2 is doing a computation...
Process 2 is doing a computation...
Process 2 is doing a computation...
Process 2 is doing a computation...
Process 1 sending SIGUSR1 to Process 2 (PID 84341)
Process 2 received signal, wait on condition variable.
Process 1 notifying the condition variable.
Process 2 stopped waiting on condition variable
Process 2 is doing a computation...
Process 2 is doing a computation...
Process 2 is doing a computation...
[..]
This only blocks the main thread, and will not prevent any client or 0mq thread from executing.
I couldn't find any documentation saying that waiting on a synchronization primitive must not be done inside a signal handler. The only warning I found is that it's not recommended because it might prevent the main thread from executing, but that's exactly what we are trying to do.
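To illustrate the claim that only the main thread is blocked, here is a minimal single-process sketch. It is not Scaler code: `run_demo` and `background` are hypothetical names, a `threading.Event` stands in for the `multiprocessing.Condition` used above, and the background thread stands in for a client or 0mq thread. It assumes a POSIX system (for `SIGUSR1`) and that `run_demo` is called from the main thread.

```python
import os
import signal
import threading
import time


def run_demo(block_seconds=0.2):
    """Block the main thread in a SIGUSR1 handler while another thread keeps running.

    Returns the background tick count seen when the handler started and ended.
    """
    ticks = {"count": 0}
    observed = {}
    resume = threading.Event()
    stop = threading.Event()

    def background():
        # Hypothetical stand-in for a client or 0mq I/O thread.
        while not stop.is_set():
            ticks["count"] += 1
            time.sleep(0.01)

    def signal_handler(sig, frame):
        observed["at_entry"] = ticks["count"]
        # Another thread releases the main thread after block_seconds.
        threading.Timer(block_seconds, resume.set).start()
        # Python-level handlers run in the main thread, so only it blocks here.
        resume.wait(timeout=5)
        observed["at_exit"] = ticks["count"]

    previous = signal.signal(signal.SIGUSR1, signal_handler)
    worker = threading.Thread(target=background, daemon=True)
    worker.start()
    try:
        os.kill(os.getpid(), signal.SIGUSR1)
        # The handler runs at the next interpreter check; wait until it is done.
        deadline = time.monotonic() + 10
        while "at_exit" not in observed and time.monotonic() < deadline:
            time.sleep(0.01)
    finally:
        stop.set()
        worker.join()
        # Restore the previous handler (fall back to the default if unknown).
        signal.signal(
            signal.SIGUSR1,
            previous if previous is not None else signal.SIG_DFL,
        )
    return observed


if __name__ == "__main__":
    result = run_demo()
    print(f"ticks at handler entry: {result['at_entry']}, "
          f"at exit: {result['at_exit']}")
```

If the tick count at handler exit is higher than at entry, the background thread kept running while the main thread was parked inside the handler, which is the behavior described above.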
When a task gets suspended, it is prevented from running by a `SIGSTOP` signal. If the task is running a Scaler client, that will prevent it from sending heartbeats to the scheduler. If suspended for long enough, the client will raise either a `TimeoutError` or a `ClientQuitException` exception (depending on whether the timeout is first triggered on the client or the scheduler side).

The (currently disabled) `test_no_timeout_if_suspended` test reproduces this issue.

One way to solve this problem is to disable the client heartbeat mechanism when the client is running inside a worker. That requires a few changes: