dibbhatt / kafka-spark-consumer

High Performance Kafka Connector for Spark Streaming. Supports Multi Topic Fetch, Kafka Security. Reliable offset management in Zookeeper. No Data-loss. No dependency on HDFS and WAL. In-built PID rate controller. Supports Message Handler. Offset Lag checker.
Apache License 2.0

Offset is still updated when exception occurs during processing #64

Closed doddys closed 5 years ago

doddys commented 5 years ago

Hi,

I have a problem when trying to simulate an error during RDD processing. I do it by throwing an exception like the following (simplified):

val tmp_stream = ReceiverLauncher.launch(ssc, props, numberOfReceivers, StorageLevel.fromString(storageLevelReceiver))
val partitionOffset_stream = ProcessedOffsetManager.getPartitionOffset(tmp_stream, props)
tmp_stream.foreachRDD(rdd => {
    Thread.sleep(5000)
    throw new Exception("error")
})
ProcessedOffsetManager.persists(partitionOffset_stream, props)

I run the spark job in yarn cluster mode.

The problem is that the RDD stage fails, but the offset persist is still executed. Then the driver is restarted. This results in message loss, since the failed message is skipped.

(screenshot attached)

Versions: Scala 2.11, Spark 2.3.0, Kafka 0.10.2, Kafka Spark Consumer 1.0.17

Any idea why this is happening?

doddys commented 5 years ago

Some logs for your reference:

2019-08-02 09:51:03,196 INFO spark-listener-group-appStatus consumer.kafka.ReceiverStreamListener Modified Rate by Controller  : 16393 
2019-08-02 09:51:03,196 INFO spark-listener-group-appStatus consumer.kafka.ZkState Starting curator service 
2019-08-02 09:51:06,067 INFO streaming-job-executor-0 com.expecc.bca.ids.IDSProcessorCommon Total of records in this batch : 1 
2019-08-02 09:51:10,331 ERROR spark-listener-group-appStatus consumer.kafka.ReceiverStreamListener Output Operation failed due to org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 68.0 failed 4 times, most recent failure: Lost task 0.3 in stage 68.0 (TID 107, dbigdata003, executor 7): java.lang.ArrayIndexOutOfBoundsException: 94
    at com.expecc.bca.ids.IDSProcessor$$anonfun$populateIDSCSV$1.apply(IDSProcessor.scala:535)
    at com.expecc.bca.ids.IDSProcessor$$anonfun$populateIDSCSV$1.apply(IDSProcessor.scala:528)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at com.expecc.bca.ids.IDSProcessor.populateIDSCSV(IDSProcessor.scala:528)
    at com.expecc.bca.ids.IDSProcessor$$anonfun$processData$1$$anonfun$apply$2$$anonfun$apply$1.apply$mcVI$sp(IDSProcessor.scala:161)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at com.expecc.bca.ids.IDSProcessor$$anonfun$processData$1$$anonfun$apply$2.apply(IDSProcessor.scala:101)
    at com.expecc.bca.ids.IDSProcessor$$anonfun$processData$1$$anonfun$apply$2.apply(IDSProcessor.scala:80)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
    at com.expecc.bca.ids.IDSProcessor$$anonfun$processData$1.apply(IDSProcessor.scala:80)
    at com.expecc.bca.ids.IDSProcessor$$anonfun$processData$1.apply(IDSProcessor.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:933)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:933)
    at com.expecc.bca.ids.IDSProcessor.processData(IDSProcessor.scala:35)
    at com.expecc.bca.ids.IDSKafkaConsumer$$anonfun$main$4.apply(IDSKafkaConsumer.scala:101)
    at com.expecc.bca.ids.IDSKafkaConsumer$$anonfun$main$4.apply(IDSKafkaConsumer.scala:101)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 94
    at com.expecc.bca.ids.IDSProcessor$$anonfun$populateIDSCSV$1.apply(IDSProcessor.scala:535)
    at com.expecc.bca.ids.IDSProcessor$$anonfun$populateIDSCSV$1.apply(IDSProcessor.scala:528)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at com.expecc.bca.ids.IDSProcessor.populateIDSCSV(IDSProcessor.scala:528)
    at com.expecc.bca.ids.IDSProcessor$$anonfun$processData$1$$anonfun$apply$2$$anonfun$apply$1.apply$mcVI$sp(IDSProcessor.scala:161)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at com.expecc.bca.ids.IDSProcessor$$anonfun$processData$1$$anonfun$apply$2.apply(IDSProcessor.scala:101)
    at com.expecc.bca.ids.IDSProcessor$$anonfun$processData$1$$anonfun$apply$2.apply(IDSProcessor.scala:80)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
    at com.expecc.bca.ids.IDSProcessor$$anonfun$processData$1.apply(IDSProcessor.scala:80)
    at com.expecc.bca.ids.IDSProcessor$$anonfun$processData$1.apply(IDSProcessor.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
    ... 3 more

2019-08-02 09:51:10,355 ERROR spark-listener-group-appStatus consumer.kafka.ReceiverStreamListener Receiver stopped with error Stopped by driver 
2019-08-02 09:51:10,438 INFO streaming-job-executor-0 consumer.kafka.ZkState Starting curator service 
2019-08-02 09:51:10,447 INFO streaming-job-executor-0 consumer.kafka.ProcessedOffsetManager Wrote processed offset 176 for Parittion 0 
2019-08-02 09:51:10,449 INFO spark-listener-group-appStatus consumer.kafka.ZkState Starting curator service 
2019-08-02 09:51:10,452 INFO streaming-job-executor-0 com.expecc.bca.ids.IDSProcessorCommon Total of records in this batch : 0 
2019-08-02 09:51:10,454 INFO spark-listener-group-appStatus consumer.kafka.ReceiverStreamListener Current Fetch Rate for topic ids_test is 500 
2019-08-02 09:51:10,455 INFO spark-listener-group-appStatus consumer.kafka.ReceiverStreamListener Current Rate in ZooKeeper  : 500 
2019-08-02 09:51:10,455 INFO spark-listener-group-appStatus consumer.kafka.PIDController ======== Rate Revision Starts ======== 
2019-08-02 09:51:10,455 INFO spark-listener-group-appStatus consumer.kafka.PIDController Current Fetch Size    : 500 
2019-08-02 09:51:10,455 INFO spark-listener-group-appStatus consumer.kafka.PIDController Fill Freq             : 500 
2019-08-02 09:51:10,455 INFO spark-listener-group-appStatus consumer.kafka.PIDController Batch Duration        : 3000 
2019-08-02 09:51:10,455 INFO spark-listener-group-appStatus consumer.kafka.PIDController Scheduling Delay      : 3 
2019-08-02 09:51:10,455 INFO spark-listener-group-appStatus consumer.kafka.PIDController Processing Delay      : 4436 
2019-08-02 09:51:10,455 INFO spark-listener-group-appStatus consumer.kafka.PIDController Fixed Rate            : 1000 
2019-08-02 09:51:10,455 INFO spark-listener-group-appStatus consumer.kafka.PIDController Processing rate       : 676 
2019-08-02 09:51:10,455 INFO spark-listener-group-appStatus consumer.kafka.PIDController Proportional Error    : 323 
2019-08-02 09:51:10,455 INFO spark-listener-group-appStatus consumer.kafka.PIDController HistoricalError       : 0 
2019-08-02 09:51:10,455 INFO spark-listener-group-appStatus consumer.kafka.PIDController DifferentialError     : 2 
2019-08-02 09:51:10,455 INFO spark-listener-group-appStatus consumer.kafka.PIDController Reviced Rate          : 676 
2019-08-02 09:51:10,455 INFO spark-listener-group-appStatus consumer.kafka.PIDController Reviced FetchSize     : 676 
2019-08-02 09:51:10,455 INFO spark-listener-group-appStatus consumer.kafka.PIDController ======== Rate Revision Ends ======== 
2019-08-02 09:51:10,455 INFO spark-listener-group-appStatus consumer.kafka.ReceiverStreamListener Modified Rate by Controller  : 676 
dibbhatt commented 5 years ago

Can you try it like this and let me know if you still see the same issue:

val tmp_stream = ReceiverLauncher.launch(ssc, props, numberOfReceivers, StorageLevel.fromString(storageLevelReceiver))
val partitionOffset_stream = ProcessedOffsetManager.getPartitionOffset(tmp_stream, props)
tmp_stream.foreachRDD(rdd => {
    Thread.sleep(5000)
    throw new Exception("error")
})
ProcessedOffsetManager.persists(partitionOffset_stream, props)

try {
  jsc.start();
  jsc.awaitTermination();
} catch {
  case e: Exception =>
    jsc.ssc().sc().cancelAllJobs();
    jsc.stop(true, false);
    System.exit(-1);
}
dibbhatt commented 5 years ago

Hi @doddys, any update? Do you still see the same issue when you try the above snippet?

doddys commented 5 years ago

Hi,

Sorry for the late reply, I just got around to testing the recommendation. I cannot use it exactly as written because I do not have a jsc; I only have references to SparkContext and StreamingContext (I am using Scala, btw).

Below is the updated code.

  val conf = new SparkConf()
  val ssc = new StreamingContext(sc, Seconds(batchInterval))
 ...   
    try{
      ssc.start()
      val checkIntervalMillis = 10000
      var isStopped = false
      while (!isStopped) {
        isStopped = ssc.awaitTerminationOrTimeout(checkIntervalMillis)
        val fs = FileSystem.get(sc.hadoopConfiguration)
        val isShutdownRequested = fs.exists(new Path(s"hdfs:///tmp/${appName}.shutdown"))
        if (!isStopped && isShutdownRequested) {
          logger.error("Shutting Down due to shutdown marker file")
          ssc.stop(true, true)
        }
      }
    } catch {
      case e: Exception => {
        logger.error("Stopping ALL JOBS due to Exception", e)
        sc.cancelAllJobs()
        ssc.stop(stopSparkContext = true, stopGracefully = false)
        logger.error("Stopped ALL JOBS due to Exception", e)
        System.exit(-1)
        throw e // to exit with error condition
      }
    }

Unfortunately the behaviour is still the same: the offset is still being updated.

2019-08-05 13:26:37,428 ERROR spark-listener-group-appStatus consumer.kafka.ReceiverStreamListener Output Operation failed due to org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 74.0 failed 4 times, most recent failure: Lost task 0.3 in stage 74.0 (TID 112, dbigdata003, executor 5): java.lang.ArrayIndexOutOfBoundsException: 94
2019-08-05 13:26:37,429 ERROR Driver IDSKafkaConsumer$ Stopping ALL JOBS due to Exception 
2019-08-05 13:26:37,505 INFO streaming-job-executor-0 consumer.kafka.ZkState Starting curator service 
2019-08-05 13:26:37,512 INFO streaming-job-executor-0 consumer.kafka.ProcessedOffsetManager Wrote processed offset 191 for Parittion 0 
2019-08-05 13:26:37,514 INFO spark-listener-group-appStatus consumer.kafka.ZkState Starting curator service 
2019-08-05 13:26:37,568 ERROR Driver Stopped ALL JOBS due to Exception 

I run the job on YARN. Is it possible this is related to the Spark-on-YARN environment?

dibbhatt commented 5 years ago

Try doing ssc.start and ssc.awaitTermination the way I mentioned; there is no need to busy-wait in a while loop. I do not think it has anything to do with Spark on YARN.

doddys commented 5 years ago

I changed the code to the following, but the result is still the same:

    try{
      ssc.start()
      ssc.awaitTermination()
    } catch {
      case e: Exception => {
        logger.error("Stopping ALL JOBS due to Exception", e)
        sc.cancelAllJobs()
        ssc.stop(stopSparkContext = true, stopGracefully = false)
        logger.error("Stopped ALL JOBS due to Exception", e)
        System.exit(-1)
        throw e // to exit with error condition
      }
    }
dibbhatt commented 5 years ago

Are you sure the offset is really committed to ZK? It may log it, but as the context is closed, the commit should fail. Can you please check the ZK path (you can do that using zkCli.sh), go to the /consumers//topic/partitions path, and see if it holds the same offset value as logged.
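
For reference, a Curator-based check of the committed offset (the same value a zkCli.sh get on that node would show) could look like the sketch below. It assumes the ZooKeeper connect string (with the /kafka chroot) and the offset path that appear in the logs earlier in this thread; adjust both for your cluster.

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry

object OffsetCheck {
  def main(args: Array[String]): Unit = {
    // Connect string and path taken from the driver logs above; change them for your setup.
    val client = CuratorFrameworkFactory.newClient(
      "dbigdatam02:2181,dbigdatam01:2181,dbigdatae01:2181/kafka",
      new ExponentialBackoffRetry(1000, 3))
    client.start()
    try {
      val path = "/consumers/ids-spark-consumer/offsets/ids_test/0"
      // Read the raw bytes stored at the offset node and print them as a string.
      val data = client.getData.forPath(path)
      println(s"Committed offset at $path: ${new String(data, "UTF-8")}")
    } finally {
      client.close()
    }
  }
}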

I will take a close look at this. A fix is possible if I pass the Spark context to the ProcessedOffsetManager so that the offset commit only happens if it is not closed.
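
In outline, that guard could look like the sketch below. This is an illustration only, not the library's actual code: persistIfActive and the write callback are hypothetical names, and the check relies on the public StreamingContext.getState() API.

import org.apache.spark.streaming.{StreamingContext, StreamingContextState}

// Hypothetical helper illustrating the proposed guard; not ProcessedOffsetManager's real code.
def persistIfActive(ssc: StreamingContext, partition: Int, offset: Long)
                   (write: (Int, Long) => Unit): Unit = {
  if (ssc.getState() == StreamingContextState.STOPPED) {
    // The context is shutting down after a failed batch: skip the ZK commit
    // so the offset of the unprocessed message is not recorded as done.
  } else {
    write(partition, offset)
  }
}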

doddys commented 5 years ago

Yes, it is committed to ZK. I tested this by sending one message that causes an exception. Since the job is configured to auto-restart on failed execution, the job was restarted and continued, skipping the previous message. To confirm, I checked the offset lag; it was reduced (latest).

Below are the debug-level logs from the driver after the exception happened. You can see near the end that it updated the ZooKeeper offset successfully.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 94.0 failed 4 times, most recent failure: Lost task 0.3 in stage 94.0 (TID 122, dbigdata003, executor 6): java.lang.ArrayIndexOutOfBoundsException: 94
    at com.expecc.bca.ids.IDSProcessor$$anonfun$populateIDSCSV$1.apply(IDSProcessor.scala:535)
    at com.expecc.bca.ids.IDSProcessor$$anonfun$populateIDSCSV$1.apply(IDSProcessor.scala:528)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at com.expecc.bca.ids.IDSProcessor.populateIDSCSV(IDSProcessor.scala:528)
    at com.expecc.bca.ids.IDSProcessor$$anonfun$processData$1$$anonfun$apply$2$$anonfun$apply$1.apply$mcVI$sp(IDSProcessor.scala:161)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at com.expecc.bca.ids.IDSProcessor$$anonfun$processData$1$$anonfun$apply$2.apply(IDSProcessor.scala:101)
    at com.expecc.bca.ids.IDSProcessor$$anonfun$processData$1$$anonfun$apply$2.apply(IDSProcessor.scala:80)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
    at com.expecc.bca.ids.IDSProcessor$$anonfun$processData$1.apply(IDSProcessor.scala:80)
    at com.expecc.bca.ids.IDSProcessor$$anonfun$processData$1.apply(IDSProcessor.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:933)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:933)
    at com.expecc.bca.ids.IDSProcessor.processData(IDSProcessor.scala:35)
    at com.expecc.bca.ids.IDSKafkaConsumer$$anonfun$main$4.apply(IDSKafkaConsumer.scala:97)
    at com.expecc.bca.ids.IDSKafkaConsumer$$anonfun$main$4.apply(IDSKafkaConsumer.scala:97)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 94
    at com.expecc.bca.ids.IDSProcessor$$anonfun$populateIDSCSV$1.apply(IDSProcessor.scala:535)
    at com.expecc.bca.ids.IDSProcessor$$anonfun$populateIDSCSV$1.apply(IDSProcessor.scala:528)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at com.expecc.bca.ids.IDSProcessor.populateIDSCSV(IDSProcessor.scala:528)
    at com.expecc.bca.ids.IDSProcessor$$anonfun$processData$1$$anonfun$apply$2$$anonfun$apply$1.apply$mcVI$sp(IDSProcessor.scala:161)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at com.expecc.bca.ids.IDSProcessor$$anonfun$processData$1$$anonfun$apply$2.apply(IDSProcessor.scala:101)
    at com.expecc.bca.ids.IDSProcessor$$anonfun$processData$1$$anonfun$apply$2.apply(IDSProcessor.scala:80)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
    at com.expecc.bca.ids.IDSProcessor$$anonfun$processData$1.apply(IDSProcessor.scala:80)
    at com.expecc.bca.ids.IDSProcessor$$anonfun$processData$1.apply(IDSProcessor.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
    ... 3 more
2019-08-05 13:48:49,186 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + fields accessed by starting closure: 4 
2019-08-05 13:48:49,187 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      (class java.lang.Object,Set()) 
2019-08-05 13:48:49,187 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      (class scala.runtime.AbstractFunction0,Set()) 
2019-08-05 13:48:49,187 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      (class org.apache.spark.rdd.RDD$$anonfun$collect$1,Set($outer)) 
2019-08-05 13:48:49,187 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      (class org.apache.spark.rdd.RDD,Set(org$apache$spark$rdd$RDD$$evidence$1)) 
2019-08-05 13:48:49,187 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + outermost object is not a closure or REPL line object, so do not clone it: (class org.apache.spark.rdd.RDD,MapPartitionsRDD[183] at groupByKey at ProcessedOffsetManager.java:45) 
2019-08-05 13:48:49,187 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + cloning the object <function0> of class org.apache.spark.rdd.RDD$$anonfun$collect$1 
2019-08-05 13:48:49,187 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + cleaning cloned closure <function0> recursively (org.apache.spark.rdd.RDD$$anonfun$collect$1) 
2019-08-05 13:48:49,187 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner +++ Cleaning closure <function0> (org.apache.spark.rdd.RDD$$anonfun$collect$1) +++ 
2019-08-05 13:48:49,188 DEBUG Driver org.apache.spark.streaming.scheduler.JobScheduler Stopping JobScheduler 
2019-08-05 13:48:49,188 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + declared fields: 2 
2019-08-05 13:48:49,188 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      public static final long org.apache.spark.rdd.RDD$$anonfun$collect$1.serialVersionUID 
2019-08-05 13:48:49,188 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      private final org.apache.spark.rdd.RDD org.apache.spark.rdd.RDD$$anonfun$collect$1.$outer 
2019-08-05 13:48:49,188 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + declared methods: 2 
2019-08-05 13:48:49,188 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      public org.apache.spark.rdd.RDD org.apache.spark.rdd.RDD$$anonfun$collect$1.org$apache$spark$rdd$RDD$$anonfun$$$outer() 
2019-08-05 13:48:49,188 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      public final java.lang.Object org.apache.spark.rdd.RDD$$anonfun$collect$1.apply() 
2019-08-05 13:48:49,188 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + inner classes: 1 
2019-08-05 13:48:49,188 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12 
2019-08-05 13:48:49,188 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + outer classes: 1 
2019-08-05 13:48:49,188 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      org.apache.spark.rdd.RDD 
2019-08-05 13:48:49,188 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + outer objects: 1 
2019-08-05 13:48:49,188 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      MapPartitionsRDD[183] at groupByKey at ProcessedOffsetManager.java:45 
2019-08-05 13:48:49,189 INFO dag-scheduler-event-loop org.apache.spark.scheduler.cluster.YarnClusterScheduler Cancelling stage 2 
2019-08-05 13:48:49,189 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + fields accessed by starting closure: 4 
2019-08-05 13:48:49,189 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      (class java.lang.Object,Set()) 
2019-08-05 13:48:49,189 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      (class scala.runtime.AbstractFunction0,Set()) 
2019-08-05 13:48:49,189 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      (class org.apache.spark.rdd.RDD$$anonfun$collect$1,Set($outer)) 
2019-08-05 13:48:49,189 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      (class org.apache.spark.rdd.RDD,Set(org$apache$spark$rdd$RDD$$evidence$1)) 
2019-08-05 13:48:49,189 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + outermost object is not a closure or REPL line object, so do not clone it: (class org.apache.spark.rdd.RDD,MapPartitionsRDD[183] at groupByKey at ProcessedOffsetManager.java:45) 
2019-08-05 13:48:49,189 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  +++ closure <function0> (org.apache.spark.rdd.RDD$$anonfun$collect$1) is now cleaned +++ 
2019-08-05 13:48:49,189 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  +++ closure <function1> (org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12) is now cleaned +++ 
2019-08-05 13:48:49,190 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner +++ Cleaning closure <function2> (org.apache.spark.SparkContext$$anonfun$runJob$5) +++ 
2019-08-05 13:48:49,190 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + declared fields: 2 
2019-08-05 13:48:49,190 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      public static final long org.apache.spark.SparkContext$$anonfun$runJob$5.serialVersionUID 
2019-08-05 13:48:49,190 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      private final scala.Function1 org.apache.spark.SparkContext$$anonfun$runJob$5.cleanedFunc$1 
2019-08-05 13:48:49,190 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + declared methods: 2 
2019-08-05 13:48:49,190 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      public final java.lang.Object org.apache.spark.SparkContext$$anonfun$runJob$5.apply(java.lang.Object,java.lang.Object) 
2019-08-05 13:48:49,190 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      public final java.lang.Object org.apache.spark.SparkContext$$anonfun$runJob$5.apply(org.apache.spark.TaskContext,scala.collection.Iterator) 
2019-08-05 13:48:49,190 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + inner classes: 0 
2019-08-05 13:48:49,190 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + outer classes: 0 
2019-08-05 13:48:49,190 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + outer objects: 0 
2019-08-05 13:48:49,190 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + populating accessed fields because this is the starting closure 
2019-08-05 13:48:49,191 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + fields accessed by starting closure: 0 
2019-08-05 13:48:49,191 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + there are no enclosing objects! 
2019-08-05 13:48:49,191 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  +++ closure <function2> (org.apache.spark.SparkContext$$anonfun$runJob$5) is now cleaned +++ 
2019-08-05 13:48:49,191 INFO streaming-job-executor-0 org.apache.spark.SparkContext Starting job: collect at ProcessedOffsetManager.java:79 
2019-08-05 13:48:49,193 INFO dag-scheduler-event-loop org.apache.spark.scheduler.cluster.YarnClusterScheduler Stage 2 was cancelled 
2019-08-05 13:48:49,193 INFO dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler ResultStage 2 (start at IDSKafkaConsumer.scala:107) failed in 138.331 s due to Job 1 cancelled as part of cancellation of all jobs 
2019-08-05 13:48:49,193 DEBUG dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler After removal of stage 2, remaining stages = 0 
2019-08-05 13:48:49,194 INFO dispatcher-event-loop-44 org.apache.spark.streaming.scheduler.ReceiverTracker Sent stop signal to all 1 receivers 
2019-08-05 13:48:49,196 INFO Driver org.apache.spark.streaming.scheduler.ReceiverTracker All of the receivers have deregistered successfully 
2019-08-05 13:48:49,196 WARN dag-scheduler-event-loop org.apache.spark.streaming.scheduler.ReceiverTracker Receiver 0 exited but didn't deregister 
2019-08-05 13:48:49,196 DEBUG dag-scheduler-event-loop org.apache.spark.shuffle.sort.SortShuffleManager Can't use serialized shuffle for shuffle 45 because an aggregator is defined 
2019-08-05 13:48:49,196 INFO dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler Registering RDD 181 (mapPartitionsToPair at ProcessedOffsetManager.java:44) 
2019-08-05 13:48:49,196 INFO dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler Got job 136 (collect at ProcessedOffsetManager.java:79) with 1 output partitions 
2019-08-05 13:48:49,196 INFO dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler Final stage: ResultStage 96 (collect at ProcessedOffsetManager.java:79) 
2019-08-05 13:48:49,196 INFO dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler Parents of final stage: List(ShuffleMapStage 95) 
2019-08-05 13:48:49,196 INFO dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler Missing parents: List(ShuffleMapStage 95) 
2019-08-05 13:48:49,197 DEBUG dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler submitStage(ResultStage 96) 
2019-08-05 13:48:49,197 INFO Driver org.apache.spark.streaming.scheduler.ReceiverTracker ReceiverTracker stopped 
2019-08-05 13:48:49,197 DEBUG dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler missing: List(ShuffleMapStage 95) 
2019-08-05 13:48:49,197 DEBUG dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler submitStage(ShuffleMapStage 95) 
2019-08-05 13:48:49,197 DEBUG dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler missing: List() 
2019-08-05 13:48:49,197 INFO dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler Submitting ShuffleMapStage 95 (MapPartitionsRDD[181] at mapPartitionsToPair at ProcessedOffsetManager.java:44), which has no missing parents 
2019-08-05 13:48:49,197 DEBUG dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler submitMissingTasks(ShuffleMapStage 95) 
2019-08-05 13:48:49,198 INFO Driver org.apache.spark.streaming.scheduler.JobGenerator Stopping JobGenerator immediately 
2019-08-05 13:48:49,199 INFO dag-scheduler-event-loop org.apache.spark.storage.memory.MemoryStore Block broadcast_52 stored as values in memory (estimated size 3.6 KB, free 2004.3 MB) 
2019-08-05 13:48:49,199 INFO Driver org.apache.spark.streaming.util.RecurringTimer Stopped timer for JobGenerator after time 1564987728000 
2019-08-05 13:48:49,199 DEBUG dag-scheduler-event-loop org.apache.spark.storage.BlockManager Put block broadcast_52 locally took  1 ms 
2019-08-05 13:48:49,199 DEBUG dag-scheduler-event-loop org.apache.spark.storage.BlockManager Putting block broadcast_52 without replication took  1 ms 
2019-08-05 13:48:49,200 INFO Driver org.apache.spark.streaming.scheduler.JobGenerator Stopped JobGenerator 
2019-08-05 13:48:49,201 DEBUG Driver org.apache.spark.streaming.scheduler.JobScheduler Stopping job executor 
2019-08-05 13:48:49,203 INFO dag-scheduler-event-loop org.apache.spark.storage.memory.MemoryStore Block broadcast_52_piece0 stored as bytes in memory (estimated size 2.1 KB, free 2004.3 MB) 
2019-08-05 13:48:49,204 INFO dispatcher-event-loop-46 org.apache.spark.storage.BlockManagerInfo Added broadcast_52_piece0 in memory on dbigdata004:39874 (size: 2.1 KB, free: 2004.5 MB) 
2019-08-05 13:48:49,204 DEBUG dag-scheduler-event-loop org.apache.spark.storage.BlockManagerMaster Updated info of block broadcast_52_piece0 
2019-08-05 13:48:49,204 DEBUG dag-scheduler-event-loop org.apache.spark.storage.BlockManager Told master about block broadcast_52_piece0 
2019-08-05 13:48:49,204 DEBUG dag-scheduler-event-loop org.apache.spark.storage.BlockManager Put block broadcast_52_piece0 locally took  1 ms 
2019-08-05 13:48:49,204 DEBUG dag-scheduler-event-loop org.apache.spark.storage.BlockManager Putting block broadcast_52_piece0 without replication took  1 ms 
2019-08-05 13:48:49,204 INFO dag-scheduler-event-loop org.apache.spark.SparkContext Created broadcast 52 from broadcast at DAGScheduler.scala:1039 
2019-08-05 13:48:49,204 INFO dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler Submitting 1 missing tasks from ShuffleMapStage 95 (MapPartitionsRDD[181] at mapPartitionsToPair at ProcessedOffsetManager.java:44) (first 15 tasks are for partitions Vector(0)) 
2019-08-05 13:48:49,204 INFO dag-scheduler-event-loop org.apache.spark.scheduler.cluster.YarnClusterScheduler Adding task set 95.0 with 1 tasks 
2019-08-05 13:48:49,205 DEBUG dag-scheduler-event-loop org.apache.spark.scheduler.TaskSetManager Epoch for TaskSet 95.0: 2 
2019-08-05 13:48:49,205 DEBUG dag-scheduler-event-loop org.apache.spark.scheduler.TaskSetManager Valid locality levels for TaskSet 95.0: NODE_LOCAL, RACK_LOCAL, ANY 
2019-08-05 13:48:49,205 DEBUG dispatcher-event-loop-25 org.apache.spark.scheduler.cluster.YarnClusterScheduler parentName: , name: TaskSet_2.0, runningTasks: 1 
2019-08-05 13:48:49,205 DEBUG dispatcher-event-loop-25 org.apache.spark.scheduler.cluster.YarnClusterScheduler parentName: , name: TaskSet_95.0, runningTasks: 0 
2019-08-05 13:48:49,206 INFO dispatcher-event-loop-25 org.apache.spark.scheduler.TaskSetManager Starting task 0.0 in stage 95.0 (TID 123, dbigdata003, executor 4, partition 0, NODE_LOCAL, 7751 bytes) 
2019-08-05 13:48:49,206 DEBUG dispatcher-event-loop-25 org.apache.spark.scheduler.TaskSetManager No tasks for locality level NODE_LOCAL, so moving to locality level RACK_LOCAL 
2019-08-05 13:48:49,206 DEBUG dispatcher-event-loop-25 org.apache.spark.scheduler.TaskSetManager No tasks for locality level RACK_LOCAL, so moving to locality level ANY 
2019-08-05 13:48:49,206 DEBUG dispatcher-event-loop-25 org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint Launching task 123 on executor id: 4 hostname: dbigdata003. 
2019-08-05 13:48:49,215 DEBUG shuffle-server-5-5 org.apache.spark.storage.BlockManager Getting local block broadcast_52_piece0 as bytes 
2019-08-05 13:48:49,215 DEBUG shuffle-server-5-5 org.apache.spark.storage.BlockManager Level for block broadcast_52_piece0 is StorageLevel(disk, memory, 1 replicas) 
2019-08-05 13:48:49,217 INFO dispatcher-event-loop-36 org.apache.spark.storage.BlockManagerInfo Added broadcast_52_piece0 in memory on dbigdata003:47908 (size: 2.1 KB, free: 912.3 MB) 
2019-08-05 13:48:49,342 DEBUG dispatcher-event-loop-48 org.apache.spark.scheduler.cluster.YarnClusterScheduler parentName: , name: TaskSet_2.0, runningTasks: 1 
2019-08-05 13:48:49,342 DEBUG dispatcher-event-loop-48 org.apache.spark.scheduler.cluster.YarnClusterScheduler parentName: , name: TaskSet_95.0, runningTasks: 0 
2019-08-05 13:48:49,342 INFO task-result-getter-3 org.apache.spark.scheduler.TaskSetManager Finished task 0.0 in stage 95.0 (TID 123) in 137 ms on dbigdata003 (executor 4) (1/1) 
2019-08-05 13:48:49,342 INFO task-result-getter-3 org.apache.spark.scheduler.cluster.YarnClusterScheduler Removed TaskSet 95.0, whose tasks have all completed, from pool  
2019-08-05 13:48:49,342 DEBUG dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler ShuffleMapTask finished on 4 
2019-08-05 13:48:49,342 INFO dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler ShuffleMapStage 95 (mapPartitionsToPair at ProcessedOffsetManager.java:44) finished in 0.145 s 
2019-08-05 13:48:49,342 INFO dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler looking for newly runnable stages 
2019-08-05 13:48:49,342 INFO dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler running: Set() 
2019-08-05 13:48:49,342 INFO dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler waiting: Set(ResultStage 96) 
2019-08-05 13:48:49,342 INFO dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler failed: Set() 
2019-08-05 13:48:49,342 DEBUG dag-scheduler-event-loop org.apache.spark.MapOutputTrackerMaster Increasing epoch to 3 
2019-08-05 13:48:49,343 DEBUG dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler submitStage(ResultStage 96) 
2019-08-05 13:48:49,343 DEBUG dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler missing: List() 
2019-08-05 13:48:49,343 INFO dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler Submitting ResultStage 96 (MapPartitionsRDD[183] at groupByKey at ProcessedOffsetManager.java:45), which has no missing parents 
2019-08-05 13:48:49,343 DEBUG dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler submitMissingTasks(ResultStage 96) 
2019-08-05 13:48:49,343 INFO dag-scheduler-event-loop org.apache.spark.storage.memory.MemoryStore Block broadcast_53 stored as values in memory (estimated size 4.1 KB, free 2004.3 MB) 
2019-08-05 13:48:49,344 DEBUG dag-scheduler-event-loop org.apache.spark.storage.BlockManager Put block broadcast_53 locally took  1 ms 
2019-08-05 13:48:49,344 DEBUG dag-scheduler-event-loop org.apache.spark.storage.BlockManager Putting block broadcast_53 without replication took  1 ms 
2019-08-05 13:48:49,347 INFO dag-scheduler-event-loop org.apache.spark.storage.memory.MemoryStore Block broadcast_53_piece0 stored as bytes in memory (estimated size 2.3 KB, free 2004.3 MB) 
2019-08-05 13:48:49,347 INFO dispatcher-event-loop-33 org.apache.spark.storage.BlockManagerInfo Added broadcast_53_piece0 in memory on dbigdata004:39874 (size: 2.3 KB, free: 2004.5 MB) 
2019-08-05 13:48:49,347 DEBUG dag-scheduler-event-loop org.apache.spark.storage.BlockManagerMaster Updated info of block broadcast_53_piece0 
2019-08-05 13:48:49,347 DEBUG dag-scheduler-event-loop org.apache.spark.storage.BlockManager Told master about block broadcast_53_piece0 
2019-08-05 13:48:49,347 DEBUG dag-scheduler-event-loop org.apache.spark.storage.BlockManager Put block broadcast_53_piece0 locally took  1 ms 
2019-08-05 13:48:49,347 DEBUG dag-scheduler-event-loop org.apache.spark.storage.BlockManager Putting block broadcast_53_piece0 without replication took  1 ms 
2019-08-05 13:48:49,347 INFO dag-scheduler-event-loop org.apache.spark.SparkContext Created broadcast 53 from broadcast at DAGScheduler.scala:1039 
2019-08-05 13:48:49,347 INFO dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler Submitting 1 missing tasks from ResultStage 96 (MapPartitionsRDD[183] at groupByKey at ProcessedOffsetManager.java:45) (first 15 tasks are for partitions Vector(0)) 
2019-08-05 13:48:49,347 INFO dag-scheduler-event-loop org.apache.spark.scheduler.cluster.YarnClusterScheduler Adding task set 96.0 with 1 tasks 
2019-08-05 13:48:49,347 DEBUG dag-scheduler-event-loop org.apache.spark.scheduler.TaskSetManager Epoch for TaskSet 96.0: 3 
2019-08-05 13:48:49,347 DEBUG dag-scheduler-event-loop org.apache.spark.scheduler.TaskSetManager Valid locality levels for TaskSet 96.0: NODE_LOCAL, RACK_LOCAL, ANY 
2019-08-05 13:48:49,348 DEBUG dispatcher-event-loop-34 org.apache.spark.scheduler.cluster.YarnClusterScheduler parentName: , name: TaskSet_2.0, runningTasks: 1 
2019-08-05 13:48:49,348 DEBUG dispatcher-event-loop-34 org.apache.spark.scheduler.cluster.YarnClusterScheduler parentName: , name: TaskSet_96.0, runningTasks: 0 
2019-08-05 13:48:49,348 INFO dispatcher-event-loop-34 org.apache.spark.scheduler.TaskSetManager Starting task 0.0 in stage 96.0 (TID 124, dbigdata003, executor 8, partition 0, NODE_LOCAL, 7639 bytes) 
2019-08-05 13:48:49,348 DEBUG dispatcher-event-loop-34 org.apache.spark.scheduler.TaskSetManager No tasks for locality level NODE_LOCAL, so moving to locality level RACK_LOCAL 
2019-08-05 13:48:49,348 DEBUG dispatcher-event-loop-34 org.apache.spark.scheduler.TaskSetManager No tasks for locality level RACK_LOCAL, so moving to locality level ANY 
2019-08-05 13:48:49,348 DEBUG dispatcher-event-loop-34 org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnDriverEndpoint Launching task 124 on executor id: 8 hostname: dbigdata003. 
2019-08-05 13:48:49,354 DEBUG shuffle-server-5-3 org.apache.spark.storage.BlockManager Getting local block broadcast_53_piece0 as bytes 
2019-08-05 13:48:49,354 DEBUG shuffle-server-5-3 org.apache.spark.storage.BlockManager Level for block broadcast_53_piece0 is StorageLevel(disk, memory, 1 replicas) 
2019-08-05 13:48:49,357 INFO dispatcher-event-loop-54 org.apache.spark.storage.BlockManagerInfo Added broadcast_53_piece0 in memory on dbigdata003:51565 (size: 2.3 KB, free: 912.3 MB) 
2019-08-05 13:48:49,362 INFO dispatcher-event-loop-35 org.apache.spark.MapOutputTrackerMasterEndpoint Asked to send map output locations for shuffle 45 to 10.36.212.61:59134 
2019-08-05 13:48:49,362 DEBUG map-output-dispatcher-0 org.apache.spark.MapOutputTrackerMaster Handling request to send map output locations for shuffle 45 to 10.36.212.61:59134 
2019-08-05 13:48:49,374 DEBUG dispatcher-event-loop-53 org.apache.spark.scheduler.cluster.YarnClusterScheduler parentName: , name: TaskSet_2.0, runningTasks: 1 
2019-08-05 13:48:49,374 DEBUG dispatcher-event-loop-53 org.apache.spark.scheduler.cluster.YarnClusterScheduler parentName: , name: TaskSet_96.0, runningTasks: 0 
2019-08-05 13:48:49,375 INFO task-result-getter-0 org.apache.spark.scheduler.TaskSetManager Finished task 0.0 in stage 96.0 (TID 124) in 27 ms on dbigdata003 (executor 8) (1/1) 
2019-08-05 13:48:49,375 INFO task-result-getter-0 org.apache.spark.scheduler.cluster.YarnClusterScheduler Removed TaskSet 96.0, whose tasks have all completed, from pool  
2019-08-05 13:48:49,375 INFO dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler ResultStage 96 (collect at ProcessedOffsetManager.java:79) finished in 0.032 s 
2019-08-05 13:48:49,375 DEBUG dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler After removal of stage 95, remaining stages = 1 
2019-08-05 13:48:49,375 DEBUG dag-scheduler-event-loop org.apache.spark.scheduler.DAGScheduler After removal of stage 96, remaining stages = 0 
2019-08-05 13:48:49,375 INFO streaming-job-executor-0 org.apache.spark.scheduler.DAGScheduler Job 136 finished: collect at ProcessedOffsetManager.java:79, took 0.183947 s 
2019-08-05 13:48:49,376 INFO streaming-job-executor-0 consumer.kafka.ZkState Starting curator service 
2019-08-05 13:48:49,376 INFO streaming-job-executor-0 org.apache.curator.framework.imps.CuratorFrameworkImpl Starting 
2019-08-05 13:48:49,376 DEBUG streaming-job-executor-0 org.apache.curator.CuratorZookeeperClient Starting 
2019-08-05 13:48:49,376 DEBUG streaming-job-executor-0 org.apache.curator.ConnectionState Starting 
2019-08-05 13:48:49,376 DEBUG streaming-job-executor-0 org.apache.curator.ConnectionState reset 
2019-08-05 13:48:49,376 INFO streaming-job-executor-0 org.apache.zookeeper.ZooKeeper Initiating client connection, connectString=dbigdatam02:2181,dbigdatam01:2181,dbigdatae01:2181/kafka sessionTimeout=120000 watcher=org.apache.curator.ConnectionState@60c6e3a7 
2019-08-05 13:48:49,377 INFO streaming-job-executor-0-SendThread(dbigdatam01:2181) org.apache.zookeeper.client.ZooKeeperSaslClient Client will use GSSAPI as SASL mechanism. 
2019-08-05 13:48:49,377 DEBUG streaming-job-executor-0-SendThread(dbigdatam01:2181) org.apache.zookeeper.client.ZooKeeperSaslClient creating sasl client: Client=appbigd@DTI.CO.ID;service=zookeeper;serviceHostname=dbigdatam01 
2019-08-05 13:48:49,377 INFO streaming-job-executor-0-SendThread(dbigdatam01:2181) org.apache.zookeeper.ClientCnxn Opening socket connection to server dbigdatam01/10.36.212.57:2181. Will attempt to SASL-authenticate using Login Context section 'Client' 
2019-08-05 13:48:49,377 INFO streaming-job-executor-0-SendThread(dbigdatam01:2181) org.apache.zookeeper.ClientCnxn Socket connection established, initiating session, client: /10.36.212.62:46448, server: dbigdatam01/10.36.212.57:2181 
2019-08-05 13:48:49,378 DEBUG streaming-job-executor-0-SendThread(dbigdatam01:2181) org.apache.zookeeper.ClientCnxn Session establishment request sent on dbigdatam01/10.36.212.57:2181 
2019-08-05 13:48:49,379 INFO streaming-job-executor-0-SendThread(dbigdatam01:2181) org.apache.zookeeper.ClientCnxn Session establishment complete on server dbigdatam01/10.36.212.57:2181, sessionid = 0x26c521efcac4470, negotiated timeout = 60000 
2019-08-05 13:48:49,379 DEBUG streaming-job-executor-0-SendThread(dbigdatam01:2181) org.apache.zookeeper.client.ZooKeeperSaslClient ClientCnxn:sendSaslPacket:length=0 
2019-08-05 13:48:49,379 INFO streaming-job-executor-0-EventThread org.apache.curator.framework.state.ConnectionStateManager State change: CONNECTED 
2019-08-05 13:48:49,379 DEBUG streaming-job-executor-0-SendThread(dbigdatam01:2181) org.apache.zookeeper.client.ZooKeeperSaslClient saslClient.evaluateChallenge(len=0) 
2019-08-05 13:48:49,380 DEBUG streaming-job-executor-0-SendThread(dbigdatam01:2181) org.apache.zookeeper.ClientCnxnSocketNIO deferring non-priming packet: clientPath:null serverPath:null finished:false header:: 0,3  replyHeader:: 0,0,0  request:: '/kafka/consumers/ids-spark-consumer/offsets/ids_test/0,F  response::  until SASL authentication completes. 
2019-08-05 13:48:49,380 DEBUG streaming-job-executor-0-SendThread(dbigdatam01:2181) org.apache.zookeeper.ClientCnxnSocketNIO deferring non-priming packet: clientPath:null serverPath:null finished:false header:: 0,3  replyHeader:: 0,0,0  request:: '/kafka/consumers/ids-spark-consumer/offsets/ids_test/0,F  response::  until SASL authentication completes. 
2019-08-05 13:48:49,380 DEBUG streaming-job-executor-0-SendThread(dbigdatam01:2181) org.apache.zookeeper.client.ZooKeeperSaslClient saslClient.evaluateChallenge(len=50) 
2019-08-05 13:48:49,381 DEBUG streaming-job-executor-0-SendThread(dbigdatam01:2181) org.apache.zookeeper.client.ZooKeeperSaslClient ClientCnxn:sendSaslPacket:length=67 
2019-08-05 13:48:49,381 DEBUG streaming-job-executor-0-SendThread(dbigdatam01:2181) org.apache.zookeeper.ClientCnxnSocketNIO deferring non-priming packet: clientPath:null serverPath:null finished:false header:: 0,3  replyHeader:: 0,0,0  request:: '/kafka/consumers/ids-spark-consumer/offsets/ids_test/0,F  response::  until SASL authentication completes. 
2019-08-05 13:48:49,381 DEBUG streaming-job-executor-0-SendThread(dbigdatam01:2181) org.apache.zookeeper.ClientCnxnSocketNIO deferring non-priming packet: clientPath:null serverPath:null finished:false header:: 0,3  replyHeader:: 0,0,0  request:: '/kafka/consumers/ids-spark-consumer/offsets/ids_test/0,F  response::  until SASL authentication completes. 
2019-08-05 13:48:49,381 DEBUG streaming-job-executor-0-SendThread(dbigdatam01:2181) org.apache.zookeeper.ClientCnxnSocketNIO deferring non-priming packet: clientPath:null serverPath:null finished:false header:: 0,3  replyHeader:: 0,0,0  request:: '/kafka/consumers/ids-spark-consumer/offsets/ids_test/0,F  response::  until SASL authentication completes. 
2019-08-05 13:48:49,381 DEBUG streaming-job-executor-0-SendThread(dbigdatam01:2181) org.apache.zookeeper.ClientCnxnSocketNIO deferring non-priming packet: clientPath:null serverPath:null finished:false header:: 0,3  replyHeader:: 0,0,0  request:: '/kafka/consumers/ids-spark-consumer/offsets/ids_test/0,F  response::  until SASL authentication completes. 
2019-08-05 13:48:49,381 DEBUG streaming-job-executor-0-SendThread(dbigdatam01:2181) org.apache.zookeeper.ClientCnxnSocketNIO deferring non-priming packet: clientPath:null serverPath:null finished:false header:: 0,3  replyHeader:: 0,0,0  request:: '/kafka/consumers/ids-spark-consumer/offsets/ids_test/0,F  response::  until SASL authentication completes. 
2019-08-05 13:48:49,381 DEBUG streaming-job-executor-0-SendThread(dbigdatam01:2181) org.apache.zookeeper.ClientCnxnSocketNIO deferring non-priming packet: clientPath:null serverPath:null finished:false header:: 0,3  replyHeader:: 0,0,0  request:: '/kafka/consumers/ids-spark-consumer/offsets/ids_test/0,F  response::  until SASL authentication completes. 
2019-08-05 13:48:49,381 DEBUG streaming-job-executor-0-SendThread(dbigdatam01:2181) org.apache.zookeeper.ClientCnxnSocketNIO deferring non-priming packet: clientPath:null serverPath:null finished:false header:: 0,3  replyHeader:: 0,0,0  request:: '/kafka/consumers/ids-spark-consumer/offsets/ids_test/0,F  response::  until SASL authentication completes. 
2019-08-05 13:48:49,382 DEBUG streaming-job-executor-0-SendThread(dbigdatam01:2181) org.apache.zookeeper.ClientCnxn Reading reply sessionid:0x26c521efcac4470, packet:: clientPath:null serverPath:null finished:false header:: 3,3  replyHeader:: 3,270583585230,0  request:: '/kafka/consumers/ids-spark-consumer/offsets/ids_test/0,F  response:: s{257706960915,270583585000,1564659668204,1564987645720,75,0,0,0,3,0,257706960915}  
2019-08-05 13:48:49,383 DEBUG streaming-job-executor-0-SendThread(dbigdatam01:2181) org.apache.zookeeper.ClientCnxn Reading reply sessionid:0x26c521efcac4470, packet:: clientPath:null serverPath:null finished:false header:: 4,5  replyHeader:: 4,270583585231,0  request:: '/kafka/consumers/ids-spark-consumer/offsets/ids_test/0,#313933,-1  response:: s{257706960915,270583585231,1564659668204,1564987729384,76,0,0,0,3,0,257706960915}  
2019-08-05 13:48:49,383 INFO streaming-job-executor-0 consumer.kafka.ProcessedOffsetManager Wrote processed offset 193 for Parittion 0 
2019-08-05 13:48:49,383 DEBUG streaming-job-executor-0 org.apache.curator.framework.imps.CuratorFrameworkImpl Closing 
2019-08-05 13:48:49,383 DEBUG streaming-job-executor-0 org.apache.curator.CuratorZookeeperClient Closing 
2019-08-05 13:48:49,383 DEBUG streaming-job-executor-0 org.apache.curator.ConnectionState Closing 
2019-08-05 13:48:49,383 DEBUG streaming-job-executor-0 org.apache.zookeeper.ZooKeeper Closing session: 0x26c521efcac4470 
2019-08-05 13:48:49,383 DEBUG streaming-job-executor-0 org.apache.zookeeper.ClientCnxn Closing client for session: 0x26c521efcac4470 
dibbhatt commented 5 years ago

Just released 1.0.19 with a fix. Can you please try it like this:

val tmp_stream = ReceiverLauncher.launch(ssc, props, numberOfReceivers, StorageLevel.fromString(storageLevelReceiver))
val partitionOffset_stream = ProcessedOffsetManager.getPartitionOffset(tmp_stream, props)
tmp_stream.foreachRDD(rdd => {
    Thread.sleep(5000)
    throw new Exception("error")
})
ProcessedOffsetManager.persists(ssc, partitionOffset_stream, props)

try {
  jsc.start();
  jsc.awaitTermination();
} catch {
  case e: Exception =>
    jsc.ssc().sc().cancelAllJobs();
    jsc.stop(true, false);
    System.exit(-1);
}
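
To pick up the new persists(ssc, ...) signature, 1.0.19 needs to be on the classpath. Assuming the Maven coordinates listed in the project README (groupId dibbhatt, artifactId kafka-spark-consumer), the sbt dependency would look roughly like this; verify the coordinates on Maven Central before relying on them.

// build.sbt (coordinates assumed from the project README)
libraryDependencies += "dibbhatt" % "kafka-spark-consumer" % "1.0.19"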
dibbhatt commented 5 years ago

I have put a check in ProcessedOffsetManager:

https://github.com/dibbhatt/kafka-spark-consumer/commit/78b2737e0ab3e6ea4ecd03a95ae57188f708b144

Let me know if this works; I will then update the ReadMe file.

doddys commented 5 years ago

Let me try that tomorrow and report back.


doddys commented 5 years ago

I'm afraid I have bad news: the problem is still there. I believe there is a race condition between the driver and the executor.

2019-08-06 10:41:46,448 ERROR Driver com.expecc.bca.ids.IDSKafkaConsumer$ Stopping Spark Streaming Context 
2019-08-06 10:41:46,450 DEBUG Driver org.apache.spark.streaming.scheduler.JobScheduler Stopping JobScheduler 
2019-08-06 10:41:46,453 INFO streaming-job-executor-0 org.apache.spark.SparkContext Starting job: collect at ProcessedOffsetManager.java:85 
2019-08-06 10:41:46,558 INFO streaming-job-executor-0 consumer.kafka.ProcessedOffsetManager Wrote processed offset 199 for Parittion 0 
2019-08-06 10:41:46,955 INFO Driver org.apache.spark.SparkContext Successfully stopped SparkContext 
2019-08-06 10:41:46,955 INFO Driver org.apache.spark.SparkContext SparkContext already stopped. 
2019-08-06 10:41:46,955 ERROR Driver com.expecc.bca.ids.IDSKafkaConsumer$ Stopped ALL JOBS due to Exception 

The only way to prevent the offset from being updated is to do the following:

try {
  ssc.start()
  ssc.awaitTermination()
} catch {
  case e: Exception =>
    logger.error("Stopping Spark Context")
    sc.stop()
    logger.error("Spark Context is Stopped", e)
    System.exit(-1)
    throw e // to exit with error condition
}

The error logs will contain a few entries like this:

2019-08-06 10:55:31,727 ERROR Driver com.expecc.bca.ids.IDSKafkaConsumer$ Stopping Spark Context 
2019-08-06 10:55:31,727 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + outermost object is not a closure or REPL line object, so do not clone it: (class org.apache.spark.rdd.RDD,MapPartitionsRDD[43] at groupByKey at ProcessedOffsetManager.java:46) 
2019-08-06 10:55:31,727 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + cloning the object <function0> of class org.apache.spark.rdd.RDD$$anonfun$collect$1 
2019-08-06 10:55:31,728 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + cleaning cloned closure <function0> recursively (org.apache.spark.rdd.RDD$$anonfun$collect$1) 
2019-08-06 10:55:31,728 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner +++ Cleaning closure <function0> (org.apache.spark.rdd.RDD$$anonfun$collect$1) +++ 
2019-08-06 10:55:31,728 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + declared fields: 2 
2019-08-06 10:55:31,729 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      public static final long org.apache.spark.rdd.RDD$$anonfun$collect$1.serialVersionUID 
2019-08-06 10:55:31,729 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      private final org.apache.spark.rdd.RDD org.apache.spark.rdd.RDD$$anonfun$collect$1.$outer 
2019-08-06 10:55:31,729 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + declared methods: 2 
2019-08-06 10:55:31,729 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      public org.apache.spark.rdd.RDD org.apache.spark.rdd.RDD$$anonfun$collect$1.org$apache$spark$rdd$RDD$$anonfun$$$outer() 
2019-08-06 10:55:31,729 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      public final java.lang.Object org.apache.spark.rdd.RDD$$anonfun$collect$1.apply() 
2019-08-06 10:55:31,729 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + inner classes: 1 
2019-08-06 10:55:31,729 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12 
2019-08-06 10:55:31,729 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + outer classes: 1 
2019-08-06 10:55:31,729 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      org.apache.spark.rdd.RDD 
2019-08-06 10:55:31,729 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + outer objects: 1 
2019-08-06 10:55:31,729 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      MapPartitionsRDD[43] at groupByKey at ProcessedOffsetManager.java:46 
2019-08-06 10:55:31,729 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + fields accessed by starting closure: 4 
2019-08-06 10:55:31,729 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      (class org.apache.spark.rdd.RDD,Set(org$apache$spark$rdd$RDD$$evidence$1)) 
2019-08-06 10:55:31,729 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      (class java.lang.Object,Set()) 
2019-08-06 10:55:31,729 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      (class scala.runtime.AbstractFunction0,Set()) 
2019-08-06 10:55:31,729 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      (class org.apache.spark.rdd.RDD$$anonfun$collect$1,Set($outer)) 
2019-08-06 10:55:31,729 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + outermost object is not a closure or REPL line object, so do not clone it: (class org.apache.spark.rdd.RDD,MapPartitionsRDD[43] at groupByKey at ProcessedOffsetManager.java:46) 
2019-08-06 10:55:31,729 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  +++ closure <function0> (org.apache.spark.rdd.RDD$$anonfun$collect$1) is now cleaned +++ 
2019-08-06 10:55:31,729 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  +++ closure <function1> (org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12) is now cleaned +++ 
2019-08-06 10:55:31,730 INFO JobScheduler org.apache.spark.streaming.scheduler.JobScheduler Finished job streaming job 1565063724000 ms.1 from job set of time 1565063724000 ms 
2019-08-06 10:55:31,730 ERROR spark-listener-group-appStatus consumer.kafka.ReceiverStreamListener Output Operation failed due to java.lang.IllegalStateException: SparkContext has been shutdown
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2026)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:361)
    at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
    at consumer.kafka.ProcessedOffsetManager$2.call(ProcessedOffsetManager.java:85)
    at consumer.kafka.ProcessedOffsetManager$2.call(ProcessedOffsetManager.java:82)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

2019-08-06 10:55:31,730 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner +++ Cleaning closure <function1> (org.apache.spark.rdd.RDD$$anonfun$count$1) +++ 
2019-08-06 10:55:31,730 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + declared fields: 1 
2019-08-06 10:55:31,730 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      public static final long org.apache.spark.rdd.RDD$$anonfun$count$1.serialVersionUID 
2019-08-06 10:55:31,730 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + declared methods: 2 
2019-08-06 10:55:31,730 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      public final java.lang.Object org.apache.spark.rdd.RDD$$anonfun$count$1.apply(java.lang.Object) 
2019-08-06 10:55:31,730 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner      public final long org.apache.spark.rdd.RDD$$anonfun$count$1.apply(scala.collection.Iterator) 
2019-08-06 10:55:31,730 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + inner classes: 0 
2019-08-06 10:55:31,730 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + outer classes: 0 
2019-08-06 10:55:31,730 DEBUG streaming-job-executor-0 org.apache.spark.util.ClosureCleaner  + outer objects: 0 
2019-08-06 10:55:31,731 INFO JobScheduler org.apache.spark.streaming.scheduler.JobScheduler Starting job streaming job 1565063727000 ms.0 from job set of time 1565063727000 ms 
2019-08-06 10:55:31,731 ERROR JobScheduler org.apache.spark.streaming.scheduler.JobScheduler Error running job streaming job 1565063724000 ms.1 
java.lang.IllegalStateException: SparkContext has been shutdown
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2026)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:361)
    at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
    at consumer.kafka.ProcessedOffsetManager$2.call(ProcessedOffsetManager.java:85)
    at consumer.kafka.ProcessedOffsetManager$2.call(ProcessedOffsetManager.java:82)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream

Not really a graceful way to stop, but is this the only way? The question is why the offset job is still executed even though the previous processing job failed.

dibbhatt commented 5 years ago

Hmm, I see. Yes, it does seem to be a race condition. I have not seen it happen in any other production deployment. But not stopping gracefully won't cause any problem, since the offset commit does not happen anyway and the next batch will start from the last committed offset. The offset commit is a separate job, hence Spark runs it as part of the DAG. Can you try one more thing and check if that makes sense?

Try moving the offset commit inside foreachRDD... something like this:

val tmp_stream = ReceiverLauncher.launch(ssc, props, numberOfReceivers, StorageLevel.fromString(storageLevelReceiver))
val partitionOffset_stream = ProcessedOffsetManager.getPartitionOffset(tmp_stream, props)
tmp_stream.foreachRDD(rdd => {
    Thread.sleep(5000)
    throw new Exception("error")
    ProcessedOffsetManager.persists(partitionOffset_stream, props)
})
doddys commented 5 years ago

I agree; even though it is not graceful, at least the offsets stay consistent. I tried the code above and it does not seem to work:

java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
    at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
    at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:66)
    at org.apache.spark.streaming.dstream.ForEachDStream.<init>(ForEachDStream.scala:39)
    at org.apache.spark.streaming.dstream.DStream.org$apache$spark$streaming$dstream$DStream$$foreachRDD(DStream.scala:654)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply$mcV$sp(DStream.scala:628)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:626)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:626)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:693)
    at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:265)
    at org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:626)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$class.foreachRDD(JavaDStreamLike.scala:272)
    at org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.foreachRDD(JavaDStreamLike.scala:42)
    at consumer.kafka.ProcessedOffsetManager.persists(ProcessedOffsetManager.java:82)
    at com.expecc.bca.ids.IDSKafkaConsumer$$anonfun$main$4.apply(IDSKafkaConsumer.scala:100)
    at com.expecc.bca.ids.IDSKafkaConsumer$$anonfun$main$4.apply(IDSKafkaConsumer.scala:98)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
dibbhatt commented 5 years ago

So if the non-graceful stop solved your problem, shall I close this issue? In that case I will update the ReadMe and the code example with this change.

doddys commented 5 years ago

Yes, you can close this for now. Thank you.

dibbhatt commented 5 years ago

Thanks, please reopen it if you need any more help

gaborgsomogyi commented 5 years ago

The main problem is that the approach has an architectural issue: the offsets must be committed only when data processing was successful, but here the data processing is handled in a completely different place from the offset commit. Please see the Spark Streaming documentation.

  // begin your transaction

  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly

  // end your transaction

If transactions are not available and data processing throws an exception, then the offset update must be skipped.
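
For illustration, a minimal sketch of that transactional pattern, assuming results and offsets live in the same JDBC-accessible store; the connection URL, credentials, and table layouts are placeholders rather than anything this library provides:

import java.sql.DriverManager

def processBatchTransactionally(records: Seq[(String, String)],
                                topic: String, partition: Int,
                                fromOffset: Long, untilOffset: Long): Unit = {
  val conn = DriverManager.getConnection("jdbc:postgresql://db-host/app", "user", "secret")
  try {
    conn.setAutoCommit(false)                                  // begin your transaction
    val ins = conn.prepareStatement("INSERT INTO results(k, v) VALUES (?, ?)")
    records.foreach { case (k, v) =>                           // update results
      ins.setString(1, k); ins.setString(2, v); ins.addBatch()
    }
    ins.executeBatch()
    val upd = conn.prepareStatement(                           // update offsets only where the stored end
      "UPDATE offsets SET until_offset = ? " +                 // matches the beginning of this batch
      "WHERE topic = ? AND part = ? AND until_offset = ?")
    upd.setLong(1, untilOffset); upd.setString(2, topic)
    upd.setInt(3, partition); upd.setLong(4, fromOffset)
    if (upd.executeUpdate() != 1)                              // assert that offsets were updated correctly
      throw new IllegalStateException(s"Offset range for $topic/$partition does not line up")
    conn.commit()                                              // end your transaction
  } catch {
    case e: Exception => conn.rollback(); throw e
  } finally {
    conn.close()
  }
}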

All in all, the System.exit approach makes the problem less probable, but it definitely does not solve it!

doddys commented 5 years ago

Thanks for the info @gaborgsomogyi,

I was advised that with the current approach the data stream and the offset stream are independent of each other, which causes the race condition when an exception is thrown.

Based on your documentation link, the flow should be like this:

val tmp_stream = ReceiverLauncher.launch(ssc, props, numberOfReceivers, StorageLevel.fromString(storageLevelReceiver))

// remove this line
// val partitionOffset_stream = ProcessedOffsetManager.getPartitionOffset(tmp_stream, props)

tmp_stream.foreachRDD(rdd => {
   // process RDD
   // get max offset for each partition in current RDD
   // commit offset to zookeeper
})

// remove this line
//ProcessedOffsetManager.persists(partitionOffset_stream, props)

If this is the correct way, it would be great if it were implemented in the next release.
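
For concreteness, here is a hypothetical outline of that flow; processRecords, maxOffsetPerPartition, and commitOffsetsToZooKeeper are placeholder helpers, not part of this library's API:

// Placeholder helpers for illustration only; they do not exist in kafka-spark-consumer.
def processRecords[T](rdd: org.apache.spark.rdd.RDD[T]): Unit = ???                   // business logic; may throw
def maxOffsetPerPartition[T](rdd: org.apache.spark.rdd.RDD[T]): Map[Int, Long] = ???  // highest offset seen per partition
def commitOffsetsToZooKeeper(offsets: Map[Int, Long]): Unit = ???                     // e.g. write under /consumers/<group>/offsets/<topic>/<partition>

tmp_stream.foreachRDD(rdd => {
  processRecords(rdd)                                    // if this throws, the commit below never runs
  commitOffsetsToZooKeeper(maxOffsetPerPartition(rdd))   // reached only after successful processing
})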

dibbhatt commented 5 years ago

Hi @doddys @gaborgsomogyi, I will see whether committing the offset this way is possible here. One thing I would like to add: I agree that the offset commit job and the actual tmp_stream.foreachRDD job are two independent jobs, but Spark Streaming has a property that it executes only one job at a time, and it processes them in the order they are added to the DAG.

So as you can see, the foreachRDD is added to the Spark DAG first and the offset persist after it, which guarantees the order of execution: persists will always happen after foreachRDD, and if any exception happens in foreachRDD, closing the Spark context makes sure persists fails.

I understand this looks a little hacky, and I will see if I can add the persists within the foreachRDD block.

doddys commented 5 years ago

Hi @dibbhatt, you may review my changes here: 15e5dae, and see if they make sense.

gaborgsomogyi commented 5 years ago

@doddys the change looks better, only one minor suggestion:

...
tmp_stream.foreachRDD(rdd => {
   // process RDD
   // get max offset for each partition in current RDD
   // commit offset to zookeeper
   // assert that offsets were updated correctly
})
...

It is worth checking that the offset commit was successful. Storing offsets in ZK has a drawback when the cluster grows: ZK will be hammered from several places and in the end it will slow down and can collapse. There are other options which scale better...

@dibbhatt which config do you mean? Depending on the scheduler's internal implementation may work, but it can be a heavy constraint for application developers.

dibbhatt commented 5 years ago

Thanks a lot @doddys, this change makes sense. Have you tested the fix and is it working? I can merge it and do a release with this if your tests look good. Thanks again.

dibbhatt commented 5 years ago

@gaborgsomogyi I am referring to spark.streaming.concurrentJobs. The default is 1, and it should never be changed, as changing it can lead to lots of other issues. Spark does not even document this setting, for this reason.

You can refer to this JIRA : https://issues.apache.org/jira/browse/SPARK-21065
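
For reference only, this is how the setting would be supplied if someone did want to experiment with it (the application name below is a placeholder); per the above, it should stay at its default of 1:

import org.apache.spark.SparkConf

// spark.streaming.concurrentJobs defaults to 1; raising it lets output jobs from
// different batches run concurrently, which would break the ordering the
// offset-persist job relies on, so the value below simply restates the default.
val conf = new SparkConf()
  .setAppName("ids-kafka-consumer")                // placeholder name
  .set("spark.streaming.concurrentJobs", "1")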

dibbhatt commented 5 years ago

@doddys can you please raise a pull request?

dibbhatt commented 5 years ago

Thanks @doddys for the PR. I have merged it. I will make a release today with these changes. Closing this issue.

doddys commented 5 years ago

You're welcome @dibbhatt. I did some tests last Friday and it should be OK. I may do more testing this week.

dibbhatt commented 5 years ago

I released kafka-spark-consumer version 2.0.0 today with this change. I also tested it from my end, and it looks fine.