stanfordnlp / stanza

Stanford NLP Python library for tokenization, sentence segmentation, NER, and parsing of many human languages
https://stanfordnlp.github.io/stanza/

use async for corenlp client and separate i/o logic to make external async easier #177

Open AlJohri opened 4 years ago

AlJohri commented 4 years ago

Is your feature request related to a problem? Please describe.

The way that the CoreNLP server is encapsulated using a subprocess is really nice. I want to be able to use that code for booting the server separately from the client that actually hits the service.

Similarly, the code for setting up properties, parsing the response, and actually making the request is all tied together right now, which makes it very hard to use the library directly in an async manner.

Using synchronous requests is abysmally slow for me: I'm trying to annotate some 40k documents, which would take 11 hours synchronously but just under 2 hours async against a server with 16 threads.
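(For scale: 40k documents in 11 hours is roughly one document per second, so the ~5.5x speedup from fanning out ~10 concurrent requests to a 16-thread server is about what you'd expect.)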

Describe the solution you'd like

While I wrote a simple solution for my specific case using async (attached below), it required booting a separate server instance and copying bits of logic out of client.py, such as the parseFromDelimitedString step.

My feature requests are as follows:

1) Centralize the properties-creation logic, separate from any requests call. It took a long time to understand how it all works together and to make sure I was faithfully replicating it. There also isn't any option to see what the final properties look like when the API call is made, which would make things easier; I ended up just reading the docs for each annotator instead.

2) Completely separate the server and client (i.e. the code that boots the Java server vs. the code that hits it), and allow a separate class for booting a server that I can use with my own custom client.

3) Separate the logic for parsing the response from actually making the request. This is almost there already; I just wish there were a function that encapsulated creating a Document and running the parseFromDelimitedString step (see the sketch after this list).
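As a minimal sketch of what (3) could look like (deserialize_doc is a hypothetical name, not an existing function; the imports are the same ones my script below uses):

from stanfordnlp.protobuf import Document, parseFromDelimitedString

def deserialize_doc(response_bytes):
    # Hypothetical helper: wrap the serialized-response-to-Document step so
    # callers don't have to copy this logic out of client.py themselves.
    doc = Document()
    parseFromDelimitedString(doc, response_bytes)
    return doc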

Here's my async version, which uses a semaphore to limit max concurrency:

#!/usr/bin/env python3

import json
import asyncio

import httpx
import pandas as pd
from tqdm.auto import tqdm
from stanfordnlp.protobuf import Document, parseFromDelimitedString

properties = {
    "annotators": "tokenize,ssplit,pos,lemma,ner",
    # https://stanfordnlp.github.io/CoreNLP/tokenize.html#options
    "tokenize.language": "en",
    # https://stanfordnlp.github.io/CoreNLP/ner.html#options
    "ner.language": "en",
    "ner.applyNumericClassifiers": False,
    "ner.useSUTime": False,
    "ner.applyFineGrained": False,
    "ner.buildEntityMentions": True,
    "ner.combinationMode": "NORMAL", # or HIGH_RECALL
    "inputFormat": "text",
    "outputFormat": "serialized",
    "serializer": "edu.stanford.nlp.pipeline.ProtobufAnnotationSerializer",
}
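# outputFormat "serialized" with the ProtobufAnnotationSerializer makes the server
# return a length-prefixed protobuf, which parseFromDelimitedString decodes below.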

def get_entities(doc):
    # Collect the text of every PERSON entity mention across all sentences.
    entities = []
    for sent in doc.sentence:
        for mention in sent.mentions:
            if mention.ner == "PERSON":
                entities.append(mention.entityMentionText)
    return entities

async def parse(client, text, semaphore, corenlp_server):
    # The semaphore caps how many requests are in flight at once.
    async with semaphore:
        try:
            # httpx.post is synchronous; requests go through the shared
            # AsyncClient created in main().
            response = await client.post(
                corenlp_server,
                params={"properties": json.dumps(properties)},
                content=text.strip(),
                headers={'Content-Type': 'text/plain; charset=utf-8'}
            )
        except httpx.ReadTimeout as e:
            print(f'ERROR: {e!r}')
            # TODO: replace with document ID
            print(repr(text[0:50]))
            return []
    doc = Document()
    parseFromDelimitedString(doc, response.content)
    return get_entities(doc)

async def main(corenlp_server):

    # Truncate the append file from any previous run.
    with open('unique_entites_corenlp_append.txt', 'w'):
        pass

    semaphore = asyncio.BoundedSemaphore(10) # max concurrency
    df = pd.read_parquet("../stories.parquet")
    unique_person_entities = set()
    # One shared AsyncClient for all requests.
    async with httpx.AsyncClient() as client:
        tasks = [parse(client, text, semaphore, corenlp_server) for text in df.text]
        for fut in tqdm(asyncio.as_completed(tasks), total=len(tasks)):
            entities = set(await fut)
            new_person_entities = entities - unique_person_entities
            unique_person_entities |= entities
            with open('unique_entites_corenlp_append.txt', 'a') as f:
                f.write(str(new_person_entities) + "\n")

    with open('unique_entites_corenlp.txt', 'w') as f:
        for entity in unique_person_entities:
            f.write(entity + "\n")

if __name__ == "__main__":

    import sys

    if len(sys.argv) != 2:
        print("Usage: python ner-corenlp.py <CORENLP_SERVER>")
        sys.exit(1)

    corenlp_server = sys.argv[1]
    asyncio.run(main(corenlp_server))
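To run it, boot a CoreNLP server separately and point the script at it, e.g. (assuming the default port 9000):

python ner-corenlp.py http://localhost:9000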

I'd be happy to talk further and help make some of these changes if you're interested. Thanks for reading!

stale[bot] commented 3 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale[bot] commented 3 years ago

This issue has been automatically closed due to inactivity.