GenweiWu / Blog

个人技术能力提升
MIT License
4 stars 0 forks source link

如何用websocket实现即时消息推送? #61

Closed GenweiWu closed 5 years ago

GenweiWu commented 5 years ago

参考

疑问

GenweiWu commented 5 years ago

1、基本实例js版

客户端

// websocket.html
<div id="clock"></div>
<script>
let clockDiv = document.getElementById('clock')
let socket = new WebSocket('ws://localhost:9999')
//当连接成功之后就会执行回调函数
socket.onopen = function() {
  console.log('客户端连接成功')
  //再向服务 器发送一个消息
  socket.send('hello') //客户端发的消息内容 为hello
}
//绑定事件是用加属性的方式
socket.onmessage = function(event) {
  clockDiv.innerHTML = event.data
  console.log('收到服务器端的响应', event.data)
}
</script>

服务器

// websocket.js
let express = require('express')
let app = express()
app.use(express.static(__dirname))
//http服务器
app.listen(3000)
let WebSocketServer = require('ws').Server
//用ws模块启动一个websocket服务器,监听了9999端口
let wsServer = new WebSocketServer({ port: 9999 })
//监听客户端的连接请求  当客户端连接服务器的时候,就会触发connection事件
//socket代表一个客户端,不是所有客户端共享的,而是每个客户端都有一个socket
wsServer.on('connection', function(socket) {
  //每一个socket都有一个唯一的ID属性
  console.log(socket)
  console.log('客户端连接成功')
  //监听对方发过来的消息
  socket.on('message', function(message) {
    console.log('接收到客户端的消息', message)
    socket.send('服务器回应:' + message)
  })
})

参考:https://juejin.im/post/5c20e5766fb9a049b13e387b

GenweiWu commented 5 years ago

2、springBoot 实现websocket

2.1 客户端

还是js版本的,主要是url改了一下

<div id="clock"></div>
<script>
let clockDiv = document.getElementById('clock')
let socket = new WebSocket('ws://localhost:21088/code-service/message')
//当连接成功之后就会执行回调函数
socket.onopen = function() {
  console.log('客户端连接成功')
  //再向服务 器发送一个消息
  socket.send('hello ironman') //客户端发的消息内容 为hello
}
//绑定事件是用加属性的方式
socket.onmessage = function(event) {
  clockDiv.innerHTML = event.data
  console.log('收到服务器端的响应', event.data)
}
</script>

2.2 服务端

在搭建基本springBoot应用的前台下

1. 增加依赖

pom.xml

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
    <version>2.0.4.RELEASE</version>
</dependency>

2.配置url映射

import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer
{
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry)
    {
        //setAllowedOrigins的作用:因为默认不支持跨域,会报403错
        registry.addHandler(myHandler(), "message").setAllowedOrigins("*");;
    }

    public WebSocketHandler myHandler()
    {
        return new MyHandler();
    }

}

3. 进行逻辑处理


import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

/**
 * 相当于controller的处理器
 */
public class MyHandler extends TextWebSocketHandler
{
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message)
        throws Exception
    {
        String payload = message.getPayload();
        System.out.println("=====接受到的数据" + message);
        session.sendMessage(new TextMessage("服务器返回收到的信息," + payload));
    }
}

参考:https://segmentfault.com/a/1190000016012270


2.3 如果使用的是jetty作为服务器,则会报错

org.eclipse.jetty.websocket.jsr356.server.ServerContainer cannot be cast to org.apache.tomcat.websocket.server.WsServerContainer

则需要参考上述服务端的基础上,修改排除下tomcat

pom.xml

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>2.3.8.RELEASE</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
</exclusions>
</dependency>
GenweiWu commented 5 years ago

广播 和 特定用户发送

服务端改造:在2的基础上改造下

(1)拦截器,用于保存用户信息到socketsessetion

import java.util.Map;

import javax.servlet.http.HttpSession;

import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;

