CN-GuoZiyang / My-RPC-Framework

一个简单的RPC框架的实现
https://cn-guoziyang.github.io/My-RPC-Framework/
MIT License
802 stars 191 forks source link

ChannelProvider类中的channel对象设置成static类型是否存在线程安全问题? #4

Open winterliu1020 opened 3 years ago

winterliu1020 commented 3 years ago

以下是ChannelProvider类中的代码。

我的疑问(第五行代码):private static Channel channel = null; // 是否有线程安全问题?

假设A, B两个线程去调用get()方法以此获得对应的channel对象,A线程先执行get()方法,然后进行connect获取到了一个channel对象(注意:这是static类型的channel已经指向了A线程中获得的channel对象),但是此时A线程时间片用完了,CPU切到B线程执行,B线程开始执行get()方法,然后B线程中connect获取到属于B线程的channel对象,然后又让static类型的channel已经指向了B线程这个channel对象;

但是,此时B时间片用完,切回A线程,A线程返回static类型的channel执行的值(这时指向的其实是B线程获得的channel),所以产生了线程安全问题。

以上是我的疑问,不知道是否正确。

    private static final Logger logger = LoggerFactory.getLogger(ChannelProvider.class);
    private static EventLoopGroup eventLoopGroup;
    private static Bootstrap bootstrap = initializeBootstrap();

    private static final int MAX_RETRY_COUNT = 5;
    private static Channel channel = null; // 这一行是我的疑问点:是否有线程安全问题???

    public static Channel get(InetSocketAddress inetSocketAddress, CommonSerializer serializer) {
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
                /*自定义序列化编解码器*/
                // RpcResponse -> ByteBuf
                ch.pipeline().addLast(new CommonEncoder(serializer))
                        .addLast(new CommonDecoder())
                        .addLast(new NettyClientHandler());
            }
        });
        CountDownLatch countDownLatch = new CountDownLatch(1);
        try {
            connect(bootstrap, inetSocketAddress, countDownLatch);
            countDownLatch.await();
        } catch (InterruptedException e) {
            logger.error("获取channel时有错误发生:", e);
        }
        return channel;
    }

    private static void connect(Bootstrap bootstrap, InetSocketAddress inetSocketAddress, CountDownLatch countDownLatch) {
        connect(bootstrap, inetSocketAddress, MAX_RETRY_COUNT, countDownLatch);
    }

    private static void connect(Bootstrap bootstrap, InetSocketAddress inetSocketAddress, int retry, CountDownLatch countDownLatch) {
        bootstrap.connect(inetSocketAddress).addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                logger.info("客户端连接成功!");
                channel = future.channel();
                countDownLatch.countDown();
                return;
            }
            if (retry == 0) {
                logger.error("客户端连接失败:重试次数已用完,放弃连接!");
                countDownLatch.countDown();
                throw new RpcException(RpcError.CLIENT_CONNECT_SERVER_FAILURE);
            }
            // 第几次重连
            int order = (MAX_RETRY_COUNT - retry) + 1;
            // 本次重连的间隔
            int delay = 1 << order;
            logger.error("{}: 连接失败,第 {} 次重连……", new Date(), order);
            bootstrap.config().group().schedule(() -> connect(bootstrap, inetSocketAddress, retry - 1, countDownLatch), delay, TimeUnit
                    .SECONDS);
        });
    }