Linyuzai / concept

封装了一些常用组件,走过路过不要错过哦
Apache License 2.0
380 stars 113 forks source link

使用 netty websocket 不生效呢 #16

Closed bydongxing closed 9 months ago

bydongxing commented 10 months ago

如何正确的姿势使用 netty websocket 并且有负载均衡的能力

配置文件如下:

concept:
  netty:
    server: #服务配置
      message:
        retry:
          times: 0 #客户端重试次数,默认不重试
          period: 0 #客户端重试间隔,单位ms,默认0ms
    load-balance: #负载均衡(转发)配置
      subscriber-master: redisson_topic #主订阅器,默认无
      subscriber-slave: none #从订阅器,默认无
      message:
        retry:
          times: 0 #转发重试次数,默认不重试
          period: 0 #转发重试间隔,单位ms,默认0ms
      heartbeat: #心跳配置
        enabled: true #是否启用心跳,默认true
        period: 60000 #心跳间隔,单位ms,默认1分钟
        timeout: 210000 #超时时间,单位ms,默认3.5分钟,3次心跳间隔
    executor:
      thread-pool-size: 1 #线程池大小,默认1

java 代码

@Slf4j
@Configuration
public class NettyWebSocketServer {
    public static final int WEB_SOCKET_PORT = 8090;

    /**
     * 创建线程池执行器
     */
    private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    /**
     * 工作线程池
     */
    private final EventLoopGroup workerGroup = new NioEventLoopGroup(NettyRuntime.availableProcessors() * 2);

    @Resource
    private NettyLoadBalanceConcept concept;

    /**
     * 启动 ws server
     */
    @PostConstruct
    public void start() throws InterruptedException {
        // 需要开启一个新的线程来执行netty server 服务器
        new Thread(() -> {
            try {
                run();
            } catch (InterruptedException e) {
                log.error("启动 ws server 失败! reason=[{}]", e.getMessage());
            }
        }).start();

    }

    /**
     * 销毁
     */
    @PreDestroy
    public void destroy() {
        Future<?> bossGroupShutdownFuture = bossGroup.shutdownGracefully();
        Future<?> workerGroupShutdownFuture = workerGroup.shutdownGracefully();
        bossGroupShutdownFuture.syncUninterruptibly();
        workerGroupShutdownFuture.syncUninterruptibly();
        log.info("关闭 ws server 成功!");
    }

    public void run() throws InterruptedException {
        // 服务器启动引导对象
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new LoggingHandler(LogLevel.INFO)) // 为 bossGroup 添加 日志处理器
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // 30秒客户端没有向服务器发送心跳则关闭连接
                        pipeline.addLast(new IdleStateHandler(30, 0, 0));
                        // 因为使用http协议,所以需要使用http的编码器,解码器
                        pipeline.addLast(new HttpServerCodec());
                        // 以块方式写,添加 chunkedWriter 处理器
                        pipeline.addLast(new ChunkedWriteHandler());
                        /**
                         * 说明:
                         *  1. http数据在传输过程中是分段的,HttpObjectAggregator可以把多个段聚合起来;
                         *  2. 这就是为什么当浏览器发送大量数据时,就会发出多次 http请求的原因
                         */
                        pipeline.addLast(new HttpObjectAggregator(8192));
                        // 保存用户ip
                        pipeline.addLast(new HttpHeadersHandler());
                        /**
                         * 说明:
                         *  1. 对于 WebSocket,它的数据是以帧frame 的形式传递的;
                         *  2. 可以看到 WebSocketFrame 下面有6个子类
                         *  3. 浏览器发送请求时: ws://localhost:7000/hello 表示请求的uri
                         *  4. WebSocketServerProtocolHandler 核心功能是把 http协议升级为 ws 协议,保持长连接;
                         *      是通过一个状态码 101 来切换的
                         */
                        pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
                        // 将连接交由 NettyLoadBalanceHandler 管理
                        pipeline.addLast(new NettyLoadBalanceHandler(concept));
                        // 自定义handler ,处理业务逻辑
                        pipeline.addLast(new NettyWebSocketServerHandler());
                    }
                });
        // 启动服务器,监听端口,阻塞直到启动成功
        // ChannelFuture future = serverBootstrap.bind(WEB_SOCKET_PORT).sync();
        ChannelFuture future = serverBootstrap.bind(Integer.parseInt(Objects.requireNonNull(SpringUtil.getApplicationContext().getEnvironment().getProperty("netty.port")))).sync();
        log.info("Server started and listen on:{}", future.channel().localAddress());
        future.channel().closeFuture().sync();
    }

}
Linyuzai commented 10 months ago

