chartbeat-labs / textacy

NLP, before and after spaCy
https://textacy.readthedocs.io
Other
2.21k stars 249 forks source link

multithreading for Corpus creation #277

Closed Motorrat closed 4 years ago

Motorrat commented 5 years ago

context

I am trying to speed up the Corpus creation via parallel processing. The source is a dictionary of text strings and so the code looks roughly like this:

corpus = textacy.corpus.Corpus(lang=model)

def add_doc(key,value):
    corpus.add(tuple([value, {'key':key}],))

Parallel( n_jobs=-1, prefer="threads", verbose=5)(
    delayed(add_doc)(key,value) for key,value in texts_dict.items())

Currently it seems there is some bottleneck that prevents this from utilizing all processing power available. With a certain dataset I get roughly 3000 texts added per minute regardless from how many jobs are running: 1, 4, or 8 (threads). I have tried it on a 2 core laptop and on AWS 2xtralarge instance (4core/8threads) with the roughly same throughput on both.

A dataset of 3mln tokens takes roughly 5 minutes to get loaded into a Corpus. I wonder if this is really bound by the processing power or if there is some bottleneck that could be removed.

I have tried a workaround and implemented the same with preprocessing the texts into spacy docs:

corpus = textacy.corpus.Corpus(lang=model)

spacy_docs=[]
def add_spacy_doc(key,value):
    spacy_docs.append(model(value))

Parallel( n_jobs=-1, prefer="threads", verbose=5)(
    delayed(add_spacy_doc)(key,value) for key,value in texts_dict.items())

corpus.add_docs(spacy_docs)

Unfortunately there was no speed up so I guess I would need to open an issue against spacy and see if something could be done there

proposed solution

Add parallel processing option to textacy.corpus.Corpus(data,lang) and change it's signature to textacy.corpus.Corpus(lang,data,n_jobs=1)

alternative solutions?

Share the above doc snippets as the examples in the documentation.

Motorrat commented 5 years ago

Here I found an example on how that should work with spacy, will explore and report https://github.com/explosion/spaCy/blob/master/examples/pipeline/multi_processing.py

Motorrat commented 4 years ago

So here is an example of running the corpus creation in multiple processes vs a single process. Unfortunately the multiprocessing version is even slower with 04:23 minutes for Corpus(12000 docs, 1776000 tokens) vs. 03:29 for single process. (2 cores 4 threads laptop)

Is there something that could be done to actually profit from multiprocessing with textacy?

from multiprocessing.managers import BaseManager
from joblib import Parallel, delayed
from datetime import datetime

import spacy
import textacy
from spacy.util import minibatch

text_string='''multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.

The multiprocessing module also introduces APIs which do not have analogs in the threading module. A prime example of this is the Pool object which offers a convenient means of parallelizing the execution of a function across multiple input values, distributing the input data across processes (data parallelism). The following example demonstrates the common practice of defining such functions in a module so that child processes can successfully import that module.'''

# let's create a list of strings with all uniq tokens, so the vocab will also be large
texts=[]
for i in range(12000):
    texts.extend([' '.join(str(i)+word for word in text_string.split()),])

print(texts[1])

print('number of docs',len(texts))

batch_size=1000
partitions = minibatch(texts, size=batch_size)

model = spacy.load('en_core_web_lg')

class PoolCorpus(object):

   def __init__(self):
       self.corpus = textacy.corpus.Corpus(lang=model)

   def add_texts(self,data):
       self.corpus.add_texts(data,batch_size)

   def get_corpus(self):
       return self.corpus

BaseManager.register('PoolCorpus', PoolCorpus)

start = datetime.now()
print('started multiprocessing', start.strftime('%Y-%m-%d %H:%M:%S'))

with BaseManager() as manager:
    corpus_class = manager.PoolCorpus()

    Parallel(n_jobs=-1, backend="multiprocessing", prefer="processes", verbose=5)(
             delayed(corpus_class.add_texts)(part) for part in partitions)

    corpus=corpus_class.get_corpus()

print(corpus)

finish = datetime.now()
print('finished multiprocessing', finish.strftime('%Y-%m-%d %H:%M:%S'))
print('##### multiprocessing time',finish - start)

del corpus_class
del corpus
del model

# Now the same thing using the batch add_texts and no multiprocessing
model = spacy.load('en_core_web_lg')

start = datetime.now()
print('started sequential', start.strftime('%Y-%m-%d %H:%M:%S'))

corpus = textacy.corpus.Corpus(lang=model)
corpus.add_texts(texts, batch_size)

print(corpus)

finish = datetime.now()
print('finished sequential', finish.strftime('%Y-%m-%d %H:%M:%S'))
print('#### sequential time',finish - start)`
bdewilde commented 4 years ago

Hi @Motorrat , thanks for the detailed posts, and apologies for the belated reply. SpaCy recently (re-)implemented multiprocessing in its core nlp.pipe() functionality: https://github.com/explosion/spaCy/releases/tag/v2.2.2

I'm going to do some work to make textacy compatible with v2.2, and that will likely include parallel processing of documents. Will keep this issue open until I've released a fix.

bdewilde commented 4 years ago

Heads-up: https://github.com/chartbeat-labs/textacy/pull/285

bdewilde commented 4 years ago

The update is now available in a release: https://github.com/chartbeat-labs/textacy/releases/tag/0.10.0