apache / rocketmq

Apache RocketMQ is a cloud native messaging and streaming platform, making it simple to build event-driven applications.
https://rocketmq.apache.org/
Apache License 2.0

The value of sendThreadPoolQueueHeadWaitTimeMills is 0 most of the time #2516

Closed. xxd763795151 closed this issue 2 years ago.

xxd763795151 commented 3 years ago

BUG REPORT

  1. Please describe the issue you observed:

When a send-message request waits in sendThreadPoolQueue for too long, the broker may reject it with "[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while". The relevant code is as follows:

                    final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
                    if (behind >= maxWaitTimeMillsInQueue) {
                        if (blockingQueue.remove(runnable)) {
                            rt.setStopRun(true);
                            rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
                        }
                    }
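The excerpt above is the core of the broker's queue-timeout check. The following is a minimal, self-contained sketch of the same idea, not the broker's actual code. Several details are assumptions of the sketch:

- TimedTask is a hypothetical stand-in for RequestTask.
- The current time is passed in as `now` so the result is deterministic.
- Stopping at the first element that cannot be timed is a simplification made here.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for RocketMQ's RequestTask: only the creation timestamp matters here.
class TimedTask implements Runnable {
    final long createTimestamp;
    volatile boolean stopRun;

    TimedTask(long createTimestamp) {
        this.createTimestamp = createTimestamp;
    }

    @Override
    public void run() {
        // Real request handling would go here; a rejected task has stopRun set.
    }
}

class QueueTimeoutSketch {
    // Rejects tasks at the head of the queue that have waited at least maxWaitMillis
    // and returns how many were rejected.
    static int cleanExpired(BlockingQueue<Runnable> queue, long maxWaitMillis, long now) {
        int rejected = 0;
        while (true) {
            Runnable head = queue.peek();
            // Sketch assumption: stop on an empty queue or on an element that carries no timestamp.
            if (!(head instanceof TimedTask)) {
                break;
            }
            TimedTask task = (TimedTask) head;
            long behind = now - task.createTimestamp;
            if (behind < maxWaitMillis) {
                // Later elements were enqueued after the head, so in FIFO order they are normally younger.
                break;
            }
            if (queue.remove(head)) {
                task.stopRun = true;
                rejected++; // the broker would answer SYSTEM_BUSY at this point
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        queue.add(new TimedTask(0));    // has waited 1500 ms at now = 1500
        queue.add(new TimedTask(1200)); // has waited 300 ms at now = 1500
        System.out.println(cleanExpired(queue, 1000, 1500) + " rejected, " + queue.size() + " left");
        // prints "1 rejected, 1 left"
    }
}
```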

The default value of maxWaitTimeMillsInQueue is 200 ms. We have set it to 1000 ms in our production environment, but the problem still occurs occasionally. We use rocketmq-exporter + Prometheus + Grafana to monitor sendThreadPoolQueueHeadWaitTimeMills, but the value is almost always 0 (occasionally a very high value appears). That makes no sense!

While debugging the broker's source code, I found that sendThreadPoolQueue holds two types of elements:

    java.util.concurrent.CompletableFuture$UniAccept
    org.apache.rocketmq.broker.latency.FutureTaskExt

sendThreadPoolQueueHeadWaitTimeMills is computed only when the head element of sendThreadPoolQueue is an org.apache.rocketmq.broker.latency.FutureTaskExt; otherwise it is reported as 0. See the source code below:

// BrokerController.java
    public long headSlowTimeMills(BlockingQueue<Runnable> q) {
        long slowTimeMills = 0;
        final Runnable peek = q.peek();
        if (peek != null) {
            RequestTask rt = BrokerFastFailure.castRunnable(peek);
            slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
        }

        if (slowTimeMills < 0) {
            slowTimeMills = 0;
        }

        return slowTimeMills;
    }

The key line is BrokerFastFailure.castRunnable(peek):

    public static RequestTask castRunnable(final Runnable runnable) {
        try {
            if (runnable instanceof FutureTaskExt) {
                FutureTaskExt object = (FutureTaskExt) runnable;
                return (RequestTask) object.getRunnable();
            }
        } catch (Throwable e) {
            log.error(String.format("castRunnable exception, %s", runnable.getClass().getName()), e);
        }

        return null;
    }
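To make the effect concrete, here is a small sketch that reproduces the peek-only logic of headSlowTimeMills with plain JDK types. It is not RocketMQ code:

- TimedTask is a hypothetical stand-in for a FutureTaskExt that wraps a RequestTask.
- The lambda at the head plays the role of a CompletableFuture$UniAccept callback.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a FutureTaskExt wrapping a RequestTask.
class TimedTask implements Runnable {
    final long createTimestamp;

    TimedTask(long createTimestamp) {
        this.createTimestamp = createTimestamp;
    }

    @Override
    public void run() {
    }
}

class HeadMetricSketch {
    // Same shape as headSlowTimeMills: only the head element is inspected,
    // and anything that is not a timed request counts as 0.
    static long headWaitByPeek(BlockingQueue<Runnable> queue, long now) {
        Runnable head = queue.peek();
        long wait = (head instanceof TimedTask) ? now - ((TimedTask) head).createTimestamp : 0;
        return Math.max(wait, 0);
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        queue.add(() -> { });          // plays the role of a CompletableFuture$UniAccept callback
        queue.add(new TimedTask(100)); // a request that has waited 900 ms at now = 1000
        System.out.println(headWaitByPeek(queue, 1000)); // prints 0
    }
}
```

Even though a request in the queue has already waited 900 ms, the metric reports 0 because a callback happens to sit at the head.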

The java.util.concurrent.CompletableFuture$UniAccept elements come from SendMessageProcessor.java:

    @Override
    public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
        asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this.brokerController.getSendMessageExecutor());
    }

Because the response callback is handed to the send-message executor via thenAcceptAsync, both kinds of task share sendThreadPoolQueue, and most of the time the head element of the queue is a java.util.concurrent.CompletableFuture$UniAccept.
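The mixing can be reproduced with the JDK alone. In this sketch, the class and method names are hypothetical, and a single-threaded ThreadPoolExecutor stands in for the send-message executor. The sketch does the following:

1. Keeps the executor's only worker busy.
2. Hands both a thenAcceptAsync callback and an ordinary submit() to the executor.
3. Collects the class names of the elements waiting in its queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SharedQueueDemo {
    // Returns the class names of the elements queued in a one-thread executor after
    // one thenAcceptAsync callback and one submit() have both been given to it.
    static List<String> queuedClassNames() {
        LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        List<String> names = new ArrayList<>();
        try {
            // Occupy the only worker so that every later task has to wait in the queue.
            executor.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Comparable to asyncProcessRequest(...).thenAcceptAsync(callback, sendMessageExecutor).
            CompletableFuture.completedFuture("response").thenAcceptAsync(r -> { }, executor);
            // Comparable to a request submitted to the same executor.
            executor.submit(() -> { });
            for (Runnable r : queue) {
                names.add(r.getClass().getName());
            }
        } finally {
            release.countDown();
            executor.shutdown();
            try {
                executor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return names;
    }

    public static void main(String[] args) {
        queuedClassNames().forEach(System.out::println);
    }
}
```

Iterating the queue yields java.util.concurrent.CompletableFuture$UniAccept first and java.util.concurrent.FutureTask second. In the broker the request element is a FutureTaskExt rather than a plain FutureTask, but the point is the same: peek() at the head can land on a callback instead of a request.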

  2. Please tell us about your environment:

Linux / macOS, RocketMQ 4.7.1 release

  3. Other information (e.g. detailed explanation, logs, related issues, suggestions how to fix, etc):

How I found this information in sendThreadPoolQueue: I printed the queue's contents with code like the following:

// BrokerController#headSlowTimeMills(BlockingQueue<Runnable> q)
........
        if (q == this.sendThreadPoolQueue) {
            System.out.println("send queue foreach size: " + q.size());
            q.forEach(r -> {
                RequestTask rt = BrokerFastFailure.castRunnable(r);
                System.out.println(r.getClass());
                // -1 marks elements that are not a RequestTask wrapped in FutureTaskExt
                long tmpSlowTime = rt == null ? -1 : this.messageStore.now() - rt.getCreateTimestamp();
                System.out.println(tmpSlowTime);
            });
            //System.out.println("Send queue slow time mills: " + slowTimeMills);
        }
.......

This is the printed output:

    send queue foreach size: 4
    class java.util.concurrent.CompletableFuture$UniAccept
    -1
    class org.apache.rocketmq.broker.latency.FutureTaskExt
    387
    class java.util.concurrent.CompletableFuture$UniAccept
    -1
    class org.apache.rocketmq.broker.latency.FutureTaskExt
    80

I also printed a stack trace for every element added to the queue by overriding its put/offer methods:

        this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity()) {
            @Override
            public void put(Runnable runnable) throws InterruptedException {
                System.out.println("queue put: " + runnable.getClass());
                super.put(runnable);
            }

            @Override
            public boolean offer(Runnable runnable) {
                System.out.println("queue offer: " + runnable.getClass() + ", current thread: " + Thread.currentThread().getName() + ", thread id: " + Thread.currentThread().getId());
                Throwable throwable = new Throwable();
                StackTraceElement[] stackTraceElements = throwable.getStackTrace();
                if (stackTraceElements != null) {
                    Arrays.stream(stackTraceElements).forEach(stackTraceElement -> {
                        System.out.println(stackTraceElement.getClassName() + "#"
                                + stackTraceElement.getMethodName() + "#" + stackTraceElement.getLineNumber());
                    });
                }
                System.out.println("---------------------------end------------------------------");
                return super.offer(runnable);
            }

            @Override
            public boolean offer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException {
                System.out.println("queue timeoutoffer: " + runnable.getClass());
                return super.offer(runnable, timeout, unit);
            }
        };

The output is as follows:

queue offer: class java.util.concurrent.CompletableFuture$UniAccept, current thread: SendMessageThread_1, thread id: 81
    org.apache.rocketmq.broker.BrokerController$1#offer#205
    org.apache.rocketmq.broker.BrokerController$1#offer#195
    java.util.concurrent.ThreadPoolExecutor#execute#1371
    java.util.concurrent.CompletableFuture$UniCompletion#claim#543
    java.util.concurrent.CompletableFuture#uniAccept#667
    java.util.concurrent.CompletableFuture$UniAccept#tryFire$$$capture#646
    java.util.concurrent.CompletableFuture$UniAccept#tryFire#-1
    java.util.concurrent.CompletableFuture#uniAcceptStage#686
    java.util.concurrent.CompletableFuture#thenAcceptAsync#2019
    org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncProcessRequest#82
    org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$1#run#226
    org.apache.rocketmq.remoting.netty.RequestTask#run#80
    java.util.concurrent.Executors$RunnableAdapter#call#511
    java.util.concurrent.FutureTask#run$$$capture#266
    java.util.concurrent.FutureTask#run#-1
    java.util.concurrent.ThreadPoolExecutor#runWorker#1149
    java.util.concurrent.ThreadPoolExecutor$Worker#run#624
    java.lang.Thread#run#748

queue offer: class org.apache.rocketmq.broker.latency.FutureTaskExt, current thread: NettyServerCodecThread_5, thread id: 56
    org.apache.rocketmq.broker.BrokerController$1#offer#205
    org.apache.rocketmq.broker.BrokerController$1#offer#195
    java.util.concurrent.ThreadPoolExecutor#execute#1371
    java.util.concurrent.AbstractExecutorService#submit#112
    org.apache.rocketmq.broker.BrokerController$2#submit#304
    org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand#256
    org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processMessageReceived#158
    org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler#channelRead0#420
    org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler#channelRead0#415

RongtongJin commented 3 years ago

Good catch! It is indeed a bug. Do you have any ideas on how to fix it?

RongtongJin commented 3 years ago

Maybe we could switch to a separate thread pool when calling thenAcceptAsync. @xxd763795151
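One way to read this suggestion, sketched with plain JDK executors, is to give thenAcceptAsync its own executor so that response callbacks never enter the send queue. callbackExecutor and the other names here are hypothetical, not RocketMQ configuration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class SeparateCallbackPoolSketch {
    // Returns the class names left waiting in the send queue when callbacks use their own pool.
    static List<String> sendQueueClassNames() {
        LinkedBlockingQueue<Runnable> sendQueue = new LinkedBlockingQueue<>();
        ThreadPoolExecutor sendExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, sendQueue);
        // Hypothetical dedicated pool for response callbacks.
        ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        List<String> names = new ArrayList<>();
        try {
            sendExecutor.execute(() -> awaitQuietly(release)); // keep the only send worker busy
            // The callback goes to callbackExecutor, not to the send executor's queue.
            CompletableFuture.completedFuture("response").thenAcceptAsync(r -> { }, callbackExecutor);
            sendExecutor.submit(() -> { }); // a request still waits in the send queue
            for (Runnable r : sendQueue) {
                names.add(r.getClass().getName());
            }
        } finally {
            release.countDown();
            sendExecutor.shutdown();
            callbackExecutor.shutdown();
            try {
                sendExecutor.awaitTermination(5, TimeUnit.SECONDS);
                callbackExecutor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return names;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(sendQueueClassNames()); // prints [java.util.concurrent.FutureTask]
    }
}
```

With the callback routed elsewhere, only the request waits in the send queue, so a peek()-based head metric sees a request again. The cost is one more executor to size and monitor.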

xxd763795151 commented 3 years ago

Of course, that may be the quickest fix, if its scope of influence is small enough. Consider another approach: modify the public long headSlowTimeMills(BlockingQueue q) method. Instead of calling peek(), iterate over the queue and use the first element that meets the condition (or null if there is none) to calculate the wait time. The impact of concurrency needs to be considered.
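Here is a minimal sketch of this iterate-instead-of-peek approach. The hypothetical TimedTask type again stands in for a FutureTaskExt that wraps a RequestTask.

The sketch relies on the weakly consistent iterator of LinkedBlockingQueue. That iterator does not throw ConcurrentModificationException while other threads add or remove elements, so the result is a best-effort reading rather than an exact snapshot.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a FutureTaskExt wrapping a RequestTask.
class TimedTask implements Runnable {
    final long createTimestamp;

    TimedTask(long createTimestamp) {
        this.createTimestamp = createTimestamp;
    }

    @Override
    public void run() {
    }
}

class HeadWaitScanSketch {
    // Walks from the head and times the first element that carries a creation timestamp,
    // skipping callbacks. The weakly consistent iterator tolerates concurrent puts and takes.
    static long headWaitByScan(BlockingQueue<Runnable> queue, long now) {
        for (Runnable r : queue) {
            if (r instanceof TimedTask) {
                return Math.max(now - ((TimedTask) r).createTimestamp, 0);
            }
        }
        return 0; // no timed request in the queue
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        queue.add(() -> { });          // callback at the head, skipped by the scan
        queue.add(new TimedTask(100)); // oldest request: waited 900 ms at now = 1000
        queue.add(new TimedTask(700));
        System.out.println(headWaitByScan(queue, 1000)); // prints 900
    }
}
```

The scan is O(n) in the worst case, for example when many callbacks sit ahead of the first request. That cost may matter if the metric is sampled frequently on a large queue.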

RongtongJin commented 3 years ago

Could you submit a pull request to fix the issue? I will review the code and help you merge.

xxd763795151 commented 3 years ago

OK, I will verify the approach and get back to you later. @RongtongJin

vongosling commented 3 years ago

Great find! I would recommend using conditional iteration instead of creating another thread pool. There are already too many blocking queues and executor services; it's almost chaotic.

Looking back, the fail-fast mechanism we previously built on the broker side is a little crude. I have been thinking about optimizing it. If you have better ideas, you are welcome to propose them; for example, you could borrow better algorithms and data structures from Resilience4j...

RongtongJin commented 2 years ago

Merged