Open Bpazy opened 1 year ago
该参数用于指定是否启用自动提交消费位移的功能。如果设置为 true,消费者将定期自动提交其消费位移。如果设置为 false,消费者必须通过手动编程的方式来提交位移,这样会更加控制消费位移的提交。如果你的业务场景希望避免重复消费,建议开启自动提交。
常用场景:消费者应用场景,其作用是自动提交消费位移,简化消费者代码,避免手动提交过程中的错误操作。
该参数用于指定Kafka服务器等待客户端心跳的超时时间(单位毫秒)。心跳是消费者在一段时间内向Kafka服务器发送的一个请求,以表明其仍处于“活动”状态。如果Heartbeat被Kafka服务器认为是过期的,那么它将被视为一个无效的会话,并被踢出消费组。需要注意的是,Session Timeout 必须比Group Max Session Timeout更短。
常用场景:消费者应用场景,如果该值设置过小,消费者有可能会因心跳超时而被认为已经死亡,从而导致分区再平衡。如果该值设置过大,则可能会延迟检测到由于消费者宕机而导致的分区再平衡。
该参数用于指定当没有初始的消费者位移或者消费者位移超出范围时,消费者应该如何定位自己的消费位移。通常有两种方式:最新消费和最早消费。
常用场景:消费者应用场景,如果你的消费者需要从某个时间点开始消费消息,你可以选择最早消费。如果你希望你的消费者从最新消息开始消费,你可以选择最新消费。
该参数用于指定消费者每次从Kafka获取的最大消息数量。这是一个可调整的值,具体应该如何设置取决于应用程序对消息消费的周期性和效率要求,但是需要确保消费者不会因为消息处理时间过长而从消费群组中消失。
常用场景:消费者应用场景,如果你的消费者想快速处理消费,那么可以增加 max.poll.records 的值,反之可以减少该值来降低消费者的消费速率。
该参数用于指定消费者处理单次调用 poll() 方法的最大时间(单位毫秒)。如果消费者处理所消费的消息所花费的时间超过了该时间,消费者将被视为没有响应,因此在没有显式关闭、提交或重新加入群组的情况下可能会触发群组重新平衡。
常用场景:消费者应用场景,如果你的消费者需要处理消费时间比较长(比如数据处理逻辑比较复杂),可以将该值设置为一个较大的值,以避免在消息处理过程中,消费者被认为没有响应。
ackMode 和 enable.auto.commit 参数在 Kafka 消费者应用程序中起到非常重要的作用,在一定程度上两者是相互关联的。
具体来讲,enable.auto.commit 告诉 Kafka 消费者是否自动提交消费位移,当 enable.auto.commit=true 时,消费者自动提交位移,可以确保消息被正确消费,但会有少量的延迟,即有可能将消息消费位置提交到 Kafka Broker 上的间隙时间段内。而当 enable.auto.commit=false 时,消费者使用手动提交位移的方式,需要显式的调用 consumer.commitSync() 或 consumer.commitAsync() 方法提交消费位移,以确保消费偏移量精确控制。
ackMode 参数用于处理手动提交位移的时候,在手动确认消费位移之前,Kafka 客户端必须等待服务器确认,以确保消费者在确认提交之前不会丢失偏移量,并允许消费者随时提交新的偏移量。这个参数可以控制提交的时点,例如每次读取消息时就提交位移,还是一次批量读取消息后再提交偏移。
consumer.commitSync() 是 Kafka 客户端的方法,用于提交消费位移的偏移量。它是同步确认的,即在 Kafka 服务器确认后,它会返回确认。如果提交位移失败,它会抛出异常。通常,commitSync() 在捕获异常并进行补偿时比 commitAsync() 更方便。
Acknowledgment.acknowledge() 是 Spring Kafka 模块提供的一个回调函数,用于显式提交偏移量。它的作用类似于 commitSync() 方法,但是它更容易与 Spring Boot 客户端代码集成,并且它可以简单地从 Spring 配置属性中注入,比如:
@KafkaListener(topics = "SOME_TOPIC", id = "SOME_ID", containerFactory = "SOME_FACTORY",
concurrency = "3", groupId = "SOME_GID")
public void listen(ConsumerRecord<String, String> consumerRecord, Acknowledgment ack) throws Exception {
try {
// doSomething();
} catch (Exception ex) {
log.error("message:{}, ex:{}", consumerRecord.value(), ExceptionTools.getExceptionStackTrace(ex));
} finally {
// 数据处理成功,通知队列清除
ack.acknowledge();
}
在消费端应用程序中,可以使用 consumer.commitSync() 或 Acknowledgment.acknowledge() 两种方式来确认消费成功,并提交偏移量。这两种方式本质上是一致的,只不过 Acknowledgment.acknowledge() 是 Spring Kafka 提供的一种用于处理手动提交的偏移量提交方式。无论使用哪一种方式,都需要保证消费生产是消息的精确性,以确保被消费的消息不会在异常发生后重新消费或者丢失。
记录一些 Kafka 的知识