apache / rocketmq

Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.
https://rocketmq.apache.org/
Apache License 2.0

[Bug] Issue with setting setAwaitDuration below 20 seconds in RocketMQ SimpleConsumer mode? #7986

Open CodingOX opened 7 months ago

CodingOX commented 7 months ago

Runtime platform environment

Ubuntu 20.04

RocketMQ version

RocketMQ 5.2

JDK Version

JDK 17

Describe the Bug

Feedback for RocketMQ 5.2 version using SimpleConsumer mode:

I am encountering an issue when using the SimpleConsumer mode in RocketMQ version 5.2. Specifically, when I call setAwaitDuration with a value less than 20 seconds, I receive the following error message:

Exception in thread "main" org.apache.rocketmq.client.java.exception.BadRequestException: [request-id=0bb55e31-e6ba-4539-8e23-296c6b35224f, response-code=40018] The deadline time remaining is not enough for polling, please check network condition

Here is the relevant Kotlin code snippet for reference:

val simpleConsumer = provider
    .newSimpleConsumerBuilder()
    .setClientConfiguration(clientConfiguration)
    .setConsumerGroup(consumerGroup)
    .setAwaitDuration(Duration.ofMillis(500)) // The issue occurs when this value is set below 20s
    .setSubscriptionExpressions(mapOf(topic to filterExpression))
    .build()

It seems that awaitDuration must be at least 20 seconds for the consumer to work properly. However, I need more flexibility in setting this duration for my use case. Could you explain how to resolve this issue, or suggest a workaround that would let me set awaitDuration below 20 seconds without hitting this error?

Steps to Reproduce

Exception in thread "main" org.apache.rocketmq.client.java.exception.BadRequestException: [request-id=0bb55e31-e6ba-4539-8e23-296c6b35224f, response-code=40018] The deadline time remaining is not enough for polling, please check network condition
    at org.apache.rocketmq.client.java.exception.StatusChecker.check(StatusChecker.java:63)
    at org.apache.rocketmq.client.java.impl.consumer.ConsumerImpl.lambda$receiveMessage$0(ConsumerImpl.java:114)
    at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractTransformFuture$AsyncTransformFuture.doTransform(AbstractTransformFuture.java:221)
    at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractTransformFuture$AsyncTransformFuture.doTransform(AbstractTransformFuture.java:208)
    at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractTransformFuture.run(AbstractTransformFuture.java:122)
    at org.apache.rocketmq.shaded.com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:31)
    at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1270)
    at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:1038)
    at org.apache.rocketmq.shaded.com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:783)
    at org.apache.rocketmq.shaded.com.google.common.util.concurrent.SettableFuture.set(SettableFuture.java:49)
    at org.apache.rocketmq.client.java.rpc.RpcClientImpl$1.onCompleted(RpcClientImpl.java:168)
    at org.apache.rocketmq.shaded.io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:485)
    at org.apache.rocketmq.shaded.io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
    at org.apache.rocketmq.shaded.io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
    at org.apache.rocketmq.shaded.io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
    at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:563)
    at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70)
    at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:744)
    at org.apache.rocketmq.shaded.io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
    at org.apache.rocketmq.shaded.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.rocketmq.shaded.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)

What Did You Expect to See?

The code should run without errors when the setAwaitDuration parameter is less than 20 seconds.

What Did You See Instead?

No

Additional Context

No response

redlsz commented 6 months ago

This exception indicates that the pollingTime (awaitDuration) set by the client is too small.

There are two proxy configs related to pollingTime: grpcClientConsumerMinLongPollingTimeoutMillis (default=5s) and grpcClientConsumerMaxLongPollingTimeoutMillis (default=20s). Setting the pollingTime within this range should be OK.

Related proxy code: https://github.com/apache/rocketmq/blob/develop/proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/consumer/ReceiveMessageActivity.java#L82
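As a rough client-side illustration of the range described above, the sketch below clamps a requested duration into the proxy window before it would be passed to setAwaitDuration. The helper clampAwaitDuration and the two constants are hypothetical names, not part of the RocketMQ client API, and the 5s/20s bounds are only the defaults quoted in this thread; a proxy may be configured differently.

```kotlin
import java.time.Duration

// Bounds taken from the defaults quoted in this thread for
// grpcClientConsumerMinLongPollingTimeoutMillis and
// grpcClientConsumerMaxLongPollingTimeoutMillis. These are assumptions:
// a given proxy deployment may override them.
val ASSUMED_MIN_POLLING: Duration = Duration.ofSeconds(5)
val ASSUMED_MAX_POLLING: Duration = Duration.ofSeconds(20)

// Hypothetical helper (not a RocketMQ API): returns the requested duration
// if it lies inside [min, max], otherwise the nearest bound.
fun clampAwaitDuration(
    requested: Duration,
    min: Duration = ASSUMED_MIN_POLLING,
    max: Duration = ASSUMED_MAX_POLLING
): Duration {
    require(min <= max) { "min must not exceed max" }
    return requested.coerceIn(min, max)
}

fun main() {
    println(clampAwaitDuration(Duration.ofMillis(500)))  // PT5S
    println(clampAwaitDuration(Duration.ofSeconds(10)))  // PT10S
    println(clampAwaitDuration(Duration.ofSeconds(60)))  // PT20S
}
```

The result could then be passed as .setAwaitDuration(clampAwaitDuration(Duration.ofMillis(500))). Clamping silently changes the requested value, so logging the adjustment may be preferable in real code.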

CodingOX commented 6 months ago

Thanks. I consulted the relevant documentation and did not find this mentioned, and the code comments do not mention it either. To better help future users, would you consider improving the documentation?

lizhimins commented 6 months ago

Can you describe what scenario requires setting awaitDuration below 20 seconds?

CodingOX commented 6 months ago

As I understand it, the awaitDuration parameter lets me fetch messages more promptly. During periods of low business activity, a particular message may be highly important while the number of available messages falls short of the maxMessageNum threshold (from the org.apache.rocketmq.client.apis.consumer.SimpleConsumer#receive(maxMessageNum, duration) call); a short awaitDuration ensures the call still returns quickly. Otherwise, I might need to fall back to setting maxMessageNum to 1, which could be less efficient.