kenttanl / kenttanl.github.io

个人博客,http://kentt.top/
https://kenttanl.github.io/
MIT License
0 stars 0 forks source link

Kakfa #2

Open kenttanl opened 3 years ago

kenttanl commented 3 years ago

Consumer poll and rebalance

通过 Wireshark 抓包以及 Debug 源码,对 Consumer poll 及 rebalance 相关操作得出以下结论:

  1. new KafkaConsumer() 以及调用 consumer.subscribe() 方法时不会产生实际的 TCP 连接;
  2. 只有在需要从服务器获取信息时,才会通过一个叫 Fetcher 的代理类去连接服务器,并执行所需的操作;
  3. 只有调用 consumer.poll(...) 方法后,才会启动心跳线程。同时将 consumer 加入到 group 中,也即是此时才会进行 rebalance 操作;
  4. 调用 consumer.poll(...) 操作时,实际执行了如下操作:
    • 1) 确保 coordinator 准备就绪;
    • 2) 确保该 group 处于活动状态;
      • 2.1) 确保 coordinator 准备就绪;
      • 2.2) 启动心跳线程(如果有必要);
      • 2.3) 加入 Group(如果需要,此时应该就是由服务器进行 rebalance 操作了);
      • 3) 检查心跳的状态,并指示客户端的活动状态(并未真正提交心跳,只是更改了心跳类中的一些指标数据。心跳由心跳线程轮询提交);
      • 4) 自动提交 offset(如果有必要则进行异步提交);
      • 5) 拉取并返回数据;
kenttanl commented 3 years ago

附加1: client.id 可能导致 Broker 的内存溢出问题(version_0.11):

Consumer 连接服务器,如果没有指定 client.id,那么将由客户端自动生成一个 clientId(由 consumer- 为前缀,累加数字为后缀)。当 consumer 连接 Broker 后,会生成一个用于 JMX 监控的对象,并以 clientId 为键,JMX对象为值的形式保存在一个 Map 容器中,并且值得注意的是:Broker 不会在 Consumer 关闭时将其对应的数据从 Map 容器中移除,所以最终可能导致的一个实际情形是:当我们不停的通过创建大量的 consumer 去监控集群上对应 consumer 的偏移量等指标时,Broker 的内存使用率逐渐的在增加,直到 OOM。最后通过 head dump 文件发现,使用 8G 的内存,该 Map 却占用了 7G 的内存。解决方法是:通过指定或复用 clientId,以减少 Broker 的缓存量,最终问题得以解决。

kenttanl commented 3 years ago

可能导致数据丢失的几种情形

  1. 异步发送消息: 异步本身就是不可靠的;
  2. acks 设置不当:
    • acks=0 时:生产者发送消息以后,不会等待服务器的响应,便认为消息已经发送成功,此时类似于异步发送消息;
    • acks=1 时:生产者发送消息以后,仅等待 leader 写入成功后,便返回确认成功的响应,此时存在单点问题;
  3. ISR 数据过小: 当 acks=-1 时,生产者发送消息后,会等待 ISR 中的所有节点全部写入成功后,才会返回确认成功的响应,所以如果 ISR 数量为 1 时,便同样存在单点问题。所以至少在 topic 级别的配置中设置 min.insync.replicas=2
  4. 消费者自动重置 offset 策略:auto.offset.reset=earliest | latest时,如果分区上不存在指定组的 offset,那么将自动重置 offset 为最早或者最新的,而此时如果数据是因为时间过期或者空间占用大小超过阈值而被删除,那么就会出现无感知的数据丢失,即数据在消费前被删除,但是消费者却并不知道其被删除,从而没有进行手动恢复数据;
  5. 消费者异步消费消息:
    • 在自动提交提交 offset 的设置下,如果在 poll 到消息后,使用一个新的线程来异步处理数据,那么 offset 很可能在消息处理成功前被提交。此时如果程序发生异常,则消息可能丢失;
    • 注1:在默认的实现中,消费者只能在 poll 数据的线程中提交 offset,否则会抛出异常。而使用 spring-kafka-client 时,非 poll 线程提交 offset 实际上是先放入一个队列,然后在 poll 时再做提交操作,所以此时异步提交数据,不会丢失数据,但是可能会延迟提交 offset;
    • 注2:同步处理消息时,无论自动提交 offset 与否,都不会存在消息丢失,只会存在消息重复消费的问题,参考最上方的 poll 原理。
kenttanl commented 3 years ago

为何快?

  1. 生产者(发送消息):
    • 可异步发送数据可减少生产者等待时间,造成仿佛很快的感觉;
    • 可批量发送数据可减少网络往返次数;
    • 可设置 acks=0 类似于异步发送消息,acks=1 时,减少了响应成功时 leader 与 ISR 中其它节点的交互时间;
    • 注:但以上的方式都无法保证数据的可靠性;
  2. 服务器:
    • 顺序写,避免了随机读写,时间复杂度 O(1)(读写数据);
    • 先写内存,内存写入成功则响应成功,内存中的数据则定时刷新到磁盘(发送消息);
    • 通过 mmap 映射磁盘 page,减少了将数据从用户空间到内核空间往返拷贝的步骤(刷新磁盘);
    • 如果消费者消费速度能跟上生产速度,则极可能直接从内存中读取数据,而不需要再从磁盘上读取数据(消费消息);
  3. 消费者(获取数据):
    • 通过 offset 定位数据,时间复杂度 O(1);
    • 可批次获取消息,减少网络往返次数;
    • 可异步提交 offset,且不存在数据丢失的问题;