Linyuzai / concept

封装了一些常用组件,走过路过不要错过哦
Apache License 2.0
380 stars 113 forks source link

大量websocket连接优化 #44

Open chuchushijingxi opened 1 week ago

chuchushijingxi commented 1 week ago

依赖名称: concept-websocket-loadbalance 依赖版本: 2.6.0 问题描述: 目前集群中两台机器,极端的情况单台机器会建立一万六的 websocket 连接,请问有什么优化的手段吗 异常堆栈:

代码示例:

Linyuzai commented 1 week ago

是指客户端数量就有1.6w吗

Linyuzai commented 1 week ago

是指多个实例,但是连接都路由到了同一个实例,导致一个实例连的特别多?

chuchushijingxi commented 1 week ago

是指多个实例,但是连接都路由到了同一个实例,导致一个实例连的特别多?

目前后台服务作为服务端,提供 websocket 连接,只是作为一个中转站的功能 A、B 客户端都连接上后台服务,连接的 URL 是

A 和 B 有相同的 id,这样 A 发送到服务端的消息就处理后转发给 B

chuchushijingxi commented 1 week ago

是指客户端数量就有1.6w吗

Linyuzai commented 1 week ago

是指多个实例,但是连接都路由到了同一个实例,导致一个实例连的特别多?

目前后台服务作为服务端,提供 websocket 连接,只是作为一个中转站的功能 A、B 客户端都连接上后台服务,连接的 URL 是

  • /concept-websocket/connect?id=zer89Fxc4hB_ZY6R-SmTB168mspaGD_jL9oAvC7cdlp7YQ&connectType=A
  • /concept-websocket/connect?id=zer89Fxc4hB_ZY6R-SmTB168mspaGD_jL9oAvC7cdlp7YQ&connectType=B

A 和 B 有相同的 id,这样 A 发送到服务端的消息就处理后转发给 B

所以问题是什么,如果是客户端数量本身就很大的话,只能多部署实例吧

chuchushijingxi commented 1 week ago

是指多个实例,但是连接都路由到了同一个实例,导致一个实例连的特别多?

目前后台服务作为服务端,提供 websocket 连接,只是作为一个中转站的功能 A、B 客户端都连接上后台服务,连接的 URL 是

  • /concept-websocket/connect?id=zer89Fxc4hB_ZY6R-SmTB168mspaGD_jL9oAvC7cdlp7YQ&connectType=A
  • /concept-websocket/connect?id=zer89Fxc4hB_ZY6R-SmTB168mspaGD_jL9oAvC7cdlp7YQ&connectType=B

A 和 B 有相同的 id,这样 A 发送到服务端的消息就处理后转发给 B

所以问题是什么,如果是客户端数量本身就很大的话,只能多部署实例吧

昨天看了一下是用 keepalived 在集群弄了虚 ip,连接都只走到一台机器的 nginx 上了,请问一下大佬有什么办法可以避免吗

Linyuzai commented 1 week ago

这个不太清楚,nginx不是可以配置负载均衡的吗

chuchushijingxi commented 1 week ago

这个不太清楚,nginx不是可以配置负载均衡的吗

websocket连接建立到一万三左右,心跳开始不回消息了,这个有啥排查方向吗

Linyuzai commented 1 week ago

这个不太清楚,nginx不是可以配置负载均衡的吗

websocket连接建立到一万三左右,心跳开始不回消息了,这个有啥排查方向吗

这个心跳是指,你那边自己定义的还是说自带的心跳

chuchushijingxi commented 1 week ago

这个不太清楚,nginx不是可以配置负载均衡的吗

websocket连接建立到一万三左右,心跳开始不回消息了,这个有啥排查方向吗

这个心跳是指,你那边自己定义的还是说自带的心跳

自定义的一个心跳。

Linyuzai commented 1 week ago

控制台没有任何输出吗,有代码吗,是收到客户端的消息然后回复吗

chuchushijingxi commented 1 week ago

控制台没有任何输出吗,有代码吗,是收到客户端的消息然后回复吗

嗯嗯,心跳只是收到消息之后判断一下,然后就直接通过websocket回复了

String jsonStr = context.getJsonStr();
        String roomId = context.getRoomId();
        String key = StrUtil.format(RedisKeyEnum.CSTREAMING_ROOM_KEY.getKey(), roomId);

        HeartBeatReq heartBeatReq = JSONObject.parseObject(jsonStr, HeartBeatReq.class);
        HeartBeatRsp rsp = BeanUtil.toBean(heartBeatReq, HeartBeatRsp.class);
        rsp.setMessageID(TemplateEnum.HEART_BEAT_RSP_CODE.getCode());
        rsp.setResult(CodeEnum.CODE_JOIN_ROOM_NULL_ERR.getMessage());

        // 发送给 cstreaming
        if (Boolean.TRUE.equals(redisTemplate.hasKey(key))) {
            rsp.setResult(CommonConstants.SUCCESS_STR);

            // 发送给 cstreaming
            String jsonString = JSONObject.toJSONString(rsp);
            LOGGER.info("-> cstreaming ({}) :{}", rsp.getMessageID(), jsonString);
            concept.send(new RoomMessage(jsonString, roomId + context.getClientType()));
            refresh(roomId, getContainerId(roomId));
        } else {
            offLine(context);
        }
