uqfoundation / pathos

parallel graph management and execution in heterogeneous computing
http://pathos.rtfd.io

pathos Pool can't find dynamically generated log function #260

Closed FOXP16 closed 1 year ago

FOXP16 commented 1 year ago

Hi, I have an issue using pathos which I have been unable to solve myself. The following is the code I am trying to run in Python 3.10:

import time
import math
import os
import sys
os.chdir(r"C:\Users\seanr\Documents\Programming\Python\PyCHARM\Otago\Python Technical")
sys.path.append(r"C:\Users\seanr\Documents\Programming\Python\PyCHARM\Otago\Python Technical")
import numpy as np
import sympy as sym
from sympy.stats import Normal, density, E, std, cdf, skewness
from sympy import lambdify
from scipy.optimize import minimize
from pathos.pools import ParallelPool as Pool
import pathos

#The following creates the log-likelihood function for 10 Gaussian observations.
CS1_Onsets = np.array([11.65239205, 23.67586247, 31.96652543, 40.59708088, 48.44196047, 57.50153782, 68.16787918, 76.90944557, 88.71767575, 99.527949  ])
CS_Wait_Times = np.array(CS1_Onsets[0]) # This takes care of the fact that the first observation cannot be obtained by differencing.
CS_Wait_Times = np.append(CS_Wait_Times,np.diff(CS1_Onsets))
del CS1_Onsets
u_CS = sym.Symbol('u_CS',positive=True)
var_CS = sym.Symbol('var_CS',positive=True)
x_CS = sym.Symbol('x_CS',positive=True)
Gaussian_CS = density(Normal("x_CS", u_CS, var_CS**(1/2)))
log_likelihood = 0
#Compute log-likelihood:
for i in range(0,len(CS_Wait_Times),1):
    log_likelihood = log_likelihood + sym.log(Gaussian_CS(CS_Wait_Times[i]))
#########################################          Optimising the log-likelihood function        #####################################
Neg_log_likelihood = -1*log_likelihood
del log_likelihood
Neg_Score = [sym.diff(Neg_log_likelihood,u_CS),sym.diff(Neg_log_likelihood,var_CS)]
##The following implements the optimisation
bounds = [(0.0000000001,86400),(0.0000000001,86400)]  #u_CS, var_CS,u_CS_US_Int,var_CS_US_Int,p_US
options = {"maxiter":400}
Results = [] #Store the results for different starting points.
Initial_Parameter_Estimates = [(1,1),(10,10),(100,100),(1000,1000),(10000,10000),(8,1),(80,10),(800,100),(8000,1000),(80000,10000)]
start_time = time.perf_counter()
Objective = lambdify([(u_CS, var_CS)], Neg_log_likelihood,'mpmath') #This gives the negative log-likelihood in the desired form:
Gradient = lambdify([(u_CS, var_CS)], Neg_Score,'mpmath')

pool = Pool()
##Parallel Version - The following doesn't work.
if __name__ == '__main__':
    def MINI(start_loc):
        import math
        import numpy as np
        import sympy as sym
        from sympy.stats import Normal, density, E, std, cdf, skewness
        from sympy import lambdify
        from scipy.optimize import minimize
        fun = Objective
        x0 = start_loc
        method = 'SLSQP'
        jac = Gradient
        bounds = [(0.0000000001, 86400), (0.0000000001, 86400)]
        options = {"maxiter": 400}
        result = minimize(fun=fun, x0=x0, method='SLSQP', jac=jac, bounds=bounds, options=options)
        return result
    from pathos.helpers import freeze_support
    freeze_support()
    start_time = time.perf_counter()
    RESULTS = pool.map(MINI,Initial_Parameter_Estimates)
    r = list(map(MINI, Initial_Parameter_Estimates))
    finish = time.perf_counter()
    Running_parallel = finish - start_time
    print('Running time of parallel execution is:',Running_parallel)
    # cleanup
    pool.clear()

Whenever I try to run this code, the following error message is displayed (it always repeats itself 10 times; I have 14 processors in my computer and utilisation jumps to near 100%, so the processes are getting submitted):

An error has occured during the function execution
Traceback (most recent call last):
  File "C:\Users\seanr\Documents\Programming\Python\PyCHARM\Otago\venv\lib\site-packages\ppft\__main__.py", line 99, in run
    __result = __f(*__args)
  File "<string>", line 26, in MINI
  File "C:\Users\seanr\Documents\Programming\Python\PyCHARM\Otago\venv\lib\site-packages\scipy\optimize\_minimize.py", line 708, in minimize
    res = _minimize_slsqp(fun, x0, args, jac, bounds,
  File "C:\Users\seanr\Documents\Programming\Python\PyCHARM\Otago\venv\lib\site-packages\scipy\optimize\_slsqp_py.py", line 374, in _minimize_slsqp
    sf = _prepare_scalar_function(func, x, jac=jac, args=args, epsilon=eps,
  File "C:\Users\seanr\Documents\Programming\Python\PyCHARM\Otago\venv\lib\site-packages\scipy\optimize\_optimize.py", line 263, in _prepare_scalar_function
    sf = ScalarFunction(fun, x0, args, grad, hess,
  File "C:\Users\seanr\Documents\Programming\Python\PyCHARM\Otago\venv\lib\site-packages\scipy\optimize\_differentiable_functions.py", line 158, in __init__
    self._update_fun()
  File "C:\Users\seanr\Documents\Programming\Python\PyCHARM\Otago\venv\lib\site-packages\scipy\optimize\_differentiable_functions.py", line 251, in _update_fun
    self._update_fun_impl()
  File "C:\Users\seanr\Documents\Programming\Python\PyCHARM\Otago\venv\lib\site-packages\scipy\optimize\_differentiable_functions.py", line 155, in update_fun
    self.f = fun_wrapped(self.x)
  File "C:\Users\seanr\Documents\Programming\Python\PyCHARM\Otago\venv\lib\site-packages\scipy\optimize\_differentiable_functions.py", line 137, in fun_wrapped
    fx = fun(np.copy(x), *args)
  File "<string>", line 3, in _lambdifygenerated
NameError: name 'log' is not defined

What do you think the issue is here? I've looked through all the documentation and tried a million and one hacks to get this working, but I just cannot do it. The following are some potentially useful notes:

  1. It's not that the lambdified object cannot be serialised: dill can serialise lambdified functions (https://stackoverflow.com/questions/31314517/how-to-serialize-sympy-lambdified-function), and dill is used within pathos (https://stackoverflow.com/questions/19984152/what-can-multiprocessing-and-dill-do-together/19985580#19985580).
  2. If I copy and paste all of the code that occurs before the if __name__ == '__main__': of my script into the definition of MINI, then everything works. The problem is that this is a toy example for a much bigger problem I am dealing with, so this naïve approach cannot be used.
  3. I can also get it to work by putting everything above the if __name__ == '__main__': in another script, then importing that script and using the MINI function that way (I've just described the basic idea). This hack is also too inflexible to use, however.
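For reference, the workaround in note 3 can be sketched with the standard library alone. The module name likelihood_setup.py and the trivial MINI body below are hypothetical stand-ins, not the real optimisation code:

```python
import importlib
import pathlib
import sys
import tempfile

# Hypothetical sketch of note 3: everything above the __main__ guard is
# moved into its own module, so worker processes can re-import it by name
# instead of relying on source extraction.
module_src = """\
import math

def MINI(start_loc):
    # Stand-in for the real scipy.optimize.minimize call.
    return math.fsum(start_loc)
"""
tmpdir = tempfile.mkdtemp()
pathlib.Path(tmpdir, "likelihood_setup.py").write_text(module_src)
sys.path.insert(0, tmpdir)

likelihood_setup = importlib.import_module("likelihood_setup")

# The main script now just imports MINI and maps over the start points.
results = list(map(likelihood_setup.MINI, [(1, 1), (10, 10)]))
print(results)  # [2.0, 20.0]
```

Because the function is importable by module name, any serializer can reconstruct it on the worker side; the cost is exactly the inflexibility described above.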

Would really appreciate your help with this, pathos is a great library and I am looking forward to using it!

mmckerns commented 1 year ago

I've slightly modified your code, and it seems to work:

import time
import math
import os
import sys
import numpy as np
import sympy as sym
from sympy.stats import Normal, density, E, std, cdf, skewness
from sympy import lambdify
from scipy.optimize import minimize
from pathos.pools import ProcessPool as Pool
import pathos

#The following creates the log-likelihood function for 10 Gaussian observations.
CS1_Onsets = np.array([11.65239205, 23.67586247, 31.96652543, 40.59708088, 48.44196047, 57.50153782, 68.16787918, 76.90944557, 88.71767575, 99.527949  ])
CS_Wait_Times = np.array(CS1_Onsets[0]) # This takes care of the fact that the first observation cannot be obtained by differencing.
CS_Wait_Times = np.append(CS_Wait_Times,np.diff(CS1_Onsets))
del CS1_Onsets
u_CS = sym.Symbol('u_CS',positive=True)
var_CS = sym.Symbol('var_CS',positive=True)
x_CS = sym.Symbol('x_CS',positive=True)
Gaussian_CS = density(Normal("x_CS", u_CS, var_CS**(1/2)))
log_likelihood = 0
#Compute log-likelihood:
for i in range(0,len(CS_Wait_Times),1):
    log_likelihood = log_likelihood + sym.log(Gaussian_CS(CS_Wait_Times[i]))
#########################################          Optimising the log-likelihood function        #####################################
Neg_log_likelihood = -1*log_likelihood
del log_likelihood
Neg_Score = [sym.diff(Neg_log_likelihood,u_CS),sym.diff(Neg_log_likelihood,var_CS)]
##The following implements the optimisation
bounds = [(0.0000000001,86400),(0.0000000001,86400)]  #u_CS, var_CS,u_CS_US_Int,var_CS_US_Int,p_US
options = {"maxiter":400}
Results = [] #Store the results for different starting points.
Initial_Parameter_Estimates = [(1,1),(10,10),(100,100),(1000,1000),(10000,10000),(8,1),(80,10),(800,100),(8000,1000),(80000,10000)]
start_time = time.perf_counter()
Objective = lambdify([(u_CS, var_CS)], Neg_log_likelihood,'mpmath') #This gives the negative log-likelihood in the desired form:
Gradient = lambdify([(u_CS, var_CS)], Neg_Score,'mpmath')

##Parallel Version - with ProcessPool this now works.
def MINI(start_loc):
    import math
    import numpy as np
    import sympy as sym
    from sympy.stats import Normal, density, E, std, cdf, skewness
    from sympy import lambdify
    from scipy.optimize import minimize
    fun = Objective
    x0 = start_loc
    method = 'SLSQP'
    jac = Gradient
    bounds = [(0.0000000001, 86400), (0.0000000001, 86400)]
    options = {"maxiter": 400}
    result = minimize(fun=fun, x0=x0, method='SLSQP', jac=jac, bounds=bounds, options=options)
    return result

if __name__ == '__main__':

    pool = Pool()

    from pathos.helpers import freeze_support
    freeze_support()
    start_time = time.perf_counter()
    RESULTS = pool.map(MINI,Initial_Parameter_Estimates)
    r = list(map(MINI, Initial_Parameter_Estimates))
    finish = time.perf_counter()
    Running_parallel = finish - start_time
    print('Running time of parallel execution is:',Running_parallel)
    # cleanup
    pool.close()
    pool.join()
    pool.clear()

The primary change is to use ProcessPool instead of ParallelPool: from pathos.pools import ProcessPool as Pool

ProcessPool uses object serialization (via dill), while ParallelPool uses serialization by source extraction (via dill.source). The former is faster and more robust. The issue is that the sympy-generated log function can't be found by ppft, as it's a dynamically generated function. ppft checks the namespace of the function MINI... but fails to find log in some cases (as you noted). For functions that aren't dynamically generated, you can include the importing module, and ppft generally does find the function. ProcessPool uses multiprocess, which I imagine is what you wanted to use in the first place.
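The failure mode can be reproduced with the standard library alone. The sketch below is a simplified imitation (not sympy's or ppft's actual code): lambdify execs generated source into a namespace that carries the backend's log, while source-extraction transport re-execs that source in a fresh namespace on the worker, losing the injected globals. Here math.log stands in for mpmath's log:

```python
import math

# Mimic sympy.lambdify: exec generated source into a namespace that
# already holds the backend's log (math.log standing in for mpmath's).
src = "def _lambdifygenerated(x):\n    return log(x)\n"
ns = {"log": math.log}
exec(src, ns)
f = ns["_lambdifygenerated"]
assert f(1.0) == 0.0  # works: log lives in the function's globals

# Mimic source-extraction transport: ship the *source* and re-exec it in
# a fresh namespace on the worker. The injected globals are lost.
fresh = {}
exec(src, fresh)
g = fresh["_lambdifygenerated"]
try:
    g(1.0)
    raised = False
except NameError:  # name 'log' is not defined -- the reported error
    raised = True
print(raised)  # True
```

Object serialization avoids this because dill pickles the function together with a reference to its globals, rather than rebuilding it from source text.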

I'm going to close the issue, but if it doesn't work for you, then please reopen.

FOXP16 commented 1 year ago

That did it, cheers Mike.