dibbhatt / kafka-spark-consumer

High Performance Kafka Connector for Spark Streaming. Supports Multi Topic Fetch, Kafka Security. Reliable offset management in Zookeeper. No data loss. No dependency on HDFS and WAL. In-built PID rate controller. Supports Message Handler. Offset Lag checker.
Apache License 2.0

It works well in local mode, but when I submit it in cluster mode the fixed rate is too small #61

Closed yangcong3643 closed 5 years ago

yangcong3643 commented 6 years ago

Spark: 2.3.0, Kafka: 0.10.0

Here is my error:

```
processingRate rate 25.068939583855602 fixed rate 50.0 P 24.931060416144398 H 398.30784657808977
2018-12-03 16:17:07 INFO PIDController:88 - ======== Rate Revision Starts ========
2018-12-03 16:17:07 INFO PIDController:89 - Current Fetch Size : 50
2018-12-03 16:17:07 INFO PIDController:90 - Fill Freq : 1000
2018-12-03 16:17:07 INFO PIDController:91 - Batch Duration : 2000
2018-12-03 16:17:07 INFO PIDController:92 - Scheduling Delay : 31777
2018-12-03 16:17:07 INFO PIDController:93 - Processing Delay : 3989
2018-12-03 16:17:07 INFO PIDController:94 - Fixed Rate : 50
2018-12-03 16:17:07 INFO PIDController:95 - Processing rate : 25
2018-12-03 16:17:07 INFO PIDController:96 - Proportional Error : 24
2018-12-03 16:17:07 INFO PIDController:97 - HistoricalError : 398
2018-12-03 16:17:07 INFO PIDController:98 - DifferentialError : 0
2018-12-03 16:17:07 INFO PIDController:99 - Reviced Rate : 25
2018-12-03 16:17:07 INFO PIDController:107 - Reviced FetchSize : 25
2018-12-03 16:17:07 INFO PIDController:108 - ======== Rate Revision Ends ========
2018-12-03 16:17:07 INFO ReceiverStreamListener:129 - Modified Rate by Controller : 25
2018-12-03 16:17:07 WARN ReceiverStreamListener:139 - Controller rate not applied as waiting queue is greater than throttle queue
2018-12-03 16:17:07 INFO ClientCnxn:512 - EventThread shut down
2018-12-03 16:17:08 INFO ShuffledDStream:54 - Time 1543825028000 ms is invalid as zeroTime is 1543824956000 ms , slideDuration is 120000 ms and difference is 72000 ms
2018-12-03 16:17:08 INFO BlockManagerInfo:54 - Added input-0-1543825026000 in memory on 10.221.150.71:38632 (size: 32.4 KB, free: 351.6 MB)
2018-12-03 16:17:08 INFO JobScheduler:54 - Added jobs for time 1543825028000 ms
2018-12-03 16:17:08 WARN ReceiverStreamListener:98 - stop consumer as pending queue 18 greater than configured limit 3
2018-12-03 16:17:08 INFO ZkState:85 - Starting curator service
2018-12-03 16:17:08 INFO CuratorFrameworkImpl:224 - Starting
```


But in local mode, everything goes fine:

```
processingRate rate 1562500.0 fixed rate 50000.0 P -1512500.0 H 5468.75
2018-12-03 16:35:42 INFO PIDController:88 - ======== Rate Revision Starts ========
2018-12-03 16:35:42 INFO PIDController:89 - Current Fetch Size : 50000
2018-12-03 16:35:42 INFO PIDController:90 - Fill Freq : 1000
2018-12-03 16:35:42 INFO PIDController:91 - Batch Duration : 2000
2018-12-03 16:35:42 INFO PIDController:92 - Scheduling Delay : 7
2018-12-03 16:35:42 INFO PIDController:93 - Processing Delay : 64
2018-12-03 16:35:42 INFO PIDController:94 - Fixed Rate : 50000
2018-12-03 16:35:42 INFO PIDController:95 - Processing rate : 1562500
2018-12-03 16:35:42 INFO PIDController:96 - Proportional Error : -1512500
2018-12-03 16:35:42 INFO PIDController:97 - HistoricalError : 5468
2018-12-03 16:35:42 INFO PIDController:98 - DifferentialError : -884
2018-12-03 16:35:42 INFO PIDController:99 - Reviced Rate : 1562500
2018-12-03 16:35:42 INFO PIDController:107 - Reviced FetchSize : 1562500
2018-12-03 16:35:42 INFO PIDController:108 - ======== Rate Revision Ends ========
2018-12-03 16:35:42 INFO ReceiverStreamListener:129 - Modified Rate by Controller : 1562500
2018-12-03 16:35:42 INFO ZkState:85 - Starting curator service
2018-12-03 16:35:42 INFO CuratorFrameworkImpl:224 - Starting
```
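For readers decoding these logs: the controller's rate revision is standard PID arithmetic over the printed fields, and in the cluster run the revised rate was then discarded because the pending-batch queue exceeded the throttle limit. A generic sketch of that arithmetic follows; it mirrors the logged fields but is not this library's actual PIDController code, and the gains are illustrative assumptions.

```scala
// Generic PID rate revision mirroring the fields in the logs above
// (Fixed Rate, Processing rate, Proportional Error, HistoricalError,
// DifferentialError, Reviced Rate). With kp = 1 and the other gains 0,
// this matches the Reviced Rate in both logs: 50 - 24.93 -> 25 and
// 50000 - (-1512500) -> 1562500.
case class PidGains(kp: Double = 1.0, ki: Double = 0.0, kd: Double = 0.0)

def reviseRate(fixedRate: Double,
               processingRate: Double,
               historicalError: Double,
               differentialError: Double,
               gains: PidGains = PidGains()): Double = {
  val proportionalError = fixedRate - processingRate // "P" in the first log line
  val revised = fixedRate -
    gains.kp * proportionalError -
    gains.ki * historicalError -
    gains.kd * differentialError
  math.max(revised, 0.0) // a fetch rate cannot go negative
}
```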

yangcong3643 commented 6 years ago

Local config: --master local[2]
Cluster config: --master spark://master:6066 --deploy-mode cluster --total-executor-cores 2 --executor-cores 1 --executor-memory 1g

dibbhatt commented 5 years ago

I think your Kafka topic has 2 partitions? So with total-executor-cores 2, each executor having 1 core, the receivers have consumed all the cores and you have no cores left for processing. You can increase total-executor-cores to a higher value and see if that works for you.
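As a rough sketch of that suggestion: keep the total core count strictly above the number of receivers so batch processing is never starved. The app name and core counts below are illustrative assumptions, not values taken from this thread.

```scala
import org.apache.spark.SparkConf

// Each receiver permanently pins one core. If all cores granted by
// --total-executor-cores are taken by receivers, no core is left to run
// the batch jobs, the scheduling delay grows, and the PID controller
// keeps cutting the rate.
val conf = new SparkConf()
  .setAppName("kafka-spark-consumer-job") // hypothetical app name
  .set("spark.cores.max", "4")            // equivalent of --total-executor-cores 4
  .set("spark.executor.cores", "2")       // equivalent of --executor-cores 2
```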

yangcong3643 commented 5 years ago

I have already set receiver.num=1. Why does it behave differently in the two modes?
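For reference, the receiver count is the numberOfReceivers argument passed to the launcher. The sketch below is loosely based on the usage pattern in this repo's README, so the property names, package path, and exact launch signature should be verified against the library version in use; the hosts, topic, and consumer id are placeholders.

```scala
import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

import consumer.kafka.ReceiverLauncher // package path assumed from the repo

object SingleReceiverJob {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("single-receiver-job") // hypothetical name
    val ssc  = new StreamingContext(conf, Seconds(2))

    val props = new Properties()
    props.put("zookeeper.hosts", "zkhost")        // placeholder ZooKeeper host
    props.put("zookeeper.port", "2181")
    props.put("kafka.topic", "mytopic")           // placeholder topic
    props.put("kafka.consumer.id", "my-consumer") // placeholder consumer id

    // One receiver -> one core pinned for receiving, in either deploy mode.
    val numberOfReceivers = 1

    val stream = ReceiverLauncher.launch(ssc, props, numberOfReceivers,
                                         StorageLevel.MEMORY_ONLY)
    stream.foreachRDD(rdd => println(s"Batch size: ${rdd.count()}"))

    ssc.start()
    ssc.awaitTermination()
  }
}
```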

yangcong3643 commented 5 years ago

It was solved after I raised the batchDuration.
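For anyone hitting the same symptom: the cluster-mode log shows a 2000 ms batch duration against a ~4000 ms processing delay and a 31777 ms scheduling delay, so batches queued up faster than they drained. Raising the batch interval gives each batch more headroom; a minimal sketch, where the 10-second value is an illustrative assumption:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("kafka-spark-consumer-job") // hypothetical name
// A batch interval comfortably above the observed processing delay keeps the
// pending-batch queue from growing and lets the PID controller settle.
val ssc = new StreamingContext(conf, Seconds(10)) // failing run used a 2s batch
```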

dibbhatt commented 5 years ago

@yangcong3643 good to hear that. If things are fine, you can close the issue.