Linyuzai commented 1 week ago

可以先看下连接是不是没问题,AbstractConnectionLoadBalanceConcept#send 里面,ConnectionSelector#select 出来的连接是不是有数据(如果数量少的时候没问题,连接问题的可能性不大)

发送消息的时候,默认是顺序发送的,也有可能是连接太多导致发送时间很久(所有客户端都没有收到么?)可以注入 CompletableFutureMessageSender试试多线程下发(这个类是在比较新的版本里面有)

最后是 ServletWebSocketConnection#doSend(webmvc环境)是最终下发数据的地方,可以看看是不是走到了

Linyuzai commented 1 week ago

CompletableFutureMessageSender注入的时候记得加上scope(文档里面有写)不然不会生效

chuchushijingxi commented 1 week ago

CompletableFutureMessageSender

CompletableFutureMessageSender注入的时候记得加上scope(文档里面有写)不然不会生效

谢谢大佬

chuchushijingxi commented 1 week ago

CompletableFutureMessageSender注入的时候记得加上scope(文档里面有写)不然不会生效

在 Concept WebSocket LoadBalance 2 的文档里面没有找到 CompletableFutureMessageSender 的使用说明 😰

Linyuzai commented 1 week ago

CompletableFutureMessageSender注入的时候记得加上scope(文档里面有写)不然不会生效

在 Concept WebSocket LoadBalance 2 的文档里面没有找到 CompletableFutureMessageSender 的使用说明 😰

我是指加Scope的说明,CompletableFutureMessageSender 直接注入就可以了

Linyuzai commented 1 week ago

CompletableFutureMessageSender注入的时候记得加上scope(文档里面有写)不然不会生效

在 Concept WebSocket LoadBalance 2 的文档里面没有找到 CompletableFutureMessageSender 的使用说明 😰

说错了,是注入CompletableFutureMessageSenderFactory,然后注入的时候设置scope就行了

chuchushijingxi commented 3 days ago

CompletableFutureMessageSender注入的时候记得加上scope(文档里面有写)不然不会生效

在 Concept WebSocket LoadBalance 2 的文档里面没有找到 CompletableFutureMessageSender 的使用说明 😰

说错了,是注入CompletableFutureMessageSenderFactory,然后注入的时候设置scope就行了

集群之间的 websocket 同步可以关闭吗,现在大量websocket连接建立之后,通过netstat查看发现同步全部阻塞了

Linyuzai commented 3 days ago

CompletableFutureMessageSender注入的时候记得加上scope(文档里面有写)不然不会生效

在 Concept WebSocket LoadBalance 2 的文档里面没有找到 CompletableFutureMessageSender 的使用说明 😰

说错了,是注入CompletableFutureMessageSenderFactory,然后注入的时候设置scope就行了

集群之间的 websocket 同步可以关闭吗,现在大量websocket连接建立之后,通过netstat查看发现同步全部阻塞了

哪里的同步?哪个代码

chuchushijingxi commented 3 days ago

CompletableFutureMessageSender注入的时候记得加上scope(文档里面有写)不然不会生效

在 Concept WebSocket LoadBalance 2 的文档里面没有找到 CompletableFutureMessageSender 的使用说明 😰

说错了,是注入CompletableFutureMessageSenderFactory,然后注入的时候设置scope就行了

集群之间的 websocket 同步可以关闭吗,现在大量websocket连接建立之后,通过netstat查看发现同步全部阻塞了

哪里的同步?哪个代码

集群启动之后会打印 ws://192.168.50.37:7000/concept-websocket-subscriber 。就是这里的 websocket 同步吧。。想着压力测试不做集群同步排查一下网络阻塞的问题

Linyuzai commented 3 days ago

试一下自定义ConnectionServerManagerFactory(设置scope)注入,自定义ConnectionServerManager里面getLocal返回null

chuchushijingxi commented 3 days ago

试一下自定义ConnectionServerManagerFactory(设置scope)注入,自定义ConnectionServerManager里面getLocal返回null

没研究明白怎么写。。。😭😭

Linyuzai commented 3 days ago
@Configuration
public class SampleConfig {

    @Bean
    public ConnectionServerManagerFactory connectionServerManagerFactory() {
        return new ConnectionServerManagerFactoryImpl();
    }

    public static class ConnectionServerManagerFactoryImpl implements ConnectionServerManagerFactory {

        @Override
        public ConnectionServerManager create(String scope) {
            return new ConnectionServerManagerImpl();
        }

