YeautyYE / netty-websocket-spring-boot-starter

:rocket: lightweight high-performance WebSocket framework ( 轻量级、高性能的WebSocket框架)
Apache License 2.0
1.83k stars 542 forks source link

请问大佬有遇到过 io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1 的问题吗 #230

Closed xujimu closed 2 years ago

xujimu commented 2 years ago

我把netty交给spring托管就会出现这个异常,但是如果我单独使用netty则不会出现这个问题,请问大佬你在做这个项目的时候会遇到这个问题吗?

io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
 at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
 at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138)
 at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
 at io.netty.buffer.DefaultByteBufHolder.release(DefaultByteBufHolder.java:115)
 at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88)
 at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:106)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
 at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
 at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
 at io.netty.handler.codec.http.websocketx.Utf8FrameValidator.channelRead(Utf8FrameValidator.java:82)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
 at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
 at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMes

这是我的代码

package com.ppgjx.app.netty.user;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * test
 */
@Data
@Component
@Slf4j
public class UserWebSocketServer implements DisposableBean {

    @Resource
    private UserChannelHandler userChannelHandler;

    @Resource
    private UserHeartbeatHandler userHeartbeatHandler;

    /**
     * boos
     */
    private EventLoopGroup boos;

    /**
     * work
     */
    private EventLoopGroup work;

    /**
     * 
     * @param port
     */
    public void start(int port,String path){

        boos =  new NioEventLoopGroup();
        work =  new NioEventLoopGroup();

        //createServer
        ServerBootstrap server = new ServerBootstrap();
        server.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {

                ChannelPipeline pipeline = socketChannel.pipeline();

                pipeline.addLast(new HttpServerCodec());

                pipeline.addLast(new ChunkedWriteHandler());

                pipeline.addLast(new HttpObjectAggregator(1024*64));

                pipeline.addLast(new WebSocketServerProtocolHandler(path));

                pipeline.addLast(new UserChannelHandler());
                pipeline.addLast(new IdleStateHandler(60,60,60));

                pipeline.addLast(new UserHeartbeatHandler());

            }
        });
        server.group(boos,work)
                .channel(NioServerSocketChannel.class);

        try {
            ChannelFuture future = server.bind(port).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     * 
     * @throws Exception
     */
    @Override
    public void destroy() throws Exception {
        close();
    }

    /**
     * 
     */
    public void close() {
        if (boos!=null) {
            boos.shutdownGracefully();
        }
        if (work!=null) {
            work.shutdownGracefully();
        }
    }

}
package com.ppgjx.app.netty.user;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.springframework.stereotype.Component;

/**
 * 
 */
@Component
public class UserHeartbeatHandler extends ChannelInboundHandlerAdapter {

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        if(evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent) evt;
            if(event.state() == IdleState.READER_IDLE){

            }else if(event.state() == IdleState.WRITER_IDLE){

            }else  if(event.state() == IdleState.ALL_IDLE){

                Channel channel = ctx.channel();

                channel.close();
            }
        }
    }

}
package com.ppgjx.app.netty.user;

import com.alipay.api.domain.ChannelInfo;
import com.ppgjx.app.service.PxUserService;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

@Component
@Slf4j
public class UserChannelHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    private static StringRedisTemplate redisTemplate;

    private static PxUserService pxUserService;

    @Resource
    public void setStringRedisTemplate(StringRedisTemplate redisTemplate) {
        UserChannelHandler.redisTemplate = redisTemplate;
    }

    @Resource
    public void setPxUserService(PxUserService pxUserService) {
        UserChannelHandler.pxUserService = pxUserService;
    }

    public static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {

          String context = msg.text();
          System.out.println(context);
          ctx.writeAndFlush(msg);
    }

    /**
     *
     * 
     * @param text
     */
    public void pushAllUser(String text){
        clients.writeAndFlush(new TextWebSocketFrame(text));
    }

    /**
     * 
     * @param text
     * @param ctx
     */
    public void pushOtherUser(String text,ChannelHandlerContext ctx){
        clients.forEach(ch ->{
            if(ctx.channel() != ch){
                ch.writeAndFlush(new TextWebSocketFrame(text));
            }
        });
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws  Exception{

        clients.add(ctx.channel());

    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

        clients.remove(ctx.channel());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();

        ctx.channel().close();
        clients.remove(ctx.channel());
    }

}
xujimu commented 2 years ago

