joshspeagle / dynesty

Dynamic Nested Sampling package for computing Bayesian posteriors and evidences
https://dynesty.readthedocs.io/
MIT License
357 stars 77 forks source link

Why does Dynesty multiprocessing with ThreadPoolExecutor not use all cores? #100

Closed exowanderer closed 6 years ago

exowanderer commented 6 years ago

I am trying to run a simple example with nested sampling and dynesty.

I installed dynesty from github: https://github.com/joshspeagle/dynesty


Computer Setup

OS: Mac OSX El Capitan (10.11.6)
CPU: 8 cores
RAM: 16.0 GB

gcc: 4.8.5 via conda install gcc


Problem Setup

I ran the code below (simulate data, setup prior/likelihood, submit to dynesty.

To establish multiprocessing, I used multiprocessing.Pool, concurrent.futures.ProcessPoolExecutor, and concurrent.futures.ThreadPoolExecutor.

I tried the code in Jupyter Lab, ipython, and (script) python run_dynesty_test.py.

Problem: The entire script runs fine; but dynesty/python starts using all of the cores; then it slowly starts to use less and less cores. Finally, after about 5 minutes, dynesty/python uses almost exactly 1 core.

Evidence: htop starts reading 780%, then 550%, then 350%, then 100% CPU -- and it stays at 100% CPU for the rest of the operation; except a once every other minute, htop will read ~250-300% CPU.


Question

Why is dynesty/ThreadPoolExecutor/python/etc not using all of my cores all of the time?


Code Snippet Involving Dynesty and Multiprocessing

with ThreadPoolExecutor(max_workers=cpu_count()-1) as executor:

    sampler = dynesty.DynamicNestedSampler(
                            loglike,
                            prior,
                            ndim=ndims,
                            nparam=ndims,
                            bound='multi',
                            sample='unit',
                            pool=executor,
                            queue_size=cpu_count())

    sampler.run_nested(nlive_init=100,nlive_batch=100)

    res = sampler.results

Full Script to Setup Test

from __future__ import absolute_import,\
              unicode_literals, print_function

from multiprocessing import set_start_method
set_start_method('forkserver')

import dynesty
import math
import os
import threading, subprocess

from sys import platform
from numpy import pi, sin, cos, linspace
from pylab import *#;ion()

from multiprocessing import Pool, cpu_count

if not os.path.exists("chains"): os.mkdir("chains")

# plotting
import matplotlib
from matplotlib import pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

def gaussian1Dp(cube):
    center = cube[0]
    width  = cube[1]
    height = cube[2]
    return lambda y: height*np.exp(-0.5*(( (center - y) / width)**2))

np.random.seed(42)

param0a= -0.5
param0b= 0.5
param1a= 0.1
param1b= 0.1
param2a= 0.8
param2b= 0.8

yunc  = 0.1
nPts  = int(100)
nThPts= int(1e3)

xmin  = -1
xmax  =  1
dx    = 0.1*(xmax - xmin)

yuncs = np.random.normal(yunc, 1e-2 * yunc, nPts)
thdata= np.linspace(xmin-dx, xmax+dx, nThPts)

xdata = np.linspace(xmin,xmax,nPts)

ydata = model([param0a,param1a,param2a])(xdata) \
        + model([param0b,param1b,param2b])(xdata)

yerr  = np.random.normal(0, yuncs, nPts)
zdata = ydata + yerr

figure(figsize=(10,10))
plot(thdata, model([param0a,param1a,param2a])(thdata) \
        + model([param0b,param1b,param2b])(thdata))
errorbar(xdata, zdata, yunc*ones(zdata.size), fmt='o')
show()

def prior(cube):
    cube[0] = cube[0]*2 - 1
    cube[1] = cube[1]*2
    cube[2] = cube[2]*2

    return cube

def loglike(cube):
    modelNow = model(cube)(xdata)
    return -0.5*((modelNow - ydata)**2./yuncs**2.).sum()

from concurrent.futures import ThreadPoolExecutor,\
                               ProcessPoolExecutor

if __name__ == '__main__':
    if not os.path.exists("chains"): os.mkdir("chains")
    n_params = len(parameters)
    ndims = 3

    with ThreadPoolExecutor(max_workers=cpu_count()-1) as executor:

        sampler = dynesty.DynamicNestedSampler(
                                loglike,
                                prior,
                                ndim=ndims,
                                nparam=ndims,
                                bound='multi',
                                sample='unit',
                                pool=executor,
                                queue_size=cpu_count())

        sampler.run_nested(nlive_init=100, nlive_batch=100)

        res = sampler.results

from dynesty import plotting as dyplot

# evidence check
fig, axes = dyplot.runplot(res, color='red',
                lnz_truth=lnz_truth,
                truth_color='black',
                logplot=True)

fig.tight_layout()

joblib.dump(res,'dynesty_double_gaussian_test_results.joblib.save')
joshspeagle commented 6 years ago

Thanks for the code. I'm actually surprised this executes because 'unit' is not actually defined as a sampling method (you mean 'unif'?) and so you should be getting an error thrown on initialization here.

As for the parallelism, dynesty implements a pretty straightforward scheme so that wherever function calls can be parallelized, it just calls self.M: this is map when no pool is provided and pool.map otherwise. The bulk of this happens here. I'm not as familiar with these pools, so I'll have to look into this in more detail later if nothing obvious jumps out at you from that.

joshspeagle commented 6 years ago

Circling back around to this because I haven't heard anything so far. Any luck on this? I've been traveling and haven't been able to get around to investigating this in detail.

exowanderer commented 6 years ago

To answer your question, I copy pasted the code that I provided above and found that it didn't work out of the box; so I posted a new version here.


The answer to your question is unfortunately 'no'.

The code that I provided below has the same issues as before:

(1) run dynesty_multiprocess_test.py
(2) 400-500% cores are used (3) ... 2 minutes later ... only 100% of 'cores' are used.

I tried this on several machines; some with MP setup by my IT department, and some with my own setup (i.e. brew install openmp, and installed from source).


from __future__ import absolute_import, unicode_literals, print_function

# from multiprocessing import set_start_method
# set_start_method('forkserver')

from matplotlib import use; use('Agg')

import dynesty
import math
import os
import threading, subprocess

from sys import platform
from numpy import pi, sin, cos, linspace
from pylab import *#;ion()

from multiprocessing import Pool, cpu_count

if not os.path.exists("chains"): os.mkdir("chains")

# plotting
import matplotlib
from matplotlib import pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

def model(cube):
    center = cube[0]
    width  = cube[1]
    height = cube[2]
    return lambda y: height*np.exp(-0.5*(( (center - y) / width)**2))

def prior(cube):
    cube[0] = cube[0]*2 - 1
    cube[1] = cube[1]*2
    cube[2] = cube[2]*2

    return cube

def loglike(cube):
    modelNow = model(cube)(xdata)
    return -0.5*((modelNow - ydata)**2./yuncs**2.).sum()

from concurrent.futures import ThreadPoolExecutor,\
                               ProcessPoolExecutor
if __name__ == '__main__':
    from dynesty import plotting as dyplot

    np.random.seed(42)

    param0a= -0.5
    param0b= 0.5
    param1a= 0.1
    param1b= 0.1
    param2a= 0.8
    param2b= 0.8

    yunc  = 0.1
    nPts  = int(100)
    nThPts= int(1e3)

    xmin  = -1
    xmax  =  1
    dx    = 0.1*(xmax - xmin)

    yuncs = np.random.normal(yunc, 1e-2 * yunc, nPts)
    thdata= np.linspace(xmin-dx, xmax+dx, nThPts)

    xdata = np.linspace(xmin,xmax,nPts)

    params_a = [param0a,param1a,param2a]
    params_b = [param0b,param1b,param2b]
    ydata = model(params_a)(xdata) + model(params_b)(xdata)

    yerr  = np.random.normal(0, yuncs, nPts)
    zdata = ydata + yerr

    try:
        figure(figsize=(10,10))
        plot(thdata, model([param0a,param1a,param2a])(thdata) \
                + model([param0b,param1b,param2b])(thdata))
        errorbar(xdata, zdata, yunc*ones(zdata.size), fmt='o')
        show()
    except:
        pass # probably ran on a headless node

    if not os.path.exists("chains"): os.mkdir("chains")
    n_params = len(params_a)
    ndims = 3

    # with ThreadPoolExecutor(max_workers=cpu_count()-1) as executor:
    with Pool(cpu_count()-1) as executor:
        sampler = dynesty.DynamicNestedSampler(
                                loglike,
                                prior,
                                ndim=ndims,
                                nparam=ndims,
                                bound='multi',
                                sample='unif',
                                pool=executor,
                                queue_size=cpu_count(),
                                bootstrap=0) # bootstrap was added after comment below

        sampler.run_nested(nlive_init=100, nlive_batch=100)

        res = sampler.results

    # evidence check
    fig, axes = dyplot.runplot(res, color='red',
                    lnz_truth=lnz_truth,
                    truth_color='black',
                    logplot=True)

    fig.tight_layout()

    joblib.dump(res,'dynesty_double_gaussian_test_results.joblib.save')
joshspeagle commented 6 years ago

Okay, looks like this is an issue with the bootstrap estimator blowing up, which was causing the run to hang generating random numbers. While I had originally intended it to be the default option since it's the "safest", results like this have consistently plagued the procedure since it requires a lot of live points to be robust when sampling multi-modal distributions (like this one). I'm pushing a new version that changes these defaults soon, but try setting bootstrap = 0 and see if that fixes things for you (it did for my local tests running your code).

exowanderer commented 6 years ago

Indeed, if I set bootstrap = 0 in dynesty.DynamicNestedSampler, then the software maintains ~350-400% cpu usages. I'm certain happier with that; but I would prefer it to be 800% (I have 8 cores). Is that to be expected or is ½ efficiency the right expectation?

Thank you

joshspeagle commented 6 years ago

No, it should be using all the cores if they're available, and when I ran your code snippet I got all of the cores I was using to run. There is some overhead involved that can lead to idle cores popping up frequently as the queue gets exhausted, but this quickly becomes small once the likelihood takes a non-negligible amount of time to compute. You machine isn't using virtual cores, right?

exowanderer commented 6 years ago

The first thing to say is that I don't know what virtual cores are. But I'm using

OSX 10.14 (Mojave) Macbook Pro Mid2014 2.5 GHz Intel Core i7 gcc version 4.8.5

multiprocessing.cpu_count() registers 8 cores; I think that is 4 cores with 2 threads each; but this is where I get confused.

I also tried it on 2 linux servers that I have access to:

RHEL6 x86_64 Kernel: 2.6.32-696.23.1.el6.x86_64 CPUs: 12 and 24 gcc 4.4.7

TL;DR In all 3 cases (Mac + 2 Linux) dynesty seems to use almost exactly half of my cpus (50% +\- 5%). It oscillated so consistently around 50% that it feels as though it were programmed to use exactly 50% full CPU usage, with room for statistical uncertainty. Sometimes it grows to 70% for < 1 minutes (~30 seconds); but falls back down.


With other software (i.e. tensorflow) stack overflow level information informed me that "some problems are not worth all of your CPUs and tensorflow decides that on the fly". Is it possible that the multiprocessing library is not requesting all of the cpus? At the same time, you said that you tried my code above and it used all of your cpus.

joshspeagle commented 6 years ago

image

Yep, can confirm that dynesty is using all the cores on my machine. The variation in behavior is due to overhead in-between allocating batches (which requires operations done in serial), but during a single batch I can get up to full usage of the cores. My guess is you're seeing ~50% usage overall just because evaluating the likelihood is essentially instantaneous so the overhead is taking up a significant portion of the runtime.