        @Override
        public boolean support(String scope) {
            return true;
        }
    }

    public static class ConnectionServerManagerImpl implements ConnectionServerManager {

        @Override
        public ConnectionServer getLocal(ConnectionLoadBalanceConcept concept) {
            //这里返回null
            return null;
        }

        @Override
        public List<ConnectionServer> getConnectionServers(ConnectionLoadBalanceConcept concept) {
            return Collections.emptyList();
        }
    }
}
chuchushijingxi commented 3 days ago
@Configuration
public class SampleConfig {

    @Bean
    public ConnectionServerManagerFactory connectionServerManagerFactory() {
        return new ConnectionServerManagerFactoryImpl();
    }

    public static class ConnectionServerManagerFactoryImpl implements ConnectionServerManagerFactory {

        @Override
        public ConnectionServerManager create(String scope) {
            return new ConnectionServerManagerImpl();
        }

        @Override
        public boolean support(String scope) {
            return true;
        }
    }

    public static class ConnectionServerManagerImpl implements ConnectionServerManager {

        @Override
        public ConnectionServer getLocal(ConnectionLoadBalanceConcept concept) {
            //这里返回null
            return null;
        }

        @Override
        public List<ConnectionServer> getConnectionServers(ConnectionLoadBalanceConcept concept) {
            return Collections.emptyList();
        }
    }
}
@Configuration
public class SampleConfig {

    @Bean
    public ConnectionServerManagerFactory connectionServerManagerFactory() {
        return new ConnectionServerManagerFactoryImpl();
    }

    public static class ConnectionServerManagerFactoryImpl implements ConnectionServerManagerFactory {

        @Override
        public ConnectionServerManager create(String scope) {
            return new ConnectionServerManagerImpl();
        }

        @Override
        public boolean support(String scope) {
            return true;
        }
    }

    public static class ConnectionServerManagerImpl implements ConnectionServerManager {

        @Override
        public ConnectionServer getLocal(ConnectionLoadBalanceConcept concept) {
            //这里返回null
            return null;
        }

        @Override
        public List<ConnectionServer> getConnectionServers(ConnectionLoadBalanceConcept concept) {
            return Collections.emptyList();
        }
    }
}

谢谢大佬。使用这段代码后再进行大量 websocket 连接测试收发消息都正常了,使用 netstat 查看也不会阻塞了。。是不是集群 websocket 同步会有影响😟

Linyuzai commented 3 days ago

之前说的CompletableFutureMessageSenderFactory这个有尝试吗

你们的测试方案时怎么样的,我这边按你们的试一下看能不能复现

你试一下用redis/mq的方式转发会不会有问题

chuchushijingxi commented 3 days ago

之前说的CompletableFutureMessageSenderFactory这个有尝试吗

你们的测试方案时怎么样的,我这边按你们的试一下看能不能复现

你试一下用redis/mq的方式转发会不会有问题

集群中两台机器,用nginx转发,大概连接数到 1.3 - 1.5 W 的时候收发消息开始阻塞。无法回复心跳 客户端 websocket 连接上之后给服务发送

{"version":0,"messageID":1019,"timestamp":1719393151641,"roomID":"m-jxoQ5et0B8ZIifkX7KQmKKl9pdJjjuOfFY6nSA","peerID":"1805874855115026432","networkInfo":{"rtt":10,"lost":0,"jitter":16},"appList":null}

然后服务中使用 WebSocketLoadBalanceConcept concept 回复

{"messageID":1020,"networkInfo":{"jitter":16,"lost":0.0,"rtt":10},"peerID":"1805874519604260864","result":"success","roomID":"m-jxoQ5et0B8ZIifkX7KQmKKl9pdJjjuOfFQ53uM","timestamp":1719393151647,"version":0}

尝试过 redis 进行转发,可能因为我改了 redis 的配置会报错。

@EnableCaching
@AutoConfiguration
@AutoConfigureBefore(RedisAutoConfiguration.class)
public class RedisTemplateConfiguration {

    @Bean
    @Primary
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setKeySerializer(RedisSerializer.string());
        redisTemplate.setHashKeySerializer(RedisSerializer.string());
        redisTemplate.setValueSerializer(RedisSerializer.string());
        redisTemplate.setHashValueSerializer(RedisSerializer.string());
        redisTemplate.setConnectionFactory(factory);
        return redisTemplate;
    }

    @Bean
    public HashOperations<String, String, Object> hashOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForHash();
    }

    @Bean
    public ValueOperations<String, String> valueOperations(RedisTemplate<String, String> redisTemplate) {
        return redisTemplate.opsForValue();
    }

    @Bean
    public ListOperations<String, Object> listOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForList();
    }

    @Bean
    public SetOperations<String, Object> setOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForSet();
    }

    @Bean
    public ZSetOperations<String, Object> zSetOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForZSet();
    }

}