如何正确的姿势使用 netty websocket 并且有负载均衡的能力

配置文件如下:

concept:
  netty:
    server: #服务配置
      message:
        retry:
          times: 0 #客户端重试次数,默认不重试
          period: 0 #客户端重试间隔,单位ms,默认0ms
    load-balance: #负载均衡(转发)配置
      subscriber-master: redisson_topic #主订阅器,默认无
      subscriber-slave: none #从订阅器,默认无
      message:
        retry:
          times: 0 #转发重试次数,默认不重试
          period: 0 #转发重试间隔,单位ms,默认0ms
      heartbeat: #心跳配置
        enabled: true #是否启用心跳,默认true
        period: 60000 #心跳间隔,单位ms,默认1分钟
        timeout: 210000 #超时时间,单位ms,默认3.5分钟,3次心跳间隔
    executor:
      thread-pool-size: 1 #线程池大小,默认1

java 代码

@Slf4j
@Configuration
public class NettyWebSocketServer {
    public static final int WEB_SOCKET_PORT = 8090;

    /**
     * 创建线程池执行器
     */
    private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    /**
     * 工作线程池
     */
    private final EventLoopGroup workerGroup = new NioEventLoopGroup(NettyRuntime.availableProcessors() * 2);

    @Resource
    private NettyLoadBalanceConcept concept;

    /**
     * 启动 ws server
     */
    @PostConstruct
    public void start() throws InterruptedException {
        // 需要开启一个新的线程来执行netty server 服务器
        new Thread(() -> {
            try {
                run();
            } catch (InterruptedException e) {
                log.error("启动 ws server 失败! reason=[{}]", e.getMessage());
            }
        }).start();

    }

    /**
     * 销毁
     */
    @PreDestroy
    public void destroy() {
        Future<?> bossGroupShutdownFuture = bossGroup.shutdownGracefully();
        Future<?> workerGroupShutdownFuture = workerGroup.shutdownGracefully();
        bossGroupShutdownFuture.syncUninterruptibly();
        workerGroupShutdownFuture.syncUninterruptibly();
        log.info("关闭 ws server 成功!");
    }

    public void run() throws InterruptedException {
        // 服务器启动引导对象
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new LoggingHandler(LogLevel.INFO)) // 为 bossGroup 添加 日志处理器
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // 30秒客户端没有向服务器发送心跳则关闭连接
                        pipeline.addLast(new IdleStateHandler(30, 0, 0));
                        // 因为使用http协议,所以需要使用http的编码器,解码器
                        pipeline.addLast(new HttpServerCodec());
                        // 以块方式写,添加 chunkedWriter 处理器
                        pipeline.addLast(new ChunkedWriteHandler());
                        /**
                         * 说明:
                         *  1. http数据在传输过程中是分段的,HttpObjectAggregator可以把多个段聚合起来;
                         *  2. 这就是为什么当浏览器发送大量数据时,就会发出多次 http请求的原因
                         */
                        pipeline.addLast(new HttpObjectAggregator(8192));
                        // 保存用户ip
                        pipeline.addLast(new HttpHeadersHandler());
                        /**
                         * 说明:
                         *  1. 对于 WebSocket,它的数据是以帧frame 的形式传递的;
                         *  2. 可以看到 WebSocketFrame 下面有6个子类
                         *  3. 浏览器发送请求时: ws://localhost:7000/hello 表示请求的uri
                         *  4. WebSocketServerProtocolHandler 核心功能是把 http协议升级为 ws 协议,保持长连接;
                         *      是通过一个状态码 101 来切换的
                         */
                        pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));
                        // 将连接交由 NettyLoadBalanceHandler 管理
                        pipeline.addLast(new NettyLoadBalanceHandler(concept));
                        // 自定义handler ,处理业务逻辑
                        pipeline.addLast(new NettyWebSocketServerHandler());
                    }
                });
        // 启动服务器,监听端口,阻塞直到启动成功
        // ChannelFuture future = serverBootstrap.bind(WEB_SOCKET_PORT).sync();
        ChannelFuture future = serverBootstrap.bind(Integer.parseInt(Objects.requireNonNull(SpringUtil.getApplicationContext().getEnvironment().getProperty("netty.port")))).sync();
        log.info("Server started and listen on:{}", future.channel().localAddress());
        future.channel().closeFuture().sync();
    }

}

