lorenzodonini / ocpp-go

Open Charge Point Protocol implementation in Go
MIT License

[ENG] help! websocket write: broken pipe, and ws.cleanupConnection() Unlock() not work #291

Open AndrewYEEE opened 3 months ago

AndrewYEEE commented 3 months ago

OCPP version: [x] 1.6 [ ] 2.0.1

I'm submitting a ...

[x] bug report [ ] feature request

Current behavior: the ChargingPoint sent a large number of Heartbeats at the same time due to an exception, and we encountered the following error:

time="2024-08-31T06:45:32+08:00" level=info msg=CCCCCCHeartbeat
time="2024-08-31T06:45:32+08:00" level=info msg=BBBBBBHeartbeat
time="2024-08-31T06:45:33+08:00" level=info msg=AAAAAAHeartbeat
time="2024-08-31T06:45:33+08:00" level=info msg=AAAAAAHeartbeat
time="2024-08-31T06:45:33+08:00" level=info msg="write failed for AAAAAA: %!w(*net.OpError=&{write tcp 0xc0007a24b0 0xc0007a24e0 0xc000682a60})" logger=websocket
time="2024-08-31T06:45:33+08:00" level=error msg="write failed for AAAAAA: write tcp 172.18.0.xx:9001->172.18.0.xx:57006: write: broken pipe" logger=websocket
time="2024-08-31T06:45:33+08:00" level=info msg=AAAAAAHeartbeat
time="2024-08-31T06:45:33+08:00" level=info msg=EEEEEEHeartbeat

But the next expected log line, "closed connection to AAAAAA", is never printed.

According to writePump() in ws/websocket.go, when "write failed" occurs, the next step is to execute server.cleanupConnection(ws), which closes the connection and prints "closed connection to AAAAAA" — but that never happens.

We suspect the program is blocked inside server.cleanupConnection(ws), so server.connMutex is never unlocked.

As a result, all subsequent messages block while waiting on server.connMutex, and the server can no longer respond to the ChargingPoint.

Can anyone give an answer?

Is it possible that a race condition occurs in cleanupConnection(), or that close(ws.outQueue) and close(ws.closeC) in server.cleanupConnection(ws) are blocked?

Please help me! thank you!

AndrewYEEE commented 3 months ago

Maybe I found the problem.

The problem is taking RLock() on a code path that can block while another goroutine needs Lock():

    func (server *Server) Write(webSocketId string, data []byte) error {
        server.connMutex.RLock()
        defer server.connMutex.RUnlock() // <=============
        ws, ok := server.connections[webSocketId]
        if !ok {
            return fmt.Errorf("couldn't write to websocket. No socket with id %v is open", webSocketId)
        }
        log.Debugf("queuing data for websocket %s", webSocketId)
        ws.outQueue <- data // <=============
        return nil
    }

We know that when several messages arrive at the same time, several goroutines hold RLock(). And if one of those writes fails, writePump() triggers cleanupConnection():

            case ping := <-ws.pingMessage:
                _ = conn.SetWriteDeadline(time.Now().Add(server.timeoutConfig.WriteWait))
                err := conn.WriteMessage(websocket.PongMessage, ping)
                if err != nil {
                    server.error(fmt.Errorf("write failed for %s: %w", ws.ID(), err))
                    // Invoking cleanup, as socket was forcefully closed
                    server.cleanupConnection(ws) //<============= here
                    return
                }

cleanupConnection() takes Lock(). Lock() must wait for every outstanding RLock(), but those RLock() holders are themselves blocked sending to ws.outQueue (which writePump no longer drains), so the two sides wait for each other and we get a deadlock.

          func (server *Server) cleanupConnection(ws *WebSocket) {
              _ = ws.connection.Close()
              server.connMutex.Lock() //<============== here
              close(ws.outQueue)
              close(ws.closeC)
              delete(server.connections, ws.id)
              server.connMutex.Unlock()
              log.Infof("closed connection to %s", ws.ID())
              if server.disconnectedHandler != nil {
                  server.disconnectedHandler(ws)
              }
          }
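To make the failure mode concrete, here is a minimal, self-contained sketch (hypothetical names, not ocpp-go code) of the same pattern: one goroutine holds RLock() while blocked on a full channel send, while the consumer stops draining the channel and asks for Lock() instead.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// demonstrateDeadlock reproduces the suspected pattern with a hypothetical
// queue: a "Write" goroutine holds RLock() while blocked on a full channel
// send, and a "cleanup" goroutine stops draining the channel and requests
// Lock() instead. It returns true when the deadlock is observed.
func demonstrateDeadlock() bool {
	var mu sync.RWMutex
	queue := make(chan int, 1) // same capacity as ws.outQueue in the report
	queue <- 0                 // fill the buffer so the next send blocks

	done := make(chan struct{})

	// Plays the role of Server.Write: send while holding the read lock.
	go func() {
		mu.RLock()
		defer mu.RUnlock()
		queue <- 1 // blocks forever: buffer is full and nobody drains it
	}()

	// Plays the role of cleanupConnection: the consumer no longer reads
	// from queue and tries to take the write lock.
	go func() {
		time.Sleep(50 * time.Millisecond) // let the sender block first
		mu.Lock()                         // blocks: an RLock() is still held
		defer mu.Unlock()
		close(done)
	}()

	select {
	case <-done:
		return false // cleanup finished, no deadlock
	case <-time.After(500 * time.Millisecond):
		return true // Lock() waits for RLock(); RLock() holder waits on the channel
	}
}

func main() {
	if demonstrateDeadlock() {
		fmt.Println("deadlocked, as described in the issue")
	}
}
```

The two leaked goroutines never finish, which is exactly the symptom in the logs: cleanupConnection() never reaches its log line, and every later Write() piles up behind the stuck Lock().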

So we need to move the RUnlock() so that the read lock is no longer held by the time Lock() is requested:

      func (server *Server) Write(webSocketId string, data []byte) error {
          server.connMutex.RLock()
          // defer server.connMutex.RUnlock() // <======== change here
          ws, ok := server.connections[webSocketId]
          server.connMutex.RUnlock() // <======== to here
          if !ok {
              return fmt.Errorf("couldn't write to websocket. No socket with id %v is open", webSocketId)
          }
          log.Debugf("queuing data for websocket %s", webSocketId)
          ws.outQueue <- data
          return nil
      }

Not sure if this is correct, please help me confirm, thanks. @lorenzodonini

andig commented 2 months ago

There is Lock() in cleanupConnection(), which will cause Lock() to be blocked by other RLock() and generate Deadlock.

This would only deadlock if the queue is blocked and/or not buffered.

dwibudut commented 2 months ago

Hi @AndrewYEEE

I also had the same problem. In my environment it might be caused by the client sending a burst of messages at once and then being cut off at the network layer (e.g. a VPN disconnect or a very slow network/4G). After the server hits a write timeout, the call inside the cleanupConnection method is blocked.

I tried just changing RLock() to Lock(), but it still blocked.

I tried this method and it works as a temporary solution:

func (server *Server) Write(webSocketId string, data []byte) error {
    server.connMutex.RLock()
    ws, ok := server.connections[webSocketId]
    server.connMutex.RUnlock()
    if !ok {
        return fmt.Errorf("couldn't write to websocket. No socket with id %v is open", webSocketId)
    }
    log.Debugf("queuing data for websocket %s", webSocketId)
    defer func() {
        if r := recover(); r != nil {
            log.Debugf("couldn't write to already closed websocket with id %v", webSocketId)
        }
    }()
    ws.outQueue <- data
    return nil
}

AndrewYEEE commented 1 month ago

@dwibudut Thank you very much! This problem just happened to me too:

 time="2024-10-25T17:28:52+08:00" level=info msg=AAAAAAAAAAA____Heartbeat
time="2024-10-25T17:28:52+08:00" level=info msg=AAAAAAAAAAA____Heartbeat
time="2024-10-25T17:28:52+08:00" level=info msg=AAAAAAAAAAA____Heartbeat
time="2024-10-25T17:28:52+08:00" level=info msg="(outQueue) write failed for AAAAAAAAAAA: %!w(*net.OpError=&{write tcp 0xc000598f90 0xc000598fc0 0xc00070a000})" logger=websocket
time="2024-10-25T17:28:52+08:00" level=error msg="(outQueue) write failed for AAAAAAAAAAA: write tcp 172.18.0.21:9001->172.18.0.24:35496: write: broken pipe" logger=websocket
time="2024-10-25T17:28:52+08:00" level=info msg="closed connection to AAAAAAAAAAA" logger=websocket
time="2024-10-25T17:28:52+08:00" level=info msg="==========ChargePointDisconnected========="
time="2024-10-25T17:28:52+08:00" level=info msg="charge point disconnected" client=AAAAAAAAAAA
panic: send on closed channel
goroutine 6530 [running]:
github.com/lorenzodonini/ocpp-go/ws.(*Server).Write(0xc00012e000, {0xc00048738a, 0xc}, {0xc000394000, 0x3d, 0x40})
    /home/vsts/work/1/ocpp-go/ws/websocket.go:462 +0x154
github.com/lorenzodonini/ocpp-go/ocppj.(*Server).SendResponse(0xc000168870, {0xc00048738a, 0xc}, {0xc0003e3ff0, 0x10}, {0xb1eca0?, 0xc00033a008?})
    /home/vsts/work/1/ocpp-go/ocppj/server.go:198 +0x16d

I will try the solution you gave, thank you!

AndrewYEEE commented 1 week ago

Hi @dwibudut @lorenzodonini

[Method 1]. Thank you for the method provided by @dwibudut. It does indeed avoid both the lock and the panic problems. Just change Write() in websocket.go:

func (server *Server) Write(webSocketId string, data []byte) (err error) {
    server.connMutex.RLock()
    // defer server.connMutex.RUnlock()
    ws, ok := server.connections[webSocketId]
    server.connMutex.RUnlock()
    if !ok {
        // [changed]
        log.Infof("(Write) couldn't write to websocket. No socket with id %s is open", webSocketId)
        return fmt.Errorf("(Write) couldn't write to websocket. No socket with id %v is open", webSocketId)
    }
    log.Debugf("(Write) queuing data for websocket %s", webSocketId)

    // [changed]
    defer func() {
        if r := recover(); r != nil {
            log.Infof("(Write) couldn't write to already closed websocket with id %s", webSocketId)
            err = fmt.Errorf("(Write) couldn't write to already closed websocket with id %v", webSocketId)
        }
    }()

    ws.outQueue <- data
    return nil
}
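The key mechanism in Method 1 is that a send on a closed channel panics, and a named return value lets the deferred recover() turn that panic into an ordinary error. Isolated into a small hypothetical helper (not part of ocpp-go), it looks like this:

```go
package main

import "fmt"

// safeSend isolates the recover() trick from Method 1: sending on a closed
// channel panics, and the deferred recover() converts that panic into an
// error through the named return value err.
func safeSend(queue chan []byte, data []byte) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("queue already closed: %v", r)
		}
	}()
	queue <- data
	return nil
}

func main() {
	q := make(chan []byte, 1)
	fmt.Println(safeSend(q, []byte("ok"))) // <nil>: send succeeded
	close(q)
	fmt.Println(safeSend(q, []byte("x")) != nil) // true: panic recovered as error
}
```

Note this only converts the crash into an error after the race has already happened; it does not remove the race between Write() and cleanupConnection() closing outQueue.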

[Method 2]. In addition, we are working on another solution. The deadlock occurs because the ocpp-go lib allows responding to several requests from the charging point at the same time, while the outQueue channel has a capacity of only 1. When a connection is torn down because the charging point went offline, the RLock() holders queued on the channel and the Lock() in cleanupConnection() wait for each other, resulting in deadlock.

So the simplest solution (a poor one) is to give the channel a large enough capacity. With a big enough buffer, queued messages are accepted immediately instead of blocking the sender.

    // The id of the charge point is the final path element
    // [changed]
    ws := WebSocket{
        connection:         conn,
        id:                 id,
        // outQueue:           make(chan []byte, 1),
        outQueue:           make(chan []byte, 200), // [changed]
        closeC:             make(chan websocket.CloseError, 1),
        forceCloseC:        make(chan error, 1),
        pingMessage:        make(chan []byte, 1),
        tlsConnectionState: r.TLS,
    }
    log.Debugf("upgraded websocket connection for %s from %s", id, conn.RemoteAddr().String())
    log.Debugf("upgraded websocket connection for %s from %s", id, conn.RemoteAddr().String())

If this method is adopted, the first modification (to Write()) is not needed.

Due to time constraints, the second method has not been tested for long, but I want to hear everyone's thoughts first to see whether it is correct, thanks.
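For discussion, one more option (my suggestion only, not from this thread or from ocpp-go) is a non-blocking send: if the queue is full or the connection is shutting down, Write() fails fast instead of blocking while holding RLock(), so cleanupConnection()'s Lock() can never be starved. The cost is that messages are dropped when the queue is full. A sketch with hypothetical names, assuming cleanup closes only a separate signal channel and never closes the queue channel itself (which also avoids the send-on-closed-channel panic):

```go
package main

import (
	"errors"
	"fmt"
)

// trySend is a hypothetical non-blocking variant of queuing into outQueue.
// closed is a signal channel that cleanup closes on shutdown; the queue
// channel itself is never closed, so no send can panic.
func trySend(queue chan []byte, closed <-chan struct{}, data []byte) error {
	// Fast path: refuse immediately if the connection is already closed.
	select {
	case <-closed:
		return errors.New("connection already closed")
	default:
	}
	select {
	case queue <- data:
		return nil // queued without blocking
	case <-closed:
		return errors.New("connection closed while queuing")
	default:
		return errors.New("outQueue full, dropping message")
	}
}

func main() {
	queue := make(chan []byte, 1)
	closed := make(chan struct{})

	fmt.Println(trySend(queue, closed, []byte("a"))) // <nil>: buffer had room
	fmt.Println(trySend(queue, closed, []byte("b"))) // outQueue full, dropping message
	close(closed)
	fmt.Println(trySend(queue, closed, []byte("c"))) // connection already closed
}
```

Whether dropping is acceptable depends on the message type: losing a Heartbeat response is harmless, but dropping a CallResult for a transaction-related request would need retry handling at the ocppj layer.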