CompletableFutureMessageSenderFactory 这个也是不太明白怎么用 🥹🥹

Linyuzai commented 3 days ago

之前说的CompletableFutureMessageSenderFactory这个有尝试吗 你们的测试方案时怎么样的,我这边按你们的试一下看能不能复现 你试一下用redis/mq的方式转发会不会有问题

集群中两台机器,用nginx转发,大概连接数到 1.3 - 1.5 W 的时候收发消息开始阻塞。无法回复心跳 客户端 websocket 连接上之后给服务发送

{"version":0,"messageID":1019,"timestamp":1719393151641,"roomID":"m-jxoQ5et0B8ZIifkX7KQmKKl9pdJjjuOfFY6nSA","peerID":"1805874855115026432","networkInfo":{"rtt":10,"lost":0,"jitter":16},"appList":null}

然后服务中使用 WebSocketLoadBalanceConcept concept 回复

{"messageID":1020,"networkInfo":{"jitter":16,"lost":0.0,"rtt":10},"peerID":"1805874519604260864","result":"success","roomID":"m-jxoQ5et0B8ZIifkX7KQmKKl9pdJjjuOfFQ53uM","timestamp":1719393151647,"version":0}

尝试过 redis 进行转发,可能因为我改了 redis 的配置会报错。

@EnableCaching
@AutoConfiguration
@AutoConfigureBefore(RedisAutoConfiguration.class)
public class RedisTemplateConfiguration {

  @Bean
  @Primary
  public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
      RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
      redisTemplate.setKeySerializer(RedisSerializer.string());
      redisTemplate.setHashKeySerializer(RedisSerializer.string());
      redisTemplate.setValueSerializer(RedisSerializer.string());
      redisTemplate.setHashValueSerializer(RedisSerializer.string());
      redisTemplate.setConnectionFactory(factory);
      return redisTemplate;
  }

  @Bean
  public HashOperations<String, String, Object> hashOperations(RedisTemplate<String, Object> redisTemplate) {
      return redisTemplate.opsForHash();
  }

  @Bean
  public ValueOperations<String, String> valueOperations(RedisTemplate<String, String> redisTemplate) {
      return redisTemplate.opsForValue();
  }

  @Bean
  public ListOperations<String, Object> listOperations(RedisTemplate<String, Object> redisTemplate) {
      return redisTemplate.opsForList();
  }

  @Bean
  public SetOperations<String, Object> setOperations(RedisTemplate<String, Object> redisTemplate) {
      return redisTemplate.opsForSet();
  }

  @Bean
  public ZSetOperations<String, Object> zSetOperations(RedisTemplate<String, Object> redisTemplate) {
      return redisTemplate.opsForZSet();
  }

}

CompletableFutureMessageSenderFactory 这个也是不太明白怎么用 🥹🥹

你注入一下StringRedisTemplate用redis转发看会不会抱错

Linyuzai commented 3 days ago

之前说的CompletableFutureMessageSenderFactory这个有尝试吗 你们的测试方案时怎么样的,我这边按你们的试一下看能不能复现 你试一下用redis/mq的方式转发会不会有问题

集群中两台机器,用nginx转发,大概连接数到 1.3 - 1.5 W 的时候收发消息开始阻塞。无法回复心跳 客户端 websocket 连接上之后给服务发送

{"version":0,"messageID":1019,"timestamp":1719393151641,"roomID":"m-jxoQ5et0B8ZIifkX7KQmKKl9pdJjjuOfFY6nSA","peerID":"1805874855115026432","networkInfo":{"rtt":10,"lost":0,"jitter":16},"appList":null}

然后服务中使用 WebSocketLoadBalanceConcept concept 回复

{"messageID":1020,"networkInfo":{"jitter":16,"lost":0.0,"rtt":10},"peerID":"1805874519604260864","result":"success","roomID":"m-jxoQ5et0B8ZIifkX7KQmKKl9pdJjjuOfFQ53uM","timestamp":1719393151647,"version":0}

尝试过 redis 进行转发,可能因为我改了 redis 的配置会报错。

@EnableCaching
@AutoConfiguration
@AutoConfigureBefore(RedisAutoConfiguration.class)
public class RedisTemplateConfiguration {

