uqfoundation / pathos

parallel graph management and execution in heterogeneous computing
http://pathos.rtfd.io

Pool applies workers to toolz.partition_all even without calling map/imap/apply_async, etc. #179

Closed: AlyShmahell closed this issue 4 years ago

AlyShmahell commented 4 years ago

Hello,

I have the following minimal working example:

import itertools
import operator as op
from functools import reduce
from tqdm import tqdm
from toolz import partition_all
import pathos.multiprocessing as pmp

def ncr(n, r):
    """Number of combinations C(n, r)."""
    r = min(r, n-r)
    numerator   = reduce(op.mul, range(n-r+1, n+1), 1)
    denominator = reduce(op.mul, range(1, r+1),     1)
    return numerator // denominator

def peek(iterable):
    """Return (first item, rewound iterator), or (None, None) if exhausted."""
    try:
        first = next(iterable)
    except StopIteration:
        return None, None
    return first, itertools.chain([first], iterable)

combinations = itertools.combinations(range(10000), 2)
iterable = range(ncr(10000, 2))

with pmp.Pool(pmp.cpu_count()) as pool:
    with tqdm(iterable=iterable) as tqdmo:
        def partial_process(partition):
            for combination in partition:
                tqdmo.update(1)
                yield 1
        partitions = partition_all(
            pmp.cpu_count(), 
            combinations
        )
        while True:
            # note: this is the builtin map, not pool.map;
            # the pool is never given any work
            partial_results, frames = peek(
                map(
                    partial_process, partitions
                )
            )
            if frames is None:
                break
            for _ in partial_results:
                continue

As you can see, I'm wrapping my partition_all pipeline in a Pool context, but I never call any of the pool's methods. Yet when I issue a keyboard interrupt, I get the following:

  7%|▋         | 3594009/49995000 [00:03<00:49, 928022.87it/s]Process ForkPoolWorker-23:
Traceback (most recent call last):
Process ForkPoolWorker-24:
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/pool.py", line 108, in worker
    task = get()
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/queues.py", line 338, in get
    res = self._reader.recv_bytes()
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/connection.py", line 219, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/connection.py", line 410, in _recv_bytes
    buf = self._recv(4)
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/connection.py", line 382, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/pool.py", line 108, in worker
    task = get()
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/queues.py", line 337, in get
    with self._rlock:
  File "/usr/local/lib/python3.6/dist-packages/multiprocess/synchronize.py", line 101, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt

Why are there pool workers running? And are they all (in this case, both of them) assigned to the partial_process function?
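[Editor's note: a minimal sketch, not from the thread, of why workers exist before any work is submitted. multiprocess, like the stdlib multiprocessing it forks, spawns its worker processes in the Pool constructor itself; the idle workers then block on the empty task queue, which is exactly the recv/acquire frames visible in the tracebacks above. Using multiprocess.active_children() here assumes pathos's Pool is backed by multiprocess worker processes, as the tracebacks indicate.]

import pathos.multiprocessing as pmp
import multiprocess

# Merely constructing the Pool forks the workers; no map/imap/apply_async
# call is needed. They sit blocked on the (empty) task queue, which is where
# the KeyboardInterrupt tracebacks above originate.
with pmp.Pool(2) as pool:
    print(len(multiprocess.active_children()))  # prints 2: the idle ForkPoolWorkers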

AlyShmahell commented 4 years ago

As it turns out, the workers were being spawned but never used. Removing with pmp.Pool(pmp.cpu_count()) as pool: actually yielded better results for bigger datasets and for a more computationally expensive partial_process function, because it removes the overhead of the unused workers. I'm therefore closing this issue.
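[Editor's note: concretely, the resolution amounts to deleting the Pool context and keeping everything else, since the builtin map was already doing all the work in the parent process. A sketch, assuming the same imports and helpers as the example above:]

# Same loop as before, minus the Pool context; no worker processes are forked.
with tqdm(iterable=iterable) as tqdmo:
    def partial_process(partition):
        for combination in partition:
            tqdmo.update(1)
            yield 1
    partitions = partition_all(pmp.cpu_count(), combinations)
    while True:
        partial_results, frames = peek(map(partial_process, partitions))
        if frames is None:
            break
        for _ in partial_results:
            continue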

mmckerns commented 4 years ago

Hm.. interesting. Thanks for posting.