Closed fulcrunn closed 1 year ago
Hi,
This is a popular error and it means the Spark NLP maven package/JAR is missing from the SparkSession. It is true that you have included the Spark NLP package inside your SparkSession, which is required; however, you are later using spark-submit,
which also requires the maven package/JAR to be included.
Could you please try this:
`./bin/spark-submit --master local[4] tcc.py --packages com.johnsnowlabs.nlp:spark-nlp_2.12:5.0.0`
You need to include any Apache Spark configs you may need in your spark-submit command, since that command is what actually creates the SparkSession first; the one inside your script will just get it via getOrCreate().
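For reference, a minimal sketch of how the script side can look under this setup, assuming nothing needs to be configured in code:

```python
# tcc.py (sketch): all Spark configs, including --packages for Spark NLP,
# come from the spark-submit command line.
import sparknlp
from pyspark.sql import SparkSession

# getOrCreate() attaches to the SparkSession that spark-submit already created,
# so the Spark NLP JARs it was launched with are on the classpath.
spark = SparkSession.builder.getOrCreate()
print("Spark NLP version:", sparknlp.version())
print("Apache Spark version:", spark.version)
```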
@maziyarpanahi Hi! Thanks for the answer. I tested your guidance, but I still have the same error, exactly the same.
What I did:
Partial resolution and a new issue. I copied the 'spark-nlp_2.12-5.0.0.jar' file (from the Maven repository) into the /spark/jars directory. The issue with DocumentAssembler() not executing
has been resolved. However, now the program does not execute the "tokenizer" function.
I tested by running the program with Python (.../$ python3 file.py). If the 'spark-nlp_2.12-5.0.0.jar' file is in the /spark/jars folder, the program does not work: an error appears when running the "tokenizer". If I remove 'spark-nlp_2.12-5.0.0.jar' from the "jars" folder, it works.
When I run the `./bin/spark-submit --master local[4] tcc.py --packages com.johnsnowlabs.nlp:spark-nlp_2.12:5.0.0`
command in the terminal: if the 'spark-nlp_2.12-5.0.0.jar' file is in the 'jars' folder, it gets past the DocumentAssembler() function but stops at the 'tokenizer' function. If I remove the 'spark-nlp_2.12-5.0.0.jar' file from the 'jars' folder, it stops working when it gets to the DocumentAssembler() function.
Any idea?
Hi,
'spark-nlp_2.12-5.0.0.jar' is not a complete JAR. You need to use an actual fat/uber JAR that has everything. We host them on our S3 and they are listed at the end of each release note:
https://github.com/JohnSnowLabs/spark-nlp/releases/tag/5.0.0
https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/jars/spark-nlp-assembly-5.0.0.jar
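For reference, a hedged sketch of pointing the session at a locally downloaded assembly JAR via the `spark.jars` config (the local path below is illustrative):

```python
from pyspark.sql import SparkSession

# Assumes the fat/uber JAR was downloaded to this path beforehand.
spark = SparkSession.builder \
    .master("local[4]") \
    .appName("NLP") \
    .config("spark.jars", "/opt/spark/jars/spark-nlp-assembly-5.0.0.jar") \
    .getOrCreate()
```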
@maziyarpanahi It works!!! Thank you very much!!!
Another question. Looking at my code, is it possible to find out why it is not being processed in parallel? The UI at localhost:8080 shows 'Applications: 0 Running, 0 Completed'.
When I test it with an example of calculating Pi, it works!
```python
import os
import sys
import sparknlp
from sparknlp.base import *
from sparknlp.common import *
from sparknlp.annotator import *
from sparknlp.annotator import (Tokenizer, WordEmbeddingsModel, Word2VecModel)
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, OneHotEncoder, StringIndexer, VectorAssembler, SQLTransformer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
import pandas as pd

spark = SparkSession.builder.master("local[4]").appName('NLP').getOrCreate()
print("AQUI_1")
print("Spark NLP version: ", sparknlp.version())
print("Apache Spark version: ", spark.version)

base = spark.read.option("header", True).csv("base3.csv")
base.show(truncate=False)
print(type(base))

documentAssembler = DocumentAssembler().setInputCol("quadri").setOutputCol("document")
doc_df = documentAssembler.transform(base)
doc_df.show()

tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")
doc_df2 = tokenizer.fit(doc_df).transform(doc_df)
doc_df2.select("token.result").show(truncate=False)

finisher = Finisher().setInputCols(["token"]).setOutputCols(["token_features"]) \
    .setOutputAsArray(True).setCleanAnnotations(False)
countVectors = CountVectorizer(inputCol="token_features", outputCol="features", vocabSize=10000, minDF=5)
label_stringIdx = StringIndexer(inputCol="exp", outputCol="label")

nlp_pipeline = Pipeline(stages=[documentAssembler, tokenizer, finisher, countVectors, label_stringIdx])
nlp_model = nlp_pipeline.fit(base)
processed = nlp_model.transform(base)
processed.count()

processed.select('*').show(truncate=50)
processed.select('quadri', 'token').show(truncate=50)
processed.select('token_features').show(truncate=False)

(trainingData, testData) = processed.randomSplit([0.7, 0.3], seed=100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))
trainingData.show()
processed.select('label', 'exp').show()
trainingData.printSchema()

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
lrModel.summary.predictions.show()

predictions = lrModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("quadri", "exp", "probability", "label", "prediction") \
    .orderBy("probability", ascending=False) \
    .show(n=10, truncate=30)

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print(evaluator.evaluate(predictions))
```
You are welcome. The CSV format is not splittable here; you need to have your input as Parquet and repartition it to the number of your cores in order to get parallel tasks. This is more Apache Spark than Spark NLP (since we are native Spark), but in case you are interested: https://www.johnsnowlabs.com/watch-webinar-speed-optimization-benchmarks-in-spark-nlp-3-making-the-most-of-modern-hardware/
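A minimal sketch of that idea (the partition count of 4 is an assumption, chosen to match local[4]):

```python
# Convert the CSV to Parquet once, then read the Parquet copy and
# repartition it so tasks can run in parallel across the cores.
base = spark.read.option("header", True).csv("base3.csv")
base.write.mode("overwrite").parquet("base3.parquet")

base = spark.read.parquet("base3.parquet").repartition(4)
print(base.rdd.getNumPartitions())  # 4 partitions -> up to 4 parallel tasks
```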
Spark NLP 5.0 also comes with state-of-the-art text embeddings, in case you need to vectorize your text and use it as a feature for other NLP tasks.
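As a hedged sketch of that option, an embedding stage could replace the CountVectorizer features; the model name "sent_small_bert_L2_768" is just one possible choice from the Models Hub:

```python
from sparknlp.base import DocumentAssembler, EmbeddingsFinisher
from sparknlp.annotator import BertSentenceEmbeddings
from pyspark.ml import Pipeline

documentAssembler = DocumentAssembler().setInputCol("quadri").setOutputCol("document")

# Pretrained sentence embeddings; any sentence-embedding model would do here.
embeddings = BertSentenceEmbeddings.pretrained("sent_small_bert_L2_768", "en") \
    .setInputCols(["document"]) \
    .setOutputCol("sentence_embeddings")

# EmbeddingsFinisher emits an array of Spark ML vectors (one per annotation);
# take the first element before feeding a Spark ML classifier.
finisher = EmbeddingsFinisher() \
    .setInputCols(["sentence_embeddings"]) \
    .setOutputCols(["features"]) \
    .setOutputAsVector(True)

embedding_pipeline = Pipeline(stages=[documentAssembler, embeddings, finisher])
```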
In case it is of interest to someone in the future, here are the steps I used:
OS: Ubuntu 23.04
VM: VirtualBox 7.0
Main language: Python 3.8.17
1. Install Java 8:
sudo apt install openjdk-8-jdk
2. Install Python:
sudo apt install python3
3. Install Spark:
Download: https://dlcdn.apache.org/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
tar xvf spark-3.4.1-bin-hadoop3.tgz
sudo mv spark-3.4.1-bin-hadoop3/ /opt/spark
sudo vi ~/.bashrc
Insert these lines:
export SPARK_HOME=/opt/spark
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
Reload bashrc:
source ~/.bashrc
Configure the master/worker nodes:
cd /opt/spark/conf
sudo vi spark-env.sh
Insert:
SPARK_MASTER_HOST=<IP of the main machine (master node)>
Start the master node:
.../opt/spark/sbin$ ./start-master.sh
In your browser: localhost:8080
Take "spark://
.../opt/spark/sbin$ ./start-worker.sh spark://
4. Install pyspark:
To see the Spark version, type in the terminal:
spark-shell
Note the Spark version. In my case it was Spark 3.2.4.
ATTENTION: Install pyspark with the SAME version as Spark to avoid compatibility issues (a quick check sketch follows below):
pip install pyspark==3.2.4
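A quick sanity-check sketch, if helpful:

```python
# The PySpark wheel version should equal the installed Spark distribution's version.
import pyspark
print(pyspark.__version__)  # expected: 3.2.4 in this setup
```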
5. Copy the Spark NLP [FAT JAR](https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/jars/spark-nlp-assembly-5.0.0.jar) into the /opt/spark/jars folder.
6. Finally:
.../opt/spark$ ./bin/spark-submit --master spark://<ip_master>:7077 ./your_code.py
In your internet browser, access localhost:8080. You should see the worker nodes (links you can access) connected to the master node.
My goal was to use parallel processing for a text classification task using pyspark. I took the wrong path and couldn't get there, unfortunately! However, I hope my mistakes serve as steps to your success.
@maziyarpanahi You helped a lot, but if you can, one more piece of guidance. My database consists of 1500 quadrigrams distributed across 5 categories, and I would like to classify them. Can you point me to a simple example of how to do this classification in parallel, using Logistic Regression, MulticlassClassificationEvaluator, etc.? Something where I can show the work of each worker node? Something really parallel. What I find is what I've already done... which is not the parallelization I would like.
Splitting my CSV file into several Parquet files and leaving them in the Spark folder of my nodes, would that be a solution?
Thank you!
Hi @fulcrunn
By multi-class classification, are you referring to training or to prediction/inference? Because training usually happens on a single machine over multiple cores, while prediction always scales to multiple cores and multiple machines (executors).
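To illustrate the distinction with the variables from the script above (the partition count of 4 is an assumption matching local[4]):

```python
# Scoring with transform() runs per partition, so repartitioning the test set
# spreads the prediction work across the available cores/executors.
scored = lrModel.transform(testData.repartition(4))

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
print(evaluator.evaluate(scored))
```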
I am sharing some of our notebooks showing how we do multi-class/multi-label classification (both training and prediction) in Spark NLP:
This topic can be closed, please! Thank you so much @maziyarpanahi for all your help.
Purpose
Text classification using pyspark + spark-nlp
Description
OS: Ubuntu 22.04.2 LTS
Spark NLP version: 4.3.2
Apache Spark and Pyspark version: 3.2.4
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 1.8.0_362
Java: Openjdk version "1.8.0_362"
OpenJDK Runtime Environment (build 1.8.0_362-8u372-ga~us1-0ubuntu1~22.04-b09)
OpenJDK 64-Bit Server VM (build 25.362-b09, mixed mode)
Pyspark mode: Standalone
Note: I'm running a Python file directly in the terminal. I'm not using Hadoop or notebooks.
start command on terminal: `./bin/spark-submit --master local[4] tcc.py`
Problem
It manages to read the database but fails when it reaches the DocumentAssembler() call.
![Img_1](https://github.com/JohnSnowLabs/spark-nlp/assets/19765898/9c776362-2b19-4c44-8c5a-eabe9aca0720)
Transcription:
23/07/12 18:02:42 INFO BlockManagerInfo: Removed broadcast_4_piece0 on 10.0.2.15:33285 in memory (size: 5.5 KiB, free: 413.8 MiB)
Traceback (most recent call last):
File "/opt/spark/tcc.py", line 46, in
documentAssembler = DocumentAssembler().setInputCol("quadri").setOutputCol("document")
File "/opt/spark/python/lib/pyspark.zip/pyspark/__init__.py", line 114, in wrapper
File "/home/andre/.local/lib/python3.10/site-packages/sparknlp/base/document_assembler.py", line 96, in __init__
super(DocumentAssembler, self).__init__(classname="com.johnsnowlabs.nlp.DocumentAssembler")
File "/opt/spark/python/lib/pyspark.zip/pyspark/__init__.py", line 114, in wrapper
File "/home/andre/.local/lib/python3.10/site-packages/sparknlp/internal/annotator_transformer.py", line 36, in __init__
self._java_obj = self._new_java_obj(classname, self.uid)
File "/opt/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 66, in _new_java_obj
TypeError: 'JavaPackage' object is not callable
23/07/12 18:02:43 INFO SparkContext: Invoking stop() from shutdown hook
23/07/12 18:02:43 INFO SparkUI: Stopped Spark web UI at http://10.0.2.15:4040
23/07/12 18:02:43 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
23/07/12 18:02:43 INFO MemoryStore: MemoryStore cleared
23/07/12 18:02:43 INFO BlockManager: BlockManager stopped
23/07/12 18:02:43 INFO BlockManagerMaster: BlockManagerMaster stopped
23/07/12 18:02:43 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
23/07/12 18:02:43 INFO SparkContext: Successfully stopped SparkContext
23/07/12 18:02:43 INFO ShutdownHookManager: Shutdown hook called
23/07/12 18:02:43 INFO ShutdownHookManager: Deleting directory /tmp/spark-0b13f68a-db50-41f4-9c5d-ea6be9d22e5b/pyspark-80768c78-2040-44b6-9e82-f540d81bffce
23/07/12 18:02:43 INFO ShutdownHookManager: Deleting directory /tmp/spark-8226b001-b9d3-4f40-9c34-f370eb8f04e4
23/07/12 18:02:43 INFO ShutdownHookManager: Deleting directory /tmp/spark-0b13f68a-db50-41f4-9c5d-ea6be9d22e5b
Code snippet
Transcription:
```python
# Sparknlp Start
spark = SparkSession.builder \
    .master("local[4]") \
    .config("spark.driver.memory", "16G") \
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryoserializer.buffer.max", "2000m") \
    .config("spark.jsl.settings.pretrained.cache_folder", "sample_data/pretrained") \
    .config("spark.jsl.settings.storage.cluster_tmp_dir", "sample_data/storage") \
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:5.0.0") \
    .getOrCreate()
print("Spark NLP version: ", sparknlp.version())
print("Apache Spark version: ", spark.version)

# Take database
base = spark.read \
    .option("header", True) \
    .csv("base3.csv")

# Start TRANSFORMER
documentAssembler = DocumentAssembler().setInputCol("quadri").setOutputCol("document")
doc_df = documentAssembler.transform(base)
doc_df.show()
```
Questions:
When I run on Google Colab, it works perfectly
Thank you!