python / cpython

The Python programming language
https://www.python.org

Pool.imap doesn't work as advertised #64192

Open 8e5e46ec-69cf-4b4b-abb5-1119de60ab55 opened 10 years ago

8e5e46ec-69cf-4b4b-abb5-1119de60ab55 commented 10 years ago
BPO 19993
Nosy @tim-one, @jneb, @pitrou, @MojoVampire, @applio
Files
  • mypool.py: Concept implementation of pool.imap_unordered
  • imu.py
  • Note: these values reflect the state of the issue at the time it was migrated and might not reflect the current state.


    GitHub fields:

    ```python
    assignee = None
    closed_at = None
    created_at =
    labels = ['type-bug', 'library']
    title = "Pool.imap doesn't work as advertised"
    updated_at =
    user = 'https://github.com/jneb'
    ```

    bugs.python.org fields:

    ```python
    activity =
    actor = 'josh.r'
    assignee = 'none'
    closed = False
    closed_date = None
    closer = None
    components = ['Library (Lib)']
    creation =
    creator = 'jneb'
    dependencies = []
    files = ['33158', '33166']
    hgrepos = []
    issue_num = 19993
    keywords = []
    message_count = 5.0
    messages = ['206279', '206334', '206356', '315231', '315235']
    nosy_count = 6.0
    nosy_names = ['tim.peters', 'jneb', 'pitrou', 'josh.r', 'davin', 'alex-garel']
    pr_nums = []
    priority = 'normal'
    resolution = None
    stage = None
    status = 'open'
    superseder = None
    type = 'behavior'
    url = 'https://bugs.python.org/issue19993'
    versions = []
    ```

    8e5e46ec-69cf-4b4b-abb5-1119de60ab55 commented 10 years ago

    The pool.imap and pool.imap_unordered functions are documented as "a lazy version of Pool.map". In fact, they aren't: they consume the iterator argument as a whole. This is almost certainly not what the user wants: it uses unnecessary memory, and it will be slower than expected if the output iterator isn't consumed in full. As things stand, imap offers little advantage over map. I tried to fix the code myself, but the two-level queueing of the input arguments makes this non-trivial. Stack Overflow user Blckknght wrote a simplified solution that gives an idea of how it should work. Since that wasn't posted here, I thought it would be useful to include it, even if only for documentation purposes.
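    The eager consumption described above can be demonstrated without spawning processes, since multiprocessing.pool.ThreadPool shares Pool's task-handler machinery; a minimal sketch (the counting generator is purely illustrative):

```python
import time
from multiprocessing.pool import ThreadPool

pulled = []  # records which inputs the pool has pulled from the generator

def gen():
    for i in range(10):
        pulled.append(i)
        yield i

with ThreadPool(2) as pool:
    it = pool.imap(lambda x: x * 2, gen())
    first = next(it)  # ask for just one result...
    time.sleep(0.5)   # ...and give the background task-handler thread time to run
    # ...yet by now the handler has typically consumed the whole generator
    print(first, len(pulled))
```

    A truly lazy imap would have pulled only a handful of inputs at this point, not all ten.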

    tim-one commented 10 years ago

    Nice to see you, Jurjen! Been a long time :-)

    I'd like to see changes here too. It's unclear what "a lazy version" is intended to mean, exactly, but I agree the actual behavior is surprising, and that mypool.py is a lot less surprising in several ways.

    I got bitten by this just last week, when running a parallelized search over a massive space _expected_ to succeed after exploring a tiny fraction of the search space. Ran out of system resources because imap_unordered() tried to queue up countless millions of work descriptions. I had hoped/expected that it would interleave generating and queueing "a few" inputs with retrieving outputs, much as mypool.py behaves.

    In that case I switched to using apply_async() instead, interposing my own bounded queue (a collections.deque used only in the main program) to throttle the main program. I'm still surprised it was necessary ;-)

    tim-one commented 10 years ago

    Just for interest, I'll attach the work-around I mentioned (imu.py). At this level it's a very simple implementation, but now that I look at it, it's actually a lazy implementation of imap() (or of an unimaginative ;-) imap_unordered()).

    3d7ebedf-939a-4b37-a0df-ddd242f81f86 commented 6 years ago

    Hello, I think this is a really important feature; it has hit me hard these days.

    It would also solve https://bugs.python.org/issue19173 in a nice way.

    99ffcaa5-b43b-4e8e-a35e-9c890007b9cd commented 6 years ago

    Related: bpo-29842 "Make Executor.map work with infinite/large inputs correctly" for a similar problem in concurrent.futures (but worse, since it doesn't even allow you to begin consuming results until all inputs are dispatched).

    A similar approach to my Executor.map patch could probably be used with imap/imap_unordered.
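    The bounded-buffer idea behind that Executor.map patch can be sketched with concurrent.futures; `lazy_map` and `max_in_flight` below are illustrative names for the pattern, not the actual patch:

```python
from collections import deque
from concurrent.futures import ThreadPoolExecutor

def lazy_map(executor, fn, iterable, max_in_flight=4):
    # Yield results in submission order while keeping at most
    # max_in_flight futures pending, instead of dispatching every
    # input up front the way Executor.map does.
    pending = deque()
    for arg in iterable:
        if len(pending) >= max_in_flight:
            yield pending.popleft().result()  # block until the oldest finishes
        pending.append(executor.submit(fn, arg))
    while pending:
        yield pending.popleft().result()

with ThreadPoolExecutor(max_workers=2) as ex:
    squares = list(lazy_map(ex, lambda x: x * x, range(5)))
print(squares)  # [0, 1, 4, 9, 16]
```

    Because results are popped from the left of the deque, output order matches input order, making this an imap-style rather than imap_unordered-style replacement.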

    simontindemans commented 2 years ago

    I ran into this problem as well. It turns out to be a little more subtle (and I think it should qualify as a bug). The number of inputs consumed from the iterable/iterator depends on their size, because multiprocessing's imap uses a finite-capacity Pipe behind the scenes. So with small items (e.g. a simple range) thousands fit, but when the objects to be dispatched are large, only a few do.

    See a discussion on pipes vs queues in MP on SO

    @tim-one, your imu.py has solved this problem for me.

    simontindemans commented 2 years ago

    For illustration, I made the following code:

    import multiprocessing as mp
    import threading
    import time
    import numpy as np
    
    def imap_unordered(pool, f, iterable, max_in_flight):
        # function adapted from https://github.com/python/cpython/issues/64192#issuecomment-1093638885
        from collections import deque
        q = deque()
        for arg in iterable:
            while len(q) >= max_in_flight:
                yield q.popleft().get()
            q.append(pool.apply_async(f, (arg,)))
        while q:
            yield q.popleft().get()
    
    def func(input):
        """Processing task for parallel dispatch"""
        print(f"Running  {input[0]:5d} on {mp.current_process().name}, {threading.current_thread().name}")
        time.sleep(1)
        return input[0]
    
    def gen(max, filler):
        """Generator of input data"""
        x=0
        while x<max:
            x += 1
            print(f"Input    {x:5d} on {mp.current_process().name}, {threading.current_thread().name}")
            input = x, np.zeros(filler)
            yield input
    
    if __name__ == "__main__":
    
        lock = threading.Lock()
    
        # create Pool with 4 processes
        with mp.Pool(4) as pool:
    
            print("======Generation of small tasks=======")
            for val in pool.imap_unordered(func, gen(10,0)):
                with lock:
                    print(f"Finished {val:5d} on {mp.current_process().name}, {threading.current_thread().name}")
    
            print("======Generation of large tasks=======")
            for val in pool.imap_unordered(func, gen(10, 1000000)):
                with lock:
                    print(f"Finished {val:5d} on {mp.current_process().name}, {threading.current_thread().name}")
    
            print("======Alternative implementation=======")
            for val in imap_unordered(pool, func, gen(10, 0), 4):
                with lock:
                    print(f"Finished {val:5d} on {mp.current_process().name}, {threading.current_thread().name}")

    It outputs:

    ======Generation of small tasks=======
    Input        1 on MainProcess, Thread-2
    Input        2 on MainProcess, Thread-2
    Input        3 on MainProcess, Thread-2
    Input        4 on MainProcess, Thread-2
    Input        5 on MainProcess, Thread-2
    Input        6 on MainProcess, Thread-2
    Input        7 on MainProcess, Thread-2
    Input        8 on MainProcess, Thread-2
    Input        9 on MainProcess, Thread-2
    Input       10 on MainProcess, Thread-2
    Running      1 on SpawnPoolWorker-1, MainThread
    Running      2 on SpawnPoolWorker-2, MainThread
    Running      3 on SpawnPoolWorker-3, MainThread
    Running      4 on SpawnPoolWorker-4, MainThread
    Running      5 on SpawnPoolWorker-1, MainThread
    Finished     1 on MainProcess, MainThread
    Running      6 on SpawnPoolWorker-2, MainThread
    Finished     2 on MainProcess, MainThread
    Running      7 on SpawnPoolWorker-3, MainThread
    Finished     3 on MainProcess, MainThread
    Running      8 on SpawnPoolWorker-4, MainThread
    Finished     4 on MainProcess, MainThread
    Running      9 on SpawnPoolWorker-1, MainThread
    Finished     5 on MainProcess, MainThread
    Running     10 on SpawnPoolWorker-2, MainThread
    Finished     6 on MainProcess, MainThread
    Finished     7 on MainProcess, MainThread
    Finished     8 on MainProcess, MainThread
    Finished     9 on MainProcess, MainThread
    Finished    10 on MainProcess, MainThread
    ======Generation of large tasks=======
    Input        1 on MainProcess, Thread-2
    Input        2 on MainProcess, Thread-2
    Running      1 on SpawnPoolWorker-3, MainThread
    Input        3 on MainProcess, Thread-2
    Running      2 on SpawnPoolWorker-4, MainThread
    Input        4 on MainProcess, Thread-2
    Running      3 on SpawnPoolWorker-1, MainThread
    Input        5 on MainProcess, Thread-2
    Running      4 on SpawnPoolWorker-2, MainThread
    Finished     1 on MainProcess, MainThread
    Input        6 on MainProcess, Thread-2
    Running      5 on SpawnPoolWorker-3, MainThread
    Finished     2 on MainProcess, MainThread
    Input        7 on MainProcess, Thread-2
    Running      6 on SpawnPoolWorker-4, MainThread
    Finished     3 on MainProcess, MainThread
    Input        8 on MainProcess, Thread-2
    Running      7 on SpawnPoolWorker-1, MainThread
    Finished     4 on MainProcess, MainThread
    Input        9 on MainProcess, Thread-2
    Running      8 on SpawnPoolWorker-2, MainThread
    Finished     5 on MainProcess, MainThread
    Finished     6 on MainProcess, MainThread
    Input       10 on MainProcess, Thread-2
    Running      9 on SpawnPoolWorker-3, MainThread
    Finished     7 on MainProcess, MainThread
    Running     10 on SpawnPoolWorker-4, MainThread
    Finished     8 on MainProcess, MainThread
    Finished     9 on MainProcess, MainThread
    Finished    10 on MainProcess, MainThread
    ======Alternative implementation=======
    Input        1 on MainProcess, MainThread
    Input        2 on MainProcess, MainThread
    Input        3 on MainProcess, MainThread
    Input        4 on MainProcess, MainThread
    Input        5 on MainProcess, MainThread
    Running      1 on SpawnPoolWorker-1, MainThread
    Running      2 on SpawnPoolWorker-2, MainThread
    Running      3 on SpawnPoolWorker-3, MainThread
    Running      4 on SpawnPoolWorker-4, MainThread
    Finished     1 on MainProcess, MainThread
    Input        6 on MainProcess, MainThread
    Finished     2 on MainProcess, MainThread
    Input        7 on MainProcess, MainThread
    Finished     3 on MainProcess, MainThread
    Input        8 on MainProcess, MainThread
    Running      5 on SpawnPoolWorker-1, MainThread
    Running      6 on SpawnPoolWorker-2, MainThread
    Finished     4 on MainProcess, MainThread
    Input        9 on MainProcess, MainThread
    Running      7 on SpawnPoolWorker-3, MainThread
    Running      8 on SpawnPoolWorker-4, MainThread
    Finished     5 on MainProcess, MainThread
    Input       10 on MainProcess, MainThread
    Finished     6 on MainProcess, MainThread
    Running      9 on SpawnPoolWorker-1, MainThread
    Finished     7 on MainProcess, MainThread
    Running     10 on SpawnPoolWorker-2, MainThread
    Finished     8 on MainProcess, MainThread
    Finished     9 on MainProcess, MainThread
    Finished    10 on MainProcess, MainThread

    With the default imap_unordered implementation and small tasks, all tasks are created immediately. With arbitrarily inflated tasks, the Pipe fills to capacity and we wait for workers to empty it. With the alternative imu.py code, it works as expected, even for small tasks.

    simontindemans commented 2 years ago

    @tim-one, your imu.py has solved this problem for me.

    I spoke too soon. Because all uncompleted tasks are put in a deque and only popleft is used to clear them, this yields an ordered set of results (i.e. an imap replacement). Based on the code provided above, I made the following imap_unordered replacement:

    import queue

    def imap_unordered_fix(pool, f, iterable, prefetch_buffer=0):
        # Replacement for multiprocessing.pool.imap_unordered.
        # It consumes the iterable lazily and has no implicit size limitation on the
        # number of inputs being prefetched. It aims to keep
        # (pool._processes + prefetch_buffer) tasks queued in the pool.
        # Results are processed asynchronously.
        dispatched = 0
        returned = 0
        processes = pool._processes
        result_queue = queue.Queue()
        for arg in iterable:
            # If at least (processes + prefetch_buffer) tasks are in flight, drain results first
            while dispatched - returned - result_queue.qsize() >= processes + prefetch_buffer:
                returned += 1
                yield result_queue.get()
                result_queue.task_done()
            # The processing pipeline is not fully used, so dispatch another task
            pool.apply_async(f, (arg,), callback=result_queue.put)
            dispatched += 1
        # All tasks have been dispatched; only collect results
        while dispatched > returned:
            returned += 1
            yield result_queue.get()
            result_queue.task_done()
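    A standalone usage sketch of that pattern follows; it restates the function so the snippet runs on its own, and uses ThreadPool (which shares the Pool API and its `_processes` attribute) to keep the demo light. Note that as written, a task that raises would make the generator block in `result_queue.get()`, so a production version would also want an `error_callback`:

```python
import queue
from multiprocessing.pool import ThreadPool

def imap_unordered_fix(pool, f, iterable, prefetch_buffer=0):
    # Lazy imap_unordered replacement: keeps roughly
    # (pool._processes + prefetch_buffer) tasks in flight at once.
    dispatched = 0
    returned = 0
    processes = pool._processes
    result_queue = queue.Queue()
    for arg in iterable:
        # in-flight = dispatched - returned - completed-but-unfetched
        while dispatched - returned - result_queue.qsize() >= processes + prefetch_buffer:
            returned += 1
            yield result_queue.get()
        pool.apply_async(f, (arg,), callback=result_queue.put)
        dispatched += 1
    while dispatched > returned:
        returned += 1
        yield result_queue.get()

with ThreadPool(4) as pool:
    results = list(imap_unordered_fix(pool, lambda x: x * x, range(8)))
print(sorted(results))  # [0, 1, 4, 9, 16, 25, 36, 49]
```

    Results arrive in completion order, hence the `sorted()` when checking them.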