apache / rocketmq-client-go

Apache RocketMQ go client
https://rocketmq.apache.org/
Apache License 2.0

[BUG] With the pushconsumer client, a message is committed after only two retries by default and its status becomes consumed #1129

Open majintao opened 5 months ago

majintao commented 5 months ago

My pushConsumer always returns consumer.ConsumeRetryLater, but the message is retried only twice. After that, the admin console shows it in the consumed state.


Core code

    topic := "GO_TEST_TOPIC"
    rocket_mq.NewConsumerProcessor(topic, func(c context.Context, msg []byte) error {
        xflog.Infof("msg:%v", string(msg))
        return errors.New("111")
    }, consumer.WithConsumerModel(consumer.Clustering),
        consumer.WithMaxReconsumeTimes(16),
        consumer.WithGroupName("TEST_GROUP_4"),
        consumer.WithNameServer([]string{"127.0.0.1:9876"})).Start()

    c.Subscribe(cp.topic, cp.selector, func(ctx context.Context, messages ...*primitive.MessageExt) (
        consumer.ConsumeResult, error) {
        // Process the messages
        for _, message := range messages {
            err := cp.process(ctx, message) // business logic
            if err != nil {
                fmt.Printf("messageId:%s, consumer times:%d", message.MsgId, message.ReconsumeTimes)
                concurrentCtx, _ := primitive.GetConcurrentlyCtx(ctx)
                concurrentCtx.DelayLevelWhenNextConsume = 2 // only run when return consumer.ConsumeRetryLater
                return consumer.ConsumeRetryLater, nil
            }
        }
        return consumer.ConsumeSuccess, nil
    })
majintao commented 5 months ago

@twz915

caichunlong commented 4 months ago

@twz915

twz915 commented 4 months ago

I tested this and it works as expected. Check whether your RocketMQ server has a setting that limits retries to two, for example the messageDelayLevel or maxReconsumeTimes parameters in broker.conf.

The version I used is the master branch.