sofastack / sofa-rpc

SOFARPC is a high-performance, high-extensibility, production-level Java RPC framework.
https://www.sofastack.tech/sofa-rpc/docs/Home
Apache License 2.0
3.81k stars 1.16k forks

Optimize performance for h2c protocol #1400

Closed wangchengming666 closed 1 month ago

wangchengming666 commented 4 months ago

Optimization results

MacOS 13.3.1 (a) (22E772610a)
Six-core Intel Core i7, 16G
mac OS 10.15.7
JDK 1.8.0_291
VM version: JDK 1.8.0_291, Java HotSpot(TM) 64-Bit Server VM, 25.291-b10
VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_291.jdk/Contents/Home/jre/bin/java
VM options: -Xmx1g -Xms1g -XX:MaxDirectMemorySize=4g -XX:+UseG1GC -Djmh.ignoreLock=true -Dserver.host=localhost -Dserver.port=12200 -Dbenchmark.output=
Blackhole mode: full + dont-inline hint
Warmup: 1 iterations, 10 s each
Measurement: 1 iterations, 300 s each
Timeout: 10 min per iteration
Threads: 1000 threads, will synchronize iterations
Benchmark mode: Throughput, ops/time
Benchmark: com.alipay.sofa.benchmark.Client.existUser

Approach

The current code of com.alipay.sofa.rpc.transport.netty.NettyChannel#writeAndFlush is as follows:

    @Override
    public void writeAndFlush(final Object obj) {
        // Call the channel's writeAndFlush directly
        Future future = channel.writeAndFlush(obj);
        future.addListener(new FutureListener() {
            @Override
            public void operationComplete(Future future1) throws Exception {
                if (!future1.isSuccess()) {
                    Throwable throwable = future1.cause();
                    LOGGER.error("Failed to send to "
                        + NetUtils.channelToString(localAddress(), remoteAddress())
                        + " for msg : " + obj
                        + ", Cause by:", throwable);
                }
            }
        });
    }

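Reading the Netty 4+ source shows that when the channel's writeAndFlush method is called, Netty 4 checks whether the calling thread is the EventLoop thread bound to that channel. If it is not, Netty constructs a write task (WriteTask) and submits it to the EventLoop to be executed later.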

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        // (omitted)
        final AbstractChannelHandlerContext next = findContextOutbound(flush ?
                (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        // Check whether the current thread is the EventLoop bound to this channel
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
            // Submit the write task to the EventLoop to be executed later
            if (!safeExecute(executor, task, promise, m, !flush)) {
                task.cancel();
            }
        }
    }

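The code above shows that Netty 4 always ensures a write is handled on the EventLoop thread. However, each WriteTask scheduled onto the EventLoop thread writes only one message, so tasks and messages are one-to-one.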
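The one-to-one relationship can be reproduced without Netty. The following is a minimal sketch, assuming a single-threaded JDK executor as a stand-in for the EventLoop; PerMessageDispatchDemo and send are hypothetical names used only for illustration, and the "write" is just a counter increment.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical demo: a single-thread executor plays the role of the EventLoop. */
final class PerMessageDispatchDemo {

    /** Sends `messages` writes from the calling thread; returns {messagesWritten, tasksSubmitted}. */
    static int[] send(int messages) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        AtomicInteger written = new AtomicInteger();
        int tasksSubmitted = 0;
        for (int i = 0; i < messages; i++) {
            // The caller is not the loop thread, so each write becomes its own task.
            eventLoop.execute(written::incrementAndGet);
            tasksSubmitted++;
        }
        eventLoop.shutdown();
        try {
            eventLoop.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {written.get(), tasksSubmitted};
    }

    public static void main(String[] args) {
        int[] result = send(1000);
        System.out.println("written=" + result[0] + ", tasks=" + result[1]);
    }
}
```

In this sketch the number of submitted tasks always equals the number of messages, which is the per-message scheduling cost the PR aims to reduce.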

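We can therefore submit all messages to a write queue (WriteQueue) first. The queue acquires the EventLoop once and submits a single task, which then keeps taking messages off the queue and calling Netty 4's write.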

Part of com.alipay.sofa.rpc.common.BatchExecutorQueue#run:

    private void run(Executor executor) {
        try {
            Queue<T> snapshot = new LinkedList<>();
            T item;
            while ((item = queue.poll()) != null) {
                snapshot.add(item);
            }
            int i = 0;
            boolean flushedOnce = false;
            while ((item = snapshot.poll()) != null) {
                if (snapshot.size() == 0) {
                    flushedOnce = false;
                    break;
                }
                if (i == chunkSize) {
                    i = 0;
                    flush(item);
                    flushedOnce = true;
                } else {
                    prepare(item);
                    i++;
                }
            }
            if ((i != 0 || !flushedOnce) && item != null) {
                flush(item);
            }
        } finally {
            scheduled.set(false);
            if (!queue.isEmpty()) {
                scheduleFlush(executor);
            }
        }
    }
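To make the chunking in the excerpt easier to follow, here is a simplified sketch of the same general idea: write items one by one, flush after every chunkSize writes, and flush once more for any remainder. It is not the BatchExecutorQueue code; ChunkedFlushDemo and countFlushes are hypothetical names, and writes and flushes are only counted rather than sent to a channel.

```java
/** Hypothetical illustration of chunked flushing; it counts flushes instead of touching a channel. */
final class ChunkedFlushDemo {

    /** Returns how many flushes are needed to send `items` messages with the given chunk size. */
    static int countFlushes(int items, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        int flushes = 0;
        int pending = 0; // messages written since the last flush
        for (int n = 0; n < items; n++) {
            pending++; // stand-in for a buffered write of one message
            if (pending == chunkSize) {
                flushes++; // stand-in for flushing the buffered chunk
                pending = 0;
            }
        }
        if (pending > 0) {
            flushes++; // final flush for the incomplete chunk
        }
        return flushes;
    }

    public static void main(String[] args) {
        System.out.println(countFlushes(10, 4));
    }
}
```

With 10 messages and a chunk size of 4, this sketch flushes three times (4 + 4 + 2) instead of ten.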

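This flush logic runs on the EventLoop thread, and as the Netty source above shows, a write issued from the EventLoop thread is executed immediately, with no thread switch. Compared with calling writeAndFlush directly from the user thread every time, this greatly reduces the number of switches between user threads and the EventLoop thread and greatly increases the number of messages written per write task, which achieves batched sending.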
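The run excerpt relies on a scheduled flag and a scheduleFlush method that are not shown in this description. A minimal sketch of how the enqueue side of such a queue might work is given below, assuming a compare-and-set flag so that at most one drain task is pending at a time. SimpleWriteQueue, its sink callback, and the demo helper are hypothetical names, and a single-threaded JDK executor stands in for the EventLoop.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Illustrative write queue; not the SOFARPC BatchExecutorQueue itself. */
final class SimpleWriteQueue<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean scheduled = new AtomicBoolean(false);
    private final AtomicInteger drainTasks = new AtomicInteger(); // tasks handed to the executor
    private final Consumer<T> sink; // stand-in for the actual channel write

    SimpleWriteQueue(Consumer<T> sink) {
        this.sink = sink;
    }

    /** Callable from any thread: enqueue, then schedule a drain only if none is pending. */
    void enqueue(T item, Executor executor) {
        queue.add(item);
        scheduleDrain(executor);
    }

    int drainTasks() {
        return drainTasks.get();
    }

    private void scheduleDrain(Executor executor) {
        if (scheduled.compareAndSet(false, true)) {
            drainTasks.incrementAndGet();
            executor.execute(() -> drain(executor));
        }
    }

    /** Runs on the executor thread: writes everything currently queued in one pass. */
    private void drain(Executor executor) {
        try {
            T item;
            while ((item = queue.poll()) != null) {
                sink.accept(item);
            }
        } finally {
            scheduled.set(false);
            if (!queue.isEmpty()) {
                scheduleDrain(executor); // items arrived after the last poll
            }
        }
    }

    /** Demo: `producers` threads each enqueue `perProducer` items; returns {written, drainTasks}. */
    static int[] demo(int producers, int perProducer) {
        int total = producers * perProducer;
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        AtomicInteger written = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(total);
        SimpleWriteQueue<Integer> q = new SimpleWriteQueue<>(msg -> {
            written.incrementAndGet();
            done.countDown();
        });
        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            threads[p] = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    q.enqueue(i, eventLoop);
                }
            });
            threads[p].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
            done.await(); // every message has been written before shutting down
            eventLoop.shutdown();
            eventLoop.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {written.get(), q.drainTasks()};
    }

    public static void main(String[] args) {
        int[] r = demo(4, 10_000);
        System.out.println("written=" + r[0] + ", drainTasks=" + r[1]);
    }
}
```

Under contention the drain-task count in this sketch is typically well below the message count, since one task drains whatever has accumulated; the exact number depends on thread timing and is not a benchmark of the PR.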

Diagram:

image

codecov[bot] commented 4 months ago

Codecov Report

Attention: Patch coverage is 84.84848% with 10 lines in your changes are missing coverage. Please review.

Project coverage is 72.13%. Comparing base (357fdf0) to head (742387c). Report is 14 commits behind head on master.

| Files | Patch % | Lines |
|---|---|---|
| ...com/alipay/sofa/rpc/common/BatchExecutorQueue.java | 76.92% | 5 Missing and 4 partials :warning: |
| .../alipay/sofa/rpc/transport/netty/NettyChannel.java | 88.88% | 0 Missing and 1 partial :warning: |
Additional details and impacted files

```diff
@@             Coverage Diff              @@
##             master    #1400      +/-   ##
============================================
+ Coverage     72.04%   72.13%   +0.08%
- Complexity      795      805      +10
============================================
  Files           422      424       +2
  Lines         17815    17873      +58
  Branches       2770     2774       +4
============================================
+ Hits          12835    12892      +57
+ Misses         3570     3566       -4
- Partials       1410     1415       +5
```


Lo1nt commented 3 months ago

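You could open an issue to describe the detailed approach and design, and then simply reference that issue from the PR. I'm concerned that if it stays in the PR, it will be hard to trace back later.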

stale[bot] commented 1 month ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.