  @Bean
  @Primary
  public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
      RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
      redisTemplate.setKeySerializer(RedisSerializer.string());
      redisTemplate.setHashKeySerializer(RedisSerializer.string());
      redisTemplate.setValueSerializer(RedisSerializer.string());
      redisTemplate.setHashValueSerializer(RedisSerializer.string());
      redisTemplate.setConnectionFactory(factory);
      return redisTemplate;
  }

  @Bean
  public HashOperations<String, String, Object> hashOperations(RedisTemplate<String, Object> redisTemplate) {
      return redisTemplate.opsForHash();
  }

  @Bean
  public ValueOperations<String, String> valueOperations(RedisTemplate<String, String> redisTemplate) {
      return redisTemplate.opsForValue();
  }

  @Bean
  public ListOperations<String, Object> listOperations(RedisTemplate<String, Object> redisTemplate) {
      return redisTemplate.opsForList();
  }

  @Bean
  public SetOperations<String, Object> setOperations(RedisTemplate<String, Object> redisTemplate) {
      return redisTemplate.opsForSet();
  }

  @Bean
  public ZSetOperations<String, Object> zSetOperations(RedisTemplate<String, Object> redisTemplate) {
      return redisTemplate.opsForZSet();
  }

}

CompletableFutureMessageSenderFactory 这个也是不太明白怎么用 🥹🥹

@Configuration
public class SampleConfig {

    @Bean
    public CompletableFutureMessageSenderFactory completableFutureMessageSenderFactory() {
        return new CompletableFutureMessageSenderFactory().addScopes(WebSocketScoped.NAME);
    }
}
chuchushijingxi commented 3 days ago

之前说的CompletableFutureMessageSenderFactory这个有尝试吗 你们的测试方案时怎么样的,我这边按你们的试一下看能不能复现 你试一下用redis/mq的方式转发会不会有问题

集群中两台机器,用nginx转发,大概连接数到 1.3 - 1.5 W 的时候收发消息开始阻塞。无法回复心跳 客户端 websocket 连接上之后给服务发送

{"version":0,"messageID":1019,"timestamp":1719393151641,"roomID":"m-jxoQ5et0B8ZIifkX7KQmKKl9pdJjjuOfFY6nSA","peerID":"1805874855115026432","networkInfo":{"rtt":10,"lost":0,"jitter":16},"appList":null}

然后服务中使用 WebSocketLoadBalanceConcept concept 回复

{"messageID":1020,"networkInfo":{"jitter":16,"lost":0.0,"rtt":10},"peerID":"1805874519604260864","result":"success","roomID":"m-jxoQ5et0B8ZIifkX7KQmKKl9pdJjjuOfFQ53uM","timestamp":1719393151647,"version":0}

尝试过 redis 进行转发,可能因为我改了 redis 的配置会报错。

@EnableCaching
@AutoConfiguration
@AutoConfigureBefore(RedisAutoConfiguration.class)
public class RedisTemplateConfiguration {

    @Bean
    @Primary
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setKeySerializer(RedisSerializer.string());
        redisTemplate.setHashKeySerializer(RedisSerializer.string());
        redisTemplate.setValueSerializer(RedisSerializer.string());
        redisTemplate.setHashValueSerializer(RedisSerializer.string());
        redisTemplate.setConnectionFactory(factory);
        return redisTemplate;
    }

    @Bean
    public HashOperations<String, String, Object> hashOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForHash();
    }

    @Bean
    public ValueOperations<String, String> valueOperations(RedisTemplate<String, String> redisTemplate) {
        return redisTemplate.opsForValue();
    }

    @Bean
    public ListOperations<String, Object> listOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForList();
    }

    @Bean
    public SetOperations<String, Object> setOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForSet();
    }

    @Bean
    public ZSetOperations<String, Object> zSetOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForZSet();
    }

}

CompletableFutureMessageSenderFactory 这个也是不太明白怎么用 🥹🥹

@Configuration
public class SampleConfig {

    @Bean
    public CompletableFutureMessageSenderFactory completableFutureMessageSenderFactory() {
        return new CompletableFutureMessageSenderFactory().addScopes(WebSocketScoped.NAME);
    }
}

注入了这个还是用 WebSocketLoadBalanceConcept 来进行消息发送嘛?

Linyuzai commented 3 days ago

之前说的CompletableFutureMessageSenderFactory这个有尝试吗 你们的测试方案时怎么样的,我这边按你们的试一下看能不能复现 你试一下用redis/mq的方式转发会不会有问题

集群中两台机器,用nginx转发,大概连接数到 1.3 - 1.5 W 的时候收发消息开始阻塞。无法回复心跳 客户端 websocket 连接上之后给服务发送

{"version":0,"messageID":1019,"timestamp":1719393151641,"roomID":"m-jxoQ5et0B8ZIifkX7KQmKKl9pdJjjuOfFY6nSA","peerID":"1805874855115026432","networkInfo":{"rtt":10,"lost":0,"jitter":16},"appList":null}

然后服务中使用 WebSocketLoadBalanceConcept concept 回复

{"messageID":1020,"networkInfo":{"jitter":16,"lost":0.0,"rtt":10},"peerID":"1805874519604260864","result":"success","roomID":"m-jxoQ5et0B8ZIifkX7KQmKKl9pdJjjuOfFQ53uM","timestamp":1719393151647,"version":0}

尝试过 redis 进行转发,可能因为我改了 redis 的配置会报错。

