jmschrei / pomegranate

Fast, flexible and easy to use probabilistic modelling in Python.
http://pomegranate.readthedocs.org/en/latest/
MIT License

Distributing training (or running model.summarize in parallel) #702

Closed · ndolev closed this 1 year ago

ndolev commented 4 years ago

I have a very large data set (200M x 60K) and I'd like to train a Bayesian network from samples.

If I break the data into chunks and run model.summarize(chunk) on each chunk sequentially, I cannot complete the training in a reasonable amount of time.

Ideally, I'd like to distribute the training so that I can summarize on each chunk at the same time and then combine the models.

model1 = model1.summarize(chunk1)
.
.
.
modelN = modelN.summarize(chunkN)

And then "from_summaries" over the N models.

Is there a trivial way to do this that I am missing?

Thanks!

ndolev commented 4 years ago

Also, if you allow the user to pass a non-default "backend" parameter to joblib, then it would be possible to choose "dask" instead of "threading" to distribute the computation. This still won't help for combining models, though...
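
For reference, the standard joblib/Dask hookup (independent of pomegranate) looks roughly like the sketch below; the scheduler address and run_training() are placeholders, and it only takes effect if the library lets joblib pick the backend rather than hardcoding "threading":

# A minimal sketch of the usual joblib + Dask pattern (not pomegranate-specific).
# Assumes dask.distributed is installed and a scheduler is reachable.
import joblib
from dask.distributed import Client

client = Client("tcp://scheduler-address:8786")   # placeholder scheduler address

with joblib.parallel_backend("dask"):
    # any joblib.Parallel(...) work started inside this block is dispatched to
    # the Dask cluster -- provided the library does not hardcode its own backend
    run_training()                                 # hypothetical stand-in for the call to distribute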

jmschrei commented 4 years ago

Howdy

There currently is no built-in way to do distributed training. If I allow an option for users to pass in their own backends, will that fix the issue? Obviously users will have to set up dask on their own. My recollection is that, at the time I coded it, I thought there might be more complex changes I'd need to make on my end and so chose to only use multi-threading.

There is a workaround to this, though. On each machine you can load up the model and summarize a batch of data. These statistics are stored in model.summaries as well as distribution.summaries for each of the distributions being stored in the model. These summaries are additive, meaning that if you calculate them for one batch of data and add them to those calculated for the second batch of data you will get the same statistics as if you had calculated them on both batches of data together. So, if you sum these statistics up over all the copies of the model (for each distribution or model object appropriately) then you can call from_summaries on that and get an update over the entire model.
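
A rough sketch of that aggregation step, assuming identical copies of the model that have each already called summarize() on their own chunk; merge_summaries is just an illustrative helper, and the attribute path model.states[i].distribution.summaries (plus the exact layout of the summaries, which differs by distribution type) follows the pre-1.0 API and should be verified against the installed version:

import numpy as np

def merge_summaries(master, workers):
    # 'master' and each model in 'workers' are identical copies; every copy has
    # already accumulated sufficient statistics for its own batch via summarize().
    for worker in workers:
        for s_m, s_w in zip(master.states, worker.states):
            d_m, d_w = s_m.distribution, s_w.distribution
            # summaries are additive; this assumes they are numeric arrays that
            # can simply be summed element-wise across the copies
            for i in range(len(d_m.summaries)):
                d_m.summaries[i] = np.asarray(d_m.summaries[i]) + np.asarray(d_w.summaries[i])
    master.from_summaries()   # one parameter update covering all the batches
    return master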

Let me know if you have any other questions about that

ndolev commented 4 years ago

Hi,

Thanks for your awesome reply. Before you close the issue, I'll post the code for distributing using the method you describe so others can benefit.

Regarding passing a custom backend - I definitely recommend it. Dask is very simple to use and should be transparent to your code. I am willing to test it for you on a separate branch if you want.

Basically, it means you can run all of the multiprocessing on a compute cluster. It potentially opens up the whole package to really large datasets (ones that fit in distributed memory, or where someone works out a streaming or memory-mapped solution). By proxy, this could help all of these guys: #663, #627, #603, #605, #560.

Cheers, Noah

ndolev commented 4 years ago

Hi mate,

So here is a simple implementation using PySpark to distribute:

import numpy as np
import pandas as pd
from pomegranate import BayesianNetwork as BN

# the repartition size sets the number of samples per model
df_bayes = sqlContext.read.parquet("s3://...<some-parquet-file-with-observations as SparseVectors>") \
                     .repartition(5000).cache()

def trainmod(part):
    try:
        X = np.stack(part['features'].apply(lambda x: x.toArray()).values)
        model = BN.from_samples(X, algorithm='chow-liu', n_jobs=-1)
    except Exception as e:
        model = repr(e)
    return model

columns = ['id', 'features']
results = df_bayes.rdd.mapPartitions(
    lambda it: [trainmod(pd.DataFrame(list(it), columns=columns))]).collect()

I have two questions.

(1) Is there any way to avoid sending all the features in each chunk? If we have many more features than samples in each block, it'd save a lot of RAM to do something like this:

X = X[:,np.sum(X,0)!=0]

In other words, if we have 200M observations and 50K features (nodes) but we chunk 5Kx50K at a time, it's a shame to send empty columns. But I think this is necessary for summarize to work properly. WDYT?

(2) Is it possible to feed from_samples a sparse matrix (like csr_matrix from scipy)? Or even to send samples like this:

sample ---- features
      0    ----- [A,B,C]
      1    ----- [C,DD,FFG,GGR]

This would be much more efficient than feeding:

sample ---- A B C D ....DD.....FFG....GGR
 0    ----- 1 1 1 0 ....00...

Thanks again for this amazing package!

ndolev commented 4 years ago

Oh and you can avoid collecting a list of large models to memory by doing something like this:

from datetime import datetime
import numpy as np
from pomegranate import BayesianNetwork as BN

def trainmod(part):
    try:
        X = np.stack(part['features'].apply(lambda x: x.toArray()).values)
        model = BN.from_samples(X, algorithm='chow-liu', n_jobs=-1)
        now = datetime.now()
        time_string = now.strftime("%H-%M-%S")
        # write each partition's model straight to storage instead of collecting it;
        # note: depending on the pomegranate version, to_json() may return a string
        # rather than write a file, in which case the JSON needs to be written out explicitly
        model.to_json("s3://...bnmodel/bnmodel_" + time_string + '.json')
        return True
    except Exception as e:
        return repr(e)

jmschrei commented 4 years ago

I think I was a little bit confused; you can update the parameters of a pre-defined structure using the strategy I outlined but you can't run from_samples on each partition and then aggregate them. The from_samples method involves searching over a large number of potential structures and scoring them, returning the best structure (+ parameters) on that data set. If you do this on multiple partitions you will get different structures which are incongruent. The summarize method will, for a given structure, get the counts of each relevant feature combo in the data set. These counts are additive and so can be done in parallel.

If you have a massive data set and want to learn the structure using all your data I would suggest first learning the structure on a subset of the data and then refining the parameters using the strategy I outlined. You might not get the best structure, but you will get a good structure and the best parameters given that structure.

An alternate method for partitioning the data is to do each component independently. Each distribution should only be over a subset of the variables. If you're learning a tree (chow-liu) each distribution should involve either one or two variables. Thus, you can just slice the respective variables out for each component (~2M by 2), update the distribution, and move on to the next one. This is called model-parallel, whereas my first solution was data-parallel.
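
A rough sketch of that model-parallel loop for a fixed tree structure; update_components_independently is just an illustrative helper, and the model.structure / model.states attribute names and the (parents..., child) column ordering are assumptions based on the pre-1.0 API, so verify them against your version:

def update_components_independently(model, X):
    # model.structure[i] is assumed to be the tuple of parent indices of node i
    for child, parents in enumerate(model.structure):
        dist = model.states[child].distribution
        if len(parents) == 0:
            dist.summarize(X[:, child])                      # root node: univariate distribution, 1-D column
        else:
            dist.summarize(X[:, list(parents) + [child]])    # CPT: parents first, then the child
        dist.from_summaries()                                # update this component, then move on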

ndolev commented 4 years ago

Hi,

Thanks for taking the time to respond.

> If you have a massive data set and want to learn the structure using all your data I would suggest first learning the structure on a subset of the data and then refining the parameters using the strategy I outlined. You might not get the best structure, but you will get a good structure and the best parameters given that structure.

This I understand: I can take a subset, run from_samples, and then push every additional batch through summarize. Of course, the best thing would be to take multiple subsets and then perhaps search for the most likely structure given the samples. This would greatly benefit from being able to pass a custom backend to joblib, by the way.
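
In code, that subset-then-refine workflow would look roughly like this (a sketch: load_chunk and n_chunks are placeholders for however the data ends up batched):

from pomegranate import BayesianNetwork

# 1) learn the structure once, on a subset small enough to handle directly
model = BayesianNetwork.from_samples(load_chunk(0), algorithm='chow-liu', n_jobs=-1)

# 2) stream the remaining data through summarize(); the sufficient statistics accumulate
for i in range(1, n_chunks):
    model.summarize(load_chunk(i))

# 3) a single parameter update over everything that was summarized
model.from_summaries()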

I don't quite follow option 2.

> An alternate method for partitioning the data is to do each component independently. Each distribution should only be over a subset of the variables. If you're learning a tree (chow-liu) each distribution should involve either one or two variables. Thus, you can just slice the respective variables out for each component (~2M by 2), update the distribution, and move on to the next one. This is called model-parallel, whereas my first solution was data-parallel.

You mean:

Given

sample ---- A B C D ....DD.....FFG....GGR
 0    ----- 1 1 1 0 ....00...
 .
 .
 .
 N

You'd do:

First iteration

sample ---- A B 
 0    ----- 1 1 
 .
 .
 N    ----- 1 1

Second iteration

sample ---- B C 
 0    ----- 1 1 
 .
 .
 N    ----- 1 1

etc

And then how would you update the distributions?

Thanks again.

At the end of this thread, I'll make sure to post sample code for data-parallel and model-parallel options.

ndolev commented 4 years ago

Hi Jacob,

First off, I know Washington state is being hit hard so I hope you are doing well in spite of COVID-19.

I made a first pass at distributing the Chow-Liu structure learning over Spark.

First off, we have a dataset that looks like this:

id  variable count
11 name1 134
11 name2 17
12 name2 134
 . 
 .
 . 
N

The count reflects the number of samples in which the variable appeared.

Then we take a cartesian product (self join) on ids to get the marginals:

from pyspark.sql import Window
from pyspark.sql.functions import col
import pyspark.sql.functions as F

# totdocs is the total number of samples (documents) in the data set
matrix = matrix.alias('p').join(matrix.alias('c'), col('p.id') == col('c.id')).select(
    col('p.variable').alias('parentId'),
    col('p.count').alias('parentCount'),
    (col('p.count') / totdocs).alias('marginal_p'),
    col('c.variable').alias('childId'),
    col('c.count').alias('childCount'),
    (col('c.count') / totdocs).alias('marginal_c'),
    col('p.id').alias('id')).dropDuplicates(['parentId', 'childId', 'id'])

Now we calculate the joint and the mutual information for every pair:

matrix = matrix.filter(col('parentId') != col('childId'))  # get rid of self-loops

# the window over (parent, child) counts the documents in which the pair co-occurs, i.e. the joint
window = Window.partitionBy(['parentId', 'childId'])
matrix = matrix.withColumn('marginal_pc', F.count(col("id")).over(window)/totdocs)
matrix = matrix.withColumn('mutual_i', col('marginal_pc') * (F.log('marginal_pc') - F.log('marginal_c') - F.log('marginal_p')))

matrix = matrix.select('parentId', 'childId', 'mutual_i').groupby('parentId', 'childId') \
               .agg((-1 * F.sum('mutual_i')).alias('mutual_i'))

Now that we have the mutual information for every parent-child pair, we can build the trees in one shot (since the join is over ids, a parent-child pair only exists where the two variables appear together in a sample, so we get the minimum spanning tree directly):

window = Window.partitionBy('parentId')
matrix = matrix.withColumn('ismin', col('mutual_i') == F.min('mutual_i').over(window))

And then collect them together:

matrix = matrix.filter(col('ismin') == True).select('parentId', 'childId').groupby('parentId').agg(F.collect_list('childId').alias('childrenIds')).orderBy('parentId')

To get the tuple structure Pomegranate is expecting:

mst = matrix.rdd.map(tuple)
mst_tuple = list(mst.collect())

# `dictionary` (defined elsewhere) is a DataFrame with one row per variable id
ids = list(dictionary.select('id').toPandas()['id'].to_numpy())

structure = [[] for _ in ids]
for parent, children in mst_tuple:
    structure[parent] = children
structure = tuple(tuple(x) for x in structure)

This runs on 200M (samples) x 60K (variables) in a couple of minutes, and the output structure looks like this:

((11974,), (), (), (), (), (), (), (41843,), (), ().....

What do you think?

jmschrei commented 4 years ago

I'm not sure I understand the data set. It looks like you have counts for the number of times each element occurs, but how do you get the counts for the number of times that each pair of elements occur together? The minimum spanning tree should result in each variable having a parent. Why do you think there are some empty parent sets there?

jmschrei commented 4 years ago

I remembered why I didn't let users pass in their own backends. Cython objects can't be pickled, which I think prevents dask from working. If you can show me an example of how to modify the code and that the changes work with dask I'd be happy to include them, but I wasn't sure how to do it myself.

ndolev commented 4 years ago

Hey mate,

> I'm not sure I understand the data set. It looks like you have counts for the number of times each element occurs, but how do you get the counts for the number of times that each pair of elements occur together?

The answer is this piece of code, which runs on the matrix joined to itself in the preceding lines:

matrix = matrix.withColumn('marginal_pc', F.count(col("id")).over(window)/totdocs)

This counts how many times the parent-child pair appear together.

> The minimum spanning tree should result in each variable having a parent. Why do you think there are some empty parent sets there?

This occurs because some variables are orphans (i.e., they are not connected to any other node). This also implies that they are not relevant for inference, of course.

> I remembered why I didn't let users pass in their own backends. Cython objects can't be pickled, which I think prevents dask from working. If you can show me an example of how to modify the code and that the changes work with dask I'd be happy to include them, but I wasn't sure how to do it myself.

With regard to cython under a dask (or spark) backend to Joblib this was resolved already (see https://github.com/dask/dask/issues/3276). But I'd also argue that it's out of scope. Meaning, if someone opens an issue complaining that passing a particular backend doesn't work, you can always tag it as a problem with the backend rather than with Pomegranate.

I really think there is a good opportunity here to make pomegranate work with huge datasets by incorporating some simple spark code. I currently see some inconsistencies with your Chow-Liu code and my output on a toy test set. I'll post that as a separate issue (#708 ).

jmschrei commented 4 years ago

The Chow-Liu algorithm gives a minimal spanning tree. The "spanning" part means that it covers all the variables, and so all variables (except the root) should have a parent. It is possible to modify this algorithm to require a certain threshold to connect two variables, but the algorithm, as originally proposed, does not do this.

> With regard to cython under a dask (or spark) backend to Joblib this was resolved already (see dask/dask#3276). But I'd also argue that it's out of scope. Meaning, if someone opens an issue complaining that passing a particular backend doesn't work, you can always tag it as a problem with the backend rather than with Pomegranate.

My concern is that I don't think that any backends other than multithreading would work with pomegranate's data model. Specifically, pomegranate currently assumes shared memory access to a data set and the stored internal statistics. If you're willing to investigate this for me by seeing what needs to be done to support other backends I'd be happy to merge a PR.

> I really think there is a good opportunity here to make pomegranate work with huge datasets by incorporating some simple spark code. I currently see some inconsistencies with your Chow-Liu code and my output on a toy test set. I'll post that as a separate issue (#708 ).

I don't think it's a good idea to include Spark code in pomegranate. First, I don't know Spark well. Second, I think keeping pomegranate's scope limited, in the same manner as sklearn, makes it more intuitive to use. It might be a good idea for me to include more examples of how to do distributed training of models using the summarize and from_summaries methods, though.

ndolev commented 4 years ago

Sure, I can give you a "tutorial" notebook which uses Spark to create a very large Bayesian network with Pomegranate once we finish working out the kinks.

jassinm commented 4 years ago

I would be very interested in how to do distributed training in pomegranate using summarize and from_summaries with other backends. I tried this with Spark for HMM Baum-Welch:

from pomegranate import HiddenMarkovModel, State, DiscreteDistribution

s1 = State(DiscreteDistribution({'A': 0.1, 'B': 0.7, 'C': 0.2}))
s2 = State(DiscreteDistribution({'A': 0.7, 'B': 0.1, 'C': 0.2}))

model = HiddenMarkovModel()
model.add_states(s1, s2)
model.add_transition(model.start, s1, 0.8)
model.add_transition(model.start, s2, 0.2)
model.add_transition(s1, s1, 0.3)
model.add_transition(s1, s2, 0.7)
model.add_transition(s2, s1, 0.4)
model.add_transition(s2, s2, 0.6)
model.add_transition(s2, model.end, 0.30)
model.bake()

X = model.sample(100000, length=20, random_state=0)

model1 = model.copy()
bmodel_cast = sc.broadcast(model1)

def trainmodel(part):
    data = list(part)
    yield bmodel_cast.value.summarize(data, algorithm='baum-welch', check_input=True)

for i in range(10):
    print(sum(sc.parallelize(X).mapPartitions(lambda x: trainmodel(x)).collect()))
    bmodel_cast.value.from_summaries()

Unfortunately, it always outputs the same value, in contrast to joblib:

from joblib import Parallel, delayed

def generate_batches(data, batch_size):
    start, end = 0, batch_size
    while start < len(data):
        yield data[start:end]
        start += batch_size
        end += batch_size

model1 = model.copy()
for i in range(10):
    with Parallel(n_jobs=12, backend='threading') as parallel:
        f = delayed(model1.summarize, check_pickle=False)
        print(sum(parallel(f(batch) for batch in generate_batches(X, 1000))))
        model1.from_summaries()

The model.from_summaries() call doesn't work in Spark, probably because under joblib the model is shared in a threaded environment (whereas each Spark worker only updates its own broadcast copy). I think I will need to access expected_transitions and the distributions of each of the models (which are not public atm) to perform the additive operations.

dolevamir commented 2 years ago

> Sure, I can give you a "tutorial" notebook which uses Spark to create a very large Bayesian network with Pomegranate once we finish working out the kinks.

@ndolev I'd be very interested in such a notebook :)

jmschrei commented 1 year ago

Thank you for opening an issue. pomegranate has recently been rewritten from the ground up to use PyTorch instead of Cython (v1.0.0), and so all issues are being closed as they are likely out of date. Please re-open or start a new issue if a related issue is still present in the new codebase.