JohnSnowLabs / spark-nlp

State of the Art Natural Language Processing
https://sparknlp.org/
Apache License 2.0

java.lang.NullPointerException in DocumentAssembler #1229

Closed SankethBK closed 3 years ago

SankethBK commented 3 years ago

I am using a news dataset from Kaggle and a Spark NLP pipeline to preprocess the data.

Link to the notebook

I got the expected output in the above notebook because I am passing only the first 1000 rows.

df = spark.read.option('header',True).csv('archive/articles1.csv')

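# df2 is df after removing some columns (that step is not shown here)

from pyspark.ml import Pipeline
from sparknlp.base import DocumentAssembler, Finisher
from sparknlp.annotator import (SentenceDetector, Tokenizer, Normalizer,
                                StopWordsCleaner, LemmatizerModel)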

document_assembler = DocumentAssembler() \
                        .setInputCol('text') \
                        .setOutputCol('document') \
                        .setCleanupMode('shrink') 

sentenceDetector = SentenceDetector() \
                        .setInputCols(['document']) \
                        .setOutputCol('sentences')

tokenizer = Tokenizer() \
                .setInputCols(['sentences']) \
                .setOutputCol('token') \
                .setContextChars(['(', ')', '?', '!','"']) 

normalizer = Normalizer() \
                .setInputCols(['token']) \
                .setOutputCol('normal')

stop_words = StopWordsCleaner() \
        .setInputCols(["token"]) \
        .setOutputCol("cleanTokens")

lemmatizer = LemmatizerModel.pretrained(name="lemma_antbnc", lang="en") \
                .setInputCols(['cleanTokens']) \
                .setOutputCol('lemma')

finisher = Finisher() \
    .setInputCols(["lemma"]) \
    .setIncludeMetadata(False)

preprocess_pipeline = Pipeline(stages = [document_assembler, sentenceDetector, tokenizer,normalizer, stop_words, lemmatizer,finisher])

preprocess_model = preprocess_pipeline.fit(df2)

transformed_df = preprocess_model.transform(df2)
from pyspark.ml.feature import CountVectorizer , CountVectorizerModel

cv = CountVectorizer(inputCol='finished_lemma', outputCol='features', vocabSize=5000, minDF=3.0)
cv_model = cv.fit(transformed_df.limit(2500))
vectorized_tokens = cv_model.transform(transformed_df.limit(2500))

Here finished_lemma is a column that holds a list of tokens in each row.

+--------------------+
|      finished_lemma|
+--------------------+
|[House, Republica...|
|[Rift, Officers, ...|
|[Tyrus, Wong,, ‘B...|
|[Among, Deaths, 2...|
|[Kim, Jong-un, Sa...|
|[Sick, Cold,, Que...|
|[Taiwan’s, Presid...|
|[‘The, Biggest, L...|
|[First,, Mixtape....|
|[Calling, Angels,...|
|[Weak, Federal, P...|
|[Carbon, Capture,...|
|[Mar-a-Lago,, Fut...|
|[form, healthy, h...|
|[Turning, Vacatio...|
|[Second, Avenue, ...|
|[Dylann, Roof, Re...|
|[Modi’s, Cash, Ba...|
|[Suicide, Bombing...|
|[Fecal, Pollution...|
+--------------------+
only showing top 20 rows

At this point I am getting the error:

Caused by: java.lang.NullPointerException
    at com.johnsnowlabs.nlp.DocumentAssembler.assemble(DocumentAssembler.scala:87)
    at com.johnsnowlabs.nlp.DocumentAssembler$$anonfun$dfAssembleNoExtras$1.apply(DocumentAssembler.scala:130)
    at com.johnsnowlabs.nlp.DocumentAssembler$$anonfun$dfAssembleNoExtras$1.apply(DocumentAssembler.scala:129)
    ... 14 more

The full error is included below the description.

Description

First, the above code was executed on articles1.csv (link provided above).
I figured out that if I pass only the first 1000 rows the code works fine, but if I pass the entire dataframe or more than 2500 rows I get the error.

Afterwards I executed the same code on articles2.csv and I got the error even if I pass only 1 row.

So I can confirm that it is not related to memory.

Full error message

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-18-01fe78184d4e> in <module>()
      1 # train the model
      2 
----> 3 cv_model = cv.fit(transformed_df.limit(5000))
      4 
      5 # transform the data. Output column name will be features.

~/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
    130                 return self.copy(params)._fit(dataset)
    131             else:
--> 132                 return self._fit(dataset)
    133         else:
    134             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

~/spark/python/pyspark/ml/wrapper.py in _fit(self, dataset)
    293 
    294     def _fit(self, dataset):
--> 295         java_model = self._fit_java(dataset)
    296         model = self._create_model(java_model)
    297         return self._copyValues(model)

~/spark/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
    290         """
    291         self._transfer_params_to_java()
--> 292         return self._java_obj.fit(dataset._jdf)
    293 
    294     def _fit(self, dataset):

~/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

~/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

~/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o538.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 71.0 failed 1 times, most recent failure: Lost task 7.0 in stage 71.0 (TID 109, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$dfAssembleNoExtras$1: (string) => array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at com.johnsnowlabs.nlp.DocumentAssembler.assemble(DocumentAssembler.scala:87)
    at com.johnsnowlabs.nlp.DocumentAssembler$$anonfun$dfAssembleNoExtras$1.apply(DocumentAssembler.scala:130)
    at com.johnsnowlabs.nlp.DocumentAssembler$$anonfun$dfAssembleNoExtras$1.apply(DocumentAssembler.scala:129)
    ... 14 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1168)
    at org.apache.spark.ml.feature.CountVectorizer.fit(CountVectorizer.scala:230)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$dfAssembleNoExtras$1: (string) => array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.lang.NullPointerException
    at com.johnsnowlabs.nlp.DocumentAssembler.assemble(DocumentAssembler.scala:87)
    at com.johnsnowlabs.nlp.DocumentAssembler$$anonfun$dfAssembleNoExtras$1.apply(DocumentAssembler.scala:130)
    at com.johnsnowlabs.nlp.DocumentAssembler$$anonfun$dfAssembleNoExtras$1.apply(DocumentAssembler.scala:129)
    ... 14 more

Expected Behavior

I am expecting a CountVectorizer model to be created. It is created successfully if I pass only the first 1000 rows of articles1.csv.

Possible Solutions

I have looked at some issues in this repository that show the same error message with different functions; most of them had null values in their dataframe. As I said, if I run the same code on articles2.csv it gives the error even if I pass a single row. I manually checked whether the list was empty and verified that the list of tokens was not empty.

I can confirm that there is no "None" value in the list of tokens.

I know that something in my list of tokens is getting converted to a null value, but I am not sure what it is.

Steps to Reproduce

I know this error can be reproduced by intentionally using "None" as one of the values in the list of tokens, but that is not my case, as I confirmed that I don't have any "None" values in my list of tokens.

Context

I was trying to analyze the topics present in a large collection of documents.

I tried multiple versions of Spark NLP, including 2.6.4.

Your Environment

I can provide any further details.

maziyarpanahi commented 3 years ago

Thanks for a complete and detailed issue. The DocumentAssembler is the first stage of the Pipeline, so its only input is the raw text coming from the DataFrame. Anything that happens afterwards to the sentences or the tokens has no effect on DocumentAssembler.

That being said, we've had an issue similar to this and shipped a fix in the latest release. I can see why this might also look related to None or null values. However, there is something in the text that doesn't fit well with this line inside DocumentAssembler:

case "shrink" => text.trim.replaceAll("\\s+", " ")

You used this strategy as the cleanup mode. I am going to run your notebook to see what exactly in that dataset interferes with this regex and causes the error. In the meantime, you can try other strategies (or even turn cleanup off by using disabled) to see how it goes.
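To make the behavior of that line concrete, here is a rough Python analogue of the "shrink" branch. It is only a sketch and not Spark NLP's code: the function names shrink and safe_shrink are hypothetical, and Python's strip() is close to, but not identical to, Scala's trim. It shows that the expression itself is harmless on ordinary text, and that it fails as soon as the input is a missing value instead of a string.

```python
import re


def shrink(text):
    """Rough analogue of the Scala branch: trim, then collapse whitespace runs."""
    return re.sub(r"\s+", " ", text.strip())


def safe_shrink(text):
    """Hypothetical null-tolerant variant: treat a missing value as an empty string."""
    return shrink(text) if text is not None else ""


print(shrink("  Hello \n\t world  "))  # prints: Hello world

try:
    shrink(None)  # a missing value has no strip() method
except AttributeError as err:
    print("fails on a missing value:", err)

print(repr(safe_shrink(None)))  # prints: ''
```

In the JVM the same situation surfaces as a NullPointerException rather than an AttributeError.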

PS: In any case, those strategies shouldn't crash with an exception; they should just silently skip the row if they cannot do what they are asked to do. Thanks for reporting this.

SankethBK commented 3 years ago

@maziyarpanahi Thank you for the early reply. I removed the line .setCleanupMode('shrink') and it worked. Now the code executes for any number of rows. But in turn it causes a small problem.

The .setContextChars in the tokenizer is not able to remove parentheses and question marks, which could be because the whitespace between them is no longer shrunk. I initially thought they might be getting tokenized as ' (' , ' )', but later I found some cases where they are tokenized as '(' , ')' and still not removed.

Anyway, my main problem is resolved, and thank you very much for that.

wolliq commented 3 years ago

Hello @SankethBK,

Thank you for your work reporting this issue. We have been working on a PR to avoid NPE propagation when null text is processed by DocumentAssembler in the assemble method. In fact, the dataset you are using from Kaggle contains some corrupted rows. In the assemble method, the DocumentAssembler expects each row to hold at least an empty string. Applying a mode other than "disabled" to a null text string results in an NPE.
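One way to confirm this on the input side is to look for nulls in the column that feeds DocumentAssembler, rather than in the token lists. The sketch below assumes the input column is named 'text' as in the pipeline above. The helper names count_null_rows and drop_null_rows are hypothetical; they only wrap the standard DataFrame.filter and DataFrame.na.drop calls.

```python
def count_null_rows(df, column="text"):
    """Count rows whose input column is null, using a SQL filter expression."""
    return df.filter(f"`{column}` IS NULL").count()


def drop_null_rows(df, column="text"):
    """Return a DataFrame without the rows whose input column is null."""
    return df.na.drop(subset=[column])
```

For example, print(count_null_rows(df2)) would show how many rows are affected, and df2 = drop_null_rows(df2) before preprocess_pipeline.fit(df2) would keep those rows away from the assembler.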

The Spark SQL DataFrameReader API provides three modes for handling corrupted records (PERMISSIVE, DROPMALFORMED, and FAILFAST). In this scenario

.option("mode", "DROPMALFORMED")

should do the work.
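Applied to the read call from the original report, the option could be set as in the sketch below. The helper name read_articles and the CSV_OPTIONS dictionary are hypothetical; the spark argument is assumed to be an existing SparkSession. Whether DROPMALFORMED removes every problematic row in this particular dataset has not been verified here.

```python
# Reader options for the CSV load; "mode" is the option suggested above.
CSV_OPTIONS = {"header": "true", "mode": "DROPMALFORMED"}


def read_articles(spark, path):
    """Load a CSV through the given SparkSession, dropping rows Spark flags as malformed."""
    return spark.read.options(**CSV_OPTIONS).csv(path)
```

With the session from the notebook this would be called as df = read_articles(spark, 'archive/articles1.csv').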

In more detail:

Please also note that in Spark 2.3 and earlier versions, a CSV row is considered malformed if at least one column in the row is malformed. For example, in a file with 3 columns, a row that lacks a proper value for any of the columns, for instance because a delimiter is missing, is treated as a malformed (bad) record. Since Spark 2.4, a CSV row is considered malformed only when it contains malformed column values that are requested from the CSV datasource; other values can be ignored. This is due to the CSV parser's column pruning, which is enabled by default. It might be necessary to disable column pruning to get a different behavior.

The important changes were introduced in Spark 2.4.0 in the UnivocityParser.scala class and are linked to https://issues.apache.org/jira/browse/SPARK-25387 .
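If column pruning does need to be turned off, a minimal sketch is shown below. It assumes Spark 2.4 or later and an existing SparkSession passed in as spark. The helper name disable_csv_column_pruning is hypothetical, and the setting should be applied before the CSV file is read.

```python
# Spark SQL setting (Spark 2.4+) that controls CSV parser column pruning.
CSV_COLUMN_PRUNING_KEY = "spark.sql.csv.parser.columnPruning.enabled"


def disable_csv_column_pruning(spark):
    """Turn off CSV column pruning so all columns are checked when parsing a row."""
    spark.conf.set(CSV_COLUMN_PRUNING_KEY, "false")
    return spark
```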