@EnableCaching
@AutoConfiguration
@AutoConfigureBefore(RedisAutoConfiguration.class)
public class RedisTemplateConfiguration {

  @Bean
  @Primary
  public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
      RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
      redisTemplate.setKeySerializer(RedisSerializer.string());
      redisTemplate.setHashKeySerializer(RedisSerializer.string());
      redisTemplate.setValueSerializer(RedisSerializer.string());
      redisTemplate.setHashValueSerializer(RedisSerializer.string());
      redisTemplate.setConnectionFactory(factory);
      return redisTemplate;
  }

  @Bean
  public HashOperations<String, String, Object> hashOperations(RedisTemplate<String, Object> redisTemplate) {
      return redisTemplate.opsForHash();
  }

  @Bean
  public ValueOperations<String, String> valueOperations(RedisTemplate<String, String> redisTemplate) {
      return redisTemplate.opsForValue();
  }

  @Bean
  public ListOperations<String, Object> listOperations(RedisTemplate<String, Object> redisTemplate) {
      return redisTemplate.opsForList();
  }

  @Bean
  public SetOperations<String, Object> setOperations(RedisTemplate<String, Object> redisTemplate) {
      return redisTemplate.opsForSet();
  }

  @Bean
  public ZSetOperations<String, Object> zSetOperations(RedisTemplate<String, Object> redisTemplate) {
      return redisTemplate.opsForZSet();
  }

}

CompletableFutureMessageSenderFactory 这个也是不太明白怎么用 🥹🥹

@Configuration
public class SampleConfig {

    @Bean
    public CompletableFutureMessageSenderFactory completableFutureMessageSenderFactory() {
        return new CompletableFutureMessageSenderFactory().addScopes(WebSocketScoped.NAME);
    }
}

注入了这个还是用 WebSocketLoadBalanceConcept 来进行消息发送嘛?

是的,切redis那个配置也试一下

chuchushijingxi commented 3 days ago

之前说的CompletableFutureMessageSenderFactory这个有尝试吗 你们的测试方案时怎么样的,我这边按你们的试一下看能不能复现 你试一下用redis/mq的方式转发会不会有问题

集群中两台机器,用nginx转发,大概连接数到 1.3 - 1.5 W 的时候收发消息开始阻塞。无法回复心跳 客户端 websocket 连接上之后给服务发送

{"version":0,"messageID":1019,"timestamp":1719393151641,"roomID":"m-jxoQ5et0B8ZIifkX7KQmKKl9pdJjjuOfFY6nSA","peerID":"1805874855115026432","networkInfo":{"rtt":10,"lost":0,"jitter":16},"appList":null}

然后服务中使用 WebSocketLoadBalanceConcept concept 回复

{"messageID":1020,"networkInfo":{"jitter":16,"lost":0.0,"rtt":10},"peerID":"1805874519604260864","result":"success","roomID":"m-jxoQ5et0B8ZIifkX7KQmKKl9pdJjjuOfFQ53uM","timestamp":1719393151647,"version":0}

尝试过 redis 进行转发,可能因为我改了 redis 的配置会报错。

@EnableCaching
@AutoConfiguration
@AutoConfigureBefore(RedisAutoConfiguration.class)
public class RedisTemplateConfiguration {

    @Bean
    @Primary
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setKeySerializer(RedisSerializer.string());
        redisTemplate.setHashKeySerializer(RedisSerializer.string());
        redisTemplate.setValueSerializer(RedisSerializer.string());
        redisTemplate.setHashValueSerializer(RedisSerializer.string());
        redisTemplate.setConnectionFactory(factory);
        return redisTemplate;
    }

    @Bean
    public HashOperations<String, String, Object> hashOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForHash();
    }

    @Bean
    public ValueOperations<String, String> valueOperations(RedisTemplate<String, String> redisTemplate) {
        return redisTemplate.opsForValue();
    }

    @Bean
    public ListOperations<String, Object> listOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForList();
    }

    @Bean
    public SetOperations<String, Object> setOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForSet();
    }

    @Bean
    public ZSetOperations<String, Object> zSetOperations(RedisTemplate<String, Object> redisTemplate) {
        return redisTemplate.opsForZSet();
    }

}

CompletableFutureMessageSenderFactory 这个也是不太明白怎么用 🥹🥹

@Configuration
public class SampleConfig {

    @Bean
    public CompletableFutureMessageSenderFactory completableFutureMessageSenderFactory() {
        return new CompletableFutureMessageSenderFactory().addScopes(WebSocketScoped.NAME);
    }
}

注入了这个还是用 WebSocketLoadBalanceConcept 来进行消息发送嘛?

是的,切redis那个配置也试一下

我改成 StringRedisTemplate 也不行。。是我的用法不对嘛

