
multiprocessing.Barrier does not return if waiting process is terminated (tested on windows) #123899

Open jpvolkmann opened 2 months ago

jpvolkmann commented 2 months ago

Bug report

Bug description:

I'm using multiprocessing.Barrier to synchronize processes. The individual processes might get terminated/killed by a GUI user interaction. If a process that has already entered the waiting state is killed, the barrier is never released; the timeout is not taken into account. While debugging this issue I realized that it is caused by https://github.com/python/cpython/blob/b52de7e02dba9e1f176d6d978d782fbd0509311e/Lib/multiprocessing/synchronize.py#L297

self._woken_count is not released by the terminated process, and no timeout is specified in the call to self._woken_count.acquire().

If I add a (hardcoded...) timeout, as in self._woken_count.acquire(True, 2.0), the program continues as expected. I hope there are better approaches to fix this than a hardcoded timeout...
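
For illustration, the workaround amounts to something like this inside multiprocessing.Condition.notify in Lib/multiprocessing/synchronize.py (a sketch only, not a proper fix; the 2.0 second timeout is arbitrary):

        if sleepers:
            for i in range(sleepers):
                # Original: self._woken_count.acquire() with no timeout,
                # which blocks forever if a sleeper was killed before it
                # could release _woken_count.
                self._woken_count.acquire(True, 2.0)  # hardcoded workaround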

Example script to reproduce the error:

import logging
import time
import sys
import multiprocessing as mp
import multiprocessing.synchronize

from pathlib import Path

def process_main(process_index, barrier: multiprocessing.synchronize.Barrier):
    logging.debug('Process %d: Started', process_index)
    time.sleep(0.5)
    logging.debug('Process %d: Waiting for barrier', process_index)
    barrier.wait(timeout=5.0)
    logging.debug('Process %d: Barrier passed', process_index)
    time.sleep(0.5)
    logging.debug('Process %d: Terminated', process_index)

if __name__ == '__main__':
    # Set up logging
    logfile_name = Path(__file__).with_suffix('.log').name
    logging.basicConfig(
        level=logging.DEBUG,
        format='%(asctime)s %(levelname)s:%(name)s %(message)s',
        handlers=[
            logging.FileHandler(logfile_name, mode='w'),
            logging.StreamHandler(sys.stdout)
        ]
    )

    instance_count = 4
    barrier = mp.Barrier(instance_count)

    processes = []
    for i in range(instance_count):
        runner_process = mp.Process(
            target=process_main, args=(i, barrier), daemon=True)
        processes.append(runner_process)

    for i, process in enumerate(processes):
        logging.debug('Starting process %d', i)
        process.start()
        time.sleep(0.200)

    # Terminate already waiting process
    logging.debug('Killing process 0')
    processes[0].kill()

    for process in processes:
        process.join()

CPython versions tested on:

3.12

Operating systems tested on:

Windows

Linked PRs

- python/cpython#125578 (draft)

Zheaoli commented 2 months ago

Bug confirmed (on Linux).

picnixz commented 2 months ago

Just wondering, but can you reproduce the issue without the logging part?

jpvolkmann commented 2 months ago

Just wondering, but can you reproduce the issue without the logging part?

Just tested without logging; same behavior as before (as expected, since self._woken_count.acquire() is called in blocking mode without a timeout).
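
For reference, a minimal logging-free version of the script (same structure as the original repro):

import time
import multiprocessing as mp

def worker(barrier):
    time.sleep(0.5)
    barrier.wait(timeout=5.0)    # process 0 is killed while waiting here
    print('passed barrier')

if __name__ == '__main__':
    barrier = mp.Barrier(4)
    procs = [mp.Process(target=worker, args=(barrier,), daemon=True)
             for _ in range(4)]
    for p in procs:
        p.start()
        time.sleep(0.2)
    procs[0].kill()              # kill a process that is already waiting
    for p in procs:
        p.join()                 # hangs forever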

Zheaoli commented 2 months ago

@picnixz would you mind assigning this issue to me (lol

Zheaoli commented 2 months ago

Upon thorough investigation of this issue, I believe it should be classified as a missing feature rather than a bug.

Specifically, we need to implement a timeout for the blocking self._woken_count.acquire() call on the notify path (see the code walkthrough below).

I'm currently considering whether to open a feature request on discuss.python.org for this enhancement.

cc @picnixz

YvesDup commented 1 month ago

The Barrier class already has a timeout argument, and so does its wait method. Don't they? Or am I missing something?

thomas-lets-go commented 1 month ago

I'm also a little confused; it seems that the problem is about killing a process gracefully without causing a deadlock, not about requesting new features from multiprocessing.Barrier.

Process.kill() is a brutal way to kill a process: exit handlers, finally clauses, etc. will not be executed, so the Barrier is not properly cleaned up and its internal state becomes inconsistent; as a result, the other processes are blocked forever.

multiprocessing.Barrier is just a clone of threading.Barrier, which has all the necessary routines, including Barrier(parties[, action[, timeout]]) and wait(timeout=None).

As far as I can see, before Process.kill() is called in the main process, Barrier.abort() should be called first, and the other processes are supposed to deal with the resulting exception, as sketched below. Anyway, let me know if I misunderstand the point.
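
Something along these lines (a sketch of the suggestion; the party count and timeouts are illustrative):

import threading   # multiprocessing.Barrier raises threading.BrokenBarrierError
import time
import multiprocessing as mp

def worker(barrier):
    try:
        barrier.wait(timeout=5.0)
        print('passed barrier')
    except threading.BrokenBarrierError:
        # Barrier was aborted (or broken by a timeout): clean up and exit.
        print('barrier broken, exiting')

if __name__ == '__main__':
    barrier = mp.Barrier(5)      # one more party than we start, so all block
    procs = [mp.Process(target=worker, args=(barrier,)) for _ in range(4)]
    for p in procs:
        p.start()
    time.sleep(0.5)              # let every worker reach the barrier
    barrier.abort()              # wake all waiters with BrokenBarrierError...
    procs[0].kill()              # ...so the hard kill cannot leave them stuck
    for p in procs:
        p.join()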

Zheaoli commented 1 month ago

The Barrier class already has a timeout argument, and so does its wait method. Don't they? Or am I missing something?

You are right, wait already has a timeout. But it does not take effect on every code path. FYI https://github.com/python/cpython/blob/main/Lib/threading.py#L724-L726

I think we should add a timeout for the _release path.

jpvolkmann commented 1 month ago

Killing the process was just used to demonstrate the effect; it's not the real use case... I fully understand that killing a process is not the typical use case, and adding a timeout might have some unwanted side effects. Meanwhile, I also figured out that the barrier is in a "messed up" state even after I added the timeout to acquire(), and cannot be reset (because of pending locks (?), or _count never reaching 0). I'll find another solution for my use case...

In general, I use multiprocessing to isolate processes that load unreliable third-party binary libraries, which can cause access violations in case of e.g. HW defects. An access violation causes the process to get killed.

Zheaoli commented 1 month ago

As far as I can see, before Process.kill() is called in the main process, Barrier.abort() should be called first, and the other processes are supposed to deal with the resulting exception. Anyway, let me know if I misunderstand the point.

Yes, that is the normal approach. But in some circumstances, such as the process being killed from a GUI or by systemd, we can't run cleanup code before the process exits. So for now, the timeout argument is not effective on some code paths, FYI https://github.com/python/cpython/blob/main/Lib/threading.py#L724-L726. I think we need to pass the timeout argument down to the _release code path.

YvesDup commented 1 month ago

@Zheaoli In Barrier's _release method, there are only direct or indirect (via _break) calls to Condition's notify_all method. I don't understand the need for a timeout. What do you have in mind?

Zheaoli commented 1 month ago

In Barrier's _release method, there are only direct or indirect (via _break) calls to Condition's notify_all method. I don't understand the need for a timeout. What do you have in mind?

Here's the detail. For example, in this issue's case we have 4 processes; let's name them 0, 1, 2, 3.

We have a handler function like the following:

def process_main(process_index, barrier: multiprocessing.synchronize.Barrier):
    logging.debug('Process %d: Started', process_index)
    time.sleep(0.5)
    logging.debug('Process %d: Waiting for barrier', process_index)
    barrier.wait(timeout=5.0)
    logging.debug('Process %d: Barrier passed', process_index)
    time.sleep(0.5)
    logging.debug('Process %d: Terminated', process_index)

We kill process 0 to simulate a process crashing unexpectedly (e.g. killed by the OS, or a segmentation fault in a thread).

At the end of the script, we use join to wait for all processes to finish:


    # Terminate already waiting process
    logging.debug('Killing process 0')
    processes[0].kill()

    for index, process in enumerate(processes):
        process.join()
        logging.debug('Joining process %d', index)

OK, let's go back to the Barrier code in the CPython threading library:

    def wait(self, timeout=None):
        """Wait for the barrier.

        When the specified number of threads have started waiting, they are all
        simultaneously awoken. If an 'action' was provided for the barrier, one
        of the threads will have executed that callback prior to returning.
        Returns an individual index number from 0 to 'parties-1'.

        """
        if timeout is None:
            timeout = self._timeout
        with self._cond:
            self._enter() # Block while the barrier drains.
            index = self._count
            self._count += 1
            try:
                if index + 1 == self._parties:
                    # We release the barrier
                    self._release()
                else:
                    # We wait until someone releases us
                    self._wait(timeout)
                return index
            finally:
                self._count -= 1
                # Wake up any threads waiting for barrier to drain.
                self._exit()

Here, the process that makes the index + 1 == self._parties condition come true runs self._release().

The call stack would be _release -> self._cond.notify_all() -> multiprocessing.Condition.notify.

We can take a look at Condition in the multiprocessing library:

    def wait(self, timeout=None):
        assert self._lock._semlock._is_mine(), \
               'must acquire() condition before using wait()'

        # indicate that this thread is going to sleep
        self._sleeping_count.release()

        # release lock
        count = self._lock._semlock._count()
        for i in range(count):
            self._lock.release()

        try:
            # wait for notification or timeout
            return self._wait_semaphore.acquire(True, timeout)
        finally:
            # indicate that this thread has woken
            self._woken_count.release()

            # reacquire lock
            for i in range(count):
                self._lock.acquire()

    def notify(self, n=1):
        assert self._lock._semlock._is_mine(), 'lock is not owned'
        assert not self._wait_semaphore.acquire(
            False), ('notify: Should not have been able to acquire '
                     + '_wait_semaphore')

        # to take account of timeouts since last notify*() we subtract
        # woken_count from sleeping_count and rezero woken_count
        while self._woken_count.acquire(False):
            res = self._sleeping_count.acquire(False)
            assert res, ('notify: Bug in sleeping_count.acquire'
                         + '- res should not be False')

        sleepers = 0
        while sleepers < n and self._sleeping_count.acquire(False):
            self._wait_semaphore.release()        # wake up one sleeper
            sleepers += 1
        if sleepers:
            for i in range(sleepers):
                self._woken_count.acquire()       # wait for a sleeper to wake

            # rezero wait_semaphore in case some timeouts just happened
            while self._wait_semaphore.acquire(False):
                pass

We can see that process 0 was killed during the wait, so it never triggers the finally block in the wait method and never releases self._woken_count.

In the final process, the one that runs the notify method, _sleeping_count is 3 but _woken_count only ever reaches 2. So notify blocks forever on self._woken_count.acquire(), and the main loop is blocked forever too.

This is why I suggest adding a new timeout argument to _release and passing it down this code path, as sketched below.
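
A rough sketch of what that could look like (illustrative only; the actual change in the draft PR may differ):

import time

# Sketch: multiprocessing.Condition.notify with an optional timeout, so it
# can give up on sleepers that died before they could release _woken_count.
def notify(self, n=1, timeout=None):
    assert self._lock._semlock._is_mine(), 'lock is not owned'

    # to take account of timeouts since last notify*() we subtract
    # woken_count from sleeping_count and rezero woken_count
    while self._woken_count.acquire(False):
        res = self._sleeping_count.acquire(False)
        assert res, 'notify: bug in sleeping_count.acquire'

    sleepers = 0
    while sleepers < n and self._sleeping_count.acquire(False):
        self._wait_semaphore.release()        # wake up one sleeper
        sleepers += 1

    if sleepers:
        deadline = None if timeout is None else time.monotonic() + timeout
        for i in range(sleepers):
            if deadline is None:
                self._woken_count.acquire()   # current behavior: may hang
            else:
                remaining = max(0.0, deadline - time.monotonic())
                if not self._woken_count.acquire(True, remaining):
                    break   # a sleeper died before waking; stop waiting

        # rezero wait_semaphore in case some timeouts just happened
        while self._wait_semaphore.acquire(False):
            pass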

Zheaoli commented 3 weeks ago

I made a draft PR for this issue. https://github.com/python/cpython/pull/125578

asiunov commented 2 weeks ago

I've got the same problem with multiprocessing.Event. The parent process waits forever in self._woken_count.acquire() inside Event.set() if a child process has died while waiting for the event (in Event.wait()).

In my use case, with a single event per child process, I work around this by using multiprocessing.Queue instead, as sketched below.
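
A sketch of that workaround (the names are illustrative): the parent signals through a Queue, and q.put() does not block on a dead child the way Event.set() can.

import queue                 # for queue.Empty
import multiprocessing as mp

def child_main(q):
    try:
        q.get(timeout=5.0)   # stands in for Event.wait(timeout=5.0)
    except queue.Empty:
        return               # parent never signaled; give up

if __name__ == '__main__':
    q = mp.Queue()
    child = mp.Process(target=child_main, args=(q,))
    child.start()
    q.put(None)              # stands in for Event.set(); does not hang
    child.join()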