apache / rocketmq

Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.
https://rocketmq.apache.org/
Apache License 2.0

[Bug] Messages are redelivered even after successful consumption, until they enter the dead-letter queue after 16 redeliveries #8900

Open bruce256 opened 1 week ago

bruce256 commented 1 week ago

Runtime platform environment

CentOS 7.9

RocketMQ version

RocketMQ 5.3.0, rocketmq-client 5.0.6

JDK Version

JDK 1.8

Describe the Bug

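The message is consumed successfully on the client, but the broker keeps redelivering it until it ends up in the dead-letter queue. At the same time, proxy.log reports the exception stack traces below. Network connectivity from the client to the proxy port is fine. Everything worked right after deployment; the problem only appeared recently.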

2024-11-07 16:12:31 INFO pool-4-thread-1 - clear handle of this client when client unregister. group:test, clientChannelInfo:ClientChannelInfo [channel=GrpcClientChannel{clientId=V-LVSHENG1@10572@1@x8pkjaacj4, remoteAddress=10.43.165.207:65031, localAddress=10.0.82.170:8081}, clientId=V-LVSHENG1@10572@1@x8pkjaacj4, language=JAVA, version=473, lastUpdateTimestamp=1730967025698]
2024-11-07 16:12:31 INFO pool-4-thread-1 - remove grpc channel when client unregister. group:test, clientChannelInfo:ClientChannelInfo [channel=GrpcClientChannel{clientId=V-LVSHENG1@10572@1@x8pkjaacj4, remoteAddress=10.43.165.207:65031, localAddress=10.0.82.170:8081}, clientId=V-LVSHENG1@10572@1@x8pkjaacj4, language=JAVA, version=473, lastUpdateTimestamp=1730967025698], removed:true
2024-11-07 16:12:31 INFO pool-4-thread-1 - remove remoting channel when client unregister. clientChannelInfo:ClientChannelInfo [channel=GrpcClientChannel{clientId=V-LVSHENG1@10572@1@x8pkjaacj4, remoteAddress=10.43.165.207:65031, localAddress=10.0.82.170:8081}, clientId=V-LVSHENG1@10572@1@x8pkjaacj4, language=JAVA, version=473, lastUpdateTimestamp=1730967025698]
2024-11-07 16:12:32 INFO GrpcClientSettingsManagerCleaner - remove unused grpc client settings. group:org.apache.rocketmq.broker.client.ConsumerGroupInfo@4f8c3973, settings:client_type: PUSH_CONSUMER
access_point {
  scheme: IPv4
  addresses {
    host: "10.0.82.170"
    port: 8081
  }
}
backoff_policy {
  max_attempts: 17
  customized_backoff {
    next {
      seconds: 1
    }
    next {
      seconds: 5
    }
    next {
      seconds: 10
    }
    next {
      seconds: 30
    }
    next {
      seconds: 60
    }
    next {
      seconds: 120
    }
    next {
      seconds: 180
    }
    next {
      seconds: 240
    }
    next {
      seconds: 300
    }
    next {
      seconds: 360
    }
    next {
      seconds: 420
    }
    next {
      seconds: 480
    }
    next {
      seconds: 540
    }
    next {
      seconds: 600
    }
    next {
      seconds: 1200
    }
    next {
      seconds: 1800
    }
    next {
      seconds: 3600
    }
    next {
      seconds: 7200
    }
  }
}
request_timeout {
  seconds: 3
}
subscription {
  group {
    name: "test"
  }
  subscriptions {
    topic {
      name: "test"
    }
    expression {
      type: TAG
      expression: "*"
    }
  }
  fifo: false
  receive_batch_size: 32
  long_polling_timeout {
    seconds: 20
  }
}
user_agent {
  language: JAVA
  version: "5.0.6"
  platform: "Windows 10 10.0"
  hostname: "V-LVSHENG1"
}

2024-11-07 16:12:58 ERROR GrpcRequestExecutorThread-12 - telemetry on error
io.grpc.StatusRuntimeException: CANCELLED: client cancelled
        at io.grpc.Status.asRuntimeException(Status.java:530)
        at io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:291)
        at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
        at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
        at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
        at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
        at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
        at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
        at org.apache.rocketmq.proxy.grpc.interceptor.GlobalExceptionInterceptor$1.onCancel(GlobalExceptionInterceptor.java:65)
        at io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
        at io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
        at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
        at io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
        at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:378)
        at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:365)
        at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:923)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
2024-11-07 16:13:12 ERROR ConsumerProcessorExecutor-0 - internal server error
io.grpc.StatusRuntimeException: CANCELLED: call already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception
        at io.grpc.Status.asRuntimeException(Status.java:530)
        at io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onNext(ServerCalls.java:366)
        at org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageResponseStreamWriter.writeAndComplete(ReceiveMessageResponseStreamWriter.java:95)
        at org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity.lambda$receiveMessage$0(ReceiveMessageActivity.java:150)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:670)
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:646)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
        at org.apache.rocketmq.common.utils.FutureUtils.lambda$appendNextFuture$0(FutureUtils.java:31)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
2024-11-07 16:13:12 WARN ConsumerProcessorExecutor-0 - client has cancelled the request. response to write: status {
  code: INTERNAL_SERVER_ERROR
  message: "CANCELLED: call already cancelled. Use ServerCallStreamObserver.setOnCancelHandler() to disable this exception. StatusRuntimeException. io.grpc.Status.asRuntimeException(Status.java:530)"
}

2024-11-07 16:13:12 WARN ConsumerProcessorExecutor-0 - client has cancelled the request. response to write: delivery_timestamp {
  seconds: 1730967192
  nanos: 766000000
}

The client sends heartbeats normally:

2024-11-07 16:17:11.603 INFO  [12364] [RocketmqClientAsyncWorker-1-16] [o.a.r.c.j.i.ClientImpl#?:?] - Send heartbeat successfully, endpoints=ipv4:10.0.82.170:8081, clientId=V-LVSHENG1@12364@1@x8qeno6tr0

The client consistently receives the redelivered message every 80 s:

2024-11-07 16:35:56.614  INFO 30944 --- [onsumption-1-25] c.b.b.e.f.r.consumer.ConsumerClient      : Consume message=MessageViewImpl{messageId=0A0052AA254E668BC3D5226EAC510057, topic=test, bornHost=10.0.82.170, bornTimestamp=1730968078417, endpoints=ipv4:10.0.82.170:8081, deliveryAttempt=7, tag=tag, keys=[key], messageGroup=null, deliveryTimestamp=null, properties={}}
2024-11-07 16:35:56.618  INFO 30944 --- [onsumption-1-25] c.b.b.e.f.r.consumer.ConsumerClient      : message body: 唐宗宋祖
唐宗宋祖
2024-11-07 16:37:16.501  INFO 30944 --- [onsumption-1-26] c.b.b.e.f.r.consumer.ConsumerClient      : Consume message=MessageViewImpl{messageId=0A0052AA254E668BC3D5226EAC510057, topic=test, bornHost=10.0.82.170, bornTimestamp=1730968078417, endpoints=ipv4:10.0.82.170:8081, deliveryAttempt=8, tag=tag, keys=[key], messageGroup=null, deliveryTimestamp=null, properties={}}
2024-11-07 16:37:16.502  INFO 30944 --- [onsumption-1-26] c.b.b.e.f.r.consumer.ConsumerClient      : message body: 唐宗宋祖
唐宗宋祖
2024-11-07 16:38:36.387  INFO 30944 --- [onsumption-1-27] c.b.b.e.f.r.consumer.ConsumerClient      : Consume message=MessageViewImpl{messageId=0A0052AA254E668BC3D5226EAC510057, topic=test, bornHost=10.0.82.170, bornTimestamp=1730968078417, endpoints=ipv4:10.0.82.170:8081, deliveryAttempt=9, tag=tag, keys=[key], messageGroup=null, deliveryTimestamp=null, properties={}}
2024-11-07 16:38:36.387  INFO 30944 --- [onsumption-1-27] c.b.b.e.f.r.consumer.ConsumerClient      : message body: 唐宗宋祖
唐宗宋祖
2024-11-07 16:39:56.286  INFO 30944 --- [onsumption-1-28] c.b.b.e.f.r.consumer.ConsumerClient      : Consume message=MessageViewImpl{messageId=0A0052AA254E668BC3D5226EAC510057, topic=test, bornHost=10.0.82.170, bornTimestamp=1730968078417, endpoints=ipv4:10.0.82.170:8081, deliveryAttempt=10, tag=tag, keys=[key], messageGroup=null, deliveryTimestamp=null, properties={}}
2024-11-07 16:39:56.286  INFO 30944 --- [onsumption-1-28] c.b.b.e.f.r.consumer.ConsumerClient      : message body: 唐宗宋祖
唐宗宋祖
2024-11-07 16:41:16.154  INFO 30944 --- [onsumption-1-29] c.b.b.e.f.r.consumer.ConsumerClient      : Consume message=MessageViewImpl{messageId=0A0052AA254E668BC3D5226EAC510057, topic=test, bornHost=10.0.82.170, bornTimestamp=1730968078417, endpoints=ipv4:10.0.82.170:8081, deliveryAttempt=11, tag=tag, keys=[key], messageGroup=null, deliveryTimestamp=null, properties={}}
2024-11-07 16:41:16.155  INFO 30944 --- [onsumption-1-29] c.b.b.e.f.r.consumer.ConsumerClient      : message body: 唐宗宋祖
唐宗宋祖
2024-11-07 16:42:36.044  INFO 30944 --- [onsumption-1-30] c.b.b.e.f.r.consumer.ConsumerClient      : Consume message=MessageViewImpl{messageId=0A0052AA254E668BC3D5226EAC510057, topic=test, bornHost=10.0.82.170, bornTimestamp=1730968078417, endpoints=ipv4:10.0.82.170:8081, deliveryAttempt=12, tag=tag, keys=[key], messageGroup=null, deliveryTimestamp=null, properties={}}
2024-11-07 16:42:36.044  INFO 30944 --- [onsumption-1-30] c.b.b.e.f.r.consumer.ConsumerClient      : message body: 唐宗宋祖
唐宗宋祖
2024-11-07 16:43:55.932  INFO 30944 --- [onsumption-1-31] c.b.b.e.f.r.consumer.ConsumerClient      : Consume message=MessageViewImpl{messageId=0A0052AA254E668BC3D5226EAC510057, topic=test, bornHost=10.0.82.170, bornTimestamp=1730968078417, endpoints=ipv4:10.0.82.170:8081, deliveryAttempt=13, tag=tag, keys=[key], messageGroup=null, deliveryTimestamp=null, properties={}}
2024-11-07 16:43:55.933  INFO 30944 --- [onsumption-1-31] c.b.b.e.f.r.consumer.ConsumerClient      : message body: 唐宗宋祖
唐宗宋祖
2024-11-07 16:45:15.817  INFO 30944 --- [onsumption-1-32] c.b.b.e.f.r.consumer.ConsumerClient      : Consume message=MessageViewImpl{messageId=0A0052AA254E668BC3D5226EAC510057, topic=test, bornHost=10.0.82.170, bornTimestamp=1730968078417, endpoints=ipv4:10.0.82.170:8081, deliveryAttempt=14, tag=tag, keys=[key], messageGroup=null, deliveryTimestamp=null, properties={}}
2024-11-07 16:45:15.817  INFO 30944 --- [onsumption-1-32] c.b.b.e.f.r.consumer.ConsumerClient      : message body: 唐宗宋祖
唐宗宋祖
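
The "every 80 s" figure can be checked directly against the timestamps in the client log above. The following is a minimal sketch that parses the eight "Consume message" timestamps (deliveryAttempt 7 to 14) and prints the gap between consecutive deliveries. The class name `RedeliveryIntervals` and its helper are hypothetical and only serve this illustration; nothing here touches the RocketMQ API.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class RedeliveryIntervals {
    // "Consume message" timestamps copied from the client log (deliveryAttempt 7 to 14).
    static final String[] RECEIVED_AT = {
        "2024-11-07 16:35:56.614",
        "2024-11-07 16:37:16.501",
        "2024-11-07 16:38:36.387",
        "2024-11-07 16:39:56.286",
        "2024-11-07 16:41:16.154",
        "2024-11-07 16:42:36.044",
        "2024-11-07 16:43:55.932",
        "2024-11-07 16:45:15.817"
    };

    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Returns the gap in milliseconds between each pair of consecutive timestamps.
    static long[] intervalsMillis(String[] timestamps) {
        long[] gaps = new long[Math.max(0, timestamps.length - 1)];
        for (int i = 1; i < timestamps.length; i++) {
            LocalDateTime previous = LocalDateTime.parse(timestamps[i - 1], FORMAT);
            LocalDateTime current = LocalDateTime.parse(timestamps[i], FORMAT);
            gaps[i - 1] = Duration.between(previous, current).toMillis();
        }
        return gaps;
    }

    public static void main(String[] args) {
        for (long gap : intervalsMillis(RECEIVED_AT)) {
            System.out.printf("%.3f s%n", gap / 1000.0);
        }
    }
}
```

Every gap comes out within a fraction of a second of 80 s, which is shorter than the 200 s sleep in the reproduction handler in this report, so each redelivery arrives while an earlier attempt is still being processed.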

Steps to Reproduce

  1. Send a message from the console.
  2. Consume it with a consumer. The consumer logic is:

    
    public class MyConsumer extends AbstractMqConsumer<String> {

        @Override
        public String getTopic() {
            return "test";
        }

        @Override
        public ConsumeResult process(String message) {
            System.out.println(message);

            try {
                // Simulate our business logic, which takes about 3 minutes (200 s).
                Thread.sleep(200 * 1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag before propagating the failure.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return ConsumeResult.SUCCESS;
        }
    }

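  3. Although the consumer has been processing for well under 15 minutes, the redelivered message arrives after about 80 s.

This reproduces consistently.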

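What Did You Expect to See?

The message should be redelivered only when consumption times out (15 minutes by default) or fails.

What Did You See Instead?

The consumer had not timed out; about 80 s into consumption, it already received the redelivered message.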

Additional Context

No response

redlsz commented 1 week ago

If this only occurs when the consumption takes more than 1 minute, it may be related to #7837.

bruce256 commented 1 week ago

> If this only occurs when the consumption takes more than 1 minute, it may be related to #7837.

1 minute is too short!