java.lang.NoSuchMethodError: 'void org.springframework.data.redis.core.StringRedisTemplate.convertAndSend(java.lang.String, java.lang.Object)
@EnableCaching
@AutoConfiguration
@AutoConfigureBefore(RedisAutoConfiguration.class)
public class RedisTemplateConfiguration {

    @Bean
    @Primary
    public StringRedisTemplate redisTemplate(RedisConnectionFactory factory) {
        return new StringRedisTemplate(factory);
    }

    @Bean
    public HashOperations<String, String, Object> hashOperations(StringRedisTemplate redisTemplate) {
        return redisTemplate.opsForHash();
    }

    @Bean
    public ValueOperations<String, String> valueOperations(StringRedisTemplate redisTemplate) {
        return redisTemplate.opsForValue();
    }

    @Bean
    public ListOperations<String, String> listOperations(StringRedisTemplate redisTemplate) {
        return redisTemplate.opsForList();
    }

    @Bean
    public SetOperations<String, String> setOperations(StringRedisTemplate redisTemplate) {
        return redisTemplate.opsForSet();
    }

    @Bean
    public ZSetOperations<String, String> zSetOperations(StringRedisTemplate redisTemplate) {
        return redisTemplate.opsForZSet();
    }

}
Linyuzai commented 3 days ago

这个抱错应该是spring没这个方法,你的spring版本是多少,先试下CompletableFutureMessageSenderFactory吧

chuchushijingxi commented 3 days ago

这个抱错应该是spring没这个方法,你的spring版本是多少,先试下CompletableFutureMessageSenderFactory吧

3.1.3 的版本

chuchushijingxi commented 2 days ago

这个抱错应该是spring没这个方法,你的spring版本是多少,先试下CompletableFutureMessageSenderFactory吧

用 CompletableFutureMessageSenderFactory 也是有很多阻塞 image

Linyuzai commented 2 days ago

我本地电脑连接不到6000的报错了,你的控制台没有输出什么信息吗

你算下从收到消息到发送完成的时间,看会不会出现时间很长的情况

redis那个是spring3改了方法返回值,我这边要支持一下spring3,你可以先集成redisson,用redisson转发试试,或者等支持了spring3你在试redis

然后你说的收不到消息是指,服务端收不到客户端消息了,还是说客户端收不到concept回复的消息

Linyuzai commented 2 days ago

你们的服务有配置什么参数吗,tomcat的连接数,或者内存大小什么的

chuchushijingxi commented 2 days ago

我本地电脑连接不到6000的报错了,你的控制台没有输出什么信息吗

你算下从收到消息到发送完成的时间,看会不会出现时间很长的情况

redis那个是spring3改了方法返回值,我这边要支持一下spring3,你可以先集成redisson,用redisson转发试试,或者等支持了spring3你在试redis

然后你说的收不到消息是指,服务端收不到客户端消息了,还是说客户端收不到concept回复的消息

本地电脑确实连不了很多,端口会耗尽。 是客户端收不到消息,客户端发了 messageId = 1019 的消息后显示一直没有收到服务端 1020 的响应导致心跳超时重连。

nginx 配置

user  nginx;
worker_processes        auto;
worker_cpu_affinity     auto;
worker_rlimit_nofile    65535;

error_log  /var/log/nginx/error.log warn;
pid        /var/run/nginx.pid;

events {
    use                 epoll;
    worker_connections  65535;
    multi_accept        on;
    accept_mutex        off;
}

tomcat 配置

server:
  tomcat:
    max-connections: 25000
    accept-count: 3000
    max-threads: 2000 
    min-spare-threads: 300
chuchushijingxi commented 2 days ago

我本地电脑连接不到6000的报错了,你的控制台没有输出什么信息吗

你算下从收到消息到发送完成的时间,看会不会出现时间很长的情况

redis那个是spring3改了方法返回值,我这边要支持一下spring3,你可以先集成redisson,用redisson转发试试,或者等支持了spring3你在试redis

然后你说的收不到消息是指,服务端收不到客户端消息了,还是说客户端收不到concept回复的消息

我看有这个方法 image

Linyuzai commented 2 days ago

我本地电脑连接不到6000的报错了,你的控制台没有输出什么信息吗 你算下从收到消息到发送完成的时间,看会不会出现时间很长的情况 redis那个是spring3改了方法返回值,我这边要支持一下spring3,你可以先集成redisson,用redisson转发试试,或者等支持了spring3你在试redis 然后你说的收不到消息是指,服务端收不到客户端消息了,还是说客户端收不到concept回复的消息

本地电脑确实连不了很多,端口会耗尽。 是客户端收不到消息,客户端发了 messageId = 1019 的消息后显示一直没有收到服务端 1020 的响应导致心跳超时重连。

nginx 配置

user  nginx;
worker_processes        auto;
worker_cpu_affinity     auto;
worker_rlimit_nofile    65535;

error_log  /var/log/nginx/error.log warn;
pid        /var/run/nginx.pid;

