NewLifeX / NewLife.RocketMQ

纯托管轻量级RocketMQ客户端,支持发布消息、消费消息、负载均衡等核心功能!
MIT License
212 stars 79 forks source link

消费者 consume 支持异步 #44

Closed xujiesh0510 closed 2 years ago

xujiesh0510 commented 2 years ago

看了下源码,消费者是 pull 模型。 拉取消息都做了 await,Consume 就改下异步嘛? 能不能安排上?

var pr = await Pull(mq, offset, BatchSize, SuspendTimeout);
                    if (pr != null)
                    {
                        switch (pr.Status)
                        {
                            case PullStatus.Found:
                                if (pr.Messages != null && pr.Messages.Length > 0)
                                {
                                    // 触发消费
                                    **var rs = Consume(mq, pr);**  这边能改成异步么? await ConsumeAsync

                                    // 更新偏移
                                    if (rs)
                                    {
                                        st.Offset = pr.NextBeginOffset;
                                        // 提交消费进度
                                        await UpdateOffset(mq, st.Offset);
                                    }
                                }
nnhy commented 2 years ago

https://github.com/NewLifeX/NewLife.RocketMQ/commit/564ae4f60e6e86809bf8a4fa13ea886ebbafa031

xujiesh0510 commented 2 years ago

感谢大佬