egerber / spaCy-entity-linker

spaCy module for linking text to Wikidata items
MIT License
221 stars 32 forks source link

Crashes when n_processes > 1 #23

Open cphoover opened 1 year ago

cphoover commented 1 year ago

I'm seeing an issue where the pipeline crashes during msgpack serialization when I set n_processes > 1.

I want to leverage all cores on my AWS c5a.2xlarge machine so I have set n_processes = number of cores.

But now my entity linking pipeline is crashing.

Below is a paste of the stacktrace:

Traceback (most recent call last):
  File "/home/ubuntu/workspace/official_benchmark_with_linking.py", line 95, in <module>
    results = process_messages(doc_batch)
  File "/home/ubuntu/workspace/official_benchmark_with_linking.py", line 65, in process_messages
    _ = [d for d in nlp.pipe(input_data,
  File "/home/ubuntu/workspace/official_benchmark_with_linking.py", line 65, in <listcomp>
    _ = [d for d in nlp.pipe(input_data,
  File "/home/ubuntu/workspace/.env/lib/python3.10/site-packages/spacy/language.py", line 1574, in pipe
    for doc in docs:
  File "/home/ubuntu/workspace/.env/lib/python3.10/site-packages/spacy/language.py", line 1657, in _multiprocessing_pipe
    self.default_error_handler(
  File "/home/ubuntu/workspace/.env/lib/python3.10/site-packages/spacy/util.py", line 1672, in raise_error
    raise e
ValueError: [E871] Error encountered in nlp.pipe with multiprocessing:

Traceback (most recent call last):
  File "/home/ubuntu/workspace/.env/lib/python3.10/site-packages/spacy/language.py", line 2273, in _apply_pipes
    byte_docs = [(doc.to_bytes(), doc._context, None) for doc in docs]
  File "/home/ubuntu/workspace/.env/lib/python3.10/site-packages/spacy/language.py", line 2273, in <listcomp>
    byte_docs = [(doc.to_bytes(), doc._context, None) for doc in docs]
  File "spacy/tokens/doc.pyx", line 1316, in spacy.tokens.doc.Doc.to_bytes
  File "spacy/tokens/doc.pyx", line 1375, in spacy.tokens.doc.Doc.to_dict
  File "/home/ubuntu/workspace/.env/lib/python3.10/site-packages/spacy/util.py", line 1312, in to_dict
    serialized[key] = getter()
  File "spacy/tokens/doc.pyx", line 1372, in spacy.tokens.doc.Doc.to_dict.lambda20
  File "/home/ubuntu/workspace/.env/lib/python3.10/site-packages/srsly/_msgpack_api.py", line 14, in msgpack_dumps
    return msgpack.dumps(data, use_bin_type=True)
  File "/home/ubuntu/workspace/.env/lib/python3.10/site-packages/srsly/msgpack/__init__.py", line 55, in packb
    return Packer(**kwargs).pack(o)
  File "srsly/msgpack/_packer.pyx", line 285, in srsly.msgpack._packer.Packer.pack
  File "srsly/msgpack/_packer.pyx", line 291, in srsly.msgpack._packer.Packer.pack
  File "srsly/msgpack/_packer.pyx", line 288, in srsly.msgpack._packer.Packer.pack
  File "srsly/msgpack/_packer.pyx", line 264, in srsly.msgpack._packer.Packer._pack
  File "srsly/msgpack/_packer.pyx", line 282, in srsly.msgpack._packer.Packer._pack
TypeError: can not serialize 'EntityCollection' object
dennlinger commented 1 year ago

Do you have a minimal code snippet to reproduce this error? As for fixing it, I am suspecting that there might be significant hurdles to allowing serializiation of EntityCollection.
From what I could find in the spacy issue tracker, it seems that there are even issues with serializing Span objects, which EntityCollection relies upon internally.

MartinoMensio commented 1 year ago

Hi @cphoover and @dennlinger ,

I have a minimal test set up from this morning, let me share with you in a new branch where we can try to make the objects serialisable.

As @dennlinger mentioned, we will need to change something about the span being memorized in the EntityElement and this will probably break the code that is using the .get_span() method. I was experimenting this morning with some exotic code with weakref but I could not restore correctly the spans. I am trying to understand from SpaCy codebase how they manage to save the references to doc without going in a loop of serialization with .to_bytes().

Let's collaborate on this issue! I will push a new branch multiprocessing where we will try to address this issue (as soon as I resolve my gpg errors on my machine 😂)

Best, Martino

cphoover commented 1 year ago

Here's an example snippet that reproduces the issue. Replace the text file with any large text file where each line is a document to be processed.

import spacy
import json
import time
from spacy.util import minibatch
from spacy.pipeline.entity_linker import DEFAULT_NEL_MODEL
import multiprocessing
from typing import Iterable
# from tf_idf import get_tfidf_ranking

labels = [
    "entertainment",
    "politics",
    "world news",
    "contract",
    "sports",
    "business",
    "travel"
]

def chunk(list, n):
    for i in range(0, len(list), n):
        yield list[i:i + n]

model_name = "en_core_web_sm"

# Load the spaCy model and enable CUDA
nlp = spacy.load(model_name)

nlp.add_pipe("entityLinker", last=True)

print('*****************************************')

print("Model: ", model_name)

# get number of available cores
n_cores = multiprocessing.cpu_count()

print("Number of cores: ", n_cores)

def process_messages(input_data):
    _ = [d for d in nlp.pipe(input_data,
                             n_process=n_cores,
                             )]
    return _

batch_size = 100
num_iterations = 10

text_file = "imdb_preprocessed.txt"

# Start the timer
start_time = time.time()

# loop through text file line by line
with open(text_file, "r") as f:
    text_data = f.readlines()

    count = 0
    # batch the text_data lines to batch_size
    for doc_batch in chunk(text_data, batch_size):
        results = process_messages(doc_batch)

        for result in results:
            print("result:" + str(result))
            for ent in result.ents:
                les = ent._.linkedEntities if isinstance(ent._.linkedEntities, Iterable) else [
                    ent._.linkedEntities
                ]

                linked_entities = []

                for le in les:
                    if le:
                        tSes = le.get_super_entities() if isinstance(le.get_super_entities(),
                                                                     Iterable) else [le.get_super_entities()]
                        ses = [{
                            "id": se.get_id(),
                            "label": se.get_label()
                        } for se in tSes]

                        linked_entities.append({
                            "desc": le.get_description(),
                            "url": le.get_url(),
                            "kb_id": le.get_id(),
                            "label": le.get_label(),
                            "span": str(le.get_span()),
                            "super_entities": ses,
                        })

                print(json.dumps(
                    {
                        "entity_group": ent.label_,
                        "word": ent.text,
                        "linked_entities": linked_entities,
                    },
                    indent=4))
        if (count == num_iterations):
            break
        count += 1

end_time = time.time()
total_time = end_time - start_time

# Print the total time and the number of docs processed per second

print("Batch size: ", batch_size)
print("Number of iterations: ", num_iterations)
print("total lines processed: ", count * batch_size)
print("Total time: ", total_time)
print("Docs per second: ", (batch_size * num_iterations) / total_time)
print('****************************************')
MartinoMensio commented 1 year ago

With the last commits on the branch multiprocessing, you should be able to use all the cores without problems.

I tested your script and with a small edit I made it run without exceptions (line 29 onwards, the code is wrapped inside a if __name__ == '__main__': to avoid a common issue https://stackoverflow.com/questions/55057957/an-attempt-has-been-made-to-start-a-new-process-before-the-current-process-has-f ) .

import spacy
import json
import time
from spacy.util import minibatch
from spacy.pipeline.entity_linker import DEFAULT_NEL_MODEL
import multiprocessing
from typing import Iterable
# from tf_idf import get_tfidf_ranking

labels = [
    "entertainment",
    "politics",
    "world news",
    "contract",
    "sports",
    "business",
    "travel"
]

def chunk(list, n):
    for i in range(0, len(list), n):
        yield list[i:i + n]

model_name = "en_core_web_sm"

if __name__ == '__main__':

    # Load the spaCy model and enable CUDA
    nlp = spacy.load(model_name)

    nlp.add_pipe("entityLinker", last=True)

    print('*****************************************')

    print("Model: ", model_name)

    # get number of available cores
    n_cores = multiprocessing.cpu_count()

    print("Number of cores: ", n_cores)

    def process_messages(input_data):
        _ = [d for d in nlp.pipe(input_data,
                                n_process=n_cores,
                                )]
        return _

    batch_size = 100
    num_iterations = 10

    text_file = "imdb_preprocessed.txt"

    # Start the timer
    start_time = time.time()

    # loop through text file line by line
    with open(text_file, "r") as f:
        text_data = f.readlines()

        count = 0
        # batch the text_data lines to batch_size
        for doc_batch in chunk(text_data, batch_size):
            results = process_messages(doc_batch)

            for result in results:
                print("result:" + str(result))
                for ent in result.ents:
                    les = ent._.linkedEntities if isinstance(ent._.linkedEntities, Iterable) else [
                        ent._.linkedEntities
                    ]

                    linked_entities = []

                    for le in les:
                        if le:
                            tSes = le.get_super_entities() if isinstance(le.get_super_entities(),
                                                                        Iterable) else [le.get_super_entities()]
                            ses = [{
                                "id": se.get_id(),
                                "label": se.get_label()
                            } for se in tSes]

                            linked_entities.append({
                                "desc": le.get_description(),
                                "url": le.get_url(),
                                "kb_id": le.get_id(),
                                "label": le.get_label(),
                                "span": str(le.get_span()),
                                "super_entities": ses,
                            })

                    print(json.dumps(
                        {
                            "entity_group": ent.label_,
                            "word": ent.text,
                            "linked_entities": linked_entities,
                        },
                        indent=4))
            if (count == num_iterations):
                break
            count += 1

    end_time = time.time()
    total_time = end_time - start_time

    # Print the total time and the number of docs processed per second

    print("Batch size: ", batch_size)
    print("Number of iterations: ", num_iterations)
    print("total lines processed: ", count * batch_size)
    print("Total time: ", total_time)
    print("Docs per second: ", (batch_size * num_iterations) / total_time)
    print('****************************************')

Please @cphoover could you tell me if this works for you too? You will need to install the package from source in the multiprocessing branch:

pip install git+https://github.com/egerber/spaCy-entity-linker@multiprocessing

The only limitation comes when you will try to read the .span property or call the .get_span() method, as they will return None.

If anyone has any ideas on how to resolve the serialization of span, feel free to comment!

I tried with the following:

I added some tests to check that everything is restored properly, and at the moment 1 test fails when span is checked.

Best, Martino

cphoover commented 1 year ago

@MartinoMensio Yes this fixes my issue... Here are two benchmarks one with entity linking and one with only the NER:

on AWS c5a.2xlarge machine


Model: en_core_web_sm With entity linking! Using CPU Number of cores: 8 Batch size: 2000 Number of iterations: 20 total lines processed: 40000 Total time: 439.9979598522186 Docs per second: 90.90951242918202



Model: en_core_web_sm Using CPU Number of cores: 8 Batch size: 2000 Number of iterations: 20 total lines processed: 40000 Total time: 134.31675672531128 Docs per second: 297.8034980535099