gorilla / websocket

Package gorilla/websocket is a fast, well-tested and widely used WebSocket implementation for Go.
https://gorilla.github.io
BSD 2-Clause "Simplified" License

How to stop conn.ReadMessage() in gorilla/websocket #747

Closed hutu1st closed 2 years ago

hutu1st commented 2 years ago

Describe the problem you're having

… When I implement an auto-reconnect client, I need to stop the goroutine that is blocked in conn.ReadMessage(). I tried conn.Close() and conn.SetReadDeadline(time.Now()). They usually work, but sometimes the goroutine running conn.ReadMessage() does not exit immediately; it exits only after a few minutes. I need your help!

Versions

go version go1.15.6 darwin/amd64

"Show me the code!" `
package extools

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"

    "github.com/gorilla/websocket"
    "github.com/tal-tech/go-zero/core/logx"
)

const (
    wsSubscriberStatusRunning   int32 = 1
    wsSubscriberStatusResetting int32 = 2
    wsSubscriberStatusClosing   int32 = 3
    wsSubscriberStatusClosed    int32 = 4
)

const (
    pingPeriod = time.Second * 8
)

type (
    // WsSubscriber is a websocket subscriber. It wraps reconnection and
    // re-subscription, and is safe for concurrent use.
    WsSubscriber interface {
        LeftTopics() int // number of topics that can still be subscribed
        Sub(topics ...string) error
        UnSub(topics ...string) error
        OnRcvMsg(func(msg []byte))
        HaveTopic(topic string) bool
        ResetConn() // reset the network connection
        Close()
    }

    // TopicsToMsgFn converts the subscribed topics into the msg sent to the peer.
    TopicsToMsgFn func(topics ...string) ([]byte, error)

    wsSubscriber struct {
        WsClient
        m             sync.Map // stores all subscribed topics
        url           string
        writeCh       chan []byte
        closeWriteCh  chan struct{} // a signal on closeWriteCh stops the write goroutine
        closeReConnCh chan struct{} // a signal on closeReConnCh stops the reconnect goroutine
        sendReConnCh  chan struct{} // a signal on sendReConnCh makes the reconnect goroutine re-establish the connection
        rcvReConnCh   chan struct{} // a signal from rcvReConnCh means the reconnect finished or a close signal arrived
        maxTopicCount int32         // maximum number of topics per conn
        curTopicCount int32         // number of topics currently subscribed
        conn          *websocket.Conn
        onRcvMsg      func(msg []byte)
        subFn         TopicsToMsgFn
        unsubFn       TopicsToMsgFn
        rcvPongTs     time.Time
        status        int32
    }
)

func NewWsSubscriber(url string, maxTopic int32, client WsClient, subFn, unsubFn TopicsToMsgFn) (WsSubscriber, error) {
    conn, err := client.CreateConn(url)
    if err != nil {
        return nil, err
    }

    ws := &wsSubscriber{
        WsClient:      client,
        m:             sync.Map{},
        url:           url,
        writeCh:       make(chan []byte),
        closeWriteCh:  make(chan struct{}),
        closeReConnCh: make(chan struct{}),
        sendReConnCh:  make(chan struct{}),
        rcvReConnCh:   make(chan struct{}),
        maxTopicCount: maxTopic,
        curTopicCount: 0,
        conn:          conn,
        onRcvMsg:      func(msg []byte) {},
        subFn:         subFn,
        unsubFn:       unsubFn,
        status:        wsSubscriberStatusRunning,
    }

    //conn.SetPongHandler(ws.pongHandler)
    go ws.read()
    go ws.write()
    go ws.checkReset()

    logx.Infof("wsUrl:%s, proxy:%s, create a new ws subscriber", ws.url, client.Proxy())
    return ws, nil
}

func (o *wsSubscriber) LeftTopics() int {
    return int(atomic.LoadInt32(&o.maxTopicCount) - atomic.LoadInt32(&o.curTopicCount))
}

func (o *wsSubscriber) HaveTopic(topic string) bool {
    _, ok := o.m.Load(topic)
    return ok
}

func (o *wsSubscriber) Sub(topics ...string) error {
    var unStoredTopics []string // valid topics (not yet stored)

    for _, topic := range topics {
        if _, ok := o.m.Load(topic); !ok {
            unStoredTopics = append(unStoredTopics, topic)
            o.m.Store(topic, struct{}{})
        }
    }

    if len(unStoredTopics) == 0 {
        return nil
    }

    if o.LeftTopics() < len(unStoredTopics) {
        return fmt.Errorf("maximum topic limit exceeded")
    }

    atomic.AddInt32(&o.curTopicCount, int32(len(unStoredTopics)))

    msg, err := o.subFn(unStoredTopics...)
    if err != nil {
        return err
    }

    logx.Infof("wsUrl:%s, send msg:%s", o.url, string(msg))

    o.writeCh <- msg
    return nil
}

func (o *wsSubscriber) UnSub(topics ...string) error {
    var storedTopic []string // valid topics (currently stored)

    for _, topic := range topics {
        if _, ok := o.m.Load(topic); ok {
            storedTopic = append(storedTopic, topic)
        }
    }

    if len(storedTopic) == 0 {
        return nil
    }

    atomic.AddInt32(&o.curTopicCount, -int32(len(storedTopic)))

    msg, err := o.unsubFn(storedTopic...)
    if err != nil {
        return err
    }

    for _, topic := range storedTopic {
        o.m.Delete(topic)
    }

    logx.Infof("wsUrl:%s, send msg:%s", o.url, string(msg))

    o.writeCh <- msg
    return nil
}

func (o *wsSubscriber) OnRcvMsg(fn func(msg []byte)) {
    o.onRcvMsg = fn
}

func (o *wsSubscriber) Close() {
    if atomic.CompareAndSwapInt32(&o.status, wsSubscriberStatusRunning, wsSubscriberStatusClosing) {
        logx.Infof("wsUrl:%s, start closing", o.url)
        o.closeWriteCh <- struct{}{}              // stop the write goroutine
        o.closeReConnCh <- struct{}{}             // stop the reconnect goroutine
        err := o.conn.SetReadDeadline(time.Now()) // make the read goroutine time out and exit immediately
        if err != nil {
            logx.Error(err)
        }
        atomic.StoreInt32(&o.status, wsSubscriberStatusClosed)
        logx.Infof("wsUrl:%s, complete closed", o.url)
    } else if atomic.CompareAndSwapInt32(&o.status, wsSubscriberStatusResetting, wsSubscriberStatusClosing) {
        // if resetting, set to closing
        return
    } else {
        // ignore the closing and closed states
        return
    }
}

func (o *wsSubscriber) ResetConn() {
    if atomic.CompareAndSwapInt32(&o.status, wsSubscriberStatusRunning, wsSubscriberStatusResetting) {
        logx.Infof("wsUrl:%s, start re conn", o.url)
        err := o.conn.SetReadDeadline(time.Now()) // make the read goroutine time out and exit
        if err != nil {
            logx.Error(err)
        }
        o.closeWriteCh <- struct{}{} // stop the write goroutine
        o.sendReConnCh <- struct{}{} // send the reconnect signal
        // Wait for the reconnect to finish. It may not really have finished;
        // the status may have been set to closing instead.
        <-o.rcvReConnCh
        if atomic.CompareAndSwapInt32(&o.status, wsSubscriberStatusResetting, wsSubscriberStatusRunning) {
            // still in the wsSubscriberStatusResetting state
            go o.read()
            go o.write()
            logx.Infof("wsUrl:%s, complete re conn", o.url)
        } else if atomic.CompareAndSwapInt32(&o.status, wsSubscriberStatusClosing, wsSubscriberStatusClosed) {
            // if closing, set to closed
            o.closeReConnCh <- struct{}{}
            // Close conn; after that, and after being set to the
            // wsSubscriberStatusResetting state, the read goroutine will exit.
            err := o.conn.Close()
            if err != nil {
                logx.Error(err)
            }
        }
    } else {
        // ignore every state except running
    }
}

// read is the goroutine that receives messages.
func (o *wsSubscriber) read() {
    logx.Infof("wsUrl:%s, start read go routine", o.url)

    conn := o.conn

    defer func() {
        err := conn.Close() // close conn
        if err != nil {
            logx.Error(err)
        }
    }()

    for atomic.LoadInt32(&o.status) == wsSubscriberStatusRunning { // keep reading only while running
        t, message, err := conn.ReadMessage()
        if err != nil {
            logx.Errorf("wsUrl:%s,ReadMessage t:%d msg:%s err:%v", o.url, t, string(message), err)
            if atomic.LoadInt32(&o.status) == wsSubscriberStatusRunning && o.conn == conn {
                logx.Infof("wsUrl:%s,ReadMessage t:%d msg:%s err:%v, and status==running, and conn==o.conn, will re conn",
                    o.url, t, string(message), err)
                go o.ResetConn()
            }
            break // exit
        }
        //fmt.Println(string(message))
        o.onRcvMsg(message)
    }

    logx.Infof("wsUrl:%s,quit read go routine", o.url)
}

// write is the goroutine responsible for writing; it also sends ping frames periodically.
func (o *wsSubscriber) write() {
    logx.Infof("wsUrl:%s, start write go routine", o.url)
    ticker := time.NewTicker(pingPeriod)
    conn := o.conn
    defer ticker.Stop()
    for {
        select {
        case <-o.closeWriteCh:
            goto exit
        case msg := <-o.writeCh:
            err := conn.WriteMessage(websocket.TextMessage, msg)
            if err != nil {
                logx.Errorf("wsUrl:%s, write msg:%s ,err:%v", o.url, string(msg), err)
            }
        case <-ticker.C:
            //logx.Infof("wsUrl:%s, send ping", o.url)
            err := conn.WriteMessage(websocket.PingMessage, []byte{})
            if err != nil {
                logx.Errorf("wsUrl:%s, send keepalive ping err:%v", o.url, err)
                if atomic.LoadInt32(&o.status) == wsSubscriberStatusRunning && o.conn == conn {
                    logx.Infof("wsUrl:%s, send keepalive ping err:%v, and status==running, and conn==o.conn, will re conn", o.url, err)
                    go o.ResetConn()
                }
            }
        }
    }
exit:
    logx.Infof("wsUrl:%s,quit write go routine", o.url)
}

// checkReset is the goroutine that handles reconnection.
func (o *wsSubscriber) checkReset() {
    logx.Infof("wsUrl:%s, start checkReset go routine", o.url)
    for {
        select {
        case <-o.sendReConnCh:
            // received the reconnect signal
            logx.Infof("wsUrl:%s checkReset go routine rcv re conn signal", o.url)
            for atomic.LoadInt32(&o.status) == wsSubscriberStatusResetting { // keep resetting only while in the resetting state
                conn, err := o.WsClient.CreateConn(o.url)
                if err != nil {
                    logx.Errorf("wsUrl:%s, creat conn err:%v", o.url, err)
                    time.Sleep(time.Second * 3) // retry after three seconds
                    continue
                }
                // reconnected
                o.conn = conn   // replace the connection
                err = o.reSub() // re-subscribe
                if err != nil {
                    logx.Errorf("wsUrl:%s, onReConn err:%v", o.url, err)
                    err := o.conn.Close()
                    if err != nil {
                        logx.Error(err)
                    }
                    time.Sleep(time.Second * 3) // retry after three seconds
                    continue                    // re-subscribe failed, keep reconnecting
                }
                o.rcvReConnCh <- struct{}{} // signal that the reconnect is complete
                logx.Infof("wsUrl:%s, checkReset go routine send re conn completed signal", o.url)
                break // reconnect and re-subscribe both succeeded
            }
            // The reconnect loop exits in two cases:
            // 1. reconnect and re-subscribe both succeeded
            // 2. the status was set to closing
            if atomic.LoadInt32(&o.status) != wsSubscriberStatusResetting {
                logx.Infof("wsUrl:%s, in re conn progress, status is set:%d ", o.url, o.status)
                o.rcvReConnCh <- struct{}{} // send the signal
            }
        case <-o.closeReConnCh:
            goto exit
        }
    }
exit:
    logx.Infof("wsUrl:%s,quit checkReset go routine", o.url)
}

// reSub re-subscribes the recorded topics after a reconnect.
func (o *wsSubscriber) reSub() error {
    var topics []string
    o.m.Range(func(key, value interface{}) bool {
        topics = append(topics, key.(string))
        return true
    })

    if len(topics) == 0 {
        return nil
    }

    msg, err := o.subFn(topics...)
    if err != nil {
        return err
    }

    logx.Infof("reConn, and send msg:%s", string(msg))

    return o.conn.WriteMessage(websocket.TextMessage, msg)
}

func (o *wsSubscriber) pongHandler(msg string) error {
    //_ = o.conn.SetReadDeadline(time.Now().Add(pongWait)) // set the read timeout
    logx.Infof("wsUrl:%s, rcv pong", o.url)
    return nil
}

`
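
One detail of the code above may be relevant. This is a reading of the posted code, not a diagnosis confirmed anywhere in this thread. `Close` and `ResetConn` signal the other goroutines by sending on unbuffered channels, and such a send blocks until the receiver is ready to receive. In `Close`, `SetReadDeadline` runs only after those sends, so if the write goroutine is busy in a slow call, the read goroutine is not told to stop until that call returns. A common alternative is to close a channel exactly once, which never blocks. The sketch below uses hypothetical names (`stopper`, `stopReturnsBeforeWorkerIsFree`) and only the standard library.

```go
package main

import (
	"sync"
	"time"
)

// stopper is a hypothetical helper: Stop closes the done channel once,
// so it never waits for the goroutine being stopped.
type stopper struct {
	once sync.Once
	done chan struct{}
}

func newStopper() *stopper { return &stopper{done: make(chan struct{})} }

// Stop never blocks and is safe to call more than once.
func (s *stopper) Stop() { s.once.Do(func() { close(s.done) }) }

// Done returns a channel that is closed once Stop has been called.
func (s *stopper) Done() <-chan struct{} { return s.done }

// stopReturnsBeforeWorkerIsFree reports whether Stop returned while the
// worker goroutine was still busy and not yet listening for the signal.
func stopReturnsBeforeWorkerIsFree() bool {
	const busy = 200 * time.Millisecond

	s := newStopper()
	exited := make(chan struct{})
	go func() {
		defer close(exited)
		time.Sleep(busy) // stands in for a goroutine stuck in a slow call
		<-s.Done()       // only now does the worker look at the signal
	}()

	start := time.Now()
	s.Stop() // returns immediately, unlike a send on an unbuffered channel
	elapsed := time.Since(start)

	<-exited // the worker still exits once it checks Done
	return elapsed < busy
}
```

With this shape, the caller could move the read deadline first and signal the other goroutines afterwards without risking a stall in between.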

hutu1st commented 2 years ago

Sorry, I cannot format the code with ``.

hutu1st commented 2 years ago

How can I format the code?

echlebek commented 2 years ago

@hutu1st you must use triple backticks. For instance,

func someCode() {}

hutu1st commented 2 years ago

I think I get it

finalclass commented 2 years ago

What is the solution here? I still experience the same issue. I have a client written using gorilla/websocket that connects to a server written in nodejs (using the primusjs library). Everything works fine, but when I kill the nodejs app, the gorilla/websocket app hangs in conn.ReadMessage() and never returns. How can I stop it?

kenhan168 commented 2 years ago

Does conn.SetReadDeadline work?

e.g.

conn.SetReadDeadline(time.Now().Add(time.Second * 35))
_, messageBytes, err := conn.ReadMessage()
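
The mechanism behind this suggestion, and behind the `SetReadDeadline(time.Now())` call in the original post, can be sketched with the standard library alone. The package documentation describes `Conn.SetReadDeadline` as setting the read deadline on the underlying network connection, so the sketch below uses a plain `net.Conn` from `net.Pipe` as a stand-in for a websocket connection. That substitution is an assumption made for illustration, and `unblockWithDeadline` and `isTimeout` are hypothetical names.

```go
package main

import (
	"errors"
	"net"
	"time"
)

// unblockWithDeadline starts a goroutine that blocks in Read on a connection
// that never receives data, then moves the read deadline to "now" from the
// calling goroutine. It returns the error the blocked reader saw.
func unblockWithDeadline() error {
	client, server := net.Pipe()
	defer client.Close()
	defer server.Close()

	readErr := make(chan error, 1)
	go func() {
		buf := make([]byte, 16)
		_, err := client.Read(buf) // blocks: the server side never writes
		readErr <- err
	}()

	time.Sleep(10 * time.Millisecond) // give the reader time to block

	// A deadline that has already passed wakes the pending Read.
	if err := client.SetReadDeadline(time.Now()); err != nil {
		return err
	}

	select {
	case err := <-readErr:
		return err
	case <-time.After(2 * time.Second):
		return errors.New("reader did not return")
	}
}

// isTimeout reports whether err is a network timeout error.
func isTimeout(err error) bool {
	var ne net.Error
	return errors.As(err, &ne) && ne.Timeout()
}
```

In this sketch the reader returns promptly with a timeout error. It does not reproduce the multi-minute delay described in the original post.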

finalclass commented 2 years ago

It does not really matter, because setting a deadline does not solve the issue. I wanted ReadMessage to run forever and return an err only when the connection is broken. I have since switched to a different solution, but from what I remember, when the connection was broken ReadMessage kept blocking instead of returning the err.

Since there was no clear answer to this issue I proceeded with a totally different approach to solve my problem.

ghost commented 2 years ago

Use PING and PONG as in the original poster's application and the package examples. Update the deadline on any message received including the PONG.
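
The advice above can be sketched as follows. This is a minimal sketch under stated assumptions, not the package's own example: `wsConn`, `readLoop`, `pingLoop`, `fakeConn`, and `demoSilentPeer` are hypothetical names. The `wsConn` interface lists only methods that `*websocket.Conn` provides, so the block compiles with the standard library alone, and `fakeConn` simulates a peer that has gone silent.

```go
package main

import (
	"errors"
	"sync"
	"time"
)

// pingMessage has the same value as websocket.PingMessage (9); it is
// redeclared so the sketch has no third-party dependency.
const pingMessage = 9

// wsConn lists the methods this sketch needs; *websocket.Conn has them all.
type wsConn interface {
	ReadMessage() (messageType int, p []byte, err error)
	SetReadDeadline(t time.Time) error
	SetPongHandler(h func(appData string) error)
	WriteControl(messageType int, data []byte, deadline time.Time) error
}

// readLoop extends the read deadline on every message received, including
// pongs, and returns when ReadMessage fails (for example on a timeout).
func readLoop(c wsConn, pongWait time.Duration, onMsg func([]byte)) error {
	extend := func() error { return c.SetReadDeadline(time.Now().Add(pongWait)) }
	c.SetPongHandler(func(string) error { return extend() })
	if err := extend(); err != nil {
		return err
	}
	for {
		_, msg, err := c.ReadMessage()
		if err != nil {
			return err
		}
		if err := extend(); err != nil {
			return err
		}
		onMsg(msg)
	}
}

// pingLoop sends a ping every pingPeriod until done is closed or a write
// fails. pingPeriod should be shorter than the pongWait used by readLoop.
func pingLoop(c wsConn, pingPeriod, writeWait time.Duration, done <-chan struct{}) error {
	ticker := time.NewTicker(pingPeriod)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			return nil
		case <-ticker.C:
			if err := c.WriteControl(pingMessage, nil, time.Now().Add(writeWait)); err != nil {
				return err
			}
		}
	}
}

var errTimeout = errors.New("i/o timeout (simulated)")

// fakeConn simulates a peer that has silently gone away: it never delivers
// a message or a pong, and ReadMessage fails once the read deadline passes.
type fakeConn struct {
	mu       sync.Mutex
	deadline time.Time
}

func (f *fakeConn) SetReadDeadline(t time.Time) error {
	f.mu.Lock()
	f.deadline = t
	f.mu.Unlock()
	return nil
}
func (f *fakeConn) SetPongHandler(func(string) error)         {}
func (f *fakeConn) WriteControl(int, []byte, time.Time) error { return nil }
func (f *fakeConn) ReadMessage() (int, []byte, error) {
	for {
		f.mu.Lock()
		d := f.deadline
		f.mu.Unlock()
		if !d.IsZero() && !time.Now().Before(d) {
			return -1, nil, errTimeout
		}
		time.Sleep(time.Millisecond)
	}
}

// demoSilentPeer shows that readLoop returns once the peer stops answering.
func demoSilentPeer() error {
	return readLoop(&fakeConn{}, 20*time.Millisecond, func([]byte) {})
}
```

With a real connection, `readLoop` and `pingLoop` would run in separate goroutines on the same `*websocket.Conn`. The ping uses `WriteControl` because the package documents it as safe to call concurrently with the other methods. When `readLoop` returns an error, the caller can close the connection and reconnect.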

finalclass commented 2 years ago

@volkerite what you wrote is very helpful. It's not easy to pick this flow up from the examples. I assume this is what almost all usages of websocket should look like? If so, it would be great to have it plainly stated in the documentation.