events {
    use                 epoll;
    worker_connections  65535;
    multi_accept        on;
    accept_mutex        off;
}

tomcat 配置

server:
  tomcat:
    max-connections: 25000
    accept-count: 3000
    max-threads: 2000 
    min-spare-threads: 300

这个响应,是要发给所有的客户端?还是只要发给发消息的客户端就行了

chuchushijingxi commented 2 days ago

我本地电脑连接不到6000的报错了,你的控制台没有输出什么信息吗 你算下从收到消息到发送完成的时间,看会不会出现时间很长的情况 redis那个是spring3改了方法返回值,我这边要支持一下spring3,你可以先集成redisson,用redisson转发试试,或者等支持了spring3你在试redis 然后你说的收不到消息是指,服务端收不到客户端消息了,还是说客户端收不到concept回复的消息

本地电脑确实连不了很多,端口会耗尽。 是客户端收不到消息,客户端发了 messageId = 1019 的消息后显示一直没有收到服务端 1020 的响应导致心跳超时重连。 nginx 配置

user  nginx;
worker_processes        auto;
worker_cpu_affinity     auto;
worker_rlimit_nofile    65535;

error_log  /var/log/nginx/error.log warn;
pid        /var/run/nginx.pid;

events {
    use                 epoll;
    worker_connections  65535;
    multi_accept        on;
    accept_mutex        off;
}

tomcat 配置

server:
  tomcat:
    max-connections: 25000
    accept-count: 3000
    max-threads: 2000 
    min-spare-threads: 300

这个响应,是要发给所有的客户端?还是只要发给发消息的客户端就行了

只发给发消息的客户端。 websocket 的连接 URL wss://192.168.50.30:8443/signaling/concept-websocket/connect?roomId=yr-g9gBXsxcjZYmTkX7KQmKKl9pdJjjuOfFY4Hg&connectType=cstreaming&secretKey=FR7oUxfH&peerId=1805218222086160384

连接成功建立之后会设置一个 key

connection.getMetadata().put(RoomSelector.KEY, roomId + connectType);

然后发送给指定的客户端

concept.send(new RoomMessage(jsonString, roomId + context.getClientType()));
chuchushijingxi commented 2 days ago

我本地电脑连接不到6000的报错了,你的控制台没有输出什么信息吗 你算下从收到消息到发送完成的时间,看会不会出现时间很长的情况 redis那个是spring3改了方法返回值,我这边要支持一下spring3,你可以先集成redisson,用redisson转发试试,或者等支持了spring3你在试redis 然后你说的收不到消息是指,服务端收不到客户端消息了,还是说客户端收不到concept回复的消息

本地电脑确实连不了很多,端口会耗尽。 是客户端收不到消息,客户端发了 messageId = 1019 的消息后显示一直没有收到服务端 1020 的响应导致心跳超时重连。 nginx 配置

user  nginx;
worker_processes        auto;
worker_cpu_affinity     auto;
worker_rlimit_nofile    65535;

error_log  /var/log/nginx/error.log warn;
pid        /var/run/nginx.pid;

events {
    use                 epoll;
    worker_connections  65535;
    multi_accept        on;
    accept_mutex        off;
}

tomcat 配置

server:
  tomcat:
    max-connections: 25000
    accept-count: 3000
    max-threads: 2000 
    min-spare-threads: 300

这个响应,是要发给所有的客户端?还是只要发给发消息的客户端就行了

因为当前系统做的是像中转站的的功能,也有 A 客户端发送的消息需要响应给 B 客户端的。 推流服务在启动后会通过 websocket 连接上来,用户想进入房间也需要通过 websocket 连接上来。 用户连接上来之后会发送建立 WebRTC 的消息,这个时候我的服务就需要给这条消息转发给推流服务

Linyuzai commented 2 days ago

试一下2.7.1的版本redis转发

如果只需要回复给发送消息的客户端,直接通过connection发消息就行了

@Component
public class WsMessageHandler implements WebSocketMessageHandler {

    @Override
    public void onMessage(Message message, Connection connection, ConnectionLoadBalanceConcept concept) {
        connection.send(message);
    }
}
chuchushijingxi commented 2 days ago

试一下2.7.1的版本redis转发

如果只需要回复给发送消息的客户端,直接通过connection发消息就行了

@Component
public class WsMessageHandler implements WebSocketMessageHandler {

    @Override
    public void onMessage(Message message, Connection connection, ConnectionLoadBalanceConcept concept) {
        connection.send(message);
    }
}

看了一下,业务中还是有很多是需要转发的,现在数据都是存在 redis 里面,(未使用 redis 转发)每秒大概 3500 - 4000 命令数。 连接数量上来后担心用 redis 转发会有消息堆积。。现在的处理方式就是关闭同步,用 Nginx hash websocket URL 中的 roomId,让相同 roomId 连在同一台机器上 😭😭