JohnSnowLabs / spark-nlp

State of the Art Natural Language Processing
https://sparknlp.org/
Apache License 2.0
3.83k stars 710 forks source link

Spark NLP pipeline runs much faster on PySpark 3.0.x compare to PySpark 3.1.x #2739

Closed jczestochowska closed 2 years ago

jczestochowska commented 3 years ago

Description

I have a dataset of around 2 million amazon reviews, I want to count most frequent words. For that I am tokenizing and removing stop words. I wanted to use spark-nlp to create a more sophisticated pipeline than that for later stages but even this simple one is not working for me. On the other hand an equivalent (?) pipeline in plain spark works correctly. Note that when I do out.show() on the spark-nlp pipeline output it shows me a correctly tokenized lists of words.

Expected Behavior

Pipeline should clean the dataset and count most frequent words

Current Behavior

Pipeline freezes

Possible Solution

No idea

Steps to Reproduce

Plain spark working pipeline

import pyspark.sql
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col
import time

conf = pyspark.SparkConf().setMaster("local[*]").setAll([
                                   ('spark.executor.memory', '12g'),
                                   ('spark.driver.memory','4g'), 
                                   ('spark.driver.maxResultSize', '2G')
                                  ])
# create the session
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# create the context
sc = spark.sparkContext

# FIX for Spark 2.x
locale = sc._jvm.java.util.Locale
locale.setDefault(locale.forLanguageTag("en-US"))

data_path = "../data/toys-cleaned.csv.gz"
Toys = spark.read.options(header= True, delimiter=',', inferSchema=True).csv(data_path)

# tokenize the text
regexTokenizer = RegexTokenizer(inputCol="reviewText", outputCol="all_words", pattern="\\W")
toys_with_words = regexTokenizer.transform(Toys)

# remove stop words
remover = StopWordsRemover(inputCol="all_words", outputCol="words")
toys_with_tokens = remover.transform(toys_with_words).drop("all_words")

toys_with_tokens.show(5)

# get all words in a single dataframe
start = time.time()
all_words = toys_with_tokens.select(explode("words").alias("word"))
# group by, sort and limit to 50k 
top50k = all_words.groupBy("word").agg(count("*").alias("total")).sort(col("total").desc()).limit(50000)

top50k.show()
print(time.time() - start)

Spark nlp pipeline - not working

import sparknlp
spark = sparknlp.start()

from sparknlp.base import Finisher, DocumentAssembler
from sparknlp.annotator import (Tokenizer, Normalizer,
                                LemmatizerModel, StopWordsCleaner)
from pyspark.ml import Pipeline

documentAssembler = DocumentAssembler() \
     .setInputCol('reviewText') \
     .setOutputCol('document')
tokenizer = Tokenizer() \
     .setInputCols(['document']) \
     .setOutputCol('token')
# note normalizer defaults to changing all words to lowercase.
# Use .setLowercase(False) to maintain input case.
normalizer = Normalizer() \
     .setInputCols(['token']) \
     .setOutputCol('normalized') \
     .setLowercase(True)
# note that lemmatizer needs a dictionary. So I used the pre-trained
# model (note that it defaults to english)
# lemmatizer = LemmatizerModel.pretrained() \
#      .setInputCols(['normalized']) \
#      .setOutputCol('lemma')
stopwords_cleaner = StopWordsCleaner().pretrained("stopwords_en", "en") \
     .setInputCols(['normalized']) \
     .setOutputCol('clean') \
     .setCaseSensitive(False) \

# finisher converts tokens to human-readable output
finisher = Finisher() \
     .setInputCols(['clean']) \
     .setCleanAnnotations(False)

pipeline = Pipeline() \
     .setStages([
           documentAssembler,
           tokenizer,
           normalizer,
           stopwords_cleaner,
           finisher
     ])

data_path = "../data/toys-cleaned.csv.gz"
Toys = spark.read.options(header= True, delimiter=',', inferSchema=True).csv(data_path)

out = pipeline.fit(Toys).transform(Toys)

