mattjj / pyhsmm

MIT License

Question on parallelism #5

Closed · inti closed this 10 years ago

inti commented 11 years ago

Hi, a quick question on the parallelism implemented in your code. I am not seeing much speed-up on some test data, so I am wondering whether I am properly exploiting your parallel implementation.

I am starting the ipcluster with:

    $ ipcluster start --n=8
    2013-04-02 14:04:54.751 [IPClusterStart] Using existing profile dir: u'/home/inti/.config/ipython/profile_default'
    2013-04-02 14:04:54.754 [IPClusterStart] Starting ipcluster with [daemon=False]
    2013-04-02 14:04:54.754 [IPClusterStart] Creating pid file: /home/inti/.config/ipython/profile_default/pid/ipcluster.pid
    2013-04-02 14:04:54.754 [IPClusterStart] Starting Controller with LocalControllerLauncher
    2013-04-02 14:04:55.754 [IPClusterStart] Starting 8 Engines with LocalEngineSetLauncher
    2013-04-02 14:05:26.478 [IPClusterStart] Engines appear to have started successfully

Is the parallelism being applied to each data sequence loaded into the model? I.e., is each data sequence loaded with model.add_data_parallel() being run on a different IPython engine?

Or is the actual state sampling calculation being spread across the engines?

thanks in advance,

BW, Inti

mattjj commented 11 years ago

The state sampling is spread across the engines. For example, if you have 4 engines and 4 state sequences, each time resample_model_parallel() is called the current global model (transition matrix and observation parameters) is sent to each of the four engines; each engine does message passing and block-samples the hidden state sequence for one data sequence and sends that state sequence back; then the IPython client uses those updated state samples to gather statistics and resample the global model parameters. (That is, assuming things are working correctly! I haven't tested the HMM case much, but it should work just like the HSMM case.)
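Roughly, one sweep looks something like this (an untested sketch with made-up helper names, just to illustrate the flow; the real logic lives in models.py and parallel.py):

    from IPython.parallel import Client

    dv = Client()[:]   # DirectView over all engines; data assumed already scattered to them

    def _sample_stateseq(seq_id):
        # runs on an engine: 'global_model' and 'my_datas' are assumed to live in the
        # engine's namespace, having been pushed/scattered beforehand
        return global_model.sample_stateseq_for(my_datas[seq_id])   # hypothetical helper

    def one_parallel_sweep(model):
        dv.push({'global_model': model}, block=True)              # send out the current global params
        stateseqs = dv.map_sync(_sample_stateseq,
                                range(len(model.states_list)))    # one sampled sequence per data sequence
        for s, seq in zip(model.states_list, stateseqs):          # gather the samples back on the client
            s.stateseq = seq
        model.resample_parameters()                               # hypothetical: resample trans./obs. params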

When the state sequence sampling is expensive, as with HSMM message passing, that kind of parallelism will result in a big speedup. If state sampling is pretty fast, other (unparallelized) parts of the code may start to dominate, and the communication overhead may actually play a role. (The data isn't communicated every time; just the pickled model going out and the numpy state-sequence arrays coming back.)

Profiling the code or adding timing print statements may answer whether the other (unparallelized) steps are dominating the running time. Also, can you give me an idea of the length and number of your data sequences, and how many engines are running on how many different machines/cores?
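(For the profiling part, even something quick and generic like this would be informative; 'model' here is just your model object:)

    # profile one full Gibbs sweep and sort by cumulative time
    import cProfile
    cProfile.run('model.resample_model()', sort='cumulative')

    # or, inside IPython:
    #   %prun model.resample_model()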

inti commented 11 years ago

Hi, here is some additional information. This is being run on a single machine with 8 cores. Looking at the CPU usage, it seems that most of the action is happening on a single CPU. See below for some timings; there seems to be no speed-up at all.

At the moment I have 8 data sequences of lengths 806, 784, 772, 741, 553, 588, 336, 342. I am using the StickyHMM model with negative binomial distributions on the data.

Some timings:

WITHOUT PARALLELISM

    %time model.resample_model()
    CPU times: user 178.58 s, sys: 0.02 s, total: 178.60 s
    Wall time: 179.11 s

WITH PARALLELISM, using ipcluster start --n=8 (that is, 7 engines)

    %time model.resample_model_parallel()
    CPU times: user 178.91 s, sys: 0.02 s, total: 178.94 s
    Wall time: 179.54 s

    %time model.resample_model_parallel('engines')
    CPU times: user 179.98 s, sys: 0.02 s, total: 180.00 s
    Wall time: 180.61 s

I will give it a try with a full dataset to see whether a speed-up appears with longer and more numerous sequences; I will investigate more and provide more information when I have it.

thanks a lot inti

mattjj commented 11 years ago

I think with relatively short sequences like yours, the message passing and state sampling will be extremely fast, probably even comparable to the communication overhead. I think the time is definitely spent elsewhere, and I bet it's in the concentration parameter resampling, if you're using that. Can you do some timing on that piece of the code?

inti commented 11 years ago

Hi, I am running it now with some actual data; before, I was running with some test data I am using to get things in place. The data it is running on now has 22 sequences of lengths

450524 476387 389588 375312 355385 334781 310692 285770 240248 262607 262252 260952 191175 176579 163380 157765 155583 149304 111613 119006 70198 69781

Let's see how that goes.

I think I could split these sequences, each roughly in half and some into more, smaller pieces. Do you think that will speed things up: more, shorter sequences vs. fewer, longer ones?

Which bit of the code is the concentration parameter resampling?

I can profile it with the previous (smaller) data and check how much time is spent there.

Cheers, Inti

mattjj commented 11 years ago

Splitting sequences would make things faster, both because the sampling complexity is linear in the sequence length and because one can parallelize over more sequences if you're willing to use more cores.

The concentration parameter resampling happens if you instantiate the StickyHMM object with parameters like alpha_a_0, alpha_b_0, etc.; the stuff that would be slow gets executed during the call to trans_distn.resample(...) in the HMM class's resample_model_parallel in models.py. That time should not grow with the length or number of state sequences, though, so with long sequences it should matter less.
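A crude timing probe around that call might look like this (illustrative; assumes it is dropped in right around the existing trans_distn.resample call in models.py):

    import time

    t0 = time.time()
    self.trans_distn.resample([s.stateseq for s in self.states_list])
    print 'trans_distn.resample took %0.2f seconds' % (time.time() - t0)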

inti commented 11 years ago

Hi, I did a run after splitting the data into shorter sequences. I had 197 sequences ranging in size from tens to thousands of points. This was run using an ipcluster with 7 engines (+ 1 controller). Each iteration took on average 1071 sec. I printed the time just before and after the trans_distn.resample(...) call within the parallel resampling block; on the three iterations I ran it consistently took 2 seconds. I did a second try with 6 sequences of length ~5000 and one of 1800, and the trans_distn.resample(...) step took 1 second.

I think 1-2 seconds is probably slow overall, but it does not seem to be the bottleneck in this case. I will put in a few more timestamps to see where it is spending more time.

cheers, Inti

inti commented 11 years ago

OK, I think this is clearer now. Within resample_model_parallel I timed each step; below are the results of one iteration.

    resample parameters locally   77.2806079388 seconds
    trans_distn.resample           0.197187900543 seconds
    init_state_distn.resample      0.000120162963867 seconds
    _push_self_parallel            0.0160870552063 seconds
    _build_states_parallel         3.55682086945 seconds

So this loop

    for state, distn in enumerate(self.obs_distns):
        distn.resample([s.data[s.stateseq == state] for s in self.states_list])

is using most of the time, so this should be the target of the speed-up, I think.

Any ideas how to make this faster? I am happy to code stuff if that helps.

BW, Inti

inti commented 11 years ago

How about this?

 def resample_model_parallel(self,numtoresample='all'):
        print "Starting parallel resampling v3.1"
        from pyhsmm import parallel
        from IPython.parallel import Client
        lbview = Client().load_balanced_view()

        if numtoresample == 'all':
            numtoresample = len(self.states_list)
        elif numtoresample == 'engines':
            numtoresample = len(parallel.dv)

        ### resample parameters locally
        for state, distn in enumerate(self.obs_distns):
                lbview.map(distn.resample(),[s.data[s.stateseq == state] for s in self.states_list])

    resample parameters locally   0.453722953796 seconds
    trans_distn.resample           0.237637996674 seconds
    init_state_distn.resample      0.000121116638184 seconds
    _push_self_parallel            0.0771598815918 seconds
    _build_states_parallel         3.8880200386 seconds

Timings are with the previous data, on which "resample parameters locally" took approx 77 sec.

Do you think this will preserve the integrity of the calculations?

BW, Inti

EDIT (03/05/2013): the timing of the first step is not properly measured, since the function is not actually sending the data to the engines (and it should). See Matt's comment below and a later comment of mine for a better comparison with the unparallelized version.

mattjj commented 11 years ago

Interesting results! I'd love to know more about the specific parts of the observation resampling that are taking time. What observation class are you using?

The right strategy depends on whether it's gathering statistics that takes the most time or the sampler itself. If you're using the NegativeBinomial class for observations, I have a feeling it's the resampling that is taking the time, in which case a parallelization strategy like the one you suggest would be the most effective. Something like what you wrote should work, though it would need to gather the new distribution objects back again; it may also be cheaper to send data_ids and indices, although with 1D observations it is probably cheaper and easier to do the thing you wrote.

It's probably also the case that the sampler itself could be sped up with some attention, so let me know if it's indeed the NegativeBinomial class.

inti commented 11 years ago

Hi, yes, it is the NB class. The resample step is definitely the slow part, due to the loop generating the msum values.

At the moment I am only working with 1D data, so I think I would be happy with this solution. The way I did it uses load balancing, which I think should make better use of all available cores, especially if different runs take different amounts of time. I think that if passing indices is needed, then the data would need to be on the engines as well, which I am not sure how to do with the load-balanced setup since it does not have a push method (I am very new to IPython parallel, so there is probably a simple way around this that I do not know). I did check the values of the obs_distns list and they change after each iteration, so I was assuming the resampling had happened in place, but I am certainly not sure that it has. Do you think it is right the way it is, or does it need to gather back the results, and how would I update the distn objects?

mattjj commented 11 years ago

I've found that writing functions like this one can be a decent style for using IPython parallel:

def resample_obs_distn(tup):
    obs_distn, data = tup
    obs_distn.resample(data)
    return obs_distn

Then you should be able to call it like this:

self.obs_distns = lbview.map_sync(resample_obs_distn,[(o,[s.data[s.stateseq == state]
                        for s in self.states_list]) for state,o in enumerate(self.obs_distns)])

It looks a bit weird, but I wrote it that way so that the first argument to map_sync is pretty much a pure function and the second argument has all the "data". I'm sure there are a lot of ways that kind of thing can be improved, and I haven't tested it, but it's worked for me in the past!

(The functions in parallel.py essentially do the same thing but by decorating the function instead, which would be like decorating the definition of resample_obs_distn.)
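For reference, the decorator style would look roughly like this (untested sketch; not the actual parallel.py code):

    from IPython.parallel import Client

    lbview = Client().load_balanced_view()

    @lbview.parallel(block=True)          # wraps the function in a ParallelFunction
    def resample_obs_distn(tup):
        obs_distn, data = tup
        obs_distn.resample(data)
        return obs_distn

    # then call its .map(...) instead of passing the function to lbview.map_sync:
    # new_distns = resample_obs_distn.map(obs_distn_data_tuples)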

inti commented 11 years ago

The above code runs fine but it is slower, which I think may be because now the data actually gets moved around, which as you pointed out did not happen before. It takes 220 sec with the parallel version as coded above and 6 sec without... I think using the direct view with the data already on the engines will probably improve that figure. I'll look at your functions in parallel.py and shout for help if I get stuck. Thanks a lot, Inti

mattjj commented 11 years ago

Good luck! This is a neat use case that will probably be useful to understand in the future.

Matt


inti commented 11 years ago

I'll keep you posted. On a quick test, the local resampling of parameters uses half the time and the other half goes roughly on the actual model update on the engines, so hopefully by improving the resampling of the distn parameters this could be fast enough.

BW, Inti


mattjj commented 11 years ago

Inti, I have something for you!

In [1]: import distributions as d

In [2]: a = d.NegativeBinomial(5*10.,1./5,3*5.,3*1.)

In [3]: data = a.rvs(10000)

In [4]: b = d.NegativeBinomial(5*10.,1./5,3*5.,3*1.)

In [5]: timeit b.resample_python(data)
1 loops, best of 3: 8.81 s per loop

In [6]: timeit b.resample(data)
1 loops, best of 3: 180 ms per loop

In [7]: 8.81/0.18
Out[7]: 48.94444444444445

I've only used negative binomials for duration distributions so far, so the code I wrote was painfully slow when handed lots of data. I pushed it down into C++ with weave so that this:

msum = 0.
for n in data:
    msum += (np.random.rand(n) < self.r/(np.arange(n)+self.r)).sum()

became this:

msum = np.array(0.)
scipy.weave.inline(
        '''
        for (int i=0; i < N; i++) {
            for (int j=0; j < data[i]; j++) {
                *msum += ((float) rand()) / RAND_MAX < ((float) r)/(j+r);
            }
        }
        ''',
        ['N','data','r','msum'],
        extra_compile_args=['-O3'])

Did I ever mention I love weave?

I did a basic test of correctness with something like:

In [7]: plt.hist(data,normed=True)

In [8]: plot(np.arange(1,300,0.1), np.exp(b.log_likelihood(np.arange(1,300,0.1))))

I think this fix should help your bottleneck :)

inti commented 11 years ago

That indeed helps a lot!!!! It brings the overall time down to half.

inti commented 11 years ago

Additional timings after pulling your latest code.

Without parallel = 65.06 sec total

    obsparams      55.4670538902 seconds
    transitions     3.58757400513 seconds
    pi_0            0.000196933746338 seconds
    states          6.00739693642 seconds

With parallel function v2 = 25.83 sec total

    obsparams      20.435475111 seconds
    transitions     3.58190894127 seconds
    pi_0            0.000174999237061 seconds
    states          1.81070494652 seconds

With the original parallel function = 83.88 sec total

    obsparams                55.4512088299 seconds
    trans_distn               3.61683297157 seconds
    init_state_distn          0.000175952911377 seconds
    states                    4.41074371338e-05 seconds
    _push_self_parallel       0.0397069454193 seconds
    _build_states_parallel   24.7669651508 seconds
    purge                     0.00214815139771 seconds

So, all in all, the original parallel implementation is the slowest one, even compared to running on a single core as opposed to 8 engines. Below is the code for version 2.

def resample_model_parallel2(self):
    def resample_obs_distn(tup):
        obs_distn, data = tup
        obs_distn.resample(data)
        return obs_distn

    def resample_states(s):
        s.resample()
        return s

    from IPython.parallel import Client
    lbview = Client().load_balanced_view()

    # resample obsparams
    self.obs_distns = lbview.map_sync(resample_obs_distn,
            [(o, [s.data[s.stateseq == state] for s in self.states_list])
             for state, o in enumerate(self.obs_distns)])
    # resample transitions
    self.trans_distn.resample([s.stateseq for s in self.states_list])
    # resample pi_0
    self.init_state_distn.resample([s.stateseq[:1] for s in self.states_list])
    # resample states
    self.states_list = lbview.map_sync(resample_states, self.states_list)

I have learnt lots from your code!!! Really lots! Still, it would be great if you could let me know whether you think this code does what it is supposed to do.

This was run with 24 sequences of lengths 13109 13105 13038 9554 8828 8160 7884 7773 7461 5576 22491 5945 3494 3483 23800 19475 18756 17764 16730 15519 14282 11976 15089 2288

So the speed-up may be more noticeable for long sequences, rather than to be expected in general?

Perhaps more speed can be gained, as you said before, by keeping the data on the engines rather than moving it around; I'll investigate that possibility further.

Thanks a lot for your help!!!!

Inti

mattjj commented 11 years ago

How big are the numbers in the data arrays? It seems like the time complexity shouldn't depend on that, but with those auxiliary variables m the number of iterations depends on the data values. If they're big, it might be possible to do something smarter (that distribution doesn't have a name as far as I know, but it's related to computing Stirling numbers of the second kind, which could be cached).

Matt


inti commented 11 years ago

Hi, attached is a histogram of the values: mean 5760 and stdev 1677. Another dataset has a mean of 34 and stdev 41, so it is very variable. In general, whenever the values are smaller it means there are more data points; for example, in the first case there are 28298 data points and in the second 5717021. In the latter case one iteration still takes ~30 seconds. Because I am interested in finding change points, having larger values gives more power, right? I guess another possibility would be to back off and model it with a Gaussian on the log(count) scale, which may not be ideal but perhaps does the trick of being fast enough.

I am not familiar with Stirling numbers... :( ...did I say I am a biochemist by training?... but after having a look on the web I think I see where you are coming from. The resampling scheme would need to be changed to use the Stirling numbers, wouldn't it?

[attached: histogram of the data values]

cheers, Inti

mattjj commented 11 years ago

I'm not really too familiar with them either :)

This sampling algorithm comes from some extremely clever work of Mingyuan Zhou. He has an ICML 2012 paper about it, but this is even more recent: http://arxiv.org/pdf/1209.3442v1.pdf

The algorithm is on Page 9, right after "The Gibbs sampling process proceeds as". Those CRT distributions are being sampled as in Lemma A.1 in the Appendix: each coin flip we're adding to msum is one of those b_n variables. Those s functions are Stirling numbers.

I only just glanced at that paper but I didn't see any way to get around the fact that the complexity depends on the counts. It'd be a pain if that's true...

inti commented 11 years ago

Hi Matt, do you think it may be possible to have a resample_model_parallel2 function of the type I sketched above, that is, resampling the observations and the states in parallel? That seems to work faster for my application. I had some code to do it, pretty much as above but using parallel.dv instead of lbview, but I noticed you restructured that part of the code a bit and it does not work any more. One of the things I want to test is when each strategy works better. Thanks a lot in advance. Inti

mattjj commented 11 years ago

Yes that sounds like a good idea, but I don't have time at the moment to implement it! I did clean up the code a bit but the fundamental structure is still the same, so it might not take much to fix your method.

inti commented 11 years ago

Hi, yes, it may not take long. Perhaps the best way is simply to pass an argument to each of the resampling functions instead of re-writing them, so one could do


self.resample_obs_distns(parallel=True)

and then


def resample_obs_distns(self, parallel=False):
    if not parallel:
        for state, distn in enumerate(self.obs_distns):
            distn.resample([s.data[s.stateseq == state] for s in self.states_list])
    else:
        # DO PARALLEL THING (see the sketch below)
        pass
    self._clear_caches()

Then resample_model_parallel2 would simply call the resampling functions with the parallel=True option where needed. I guess this would duplicate a bit of code, but it should be fairly clear that the same thing is happening except for the parallel part, which can be seen within each function.
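For the parallel branch I am thinking of something like this untested sketch, reusing the map_sync pattern from before (with resample_obs_distn defined as in the earlier code):

    def resample_obs_distns(self, parallel=False):
        if not parallel:
            for state, distn in enumerate(self.obs_distns):
                distn.resample([s.data[s.stateseq == state] for s in self.states_list])
        else:
            from IPython.parallel import Client
            lbview = Client().load_balanced_view()
            self.obs_distns = lbview.map_sync(
                resample_obs_distn,
                [(o, [s.data[s.stateseq == state] for s in self.states_list])
                 for state, o in enumerate(self.obs_distns)])
        self._clear_caches()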

Does it sound like a good plan?

I can have a go at the code and then send it to you for approval; it should not be very different from what it was before.


mattjj commented 11 years ago

That sounds like it could work very well. I think having specific parallel methods like resample_obs_distns_parallel() may be slightly cleaner than having an if/else in each resample method. Some code duplication in this case may be a good thing for clarity and modularity.

It's possible to abstract the parallelism by making a fake 'view' object, like in https://github.com/mattjj/dirichlet-truncated-multinomial/blob/master/parallel.py. Then all loops or maps that might be parallelized can use parallel.dv, and that may or may not be a real dv object (depending on whether parallel.go_parallel has been called). But I think that tiny bit of abstraction isn't worth the confusion and indirection it might yield, so I think it's better to go with the idea of having separate methods or arguments.
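Just to illustrate that idea, a minimal hypothetical stand-in might look like this (not the linked code):

    class SerialView(object):
        """Quacks enough like a DirectView for simple map calls, but runs locally."""
        def map_sync(self, f, *iterables):
            return map(f, *iterables)

    dv = SerialView()   # swapped for a real Client()[:] view when running in parallel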

I look forward to the possibility of merging a pull request from you! I've never done it before :)

mattjj commented 10 years ago

The code has changed a lot since these issues were active, so I'm closing them.