dibbhatt / kafka-spark-consumer

High Performance Kafka Connector for Spark Streaming. Supports Multi Topic Fetch and Kafka Security. Reliable offset management in Zookeeper. No data loss. No dependency on HDFS and WAL. In-built PID rate controller. Supports Message Handler. Offset Lag checker.
Apache License 2.0

works fine for a few days, then can't commit offsets; no error log and no info log about PartitionManager #35

Closed cyinll closed 8 years ago

cyinll commented 8 years ago

It works fine for a few days, then no offsets are committed in ZK.

The last successful commit log:

16/10/18 17:18:46 INFO PartitionManager: Committed offset 4951367 for Partition{host=x.x.x.x:9092, partition=0} for consumer: 123
16/10/18 17:18:47 INFO KafkaUtils: Fetching from Kafka for partition 0 for fetchSize 1024 and bufferSize 1048576
16/10/18 17:18:47 INFO PartitionManager: Total 1 messages from Kafka: x.x.x.x:0 there in internal buffers
16/10/18 17:18:47 INFO PartitionManager: Store for topic stream.affiliate_converted_click_record_log for partition 0 is : 4951367
16/10/18 17:18:47 INFO MemoryStore: Block input-0-1476427412421 stored as values in memory (estimated size 1392.0 B, free 2.8 GB)
16/10/18 17:18:47 INFO PartitionManager: LastComitted Offset : 4951367
16/10/18 17:18:47 INFO PartitionManager: New Emitted Offset : 4951368
16/10/18 17:18:47 INFO PartitionManager: Enqueued Offset :4951367
16/10/18 17:18:47 INFO PartitionManager: Last Enqueued offset 4951367 not incremented since previous Comitted Offset 4951367 for partition  Partition{host=x.x.x.x:9092, partition=0} for Consumer 123. Some issue in Process!!
16/10/18 17:18:48 INFO KafkaUtils: Fetching from Kafka for partition 0 for fetchSize 1024 and bufferSize 1048576
...
16/10/18 17:19:00 INFO CoarseGrainedExecutorBackend: Got assigned task 302048
16/10/18 17:19:00 INFO Executor: Running task 0.0 in stage 20386.0 (TID 302048)
16/10/18 17:19:00 INFO TorrentBroadcast: Started reading broadcast variable 20386
16/10/18 17:19:00 INFO MemoryStore: Block broadcast_20386_piece0 stored as bytes in memory (estimated size 1399.0 B, free 2.8 GB)
16/10/18 17:19:00 INFO TorrentBroadcast: Reading broadcast variable 20386 took 2 ms
16/10/18 17:19:00 INFO MemoryStore: Block broadcast_20386 stored as values in memory (estimated size 2.0 KB, free 2.8 GB)
16/10/18 17:19:00 INFO BlockManager: Found block input-0-1476427412419 locally
16/10/18 17:19:00 WARN Executor: 1 block locks were not released by TID = 302048:
[input-0-1476427412419]
16/10/18 17:19:00 INFO Executor: Finished task 0.0 in stage 20386.0 (TID 302048). 2322 bytes result sent to driver
16/10/18 17:19:00 INFO CoarseGrainedExecutorBackend: Got assigned task 302049
16/10/18 17:19:00 INFO Executor: Running task 0.0 in stage 20387.0 (TID 302049)
16/10/18 17:19:00 INFO CoarseGrainedExecutorBackend: Got assigned task 302051
16/10/18 17:19:00 INFO Executor: Running task 1.0 in stage 20387.0 (TID 302051)
16/10/18 17:19:00 INFO TorrentBroadcast: Started reading broadcast variable 20387
16/10/18 17:19:00 INFO MemoryStore: Block broadcast_20387_piece0 stored as bytes in memory (estimated size 12.5 KB, free 2.8 GB)
16/10/18 17:19:00 INFO TorrentBroadcast: Reading broadcast variable 20387 took 3 ms
16/10/18 17:19:00 INFO MemoryStore: Block broadcast_20387 stored as values in memory (estimated size 35.1 KB, free 2.8 GB)
16/10/18 17:19:00 INFO BlockManager: Found block input-0-1476427412419 locally
16/10/18 17:19:00 INFO BlockManager: Found block input-0-1476427412420 locally
16/10/18 17:19:00 INFO EsDataFrameWriter: Writing to [spark-aflt-data-test-2016-10-18/sparktest]
16/10/18 17:19:00 INFO EsDataFrameWriter: Writing to [spark-aflt-data-test-2016-10-18/sparktest]

Then the log is always:

16/10/18 17:19:32 INFO KafkaUtils: Fetching from Kafka for partition 0 for fetchSize 1024 and bufferSize 1048576
16/10/18 17:19:33 INFO ZkCoordinator: Refreshing partition manager connections
16/10/18 17:19:33 INFO DynamicBrokersReader: Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=x.x.x.x:9092, 1=x.x.x.x:9092}}
16/10/18 17:19:33 INFO ZkCoordinator: Added partition index 0 for coordinator
16/10/18 17:19:33 INFO ZkCoordinator: Deleted partition managers: []
16/10/18 17:19:33 INFO ZkCoordinator: New partition managers: []
16/10/18 17:19:33 INFO ZkState: Starting curator service
16/10/18 17:19:33 INFO CuratorFrameworkImpl: Starting
16/10/18 17:19:33 INFO ZooKeeper: Initiating client connection, connectString=x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181 sessionTimeout=120000 watcher=org.apache.curator.ConnectionState@44bae644
16/10/18 17:19:33 INFO ClientCnxn: Opening socket connection to server x.x.x.x/x.x.x.x:2181. Will not attempt to authenticate using SASL (unknown error)
16/10/18 17:19:33 INFO ClientCnxn: Socket connection established to x.x.x.x/x.x.x.x:2181, initiating session
16/10/18 17:19:33 INFO ClientCnxn: Session establishment complete on server x.x.x.x/x.x.x.x:2181, sessionid = 0x2535a6cba5348e6, negotiated timeout = 120000
16/10/18 17:19:33 INFO ConnectionStateManager: State change: CONNECTED
16/10/18 17:19:33 INFO ZkCoordinator: Modified Fetch Rate for topic stream.affiliate_converted_click_record_log to : 1024
16/10/18 17:19:33 INFO ZooKeeper: Session: 0x2535a6cba5348e6 closed
16/10/18 17:19:33 INFO ClientCnxn: EventThread shut down
16/10/18 17:19:33 INFO ZkCoordinator: Finished refreshing
16/10/18 17:19:33 INFO KafkaUtils: Fetching from Kafka for partition 0 for fetchSize 1024 and bufferSize 1048576

spark: 2.0.0 (Scala 2.10)

kafka-spark-consumer: 1.0.6

kafka: 0.8.2.2

dibbhatt commented 8 years ago

Can you share the properties you are using?

The log says Fetching from Kafka for partition 0 for fetchSize 1024.

If your message size is larger than this (1 KB), the consumer won't fetch any messages.

You can set consumer.min.fetchsizebytes to a higher value (the default is 1 KB) so that when back-pressure kicks in, the consumer still pulls at least at this minimum rate.
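A minimal sketch of passing that property (assuming the same Map-of-strings property style this consumer takes; the 64 KB floor is an illustrative value, not a recommendation):

```scala
// Illustrative only: raise the minimum fetch size so back-pressure
// never shrinks the fetch below one full message.
val kafkaProperties: Map[String, String] = Map(
  // ... your existing zookeeper/kafka properties ...
  "consumer.fetchsizebytes" -> "1048576",    // normal fetch size (1 MB)
  "consumer.min.fetchsizebytes" -> "65536"   // floor when back-pressure is active (64 KB, example value)
)
```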

Did you try restarting the Spark streaming job, and is the issue still the same?

cyinll commented 8 years ago

Oh, I didn't set consumer.min.fetchsizebytes.

val kafkaProperties: Map[String, String] = Map(
        "zookeeper.hosts" -> zkhosts,
        "zookeeper.port" -> zkports,
        "zookeeper.broker.path" -> brokerPath,
        "kafka.topic" -> topic,
        "zookeeper.consumer.connection" -> "x.x.x.x:2181, x.x.x.x:2181, x.x.x.x:2181",
        "zookeeper.consumer.path" -> "/spark-streaming/affiliate_click_record",
        "kafka.consumer.id" -> "123",
        "consumer.forcefromstart" -> "false",
        "consumer.backpressure.enabled" -> "true",
        "consumer.fetchsizebytes" -> "1048576",
        "consumer.fillfreqms" -> "1000")

I notice the earlier log looks like this:

16/10/18 17:18:17 INFO KafkaUtils: Fetching from Kafka for partition 0 for fetchSize 1048576 and bufferSize 1048576

I restarted Spark streaming, and it has worked fine until now, about one day.

I checked my messages and found that the Kafka message at offset 4951368 has serialized_value_size=1013, so I think its total size is more than 1 KB.
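A back-of-the-envelope check supports this (the per-message framing overhead below is an assumption based on the Kafka 0.8 message-set layout with a null key: 8-byte offset, 4-byte message size, 4-byte CRC, 1-byte magic, 1-byte attributes, 4-byte key length, 4-byte value length):

```scala
// Rough on-wire size of the message at offset 4951368.
// framingOverhead is an assumed Kafka 0.8 message-set overhead (null key).
val framingOverhead = 8 + 4 + 4 + 1 + 1 + 4 + 4       // 26 bytes
val serializedValueSize = 1013                        // from the stuck message
val wireSize = framingOverhead + serializedValueSize  // 1039 bytes
// 1039 > 1024, so a fetchSize of 1024 can never return this message whole,
// and the consumer stalls at that offset until the fetch size grows again.
```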

Thanks for your quick reply~