pinterest / secor

Secor is a service implementing Kafka log persistence
Apache License 2.0

progress monitor NoSuchElementException #176

Closed: fxpz closed this issue 7 years ago

fxpz commented 8 years ago

Hi,

I'm trying to use the progress monitor component. It works fine in my dev environment, where the Kafka topic has only one partition. In my staging environment the topic has 5 partitions, and when I run the progress monitor I get the following exception. I've pasted the log at DEBUG level in case it helps. I have three servers, each running ZooKeeper on port 2181 and Kafka on port 9092.

Regards

2016-02-09 17:23:50,713 [main] (org.apache.zookeeper.ZooKeeper:438) INFO  Initiating client connection, connectString=kafka01:2181,kafka02:2181,kafka03:2181 sessionTimeout=86400000 watcher=com.twitter.common.zookeeper.ZooKeeperClient$2@28bf3bbb
2016-02-09 17:23:50,717 [main] (org.apache.zookeeper.ClientCnxn:102) DEBUG zookeeper.disableAutoWatchReset is false
2016-02-09 17:23:50,747 [main-SendThread(kafka01:2181)] (org.apache.zookeeper.ClientCnxn$SendThread:975) INFO  Opening socket connection to server kafka01/10.209.3.142:2181. Will not attempt to authenticate using SASL (unknown error)
2016-02-09 17:23:50,760 [main-SendThread(kafka01:2181)] (org.apache.zookeeper.ClientCnxn$SendThread:852) INFO  Socket connection established to kafka01/10.209.3.142:2181, initiating session
2016-02-09 17:23:50,765 [main-SendThread(kafka01:2181)] (org.apache.zookeeper.ClientCnxn$SendThread:892) DEBUG Session establishment request sent on kafka01/10.209.3.142:2181
2016-02-09 17:23:50,778 [main-SendThread(kafka01:2181)] (org.apache.zookeeper.ClientCnxn$SendThread:1235) INFO  Session establishment complete on server kafka01/10.209.3.142:2181, sessionid = 0x152c5ea10ad0022, negotiated timeout = 40000
2016-02-09 17:23:50,790 [main-SendThread(kafka01:2181)] (org.apache.zookeeper.ClientCnxn$SendThread:818) DEBUG Reading reply sessionid:0x152c5ea10ad0022, packet:: clientPath:null serverPath:null finished:false header:: 1,8  replyHeader:: 1,55866723174,0  request:: '/consumers/secor_partition/offsets,F  response:: v{'ads} 
2016-02-09 17:23:50,800 [main-SendThread(kafka01:2181)] (org.apache.zookeeper.ClientCnxn$SendThread:818) DEBUG Reading reply sessionid:0x152c5ea10ad0022, packet:: clientPath:null serverPath:null finished:false header:: 2,8  replyHeader:: 2,55866723174,0  request:: '/consumers/secor_partition/offsets/ads,F  response:: v{'3,'2,'1,'0,'4} 
2016-02-09 17:23:50,802 [main] (org.apache.zookeeper.ZooKeeper:438) INFO  Initiating client connection, connectString=kafka01:2181,kafka02:2181,kafka03:2181 sessionTimeout=86400000 watcher=com.twitter.common.zookeeper.ZooKeeperClient$2@16393296
2016-02-09 17:23:50,816 [main-SendThread(kafka03:2181)] (org.apache.zookeeper.ClientCnxn$SendThread:975) INFO  Opening socket connection to server kafka03/10.208.139.175:2181. Will not attempt to authenticate using SASL (unknown error)
2016-02-09 17:23:50,819 [main-SendThread(kafka03:2181)] (org.apache.zookeeper.ClientCnxn$SendThread:852) INFO  Socket connection established to kafka03/10.208.139.175:2181, initiating session
2016-02-09 17:23:50,820 [main-SendThread(kafka03:2181)] (org.apache.zookeeper.ClientCnxn$SendThread:892) DEBUG Session establishment request sent on kafka03/10.208.139.175:2181
2016-02-09 17:23:50,830 [main-SendThread(kafka03:2181)] (org.apache.zookeeper.ClientCnxn$SendThread:1235) INFO  Session establishment complete on server kafka03/10.208.139.175:2181, sessionid = 0x351685a81410461, negotiated timeout = 40000
2016-02-09 17:23:50,846 [main-SendThread(kafka03:2181)] (org.apache.zookeeper.ClientCnxn$SendThread:818) DEBUG Reading reply sessionid:0x351685a81410461, packet:: clientPath:null serverPath:null finished:false header:: 1,4  replyHeader:: 1,55866723175,0  request:: '/consumers/secor_partition/offsets/ads/3,F  response:: #363332393239343538,s{55835982392,55866713128,1453478610562,1455036631919,478,0,0,0,9,0,55835982392} 
2016-02-09 17:23:50,847 [main] (com.pinterest.secor.common.KafkaClient:62) DEBUG looking up leader for topic ads partition 3
2016-02-09 17:23:51,556 [main] (kafka.utils.Logging$class:52) DEBUG Disconnecting from kafka01:9092
2016-02-09 17:23:51,566 [main] (kafka.utils.Logging$class:52) DEBUG Created socket with SO_TIMEOUT = 100000 (requested 100000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF = 43520 (requested -1), connectTimeoutMs = 100000.
2016-02-09 17:23:51,635 [main] (kafka.utils.Logging$class:52) DEBUG Disconnecting from kafka01:9092
2016-02-09 17:23:51,638 [main] (com.pinterest.secor.common.KafkaClient:154) DEBUG leader for topic ads partition 3 is 10.209.10.38:9092
2016-02-09 17:23:51,640 [main] (com.pinterest.secor.common.KafkaClient:116) DEBUG fetching message topic ads partition 3 offset 
2016-02-09 17:23:51,671 [main] (kafka.utils.Logging$class:52) DEBUG Disconnecting from 10.209.10.38:9092
2016-02-09 17:23:51,673 [main] (kafka.utils.Logging$class:52) DEBUG Created socket with SO_TIMEOUT = 100000 (requested 100000), SO_RCVBUF = 65536 (requested 65536), SO_SNDBUF = 43520 (requested -1), connectTimeoutMs = 100000.
2016-02-09 17:23:51,724 [main] (kafka.utils.Logging$class:52) DEBUG Disconnecting from 10.209.10.38:9092
2016-02-09 17:23:51,725 [main] (com.pinterest.secor.main.ProgressMonitorMain:45) ERROR Progress monitor failed
java.util.NoSuchElementException
        at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
        at kafka.javaapi.message.ByteBufferMessageSet$$anon$1.next(ByteBufferMessageSet.scala:47)
        at kafka.javaapi.message.ByteBufferMessageSet$$anon$1.next(ByteBufferMessageSet.scala:40)
        at com.pinterest.secor.common.KafkaClient.getMessage(KafkaClient.java:130)
        at com.pinterest.secor.common.KafkaClient.getCommittedMessage(KafkaClient.java:207)
        at com.pinterest.secor.tools.ProgressMonitor.getStats(ProgressMonitor.java:183)
        at com.pinterest.secor.tools.ProgressMonitor.exportStats(ProgressMonitor.java:131)
        at com.pinterest.secor.main.ProgressMonitorMain.main(ProgressMonitorMain.java:43)

HenryCaiHaiying commented 8 years ago

It seems to me that the offset stored in ZK no longer exists on the Kafka side (possibly because of the Kafka retention time).

In KafkaClient.java, before line 207, you can print committedOffset and then use a Kafka tool (e.g. kafka-console-consumer.sh or kafka-simple-consumer-shell.sh) to fetch that specific offset.
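The stack trace above ends in IteratorTemplate.next, which throws NoSuchElementException when next() is called on an iterator that has no elements, i.e. when the fetch returned no message for the requested offset. A minimal defensive sketch of that step is shown below. It is written against plain java.util.Iterable rather than Kafka's ByteBufferMessageSet, and the class and method names are hypothetical, not Secor's actual KafkaClient code: it checks hasNext() before next() and reports a missing message instead of throwing.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Optional;

// Hypothetical helper: illustrates guarding iterator().next() on a fetch result
// that may be empty. This is NOT Secor's actual KafkaClient code.
final class FetchGuard {

    private FetchGuard() {}

    /**
     * Returns the first element of a fetched batch, or Optional.empty() if the
     * batch is empty (e.g. the broker returned nothing for the requested offset).
     */
    static <T> Optional<T> firstMessage(Iterable<T> fetchedBatch) {
        Iterator<T> it = fetchedBatch.iterator();
        // Calling it.next() without this check is what raises
        // java.util.NoSuchElementException on an empty batch.
        if (!it.hasNext()) {
            return Optional.empty();
        }
        return Optional.of(it.next());
    }

    public static void main(String[] args) {
        System.out.println(firstMessage(List.of("msg@652082676"))); // Optional[msg@652082676]
        System.out.println(firstMessage(List.<String>of()));        // Optional.empty
    }
}
```

With a guard like this, a caller could log the offending topic, partition and offset and skip that partition, rather than letting the whole progress monitor run fail.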

fxpz commented 8 years ago

Hi,

Thanks for your response. My Kafka retention is 3 hours, and Secor should upload and commit at least once an hour:

secor.max.file.size.bytes=200000000
secor.max.file.age.seconds=3600

I checked the offset for each partition, fetched the message at that offset, and it is present:

./kafka-consumer-offset-checker.sh --zookeeper localhost --topic ads --group secor_partition
Group           Topic                          Pid Offset          logSize         Lag             Owner
secor_partition ads                            0   2873013207      2873553903      540696          secor_partition_97d5a4a13fc8_6_12-0
secor_partition ads                            1   666247483       667049634       802151          secor_partition_97d5a4a13fc8_6_13-0
secor_partition ads                            2   661908626       662710652       802026          secor_partition_97d5a4a13fc8_6_14-0
secor_partition ads                            3   661709278       662296199       586921          secor_partition_e662b21fbec5_6_12-0
secor_partition ads                            4   652082676       652860944       778268          secor_partition_e662b21fbec5_6_13-0

For example:

./kafka-simple-consumer-shell.sh --broker-list kafka01:9092,kafka02:9092,kafka03:9092 --topic ads --partition 4 --offset 652082676 --max-messages 1
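Whether a committed offset can still be fetched comes down to a range check: the broker only serves offsets from the partition's earliest retained offset up to, but not including, its log end offset (the logSize column in the table above). The Lag column is simply logSize minus Offset. The sketch below is a hypothetical helper, not part of Secor or the Kafka tools; the earliest retained offset is an assumed input, because the offset-checker output above does not include it.

```java
// Hypothetical helper for reasoning about the offset-checker output above;
// not part of Secor or the Kafka command-line tools.
final class OffsetRange {

    private OffsetRange() {}

    /** Lag as shown by the checker: messages written but not yet committed. */
    static long lag(long committedOffset, long logSize) {
        return logSize - committedOffset;
    }

    /**
     * True if the broker can still serve this offset: it must not have been
     * deleted by retention (>= earliestOffset) and must already exist (< logSize).
     */
    static boolean isFetchable(long offset, long earliestOffset, long logSize) {
        return offset >= earliestOffset && offset < logSize;
    }

    public static void main(String[] args) {
        // Partition 4 from the table above.
        long committed = 652082676L;
        long logSize = 652860944L;
        System.out.println(lag(committed, logSize)); // 778268
        // The earliest retained offset is NOT in the table; 650000000 is a made-up value.
        System.out.println(isFetchable(committed, 650000000L, logSize)); // true
    }
}
```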

Also, I added the line LOG.debug("committed offset {}", committedOffset); at line 207, but I don't see anything in the execution log (the level is set to DEBUG through the log4j config file).

HenryCaiHaiying commented 8 years ago

You can change that LOG.debug to LOG.warn; then it will be printed.

You will need to use kafka-console-consumer.sh or kafka-simple-consumer-shell.sh to pull the message at that specific offset and see whether it's still there in Kafka.

laloyd commented 8 years ago

Aloha Henry,

We are currently having the same issue. I can confirm that the offset is present in Kafka; however, when I look in ZooKeeper, the stored offset is actually one higher than the one KafkaClient is looking for.

./kafka-cli simple-consumer --broker-list kafka-broker1:9092,kafka-broker2:9092,kafka-broker3:9092 --partition 22 --topic slog --offset 1543641 --max-message 1

returns a message, but in ZooKeeper, get /consumers/secor_hr_partition/offsets/slog/22 returns 1543642.

The offset in ZooKeeper is one ahead of the offset that the Secor logs show it fetching:

2016-04-19 11:38:47,309 [pool-1-thread-1] (com.pinterest.secor.common.KafkaClient:203) WARN committed offset 1543641
2016-04-19 11:38:47,311 [pool-1-thread-1] (com.pinterest.secor.common.KafkaClient:154) WARN leader for topic slog partition 22 is kafka-broker3:9092
2016-04-19 11:38:47,311 [pool-1-thread-1] (com.pinterest.secor.common.KafkaClient:116) WARN fetching message topic slog partition 22 offset 1543641
2016-04-19 11:38:47,313 [pool-1-thread-1] (com.slack.secor.server.SecorServicesMain$3:83) ERROR Progress Monitor failed to export stats
java.util.NoSuchElementException
        at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:39)
        at kafka.javaapi.message.ByteBufferMessageSet$$anon$1.next(ByteBufferMessageSet.scala:47)
        at kafka.javaapi.message.ByteBufferMessageSet$$anon$1.next(ByteBufferMessageSet.scala:40)
        at com.pinterest.secor.common.KafkaClient.getMessage(KafkaClient.java:131)
        at com.pinterest.secor.common.KafkaClient.getCommittedMessage(KafkaClient.java:208)
        at com.pinterest.secor.tools.ProgressMonitor.getStats(ProgressMonitor.java:183)
        at com.pinterest.secor.tools.ProgressMonitor.exportStats(ProgressMonitor.java:131)
        .... ...

The next offset, 1543642, also exists in Kafka.

HenryCaiHaiying commented 8 years ago

Are both 1543642 and 1543641 present on the Kafka broker?

I don't remember the details of the offset stored in ZK, but it might be off by one: the offset we store in ZK may be the one we are going to process next, rather than the one we've just processed.
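If, as suggested above, the value in ZK is the next offset to process, then the last committed message sits at that value minus one, which matches the numbers reported earlier (ZK holds 1543642 while Secor fetches 1543641). A minimal sketch of that convention follows; the names are hypothetical, and the "stored value = next offset" interpretation is an assumption taken from this thread rather than a quote of Secor's code.

```java
// Hypothetical sketch of the "stored value = next offset to consume" convention.
// Names are illustrative and do not come from Secor.
final class CommittedOffset {

    private CommittedOffset() {}

    /**
     * Converts the value read from the ZK offsets node
     * (/consumers/{group}/offsets/{topic}/{partition}) into the offset of the
     * last processed message, or -1 if nothing has been processed yet.
     */
    static long lastProcessedOffset(long storedNextOffset) {
        if (storedNextOffset < 0) {
            throw new IllegalArgumentException("offset must be non-negative: " + storedNextOffset);
        }
        return storedNextOffset - 1;
    }

    public static void main(String[] args) {
        // Values from the slog partition 22 report above.
        System.out.println(lastProcessedOffset(1543642L)); // 1543641
    }
}
```

Under this convention, both the stored value and the value minus one should normally be fetchable once newer messages exist, which is why the question of whether both offsets are on the broker matters.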

laloyd commented 8 years ago

So the original offset and the offsets below and above it all existed in Kafka; however, KafkaClient was consistently receiving an empty result from the fetch. After an (unrelated) restart of ZooKeeper, everything started working properly again! I'll update here if it happens again or if I can figure out why it happened in the first place.

HenryCaiHaiying commented 8 years ago

My suspicion is that the offset stored at the ZK path was not consistent among the quorum members, so when Secor requested the offset stored in ZK, it didn't get the value it was supposed to get.