能说一下具体的问题吗,做了什么操作以及最终结果

bydongxing commented 9 months ago

就是我想实现 基于 netty 的 websocket 集群的功能,目前就是我这样的配置,然后启动了 2 个实例(比如实例 A,实例 B),使用 NettyLoadBalanceConcept 发送消息的时候,我在 A 上发送消息,想在实例 B 上的 websocket 的 客户端能接收到消息,但是发现并没有接收到,不清楚哪里出问题了

Linyuzai commented 9 months ago

就是我想实现 基于 netty 的 websocket 集群的功能,目前就是我这样的配置,然后启动了 2 个实例(比如实例 A,实例 B),使用 NettyLoadBalanceConcept 发送消息的时候,我在 A 上发送消息,想在实例 B 上的 websocket 的 客户端能接收到消息,但是发现并没有接收到,不清楚哪里出问题了

应该有报错吧,如果你要在Netty里面用WebSocket的话,收发的应该都是WebSocketFrame,这个需要自己处理一下消息转发。

  1. 需要自定义MessageFactory来支持WebSocketFrame转Message
  2. 然后需要自定义AbstractMessageCodecAdapter来处理消息的编码和解码。 我这边加的话可能要过几天时间了。
bydongxing commented 9 months ago

大佬,我还发现个现象 使用 concept-websocket-loadbalance-spring-boot-starter 做 websocket 服务端集群的时候,配置文件使用的是:subscriber-master: redisson_topic。连接情况: Client1 -> Server1, Client2->Server2. 在 Server2 想要 发送数据到 Client1 的时候,是不成功的,是一定要引入 nacos 么?不引入 nacos 就没法正常使用!我没用使用默认的websocket 订阅器,也强依赖 nacos 么😅

Linyuzai commented 9 months ago

大佬,我还发现个现象 使用 concept-websocket-loadbalance-spring-boot-starter 做 websocket 服务端集群的时候,配置文件使用的是:subscriber-master: redisson_topic。连接情况: Client1 -> Server1, Client2->Server2. 在 Server2 想要 发送数据到 Client1 的时候,是不成功的,是一定要引入 nacos 么?不引入 nacos 就没法正常使用!我没用使用默认的websocket 订阅器,也强依赖 nacos 么😅

我这边是基于注册中心的,可以是nacos也可以是其他的注册中心 你说的不引入nacos 是指用其他的注册中心还是说是两个不相干的单体服务 因为没有注册中心的话我这边可能认为是单体应用所以不会转发

如果是单体的应用的话你可以自定义ConnectionServerManagerFactory和ConnectionServerManager,然后getLocal方法指定服务信息不返回null就可以了

bydongxing commented 9 months ago

大佬,我还发现个现象 使用 concept-websocket-loadbalance-spring-boot-starter 做 websocket 服务端集群的时候,配置文件使用的是:subscriber-master: redisson_topic。连接情况: Client1 -> Server1, Client2->Server2. 在 Server2 想要 发送数据到 Client1 的时候,是不成功的,是一定要引入 nacos 么?不引入 nacos 就没法正常使用!我没用使用默认的websocket 订阅器,也强依赖 nacos 么😅

我这边是基于注册中心的,可以是nacos也可以是其他的注册中心 你说的不引入nacos 是指用其他的注册中心还是说是两个不相干的单体服务 因为没有注册中心的话我这边可能认为是单体应用所以不会转发

如果是单体的应用的话你可以自定义ConnectionServerManagerFactory和ConnectionServerManager,然后getLocal方法指定服务信息不返回null就可以了

好的,了解了,然后我在测试过程中,还发现个问题:就是我使用 java 程序写 websocket 客户端的时候,去连接websocket 服务端的时候,会报 404 错误

url: ws://localhost:8080/concept-websocket/user

final Session session = this.javaxWebSocketContainer.getContainer()
                .connectToServer(JavaxWebSocketClientEndpoint.class, URI.create(url.concat("?userId=" + port)));
@Component
public class JavaxWebSocketContainer implements ServletContextAware {

    private volatile WebSocketContainer container;

    public WebSocketContainer getContainer() {
        if (container == null) {
            synchronized (this) {
                if (container == null) {
                    container = ContainerProvider.getWebSocketContainer();
                }
            }
        }
        return container;
    }

