JohnSnowLabs / spark-nlp

State of the Art Natural Language Processing
https://sparknlp.org/
Apache License 2.0

XlmRoBertaForTokenClassification slow during prediction #13716

Closed: LucaPifferettiPrivate closed this 11 months ago

LucaPifferettiPrivate commented 1 year ago

Is there an existing issue for this?

Who can help?

No response

What are you working on?

I'm using a Spark NLP NER model, XlmRoBertaForTokenClassification (https://sparknlp.org/2022/08/14/xlmroberta_ner_edwardjross_base_finetuned_panx_it_3_0.html), to find person names inside a text column of variable length.

Current Behavior

I'm applying this model to find person names inside a text column, and it takes 50 minutes to produce results even with a large configuration (14 executors, each with 14 cores and 14 GB of memory). During the preprocessing phase I split sentences longer than 200 characters.
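For context, `sentenceSplitter` in the snippet below is a custom helper whose implementation is not shown here. A minimal sketch of what it does, assuming Spark NLP's SentenceDetectorDLModel and assuming the helper emits the `sentence_order` position column referenced later when the text is reassembled:

import com.johnsnowlabs.nlp.DocumentAssembler
import com.johnsnowlabs.nlp.annotators.sentence_detector_dl.SentenceDetectorDLModel
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, posexplode}

// sketch only: detect sentences, then explode to one row per sentence,
// keeping each sentence's position as sentence_order for later reassembly
def sentenceSplitter(df: DataFrame, sentenceModelPath: String): DataFrame = {
  val assembler = new DocumentAssembler().setInputCol("text").setOutputCol("document")
  val detector = SentenceDetectorDLModel
    .load(sentenceModelPath)
    .setInputCols("document")
    .setOutputCol("sentence")
  new Pipeline()
    .setStages(Array(assembler, detector))
    .fit(df)
    .transform(df)
    .select(
      col("*"),
      posexplode(col("sentence.result")).as(Seq("sentence_order", "sentence_text")))
    .drop("text", "document", "sentence")
    .withColumnRenamed("sentence_text", "text")
}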

Expected Behavior

I would like to improve performance and reduce the computation time, which seems excessive for a 5 GB DataFrame.

Steps To Reproduce

val columns: Seq[String] = df.columns.toSeq
// regex matching only special characters, stripped before NER
val specialCharactersRegex: String = "[\"`'#%&,:;<>=@{}~\$\(\)\*\+\/\\\?\[\]\^\|è]"
// repartition number for 14 executors with 14 cores
val partitionNumber = 196

val nerWhiteList: Seq[String] = Seq("PER", "LOC") // ner to anonymize

val textDataset =
  sentenceSplitter(
    df.sample(0.01)
      .withColumn("text", regexp_replace(col(columnAnonymize), specialCharactersRegex, "")),
    sentenceModelPath)
    .repartition(partitionNumber)

textDataset.cache()
textDataset.show() // show() returns Unit and prints directly, so it cannot be passed to logger.info

val documentAssembler = new DocumentAssembler().setInputCol("text").setOutputCol("document")
val tokenizer = new Tokenizer()
  .setInputCols("document")
  .setOutputCol("token")
val token_classifier = XlmRoBertaForTokenClassification
  .load(nerModelPath)
  .setInputCols(Array("document", "token"))
  .setBatchSize(partitionNumber)
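// note: batchSize is the number of rows fed to the model per inference call
// within each partition; it is unrelated to the repartition count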

val ner_converter = new NerConverter()
  .setInputCols(Array("document", "token", "ner"))
  .setOutputCol("ner_chunk")
  .setWhiteList(nerWhiteList: _*)
val pipeline = new Pipeline().setStages(
  Array(documentAssembler, tokenizer, token_classifier, ner_converter))

val result = pipeline
  .fit(textDataset)
  .transform(textDataset)
  .repartition(partitionNumber)

result.cache()
result.count()
textDataset.unpersist()

val anonymizedTextColumn: DataFrame = result
  .select(
    (result.columns.map(col(_)) :+
      concat_ws(
        "|",
        transform(
          filter(
            col("ner_chunk"),
            (c: Column) =>
              (c.getField("metadata").getItem("confidence") >= 0.6) &&
                (c.getField("metadata").getItem("entity") === "PER")),
          (c: Column) => c.getField("result"))).alias("personFound") :+
      concat_ws(
        "|",
        transform(
          filter(
            col("ner_chunk"),
            (c: Column) =>
              (c.getField("metadata").getItem("confidence") >= 0.6) &&
                (c.getField("metadata").getItem("entity") === "LOC")),
          (c: Column) => c.getField("result"))).alias("locationFound")): _*)
  .withColumn(
    "text",
    when(
      length(col("personFound")) > 0,
      regexp_replace(col("text"), col("personFound"), lit("PERSON_NAME")))
      .otherwise(col("text")))
  .withColumn(
    "text",
    when(
      length(col("locationFound")) > 0,
      regexp_replace(col("text"), col("locationFound"), lit("LOCATION_NAME")))
      .otherwise(col("text")))
  .withColumn(
    "text",
    collect_list(col("text")).over(
      Window.partitionBy(columns.head, columns.tail: _*).orderBy("sentence_order")))
  .groupBy(columns.head, columns.tail: _*)
  .agg(max(col("text")).as("text"))
  .withColumn("text", concat_ws("", col("text")))

val anonymizedDataFrame: DataFrame =
  if (columnAnonymize.contains(".")) {
    val structColumnName: String =
      columnAnonymize.substring(0, columnAnonymize.indexOf("."))
    val fieldColumnName: String =
      columnAnonymize.substring(columnAnonymize.indexOf(".") + 1, columnAnonymize.length)

    anonymizedTextColumn.select(
      (columns diff Seq(structColumnName)).map(col) :+
        col(structColumnName)
          .withField(fieldColumnName, col("text"))
          .alias(structColumnName): _*)
  } else {
    anonymizedTextColumn.select(
      (columns diff Seq(columnAnonymize)).map(col) :+
        col("text").alias(columnAnonymize): _*)
  }

Spark NLP version and Apache Spark

spark-nlp: 4.2.8
scala: 2.12.17
spark: 3.3.1

Type of Spark Application

No response

Java Version

No response

Java Home Directory

No response

Setup and installation

No response

Operating System and Version

No response

Link to your project (if available)

No response

Additional Information

No response

maziyarpanahi commented 1 year ago

Hi,

It's really hard to give targeted advice without detailed info, but I can share some points:

I highly recommend watching this Webinar for further information regarding hardware acceleration: https://www.johnsnowlabs.com/watch-webinar-speed-optimization-benchmarks-in-spark-nlp-3-making-the-most-of-modern-hardware/
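For example, two knobs that usually matter most for transformer inference speed in Spark NLP are batchSize and maxSentenceLength. A minimal sketch (the values are illustrative and should be benchmarked on your data; note that in your snippet batchSize is set to the repartition count, which is unrelated to batching):

import com.johnsnowlabs.nlp.annotators.classifier.dl.XlmRoBertaForTokenClassification

// illustrative values only, benchmark on your own data
val tokenClassifier = XlmRoBertaForTokenClassification
  .load(nerModelPath) // same model path as in the snippet above
  .setInputCols(Array("document", "token"))
  .setOutputCol("ner")
  .setBatchSize(8) // rows per inference call; small values (4-32) are typical on CPU
  .setMaxSentenceLength(128) // a shorter max sequence means less padding and faster inference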

github-actions[bot] commented 1 year ago

This issue is stale because it has been open 180 days with no activity. Remove the stale label or comment, or this will be closed in 5 days.