Closed SteveBronder closed 3 years ago
At the cmdstan level say we added options like
... n_chains=4 parallel=4
CmdStan is a thin wrapper around the stan::services
methods - e.g., you'd need to change this method: https://github.com/stan-dev/stan/blob/1d91c28530244a0adde43cab2da9f74af9e2e4b2/src/stan/services/sample/hmc_nuts_diag_e_adapt.hpp#L57-L66
does this fit in with what you're envisioning?
Lol at that signature but also yes I think so
@mitzimorris is pointing out where the integration would have to start. (Those functions are indeed crying out to have some of their arguments grouped.)
If the model code remains immutable after data load, then we don't need any kind of synchronization anywhere. Each of the chains would be writing output to a different file or to a different bit of memory because we have to keep the chains separate and ordered. I suppose we could do it long form style with a column for which chain and that would have to be synchronized. Anyway, that's the callback handler's responsibility.
MPI would let us share across boxes, but it doesn't help share memory on a single box, does it? I don't get much more throughput on my 8-core Macbook with 8 chains than I do with 4 (this is a real 8-core i9, not hyperthreading cores). I attribute that to being memory bound, but it could also have something to do with "Turboboost" (I love that they still use "turbo") or whatever they call it that overclocks with fewer cores in use.
either serially or within parallel
Not sure I understand this. Why do we need multiple chains run in serial? By "within parallel" do you mean within-chain parallel?
If we need to have chains talk to each other, we'll need to do some stuffs outside of generate_transitions
. As @bob-carpenter said we'll need to take care of output stream to point to different files, as well as different rng seed. I did these things in cmdstan::command
.
In addition, within MPI framework thread safety doesn't drop from sky and one must take care of different scenarios of MPI + threading runs.
my concern is integrating this into the interfaces and explaining to users how to use this. does it replace the way that the interfaces currently specify number of chains to run?
the stan::services functions include an argument "chain" - https://github.com/stan-dev/stan/blob/1d91c28530244a0adde43cab2da9f74af9e2e4b2/src/stan/services/sample/hmc_nuts_diag_e_adapt.hpp#L32 which is used to advance the RNG. this is how the interfaces go about setting up multiple chains using the same seed for the RNG. the interfaces also allow per-chain inits.
how will this proposal do inits, etc?
@mitzimorris is pointing out where the integration would have to start. (Those functions are indeed crying out to have some of their arguments grouped.)
Yes that's def the starting place! I was mostly just lol'ing at the v long signature (but it works so ¯\_(ツ)_/¯ )
If the model code remains immutable after data load, then we don't need any kind of synchronization anywhere. Each of the chains would be writing output to a different file or to a different bit of memory because we have to keep the chains separate and ordered. I suppose we could do it long form style with a column for which chain and that would have to be synchronized. Anyway, that's the callback handler's responsibility.
Yeah idt there is any sync that needs to be done for the model. It's mostly a question of the callbacks and what campfire needs
MPI would let us share across boxes, but it doesn't help share memory on a single box, does it? I don't get much more throughput on my 8-core Macbook with 8 chains than I do with 4 (this is a real 8-core i9, not hyperthreading cores). I attribute that to being memory bound, but it could also have something to do with "Turboboost" (I love that they still use "turbo") or whatever they call it that overclocks with fewer cores in use.
I'm not sure about this, imo I'd like to have mpi handle multiple boxes and tbb threading for a single box.
either serially or within parallel
Not sure I understand this. Why do we need multiple chains run in serial? By "within parallel" do you mean within-chain parallel?
Sorry should have clarified. By serial run I mean serial chains. i.e. For cmdstan having an n_chains
argument that allows us to have one stan executable and run multiple chains. This is mostly just a UI and input/output question.
By parallel I mean taking that n_chain argument and running N chains in parallel. No within chain parallelism here
If we need to have chains talk to each other, we'll need to do some stuffs outside of generate_transitions. As @bob-carpenter said we'll need to take care of output stream to point to different files, as well as different rng seed. I did these things in cmdstan::command.
I'm going to read over your branch more to see what would need to come back out. Did you write a design doc somewhere?
my concern is integrating this into the interfaces and explaining to users how to use this. does it replace the way that the interfaces currently specify number of chains to run?
Any changes we do to run multiple chains in one executable should keep backwards compatibility with how interfaces currently set chains for multiple runs.
the stan::services functions include an argument "chain" - https://github.com/stan-dev/stan/blob/1d91c28530244a0adde43cab2da9f74af9e2e4b2/src/stan/services/sample/hmc_nuts_diag_e_adapt.hpp#L32 which is used to advance the RNG. this is how the interfaces go about setting up multiple chains using the same seed for the RNG. the interfaces also allow per-chain inits.
So I think if the user had n_chains=4
(or whatever we want to call it) we could take that chain
value and tick it up for each chain.
how will this proposal do inits, etc?
I have to read over @yizhang-yiz 's code to see how they handled that.
to be clear, this is not a proposal. This is just to collect what the spec would need in order to build a proposal / prototype.
Did you write a design doc somewhere?
No. The implementation is intended to validate the algorithm, so it was quick & dirty.
For cmdstan having an n_chains argument that allows us to have one stan executable and run multiple chains.
What's the gain of this instead of running a bash for loop?
By parallel I mean taking that n_chain argument and running N chains in parallel.
This can also be achieved by running bash using GNU parallel
tool.
I have to read over @yizhang-yiz 's code to see how they handled that.
Note that we did this only for MPI framework, but that framework was designed to allow MPI run with communicating chains and within-chain parallelism.
I think it would be really good to run multiple chains in a standardized way by Stan services rather then letting interfaces handle this. This should also tackle how we handle in a clean way specification of the number of threads per chain. The upside of having this in Stan services is that we can give 8 worker threads to 4 chains - as an example- and whenever one chain finishes early the other chains can take advantage of the freed resources. ...but I am afraid this is a large api design task...
The scope is creeping. Are we talking about:
I'm not sure about this, imo I'd like to have mpi handle multiple boxes and tbb threading for a single box.
I htink you're saying the same thing as me.
it would be really good to run multiple chains in a standardized way by Stan services rather then letting interfaces handle this
That's why @mitzimorris framed this as a services issue.
Any changes we do to run multiple chains in one executable should keep backwards compatibility with how interfaces currently set chains for multiple runs.
The key compatibility issue is from the user perspective. It's OK to have the interfaces have to change how they interact with Stan. It's not ideal as it can be a lot of work, but it's not the end of the world.
Are we talking about: ...
I was talking about independent chains being run in multiple threads using the same shared model object. The goal is to run models faster on my notebook. As is, our current multi-process model doesn't scale well to the 8 cores I have on my notebook. I think that's going to require sharing the model object so that there's only one copy of the data in memory.
But now that you bring it up, given that we're thinking about parallel adaptation, this should really be communicating chains.
I was talking about independent chains being run in multiple threads using the same shared model object.
This is an actionable scope that we should do in stan::service, and we can do it using either mpi or tbb. It'll also pave the way to communicating chains.
I was talking about independent chains being run in multiple threads using the same shared model object
I think this changes the relationship of autodiff and threads. Right now threaded autodiff comes through nesting. The primary autodiff stack (the one you get if you don't nest) is single threaded I believe.
The scope is creeping
I would have liked to keep the scope limited. Something like, MPI for across chains, threading for within chains. But right now we support within-chain parallelism with both MPI and threading and so that will be difficult.
I think the next step of MPI stuff needs to support the MPI stuff that was there, cause it's my impression people are running MPI models they wouldn't be able to run otherwise: https://discourse.mc-stan.org/t/new-paper-incorporating-map-rect-cluster-computing/11260.
I think it would be really good to run multiple chains in a standardized way by Stan services rather then letting interfaces handle this.
What's the gain of this instead of running a bash for loop?
I find the MPI branch incredibly handy for this. It's so much easier to just say run 4 chains with one command. This could be done without cross-chain-adaptation, but if we have cross-chain-adaptation all the chains will need to run together.
I'd like to take a dive to work on @bob-carpenter 's version: multiple independent chains sharing model object to reduce memory footprint caused by multiple such objects. This is a relatively easy and also meaningful step to the other scenarios.
The key to @bbbales2 comment is that we have to make the autodiff stack thread local if we do that, because it's a static. Last time I checked, that imposed around a 10-20% performance hit in and of itself.
This has some up in previous discussions and as I noted in https://discourse.mc-stan.org/t/mpi-framework-for-parallelized-warmups/11671/23 talk of parallelization gets ahead of important infrastructure problems.
A more productive sequence is
Each step has plenty of challenges but defines a reasonably manageable, self-contained context conditioned on the previous steps having been completed.
Getting (1) and (2) implemented would also also the interfaces to all run the same basic API routes instead of reimplementing a bunch of functionality as RStan and PyStan currently do. This would go a long way towards the unification of the interface functionality that was a main focus of the roadmap that has unfortunately since been disregarded.
In other words not only does this sequence make development more manageable it also would allow for parallel interface development sooner rather than later.
On Apr 25, 2020, at 3:17 PM, Steve Bronder notifications@github.com wrote:
Summary:
Thought it would be a good idea to talk about what steps we would need to run multiple chains in a single stan program (either serially or within parallel). @yizhang-yiz and @bbbales2 have the mpi branch up, reading over their implementation they've done quite a lot! I'd like to just sketch out what the bare minimum is for calling K chains in serial/parallel.
Description:
At the cmdstan level say we added options like
... n_chains=4 parallel=4
What would we need to allow that?
It looks like a lot of it would go into generate_transitions where
• We would pass it the number of chains • We would need an atomic util::mcmc_writer and other callbacks • Assuming writer, logger, and callbacks are threadsafe, wrap the loop for the iters in generate transitions in a parallel executor (either parallel_for in tbb or whatever we want to call for MPI) For something like campfire and the mpi version would it work to have parallism all at the level of generate_transitions? Could the function that calls generate_transitions run for N transitions on K chains, then do campfire stuff over the window and keep running and then call generate_transitions again? If we write it general enough we could have an option that's something like execution_policythat could be either MPI for clusters andtbb` for threads.
Current Version:
v2.23.0
— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub, or unsubscribe.
multiple independent chains sharing model object to reduce memory footprint caused by multiple such objects
Assuming we run 4 chains, this is targeting a situation where data is large enough that 4 copies would not fit in cache but one copy would. That's a tight window. It gets smaller if we consider how much cache is getting used by the autodiff.
I don't think this is a good way to go.
Last time I checked, that imposed around a 10-20% performance hit in and of itself.
I think @wds15 found a workaround for that overhead (he said the current implementation has like a worst case 1-2% overhead).
By my arithmetic, we're going to be pretty badly memory bound.
My new Macbook Pro has a 16MB L3 cache and 8 cores, so at full capacity, that's only 2MB/core.
Assume we have 9 predictors, 1 outcome, and 25K observations. That's 2MB of data per chain if we copy the data in each chain. Autodiff adds a lot to that, as does the handling of output in RStan (and I believe PyStan), which is all kept in memory.
Well I don't disagree with that, but even if we're talking about living in an 8 chain world where all the data is shared, that only gets us to 200k observations (again ignoring autodiff), which I guess isn't nothing.
multiple independent chains sharing model object to reduce memory footprint caused by multiple such objects. This is a relatively easy and also meaningful step to the other scenarios.
But I think about it I've changed my mind. The lifetime of these memory objects probably does need redefined at a higher level than the autodiff.
I guess what Yi is getting at is this is similar to not wanting to ship Stan data over MPI, or to the GPU every log density evaluation. So everything that is reading data anywhere needs to have a read-only accessor to this stuff.
But then I guess my question is if chains are processes, one process can't just read another process's memory can it?
@betanalpha I need to be better at words because this is exactly what I meant! (1) is what I meant by serial, though I think I globbed by being excited about doing that stuff in parallel.
But then I guess my question is if chains are processes, one process can't just read another process's memory can it?
If by process you mean like separate programs running then no they can't. What Bob is talking about is sharing the model object holding the data across threads.
What was @wds15 's solution? Have we ever thought about making the stack allocator into a pool of stack allocators? Then each thread would have access to one of the K stack allocators in the pool (K is just the number of threads).
But then I guess my question is if chains are processes, one process can't just read another process's memory can it?
https://www.mpich.org/static/docs/latest/www3/MPI_Win_allocate_shared.html
I had been assuming MPI couldn't share memory. If it can share it as efficiently as multi-threading, then that'd be another way to cut down on the memory duplication.
And it'd probably be good to have a mode in Stan/PyStan where it only writes to file and doesn't save results in memory as it goes to reduce memory pressure.
If it can share it as efficiently as multi-threading, then that'd be another way to cut down on the memory duplication.
Did a benchmark of mpi shared mem against OpeMP a couple of years ago and they're pretty close, but it was in different application context and details buried in internal reports. After all in this case the backend of MPI implementation on multicore is pthreads IIRC.
Attacking this in steps as @betanalpha suggests certainly makes. It's just a lot of work as it appears to me.
What I would really recommend to do is to get away from the concepts of threads and instead think in terms of tasks - which is the concept of the TBB we adopted anyway. This let's us deal with logical work packages which need to be done rather than handling low level threads. I keep on saying this and no one seems to pick this up. I don't know why.
Now, the autodiff tape is no different in any thread. It just happens that one thread will always be the main thread and that one can fire off sub-tasks which perform a nested AD sweep on any of the available threads given to the TBB. Having multiple "main" threads at the same time which drive an AD tape of their own, is not an issue with our current design.
@SteveBronder each thread already has it's own thread-local allocator. This is exactly what is happening right now (maybe I misunderstand your comments); basically each AD tape consists of the tape itself and an allocator.
I have to correct myself for saying threading does not cost you more than 2-3%. It is definitely less than 20%! We should probably run decent benchmarks once more, but here is my current understanding:
What I wrote for map_rect
simply copies the static data only upon the first iteration and reuses local cached copies for subsequent iterations. Thus, on a given machine with multiple MPI instances running we end up having multiple copies of the data with the current thing.
I have said it many times - I am not an MPI expert and what I did was to make things work. Having said that, the current MPI thing in Stan would need to be overhauled to allow for multiple chains to be run in one thread.
I somehow doubt that avoiding copies of the data will buy us a lot. This should really be explored in a prototype first before embarking on this - and one can anyway "solve" this problem the brute force way of using multiple machines on a cluster. Large problems will anyway be solved on clusters, I guess.
The discussion we have here is great, but maybe this should be moved to discourse?
Attacking this in steps as @betanalpha suggests certainly makes. It's just a lot of work as it appears to me.
I like breaking it down piece by piece because imo I think it's going to be a lot of work either way and having the clear bits in place for step 1 should help step 2 etc.
What I would really recommend to do is to get away from the concepts of threads and instead think in terms of tasks - which is the concept of the TBB we adopted anyway. This let's us deal with logical work packages which need to be done rather than handling low level threads. I keep on saying this and no one seems to pick this up. I don't know why.
Yes! I think the language is hard to change because we are so used to talking about threading. But yeah I like phrasing chains as tasks.
@SteveBronder each thread already has it's own thread-local allocator. This is exactly what is happening right now (maybe I misunderstand your comments); basically each AD tape consists of the tape itself and an allocator.
Yep makes sense!
What I wrote for map_rect simply copies the static data only upon the first iteration and reuses local cached copies for subsequent iterations. Thus, on a given machine with multiple MPI instances running we end up having multiple copies of the data with the current thing.
Is there even a way to get around that for MPI on a cluster of computers tho?
I somehow doubt that avoiding copies of the data will buy us a lot. This should really be explored in a prototype first before embarking on this - and one can anyway "solve" this problem the brute force way of using multiple machines on a cluster. Large problems will anyway be solved on clusters, I guess.
I think this week I'm going to try to throw something up. What I'd like to do is have prototypes for MPI and tbb so we can look at both and see if we can abstract calling multiple chains with either
The discussion we have here is great, but maybe this should be moved to discourse?
I'd rather leave the discussion here instead of hopping back and forth (and I feel like for purely coding related things like this github is a better venue)
get away from the concepts of threads and instead think in terms of tasks - which is the concept of the TBB we adopted anyway. This let's us deal with logical work packages which need to be done rather than handling low level threads. I keep on saying this and no one seems to pick this up. I don't know why.
I've known this all along. It's how all the J2EE web frameworks work, for example. And how low-level thread pooling works in Java apps I used to work on.
In terms of pickup, there are a bunch of issues that are getting conflated:
None of these important design details can be swept under the rug easily. Also, I hadn't realized
Having multiple "main" threads at the same time which drive an AD tape of their own, is not an issue with our current design.
And thanks for the breakdown by OS. It's great the cost is more manageable than simply declaring the stack to be threadlocal. Do you know how TBB manages per-thread resource allocation efficiently?
I somehow doubt that avoiding copies of the data will buy us a lot. This should really be explored in a prototype first before embarking on this - and one can anyway "solve" this problem the brute force way of using multiple machines on a cluster. Large problems will anyway be solved on clusters, I guess.
That doesn't solve the problem for me fitting models on my notebook. I'm not talking large problems, either, I'm talking about the fact that the model I was fitting yesterday took 13s with 1 chain, 14s with 2 parallel chains, 16s with 4 parallel chains, and 25s with 8 parallel chains (and this isn't because I got unlucky with seeds---this model has very stable compute times and I fixed the seed and chain 1 is the slowest to finish in all runs). This is a relatively small data set with 18K observations and only a dozen or so parameters. The autodiff stack size will be about 18K * 200 bytes = 2MB, so just the data fills my cache. The bigger models for MRP I was fitting with Andrew have a much bigger slowdown when multiple of them are run, which leads me to believe it really is a memory issue. It could also be something like Intel's chips trying not to burn up my notebook. So I imagine the solution will need to vary by platform.
The tbb allocator is so-called scalable and avoids false sharing, see https://www.threadingbuildingblocks.org/tutorial-intel-tbb-scalable-memory-allocator
@bob-carpenter can u share that model? Also, you are aware of the (flawed) way cmdstan reports execution times? I mean, cmdstan reports cpu time used rather than actual wall time consumed. So you have to use the time command to get the actual wall time of parallel runs.
The performance of reverse mode ad is enormously affected by memory locality - so what fits in your cpu Cache will determine the speed. A neat example for that is reduce sum. Just by using that you gain verY often speed, simply due to running many smaller ad sweeps which fit nicely into the cpu caches as opposed to one big ad sweep. So having a smaller memory footprint is only helpful if that leads to better use of fast caches ( close to the cpus you use). Doing a few copies can make this more flexible and thus faster...but this depends very much on details. We should probably profile this with the perf tools on Linux which analyze cache use.
Another point to forget about a main thread: we actually support nested reduce sum calls. So the notion of main thread is misleading. We simply fire a lot of tasks at the tbb - that’s it.
For the shared memory mpi thing I wonder if boost mpi supports that. If not then at least I would stay away from low level mpi coding.
Having multiple "main" threads at the same time which drive an AD tape of their own, is not an issue with our current design.
So the notion of main thread is misleading
The default behavior is the first autodiff stack each thread sees is not threadsafe. And then if they start nested things those are threadsafe. That's what I was getting at.
Anyway I think what @bob-carpenter brought up about the model class is kinda the right abstraction to think about. How does the model class get loaded up with data? What happens we the gradient function gets called? And what exactly is making the gradient calls?
I'd kinda like some diagrams. It seems hard to think through. I tried to draw one myself and got confused.
The bigger models for MRP I was fitting with Andrew have a much bigger slowdown when multiple of them are run, which leads me to believe it really is a memory issue.
Matrix vector multiplies are memory bound, so if the non-hierarchical part of the model is big (the X * beta
bit), it should remain memory bound. Same will be true for the hierarchical part of the model (that is coded with a for-loop).
But just thinking about X * beta
is fine.
data {
int N; // N data points
int K; // K parameters
matrix[N, K] X;
real y[N];
}
parameters {
vector[K] beta;
}
model {
y ~ normal(X * beta, 1.0);
}
Run with:
library(cmdstanr)
N = 4000
K = 50
C = 8
beta = rnorm(K)
X = matrix(rnorm(N * K), nrow = N)
y = rnorm(N, X %*% beta, 1.0)
m = cmdstan_model("lr.stan")
system.time({ f1 <- m$sample(list(N = N,
K = K,
X = X,
y = y), num_cores = C, num_chains = C) })
system.time({ f2 <- m$sample(list(N = N,
K = K,
X = X,
y = y), num_cores = 1, num_chains = 1) })
N = 4000
K = 50
C = 8
Should do the trick.
I get about 7 seconds on 1 chain on 1 core and 24 seconds for 8 chains on 8 cores.
8 chains 8 cores:
user system elapsed
195.792 0.824 25.447
1 chain 1 core:
user system elapsed
7.167 0.058 7.146
Sure. I was measuring in RStan. But this has been a feature of every model I run that takes 30s or so to finish 1000 total iterations.
I don't know how to share here. So I just committed where I'm at on the repo where I'm working.
Repo: https://github.com/bob-carpenter/case-studies
Data (CSV form): https://github.com/bob-carpenter/case-studies/tree/master/anno-difficulty/data/canonical/espeland-et-al
R script to run: https://github.com/bob-carpenter/case-studies/blob/master/anno-difficulty/stan/fit-binary-logit.R
Also, @bbales2 example timing is consistent with what I've seen with bigger models. So we can't just assume that N cores runs N times as fast. 8 cores is running 2.25 times as fast as 1 core.
I think that time from Ben includes reading everything back into memory from file. Is there a way to turn that off in CmdStanR? Or does it not work like RStan in creating a fit object?
I think that time from Ben includes reading everything back into memory from file. Is there a way to turn that off in CmdStanR?
CmdStanR scans output files to check for divergences, but only reads the sample into memory when the fit$sample
method is called.
@bob-carpenter Ok, so I took your model and rewrote it with reduce_sum
. My system is a bit limited with 4 cores, so I only tested 1 chain with 1, 2 & 3 cores. The serial version quoted below is your original model.
First, let's see if I did not mess up in coding the lp and then let's compare the runtime in seconds:
> print(fit_serial, pars="lp__")
Inference for Stan model: chain-1-24534.
1 chains, each with iter=500; warmup=250; thin=1;
post-warmup draws per chain=250, total post-warmup draws=250.
mean se_mean sd 2.5% 25% 50% 75% 97.5% n_eff Rhat
lp__ -7468.84 0.19 2.13 -7473.66 -7470.01 -7468.56 -7467.16 -7465.72 132 1.01
Samples were drawn using NUTS(diag_e) at Wed Apr 29 12:40:30 2020.
For each parameter, n_eff is a crude measure of effective sample size,
and Rhat is the potential scale reduction factor on split chains (at
convergence, Rhat=1).
> print(fit_rs1, pars="lp__")
Inference for Stan model: chain-1-24534.
1 chains, each with iter=500; warmup=250; thin=1;
post-warmup draws per chain=250, total post-warmup draws=250.
mean se_mean sd 2.5% 25% 50% 75% 97.5% n_eff Rhat
lp__ -7468.82 0.19 2.11 -7473.6 -7470.39 -7468.4 -7467.29 -7465.53 120 1
Samples were drawn using NUTS(diag_e) at Wed Apr 29 12:46:20 2020.
For each parameter, n_eff is a crude measure of effective sample size,
and Rhat is the potential scale reduction factor on split chains (at
convergence, Rhat=1).
> print(fit_rs2, pars="lp__")
Inference for Stan model: chain-1-24534.
1 chains, each with iter=500; warmup=250; thin=1;
post-warmup draws per chain=250, total post-warmup draws=250.
mean se_mean sd 2.5% 25% 50% 75% 97.5% n_eff Rhat
lp__ -7469.07 0.19 2.23 -7474.72 -7470.33 -7468.88 -7467.44 -7465.84 131 1
Samples were drawn using NUTS(diag_e) at Wed Apr 29 12:45:09 2020.
For each parameter, n_eff is a crude measure of effective sample size,
and Rhat is the potential scale reduction factor on split chains (at
convergence, Rhat=1).
> print(fit_rs3, pars="lp__")
Inference for Stan model: chain-1-24534.
1 chains, each with iter=500; warmup=250; thin=1;
post-warmup draws per chain=250, total post-warmup draws=250.
mean se_mean sd 2.5% 25% 50% 75% 97.5% n_eff Rhat
lp__ -7468.81 0.21 2.21 -7473.95 -7470.03 -7468.3 -7467.37 -7465.79 107 1
Samples were drawn using NUTS(diag_e) at Wed Apr 29 12:44:09 2020.
For each parameter, n_eff is a crude measure of effective sample size,
and Rhat is the potential scale reduction factor on split chains (at
convergence, Rhat=1).
> elapsed_serial
user system elapsed
49.688 0.333 50.448
> elapsed_rs1
user system elapsed
47.309 0.284 47.971
> elapsed_rs2
user system elapsed
55.132 0.236 27.890
> elapsed_rs3
user system elapsed
60.352 0.292 20.413
>
So these are nice speedups, I think. In particular when considering that the bernoulli logit is super cheap to calculate which makes this problem memory bound mostly.
I used cmdstan 2.23.0 for this.
Ah, and yes, RStan can be very slow when you run models which return a lot of outputs.
Here is the Stan model:
// The ordering of logit_alpha constrains annotators to be more likely to
// produce a positive label from a positive instance than from a negative
// instance. This is equivalent to requiring AUC > 0.5 for annotators.
functions {
real item_latent_mix_lpmf(int[] item_slice, int start, int end,
int[] y,
int[] items_sidx,
int[] jj,
vector[] logit_alpha,
real log_pi,
real log1m_pi
) {
real lp = 0.0;
real zero = 0;
int M_slice = end - start + 1;
matrix[M_slice, 2] mix_lp = rep_matrix(zero, M_slice, 2);
for (i in start:end) {
int i_slice = i - start + 1;
int item_start = items_sidx[i];
int item_end = items_sidx[i+1]-1;
for(n in item_start:item_end) {
mix_lp[i_slice, 1] += bernoulli_logit_lpmf(y[n] | logit_alpha[jj[n], 2]);
mix_lp[i_slice, 2] += bernoulli_logit_lpmf(y[n] | logit_alpha[jj[n], 1]);
}
lp += log_sum_exp(log_pi + mix_lp[i_slice, 1], log1m_pi + mix_lp[i_slice, 2]);
}
return(lp);
}
}
data {
int<lower = 0> N; // # observations
int<lower = 0> I; // # items
int<lower = 0> J; // # annotators
int<lower = 1, upper = I> ii[N]; // item
int<lower = 1, upper = J> jj[N]; // annotator
int<lower = 0, upper = 1> y[N]; // label (0:neg, 1:pos)
int<lower = 1, upper = N+1> items_sidx[I+1]; // item => obs mapping
int<lower = 1> grainsize;
}
transformed data {
int<lower = 1, upper = I> items[I];
for(i in 1:I)
items[i] = i;
}
parameters {
real logit_pi; // log odds of prevalence of positive response
ordered[2] logit_alpha[J]; // log odds of coder responses (1:neg, 2:pos)
}
model {
real log_pi = log_inv_logit(logit_pi);
real log1m_pi = log_inv_logit(-logit_pi);
target += reduce_sum(item_latent_mix_lpmf, items, grainsize,
y,
items_sidx,
jj,
logit_alpha,
log_pi,
log1m_pi
);
logit_pi ~ normal(0, 1.8); // roughly uniform on pi (scale matches std logistic)
logit_alpha[ , 1] ~ normal(-1, 2); // centered at 73% specificity
logit_alpha[ , 2] ~ normal(1, 2); // centered at 73% sensitivity
}
the items_sidx
is created in R with: items_sidx <- cumsum(c(1,rle(ii)$lengths))
So on your 8-core laptop you should see a significant speedup, when running 4 chains with 2 cores each. The timings above are for 250 / 250 samples.
> elapsed_serial
user system elapsed
49.688 0.333 50.448
> elapsed_rs1
user system elapsed
47.309 0.284 47.971
> elapsed_rs2
user system elapsed
55.132 0.236 27.890
> elapsed_rs3
user system elapsed
60.352 0.292 20.413
Can you add in the 2 chain/2 core and 3 chain/3 core timings?
The coolest world for reduce_sum would be if courtesy the memory bottleneck reduce_sum 3 threads ran in 3 times less time than 3 chain/3 core (so they would be equivalent in terms of overall ESS/s).
my laptop has 4 cores... so hold you breath as I need to migrate to our cluster for that.
the more cores you use per chain, the less efficient it will be is my prediction (Amdahl's law kicks in).
... and these evaluations are very model specific... this model with a bernoulli_logit_lpmf
is certainly memory bound.
Sorry sorry, I meant 1 thread per chain, 3 chains on 3 cores in parallel. Not 3 threads per chain.
Ah, that can be done. First the serial version of the stan program and then the reduce sum thing (with one thread only per chain)
> elapsed_serial
user system elapsed
49.688 0.333 50.448
> elapsed_serial2
user system elapsed
104.494 0.608 59.728
> elapsed_serial3
user system elapsed
182.017 1.181 67.802
> elapsed_rs1_1
user system elapsed
47.232 0.246 47.800
> elapsed_rs1_2
user system elapsed
97.294 0.522 49.827
> elapsed_rs1_3
user system elapsed
158.272 0.936 55.939
>
Running independent chains is, of course, the more efficient thing in terms of parallelisation efficiency, but if you have more cores, then you gain.
What I would find a cool benchmark is too
That should give us very good ESS / s even if you have only few cores, since the warmup time is drastically reduced (hopefully).
68s is for 3 chains in parallel with the reduce_sum model?
And 56s for 3 chains in parallel with the reduce_sum model?
If so comparing these vs 3x 20s for reduce_sum on three threads seems pretty good.
run 1 chain with X cores for the warmup
I think we need multiple chains of some sort for warmup. Cross-chain diagnostics are just so effective.
It's 68s for 3 chains with the serial model (no reduce_sum).
It's 55s for 3 chains with the reduce_sum
model which, is run with just 1 thread per chain. The cool thing is that the reduce_sum formulated model is faster here as it seems, since the reduce_sum model has a smaller footprint on the CPU caches - I think.
I am not sure I understand you conclusion - which setting is better from your perspective and why?
EDIT: Ah, you mean as a comparison to run in sequence 3 chains with 3 threads each, right? If so, then I say that this is good news for the multi-threaded thing as I can reuse the warmup info and cut down on warmup if I wanted to.
In the past we never used multiple chains to warmup any chain. We just checked it after the fact. So if we stay with the old fixed iteration scheme for warmup, then I don't see why my proposal would not make sense. The least you can do is 2 chains for warmup with 2 threads each and then use 1 thread for 4 chains running in parallel.
I am not sure I understand you conclusion - which setting is better from your perspective and why?
I just wanted to make sure I was reading it correctly. I am very happy with both of these cuz running 3 chains sequentially with 3 threads would be basically as fast.
I guess back to not where this thread started but somewhere in there, but I want MPI reduce_sum.
We just checked it after the fact. So if we stay with the old fixed iteration scheme for warmup, then I don't see why my proposal would not make sense
Well the initial conditions being different is a big deal cause that catches multimodality better.
- run 1 chain with X cores for the warmup
- run X chains with 1 thread per chain for some further iterations and all chains use the same warmup
I'm curious what @andrewgelman thinks about this approach:
the reduce_sum formulated model is faster here as it seems, since the reduce_sum model has a smaller footprint on the CPU caches - I think.
I couldn't parse the naming scheme, so I couldn't line up the numbers I wanted:
For both the old and new implementations, what's the speed of:
Anyway, the initial results that the reduced form is better validates my speculation on things being memory bound. The question I have is whether running a shared model would be faster than separate processes.
I want MPI reduce_sum.
I think there is a lot other stuff which should go first (adjoint ODE, cross-chain talk warmup, sparse matrices, static_matrix stuff enabling SIMD, FFTs). Also, you will have to re-engineer a lot of the MPI things in Stan-math for it; for example, there is no task-scheduling or anything like that.
I would actually even remove MPI for the sake of keeping our lives easy. I do not plan to invest more on it other than fixing things in our current MPI implementation which I feel responsible for. I am also happy to give input, but I don't intend to touch this beast from the 90s anymore, really... unless there are good reasons which I haven't seen yet.
From what I understand from IT pro's is that you can actually logically combine multiple computers to a single one even though they are separate. This may not be as efficient as MPI, but you can then basically run over multiple machines a big threaded process.
@bob-carpenter : The naming logic is as elapsed_serialX
= your old program without reduce_sum
; elapsed_rs1_X
the program with reduce_sum
- does that help?
When you ask for 4 chains in sequence vs 4 chains in parallel, then that conflates with the TurboBoost technique from Intel. So when there is low load on a system, then the CPU can speed up to some degree as compared to when there is high simultaneous load.
The question I have is whether running a shared model would be faster than separate processes.
Fair question, but I have my doubts on it as we are super cache bound which I think will not be improved by sharing the data - I agree in that it is worthwhile to test.
I would actually even remove MPI for the sake of keeping our lives easy.
How about we decide which of MPI and TBB to use and toss the other? Removing TBB and eliminating multi-threading would also greatly simplify our lives.
but I don't intend to touch this beast from the 90s anymore,
Let's not spread FUD here. C++ is a beast from the 80s.
From what I understand from IT pro's is that you can actually logically combine multiple computers to a single one even though they are separate. This may not be as efficient as MPI, but you can then basically run over multiple machines a big threaded process.
In the land of what-if, we could also run shared memory with MPI.
When you ask for 4 chains in sequence vs 4 chains in parallel, then that conflates with the TurboBoost technique from Intel.
We're not writing theory papers---that's the hardware I actually have in front of me. So I want to know how to run Stan on it most efficiently.
In my actual runs with larger models than the one I posted, I'm finding that running beyond two parallel chains in RStan (multi-process, copying data), things slow down to the point that it's not going much faster than running pairs of chains in sequence.
What I'd like is an RStan or CmdStan that runs 8 chains on 8 cores nearly as fast as 1 chain on 1 core.
How about we decide which of MPI and TBB to use and toss the other?
Sure, we can do that if you really want to (I think the TBB would win out due to much better user-friendliness to get this going on all the platforms).
To be clear: I have only said that I personally would remove MPI. I know that others are very much in favour for it and there would be huge resistance likely from many. So I am not suggesting to discuss this. However, I am frank about that I limit my own resources spend into this other than taking care of what I feel responsible for as I have build the MPI things which are there.
In my actual runs with larger models than the one I posted, I'm finding that running beyond two parallel chains in RStan (multi-process, copying data), things slow down to the point that it's not going much faster than running pairs of chains in sequence.
Sounds like your CPU caches get saturated and as such there is not a lot we can do other than using more computers. If a shared memory model helps here, that would be great, of course; we should try, but I just have my doubts and I am happily convinced by nice benchmarks.
I am happily convinced by nice benchmarks.
Yup. On me to create benchmarks for this.
The cause for optimism is one copy of the data, which gets visited every iteration. The cause for pessimism is the autodiff stack, which is almost always larger than the data and can't be shared.
I think there is a lot other stuff which should go first
I am frank about that I limit my own resources spend into this other than taking care of what I feel responsible for as I have build the MPI things which are there.
Yes there are large development costs, but I see it's the only way to scale hierarchical regressions. We can do GPU (and I want that too), but you can only buy a GPU so large.
There's an added bonus that @yizhang-yiz keeps volunteering to do all the work (which would probably include reworking the code you wrote -- so maybe that would help you feel less responsible if I'm reading what you said correctly). We should not be telling him MPI is hard. We should be giving him useful feedback on how to make MPI interact with everything else in the Stan-o-verse. That can be as simple as, "MPI better not mess up X".
I would actually even remove MPI for the sake of keeping our lives easy.
I don't think it's a reasonable option: https://discourse.mc-stan.org/t/new-paper-incorporating-map-rect-cluster-computing/11260 . People already use this to solve real problems.
unless there are good reasons which I haven't seen yet
Bob's example, and basically any sizable regression model, are memory bound. The only way to scale memory bound things is get more bandwidth (more computers, different hardware), or use less memory.
What I'd like is an RStan or CmdStan that runs 8 chains on 8 cores nearly as fast as 1 chain on 1 core.
The cause for optimism is one copy of the data, which gets visited every iteration.
There's an additional constraint here that even if we share the Eigen::MatrixXd during the forward pass, it doesn't get shared for the reverse pass: https://github.com/stan-dev/math/blob/develop/stan/math/rev/fun/multiply.hpp#L220
So I think engineering this test would be harder than maybe it seems. It might require some higher level engineering anyway.
The most scalable thing we could do with memory-bound matrix-vector multiplies is merge them into matrix-matrix multiplies, which we'd do if we were coding this manually, but I don't think there's any way to take advantage of this automatically the way Stan models are written.
FWIW I don't think doing an MPI reduce_sum
the way MPI map_rect
is done would be worth the effort. I do however think we should move MPI transfers to the model level, which would simplify things a lot and then make the MPI reduce_sum
. That would be very nice to have and I am willing to help out.
MPI by itself isn't any more difficult to work with than TBB (ignoring the install parts here) and they both can and in my opinion should work together . MPI might be more difficult if we want to solve everything "magically" at the Math level, but we don't need to. The way we do things for GPU GLMs and will do for the rest GPU functions is how we should also solve it for MPI. Let stanc3 handle the data transfers.
For MPI we should:
So I managed to get a crappy mvp up with the dawid-skene-binary-logit
example for cmdstan. You can check it out with. It's a bit ugh, parallel io is going to be weird
https://github.com/stan-dev/cmdstan/tree/multi-chain
# from cmdstan
git checkout multi-chain
echo "STAN_THREADS=true" > make/local
make -j4 build
make -j4 ./examples/dawid/dawid-skene-binary-logit
time -p STAN_NUM_THREADS=8 examples/dawid/dawid-skene-binary-logit sample data file=examples/dawid/caries_dump.R chains n_chains=4
It just uses a little tbb::parallel_for
around the body of run_adaptive_sampler
(it only works with diag_e_adapt atm)
The results are, bad
8 Threads 8 cores: real 698.90 user 4831.39 sys 66.06
4 Threads 4 cores: real 486.07 user 1778.25 sys 0.13
Running with linux parallel -j8: real 141.07 user 1035.76 sys 0.77
The only way I could assume numbers would be that bad would be if
Something is thrashing memory very hard.
I'm running perf and heaptrack over it right now to see what's going on
Ah, my version is leaking memory like crazy
You can download heaptrack to check it out
STAN_NUM_THREADS=8 heaptrack examples/dawid/dawid-skene-binary-logit sample data file=examples/dawid/caries_dump.R chains n_chains=4
peak heap memory consumption: 77.0 MB after 265.776s peak RSS (including heaptrack overhead): 358.6 MB total memory leaked: 75.9 MB
Stan doesn't clean up after itself and lets the process takedown do that, so that may be where the extra memory's coming from.
What's RSS?
And it doesn't look like it cuts down on memory pressure noticeably to share the data.
Running with linux parallel -j8: real 141.07 user 1035.76 sys 0.77
@SteveBronder How does parallel
scale with this model?
scale in what way? Like more data or with more jobs?
Summary:
Thought it would be a good idea to talk about what steps we would need to run multiple chains in a single stan program (either serially or within parallel). @yizhang-yiz and @bbbales2 have the mpi branch up, reading over their implementation they've done quite a lot! I'd like to just sketch out what the bare minimum is for calling K chains in serial/parallel.
Description:
At the cmdstan level say we added options like
What would we need to allow that?
It looks like a lot of it would go into
generate_transitions
whereutil::mcmc_writer
and other callbacksFor something like campfire and the mpi version would it work to have parallism all at the level of
generate_transitions
? Could the function that callsgenerate_transitions
run for N transitions on K chains, then do campfire stuff over the window and keep running and then callgenerate_transitions
again? If we write it general enough we could have an option that's something likeexecution_policy
that could be either MPI for clusters andtbb
for threads.Current Version:
v2.23.0