    @Override
    public void setServletContext(ServletContext servletContext) {
        if (container == null) {
            container = (WebSocketContainer) servletContext
                    .getAttribute("javax.websocket.server.ServerContainer");
        }

    }
}
@ClientEndpoint
@Slf4j
public class JavaxWebSocketClientEndpoint {

    /**
     * 连接建立
     *
     * @param session 会话
     */
    @OnOpen
    public void onOpen(Session session) {
        log.info("客户端建立连接!");
    }

    /**
     * 连接关闭
     *
     * @param session 会话
     * @param reason  原因
     */
    @OnClose
    public void onClose(Session session, CloseReason reason) {
        log.info("客户端连接关闭!");

    }

    /**
     * 接收文本消息
     *
     * @param session 会话
     * @param message 消息
     */
    @OnMessage
    public void onMessage(Session session, String message) {
        log.info("客户端接收到服务端的消息: {}", message);

    }

    /**
     * 接收pong消息
     *
     * @param session 会话
     * @param message 消息
     */
    @OnMessage
    public void onMessage(Session session, PongMessage message) {

    }

    /**
     * 接收二进制消息,也可以用 byte[] 接收
     *
     * @param session 会话
     * @param message 消息
     */
    @OnMessage
    public void onMessage(Session session, ByteBuffer message) {

    }

    /**
     * 异常处理
     *
     * @param session 会话
     * @param e       异常
     */
    @OnError
    public void onError(Session session, Throwable e) {

    }
}

客户端的错误信息:

org.springframework.web.util.NestedServletException: Request processing failed; nested exception is javax.websocket.DeploymentException: Invalid response code 404
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1014)
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:497)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:584)
    at io.undertow.servlet.handlers.ServletHandler.handleRequest(ServletHandler.java:74)
    at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:129)
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
    at io.undertow.servlet.core.ManagedFilter.doFilter(ManagedFilter.java:61)
    at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:131)
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
    at io.undertow.servlet.core.ManagedFilter.doFilter(ManagedFilter.java:61)
    at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:131)
    at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:96)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
    at io.undertow.servlet.core.ManagedFilter.doFilter(ManagedFilter.java:61)
    at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:131)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
    at io.undertow.servlet.core.ManagedFilter.doFilter(ManagedFilter.java:61)
    at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:131)
    at io.undertow.servlet.handlers.FilterHandler.handleRequest(FilterHandler.java:84)
    at io.undertow.servlet.handlers.security.ServletSecurityRoleHandler.handleRequest(ServletSecurityRoleHandler.java:62)
    at io.undertow.servlet.handlers.ServletChain$1.handleRequest(ServletChain.java:68)
    at io.undertow.servlet.handlers.ServletDispatchingHandler.handleRequest(ServletDispatchingHandler.java:36)
    at io.undertow.servlet.handlers.RedirectDirHandler.handleRequest(RedirectDirHandler.java:68)
    at io.undertow.servlet.handlers.security.SSLInformationAssociationHandler.handleRequest(SSLInformationAssociationHandler.java:117)
    at io.undertow.servlet.handlers.security.ServletAuthenticationCallHandler.handleRequest(ServletAuthenticationCallHandler.java:57)
    at io.undertow.server.handlers.PredicateHandler.handleRequest(PredicateHandler.java:43)
    at io.undertow.security.handlers.AbstractConfidentialityHandler.handleRequest(AbstractConfidentialityHandler.java:46)
    at io.undertow.servlet.handlers.security.ServletConfidentialityConstraintHandler.handleRequest(ServletConfidentialityConstraintHandler.java:64)
    at io.undertow.security.handlers.AuthenticationMechanismsHandler.handleRequest(AuthenticationMechanismsHandler.java:60)
    at io.undertow.servlet.handlers.security.CachedAuthenticatedSessionHandler.handleRequest(CachedAuthenticatedSessionHandler.java:77)
    at io.undertow.security.handlers.AbstractSecurityContextAssociationHandler.handleRequest(AbstractSecurityContextAssociationHandler.java:43)
    at io.undertow.server.handlers.PredicateHandler.handleRequest(PredicateHandler.java:43)
    at io.undertow.servlet.handlers.SendErrorPageHandler.handleRequest(SendErrorPageHandler.java:52)
    at io.undertow.server.handlers.PredicateHandler.handleRequest(PredicateHandler.java:43)
    at io.undertow.servlet.handlers.ServletInitialHandler.handleFirstRequest(ServletInitialHandler.java:275)
    at io.undertow.servlet.handlers.ServletInitialHandler.access$100(ServletInitialHandler.java:79)
    at io.undertow.servlet.handlers.ServletInitialHandler$2.call(ServletInitialHandler.java:134)
    at io.undertow.servlet.handlers.ServletInitialHandler$2.call(ServletInitialHandler.java:131)
    at io.undertow.servlet.core.ServletRequestContextThreadSetupAction$1.call(ServletRequestContextThreadSetupAction.java:48)
    at io.undertow.servlet.core.ContextClassLoaderSetupAction$1.call(ContextClassLoaderSetupAction.java:43)
    at io.undertow.servlet.handlers.ServletInitialHandler.dispatchRequest(ServletInitialHandler.java:255)
    at io.undertow.servlet.handlers.ServletInitialHandler.access$000(ServletInitialHandler.java:79)
    at io.undertow.servlet.handlers.ServletInitialHandler$1.handleRequest(ServletInitialHandler.java:100)
    at io.undertow.server.Connectors.executeRootHandler(Connectors.java:387)
    at io.undertow.server.HttpServerExchange$1.run(HttpServerExchange.java:852)
    at org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
    at org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:2019)
    at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1558)
    at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1423)
    at org.xnio.XnioWorker$WorkerThreadFactory$1$1.run(XnioWorker.java:1282)
    at java.lang.Thread.run(Thread.java:750)
