JohnSnowLabs / spark-nlp

State of the Art Natural Language Processing
https://sparknlp.org/
Apache License 2.0
3.8k stars 707 forks source link

NER pipeline Not scaling up to use the full cluster nodes #14121

Closed mahmoudaymo closed 2 weeks ago

mahmoudaymo commented 8 months ago

Is there an existing issue for this?

Who can help?

No response

What are you working on?

I am using the spark-nlp for NER detection on Azure databricks cluster. The cluster is made of 5 nodes. But when running the job it is not scaling up to use the full cluster and uses only a single node. It seems that the NER pipeline does not parallelize and only runs on a single node.

Current Behavior

The NER pipeline uses only one node of the available 5 nodes.

CpuUsage

Expected Behavior

The expected behavior is to fully run on all the worker nodes.

CpuUsage2

Steps To Reproduce

from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.sql import DataFrame
from pyspark.ml import Pipeline
from pyspark.sql.functions import array_distinct, col, concat_ws, size, expr

def extract_and_replace_ner(
    ner_df: DataFrame,
    ner_model_name: str = "bert_base_token_classifier_ontonote",
):
    if "text" not in ner_df.columns:
        raise ValueError("NER DataFrame must contain a column named 'text'!")

    ner_df = ner_df.withColumnRenamed("text", "text_original")
    entities = ["PERSON", "LAW", "NORP", "FAC", "ORG", "GPE", "LOC", "PRODUCT"]

    documentAssembler = DocumentAssembler().setInputCol("text_original").setOutputCol("document")
    tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")
    tokenClassifier = (
        BertForTokenClassification.pretrained(ner_model_name, "en")
        .setInputCols("document", "token")
        .setOutputCol("ner")
    )
    ner_converter = (
        NerConverter()
        .setInputCols(["document", "token", "ner"])
        .setOutputCol("ner_chunk")
        .setWhiteList(entities)
    )
    ner_pipeline = Pipeline(stages=[documentAssembler, tokenizer, tokenClassifier, ner_converter])
    ner_df = ner_pipeline.fit(ner_df).transform(ner_df)

    # keep only unique ners
    ner_df, no_ner_df = extract_unique_ners(ner_df)

    # replace ners in text
    ner_df = ner_df.rdd.mapPartitions(replace_text).toDF(
        ["control_number", "text_original", "text", "ners"]
    )

    return ner_df, no_ner_df

def extract_unique_ners(ner_df: DataFrame):
    ner_df = ner_df.withColumn("ners", ner_df.ner_chunk.result)
    ner_df = ner_df.select("control_number", "text_original", "ners")
    ner_df = ner_df.withColumn("ners", array_distinct("ners"))
    ner_df = ner_df.withColumn(
        "ners", expr("filter(ners, x -> (size(split(x, ' ')) > 1))")
    )

    no_ner_df = ner_df.filter(size("ners") == 0)
    ner_df = ner_df.filter(size("ners") > 0)

    ner_df = ner_df.withColumn(
        "replacements", expr("transform(ners, x -> concat_ws('_', split(x, ' ')))")
    )

    return ner_df, no_ner_df

def replace_text(partitionData):
    updatedData = []
    for row in partitionData:
        text = row.text_original
        ners = row.ners
        reps = row.replacements
        for ner, rep in zip(ners, reps):
            text = text.replace(ner, rep)
        updatedData.append([row.control_number, row.text_original, text, ners])
    return iter(updatedData)

Spark NLP version and Apache Spark

spark-nlp==5.1.4 spark==3.4.1 com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.4

Working on Databricks

Type of Spark Application

Python Application

Java Version

8

Java Home Directory

/usr/lib/jvm/zulu8-ca-amd64/jre/

Setup and installation

Pypi

Operating System and Version

No response

Link to your project (if available)

No response

Additional Information

No response

mahmoudaymo commented 7 months ago

Any news here?

maziyarpanahi commented 7 months ago

I recommend watching this Webinar, scaling Apache Spark is independent from Spark NLP. You should follow the general "tuning and sizing your cluster" advice in order to utilize all your executors.

https://www.johnsnowlabs.com/watch-webinar-speed-optimization-benchmarks-in-spark-nlp-3-making-the-most-of-modern-hardware/

Since Spark NLP is a native extension of Apache Spark, any recommendation works for this library as well.

mahmoudaymo commented 7 months ago

Great! Thank you very much Maziyar.

I have watched the webinar and came out with great insights. The issue here is not the speed optimization but why sparknlp NER pipeline is not fully utilizing the cluster and using only one worker? Once this issue is solved, I will work on optimizing the spark application.

github-actions[bot] commented 1 month ago

This issue is stale because it has been open 180 days with no activity. Remove stale label or comment or this will be closed in 5 days