baidu / Jprotobuf-rpc-socket

Protobuf RPC is a Java implementation of a TCP-based binary RPC communication protocol.
Apache License 2.0

RpcClient#shutdown can hang in extreme cases #65

Open iSenninha opened 4 years ago

iSenninha commented 4 years ago

While establishing a connection, RpcClient waits synchronously here:

        at io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:286)
        - locked <0x00000006506433d0> (a io.netty.channel.DefaultChannelPromise)
        at io.netty.channel.DefaultChannelPromise.awaitUninterruptibly(DefaultChannelPromise.java:135)
        at io.netty.channel.DefaultChannelPromise.awaitUninterruptibly(DefaultChannelPromise.java:28)
        at com.baidu.jprotobuf.pbrpc.transport.ChannelPoolObjectFactory.wrap(ChannelPoolObjectFactory.java:75)
        at com.baidu.jprotobuf.pbrpc.transport.ChannelPoolObjectFactory.wrap(ChannelPoolObjectFactory.java:35)
        at org.apache.commons.pool2.BasePooledObjectFactory.makeObject(BasePooledObjectFactory.java:60)
        at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:868)
        at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:435)
        at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363)
        at com.baidu.jprotobuf.pbrpc.transport.ChannelPool.getChannel(ChannelPool.java:61)
        at com.baidu.jprotobuf.pbrpc.transport.RpcChannel.getConnection(RpcChannel.java:64)
        at com.baidu.jprotobuf.pbrpc.client.ProtobufRpcProxy.invoke(ProtobufRpcProxy.java:366)
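
The frame at `ChannelPoolObjectFactory.wrap` is an unbounded `awaitUninterruptibly()`. The pool thread returns only if someone completes the connect promise. One possible mitigation is to bound that wait. This is an assumption, not the project's actual fix; note that Netty's `Future` also provides `awaitUninterruptibly(long timeoutMillis)`. To keep the sketch runnable with only the JDK, it uses `CompletableFuture` as a stand-in for `ChannelFuture`. The names `BoundedConnectWait` and `awaitConnected` are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: a bounded analogue of an unbounded awaitUninterruptibly().
final class BoundedConnectWait {
    static <T> T awaitConnected(CompletableFuture<T> connectPromise, long timeoutMillis) {
        boolean interrupted = false;
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            while (true) {
                long remaining = deadline - System.nanoTime();
                try {
                    return connectPromise.get(Math.max(0L, remaining), TimeUnit.NANOSECONDS);
                } catch (InterruptedException e) {
                    // Like awaitUninterruptibly: keep waiting, restore the flag at the end.
                    interrupted = true;
                } catch (TimeoutException e) {
                    // Nobody completed the promise in time: give up instead of hanging.
                    connectPromise.cancel(false);
                    throw new IllegalStateException("connect did not complete within " + timeoutMillis + " ms", e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException("connect failed", e.getCause());
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

With a bound in place, a promise that is never completed costs at most `timeoutMillis` and surfaces as an exception to the borrower, instead of a thread parked forever.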

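If the connection above has not finished being established (or has failed), and the RpcClient is exposed too early so that another thread calls shutdown on it, the thread above can never be woken up again.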
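
The race can be reproduced without Netty. The JDK-only analogy below uses hypothetical class and method names:

- A single-thread executor stands in for the event loop, and a `CompletableFuture` stands in for the connect promise.
- The task that would complete the promise is still queued when another thread calls `shutdownNow()`.
- That task is therefore discarded and never run, so the promise stays pending forever.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class OrphanedConnectDemo {
    /** Returns true if the loop terminated while the connect promise was still pending. */
    static boolean promiseOrphanedByShutdown() {
        // Stand-ins (assumptions): executor = event loop, future = connect promise.
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> connectPromise = new CompletableFuture<>();
        CountDownLatch loopBusy = new CountDownLatch(1);
        try {
            // Keep the loop thread busy so the completion task below stays queued.
            eventLoop.execute(() -> {
                loopBusy.countDown();
                try {
                    Thread.sleep(Long.MAX_VALUE);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // woken by shutdownNow()
                }
            });
            loopBusy.await();

            // The "connect finished" callback that would normally wake the waiting thread.
            eventLoop.execute(() -> connectPromise.complete(null));

            // Another thread shuts the loop down first: queued tasks are returned, never run.
            eventLoop.shutdownNow();
            boolean terminated = eventLoop.awaitTermination(5, TimeUnit.SECONDS);

            // The loop is gone and the promise is still pending, so an unbounded
            // wait such as connectPromise.join() would block forever from here on.
            return terminated && !connectPromise.isDone();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
    }
}
```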

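The specific reason it cannot be woken up is that the Netty version the project depends on, 4.0.27.Final, does not complete the DefaultChannelPromise that a channel with an unfinished connect is waiting on when Netty shuts down. The latest version of Netty has fixed this bug.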
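
The general shape of a fix is that shutdown must also fail any promise still waiting on it. Blocked callers then wake up with an error instead of hanging. The registry below is a hypothetical JDK-only sketch of that idea. It is not Netty's actual change and not a jprotobuf-rpc-socket API. It tracks connect promises while they are pending, and `shutdown()` completes every remaining one exceptionally.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical: fails every still-pending connect promise when the client shuts down.
final class PendingConnectRegistry {
    private final Set<CompletableFuture<?>> pending = ConcurrentHashMap.newKeySet();
    private volatile boolean shutdown;

    /** Tracks a connect promise; if shutdown already happened, fails it immediately. */
    <T> CompletableFuture<T> track(CompletableFuture<T> promise) {
        pending.add(promise);
        // Stop tracking once the promise completes for any reason.
        promise.whenComplete((result, error) -> pending.remove(promise));
        // Re-check after adding, so a promise registered concurrently with shutdown()
        // is still failed (completing twice is harmless: the second call is a no-op).
        if (shutdown) {
            fail(promise);
        }
        return promise;
    }

    /** Marks the registry as shut down and wakes every waiter with an error. */
    void shutdown() {
        shutdown = true;
        for (CompletableFuture<?> promise : pending) {
            fail(promise);
        }
    }

    private static void fail(CompletableFuture<?> promise) {
        promise.completeExceptionally(new IllegalStateException("shut down before connect completed"));
    }
}
```

After `shutdown()`, a thread blocked in `join()` on a tracked promise receives a `CompletionException` wrapping the `IllegalStateException` rather than waiting forever.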

Bug reproduction:

Environment information

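Pure Netty reproduction:

    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import java.util.Date;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.junit.Test;

    public class NettyShutdownHangTest {

        // Loops until one iteration hangs: with Netty 4.0.27.Final, if shutdownGracefully()
        // wins the race against the pending connect, the connect promise is never completed
        // and awaitUninterruptibly() below blocks forever.
        @Test
        public void testNetty() {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            while (true) {
                final Bootstrap bootstrap = new Bootstrap();
                final NioEventLoopGroup group = new NioEventLoopGroup(1);
                bootstrap.group(group);
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
                bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000);
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new TimeClientHandler());
                    }
                });
                // Start an asynchronous connect to localhost:1080.
                ChannelFuture connect = bootstrap.connect("localhost", 1080);
                // Concurrently shut down the event loop group from another thread.
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        group.shutdownGracefully();
                    }
                });
                try {
                    // Unbounded wait: returns only once the connect promise is completed.
                    connect.awaitUninterruptibly();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        // Decodes a 32-bit time value (seconds since 1900-01-01) and prints it.
        private static class TimeClientHandler extends ChannelInboundHandlerAdapter {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                ByteBuf m = (ByteBuf) msg;
                try {
                    // 2208988800 s is the offset between the 1900 epoch and the Unix epoch (1970).
                    long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
                    System.out.println(new Date(currentTimeMillis));
                    ctx.close();
                } finally {
                    m.release();
                }
            }

            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                cause.printStackTrace();
                ctx.close();
            }
        }
    }
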
jhunters commented 4 years ago

Thanks for the report; we will review this case.

jhunters commented 4 years ago

Confirmed; Netty needs to be upgraded to the latest version.

    Thread [main] (Suspended)
        owns: ProtobufRpcProxy (id=83)
        waiting for: DefaultChannelPromise (id=99)
        Object.wait(long) line: not available [native method] [local variables unavailable]
        DefaultChannelPromise(Object).wait() line: 502 [local variables unavailable]
        DefaultChannelPromise(DefaultPromise).awaitUninterruptibly() line: 286
        DefaultChannelPromise.awaitUninterruptibly() line: 135
        DefaultChannelPromise.awaitUninterruptibly() line: 28
        ChannelPoolObjectFactory.wrap(Connection) line: 88
        ChannelPoolObjectFactory.wrap(Object) line: 1
        ChannelPoolObjectFactory(BasePooledObjectFactory).makeObject() line: 60
        GenericObjectPool.create() line: 861
        GenericObjectPool.borrowObject(long) line: 435
        GenericObjectPool.borrowObject() line: 363
        ChannelPool.getChannel() line: 79
        RpcChannel.testChannlConnect() line: 49
        ProtobufRpcProxy.proxy() line: 326
        TestRpcHang.testRpcHang() line: 30
        NativeMethodAccessorImpl.invoke0(Method, Object, Object[]) line: not available [native method]
        NativeMethodAccessorImpl.invoke(Object, Object[]) line: 62
        DelegatingMethodAccessorImpl.invoke(Object, Object[]) line: 43
        Method.invoke(Object, Object...) line: 497
        FrameworkMethod$1.runReflectiveCall() line: 50
        FrameworkMethod$1(ReflectiveCallable).run() line: 12
        FrameworkMethod.invokeExplosively(Object, Object...) line: 47
        InvokeMethod.evaluate() line: 17
        BlockJUnit4ClassRunner(ParentRunner).runLeaf(Statement, Description, RunNotifier) line: 325
        BlockJUnit4ClassRunner.runChild(FrameworkMethod, RunNotifier) line: 78
        BlockJUnit4ClassRunner.runChild(Object, RunNotifier) line: 57
        ParentRunner$3.run() line: 290
        ParentRunner$1.schedule(Runnable) line: 71
        BlockJUnit4ClassRunner(ParentRunner).runChildren(RunNotifier) line: 288
        ParentRunner.access$000(ParentRunner, RunNotifier) line: 58
        ParentRunner$2.evaluate() line: 268
        BlockJUnit4ClassRunner(ParentRunner).run(RunNotifier) line: 363
        JUnit4TestReference.run(TestExecution) line: 89
        TestExecution.run(ITestReference[]) line: 41
        RemoteTestRunner.runTests(String[], String, TestExecution) line: 541
        RemoteTestRunner.runTests(TestExecution) line: 763
        RemoteTestRunner.run() line: 463
        RemoteTestRunner.main(String[]) line: 209