Caused by: javax.websocket.DeploymentException: Invalid response code 404
    at io.undertow.websockets.jsr.ServerWebSocketContainer.connectToServerInternal(ServerWebSocketContainer.java:518)
    at io.undertow.websockets.jsr.ServerWebSocketContainer.connectToServerInternal(ServerWebSocketContainer.java:492)
    at io.undertow.websockets.jsr.ServerWebSocketContainer.connectToServer(ServerWebSocketContainer.java:274)
Linyuzai commented 9 months ago

Invalid response code 404

我这边测试是正常的,不过我用的tomcat,你试下其他ws工具呢,有在线工具,localhost改成具体的ip试下,有的时候启用的虚拟网卡可能ip有问题

bydongxing commented 9 months ago

我试过了,使用 ws 工具是可以的,但是我用同样的 java websocket 的客户端代码 去连接 自己写的 websocket 服务端是可以的,连接 ws://localhost:8080/concept-websocket/user 不行!

自己写的 websocket 服务端

@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}")  // 接口路径 ws://localhost:8080/websocket/userId;
public class JavaxWebSocketServerEndpoint {

 /**
     * 连接建立
     *
     * @param session 会话
     */
    @OnOpen
    public void onOpen(Session session) {
        log.info("客户端建立连接!");
    }

    /**
     * 连接关闭
     *
     * @param session 会话
     * @param reason  原因
     */
    @OnClose
    public void onClose(Session session, CloseReason reason) {
        log.info("客户端连接关闭!");

    }

    /**
     * 接收文本消息
     *
     * @param session 会话
     * @param message 消息
     */
    @OnMessage
    public void onMessage(Session session, String message) {
        log.info("客户端接收到服务端的消息: {}", message);

    }

    /**
     * 接收pong消息
     *
     * @param session 会话
     * @param message 消息
     */
    @OnMessage
    public void onMessage(Session session, PongMessage message) {

    }

    /**
     * 接收二进制消息,也可以用 byte[] 接收
     *
     * @param session 会话
     * @param message 消息
     */
    @OnMessage
    public void onMessage(Session session, ByteBuffer message) {

    }

    /**
     * 异常处理
     *
     * @param session 会话
     * @param e       异常
     */
    @OnError
    public void onError(Session session, Throwable e) {

    }
  @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
Linyuzai commented 9 months ago

我试过了,使用 ws 工具是可以的,但是我用同样的 java websocket 的客户端代码 去连接 自己写的 websocket 服务端是可以的,连接 ws://localhost:8080/concept-websocket/user 不行!

自己写的 websocket 服务端

@Component
@Slf4j
@ServerEndpoint("/websocket/{userId}")  // 接口路径 ws://localhost:8080/websocket/userId;
public class JavaxWebSocketServerEndpoint {

