stanfordnlp / stanza

Stanford NLP Python library for tokenization, sentence segmentation, NER, and parsing of many human languages
https://stanfordnlp.github.io/stanza/

Better batch processing #550

Closed johann-petrak closed 1 year ago

johann-petrak commented 3 years ago

There is currently a recommendation to concatenate documents to improve speed, but there is almost no information about how to determine the optimal size of the input given the config settings for the processors, e.g. max sequence length and batch size.

Ideally, the pipeline itself should determine how to combine individual documents into an optimal batch given the config parameters.

This could be done by adding a streaming mode, where the processing method consumes a stream of texts and yields a stream of Stanza documents. So, instead of actively passing a document and getting back a document, one would pass something that yields the documents to process and consume the results as they are yielded, e.g.:

import json

def docsource(infile):
    # yield one text per line of a JSONL input file
    with open(infile, "rt") as infp:
        for line in infp:
            doc = json.loads(line)["text"]
            yield doc

# hypothetical pipe() method: consumes an iterable of texts, yields Stanza documents
for doc in stanzapipeline.pipe(docsource(infile), otherargs=couldbeset):
    # do something with the stanza result doc
    ...

The pipe method could internally accumulate or split input texts as needed to run them through the network so that each batch is used optimally, then split or merge the results to create individual documents and yield those.

This way, the ad-hoc method of doing this manually before invoking the Stanza pipeline would be replaced by something that knows much better how to do it properly, and it would avoid the problems of using some unusual marker pattern to separate documents which may actually occur within a document.
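A minimal sketch of the kind of internal re-batching this suggests (the rebatch helper, its max_chars budget and the process_batch callable are all hypothetical illustrations, not Stanza API):

def rebatch(texts, process_batch, max_chars=20000):
    """Accumulate incoming texts into buckets of roughly max_chars characters,
    run each bucket through process_batch (which returns one result per text),
    and yield the per-document results in order."""
    bucket, size = [], 0
    for text in texts:
        bucket.append(text)
        size += len(text)
        if size >= max_chars:
            yield from process_batch(bucket)
            bucket, size = [], 0
    if bucket:
        yield from process_batch(bucket)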

BTW, it would be great to have in the documentation a more detailed explanation of the batch size and other parameters for the various processors: how they influence performance, how they relate to the size of the documents to process, and how they should be adapted to each other across the different processors!

AngledLuffa commented 3 years ago

Yah, we've been putting some thought into how to make the pipeline more efficient. My initial thoughts are to rewrite the data processing as a subclass of torch.Dataset (exact path may vary), which would leverage torch's ability to load data via CPU while doing the GPU processing in parallel. However, I believe that would be more of an advantage at train time than run time, although it should also help run time.
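For illustration, a rough sketch of that idea, not Stanza's actual implementation: a torch.utils.data.Dataset over raw documents, with a DataLoader doing the CPU-side loading in worker processes while the GPU model runs. The JsonlTextDataset class and the "docs.jsonl" path are made up for the example.

import json
from torch.utils.data import Dataset, DataLoader

class JsonlTextDataset(Dataset):
    """Reads one JSON object per line and exposes its "text" field."""
    def __init__(self, path):
        with open(path, "rt", encoding="utf-8") as fp:
            self.texts = [json.loads(line)["text"] for line in fp]

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        # any CPU-bound preprocessing (cleaning, pre-tokenization) would go here
        return self.texts[idx]

loader = DataLoader(JsonlTextDataset("docs.jsonl"), batch_size=32,
                    num_workers=2, collate_fn=list)
for batch_of_texts in loader:
    pass  # run the GPU model on batch_of_texts while workers prepare the next batch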

The biggest issue is that there is a bit of a time crunch - j38's been working on other projects, and I've been swamped with rebuilding models for UD 2.7, trying to flatten the curve on user issues, and working on my other projects. Once the next release is done, I'll have more time to devote to the engineering debt. Of course, if you have a pull request which addresses the issue, we'll be happy to integrate it.

qipeng commented 3 years ago

Not quite streaming, but we recently made a change to allow batched processing that respects Document boundaries (#577)

johann-petrak commented 3 years ago

TBH I find it a pity that this patch missed the chance to implement this in a way where the input is any iterable (instead of a list) and the method bulk_process would yield documents instead of returning a list.

An architecture where each step is capable of iterating over an iterable and yielding its results makes it possible to create pipelines where each generator is the iterable for the next step. Each step would then be able to decide for itself which way of grouping into minibatches, distributing among processes, etc. is most efficient for its processing.

I did not really look into the details of how stanza implements each of the processing steps, but I do not see any principal problem with adding this kind of stream processing (which would also have to bubble up to the API, so that stream processing would not return a result but a generator that produces the results).

Even if for now the actual implementation would remain much the same as it is now, the API would not have to change if a much more efficient streaming implementation is added later.

qipeng commented 3 years ago

@johann-petrak I agree -- this is a first step towards streaming, but is definitely not quite there yet. We were trying to add complexity one step at a time, and with the current (list-based) bulk processing interface we are planning to experiment with cross-document batching/data feeding in the near future.

Streaming is a bit more complex, though, since the main design of the current pipeline is still strictly staged, which prevents it from spitting out processing results for some docs before others are fully processed. Besides, there is the issue of figuring out the sweet spot for dynamic batching, and I suspect further development on the current bulk processing interface will help us work out best practices there. What you're suggesting is definitely still something we aspire to achieve at some point.

Endalokin commented 3 years ago

I'd just like to support this issue, as I often find that I either send small pieces of text and get slow performance, or send large texts and run into CUDA memory errors. I then iteratively reduce batch_size to the point where performance is worse than before, when I was sending the small text pieces. I find it hard to judge what the best way to process my file is, whether I made any errors, or whether Stanza is just not suitable for my kind of use case.

My regular example file contains 30 articles with ~17,400 words in total. A larger example file contains 1,799 articles with ~660,000 words in total - and it is still just an example file. The latter takes about 40 minutes, unless a CUDA launch failure occurs.

AngledLuffa commented 3 years ago

Would you send us the example file? I'll see what I can do with it.

Endalokin commented 3 years ago

Unfortunately, as with so many, the articles are still under copyright so I can't provide them publicly.

AngledLuffa commented 3 years ago

Hmm. If you can send it to me privately, I'll delete it once the debugging is done.

apmoore1 commented 3 years ago

Hi, I have created this batch utility around Stanza that might solve some of the problem raised here:

https://github.com/apmoore1/stanza-batch

It allows you to give it an Iterable of texts and returns an Iterable of Stanza Documents; an example is shown below:

from typing import List
import stanza
from stanza.models.common.doc import Document
from stanza_batch import batch

# Documents to process
document_1 = 'Hello how are you\n\nI am good thank you'
document_2 = 'This is a different document'

# Download Stanza English model
stanza.download("en")

# stanza model
nlp = stanza.Pipeline(lang="en", processors="tokenize",)

# Process Documents
stanza_documents: List[Document] = []
for document in batch([document_1, document_2], nlp, batch_size=32): # Default batch size is 32
    stanza_documents.append(document)

assert len(stanza_documents) == 2 # 2 Documents after processing
# Each document contains the same raw text after processing
assert stanza_documents[0].text == document_1 
assert stanza_documents[1].text == document_2
# Each document contains the expected number of sentences
assert len(stanza_documents[0].sentences) == 2
assert len(stanza_documents[1].sentences) == 1

This wrapper is compatible with Stanza versions 1.1.1 and 1.2.0. If it is of interest, I could add this to the Stanza package, or just maintain this wrapper. I have tried to make the wrapper work with older versions of Stanza, but I found that it would require some code refactoring.

AngledLuffa commented 3 years ago

I believe we have a pull request of our own which should address this shortly:

https://github.com/stanfordnlp/stanza/pull/611

If there's something we're not covering, please feel free to make an additional pull request.

It's possible to pip install directly from our dev branch once this is merged, btw. Let us know if you need a hand with that.

johann-petrak commented 3 years ago

Is there any information about how to use this properly and what the performance impact is?

AngledLuffa commented 3 years ago

If you use the bulk_process feature in the dev branch, it will be several times faster for tokenizing small texts. Here is an example. Calling the pipeline with pre-created documents triggers the bulk_process operation automatically.

import stanza
from stanza.models.common.doc import Document

nlp = stanza.Pipeline("en", processors="tokenize")

# or just print something playable in lantern in MH2 please
docs = ["Unban mox opal"] * 100000
results = nlp([Document([], text=doccontent) for doccontent in docs])

johann-petrak commented 3 years ago

What I do not understand is whether Stanza will now internally choose the right number of documents from that list for each internal batch so that throughput is maximised. What about the batch sizes for downstream processors in the pipeline? How would I decide how many documents to put into that list, depending on the size of those documents, to optimize throughput?

AngledLuffa commented 3 years ago

It'll batch as many as it can, up to the batch size. There is possibly some improvement to be made in this mechanism or in the later processors, but so far that is not implemented. You can try a few different batch sizes based on how long your average document is, but the result should already be a lot faster using the new mechanism.
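For what it's worth, a rough sketch of that kind of experiment (it assumes the documented tokenize_batch_size pipeline option; the sample texts and candidate sizes are arbitrary):

import time
import stanza
from stanza.models.common.doc import Document

texts = ["Unban mox opal"] * 10000  # replace with your own documents

for batch_size in (16, 32, 64, 128):
    nlp = stanza.Pipeline("en", processors="tokenize",
                          tokenize_batch_size=batch_size)
    start = time.time()
    nlp([Document([], text=t) for t in texts])
    print(batch_size, time.time() - start)

Rebuilding the pipeline inside the loop reloads the models each time; that is fine for a one-off timing experiment.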

johann-petrak commented 3 years ago

I wanted to test this with commit c3608dc2dec38268a6d9bba033764614f7736df2 from dev, but maybe I am doing something wrong; I am getting the following exception:

Traceback (most recent call last):
  File "/home/johann/.conda/envs/femdwell-stanza1/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "./debug-stanzabatch-mp.py", line 33, in runner
    docs = pipeline(batch)
  File "/home/johann/misc-git/stanza/stanza/pipeline/core.py", line 212, in __call__
    doc = self.process(doc)
  File "/home/johann/misc-git/stanza/stanza/pipeline/core.py", line 206, in process
    doc = process(doc)
  File "/home/johann/misc-git/stanza/stanza/pipeline/tokenize_processor.py", line 131, in bulk_process
    token.words[0]._misc = token._misc
IndexError: list index out of range
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "./debug-stanzabatch-mp.py", line 64, in <module>
    run_pool(n=args.n, nmax=args.max, gpu=args.gpu, batchsize=args.batch)
  File "./debug-stanzabatch-mp.py", line 47, in run_pool
    results = [ret.get() for ret in result_futures]
  File "./debug-stanzabatch-mp.py", line 47, in <listcomp>
    results = [ret.get() for ret in result_futures]
  File "/home/johann/.conda/envs/femdwell-stanza1/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
IndexError: list index out of range

If I am doing something wrong, maybe there should be a better error message?

AngledLuffa commented 3 years ago

The exact code I sent earlier, or some other input?

johann-petrak commented 3 years ago

The exact code you sent (but with a German model) works just fine. It seems to happen for a specific input text. This does not happen in the released version and it does not happen when running the text directly through the pipeline, but it happens with the dev version mentioned above.

Code:

import stanza
from stanza.models.common.doc import Document

txt = """ Ah, eine frische Portion von Qualitätswissenschaft..  Die Zeit eine Einheit des Vergleichs vom Verlauf eines physischen Prozesses, in der Nähe von massereichen Körpern verläuft ein Prozess, auf Grund der Anspannung des Gravitaionsfeldes anders, als neben weniger massereichen Objekten. Wie kann diese Tatsache auch nur irgendeinen Effekt auf den Raum ausüben? Welche Eigenschaft des Raums soll sich dabei ändern? Genauso wenig wie auch die Temperatur eines Körpers - nämlich gar keine.  In realitätsfremnden Modellen soll sich ja dadurch etwas biegen. Dem realen Raum ist es absolut egal. Er bleibt "gerade", unäbhingig davon mit welcher Geschwindigkeit die Materie in ihm (im Raum) wechselwirkt.
"""

nlp = stanza.Pipeline("de", processors="tokenize")

# this works
nlp(txt)

# this does not
nlp([Document([], text=txt)])
AngledLuffa commented 3 years ago

Thanks for the example - very clear. This should be fixed as of

https://github.com/stanfordnlp/stanza/commit/7f1b1474b64be38e9493397ac4eb9c20320c2c21

johann-petrak commented 3 years ago

Thank you! I can confirm that this bug does not occur any more. However, I am now getting a different exception (which does not occur with the released version using the same code and data). This happens when the pipeline contains all processors, not just the tokenizer. You can try it by running the code above without restricting the pipeline to the tokenizer.

  File "i550indexerror.py", line 14, in <module>
    nlp([Document([], text=txt)])
  File "/home/johann/misc-git/stanza/stanza/pipeline/core.py", line 212, in __call__
    doc = self.process(doc)
  File "/home/johann/misc-git/stanza/stanza/pipeline/core.py", line 206, in process
    doc = process(doc)
  File "/home/johann/misc-git/stanza/stanza/pipeline/processor.py", line 224, in bulk_process
    self.process(combined_doc) # annotations are attached to sentence objects
  File "/home/johann/misc-git/stanza/stanza/pipeline/pos_processor.py", line 35, in process
    batch.doc.set([doc.UPOS, doc.XPOS, doc.FEATS], [y for x in preds for y in x])
  File "/home/johann/misc-git/stanza/stanza/models/common/doc.py", line 222, in set
    "Contents must have the same length as the original file."
AssertionError: Contents must have the same length as the original file.
AngledLuffa commented 3 years ago

MWT processor itself also needed a little fiddling with the bulk process mechanism. This should also be fixed now - I tested it end to end this time, not just on the tokenizer.

AngledLuffa commented 3 years ago

Thanks for being our lab rat! I'm trying to be nicer to you than I was to the actual rat I just found in my garage...

johann-petrak commented 3 years ago

Thanks, works fine now! I have noticed a speed-up of between 2x and 4x so far, depending on CPU/GPU and the number of processes.

AngledLuffa commented 3 years ago

I think we just updated the MWT processor in the dev branch so that it uses the bulk_process ability more effectively. It should cut about 10% off the runtime for any language which uses it.

johann-petrak commented 3 years ago

Thanks for being our lab rat! I'm trying to be nicer to you than I was to the actual rat I just found in my garage...

As long as the rat gets fed regularly and often with tested stable releases, for example one that includes that batching option, there should be no problem.

johann-petrak commented 3 years ago

I think this issue can get closed now.

I would propose though to open a separate issue for actual proper streaming (both internally and providing an API) where the correct batching for all components is performed automatically when processing a stream of documents, yielding documents that have been processed.

This would be an improvement over the current situation where a list (an "external batch") has to be constructed and blindly passed, then some different batching occurs internally, and a list has to be constructed and passed back.

My suggestion would be to implement a proper streaming API by providing a method stanza.Pipeline.pipe(iterable, **kwargs) which yields result documents. Keyword arguments could influence any details of how internal batching, re-batching, multiprocessing or utilization of GPU(s) happens.

ma-ji commented 3 years ago

If you use the bulk_process feature in the dev branch, it will be several times faster for tokenizing small texts. Here is an example. Calling the pipeline with pre-created documents triggers the bulk_process operation automatically.

import stanza
from stanza.models.common.doc import Document

nlp = stanza.Pipeline("en", processors="tokenize")

# or just print something playable in lantern in MH2 please
docs = ["Unban mox opal"] * 100000
results = nlp([Document([], text=doccontent) for doccontent in docs])

@AngledLuffa Hi, what's the difference between passing a list of strings to nlp and wrapping the strings as docs first? In my code, I'm directly passing a list of strings to nlp and it works fine. Is the wrapping faster?

I tested with the same documents using the dev branch, and the improvement seems marginal, if not none.

AngledLuffa commented 3 years ago

Wrapping the strings as docs tells the pipeline to treat them as separate docs, but process them together in batches when possible. It's a lot faster than making many calls on different texts. Passing in a list of strings makes it think that these are already split into sentences, so this isn't appropriate for text where you might want them split into separate sentences.
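Put as code, a small sketch restating the comment above (the example texts are arbitrary):

import stanza
from stanza.models.common.doc import Document

nlp = stanza.Pipeline("en", processors="tokenize")
texts = ["Unban mox opal", "This is a different document."]

# separate documents, processed together in internal batches
docs = nlp([Document([], text=t) for t in texts])

# one pipeline call per string also works, but is much slower for many small texts
docs_slow = [nlp(t) for t in texts]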


liutianling commented 3 years ago

@ma-ji Did you test the speed on GPU or CPU? Thanks.

ma-ji commented 3 years ago

@liutianling on GPU.

liutianling commented 3 years ago

@ma-ji Thanks, have you tested the speed on CPU? I tested it and found it really slow. I tested three tasks with tokenize, pos and ner, and the average time cost was about 615 ms. I am not sure whether that is normal or not. Also, can you tell me how to use batch inference with stanza? Thank you very much...

AngledLuffa commented 3 years ago

The batch interface for CPU likely won't make things much faster

ma-ji commented 3 years ago

I must acknowledge that I gave up using Stanza because of efficiency (but I love your work!). I have billions of texts/docs to be tokenized.

I didn't look at the source code, but a few things I noticed: 1) Stanza combines CPU multiprocessing and the GPU in batch processing, probably each for different tasks; 2) GPU utilization is always around 40%, probably because my texts are short? Point 2 should be optimized.

AngledLuffa commented 3 years ago

Sorry to hear that, but billions of docs is quite a lot and would certainly take a long time with stanza. Were you using the bulk_process interface in the dev branch? Specifically, wrapping as Docs before passing the text to stanza. It should speed things up quite a bit, especially on GPU, for what it's worth.

liutianling commented 3 years ago

@AngledLuffa

text = "hello girls, you are my pretty sunshine"
stanza_docs: List[Document] = []
a = [text] * batchsize
[stanza_docs.append(Document([], text)) for text in a]

nlp = stanza.Pipeline('en', dir='../checkpoints', use_gpu=True, no_ssplit=True)
result = nlp(stanza_docs)

I tested multiple docs like this. The results indicate that it may be a little slower than a single doc? I don't know why.

I have tested with batch_size 10 and 20 on an RTX 3090.

AngledLuffa commented 3 years ago

I have tested with batch_size 10 and 20 on an RTX 3090.

When you ran these tests, were you using the use_gpu=False flag? It's not on the 3090 if you're doing that.

Also, have you installed the dev branch version of stanza?

liutianling commented 3 years ago

@AngledLuffa Thanks!

  1. I am testing on the main branch, and the version of stanza is 1.2.0
  2. I am sure it's running on the 3090
  3. I set the batch_size to 20, 50, and 100, and the speed is almost equal to single-document input
  4. I get the batch data like this:
     stanza_docs: List[Document] = []
     [stanza_docs.append(Document([], line)) for line in text]
AngledLuffa commented 3 years ago

The dev branch has the speed improvements.

The use_gpu flag you quoted earlier should stop it from using the GPU, so if it is still using the GPU, that's a bug

liutianling commented 3 years ago

Thanks. I will try the dev branch.

ma-ji commented 2 years ago

I'm doing something like this to help with better batch processing, and it works well:

from multiprocessing import Pool, set_start_method
from toolz import partition_all  # assumed: partition_all comes from the toolz package

set_start_method('spawn')

# nlp is the Stanza pipeline, in_docs is the iterable of input documents
with Pool(10) as p:  # This is the fastest. joblib (thread, mp) also experimented with.
    x = p.map(nlp, [list(s) for s in partition_all(2000, in_docs)])

You need to be very cautious about your GPU RAM size. Setting tokenize/pos/ner_batch_size to a smaller number helps (I'm using 32).
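As a sketch of those knobs (the <processor>_batch_size option names follow the documented pattern; check the docs of your Stanza version for what each batch size counts):

import stanza

nlp = stanza.Pipeline(
    "en",
    processors="tokenize,pos,ner",
    tokenize_batch_size=32,
    pos_batch_size=32,
    ner_batch_size=32,
)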

AngledLuffa commented 1 year ago

As pointed out by our PI, this issue can come up in searches for running Stanza faster. To be clear, the updates for the batch processing have been released since the last time this issue was discussed.

AngledLuffa commented 1 year ago

@johann-petrak Would you explain a bit more what you mean by the below?

I would propose though to open a separate issue for actual proper streaming (both internally and providing an API) where the correct batching for all components is performed automatically when processing a stream of documents, yielding documents that have been processed.

This would be an improvement over the current situation where a list (an "external batch") has to be constructed and blindly passed, then some different batching occurs internally, and a list has to be constructed and passed back.

My suggestion would be to implement a proper streaming API by providing a method stanza.Pipeline.pipe(iterable, **kwargs) which yields result documents. Keyword arguments could influence any details of how internal batching, re-batching, multiprocessing or utilization of GPU(s) happens.

What are you looking to get passed in - an iterable of strings, perhaps, to a Pipeline, and the result is an iterable as well? The point being that it isn't processing the entire thing at once, but is rather processing in batch_size chunks so that the calling program can then handle the results? I think that is something we could add

johann-petrak commented 1 year ago

I am sorry, my brain has garbage-collected the details of this in the meantime, but I think the main gist was what I have found most useful in my own code, where I build an NLP processing pipeline that may include several NN-based steps for which batch processing improves performance:

I tend to implement each step as a class that has both __call__(self, doc, ...) to process a single element (e.g. a document or sentence), and pipe(self, iterator, ...) which takes an iterator and returns a generator. The pipe approach can then be chained in a way where each component is free to batch elements in whatever way is most useful for that step: the first generator just emits the original elements (documents), all downstream generators process one or more of these elements, possibly splitting them into more elements, batching, combining, etc., and the final generator outputs the processed elements.
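A bare-bones sketch of that pattern (illustrative names only, not Stanza's API):

class Step:
    def __call__(self, item):
        return item  # process a single element

    def process_batch(self, batch):
        # a real step would run the whole batch through the network at once
        return [self(item) for item in batch]

    def pipe(self, items, batch_size=32):
        batch = []
        for item in items:
            batch.append(item)
            if len(batch) == batch_size:
                yield from self.process_batch(batch)
                batch = []
        if batch:
            yield from self.process_batch(batch)

# steps chain by feeding one generator into the next:
# for result in step2.pipe(step1.pipe(raw_docs)):
#     ...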

Sorry, I have not looked at this for quite a while so I do not know how much of this might already be implemented in Stanza anyways.

AngledLuffa commented 1 year ago

Heh, yeah, it's been a while. My PI asked me to make sure the bulk processing was easy to find & use, so I looked into this issue a bit to see if everything was resolved.

So far I did this:

https://github.com/stanfordnlp/stanza/commit/8c4e0659ae67f836b8e4c44645f9d543203a302f (this is just a wrapper for bulk processing)
https://github.com/stanfordnlp/stanza/commit/2c9fe3dad434b271fa23c20a9cf8ccaf63991f16 (this is the stream interface)
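A hedged usage sketch, assuming the stream interface from the second commit takes an iterable of Documents and yields processed Documents as they become available; the exact method name and signature should be checked against the linked commit or the current docs.

import stanza
from stanza.models.common.doc import Document

nlp = stanza.Pipeline("en", processors="tokenize")

def doc_source():
    for text in ["Unban mox opal", "This is a different document."]:
        yield Document([], text=text)

for processed in nlp.stream(doc_source()):
    print(len(processed.sentences))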

johann-petrak commented 1 year ago

Ah yes, from a quick look it seems that the stream method is basically what I had in mind with the "pipe" method mentioned above!