TrumanDu / comments

comments
0 stars 0 forks source link

2019/04/13/%E5%A6%82%E4%BD%95%E7%9B%91%E6%8E%A7kafka%E6%B6%88%E8%B4%B9Lag%E6%83%85%E5%86%B5/ #5

Open utterances-bot opened 4 years ago

utterances-bot commented 4 years ago

如何监控kafka消费Lag情况 | Truman's Blog

前言为什么会有这个需求? kafka consumer 消费会存在延迟情况,我们需要查看消息堆积情况,就是所谓的消息Lag。目前是市面上也有相应的监控工具KafkaOffsetMonitor,我们自己也写了一套监控kmanager。但是随着kafka版本的升级,消费方式也发生了很大的变化,因此,我们需要重构一下kafk

http://trumandu.github.io/2019/04/13/%E5%A6%82%E4%BD%95%E7%9B%91%E6%8E%A7kafka%E6%B6%88%E8%B4%B9Lag%E6%83%85%E5%86%B5/

lihuimintu commented 4 years ago

基于zookeeper消费方式 offset 获取中第三步“通过consumer获取LogEndOffset(可见offset)”怎么做?看文中直接说略了。

TrumanDu commented 4 years ago

@lihuimintu zk目录中有,遍历目录读取就好了

TrumanDu commented 4 months ago

原文consumerOffset存在一个问题 建议修改为:

KafkaConsumers<String, String>  kafkaConsumer  = new KafkaConsumers<>(consumerProps);
    List<PartitionInfo> patitions = kafkaConsumer.partitionsFor(topicName);
    List<TopicPartition>topicPatitions = new ArrayList<>();
    patitions.forEach(patition->{
        TopicPartition topicPartition = new TopicPartition(topicName,patition.partition());
        topicPatitions.add(topicPartition);
    });
    Map<TopicPartition, Long> result = kafkaConsumer.endOffsets(topicPatitions);