 /**
     * 连接建立
     *
     * @param session 会话
     */
    @OnOpen
    public void onOpen(Session session) {
        log.info("客户端建立连接!");
    }

    /**
     * 连接关闭
     *
     * @param session 会话
     * @param reason  原因
     */
    @OnClose
    public void onClose(Session session, CloseReason reason) {
        log.info("客户端连接关闭!");

    }

    /**
     * 接收文本消息
     *
     * @param session 会话
     * @param message 消息
     */
    @OnMessage
    public void onMessage(Session session, String message) {
        log.info("客户端接收到服务端的消息: {}", message);

    }

    /**
     * 接收pong消息
     *
     * @param session 会话
     * @param message 消息
     */
    @OnMessage
    public void onMessage(Session session, PongMessage message) {

    }

    /**
     * 接收二进制消息,也可以用 byte[] 接收
     *
     * @param session 会话
     * @param message 消息
     */
    @OnMessage
    public void onMessage(Session session, ByteBuffer message) {

    }

    /**
     * 异常处理
     *
     * @param session 会话
     * @param e       异常
     */
    @OnError
    public void onError(Session session, Throwable e) {

    }
  @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

我这边测不出来这个问题,你试一下把concept.websocket.type指定为javax试试呢

bydongxing commented 9 months ago

我将 web 容器从 undertow 换成 tomcat 后,我使用工具就没问题,就很神奇,地址信息在截图中都有,报错信息如下:

image image image
javax.websocket.DeploymentException: The HTTP response from the server [404] did not permit the HTTP upgrade to WebSocket
    at org.apache.tomcat.websocket.WsWebSocketContainer.connectToServerRecursive(WsWebSocketContainer.java:400)
    at org.apache.tomcat.websocket.WsWebSocketContainer.connectToServer(WsWebSocketContainer.java:131)
    at com.xavier.dong.websocket.client.controller.WebsocketClientController.addUsers(WebsocketClientController.java:41)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1071)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:964)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:670)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:779)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
    at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
    at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:96)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:541)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:135)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:360)
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:399)
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1789)
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
    at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191)
    at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.lang.Thread.run(Thread.java:750)
Linyuzai commented 9 months ago

我将 web 容器从 undertow 换成 tomcat 后,我使用工具就没问题,就很神奇,地址信息在截图中都有,报错信息如下:

image image image

javax.websocket.DeploymentException: The HTTP response from the server [404] did not permit the HTTP upgrade to WebSocket
  at org.apache.tomcat.websocket.WsWebSocketContainer.connectToServerRecursive(WsWebSocketContainer.java:400)
  at org.apache.tomcat.websocket.WsWebSocketContainer.connectToServer(WsWebSocketContainer.java:131)
  at com.xavier.dong.websocket.client.controller.WebsocketClientController.addUsers(WebsocketClientController.java:41)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
  at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150)
  at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117)
  at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895)
  at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)
  at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
  at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1071)
  at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:964)
  at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
  at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898)
  at javax.servlet.http.HttpServlet.service(HttpServlet.java:670)
  at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
  at javax.servlet.http.HttpServlet.service(HttpServlet.java:779)
  at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)
  at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
  at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
  at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
  at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
  at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
  at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
  at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
  at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
  at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
  at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
  at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
  at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
  at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:96)
  at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
  at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
  at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
  at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
  at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
  at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
  at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
  at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197)
  at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
  at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:541)
  at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:135)
  at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
  at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
  at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:360)
  at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:399)
  at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
  at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893)
  at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1789)
  at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
  at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191)
  at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659)
  at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
  at java.lang.Thread.run(Thread.java:750)

所以你的意思是 undertow + java客户端连接 会有问题么?

bydongxing commented 9 months ago

不好意思,我没表述清楚,就是我不管使用 undertow 还是 tomcat 连接 concept-websocket-loadbalance-spring-boot-starter 提供的地址 都是提示 404,都没成功过!同样的 Java ws客户端程序,连接自己写的 websocket Server 的地址连接是成功的!同时: 我使用 ws 在线工具 连接 concept-websocket-loadbalance-spring-boot-starter 提供的地址和 自己写的 websocket Server 的地址 都是可行的! 所以觉得很神奇,不知道问题出在哪了

Linyuzai commented 9 months ago

