haoshuaifei / blog-comments

0 stars 0 forks source link

wiki/rocketmq/index_08 #19

Open beaudar-bot opened 6 months ago

beaudar-bot commented 6 months ago

RocketMq:08-消费者 - 码界Musing

RocketMQ 的传输模型是:发布订阅模型 。 发布订阅模型具有如下特点: 消费独⽴ 相⽐队列模型的匿名消费⽅式,发布订阅模型中消费⽅都会具备的身份,⼀般叫做订阅组(订阅关系),不同订阅 组之间相互独⽴不会相互影响。 ⼀对多通信 基于独⽴身份的设计,同⼀个主题内的消息可以被多个订阅组处理,每个订阅组都可以拿到全量消息。因此发布订 阅模型可以实现⼀对多通信。 RocketMQ ⽀持两

https://shuaifeihao.top/wiki/rocketmq/index_08.html

haoshuaifei commented 6 months ago

RocketMQ轮询机制由两个线程共同完成: 1、PullRequestHoldService:每隔5s重试一次 2、DefaultMessageStore#ReputMessageService:每处理一次重新拉取,线程休眠1s,继续下次检查

如果开启了长轮询机制,PullRequestService线程每隔5s被唤醒,尝试检测是否有新消息到来,直到超时才停止,如果被挂起,需要等待5s再执行,消息的实时性比较差;由此引入另一种机制:消息到达时唤醒挂起线程,触发一次检查。

haoshuaifei commented 6 months ago

RocketAMQ默认提供5种分配算法: 1、平均分配 2、平均轮询分配 3、一致性哈希 4、根据配置 5、根据broker部署机房名

建议使用1或者2,直观简单

消息队列分配原则: 一个消费者可以分配多个消费队列同一个消费队列只能分配给一个消费者。

haoshuaifei commented 6 months ago

1、消息拉取与消费模式 2、消息拉取流程图 3、PullMessageService和RebalanceService

haoshuaifei commented 6 months ago

RocketMQ支持顺序消费和并发消费,CounsumeMessageService有两个实现类CounsumeMessageConcurrentlyService和CounsumeMessageOrderlyService。 并发消费流程CounsumeMessageConcurrentlyService#submitConsumeRequest(): 1、consumeMessageBatchMaxSize和msgs.size()比较,如果msgs.size()比较大,则分多次提交,否则直接提交;如果提交过程中出现拒绝提交异常,则延迟5s再提交。 2、检查processQueue的dropped,避免消费者消费不属于自己的消息队列。 3、执行ComsumeMessageHook#consumeMessageBefore 4、恢复重试主题名;SHCHEDULE_TOPIC_XXX 5、执行具体的消息消费;CONSUME_SUCCESS、RECONSUME_LATER(需要重新消费) 6、执行ComsumeMessageHook#consumeMessageAfter 7、根据消息监听器返回的结果计算ackIndex;如果CONSUME_SUCCESS,则ackIndex = msgs.size-1;如果RECONSUME_LATER,则将ackIndex设置为-1 8、如果是广播模式,业务方会返回RECONSUME_LATER,消费并不会重新消费,而是以告警级别输出到日志文件中去。如果是集群模式,该批消息需要发送ack消息,如果发送失败则延迟5s重新消费 9、从processQueue移除该批消息,这里返回的偏移量是移除该批消息后的最小偏移量(与消费任务中的消息没关系);然后使用该偏移量更新消费进度,以便消费者重启后能从上一次的消费进度继续开始消费,避免重复消费。即使返回RECONSUME_LATER也会推进消费进度。

haoshuaifei commented 6 months ago

定时消息实现原理: 1、消息消费者发送消息,如果发送消息的delayLevel大于0,则将消息主题变更为SHCEDULE_TOPIC_XXXX,消息队列为depayLevel减1. 2、消息经由CommitLog文件转发到消息消费队列SCHEDULE_TOPIC_XXXX中。 3、定时任务Time每隔1s根据上次拉取偏移量从消费队列中取出所有消息。 4、根据消息的物理偏移量与消息大小从CommitLog文件中拉取消息。 5、根据消息属性重新创建消息,恢复原主题topicA、原队列ID,清楚depayLevel属性,并存入CommitLog文件。 6、将消息转发到原主题topicA的消息消费队列,供消息消费者消费。

haoshuaifei commented 6 months ago

顺序消息 RocketMQ支持局部消息顺序消费,可以确保同一个消息消费队列中的消息按照顺序消费,如果要做到全局顺序消费,则可以将主题配置成一个队列,适用于数据库BinLog等严格要求顺序消息消费的场景。