dibbhatt / kafka-spark-consumer

High Performance Kafka Connector for Spark Streaming. Supports Multi Topic Fetch, Kafka Security. Reliable offset management in Zookeeper. No Data-loss. No dependency on HDFS and WAL. In-built PID rate controller. Supports Message Handler. Offset Lag checker.
Apache License 2.0

After running for a long time, the processing time of "ProcessedOffsetManager.persists(partitonOffset_stream, props)" keeps increasing. #56

Closed yufuxin closed 6 years ago

yufuxin commented 6 years ago

After running for a long time, the processing time of "ProcessedOffsetManager.persists(partitonOffset_stream, props)" increases to above 20 s. Is this reasonable, or did I do something wrong?

dibbhatt commented 6 years ago

You mean to say the offset persist is fast initially, and after running for a long time it starts taking longer?

After how long do you see this behavior? If you restart, is ProcessedOffsetManager.persists still taking longer? This offset is written to Zookeeper, so ideally it should not have an issue.

Which version of this consumer are you using?

yufuxin commented 6 years ago

Yes, here are the logs from the Spark UI:

233 | Streaming job from [output operation 1, batch time 09:45:20] collect at ProcessedOffsetManager.java:79 | 2018/08/24 09:45:20 | 23 ms
659 | Streaming job from [output operation 1, batch time 10:04:10] collect at ProcessedOffsetManager.java:79 | 2018/08/24 10:04:10 | 48 ms | 2/2

The Spark Streaming context is created as follows:

val ssc = new StreamingContext(sc, Seconds(10))
val numberOfReceivers = 12
val kafkaProperties: Map[String, String] = Map(
  "zookeeper.hosts" -> zkhosts,
  "zookeeper.port" -> zkports,
  "kafka.topic" -> topics,
  "bootstrap.servers" -> brokers,
  "group.id" -> groupId,
  "security.protocol" -> "SASL_PLAINTEXT",
  "kafka.consumer.id" -> "kafka-consumer")
val props = new java.util.Properties()
kafkaProperties foreach { case (key, value) => props.put(key, value) }
val stream = ReceiverLauncher.launch(ssc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY)

If I restart the job, it is fast initially and then becomes slow. I am using consumer-1.0.14. The job script is:

nohup spark-submit --class com.haloutput.main.SparkConsumerKafka \
  --master yarn --deploy-mode cluster \
  --driver-cores 8 --driver-memory 16g --executor-memory 4g --executor-cores 6 --num-executors 12 \
  --conf spark.yarn.maxAppAttempts=4 \
  --conf spark.speculation=false \
  --conf spark.yarn.am.attemptFailuresValidityInterval=1h \
  --conf spark.streaming.kafka.maxRatePerPartition=12000 \
  --conf spark.yarn.max.executor.failures=400 \
  --conf spark.yarn.executor.failuresValidityInterval=1h \
  --conf spark.task.maxFailures=8 \
  --conf spark.hadoop.fs.hdfs.impl.disable.cache=true \
  --conf "spark.streaming.kafka.consumer.cache.enabled=true" \
  --conf "spark.dynamicAllocation.enabled=false" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf" \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=kafka_client_jaas.conf" \
  --files /etc/spark/conf/hive-site.xml,"./kafka_client_jaas.conf,./xxxx.keytab" \
  --jars spark-streaming-kafka-0-10_2.11-2.2.1.jar,log4j-api-2.7.jar,log4j-core-2.7.jar,metrics-core-2.2.0.jar,zkclient-0.9.jar,spark-avro_2.11-3.2.0.jar,kafka-clients-0.11.0.0.jar,bijection-core_2.11-0.9.2.jar,bijection-avro_2.11-0.9.2.jar,kafka-spark-consumer-1.0.14.jar,kafka_2.11-0.11.0.2.jar,json-simple-1.1.1.jar \
  xxxx.jar xxx.xxx.xxx 2181 xxxx xxx.xxx.xxx:6667 xxxx xxxxx xxxxxx xxxxx 2 4 0 >/dev/null &

Thanks,
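[Editor's note] For context, the job labeled "collect at ProcessedOffsetManager.java:79" comes from the offset-commit step this consumer runs on the driver. Below is a minimal sketch of how that step is usually wired, assuming the consumer's documented usage pattern; the import paths and variable names are illustrative and reuse the names from the snippet above, not code attached to this issue.

import consumer.kafka.{ProcessedOffsetManager, ReceiverLauncher}
import org.apache.spark.storage.StorageLevel

// Receiver-based DStream, as in the snippet above
val stream = ReceiverLauncher.launch(ssc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY)

// DStream of the highest consumed offset per Kafka partition for each batch
val partitonOffset_stream = ProcessedOffsetManager.getPartitionOffset(stream, props)

// Application logic on the received messages goes here
stream.foreachRDD { rdd => /* business logic */ }

// Commit the per-partition max offsets to Zookeeper; this is the offset-persist
// step whose duration is being discussed in this issue
ProcessedOffsetManager.persists(partitonOffset_stream, props)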

dibbhatt commented 6 years ago

Can you switch to the 1.0.13 version and let me know if you see the same issue?

yufuxin commented 6 years ago

I cannot switch to 1.0.13 because of the security connection issue. Thanks

dibbhatt commented 6 years ago

OK, got it. Give me a day or two. I will try to reproduce this issue and get back to you.

dibbhatt commented 6 years ago

Is it possible to share the timing of each stage of ProcessedOffsetManager.persists? I want to see whether the issue is with writing the offsets to ZK or with getting the max offsets for each partition.

dibbhatt commented 6 years ago

Also, can you try setting consumer.num_fetch_to_buffer = 5 and see if this improves your offset commit? This property buffers 5 fetches and writes a larger block to the Spark Block Manager, so you will see fewer but larger RDD partitions.
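[Editor's note] A minimal sketch of where that property could go, reusing the driver-side setup from the snippet earlier in this thread; the value 5 is the one suggested above, everything else is unchanged and assumed from the earlier code.

// Build the consumer properties as before, then add the buffering setting
val props = new java.util.Properties()
kafkaProperties foreach { case (key, value) => props.put(key, value) }
// Buffer 5 fetches per receiver before writing one larger block to the Block Manager
props.put("consumer.num_fetch_to_buffer", "5")

val stream = ReceiverLauncher.launch(ssc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY)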

yufuxin commented 6 years ago
Job Id | Description | Submitted | Duration | Stages: Succeeded/Total | Tasks: Succeeded/Total
111 | Streaming job from [output operation 1, batch time 10:15:30] collect at ProcessedOffsetManager.java:79 | 2018/08/29 10:15:30 | 36 ms | 2/2 | 41/41
110 | Streaming job from [output operation 0, batch time 10:15:30] collect at SparkConsumerKafka.scala:127 | 2018/08/29 10:15:30 | 19 ms | 1/1 | 40/40
109 | Streaming job from [output operation 1, batch time 10:15:20] collect at ProcessedOffsetManager.java:79 | 2018/08/29 10:15:20 | 0.4 s | 2/2 | 1242/1242
108 | Streaming job from [output operation 0, batch time 10:15:20] collect at SparkConsumerKafka.scala:127 | 2018/08/29 10:15:20 | 0.3 s | 1/1 | 1241/1241
107 | Streaming job from [output operation 1, batch time 10:15:10] collect at ProcessedOffsetManager.java:79 | 2018/08/29 10:15:10 | 28 ms | 2/2 | 17/17
106 | Streaming job from [output operation 0, batch time 10:15:10] collect at SparkConsumerKafka.scala:127 | 2018/08/29 10:15:10 | 17 ms | 1/1 | 16/16
105 | Streaming job from [output operation 1, batch time 10:15:00] collect at ProcessedOffsetManager.java:79 | 2018/08/29 10:15:00 | 0.3 s | 2/2 | 1267/1267
104 | Streaming job from [output operation 0, batch time 10:15:00] collect at SparkConsumerKafka.scala:127 | 2018/08/29 10:15:00 | 0.3 s | 1/1 | 1266/1266
103 | Streaming job from [output operation 1, batch time 10:14:50] collect at ProcessedOffsetManager.java:79 | 2018/08/29 10:14:50 | 25 ms | 2/2 | 22/22
102 | Streaming job from [output operation 0, batch time 10:14:50] collect at SparkConsumerKafka.scala:127 | 2018/08/29 10:14:50 | 18 ms | 1/1 | 21/21
101 | Streaming job from [output operation 1, batch time 10:14:40] collect at ProcessedOffsetManager.java:79 | 2018/08/29 10:14:40 | 0.4 s | 2/2 | 1260/1260
100 | Streaming job from [output operation 0, batch time 10:14:40] collect at SparkConsumerKafka.scala:127 | 2018/08/29 10:14:40 | 0.3 s | 1/1 | 1259/1259
99 | Streaming job from [output operation 1, batch time 10:14:30] collect at ProcessedOffsetManager.java:79 | 2018/08/29 10:14:30 | 24 ms | 2/2 | 14/14
98 | Streaming job from [output operation 0, batch time 10:14:30] collect at SparkConsumerKafka.scala:127 | 2018/08/29 10:14:30 | 13 ms | 1/1 | 13/13
97 | Streaming job from [output operation 1, batch time 10:14:20] collect at ProcessedOffsetManager.java:79 | 2018/08/29 10:14:20 | 0.3 s | 2/2 | 1269/1269

.......... After about 20 hours of running: ..........

Job Id | Description | Submitted | Duration | Stages: Succeeded/Total | Tasks: Succeeded/Total
14787 | Streaming job from [output operation 1, batch time 09:21:20] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:21:20 | 0.3 s | 2/2 | 816/816
14786 | Streaming job from [output operation 0, batch time 09:21:20] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:21:20 | 0.2 s | 1/1 | 815/815
14785 | Streaming job from [output operation 1, batch time 09:21:10] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:21:11 | 96 ms | 2/2 | 308/308
14784 | Streaming job from [output operation 0, batch time 09:21:10] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:21:10 | 0.2 s | 1/1 | 307/307
14783 | count at AggregationHandler.scala:148 | 2018/08/30 09:21:10 | 0.2 s | 3/3 (288 skipped) | 601/601 (589480 skipped)
14782 | Streaming job from [output operation 1, batch time 09:21:00] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:21:06 | 0.3 s | 2/2 | 794/794
14781 | Streaming job from [output operation 0, batch time 09:21:00] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:21:06 | 0.3 s | 1/1 | 793/793
14780 | save at AggregationHandler.scala:147 | 2018/08/30 09:21:06 | 0.6 s | 2/2 (288 skipped) | 401/401 (589480 skipped)
14779 | pivot at AggregationHandler.scala:134 | 2018/08/30 09:21:01 | 13 ms | 1/1 (290 skipped) | 3/3 (590080 skipped)
14778 | Streaming job from [output operation 1, batch time 09:20:50] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:20:57 | 5 s | 2/2 | 300/300
14777 | pivot at AggregationHandler.scala:134 | 2018/08/30 09:20:57 | 78 ms | 2/2 (289 skipped) | 201/201 (589880 skipped)
14776 | Streaming job from [output operation 0, batch time 09:20:50] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:20:52 | 4 s | 1/1 | 299/299
14775 | pivot at AggregationHandler.scala:134 | 2018/08/30 09:20:52 | 0.2 s | 2/2 (288 skipped) | 600/600 (589480 skipped)
14774 | Streaming job from [output operation 1, batch time 09:20:40] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:20:47 | 5 s | 2/2 | 782/782
14773 | count at AggregationHandler.scala:127 | 2018/08/30 09:20:47 | 0.2 s | 3/3 (288 skipped) | 601/601 (589480 skipped)
14772 | Streaming job from [output operation 0, batch time 09:20:40] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:20:43 | 5 s | 1/1 | 781/781
14771 | run at ThreadPoolExecutor.java:1142 | 2018/08/30 09:20:43 | 0.2 s | 2/2 (288 skipped) | 600/600 (589480 skipped)
14770 | save at AggregationHandler.scala:126 | 2018/08/30 09:20:38 | 0.6 s | 2/2 (288 skipped) | 401/401 (589480 skipped)
14769 | run at ThreadPoolExecutor.java:1142 | 2018/08/30 09:20:33 | 0.2 s | 2/2 (288 skipped) | 600/600 (589480 skipped)
14768 | Streaming job from [output operation 1, batch time 09:20:30] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:20:32 | 2 s | 2/2 | 298/298
14767 | Streaming job from [output operation 0, batch time 09:20:30] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:20:32 | 79 ms | 1/1 | 297/297
14766 | pivot at AggregationHandler.scala:107 | 2018/08/30 09:20:32 | 18 ms | 1/1 (290 skipped) | 2/2 (590080 skipped)
14765 | pivot at AggregationHandler.scala:107 | 2018/08/30 09:20:27 | 54 ms | 2/2 (289 skipped) | 201/201 (589880 skipped)
14764 | pivot at AggregationHandler.scala:107 | 2018/08/30 09:20:23 | 0.2 s | 2/2 (288 skipped) | 600/600 (589480 skipped)
14763 | Streaming job from [output operation 1, batch time 09:20:20] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:20:21 | 2 s | 2/2 | 789/789
14762 | pivot at AggregationHandler.scala:97 | 2018/08/30 09:20:21 | 20 ms | 1/1 (290 skipped) | 7/7 (590080 skipped)
14761 | Streaming job from [output operation 0, batch time 09:20:20] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:20:20 | 1 s | 1/1 | 788/788
14760 | pivot at AggregationHandler.scala:97 | 2018/08/30 09:20:20 | 18 ms | 1/1 (290 skipped) | 4/4 (590080 skipped)
14759 | pivot at AggregationHandler.scala:97 | 2018/08/30 09:20:15 | 68 ms | 2/2 (289 skipped) | 201/201 (589880 skipped)
14758 | Streaming job from [output operation 1, batch time 09:20:10] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:20:11 | 4 s | 2/2 | 322/322
14757 | Streaming job from [output operation 0, batch time 09:20:10] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:20:10 | 1 s | 1/1 | 321/321
14756 | pivot at AggregationHandler.scala:97 | 2018/08/30 09:20:07 | 4 s | 7/7 (283 skipped) | 10840/10840 (579240 skipped)
14755 | Streaming job from [output operation 1, batch time 09:20:00] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:20:00 | 0.2 s | 2/2 | 707/707
14754 | Streaming job from [output operation 0, batch time 09:20:00] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:20:00 | 0.2 s | 1/1 | 706/706
14753 | Streaming job from [output operation 1, batch time 09:19:50] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:19:50 | 0.1 s | 2/2 | 512/512
14752 | Streaming job from [output operation 0, batch time 09:19:50] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:19:50 | 0.1 s | 1/1 | 511/511
14751 | Streaming job from [output operation 1, batch time 09:19:40] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:19:40 | 0.2 s | 2/2 | 588/588
14750 | Streaming job from [output operation 0, batch time 09:19:40] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:19:40 | 0.1 s | 1/1 | 587/587
14749 | Streaming job from [output operation 1, batch time 09:19:30] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:19:30 | 0.1 s | 2/2 | 393/393
14748 | Streaming job from [output operation 0, batch time 09:19:30] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:19:30 | 85 ms | 1/1 | 392/392
14747 | Streaming job from [output operation 1, batch time 09:19:20] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:19:20 | 0.2 s | 2/2 | 724/724
14746 | Streaming job from [output operation 0, batch time 09:19:20] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:19:20 | 0.2 s | 1/1 | 723/723
14745 | Streaming job from [output operation 1, batch time 09:19:10] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:19:10 | 0.1 s | 2/2 | 413/413
14744 | Streaming job from [output operation 0, batch time 09:19:10] collect at SparkConsumerKafka.scala:127 | 2018/08/30 09:19:10 | 0.1 s | 1/1 | 412/412
14743 | Streaming job from [output operation 1, batch time 09:19:00] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:19:00 | 0.2 s | 2/2 |

Thanks!

dibbhatt commented 6 years ago

So I see one instance of a spike to 4 sec for ProcessedOffsetManager, and after that it is fine again:

14758 | Streaming job from [output operation 1, batch time 09:20:10] collect at ProcessedOffsetManager.java:79 | 2018/08/30 09:20:11 | 4 s | 2/2 | 322/322

dibbhatt commented 6 years ago

In each case, the prior stage is AggregationHandler.scala. Is this your business logic?

yufuxin commented 6 years ago

Yes, AggregationHandler runs the business logic on the received messages collected in the last 20 minutes.

dibbhatt commented 6 years ago

Do you think this is an issue with ProcessedOffsetManager? I do not find any issue. It could be an issue with Zookeeper. How big is your ZK cluster? Can you try increasing the max connection setting (in zk.cfg) to some higher value?
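[Editor's note] The max connection setting referred to here is, in a stock ZooKeeper deployment, the maxClientCnxns property in the server configuration file (commonly conf/zoo.cfg rather than zk.cfg). A minimal sketch with an illustrative value only; size it to the number of receivers and executors connecting to ZK:

# conf/zoo.cfg (example value only)
# maxClientCnxns caps concurrent connections from a single client IP (default 60)
maxClientCnxns=200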

yufuxin commented 6 years ago

I will try changing some parameters in zk.cfg. Thanks!