不好意思,我没表述清楚,就是我不管使用 undertow 还是 tomcat 连接 concept-websocket-loadbalance-spring-boot-starter 提供的地址 都是提示 404,都没成功过!同样的 Java ws客户端程序,连接自己写的 websocket Server 的地址连接是成功的!同时: 我使用 ws 在线工具 连接 concept-websocket-loadbalance-spring-boot-starter 提供的地址和 自己写的 websocket Server 的地址 都是可行的! 所以觉得很神奇,不知道问题出在哪了

确实很奇怪,我复制你的代码在我这运行也是没有问题的,可以连上

bydongxing commented 9 months ago

不好意思,我没表述清楚,就是我不管使用 undertow 还是 tomcat 连接 concept-websocket-loadbalance-spring-boot-starter 提供的地址 都是提示 404,都没成功过!同样的 Java ws客户端程序,连接自己写的 websocket Server 的地址连接是成功的!同时: 我使用 ws 在线工具 连接 concept-websocket-loadbalance-spring-boot-starter 提供的地址和 自己写的 websocket Server 的地址 都是可行的! 所以觉得很神奇,不知道问题出在哪了

确实很奇怪,我复制你的代码在我这运行也是没有问题的,可以连上

我又试了下,同样的代码,在我本地再起一个 新的端口号的 Server 端,在起一个 新的端口号的 Client 端,client new 可以连上 server new ,然后我再回头用 老的client 连接老的 server,就不行!这问题就很诡异,百分百必现

bydongxing commented 9 months ago

我使用 https://github.com/TooTallNate/Java-WebSocket/wiki#server-example 这个开源的 java websocket 库,作为客户端,连接 concept-websocket-loadbalance-spring-boot-starter内置的接口 ,比如: ws://localhost:9090/concept-websocket/user?userId=56 也是提示 404. 错误信息: closed with exit code 1002 additional info: Invalid status code received: 404 Status line: HTTP/1.1 404 Not Found

image

我查了下,好像是从 http 升级为 websocket 的时候,服务端给的响应不对,所以才升级失败的!

Linyuzai commented 9 months ago

我使用 https://github.com/TooTallNate/Java-WebSocket/wiki#server-example 这个开源的 java websocket 库,作为客户端,连接 concept-websocket-loadbalance-spring-boot-starter内置的接口 ,比如: ws://localhost:9090/concept-websocket/user?userId=56 也是提示 404. 错误信息: closed with exit code 1002 additional info: Invalid status code received: 404 Status line: HTTP/1.1 404 Not Found

image

我查了下,好像是从 http 升级为 websocket 的时候,服务端给的响应不对,所以才升级失败的!

我这边也是封装的Spring的接口,Spring也是封装的Javax,按道理来说,连接这部分的逻辑还没有到我的代码里面,你之前发的报错也都是容器内部的堆栈,主要是我这边是正常的,如果是404的话应该就是没有这个地址,但是你那边在线工具又是没问题的

Linyuzai commented 9 months ago

我使用 https://github.com/TooTallNate/Java-WebSocket/wiki#server-example 这个开源的 java websocket 库,作为客户端,连接 concept-websocket-loadbalance-spring-boot-starter内置的接口 ,比如: ws://localhost:9090/concept-websocket/user?userId=56 也是提示 404. 错误信息: closed with exit code 1002 additional info: Invalid status code received: 404 Status line: HTTP/1.1 404 Not Found

image

我查了下,好像是从 http 升级为 websocket 的时候,服务端给的响应不对,所以才升级失败的!

发一下完整的配置,启动注解应该加了吧

bydongxing commented 9 months ago

我知道怎么复现了,就是 server 端 在本地启动两个,然后 Client 端就会存在连接其中一个 server 端会 404,连接另外一个 server 端就正常的情况!

bydongxing commented 9 months ago

我使用 https://github.com/TooTallNate/Java-WebSocket/wiki#server-example 这个开源的 java websocket 库,作为客户端,连接 concept-websocket-loadbalance-spring-boot-starter内置的接口 ,比如: ws://localhost:9090/concept-websocket/user?userId=56 也是提示 404. 错误信息: closed with exit code 1002 additional info: Invalid status code received: 404 Status line: HTTP/1.1 404 Not Found

image

我查了下,好像是从 http 升级为 websocket 的时候,服务端给的响应不对,所以才升级失败的!

发一下完整的配置,启动注解应该加了吧

启动类:

image

配置文件

