JohnSnowLabs / spark-nlp

State of the Art Natural Language Processing
https://sparknlp.org/
Apache License 2.0

Pyspark Consumer analyze_sentiment Abort Task #895

Closed kmillanrfractal closed 4 years ago

kmillanrfractal commented 4 years ago

import os,sys,time
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession, SQLContext
from sparknlp.base import *
from pyspark.sql.types import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
import sparknlp
import json
from pyspark.sql.functions import regexp_replace,col
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import to_json, from_json, struct, udf

# Start a Spark session with Spark NLP
spark = sparknlp.start()

pipeline = PretrainedPipeline('analyze_sentiment', lang='en')

# Define Schema
schema = StructType([ StructField("text", StringType(), True)])
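# For reference, a Kafka message value matching this schema is a JSON
# object with a single "text" field, e.g. {"text": "hello world"}
# (hypothetical payload; the real messages come from tweetsTopic).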

# read stream from kafka producer
kafkaDf = spark \
        .readStream.format("kafka") \
        .option("kafka.bootstrap.servers","localhost:9092") \
        .option("subscribe", "tweetsTopic") \
        .option('kafkaConsumer.pollTimeoutMs',5000) \
        .load()

# transform stream for readability
df = kafkaDf.selectExpr("CAST(value AS STRING)") \
    .select(from_json('value', schema).alias('data')) \
    .select('data.text')  # <-- code works perfectly up to this point

# apply sentiment analysis to text stream
df = pipeline.transform(df)  # <-- error occurs here
df = df.select('sentiment.result',"sentiment.metadata") \
    .withColumn('result',F.concat_ws(',','result')) \
    .withColumn("result", regexp_replace('result', "positive",'1')) \
    .withColumn("result", regexp_replace('result', "na",'0')) \
    .withColumn("result", regexp_replace('result', "negative",'-1')) \
    .select(F.split('result', ',').alias('sents'), 'metadata')
df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", False) \
    .start() \
    .awaitTermination()

Now, the error I get is as follows:

20/05/07 11:15:24 INFO TaskSchedulerImpl: Adding task set 19.0 with 1 tasks
20/05/07 11:15:24 INFO TaskSetManager: Starting task 0.0 in stage 19.0 (TID 73, localhost, executor driver, partition 0, PROCESS_LOCAL, 8726 bytes)
20/05/07 11:15:24 INFO Executor: Running task 0.0 in stage 19.0 (TID 73)
20/05/07 11:15:24 INFO CodeGenerator: Code generated in 213.820001 ms
20/05/07 11:15:25 INFO ContextCleaner: Cleaned accumulator 483
20/05/07 11:15:27 ERROR Utils: Aborting task   <-- here is the error, I think
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$dfAssembleNoExtras$1: (string) => array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.CreateArray_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.ScalaUDF_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.subExpr_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:118)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at com.johnsnowlabs.nlp.DocumentAssembler.assemble(DocumentAssembler.scala:98)
    at com.johnsnowlabs.nlp.DocumentAssembler$$anonfun$dfAssembleNoExtras$1.apply(DocumentAssembler.scala:124)
    at com.johnsnowlabs.nlp.DocumentAssembler$$anonfun$dfAssembleNoExtras$1.apply(DocumentAssembler.scala:123)
    ... 20 more

Any ideas on what might be the issue? If you need additional info on the error or code, please don't hesitate to ask. I'm initializing the Spark script with the following command:

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5,com.johnsnowlabs.nlp:spark-nlp_2.11:2.4.5 --class com.inndata.StructuredStreaming.Kafka --master local[*] /home/kevin/kafka-pyspark-streaming.py

maziyarpanahi commented 4 years ago

Thanks for the detailed report. I assume you already did pip install spark-nlp with the same version, e.g. pip install spark-nlp==2.4.5 to match the 2.4.5 coordinate in your spark-submit command (just wanted to double-check).

Could you please do a .show() and .printSchema() on your df?

df = kafkaDf.selectExpr("CAST(value AS STRING)") \
    .select(from_json('value', schema).alias('data')) \
    .select('data.text')

I want to see if you have a text column and it is actually a String and nothing else.
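Note that .show() cannot be called directly on a streaming DataFrame, while .printSchema() works as usual. A minimal sketch of one way to peek at a few rows, using a short-lived query against the memory sink ("debug_df" is an arbitrary table name chosen here):

import time

df.printSchema()

# Write the stream into the in-memory table "debug_df", wait for a
# micro-batch to arrive, query it like a normal table, then stop.
debug = df.writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("debug_df") \
    .start()
time.sleep(10)  # give at least one micro-batch time to arrive
spark.sql("SELECT * FROM debug_df").show(truncate=False)
debug.stop()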

kmillanrfractal commented 4 years ago

Hey Maziyar,

thank you for your quick response. Yeah, I installed spark-nlp through pip. Sure thing; attached you will find the required information. One image shows how the output looks on the console; for the other picture, I saved some batches locally and used a Jupyter notebook to view the contents of the stream and obtain the .show() and .printSchema() output.

thank you once again for your help.

all the best,

Kevin


kmillanrfractal commented 4 years ago

One more thing I forgot to mention: the pipeline does actually work, but only for a few batches. After transforming 2 or 3 batches, sometimes even just 1, it exits with that error. Sorry for not mentioning that before.

All the best,

Kevin


maziyarpanahi commented 4 years ago

I think you forgot to attach the screenshot(s).

kmillanrfractal commented 4 years ago

[Screenshot from 2020-05-07 20-00-48] [Screenshot from 2020-05-07 20-01-28]

kmillanrfractal commented 4 years ago

Sorry about that,

I have attached the two pictures, both in this email and on the GitHub thread as well. Thank you.

all the best,

Kevin


maziyarpanahi commented 4 years ago

Thank you. So these are Tweets. It's possible for them to be empty strings, or to become empty after tokenizing because they contain only emojis, etc. For some reason, that specific pipeline doesn't like empty strings. You need to add a filter to make sure the text is not empty in your initial DataFrame, or even a size check so that only Tweets longer than 10-20 characters are passed through:

Caused by: java.lang.NullPointerException

df = kafkaDf.selectExpr("CAST(value AS STRING)") \
    .select(from_json('value', schema).alias('data')) \
    .filter("data.text != ''") \
    .select('data.text')

or

import pyspark.sql.functions as F

df = kafkaDf.selectExpr("CAST(value AS STRING)") \
    .select(from_json('value', schema).alias('data')) \
    .withColumn('text_length', F.length(F.col('data.text'))) \
    .filter("text_length > 20") \
    .select('data.text')
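A combined, more defensive variant (a minimal sketch, assuming you also want to drop NULL text, which is what the NullPointerException suggests DocumentAssembler received):

df = kafkaDf.selectExpr("CAST(value AS STRING)") \
    .select(from_json('value', schema).alias('data')) \
    .select('data.text') \
    .filter(F.col('text').isNotNull()) \
    .filter(F.trim(F.col('text')) != '')  # drop empty / whitespace-only Tweets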

PS: This may not happen in our new SentimentDL and its model/pipeline, since it uses a DL Transformer, the Universal Sentence Encoder, in 2.5.0.
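Swapping in the DL pipeline would only change the pretrained pipeline name; a sketch, assuming a name like 'analyze_sentimentdl_use_twitter' (verify the exact name and the required Spark NLP version on the Models Hub):

# Hypothetical pipeline name; check the Models Hub for your version.
pipeline = PretrainedPipeline('analyze_sentimentdl_use_twitter', lang='en')
df = pipeline.transform(df)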

kmillanrfractal commented 4 years ago

Thank you very much for your help, will give the solution a try and get back to you as soon as possible.

All the best,

Kevin


kmillanrfractal commented 4 years ago

Worked like a charm, thank you.

all the best,

Kevin