我把netty交给spring托管就会出现这个异常,但是如果我单独使用netty则不会出现这个问题,请问大佬你在做这个项目的时候会遇到这个问题吗?

io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
 at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
 at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138)
 at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
 at io.netty.buffer.DefaultByteBufHolder.release(DefaultByteBufHolder.java:115)
 at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88)
 at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:106)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
 at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
 at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
 at io.netty.handler.codec.http.websocketx.Utf8FrameValidator.channelRead(Utf8FrameValidator.java:82)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
 at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
 at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMes

这是我的代码

package com.ppgjx.app.netty.user;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * test
 */
@Data
@Component
@Slf4j
public class UserWebSocketServer implements DisposableBean {

    @Resource
    private UserChannelHandler userChannelHandler;

    @Resource
    private UserHeartbeatHandler userHeartbeatHandler;

    /**
     * boos
     */
    private EventLoopGroup boos;

    /**
     * work
     */
    private EventLoopGroup work;

    /**
     * 
     * @param port
     */
    public void start(int port,String path){

        boos =  new NioEventLoopGroup();
        work =  new NioEventLoopGroup();

        //createServer
        ServerBootstrap server = new ServerBootstrap();
        server.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {

                ChannelPipeline pipeline = socketChannel.pipeline();

                pipeline.addLast(new HttpServerCodec());

                pipeline.addLast(new ChunkedWriteHandler());

                pipeline.addLast(new HttpObjectAggregator(1024*64));

                pipeline.addLast(new WebSocketServerProtocolHandler(path));

                pipeline.addLast(new UserChannelHandler());
                pipeline.addLast(new IdleStateHandler(60,60,60));

                pipeline.addLast(new UserHeartbeatHandler());

            }
        });
        server.group(boos,work)
                .channel(NioServerSocketChannel.class);

        try {
            ChannelFuture future = server.bind(port).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     * 
     * @throws Exception
     */
    @Override
    public void destroy() throws Exception {
        close();
    }

    /**
     * 
     */
    public void close() {
        if (boos!=null) {
            boos.shutdownGracefully();
        }
        if (work!=null) {
            work.shutdownGracefully();
        }
    }

}
package com.ppgjx.app.netty.user;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.springframework.stereotype.Component;

/**
 * 
 */
@Component
public class UserHeartbeatHandler extends ChannelInboundHandlerAdapter {

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        if(evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent) evt;
            if(event.state() == IdleState.READER_IDLE){

            }else if(event.state() == IdleState.WRITER_IDLE){

            }else  if(event.state() == IdleState.ALL_IDLE){

                Channel channel = ctx.channel();

                channel.close();
            }
        }
    }

}
package com.ppgjx.app.netty.user;

import com.alipay.api.domain.ChannelInfo;
import com.ppgjx.app.service.PxUserService;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

@Component
@Slf4j
public class UserChannelHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    private static StringRedisTemplate redisTemplate;

    private static PxUserService pxUserService;

    @Resource
    public void setStringRedisTemplate(StringRedisTemplate redisTemplate) {
        UserChannelHandler.redisTemplate = redisTemplate;
    }

    @Resource
    public void setPxUserService(PxUserService pxUserService) {
        UserChannelHandler.pxUserService = pxUserService;
    }

    public static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {

          String context = msg.text();
          System.out.println(context);
          ctx.writeAndFlush(msg);
    }

    /**
     *
     * 
     * @param text
     */
    public void pushAllUser(String text){
        clients.writeAndFlush(new TextWebSocketFrame(text));
    }

    /**
     * 
     * @param text
     * @param ctx
     */
    public void pushOtherUser(String text,ChannelHandlerContext ctx){
        clients.forEach(ch ->{
            if(ctx.channel() != ch){
                ch.writeAndFlush(new TextWebSocketFrame(text));
            }
        });
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws  Exception{

        clients.add(ctx.channel());

    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

        clients.remove(ctx.channel());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();

        ctx.channel().close();
        clients.remove(ctx.channel());
    }

}