# websocket 集群
concept:
  websocket:
    type: auto #JAVAX/SERVLET/REACTIVE,AUTO自动适配,默认AUTO
    server: #服务配置
      default-endpoint: #默认端点
        enabled: true #是否启用默认端点,默认true
        prefix: concept-websocket #前缀,默认'/concept-websocket/'
        path-selector: #Path选择器
          enabled: true #是否启用Path选择器,默认false
        user-selector: #User选择器
          enabled: true #是否启用User选择器,默认false
      message:
        retry:
          times: 0 #客户端重试次数,默认不重试
          period: 0 #客户端重试间隔,单位ms,默认0ms
      heartbeat: #心跳配置
        enabled: true #是否启用心跳,默认true
        period: 60000 #心跳间隔,单位ms,默认1分钟
        timeout: 210000 #超时时间,单位ms,默认3.5分钟,3次心跳间隔
    load-balance: #负载均衡(转发)配置
      subscriber-master: redisson_topic #主订阅器,默认 websocket
      subscriber-slave: none #从订阅器,默认无
      message:
        retry:
          times: 0 #转发重试次数,默认不重试
          period: 0 #转发重试间隔,单位ms,默认0ms
      monitor: #监控配置
        enabled: true #是否启用监控,默认true
        period: 30000 #轮训间隔,单位ms,默认30s
        logger: false #是否启用日志,默认false
      heartbeat: #心跳配置
        enabled: true #是否启用心跳,默认true
        period: 60000 #心跳间隔,单位ms,默认1分钟
        timeout: 210000 #超时时间,单位ms,默认3.5分钟,3次心跳间隔
    executor:
      thread-pool-size: 1 #线程池大小,默认1

websocket 客户端信息:

        // 这个正常
        WebSocketClient client = new EmptyClient(URI.create("ws://localhost:9091/concept-websocket/user?userId=56"));
        // 下面这个就会 404
        WebSocketClient client = new EmptyClient(URI.create("ws://localhost:9090/concept-websocket/user?userId=56"));
        client.connect();
public class EmptyClient extends WebSocketClient{

    public EmptyClient(URI serverUri, Draft draft) {
        super(serverUri, draft);
    }

    public EmptyClient(URI serverURI) {
        super(serverURI);
    }

    @Override
    public void onOpen(ServerHandshake handshakedata) {
        send("Hello, it is me. Mario :)");
        System.out.println("new connection opened");
    }

    @Override
    public void onClose(int code, String reason, boolean remote) {
        System.out.println("closed with exit code " + code + " additional info: " + reason);
    }

    @Override
    public void onMessage(String message) {
        System.out.println("received message: " + message);
    }

    @Override
    public void onMessage(ByteBuffer message) {
        System.out.println("received ByteBuffer");
    }

    @Override
    public void onError(Exception ex) {
        System.err.println("an error occurred:" + ex);
    }

    public static void main(String[] args) throws URISyntaxException {
        WebSocketClient client = new EmptyClient(new URI("ws://localhost:8887"));
        client.connect();
    }
}
<dependency>
  <groupId>org.java-websocket</groupId>
  <artifactId>Java-WebSocket</artifactId>
  <version>1.5.4</version>
</dependency>
Linyuzai commented 9 months ago

我知道怎么复现了,就是 server 端 在本地启动两个,然后 Client 端就会存在连接其中一个 server 端会 404,连接另外一个 server 端就正常的情况!

我这还是没问题,复现不了。。。

bydongxing commented 9 months ago

我知道怎么复现了,就是 server 端 在本地启动两个,然后 Client 端就会存在连接其中一个 server 端会 404,连接另外一个 server 端就正常的情况!

我这还是没问题,复现不了。。。

方便留个联系方式远程视频下?

Linyuzai commented 9 months ago

我知道怎么复现了,就是 server 端 在本地启动两个,然后 Client 端就会存在连接其中一个 server 端会 404,连接另外一个 server 端就正常的情况!

我这还是没问题,复现不了。。。

方便留个联系方式远程视频下?

WX:linyuzai1909 有试过用js写ws连接么

bydongxing commented 9 months ago

linyuzai1909

定位到问题了,端口号 9090 被 clashx 监听了,好尴尬,必须要把软件完全彻底退出才能行,关闭代理也不行的,必须要彻底退出软件才能行! 大佬,netty的 websocket 看您啥时候有时间了,给帮忙支持下呗,Thanks♪(・ω・)ノ