public class WebSocketHandlerInterceptor extends HttpSessionHandshakeInterceptor
{
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
        Map<String, Object> attributes)
        throws Exception
    {
        if (request instanceof ServletServerHttpRequest)
        {
            ServletServerHttpRequest servletRequest = (ServletServerHttpRequest)request;
            HttpSession session = servletRequest.getServletRequest().getSession(false);
            if (session != null)
            {
                //使用userName区分WebSocketHandler,以便定向发送消息
                String userName = (String)session.getAttribute("SESSION_USERNAME");
                if (userName != null)
                {
                    attributes.put("WEBSOCKET_USERID", userName);
                }

            }
        }
        return super.beforeHandshake(request, response, wsHandler, attributes);
    }

}

(2)用来进行保存用户和对应的sokcet的映射

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;

/**
 * Created by zhangwenchao on 2017/11/20.
 */
public class SpringWebSocketHandler extends TextWebSocketHandler {

    private static Logger logger = LoggerFactory.getLogger(SpringWebSocketHandler.class);

    private static final Map<String, WebSocketSession> users;  //Map来存储WebSocketSession,key用USER_ID 即在线用户列表

    //用户标识
    private static final String USER_ID = "WEBSOCKET_USERID";   //对应监听器从的key

    static {
        users =  new HashMap<String, WebSocketSession>();
    }

    public SpringWebSocketHandler() {}

    /**
     * 连接成功时候,会触发页面上onopen方法
     */
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {

        System.out.println("成功建立websocket连接!");
        String userId = (String) session.getAttributes().get(USER_ID);
        users.put(userId,session);
        System.out.println("当前线上用户数量:"+users.size());

        //这块会实现自己业务,比如,当用户登录后,会把离线消息推送给用户
        //TextMessage returnMessage = new TextMessage("成功建立socket连接,你将收到的离线");
        //session.sendMessage(returnMessage);
    }

    /**
     * 关闭连接时触发
     */
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        logger.debug("关闭websocket连接");
        String userId= (String) session.getAttributes().get(USER_ID);
        System.out.println("用户"+userId+"已退出!");
        users.remove(userId);
        System.out.println("剩余在线用户"+users.size());
    }

    /**
     * js调用websocket.send时候,会调用该方法
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {

        super.handleTextMessage(session, message);

        /**
         * 收到消息,自定义处理机制,实现业务
         */
        System.out.println("服务器收到消息:"+message);

        if(message.getPayload().startsWith("#anyone#")){ //单发某人

             sendMessageToUser((String)session.getAttributes().get(USER_ID), new TextMessage("服务器单发:" +message.getPayload())) ;

        }else if(message.getPayload().startsWith("#everyone#")){

             sendMessageToUsers(new TextMessage("服务器群发:" +message.getPayload()));

        }else{

        }

    }

    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        if(session.isOpen()){
            session.close();
        }
        logger.debug("传输出现异常,关闭websocket连接... ");
        String userId= (String) session.getAttributes().get(USER_ID);
        users.remove(userId);
    }

    public boolean supportsPartialMessages() {

        return false;
    }

    /**
     * 给某个用户发送消息
     *
     * @param userId
     * @param message
     */
    public void sendMessageToUser(String userId, TextMessage message) {
        for (String id : users.keySet()) {
            if (id.equals(userId)) {
                try {
                    if (users.get(id).isOpen()) {
                        users.get(id).sendMessage(message);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                break;
            }
        }
    }

    /**
     * 给所有在线用户发送消息
     *
     * @param message
     */
    public void sendMessageToUsers(TextMessage message) {
        for (String userId : users.keySet()) {
            try {
                if (users.get(userId).isOpen()) {
                    users.get(userId).sendMessage(message);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

参考:https://blog.csdn.net/zmx729618/article/details/78584633


个人在springBoot项目中发现,没有WebSocketHandlerInterceptor,在session对象的attribute中也有用户信息

GenweiWu commented 5 years ago

nginx代理websocket

http {
    map $http_upgrade $connection_upgrade {
        default upgrade;
        ''      close;
    }

    server {
        ...

        location /chat/ {
            proxy_pass http://backend;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection $connection_upgrade;
        }
    }
GenweiWu commented 5 years ago

其他问题

多页面发送问题

多个页面无法共享websocket,所以需要在后台维护一个用户的多个websocket

微服务部署

需要将内存中存储改到redis中进行存储