JohnSnowLabs / spark-nlp

State of the Art Natural Language Processing
https://sparknlp.org/
Apache License 2.0

SparkNLP dataset size limitations #7472

Closed geowynn closed 2 years ago

geowynn commented 2 years ago

I have a dataset of about 1.1 million records that I'm trying to run through a Spark NLP pipeline for topic modelling. Each document contains on average 1-5 short sentences, since I've already pre-cleaned the text with traditional spaCy-based NLP methods. I intend to apply the pipeline shown below to the entire dataset. This runs in CDSW (Cloudera Data Science Workbench), where I've increased spark.driver.memory and spark.executor.memory, since I assume a dataset this size causes memory pressure.

pipeline = Pipeline() \
     .setStages([documentAssembler,
                 languageDetector,
                 tokenizer,
                 product_entity_extractor,
                 stopwordsCleaner,
                 spellChecker,
                 normalizer,
                 lemmatizer,
                 pos_tagger,
                 chunker,
                 finisher])

So far, the CDSW session exits with code 137 (which, as far as I can tell, indicates an out-of-memory kill). On top of that, when I apply the pipeline with .fit() and .transform() on the dataset, the Spark session crashes on the .show() call here:

nlp_pipeline = pipeline.fit(review_text)
processed_review = nlp_pipeline.transform(review_text).cache()
processed_review.repartition(10000).limit(100).show(truncate=False)
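
As a side note, repartitioning the full output into 10,000 partitions just to show 100 rows forces a large shuffle. A lighter way to inspect the result, sketched below under the assumption that the pipeline and columns match the code above (the Parquet path is a placeholder), is to transform only a small slice, or to write the output to Parquet once and browse that copy:

# transform only a small slice for a quick look, avoiding a full shuffle
nlp_pipeline.transform(review_text.limit(100)).show(truncate=False)

# or persist the full result once and explore the saved copy afterwards
processed_review.write.mode("overwrite").parquet("processed_review_parquet")  # placeholder path
spark.read.parquet("processed_review_parquet").limit(100).show(truncate=False)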

Description

Reading the cluster speed benchmarks in the docs (https://nlp.johnsnowlabs.com/docs/en/benchmark), the examples seem to use datasets of fewer than 10k records. Are there tests or examples where an entire Spark NLP pipeline has been applied to large datasets (>1 million rows)? Or are there further parallelisation methods I need to wrap the pipeline in for it to work?
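
One generic Spark-side adjustment worth noting here as a sketch (not something specific to Spark NLP): a DataFrame created from pandas via createDataFrame often ends up with only a few partitions, so repartitioning the input before fitting and transforming spreads the annotator work over more, smaller tasks. The partition count below is an arbitrary assumption and would need tuning to the cluster:

# repartition the input so each task handles a modest number of rows;
# 200 is an illustrative value, not a recommendation
review_text = review_text.repartition(200)
nlp_pipeline = pipeline.fit(review_text)
processed_review = nlp_pipeline.transform(review_text)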

Expected Behavior

I'd like to understand how to apply Spark NLP to large datasets so that it runs efficiently, if that is possible. I was facing similar issues with spaCy, which is why I turned to Spark NLP after reading this article: https://medium.com/spark-nlp/spacy-or-spark-nlp-a-benchmarking-comparison-23202f12a65c

For additional context, when I limit the dataset to 100k rows or fewer, the pipeline runs well.
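
Since roughly 100k rows go through fine, one illustrative workaround (a sketch based on that observation, not something tried in this thread) is to split the full dataset into slices of about that size, run the fitted pipeline on each slice, and persist each result to Parquet so a failure does not lose earlier work; the output path is a placeholder:

# split ~1.1M rows into ~11 roughly equal slices and process them one at a time
slices = review_text.randomSplit([1.0] * 11, seed=42)
for i, part in enumerate(slices):
    nlp_pipeline.transform(part) \
        .write.mode("overwrite") \
        .parquet("processed_review_parts/slice={}".format(i))  # placeholder output path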

Current Behavior

The Spark session crashes instead of processing the large dataset through the pipeline.


maziyarpanahi commented 2 years ago

Hi,

Could you please share the following? Without that information, it's impossible to know what is happening:

A couple of notes while you are preparing the required information:

We'll wait for the required info.

geowynn commented 2 years ago

Hi @maziyarpanahi, thanks for the follow-up. Please find the additional information below.

Full Pipeline Code

### Libraries
import sparknlp
from sparknlp.annotator import * 
from sparknlp.base import *
from sparknlp.pretrained import PretrainedPipeline
from pyspark.ml import Pipeline
from nltk.corpus import stopwords

from pyspark.sql import SparkSession 
from pyspark.sql.functions import isnan, isnull
from pyspark.sql import functions as F
import pyspark.sql.types as T

import pandas as pd

import os
import sys

#os.environ['PYSPARK_PYTHON'] = sys.executable
#os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3.6'

#### create spark session
spark = sparknlp.start(spark24=True)
spark.version
sparknlp.version()
#spark.sparkContext._conf.getAll()

### 1. Data
# read data from a CDSW folder into a pandas DataFrame (read step not shown here),
# remove rows with missing values, then convert to a Spark DataFrame
df = df[~df['description_clean_nostopwords'].isna()].copy()
data = spark.createDataFrame(df)

text_col = <selected column to apply pipeline>
review_text = data.select(text_col).filter(F.col(text_col).isNotNull())

### 2. NLP Pipeline
documentAssembler = DocumentAssembler().setInputCol(text_col).setOutputCol('document')

#language detection
languageDetector = LanguageDetectorDL.pretrained() \
  .setInputCols(["document"]) \
  .setOutputCol("language")

#takes the output of DocumentAssembler and tokenises it
tokenizer = Tokenizer() \
     .setInputCols(['document']) \
     .setOutputCol('token')

# spell check
# pending an update from John Snow Labs for a compatible pretrained model
#spellChecker = ContextSpellCheckerModel.pretrained("spellcheck_dl") \
#  .setInputCols("tokenized") \
#  .setOutputCol("checked")

#stopwords removal 
#set stopwords dictionary
eng_stopwords = stopwords.words('english')
eng_stopwords.extend(['please','ko','po','ba','pag','ung','ang','com','would'])
eng_stopwords = list(set(eng_stopwords))#optional

stopwordsCleaner = StopWordsCleaner() \
     .setInputCols(['token']) \
     .setOutputCol('clean_token') \
     .setStopWords(eng_stopwords)

##create text matcher to pick out keywords

product_entity_extractor = TextMatcher() \
    .setInputCols(["document", 'token']) \
    .setOutputCol("product_entities") \
    .setEntities("<_custom file path_>") \
    .setCaseSensitive(False) \
    .setEntityValue('product_entity') \
    .setBuildFromTokens(True) \
    .setMergeOverlapping(True)

#spelling check
spellChecker = NorvigSweetingModel.pretrained()\
  .setInputCols("clean_token")\
  .setOutputCol("spell_checked")
#  .setDoubleVariants(True)

#clean data, lowercasing    
normalizer = Normalizer() \
     .setInputCols(['spell_checked']) \
     .setOutputCol('normalised') \
     .setLowercase(True) 
#    .setCleanupPatterns(["""[^\w\d\s]"""]) # this pattern removes punctuation (keeps alphanumeric chars); if not set, only alphabetic characters are kept

#lemma with pre-trained model
lemmatizer = LemmatizerModel.pretrained() \
     .setInputCols(['normalised']) \
     .setOutputCol('lemmatised')

#ngram and pos-tagger to filter for meaningful n-grams
pos_tagger = PerceptronModel.pretrained('pos_anc') \
     .setInputCols(['document', 'lemmatised']) \
     .setOutputCol('pos')

allowed_tags = ['<JJ>','<NN>']#  ['<JJ>+<NN>', '<NN>+<NN>']
chunker = Chunker() \
     .setInputCols(['document', 'pos']) \
     .setOutputCol('ngrams') \
     .setRegexParsers(allowed_tags)
#chunker doesn't output lemmatised text, need to explore how to get lemmatised n-grams

#human-readable format
finisher = Finisher() \
     .setInputCols(['token','clean_token','spell_checked','normalised','lemmatised','language','pos','ngrams']) 
#    .setIncludeMetadata(True)

#creating NLP pipeline
pipeline = Pipeline() \
     .setStages([documentAssembler,
                 languageDetector,
                 tokenizer,
                 product_entity_extractor,
                 stopwordsCleaner,
                 spellChecker,
                 normalizer,
                 lemmatizer,
                 pos_tagger,
                 chunker,
                 finisher])

nlp_pipeline = pipeline.fit(review_text)
processed_review = nlp_pipeline.transform(review_text)
processed_review.sample(0.05).show(truncate=False)

Spark configs

CDSW session: 2 vCPU and 8 GB memory

Started the session with sparknlp.start(spark24=True)

Additional spark-defaults.conf in the project directory:

spark.rpc.message.maxSize = 1024

spark.master=yarn
spark.kryoserializer.buffer.max = 2047m
spark.streaming.backpressure.enabled=true
spark.driver.maxResultSize=6g
spark.driver.memory=6g
spark.yarn.driver.memoryOverhead=6g
spark.executor.memory=6g
spark.yarn.executor.memoryOverhead=6g
spark.executor.cores=1
spark.python.worker.memory=6g
spark.streaming.stopGracefullyOnShutdown=true
spark.streaming.receiver.maxRate=500
spark.streaming.backpressure.initialRate=500
spark.streaming.kafka.maxRatePerPartition=500
spark.cleaner.referenceTracking.cleanCheckpoints=true
spark.network.timeout=300s
spark.rpc.askTimeout=450s

Environment (spark.sparkContext._conf.getAll()):

[('spark.eventLog.enabled', 'true'),
 ('spark.driver.memory', '16G'),
 ('spark.network.timeout', '300s'),
 ('spark.yarn.appMasterEnv.MKL_NUM_THREADS', '1'),
 ('spark.streaming.kafka.maxRatePerPartition', '500'),
 ('spark.sql.queryExecutionListeners',
  'com.cloudera.spark.lineage.NavigatorQueryListener'),
 ('spark.files',
  'file:///home/cdsw/.ivy2/jars/com.johnsnowlabs.nlp_spark-nlp-spark24_2.11-3.4.2.jar,file:///home/cdsw/.ivy2/jars/com.typesafe_config-1.4.1.jar,file:///home/cdsw/.ivy2/jars/org.rocksdb_rocksdbjni-6.5.3.jar,file:///home/cdsw/.ivy2/jars/com.amazonaws_aws-java-sdk-bundle-1.11.603.jar,file:///home/cdsw/.ivy2/jars/com.github.universal-automata_liblevenshtein-3.0.0.jar,file:///home/cdsw/.ivy2/jars/com.navigamez_greex-1.0.jar,file:///home/cdsw/.ivy2/jars/org.json4s_json4s-ext_2.11-3.5.3.jar,file:///home/cdsw/.ivy2/jars/com.johnsnowlabs.nlp_tensorflow-cpu_2.11-0.3.3.jar,file:///home/cdsw/.ivy2/jars/net.sf.trove4j_trove4j-3.0.3.jar,file:///home/cdsw/.ivy2/jars/com.google.code.findbugs_annotations-3.0.1.jar,file:///home/cdsw/.ivy2/jars/com.google.protobuf_protobuf-java-util-3.0.0-beta-3.jar,file:///home/cdsw/.ivy2/jars/com.google.protobuf_protobuf-java-3.0.0-beta-3.jar,file:///home/cdsw/.ivy2/jars/it.unimi.dsi_fastutil-7.0.12.jar,file:///home/cdsw/.ivy2/jars/org.projectlombok_lombok-1.16.8.jar,file:///home/cdsw/.ivy2/jars/org.slf4j_slf4j-api-1.7.21.jar,file:///home/cdsw/.ivy2/jars/net.jcip_jcip-annotations-1.0.jar,file:///home/cdsw/.ivy2/jars/com.google.code.findbugs_jsr305-3.0.1.jar,file:///home/cdsw/.ivy2/jars/com.google.code.gson_gson-2.3.jar,file:///home/cdsw/.ivy2/jars/dk.brics.automaton_automaton-1.11-8.jar,file:///home/cdsw/.ivy2/jars/joda-time_joda-time-2.9.5.jar,file:///home/cdsw/.ivy2/jars/org.joda_joda-convert-1.8.1.jar'),
 ('spark.kryoserializer.buffer.max', '2000M'),
 ('spark.lineage.log.dir', '/var/log/spark/lineage'),
 ('spark.blacklist.timeout', '1h'),
 ('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'),
 ('spark.cleaner.referenceTracking.cleanCheckpoints', 'true'),
 ('spark.streaming.backpressure.enabled', 'true'),
 ('spark.driver.maxResultSize', '0'),
 ('spark.submit.pyFiles',
  '/home/cdsw/.ivy2/jars/com.johnsnowlabs.nlp_spark-nlp-spark24_2.11-3.4.2.jar,/home/cdsw/.ivy2/jars/com.typesafe_config-1.4.1.jar,/home/cdsw/.ivy2/jars/org.rocksdb_rocksdbjni-6.5.3.jar,/home/cdsw/.ivy2/jars/com.amazonaws_aws-java-sdk-bundle-1.11.603.jar,/home/cdsw/.ivy2/jars/com.github.universal-automata_liblevenshtein-3.0.0.jar,/home/cdsw/.ivy2/jars/com.navigamez_greex-1.0.jar,/home/cdsw/.ivy2/jars/org.json4s_json4s-ext_2.11-3.5.3.jar,/home/cdsw/.ivy2/jars/com.johnsnowlabs.nlp_tensorflow-cpu_2.11-0.3.3.jar,/home/cdsw/.ivy2/jars/net.sf.trove4j_trove4j-3.0.3.jar,/home/cdsw/.ivy2/jars/com.google.code.findbugs_annotations-3.0.1.jar,/home/cdsw/.ivy2/jars/com.google.protobuf_protobuf-java-util-3.0.0-beta-3.jar,/home/cdsw/.ivy2/jars/com.google.protobuf_protobuf-java-3.0.0-beta-3.jar,/home/cdsw/.ivy2/jars/it.unimi.dsi_fastutil-7.0.12.jar,/home/cdsw/.ivy2/jars/org.projectlombok_lombok-1.16.8.jar,/home/cdsw/.ivy2/jars/org.slf4j_slf4j-api-1.7.21.jar,/home/cdsw/.ivy2/jars/net.jcip_jcip-annotations-1.0.jar,/home/cdsw/.ivy2/jars/com.google.code.findbugs_jsr305-3.0.1.jar,/home/cdsw/.ivy2/jars/com.google.code.gson_gson-2.3.jar,/home/cdsw/.ivy2/jars/dk.brics.automaton_automaton-1.11-8.jar,/home/cdsw/.ivy2/jars/joda-time_joda-time-2.9.5.jar,/home/cdsw/.ivy2/jars/org.joda_joda-convert-1.8.1.jar'),
 ('spark.master', 'local[*]'),
 ('spark.python.worker.memory', '6g'),
 ('spark.driver.extraLibraryPath',
  '/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p0.6751098/lib/hadoop/lib/native'),
 ('spark.network.crypto.enabled', 'false'),
 ('spark.executor.extraLibraryPath',
  '/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p0.6751098/lib/hadoop/lib/native'),
 ('spark.executorEnv.MKL_NUM_THREADS', '1'),
 ('spark.ui.enabled', 'true'),
 ('spark.executor.id', 'driver'),
 ('spark.dynamicAllocation.schedulerBacklogTimeout', '1'),
 ('spark.rpc.message.maxSize', '1024'),
 ('spark.yarn.config.gatewayPath', '/opt/cloudera/parcels'),
 ('spark.extraListeners', 'com.cloudera.spark.lineage.NavigatorAppListener'),
 ('spark.yarn.jars',
  'local:/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p0.6751098/lib/spark/jars/*,local:/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p0.6751098/lib/spark/hive/*'),
 ('spark.blacklist.enabled', 'true'),
 ('spark.yarn.executor.memoryOverhead', '6g'),
 ('spark.blacklist.task.maxTaskAttemptsPerExecutor', '1'),
 ('spark.sql.warehouse.dir', '/user/hive/warehouse'),
 ('spark.streaming.backpressure.initialRate', '500'),
 ('spark.jars',
  'file:///home/cdsw/.ivy2/jars/com.johnsnowlabs.nlp_spark-nlp-spark24_2.11-3.4.2.jar,file:///home/cdsw/.ivy2/jars/com.typesafe_config-1.4.1.jar,file:///home/cdsw/.ivy2/jars/org.rocksdb_rocksdbjni-6.5.3.jar,file:///home/cdsw/.ivy2/jars/com.amazonaws_aws-java-sdk-bundle-1.11.603.jar,file:///home/cdsw/.ivy2/jars/com.github.universal-automata_liblevenshtein-3.0.0.jar,file:///home/cdsw/.ivy2/jars/com.navigamez_greex-1.0.jar,file:///home/cdsw/.ivy2/jars/org.json4s_json4s-ext_2.11-3.5.3.jar,file:///home/cdsw/.ivy2/jars/com.johnsnowlabs.nlp_tensorflow-cpu_2.11-0.3.3.jar,file:///home/cdsw/.ivy2/jars/net.sf.trove4j_trove4j-3.0.3.jar,file:///home/cdsw/.ivy2/jars/com.google.code.findbugs_annotations-3.0.1.jar,file:///home/cdsw/.ivy2/jars/com.google.protobuf_protobuf-java-util-3.0.0-beta-3.jar,file:///home/cdsw/.ivy2/jars/com.google.protobuf_protobuf-java-3.0.0-beta-3.jar,file:///home/cdsw/.ivy2/jars/it.unimi.dsi_fastutil-7.0.12.jar,file:///home/cdsw/.ivy2/jars/org.projectlombok_lombok-1.16.8.jar,file:///home/cdsw/.ivy2/jars/org.slf4j_slf4j-api-1.7.21.jar,file:///home/cdsw/.ivy2/jars/net.jcip_jcip-annotations-1.0.jar,file:///home/cdsw/.ivy2/jars/com.google.code.findbugs_jsr305-3.0.1.jar,file:///home/cdsw/.ivy2/jars/com.google.code.gson_gson-2.3.jar,file:///home/cdsw/.ivy2/jars/dk.brics.automaton_automaton-1.11-8.jar,file:///home/cdsw/.ivy2/jars/joda-time_joda-time-2.9.5.jar,file:///home/cdsw/.ivy2/jars/org.joda_joda-convert-1.8.1.jar'),
 ('spark.streaming.stopGracefullyOnShutdown', 'true'),
 ('spark.driver.log.persistToDfs.enabled', 'true'),
 ('spark.yarn.config.replacementPath', '{{HADOOP_COMMON_HOME}}/../../..'),
 ('spark.executorEnv.OPENBLAS_NUM_THREADS', '1'),
 ('spark.blacklist.task.maxTaskAttemptsPerNode', '2'),
 ('spark.kerberos.renewal.credentials', 'ccache'),
 ('spark.ui.killEnabled', 'true'),
 ('spark.dynamicAllocation.executorIdleTimeout', '60'),
 ('spark.blacklist.killBlacklistedExecutors', 'true'),
 ('spark.io.encryption.enabled', 'false'),
 ('spark.authenticate', 'false'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.rpc.askTimeout', '450s'),
 ('spark.submit.deployMode', 'client'),
 ('spark.streaming.receiver.maxRate', '500'),
 ('spark.yarn.driver.memoryOverhead', '6g'),
 ('spark.shuffle.service.enabled', 'true'),
 ('spark.yarn.historyServer.allowTracking', 'true'),
 ('spark.app.name', 'Spark NLP'),
 ('spark.hadoop.hadoop.treat.subject.external', 'true'),
 ('spark.yarn.appMasterEnv.OPENBLAS_NUM_THREADS', '1'),
 ('spark.blacklist.stage.maxFailedTasksPerExecutor', '2'),
 ('spark.yarn.rmProxy.enabled', 'false'),
 ('spark.lineage.enabled', 'true'),
 ('spark.executor.memory', '6g'),
 ('spark.blacklist.application.maxFailedExecutorsPerNode', '2'),
 ('spark.rdd.compress', 'True'),
 ('spark.jars.packages', 'com.johnsnowlabs.nlp:spark-nlp-spark24_2.11:3.4.2'),
 ('spark.dynamicAllocation.minExecutors', '0'),
 ('spark.dynamicAllocation.enabled', 'true'),
 ('spark.blacklist.application.maxFailedTasksPerExecutor', '2'),
 ('spark.repl.local.jars',
  'file:///home/cdsw/.ivy2/jars/com.johnsnowlabs.nlp_spark-nlp-spark24_2.11-3.4.2.jar,file:///home/cdsw/.ivy2/jars/com.typesafe_config-1.4.1.jar,file:///home/cdsw/.ivy2/jars/org.rocksdb_rocksdbjni-6.5.3.jar,file:///home/cdsw/.ivy2/jars/com.amazonaws_aws-java-sdk-bundle-1.11.603.jar,file:///home/cdsw/.ivy2/jars/com.github.universal-automata_liblevenshtein-3.0.0.jar,file:///home/cdsw/.ivy2/jars/com.navigamez_greex-1.0.jar,file:///home/cdsw/.ivy2/jars/org.json4s_json4s-ext_2.11-3.5.3.jar,file:///home/cdsw/.ivy2/jars/com.johnsnowlabs.nlp_tensorflow-cpu_2.11-0.3.3.jar,file:///home/cdsw/.ivy2/jars/net.sf.trove4j_trove4j-3.0.3.jar,file:///home/cdsw/.ivy2/jars/com.google.code.findbugs_annotations-3.0.1.jar,file:///home/cdsw/.ivy2/jars/com.google.protobuf_protobuf-java-util-3.0.0-beta-3.jar,file:///home/cdsw/.ivy2/jars/com.google.protobuf_protobuf-java-3.0.0-beta-3.jar,file:///home/cdsw/.ivy2/jars/it.unimi.dsi_fastutil-7.0.12.jar,file:///home/cdsw/.ivy2/jars/org.projectlombok_lombok-1.16.8.jar,file:///home/cdsw/.ivy2/jars/org.slf4j_slf4j-api-1.7.21.jar,file:///home/cdsw/.ivy2/jars/net.jcip_jcip-annotations-1.0.jar,file:///home/cdsw/.ivy2/jars/com.google.code.findbugs_jsr305-3.0.1.jar,file:///home/cdsw/.ivy2/jars/com.google.code.gson_gson-2.3.jar,file:///home/cdsw/.ivy2/jars/dk.brics.automaton_automaton-1.11-8.jar,file:///home/cdsw/.ivy2/jars/joda-time_joda-time-2.9.5.jar,file:///home/cdsw/.ivy2/jars/org.joda_joda-convert-1.8.1.jar'),
 ('spark.executor.cores', '1'),
 ('spark.blacklist.stage.maxFailedExecutorsPerNode', '2'),
 ('spark.yarn.am.extraLibraryPath',
  '/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p0.6751098/lib/hadoop/lib/native'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.log.dfsDir', '/user/spark/driverLogs')]

Storage tab: blank page

Stages tab: (screenshot)

Thanks for the additional tips and resources, will look through them in the meantime!

Update: the code above runs fine through .sample().show(), as confirmed in the Spark UI Stages tab. Within the same session/settings, I then went on to fit the CountVectorizer, but the session exits with code 137 and the Spark UI closes, so I'm unable to retrieve any further information.

Code:

#convert array(string) to string for filtering english reviews
email_cases_df = processed_review.withColumn("finished_language", F.concat_ws(",", F.col("finished_language")))
#filter on finished_language before dropping it in the select
english_case_df = email_cases_df.filter(F.col("finished_language") == "en") \
    .select("description_clean_nostopwords", "finished_ngrams")

from pyspark.ml.feature import CountVectorizer
tfizer = CountVectorizer(inputCol='finished_ngrams',
                         outputCol='tf_features',
                         vocabSize=100000, minDF=3)
tf_model = tfizer.fit(english_case_df)

Error:

(screenshot of the error)
maziyarpanahi commented 2 years ago

Hi @geowynn

Thanks for the update. The .limit() call really kills the distribution. As for the Spark ML functions, I highly suggest separating the two parts into two different stages/checkpoints by saving to and reading from Parquet.

This way you can see how each part behaves, it's easier to monitor, and when something crashes you still have the previous stage/checkpoint, which can easily be read back.
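
A minimal sketch of that two-stage checkpoint pattern, reusing the objects and column names from the code earlier in this thread (the Parquet path is a placeholder):

# stage 1: run the Spark NLP pipeline once and checkpoint the result to Parquet
processed_review = nlp_pipeline.transform(review_text)
processed_review.write.mode("overwrite").parquet("checkpoints/processed_review")  # placeholder path

# stage 2: separately, read the checkpoint back and run the Spark ML part on it
from pyspark.ml.feature import CountVectorizer

english_case_df = spark.read.parquet("checkpoints/processed_review") \
    .withColumn("finished_language", F.concat_ws(",", F.col("finished_language"))) \
    .filter(F.col("finished_language") == "en") \
    .select("description_clean_nostopwords", "finished_ngrams")

tfizer = CountVectorizer(inputCol="finished_ngrams", outputCol="tf_features",
                         vocabSize=100000, minDF=3)
tf_model = tfizer.fit(english_case_df)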

github-actions[bot] commented 2 years ago

This issue is stale because it has been open for 120 days with no activity. Remove the stale label or comment, or this will be closed in 5 days.