uqfoundation / pathos

parallel graph management and execution in heterogeneous computing
http://pathos.rtfd.io

pathos.multiprocessing doesn't seem to utilize multiple CPU cores #212

Closed vector818 closed 1 year ago

vector818 commented 3 years ago

I'm trying to use pathos for my project. It works fine (without errors) but doesn't seem to utilize more than one of my CPU cores. Parts of the task are completed by different processes (I checked with os.getpid()), but they don't seem to run in parallel. The task that I want to parallelize uses custom objects and custom modules.

I created some simple example code that has the exact same issue:

from pathos.multiprocessing import ProcessPool as Pool
import math
import os
class myClass:
    def __init__(self):
        pass

    def calc(self, IN_list):
        proc = os.getpid()
        otpt = []
        for x in IN_list: 
            otpt.append(abs(math.sin(x) + math.cos(x)) ** 0.5)
        print('Done calculating from {} to {} by process {}'.format(IN_list[0],IN_list[-1],proc))
        return otpt

    def run(self, inList, nodes):
        pool = Pool(nodes=nodes)
        result = pool.map(self.calc, inList)
        return result

if __name__== '__main__' :
    m = myClass()
    inA = list(range(10000000))
    nodes = 10
    n=math.ceil(len(inA)/nodes)
    inA_parts = [inA[a:a+n] for a in range(0, len(inA), n)]    
    results = m.run(inA_parts,nodes)
    print(results)

When I run it, the parts of the task are completed in sequence and my CPU usage is pretty low (around 8%), so I don't think it is executing in parallel.

I would be very grateful if someone could give me a hint about what to change in the example code above to make it run in parallel.

mmckerns commented 3 years ago

It's not clear from your code what you'd like to happen in parallel. Can you explain what you intended to happen?

vector818 commented 3 years ago

Thanks for the reply. In the example above I have a custom class, myClass, with a method calc. It takes a list of numbers as input and, for every number x in the list, calculates sqrt(abs(sin(x) + cos(x))) and appends the value to a new list called otpt.

There is also a method run, which takes as input a list of numbers divided into N parts (in my case N equals the number of nodes I want in the pathos pool) and the number of nodes for the Pool. It creates a Pool (pathos.multiprocessing.ProcessPool) with the given number of nodes and then runs the method calc in that pool.

I generate a long list inA of consecutive numbers from 0 to 9'999'999, then divide it into 10 parts (n=1'000'000) in the line: inA_parts = [inA[a:a+n] for a in range(0, len(inA), n)]. It's basically a list of lists: the first inner list contains the numbers from 0 to 999'999, the next one 1'000'000 to 1'999'999, and so on up to 9'999'999.

I'd like the pool created in the method run to take the list containing the 10 smaller lists and perform the calculations from calc in parallel, meaning that I'd like to receive the results of the 10 smaller tasks (lists) at the same time (or close to it).
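
As a small illustration of the chunking step described above, here is a minimal sketch. The helper name split_into_parts is hypothetical and does not appear in the original code; the slicing expression is the one from the example, with a guard added for an empty input.

```python
import math

def split_into_parts(items, parts):
    """Split items into consecutive chunks of ceil(len(items) / parts) elements.

    Hypothetical helper; it wraps the slicing expression used in the example.
    """
    # max(1, ...) avoids a zero step in range() when items is empty
    n = max(1, math.ceil(len(items) / parts))
    return [items[a:a + n] for a in range(0, len(items), n)]

chunks = split_into_parts(list(range(100)), 10)
print(len(chunks))                    # number of chunks
print(chunks[0][0], chunks[0][-1])    # first and last element of the first chunk
print(chunks[-1][0], chunks[-1][-1])  # first and last element of the last chunk
```

Note that when the list length is not a multiple of the requested number of parts, this approach can return fewer chunks than requested (for example, 9 items split into 4 parts gives 3 chunks of 3).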

vector818 commented 3 years ago

Maybe it's some kind of bug? If I write simpler code in which, instead of performing mathematical operations on the elements of a list, I just tell the processes to sleep for X seconds, it seems to work in parallel. Code below:

from pathos.multiprocessing import ProcessPool as Pool
import time
import os

class myClass:
    def __init__(self):
        pass

    def idle(self, sec):
        proc = os.getpid()
        print('Start idling for {} seconds by process {}'.format(sec,proc))
        time.sleep(sec)
        print('Done idling for {} seconds by process {}'.format(sec,proc))        

    def run(self, sec_list, nodes):
        pool = Pool(nodes=nodes)
        result = pool.map(self.idle, sec_list)
        return result

if __name__== '__main__' :
    m = myClass()
    sec = 5
    nodes = 10
    sec_list=[sec]*nodes
    m.run(sec_list,nodes) 

The code runs for about 5 seconds and the output is something like this:

Start idling for 5 seconds by process 20616
Start idling for 5 seconds by process 18560
Start idling for 5 seconds by process 9488
Start idling for 5 seconds by process 21396
Start idling for 5 seconds by process 13784
Start idling for 5 seconds by process 10492
Start idling for 5 seconds by process 18600
Start idling for 5 seconds by process 20068
Start idling for 5 seconds by process 20728
Start idling for 5 seconds by process 17712
Done idling for 5 seconds by process 20616
Done idling for 5 seconds by process 18560
Done idling for 5 seconds by process 9488
Done idling for 5 seconds by process 21396
Done idling for 5 seconds by process 13784
Done idling for 5 seconds by process 10492
Done idling for 5 seconds by process 18600
Done idling for 5 seconds by process 20068
Done idling for 5 seconds by process 20728
Done idling for 5 seconds by process 17712
The time of the run: 5.26

It seems to work in parallel, because otherwise I would expect the code to run much longer and the output to be something like this:

Start idling for 5 seconds by process 20616
Done idling for 5 seconds by process 20616
Start idling for 5 seconds by process 18560
Done idling for 5 seconds by process 18560
Start idling for 5 seconds by process 9488
Done idling for 5 seconds by process 9488
Start idling for 5 seconds by process 21396
Done idling for 5 seconds by process 21396
Start idling for 5 seconds by process 13784
Done idling for 5 seconds by process 13784
Start idling for 5 seconds by process 10492
Done idling for 5 seconds by process 10492
Start idling for 5 seconds by process 18600
Done idling for 5 seconds by process 18600
Start idling for 5 seconds by process 20068
Done idling for 5 seconds by process 20068
Start idling for 5 seconds by process 20728
Done idling for 5 seconds by process 20728
Start idling for 5 seconds by process 17712
Done idling for 5 seconds by process 17712
The time of the run: 50.5

mmckerns commented 3 years ago

One other thing to note is that if you think you are building multiple Pool instances, you actually aren't. Pools are cached in pathos, and changing the number of nodes will cause the pool to momentarily become unstable (i.e. it will be less efficient as it transitions from N to M nodes). It wouldn't, however, slow things down that much.

So, is this resolved, and thus can be closed?

vector818 commented 3 years ago

So I understand that I shouldn't change the number of nodes and should just leave it at the default. Unfortunately, it still seems to run sequentially. I've modified my dummy code so that anyone who runs it can see more clearly what my problem is:

from pathos.multiprocessing import ProcessPool as Pool
import math
import os

class myClass:
    def __init__(self):
        pass

    def calc(self, IN_list):
        proc = os.getpid()
        otpt = []
        print('Start calculating from {} to {} by process {}'.format(IN_list[0],IN_list[-1],proc))
        for x in IN_list: 
            otpt.append(abs(math.sin(x) + math.cos(x)) ** 0.5)
        print('Done calculating from {} to {} by process {}'.format(IN_list[0],IN_list[-1],proc))
        return otpt

    def run(self, inList):
        pool = Pool()
        result = pool.map(self.calc, inList)
        return result

if __name__== '__main__' :
    m = myClass()
    inA = list(range(10000000))
    nodes = 10
    n=math.ceil(len(inA)/nodes)
    inA_parts = [inA[a:a+n] for a in range(0, len(inA), n)]    
    results = m.run(inA_parts)

Example output of this program is:

Start calculating from 0 to 999999 by process 14368
Done calculating from 0 to 999999 by process 14368
Start calculating from 1000000 to 1999999 by process 5764
Done calculating from 1000000 to 1999999 by process 5764
Start calculating from 2000000 to 2999999 by process 14536
Done calculating from 2000000 to 2999999 by process 14536
Start calculating from 3000000 to 3999999 by process 12428
Done calculating from 3000000 to 3999999 by process 12428
Start calculating from 4000000 to 4999999 by process 19820
Done calculating from 4000000 to 4999999 by process 19820
Start calculating from 5000000 to 5999999 by process 8332
Done calculating from 5000000 to 5999999 by process 8332
Start calculating from 6000000 to 6999999 by process 1860
Done calculating from 6000000 to 6999999 by process 1860
Start calculating from 7000000 to 7999999 by process 20912
Done calculating from 7000000 to 7999999 by process 20912
Start calculating from 8000000 to 8999999 by process 12580
Done calculating from 8000000 to 8999999 by process 12580
Start calculating from 9000000 to 9999999 by process 20036
Done calculating from 9000000 to 9999999 by process 20036

It should run in parallel, so I would expect the program to start calculating several of the lists before the first results come in. As the output indicates, it is clearly running in sequence.
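
Print order alone can be misleading when every task takes about the same time, so one way to check for overlap is to have each task return its own start and end timestamps and compare the intervals afterwards. The following is only a sketch: calc_timed and overlapping_pairs are hypothetical names, and the builtin map is used as a serial baseline, on the assumption that a pool's map can be swapped in at the marked line.

```python
import math
import time

def calc_timed(chunk):
    """Same formula as calc, but also returns wall-clock start and end times."""
    start = time.time()
    out = [abs(math.sin(x) + math.cos(x)) ** 0.5 for x in chunk]
    return start, time.time(), out

def overlapping_pairs(intervals):
    """Count pairs of (start, end) intervals that overlap in time."""
    count = 0
    for i, (s1, e1) in enumerate(intervals):
        for s2, e2 in intervals[i + 1:]:
            if s1 < e2 and s2 < e1:
                count += 1
    return count

if __name__ == '__main__':
    chunks = [list(range(a, a + 1000)) for a in range(0, 4000, 1000)]
    # Serial baseline with the builtin map; swap in a pool's map to test a pool
    timings = list(map(calc_timed, chunks))
    intervals = [(s, e) for s, e, _ in timings]
    print('overlapping pairs:', overlapping_pairs(intervals))
```

If the tasks really run one after another, no pair of intervals should overlap; any overlapping pair means at least two tasks were in flight at the same moment.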

mmckerns commented 3 years ago

I've tested your code. It's running in parallel, at least when I run it. I've modified it slightly so you can see it's actually running in parallel (due to the random sleep time being a fair portion of the execution time).

from pathos.multiprocessing import ProcessPool as Pool
import math
import os
import time
import random

class myClass:
    def __init__(self):
        pass

    def calc(self, IN_list):
        proc = os.getpid()
        otpt = []
        print('Start calculating from {} to {} by process {}'.format(IN_list[0],IN_list[-1],proc))
        time.sleep(random.random())
        for x in IN_list:
            otpt.append(abs(math.sin(x) + math.cos(x)) ** 0.5)
        print('Done calculating from {} to {} by process {}'.format(IN_list[0],IN_list[-1],proc))
        return otpt

    def run(self, inList):
        pool = Pool()
        result = pool.map(self.calc, inList)
        pool.close()
        pool.join()
        pool.clear()
        return result

if __name__== '__main__' :
    m = myClass()
    inA = list(range(10000))  # reduced from 10000000
    nodes = 4  # reduced from 10
    n=math.ceil(len(inA)/nodes)
    inA_parts = [inA[a:a+n] for a in range(0, len(inA), n)]
    results = m.run(inA_parts)

with results

$ python dummy_pathos.py 
Start calculating from 0 to 2499 by process 54759
Start calculating from 2500 to 4999 by process 54760
Start calculating from 5000 to 7499 by process 54761
Start calculating from 7500 to 9999 by process 54762
Done calculating from 7500 to 9999 by process 54762
Done calculating from 5000 to 7499 by process 54761
Done calculating from 2500 to 4999 by process 54760
Done calculating from 0 to 2499 by process 54759

Remember, you are spinning up a python interpreter, and serializing a big list.
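
To get a rough feel for the serialization cost mentioned above, the time to serialize and restore one chunk can be compared with the time to run the calculation on it. This is only a sketch under stated assumptions: it uses the standard-library pickle module as a stand-in for whatever serializer the pool actually uses, it measures in a single process, and it ignores interpreter start-up and inter-process transfer.

```python
import math
import pickle
import time

chunk = list(range(100000))  # one chunk, smaller than in the thread

t0 = time.perf_counter()
payload = pickle.dumps(chunk)      # what would be sent to a worker
restored = pickle.loads(payload)   # what the worker would reconstruct
t1 = time.perf_counter()
result = [abs(math.sin(x) + math.cos(x)) ** 0.5 for x in restored]
t2 = time.perf_counter()

print('payload size: {} bytes'.format(len(payload)))
print('serialize + deserialize: {:.4f} s'.format(t1 - t0))
print('calculation: {:.4f} s'.format(t2 - t1))
```

The result list travels back to the parent process as well, so the real round-trip cost includes serializing the output too.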

vector818 commented 3 years ago

Well, on my computer (Windows 10), no matter how I run it (in Visual Studio Code, in Spyder, or in the system terminal), it still outputs:

Start calculating from 0 to 2499 by process 18096
Done calculating from 0 to 2499 by process 18096
Start calculating from 2500 to 4999 by process 18096
Done calculating from 2500 to 4999 by process 18096
Start calculating from 5000 to 7499 by process 11152
Done calculating from 5000 to 7499 by process 11152
Start calculating from 7500 to 9999 by process 20956
Done calculating from 7500 to 9999 by process 20956

I guess there's nothing we can do so let's close the issue.

mmckerns commented 3 years ago

Oh, you're on Windows. Try adding freeze_support in __main__.

if __name__ == '__main__':
    from pathos.helpers import freeze_support
    freeze_support()

vector818 commented 3 years ago

I should have guessed that information about my OS was relevant. I've tried adding freeze_support, with no success.

I even installed Ubuntu to see whether it works on a Linux distribution, but it still outputs:

Start calculating from 0 to 24999 by process 15122
Done calculating from 0 to 24999 by process 15122
Start calculating from 25000 to 49999 by process 15123
Done calculating from 25000 to 49999 by process 15123
Start calculating from 50000 to 74999 by process 15124
Done calculating from 50000 to 74999 by process 15124
Start calculating from 75000 to 99999 by process 15125
Done calculating from 75000 to 99999 by process 15125

Maybe it's related to my hardware? My CPU is an AMD Ryzen Threadripper 1920X 12-Core Processor. I can provide more information if needed.

mmckerns commented 3 years ago

I see the same printed results as you do for the big list. What I'm saying is that you have a lot of overhead per call, and all of the function calls are about the same length, so I'd expect that it is just printing in sequence. Why don't you try to create the Pool outside of the class, passing in the pool instance (or the map) in the init, and then calling the map as you are now? That will reduce some of the overhead.
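
A minimal sketch of the suggestion to pass the map in through the init might look like the following. The class name ChunkRunner and the parameter map_func are hypothetical. The worker is also moved to a module-level function, which is an assumption of this sketch rather than part of the suggestion: it avoids serializing the object that holds the map. The runnable part uses the builtin map as a serial baseline, and the pathos lines are left as comments.

```python
import math

def calc(chunk):
    """Module-level worker using the same formula as calc in the thread."""
    return [abs(math.sin(x) + math.cos(x)) ** 0.5 for x in chunk]

class ChunkRunner:
    """Hypothetical class that receives a map function instead of building a pool."""

    def __init__(self, map_func):
        # map_func can be the builtin map, or the map of an existing pool
        self.map_func = map_func

    def run(self, chunks):
        # list() forces evaluation when map_func is lazy, like the builtin map
        return list(self.map_func(calc, chunks))

if __name__ == '__main__':
    chunks = [list(range(a, a + 2500)) for a in range(0, 10000, 2500)]
    runner = ChunkRunner(map)  # serial baseline
    results = runner.run(chunks)
    print(len(results), len(results[0]))
    # With pathos, the pool would be created once, outside the class:
    #     from pathos.multiprocessing import ProcessPool as Pool
    #     pool = Pool(nodes=4)
    #     runner = ChunkRunner(pool.map)
```

Because the class only depends on a map-like callable, the same run method can be timed with the serial map and with a pool's map to see how much the pool actually helps for a given chunk size.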

mmckerns commented 1 year ago

I'm closing this. If there's more to add to continue the conversation, please feel free to do so.