# get all words in a single dataframe
import time
start = time.time()
all_words = out.select(explode("finished_clean").alias("word"))
# group by, sort and limit to 50k 
top50k = all_words.groupBy("word").agg(count("*").alias("total")).sort(col("total").desc()).limit(50000)

top50k.show()
print(time.time() - start)

1. 2. 3. 4.

Context

Trying to clean data with spark-nlp and perform some analysis, on a later stage I would like to use spark-nlp to process data for some classification task.

Your Environment

maziyarpanahi commented 3 years ago

Hi,

Here is what I did in Python/Jupyter (same specs as yours):

from sparknlp.annotator import *
from sparknlp.base import *
from pyspark.ml import Pipeline
import sparknlp

#make sure there is no other SparkSession with fewer resources already running
spark = sparknlp.start()

documentAssembler = DocumentAssembler() \
     .setInputCol('reviewText') \
     .setOutputCol('document')

tokenizer = RegexTokenizer() \
     .setInputCols(['document']) \
     .setOutputCol('token') \
     .setPattern("\\W") \
     .setToLowercase(True)

stopwords_cleaner = StopWordsCleaner()\
     .setInputCols(['token']) \
     .setOutputCol('clean') \
     .setCaseSensitive(False)

finisher = Finisher() \
     .setInputCols(['clean'])

pipeline = Pipeline() \
     .setStages([
           documentAssembler,
           tokenizer,
           stopwords_cleaner,
           finisher
     ])

Since there is a RegexTokenizer in Spark NLP with the same pattern and lowercase parameters I replaced it with the Tokenizer and Normalizer annotators.

Also, the csv, json, gz, etc. are not breakable formats in Apache Spark. They won't be distributed over all the cores, machines, etc. They are single partitioned. So I read and converted the csv into parquet for distribution even in a single machine:

Toys = spark.read \
  .options(header= True, delimiter=',', inferSchema=True, mode='DROPMALFORMED')\
  .csv(data_path)

Toys.write.parquet("./toys-cleaned")

Working with csv in Apache Spark / PySpark having mode='DROPMALFORMED' is very important as if it's corrupted it will be problematic.

The following code took 5 minutes on 6 cores laptop

from pyspark.sql.functions import rank, col, explode, count

Toys = spark.read \
  .parquet('./toys-cleaned').repartition(12)

out = pipeline.fit(Toys).transform(Toys)

all_words = out.select(explode("finished_clean").alias("word"))

# group by, sort and limit to 50k 
top50k = all_words.groupBy("word").agg(count("*").alias("total")).sort(col("total").desc()).limit(50000)

top50k.show()

It is possible to further optimize this by using minLength and maxLength in RegexTokenizer knowing if you are not interested in tokens less or more than a certain length etc. Also, keep in mind what happens in Spark ML feature are a very basic/simple transformation without any annotation and metadata that can be used for other NLP tasks

I also noticed the dataset is kind of skew, meaning some partitioned are much much heavier than the others so while most of them are finished they have to wait for these to be finished as well.

image

maziyarpanahi commented 3 years ago

Hi @jczestochowska

I have found some new information. I will investigate more as to why, but for some unknown reasons if you run the very same pipeline I showed previously on pyspark==3.0.2 it goes from 5 minutes to 55 seconds!

There must be some sort of Spark config set by default in 3.1.x compare to 3.0.x or vice versa causes this, but I will investigate more to see why Apache Spark 3.1.x performs poorly compare to Apache Spark 3.0.x.

In Spark 3.1.x the executors are like:

image

In Spark 3.0.x the executors are like:

image

Clearly, there is something in spark 3.1.x that causes this behavior to have unbalanced partitions. Perhaps some sort of auto partitioning etc.

UPDATE:

jczestochowska commented 3 years ago

Hi @maziyarpanahi,

thanks so much for the tips and investigation!

SriniVest commented 3 years ago

Very good useful post

github-actions[bot] commented 2 years ago

This issue is stale because it has been open 120 days with no activity. Remove stale label or comment or this will be closed in 5 days