databricks / spark-corenlp

Stanford CoreNLP wrapper for Apache Spark

NoSuchElementException when running a large dataset #19

Open dong929311 opened 7 years ago

dong929311 commented 7 years ago

When I limit the dataset size to 100 it works well, but when the dataset is large the program crashes. Here is the exception; could you please give me some suggestions?

java.util.NoSuchElementException
  at java.util.ArrayList$Itr.next(ArrayList.java:854)
  at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
  at scala.collection.IterableLike$class.head(IterableLike.scala:107)
  at scala.collection.AbstractIterable.head(Iterable.scala:54)
  at com.databricks.spark.corenlp.functions$$anonfun$sentiment$1.apply(functions.scala:163)
  at com.databricks.spark.corenlp.functions$$anonfun$sentiment$1.apply(functions.scala:158)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
  at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:253)
  at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
  at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325)
  at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
  at org.apache.spark.scheduler.Task.run(Task.scala:85)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
16/11/14 14:32:39 ERROR DefaultWriterContainer: Task attempt attempt_201611141429_0000_m_000001_0 aborted.
16/11/14 14:32:39 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
org.apache.spark.SparkException: Task failed while writing rows
  at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:261)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
  at org.apache.spark.scheduler.Task.run(Task.scala:85)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
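Judging from the frames com.databricks.spark.corenlp.functions$$anonfun$sentiment$1 and IterableLike.head, the sentiment UDF appears to take the head of the list of parsed sentences, which throws NoSuchElementException for rows whose content yields no sentences (for example null, empty, or whitespace-only text); a 100-row sample may simply not contain such a row. A quick check for such rows (a sketch only; the table and column names are taken from the code in the next comment):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{length, trim}

// Sketch: count the rows most likely to trip the sentiment UDF,
// i.e. those whose content is null, empty, or whitespace-only.
val session = SparkSession.builder.enableHiveSupport().getOrCreate()
import session.implicits._

val suspectRows = session.table("tweet")
  .filter('content.isNull || length(trim('content)) === 0)
println(suspectRows.count())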

dong929311 commented 7 years ago

Code as follows:

import org.apache.spark.sql.SparkSession
import com.databricks.spark.corenlp.functions._

object SentimentAnalysis {

  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder
      .appName("sentiment_analysis")
      .master("local[*]")
      .config("spark.driver.memory", "5g") // driver memory needs the dotted key and a size string
      .enableHiveSupport()
      .getOrCreate()
    sentimentAnalysis(session)
  }

  def sentimentAnalysis(session: SparkSession): Unit = {
    import session.implicits._
    // Apply the CoreNLP sentiment UDF to every tweet and persist the result as a Parquet-backed Hive table.
    val sentimentDf = session.table("tweet").select('tweet_id, 'tweetee_id, 'content,
      sentiment('content).as('sentiment))
    sentimentDf.createOrReplaceTempView("sentiment_tmp")
    session.sql("drop table if exists tweet_sentiment")
    session.sql("create table tweet_sentiment(tweet_id long, tweetee_id long, content string, sentiment long) stored as parquet")
    session.sql("insert into tweet_sentiment select * from sentiment_tmp")
  }
}
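If empty or whitespace-only tweets are indeed what triggers the exception, one possible workaround (a sketch, not something confirmed by the library authors) is to drop such rows before applying sentiment:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{length, trim}
import com.databricks.spark.corenlp.functions.sentiment

// Sketch: same pipeline as above, but rows that CoreNLP would see as
// containing no sentences are filtered out before the sentiment UDF runs.
def sentimentAnalysisFiltered(session: SparkSession): Unit = {
  import session.implicits._
  val cleaned = session.table("tweet")
    .filter('content.isNotNull && length(trim('content)) > 0)
  val sentimentDf = cleaned.select('tweet_id, 'tweetee_id, 'content,
    sentiment('content).as('sentiment))
  // Written with the DataFrame API instead of the create/insert SQL above,
  // purely to keep the sketch short.
  sentimentDf.write.mode("overwrite").format("parquet").saveAsTable("tweet_sentiment")
}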