lesismal / nbio

Pure Go 1000k+ connections solution, support tls/http1.x/websocket and basically compatible with net/http, with high-performance and low memory cost, non-blocking, event-driven, easy-to-use.
MIT License
2.22k stars 155 forks source link

使用websocket服务端时客户端刚连接进来就断开 #207

Closed Kotodian closed 2 years ago

Kotodian commented 2 years ago

版本: v1.2.21 环境: k8s中的golang:1.19镜像 问题: 连接进来的客户端刚升级完就断开连接,Close中没有找到任何的错误 日志: 日志里的调用栈也实在是没看出什么问题来

image

代码: 连接进来时

ctx := r.Context()
    snPassword := mux.Vars(r)["charge_station"]
    // 获取sn号
    if snPassword == "" {
        w.WriteHeader(http.StatusBadRequest)
        _, _ = w.Write(responseErr(ctx, protocol.ErrGenericError, "sn empty"))
        return
    }

    var password string
    spilt := strings.Split(snPassword, ":")
    if len(spilt) > 1 {
        password = spilt[1]
    }

    sn := spilt[0]
    var certSerialNumber string
    if r.TLS != nil {
        if len(r.TLS.PeerCertificates) > 0 {
            log.Logger().Debug("证书中sn号为:", zap.String("sn", r.TLS.PeerCertificates[0].Subject.OrganizationalUnit[0]))
            log.Logger().Debug("实际sn号为:", zap.String("sn", sn))
            certInfo := r.TLS.PeerCertificates[0]
            if certInfo.Subject.OrganizationalUnit[0] != sn {
                w.WriteHeader(http.StatusBadRequest)
                _, _ = w.Write(responseErr(ctx, protocol.ErrGenericError, "参数sn不等于证书中的sn"))
                return
            }
            certSerialNumber = certInfo.SerialNumber.String()
        }
    }
    port := strings.Split(r.Host, ":")[1]
    remoteAddress := readUserIP(r)
    accVerReq := &api.AccessVerifyRequest{
        DeviceSerialNumber:    sn,
        DeviceProtocol:        Hub.Protocol,
        DeviceProtocolVersion: Hub.ProtocolVersion,
        RemoteAddress:         remoteAddress,
        RequestPort:           port,
    }

    if certSerialNumber != "" {
        accVerReq.CertSerialNumber = certSerialNumber
    }
    if password != "" {
        accVerReq.Password = password
    }
    // 确认是否能够访问
    response, err := api.AccessVerify(accVerReq)
    if err != nil {
        w.WriteHeader(http.StatusBadRequest)
        _, _ = w.Write(responseErr(ctx, protocol.ErrGenericError, "access verify failed"+err.Error()))
        log.Logger().Error(err.Error(), zap.String("sn", sn))
        return
    }

    // 升级成websocket协议
    ws, err := s.upgrader.Upgrade(w, r, nil)
    if err != nil {
        responseErr(ctx, protocol.ErrGenericError, err.Error())
        return
    }
    log.Logger().Debug("创建连接成功", zap.String("sn", sn))
    conn := ws.(*websocket.Conn)
    if response != nil {
        // 解析出平台ID
        coreId, _ := strconv.ParseUint(response.CoreID, 10, 64)
        if _client, ok := Hub.Clients.Load(coreId); ok {
            _client.(lib.ClientInterface).Close(errors.New("旧连接还存在先关闭"))
        }
        // 创建新的客户端
        client := NewClient(interfaces.NewDefaultChargeStation(sn, response.Registered, coreId), Hub, conn, response.KeepAlive, remoteAddress, log.Logger(), true)
        // 设置基本路由
        client.SetBaseURL(response.BaseURL)
        client.SetClientOfflineFunc(func(err error) {
            // 如果已经注册过了需要通知平台
            if client.ChargeStation().Registered() {
                reason := "websocket关闭连接"
                _ = ClientOfflineNotify(client.ChargeStation(), reason)
            }
            Hub.RegClients.Delete(client.ChargeStation().CoreID())
        })
        // 读取加密解密数据的密钥
        encryptKey := readEncryptKey(r)
        if len(encryptKey) != 0 {
            client.SetEncryptKey(encryptKey)
        }
        // 如果注册过了订阅命令消息, 没有就订阅注册消息
        if client.ChargeStation().Registered() {
            go client.SubMQTT()
        } else {
            go client.SubRegMQTT()
        }
        conn.SetSession(client)
    }
// 连接关闭时
    c.once.Do(func() {
        if err != nil {
            c.log.Error(err.Error(), zap.String("sn", c.chargeStation.SN()))
        }
        c.hub.Clients.Delete(c.chargeStation.CoreID())
        c.hub.RegClients.Delete(c.chargeStation.CoreID())
        c.log.Error("关闭连接", zap.String("sn", c.chargeStation.SN()))
        close(c.close)
        c.clientOfflineNotifyFunc(err)
    })
    return nil

可以看到日志里面打印了创建连接成功说明Upgrade是成功的,但是立马就进入到了Close,并且也没发生任何错误

Kotodian commented 2 years ago

发现是nginx直接报了

image
lesismal commented 2 years ago

你的问题描述好像没有给出nbio是做client还是server,提供的代码里也看不出来,麻烦提供下能够复现的完整代码,包括client和server。

Kotodian commented 2 years ago

你的问题描述好像没有给出nbio是做client还是server,提供的代码里也看不出来,麻烦提供下能够复现的完整代码,包括client和server。

我是拿nbio做server, 客户端连接到nginx,再通过nginx走到我的服务上,客户端是别人的c语言代码,用gorilla能连的上来, 因为这里面代码是没法单独拿出来使用的,我也知道描述的不是很清楚

lesismal commented 2 years ago

关闭连接的地方把nbio的OnClose传给你的err打印出来,先看看是什么err

Kotodian commented 2 years ago

关闭连接的地方把nbio的OnClose传给你的err打印出来,先看看是什么err

没有error ,我直接报了空指针错误

lesismal commented 2 years ago

你先打印出来,即使是空指针报错,也把空指针的报错、调用栈打印出来,总不能全让我靠猜吧。。。

Kotodian commented 2 years ago

你先打印出来,即使是空指针报错,也把空指针的报错、调用栈打印出来,总不能全让我靠猜吧。。。

image
lesismal commented 2 years ago

gorilla的client能练上来保持正常通信? c的client能连到gorilla的server上正常通信不?

Kotodian commented 2 years ago

gorilla的client能练上来保持正常通信? c的client能连到gorilla的server上正常通信不?

能正常通信的

Kotodian commented 2 years ago

gorilla的client能练上来保持正常通信? c的client能连到gorilla的server上正常通信不?

我是想自己看下哪里出错的 但是因为大量的都是用函数作为参数传进去,感觉有点难定位

lesismal commented 2 years ago

init_nb.go的代码方便贴下吗

Kotodian commented 2 years ago
package ws

import (
    "context"
    "crypto/x509"
    "encoding/json"
    "errors"
    "fmt"
    "io/ioutil"
    "net"
    "net/http"
    "os"
    "strconv"
    "strings"
    "sync"
    "time"

    _ "net/http/pprof"

    "gitee.com/csms/jx-ocpp/log"
    "gitee.com/csms/jx-ocpp/protocol"
    "github.com/Kotodian/gokit/ac/lib"
    "github.com/Kotodian/gokit/api"
    "github.com/Kotodian/gokit/datasource"
    "github.com/Kotodian/gokit/datasource/mqtt"
    "github.com/Kotodian/gokit/datasource/rabbitmq"
    "github.com/Kotodian/gokit/datasource/redis"
    "github.com/Kotodian/protocol/golang/hardware/charger"
    "github.com/Kotodian/protocol/golang/keys"
    "github.com/Kotodian/protocol/interfaces"
    "github.com/golang/protobuf/proto"
    "github.com/gorilla/mux"
    "github.com/lesismal/llib/std/crypto/tls"
    "github.com/lesismal/nbio/nbhttp"
    "github.com/lesismal/nbio/nbhttp/websocket"
    "github.com/valyala/bytebufferpool"
    "go.uber.org/zap"
    "golang.org/x/sync/errgroup"
)

const (
    // Time allowed to write a message to the peer.
    writeWait = 10 * time.Second
    readWait  = 80 * time.Second

    // Maximum message size allowed from peer.
    maxMessageSize = 4096
    readBufferSize = 2048
)

func InitNB() {
    hostname, _ := os.Hostname()
    Hub = lib.NewHub("ocpp", "2.0.1", "core_ocpp", "core_gw")
    Hub.ResponseErrFn = responseErr
    Hub.ResponseFn = response
    Hub.TR = &protocol.Translation{}
    Hub.SetEncrypt(lib.NewAESEncrypt(lib.CBC))

    config := nbhttp.Config{Addrs: []string{":8844"}}
    ctx, cancel := context.WithCancel(context.Background())
    s := &Server{
        Hub:         Hub,
        upgrader:    websocket.NewUpgrader(),
        hostname:    hostname,
        pprofServer: &http.Server{Addr: ":6060", Handler: nil},
        ctx:         ctx,
        cancel:      cancel,
    }
    tlsServerKey := os.Getenv("TLS_SERVER_KEY")
    tlsServerCert := os.Getenv("TLS_SERVER_CER")
    tlsCaCert := os.Getenv("TLS_CA_CER")
    if tlsServerKey != "" && tlsServerCert != "" && tlsCaCert != "" {
        pool := x509.NewCertPool()
        tlsConfig := &tls.Config{}
        cert, err := tls.LoadX509KeyPair(tlsServerCert, tlsServerKey)
        if err != nil {
            log.Logger().Panic("failed to read server cert or server key", zap.Error(err))
        }
        tlsConfig.Certificates = []tls.Certificate{cert}
        caCrt, err := ioutil.ReadFile(tlsCaCert)
        if err != nil {
            log.Logger().Panic(err.Error())
        }
        pool.AppendCertsFromPEM(caCrt)
        tlsConfig.ClientCAs = pool
        config.TLSConfig = tlsConfig
        config.AddrsTLS = []string{":8846"}
    }
    s.server = nbhttp.NewServer(config)
    router := mux.NewRouter().StrictSlash(true)
    router.UseEncodedPath()
    router.HandleFunc("/ocpp/{charge_station:.*}", s.wsHandler)
    s.server.Handler = router

    s.upgrader.KeepaliveTime = readWait
    s.upgrader.CheckOrigin = func(r *http.Request) bool {
        return true
    }

    s.upgrader.SetPingHandler(func(conn *websocket.Conn, s string) {
        c := conn.Session().(*nbClient)
        c.PingHandler(s)
    })

    s.upgrader.OnMessage(func(conn *websocket.Conn, mt websocket.MessageType, msg []byte) {
        c := conn.Session().(*nbClient)
        trData := &lib.TRData{}
        ctx := context.WithValue(context.Background(), "client", c)
        ctx = context.WithValue(ctx, "trData", trData)

        var err error
        defer func() {
            //如果发生了错误,都回复给设备,否则发送到平台
            if err != nil {
                c.ReplyError(ctx, err)
            }
        }()
        var payload proto.Message
        defer func() {
            if r := recover(); r != nil {
                c.log.Sugar().Errorf("%v\n", r)
            }
        }()

        if payload, err = c.hub.TR.ToAPDU(ctx, msg); err != nil {
            return
        }

        if payload == nil {
            return
        }

        if trData.Ignore {
            return
        }

        if trData.APDU.Payload, err = proto.Marshal(payload); err != nil {
            err = fmt.Errorf("encode cmd req payload error, err:%s", err.Error())
            return
        }
        var toCoreMSG []byte
        if toCoreMSG, err = proto.Marshal(trData.APDU); err != nil {
            err = fmt.Errorf("encode cmd req apdu error, err:%s", err.Error())
            return
        }

        var sendTopic string
        var sendQos byte
        if trData.IsTelemetry {
            sendTopic = "coregw/" + c.hub.Hostname + "/telemetry/" + datasource.UUID(c.chargeStation.CoreID()).String()
        } else if !trData.Sync {
            sendTopic = "coregw/" + c.hub.Hostname + "/command/" + datasource.UUID(c.chargeStation.CoreID()).String()
        } else {
            sendTopic = c.Coregw() + "/sync/" + datasource.UUID(c.chargeStation.CoreID()).String()
        }
        sendQos = 2

        c.hub.PubMqttMsg <- mqtt.MqttMessage{
            Topic:    sendTopic,
            Qos:      sendQos,
            Retained: false,
            Payload:  toCoreMSG,
        }
    })

    s.upgrader.OnClose(func(conn *websocket.Conn, err error) {
        c := conn.Session().(*nbClient)
        c.Close(err)
    })

    go Hub.Run()

    ch = make(chan struct{}, 1)

    s.group, _ = errgroup.WithContext(s.ctx)
    s.group.Go(func() error {
        defer func() {
            ch <- struct{}{}
        }()
        if err := s.server.Start(); err != nil {
            log.Logger().Error("failed to start websocket server", zap.Error(err))
            return err
        }
        <-s.ctx.Done()
        ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
        defer cancel()
        return s.server.Shutdown(ctx)
    })

    s.group.Go(func() error {
        log.Logger().Debug("start pprof server", zap.String("port", ":6060"))
        return s.pprofServer.ListenAndServe()
    })

}

type Server struct {
    Hub         *lib.Hub
    upgrader    *websocket.Upgrader
    hostname    string
    server      *nbhttp.Server
    pprofServer *http.Server
    ctx         context.Context
    cancel      context.CancelFunc
    group       *errgroup.Group
}

func (s *Server) Stop() {
    s.cancel()
}

func (s *Server) wsHandler(w http.ResponseWriter, r *http.Request) {
    ctx := r.Context()
    snPassword := mux.Vars(r)["charge_station"]
    // 获取sn号
    if snPassword == "" {
        w.WriteHeader(http.StatusBadRequest)
        _, _ = w.Write(responseErr(ctx, protocol.ErrGenericError, "sn empty"))
        return
    }

    var password string
    spilt := strings.Split(snPassword, ":")
    if len(spilt) > 1 {
        password = spilt[1]
    }

    sn := spilt[0]
    var certSerialNumber string
    if r.TLS != nil {
        if len(r.TLS.PeerCertificates) > 0 {
            log.Logger().Debug("证书中sn号为:", zap.String("sn", r.TLS.PeerCertificates[0].Subject.OrganizationalUnit[0]))
            log.Logger().Debug("实际sn号为:", zap.String("sn", sn))
            certInfo := r.TLS.PeerCertificates[0]
            if certInfo.Subject.OrganizationalUnit[0] != sn {
                w.WriteHeader(http.StatusBadRequest)
                _, _ = w.Write(responseErr(ctx, protocol.ErrGenericError, "参数sn不等于证书中的sn"))
                return
            }
            certSerialNumber = certInfo.SerialNumber.String()
        }
    }
    port := strings.Split(r.Host, ":")[1]
    remoteAddress := readUserIP(r)
    accVerReq := &api.AccessVerifyRequest{
        DeviceSerialNumber:    sn,
        DeviceProtocol:        Hub.Protocol,
        DeviceProtocolVersion: Hub.ProtocolVersion,
        RemoteAddress:         remoteAddress,
        RequestPort:           port,
    }

    if certSerialNumber != "" {
        accVerReq.CertSerialNumber = certSerialNumber
    }
    if password != "" {
        accVerReq.Password = password
    }
    // 确认是否能够访问
    response, err := api.AccessVerify(accVerReq)
    if err != nil {
        w.WriteHeader(http.StatusBadRequest)
        _, _ = w.Write(responseErr(ctx, protocol.ErrGenericError, "access verify failed"+err.Error()))
        log.Logger().Error(err.Error(), zap.String("sn", sn))
        return
    }

    // 升级成websocket协议
    ws, err := s.upgrader.Upgrade(w, r, nil)
    if err != nil {
        responseErr(ctx, protocol.ErrGenericError, err.Error())
        return
    }
    log.Logger().Debug("创建连接成功", zap.String("sn", sn))
    conn := ws.(*websocket.Conn)
    if response != nil {
        // 解析出平台ID
        coreId, _ := strconv.ParseUint(response.CoreID, 10, 64)
        if _client, ok := Hub.Clients.Load(coreId); ok {
            _client.(lib.ClientInterface).Close(errors.New("旧连接还存在先关闭"))
        }
        // 创建新的客户端
        client := NewClient(interfaces.NewDefaultChargeStation(sn, response.Registered, coreId), Hub, conn, response.KeepAlive, remoteAddress, log.Logger(), true)
        // 设置基本路由
        client.SetBaseURL(response.BaseURL)
        client.SetClientOfflineFunc(func(err error) {
            // 如果已经注册过了需要通知平台
            if client.ChargeStation().Registered() {
                reason := "websocket关闭连接"
                _ = ClientOfflineNotify(client.ChargeStation(), reason)
            }
            Hub.RegClients.Delete(client.ChargeStation().CoreID())
        })
        // 读取加密解密数据的密钥
        encryptKey := readEncryptKey(r)
        if len(encryptKey) != 0 {
            client.SetEncryptKey(encryptKey)
        }
        // 如果注册过了订阅命令消息, 没有就订阅注册消息
        if client.ChargeStation().Registered() {
            go client.SubMQTT()
        } else {
            go client.SubRegMQTT()
        }
        conn.SetSession(client)
    }
}

type nbClient struct {
    once                    sync.Once
    chargeStation           interfaces.ChargeStation
    hub                     *lib.Hub
    conn                    *websocket.Conn
    close                   chan struct{}
    lock                    sync.RWMutex
    clientOfflineNotifyFunc func(err error)       // 网络断开同步到core的函数
    mqttRegCh               chan mqtt.MqttMessage //注册信息
    mqttMsgCh               chan mqtt.MqttMessage //返回或下发的信息
    remoteAddress           string
    log                     *rabbitmq.Logger
    keepalive               int64
    coregw                  string
    isClose                 bool
    encryptKey              []byte
    id                      string
    certificateSN           string
    orderInterval           int
    baseURL                 string // 上传日志、下载固件基本地址
    debug                   bool
}

func NewClient(chargeStation interfaces.ChargeStation, hub *lib.Hub, conn *websocket.Conn, keepalive int, remoteAddress string, log *rabbitmq.Logger, debug ...bool) lib.ClientInterface {
    var b bool
    if len(debug) > 0 {
        b = debug[0]
    }
    return &nbClient{
        log:           log,
        chargeStation: chargeStation,
        hub:           hub,
        conn:          conn,
        remoteAddress: remoteAddress,
        mqttMsgCh:     make(chan mqtt.MqttMessage, 5),
        mqttRegCh:     make(chan mqtt.MqttMessage, 5),
        close:         make(chan struct{}),
        keepalive:     int64(keepalive),
        orderInterval: 30,
        debug:         b,
    }
}

func (c *nbClient) MessageNumber() int16 {
    return 0
}

func (c *nbClient) SetMessageNumber(i int16) {
    return
}

func (c *nbClient) SetData(key, val interface{}) {
    return
}

func (c *nbClient) GetData(key interface{}) interface{} {
    return nil
}

func (c *nbClient) Send(msg []byte) (err error) {
    defer func() {
        if e := recover(); e != nil {
            err = e.(error)
        }
    }()
    if c.hub.Encrypt != nil && len(c.encryptKey) > 0 {
        msg, err = c.hub.Encrypt.Encode(msg, c.encryptKey)
        if err != nil {
            return err
        }
    }
    c.conn.SetWriteDeadline(time.Now().Add(writeWait))
    return c.conn.WriteMessage(websocket.TextMessage, msg)
}

func (c *nbClient) Encrypt() lib.Encrypt {
    return nil
}

func (c *nbClient) SetEncrypt(encrypt lib.Encrypt) {

}

func (c *nbClient) Close(err error) error {
    c.once.Do(func() {
        if err != nil {
            c.log.Error(err.Error(), zap.String("sn", c.chargeStation.SN()))
        }
        c.hub.Clients.Delete(c.chargeStation.CoreID())
        c.hub.RegClients.Delete(c.chargeStation.CoreID())
        c.log.Error("关闭连接", zap.String("sn", c.chargeStation.SN()))
        close(c.close)
        c.clientOfflineNotifyFunc(err)
    })

    return nil
}

func (c *nbClient) SubRegMQTT() {
    c.hub.RegClients.Store(c.chargeStation.CoreID(), c)
    //if c.Evse.CoreID() == 0 {
    for {
        select {
        case <-c.close:
            return
        case m := <-c.mqttRegCh:
            func() {
                var apdu charger.APDU
                var err error
                topic := m.Topic
                defer func() {
                    if err != nil {
                        c.log.Error(err.Error(), zap.String("sn", c.chargeStation.SN()))
                    }
                }()
                if err = proto.Unmarshal(m.Payload, &apdu); err != nil {
                    return
                }

                trData := &lib.TRData{
                    APDU: &apdu,
                }
                ctx := context.WithValue(context.TODO(), "client", c)
                ctx = context.WithValue(ctx, "trData", trData)

                var msg interface{}
                //var f lib.FromAPDUFunc
                if msg, err = c.hub.TR.FromAPDU(ctx, &apdu); err != nil {
                    err = fmt.Errorf("FromAPDU register error, err:%s topic:%s", err.Error(), topic)
                    return
                } else if msg == nil {
                    return
                }
                if trData.Ignore {
                    return
                }
                buffer := bytebufferpool.Get()
                defer func() {
                    buffer.Reset()
                    bytebufferpool.Put(buffer)
                }()
                encoder := json.NewEncoder(buffer)
                if err = encoder.Encode(msg); err != nil {
                    return
                } else if err = c.Send(buffer.Bytes()); err != nil {
                    return
                }
            }()
        }
    }
}

func (c *nbClient) SubMQTT() {
    c.hub.Clients.Store(c.chargeStation.CoreID(), c)
    for {
        select {
        case <-c.close:
            return
        case m := <-c.mqttMsgCh:
            if len(m.Payload) == 0 {
                break
            }
            var apdu charger.APDU
            if err := proto.Unmarshal(m.Payload, &apdu); err != nil {
                break
            }
            func() {
                topic := m.Topic
                trData := &lib.TRData{
                    APDU:  &apdu,
                    Topic: topic,
                }
                ctx := context.WithValue(context.TODO(), "client", c)
                ctx = context.WithValue(ctx, "trData", trData)
                var msg interface{}

                var err error
                defer func() {
                    //如果没有错误就转发到设备上,否则写日志,回复到平台的错误日志有FromAPDU实现了
                    if err != nil {
                        c.log.Error(err.Error(), zap.String("sn", c.chargeStation.SN()))
                        if trData.Ignore == false && (int32(apdu.MessageId)>>7 == 0 || apdu.MessageId == charger.MessageID_ID_MessageError) {
                            if apdu.MessageId != charger.MessageID_ID_MessageError {
                                apdu.MessageId = charger.MessageID_ID_MessageError
                                apdu.Payload, _ = proto.Marshal(&charger.MessageError{
                                    Error:       charger.ErrorCode_EC_GenericError,
                                    Description: err.Error(),
                                })
                            }
                            apduEncoded, _ := proto.Marshal(&apdu)
                            pubMqttMsg := mqtt.MqttMessage{
                                Topic:    strings.Replace(trData.Topic, c.hub.Hostname, "coregw", 1),
                                Qos:      2,
                                Retained: false,
                                Payload:  apduEncoded,
                            }
                            c.hub.PubMqttMsg <- pubMqttMsg
                        }
                    }
                }()

                if msg, err = c.hub.TR.FromAPDU(ctx, &apdu); err != nil {
                    return
                } else if msg == nil {
                    return
                }

                if trData.Ignore {
                    return
                }
                // 优化bytes
                buffer := bytebufferpool.Get()
                defer func() {
                    buffer.Reset()
                    bytebufferpool.Put(buffer)
                }()
                encoder := json.NewEncoder(buffer)
                if err = encoder.Encode(msg); err != nil {
                    return
                } else if err = c.Send(buffer.Bytes()); err != nil {
                    return
                }
            }()
        }
    }
}

func (c *nbClient) Reply(ctx context.Context, payload interface{}) {
    resp, err := c.hub.ResponseFn(ctx, payload)
    if err != nil {
        return
    }
    //resp := protocol.NewCallResult(ctx, payload)
    //b, _ := json.Marshal(resp)
    _ = c.Send(resp)
    //if _client := ctx.Value("client"); _client != nil {
    //  client := _client.(*Client)
    //  client.send <- b
    //}
}

func (c *nbClient) ReplyError(ctx context.Context, err error, desc ...string) {
    b := c.hub.ResponseErrFn(ctx, err, desc...)
    if b != nil {
        _ = c.Send(b)
    }
}
func (c *nbClient) PublishReg(m mqtt.MqttMessage) {
    c.mqttRegCh <- m
}

func (c *nbClient) Publish(m mqtt.MqttMessage) {
    c.mqttMsgCh <- m
}

func (c *nbClient) KeepAlive() int64 {
    return c.keepalive
}

func (c *nbClient) Hub() *lib.Hub {
    return c.hub
}

func (c *nbClient) RemoteAddress() string {
    return c.remoteAddress
}

func (c *nbClient) ChargeStation() interfaces.ChargeStation {
    return c.chargeStation
}

func (c *nbClient) SetChargeStation(chargeStation interfaces.ChargeStation) {
    c.chargeStation = chargeStation
}

func (c *nbClient) SetClientOfflineFunc(clientOfflineFunc func(err error)) {
    c.clientOfflineNotifyFunc = clientOfflineFunc
}

func (c *nbClient) ClientOfflineFunc() func(err error) {
    return c.clientOfflineNotifyFunc
}

func (c *nbClient) Lock() {
    c.lock.Lock()
}

func (c *nbClient) Unlock() {
    c.lock.Unlock()
}

func (c *nbClient) Coregw() string {
    return c.coregw
}

func (c *nbClient) SetCoregw(coregw string) {
    c.coregw = coregw
}

func (c *nbClient) SetEncryptKey(encryptKey string) {
    c.encryptKey = []byte(encryptKey)
}

func (c *nbClient) IsClose() bool {
    return c.isClose
}

func (c *nbClient) EncryptKey() []byte {
    return c.encryptKey
}

func (c *nbClient) PingHandler(msg string) error {
    // c.log.Debug("ping message received", zap.String("sn", c.chargeStation.SN()))
    _ = c.conn.SetReadDeadline(time.Now().Add(readWait))
    _ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
    err := c.conn.WriteMessage(websocket.PongMessage, []byte(msg))
    if err != nil {
        c.Close(err)
        return err
    }
    redisConn := redis.GetRedis()
    defer redisConn.Close()
    _, err = redisConn.Do("expire", keys.Equipment(strconv.FormatUint(c.chargeStation.CoreID(), 10)), 190)
    if err != nil {
        c.log.Error(err.Error(), zap.String("sn", c.chargeStation.SN()))
    }
    return nil
}

func (c *nbClient) CertificateSN() string {
    return c.certificateSN
}

func (c *nbClient) SetCertificateSN(sn string) {
    c.certificateSN = sn
}

func (c *nbClient) Conn() net.Conn {
    return c.conn
}

func (c *nbClient) SetKeepalive(keepalive int64) {
    c.keepalive = keepalive
}

func (c *nbClient) SetRemoteAddress(address string) {
    c.remoteAddress = address
}

func (c *nbClient) SetOrderInterval(interval int) {
    c.orderInterval = interval
}

func (c *nbClient) OrderInterval() int {
    return c.orderInterval
}

func (c *nbClient) BaseURL() string {
    return c.baseURL
}

func (c *nbClient) SetBaseURL(baseURL string) {
    c.baseURL = baseURL
}

func (c *nbClient) ReadPump() {

}

func (c *nbClient) WritePump() {

}
lesismal commented 2 years ago
type Server struct {
    Hub         *lib.Hub
    upgrader    *websocket.Upgrader // 不要用同一个upgrader去Upgrade多个conn,每个conn都单独new一个upgrader
    hostname    string
    server      *nbhttp.Server
    pprofServer *http.Server
    ctx         context.Context
    cancel      context.CancelFunc
    group       *errgroup.Group
}

看下我上面的注释,比如这种: https://github.com/lesismal/nbio-examples/blob/master/websocket/server/server.go#L48

然后再试试

Kotodian commented 2 years ago
type Server struct {
  Hub         *lib.Hub
  upgrader    *websocket.Upgrader // 不要用同一个upgrader去Upgrade多个conn,每个conn都单独new一个upgrader
  hostname    string
  server      *nbhttp.Server
  pprofServer *http.Server
  ctx         context.Context
  cancel      context.CancelFunc
  group       *errgroup.Group
}

看下我上面的注释,比如这种: https://github.com/lesismal/nbio-examples/blob/master/websocket/server/server.go#L48

然后再试试

好的 我先试试

Kotodian commented 2 years ago
type Server struct {
  Hub         *lib.Hub
  upgrader    *websocket.Upgrader // 不要用同一个upgrader去Upgrade多个conn,每个conn都单独new一个upgrader
  hostname    string
  server      *nbhttp.Server
  pprofServer *http.Server
  ctx         context.Context
  cancel      context.CancelFunc
  group       *errgroup.Group
}

看下我上面的注释,比如这种: https://github.com/lesismal/nbio-examples/blob/master/websocket/server/server.go#L48

然后再试试

我试了下,就是还是会有那个问题,但是只针对于那个客户端,不少客户端是能建立连接并收发消息,但是用gorilla就是都能连的上来。

Kotodian commented 2 years ago
package ws

import (
    "context"
    "crypto/x509"
    "encoding/json"
    "errors"
    "fmt"
    "io/ioutil"
    "net"
    "net/http"
    "os"
    "strconv"
    "strings"
    "sync"
    "time"

    _ "net/http/pprof"

    "gitee.com/csms/jx-ocpp/log"
    "gitee.com/csms/jx-ocpp/protocol"
    "github.com/Kotodian/gokit/ac/lib"
    "github.com/Kotodian/gokit/api"
    "github.com/Kotodian/gokit/datasource"
    "github.com/Kotodian/gokit/datasource/mqtt"
    "github.com/Kotodian/gokit/datasource/rabbitmq"
    "github.com/Kotodian/gokit/datasource/redis"
    "github.com/Kotodian/protocol/golang/hardware/charger"
    "github.com/Kotodian/protocol/golang/keys"
    "github.com/Kotodian/protocol/interfaces"
    "github.com/golang/protobuf/proto"
    "github.com/gorilla/mux"
    "github.com/lesismal/llib/std/crypto/tls"
    "github.com/lesismal/nbio/nbhttp"
    "github.com/lesismal/nbio/nbhttp/websocket"
    "github.com/valyala/bytebufferpool"
    "go.uber.org/zap"
    "golang.org/x/sync/errgroup"
)

const (
    // Time allowed to write a message to the peer.
    writeWait = 10 * time.Second
    readWait  = 80 * time.Second

    // Maximum message size allowed from peer.
    maxMessageSize = 4096
    readBufferSize = 2048
)

func InitNB() {
    hostname, _ := os.Hostname()
    Hub = lib.NewHub("ocpp", "2.0.1", "core_ocpp", "core_gw")
    Hub.ResponseErrFn = responseErr
    Hub.ResponseFn = response
    Hub.TR = &protocol.Translation{}
    Hub.SetEncrypt(lib.NewAESEncrypt(lib.CBC))

    config := nbhttp.Config{Addrs: []string{":8844"}}
    ctx, cancel := context.WithCancel(context.Background())
    s := &Server{
        Hub:         Hub,
        upgrader:    websocket.NewUpgrader(),
        hostname:    hostname,
        pprofServer: &http.Server{Addr: ":6060", Handler: nil},
        ctx:         ctx,
        cancel:      cancel,
    }
    tlsServerKey := os.Getenv("TLS_SERVER_KEY")
    tlsServerCert := os.Getenv("TLS_SERVER_CER")
    tlsCaCert := os.Getenv("TLS_CA_CER")
    if tlsServerKey != "" && tlsServerCert != "" && tlsCaCert != "" {
        pool := x509.NewCertPool()
        tlsConfig := &tls.Config{}
        cert, err := tls.LoadX509KeyPair(tlsServerCert, tlsServerKey)
        if err != nil {
            log.Logger().Panic("failed to read server cert or server key", zap.Error(err))
        }
        tlsConfig.Certificates = []tls.Certificate{cert}
        caCrt, err := ioutil.ReadFile(tlsCaCert)
        if err != nil {
            log.Logger().Panic(err.Error())
        }
        pool.AppendCertsFromPEM(caCrt)
        tlsConfig.ClientCAs = pool
        config.TLSConfig = tlsConfig
        config.AddrsTLS = []string{":8846"}
    }
    s.server = nbhttp.NewServer(config)
    router := mux.NewRouter().StrictSlash(true)
    router.UseEncodedPath()
    router.HandleFunc("/ocpp/{charge_station:.*}", s.wsHandler)
    s.server.Handler = router

    s.upgrader.KeepaliveTime = readWait
    s.upgrader.CheckOrigin = func(r *http.Request) bool {
        return true
    }

    s.upgrader.SetPingHandler(func(conn *websocket.Conn, s string) {
        c := conn.Session().(*nbClient)
        c.PingHandler(s)
    })

    s.upgrader.OnMessage(func(conn *websocket.Conn, mt websocket.MessageType, msg []byte) {
        c := conn.Session().(*nbClient)
        trData := &lib.TRData{}
        ctx := context.WithValue(context.Background(), "client", c)
        ctx = context.WithValue(ctx, "trData", trData)

        var err error
        defer func() {
            //如果发生了错误,都回复给设备,否则发送到平台
            if err != nil {
                c.ReplyError(ctx, err)
            }
        }()
        var payload proto.Message
        defer func() {
            if r := recover(); r != nil {
                c.log.Sugar().Errorf("%v\n", r)
            }
        }()

        if payload, err = c.hub.TR.ToAPDU(ctx, msg); err != nil {
            return
        }

        if payload == nil {
            return
        }

        if trData.Ignore {
            return
        }

        if trData.APDU.Payload, err = proto.Marshal(payload); err != nil {
            err = fmt.Errorf("encode cmd req payload error, err:%s", err.Error())
            return
        }
        var toCoreMSG []byte
        if toCoreMSG, err = proto.Marshal(trData.APDU); err != nil {
            err = fmt.Errorf("encode cmd req apdu error, err:%s", err.Error())
            return
        }

        var sendTopic string
        var sendQos byte
        if trData.IsTelemetry {
            sendTopic = "coregw/" + c.hub.Hostname + "/telemetry/" + datasource.UUID(c.chargeStation.CoreID()).String()
        } else if !trData.Sync {
            sendTopic = "coregw/" + c.hub.Hostname + "/command/" + datasource.UUID(c.chargeStation.CoreID()).String()
        } else {
            sendTopic = c.Coregw() + "/sync/" + datasource.UUID(c.chargeStation.CoreID()).String()
        }
        sendQos = 2

        c.hub.PubMqttMsg <- mqtt.MqttMessage{
            Topic:    sendTopic,
            Qos:      sendQos,
            Retained: false,
            Payload:  toCoreMSG,
        }
    })

    s.upgrader.OnClose(func(conn *websocket.Conn, err error) {
        c := conn.Session().(*nbClient)
        c.Close(err)
    })

    go Hub.Run()

    ch = make(chan struct{}, 1)

    s.group, _ = errgroup.WithContext(s.ctx)
    s.group.Go(func() error {
        defer func() {
            ch <- struct{}{}
        }()
        if err := s.server.Start(); err != nil {
            log.Logger().Error("failed to start websocket server", zap.Error(err))
            return err
        }
        <-s.ctx.Done()
        ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
        defer cancel()
        return s.server.Shutdown(ctx)
    })

    s.group.Go(func() error {
        log.Logger().Debug("start pprof server", zap.String("port", ":6060"))
        return s.pprofServer.ListenAndServe()
    })

}

type Server struct {
    Hub         *lib.Hub
    upgrader    *websocket.Upgrader
    hostname    string
    server      *nbhttp.Server
    pprofServer *http.Server
    ctx         context.Context
    cancel      context.CancelFunc
    group       *errgroup.Group
}

func (s *Server) Stop() {
    s.cancel()
}

func (s *Server) wsHandler(w http.ResponseWriter, r *http.Request) {
    ctx := r.Context()
    snPassword := mux.Vars(r)["charge_station"]
    // 获取sn号
    if snPassword == "" {
        w.WriteHeader(http.StatusBadRequest)
        _, _ = w.Write(responseErr(ctx, protocol.ErrGenericError, "sn empty"))
        return
    }

    var password string
    spilt := strings.Split(snPassword, ":")
    if len(spilt) > 1 {
        password = spilt[1]
    }

    sn := spilt[0]
    var certSerialNumber string
    if r.TLS != nil {
        if len(r.TLS.PeerCertificates) > 0 {
            log.Logger().Debug("证书中sn号为:", zap.String("sn", r.TLS.PeerCertificates[0].Subject.OrganizationalUnit[0]))
            log.Logger().Debug("实际sn号为:", zap.String("sn", sn))
            certInfo := r.TLS.PeerCertificates[0]
            if certInfo.Subject.OrganizationalUnit[0] != sn {
                w.WriteHeader(http.StatusBadRequest)
                _, _ = w.Write(responseErr(ctx, protocol.ErrGenericError, "参数sn不等于证书中的sn"))
                return
            }
            certSerialNumber = certInfo.SerialNumber.String()
        }
    }
    port := strings.Split(r.Host, ":")[1]
    remoteAddress := readUserIP(r)
    accVerReq := &api.AccessVerifyRequest{
        DeviceSerialNumber:    sn,
        DeviceProtocol:        Hub.Protocol,
        DeviceProtocolVersion: Hub.ProtocolVersion,
        RemoteAddress:         remoteAddress,
        RequestPort:           port,
    }

    if certSerialNumber != "" {
        accVerReq.CertSerialNumber = certSerialNumber
    }
    if password != "" {
        accVerReq.Password = password
    }
    // 确认是否能够访问
    response, err := api.AccessVerify(accVerReq)
    if err != nil {
        w.WriteHeader(http.StatusBadRequest)
        _, _ = w.Write(responseErr(ctx, protocol.ErrGenericError, "access verify failed"+err.Error()))
        log.Logger().Error(err.Error(), zap.String("sn", sn))
        return
    }
    up := websocket.NewUpgrader()
    up.CheckOrigin = func(r *http.Request) bool {
        return true
    }
    up.KeepaliveTime = readWait
    up.SetPingHandler(wrapPingHandler())
    up.OnMessage(wrapMessageHandler())
    up.OnClose(wrapCloseHandler())

    // 升级成websocket协议
    ws, err := up.Upgrade(w, r, nil)
    if err != nil {
        responseErr(ctx, protocol.ErrGenericError, err.Error())
        return
    }
    log.Logger().Debug("创建连接成功", zap.String("sn", sn))
    conn := ws.(*websocket.Conn)
    if response != nil {
        // 解析出平台ID
        coreId, _ := strconv.ParseUint(response.CoreID, 10, 64)
        if _client, ok := Hub.Clients.Load(coreId); ok {
            _client.(lib.ClientInterface).Close(errors.New("旧连接还存在先关闭"))
        }
        // 创建新的客户端
        client := NewClient(interfaces.NewDefaultChargeStation(sn, response.Registered, coreId), Hub, conn, response.KeepAlive, remoteAddress, log.Logger(), true)
        // 设置基本路由
        client.SetBaseURL(response.BaseURL)
        client.SetClientOfflineFunc(func(err error) {
            // 如果已经注册过了需要通知平台
            if client.ChargeStation().Registered() {
                reason := "websocket关闭连接"
                _ = ClientOfflineNotify(client.ChargeStation(), reason)
            }
            Hub.RegClients.Delete(client.ChargeStation().CoreID())
        })
        // 读取加密解密数据的密钥
        encryptKey := readEncryptKey(r)
        if len(encryptKey) != 0 {
            client.SetEncryptKey(encryptKey)
        }
        // 如果注册过了订阅命令消息, 没有就订阅注册消息
        if client.ChargeStation().Registered() {
            go client.SubMQTT()
        } else {
            go client.SubRegMQTT()
        }
        conn.SetSession(client)
    }
}

type nbClient struct {
    once                    sync.Once
    chargeStation           interfaces.ChargeStation
    hub                     *lib.Hub
    conn                    *websocket.Conn
    close                   chan struct{}
    lock                    sync.RWMutex
    clientOfflineNotifyFunc func(err error)       // 网络断开同步到core的函数
    mqttRegCh               chan mqtt.MqttMessage //注册信息
    mqttMsgCh               chan mqtt.MqttMessage //返回或下发的信息
    remoteAddress           string
    log                     *rabbitmq.Logger
    keepalive               int64
    coregw                  string
    isClose                 bool
    encryptKey              []byte
    id                      string
    certificateSN           string
    orderInterval           int
    baseURL                 string // 上传日志、下载固件基本地址
    debug                   bool
}

func NewClient(chargeStation interfaces.ChargeStation, hub *lib.Hub, conn *websocket.Conn, keepalive int, remoteAddress string, log *rabbitmq.Logger, debug ...bool) lib.ClientInterface {
    var b bool
    if len(debug) > 0 {
        b = debug[0]
    }
    return &nbClient{
        log:           log,
        chargeStation: chargeStation,
        hub:           hub,
        conn:          conn,
        remoteAddress: remoteAddress,
        mqttMsgCh:     make(chan mqtt.MqttMessage, 5),
        mqttRegCh:     make(chan mqtt.MqttMessage, 5),
        close:         make(chan struct{}),
        keepalive:     int64(keepalive),
        orderInterval: 30,
        debug:         b,
    }
}

func (c *nbClient) MessageNumber() int16 {
    return 0
}

func (c *nbClient) SetMessageNumber(i int16) {
    return
}

func (c *nbClient) SetData(key, val interface{}) {
    return
}

func (c *nbClient) GetData(key interface{}) interface{} {
    return nil
}

func (c *nbClient) Send(msg []byte) (err error) {
    defer func() {
        if e := recover(); e != nil {
            err = e.(error)
        }
    }()
    if c.hub.Encrypt != nil && len(c.encryptKey) > 0 {
        msg, err = c.hub.Encrypt.Encode(msg, c.encryptKey)
        if err != nil {
            return err
        }
    }
    c.conn.SetWriteDeadline(time.Now().Add(writeWait))
    return c.conn.WriteMessage(websocket.TextMessage, msg)
}

func (c *nbClient) Encrypt() lib.Encrypt {
    return nil
}

func (c *nbClient) SetEncrypt(encrypt lib.Encrypt) {

}

func (c *nbClient) Close(err error) error {
    c.once.Do(func() {
        if err != nil {
            c.log.Error(err.Error(), zap.String("sn", c.chargeStation.SN()))
        }
        c.hub.Clients.Delete(c.chargeStation.CoreID())
        c.hub.RegClients.Delete(c.chargeStation.CoreID())
        c.log.Error("关闭连接", zap.String("sn", c.chargeStation.SN()))
        close(c.close)
        c.clientOfflineNotifyFunc(err)
    })

    return nil
}

func (c *nbClient) SubRegMQTT() {
    c.hub.RegClients.Store(c.chargeStation.CoreID(), c)
    //if c.Evse.CoreID() == 0 {
    for {
        select {
        case <-c.close:
            return
        case m := <-c.mqttRegCh:
            func() {
                var apdu charger.APDU
                var err error
                topic := m.Topic
                defer func() {
                    if err != nil {
                        c.log.Error(err.Error(), zap.String("sn", c.chargeStation.SN()))
                    }
                }()
                if err = proto.Unmarshal(m.Payload, &apdu); err != nil {
                    return
                }

                trData := &lib.TRData{
                    APDU: &apdu,
                }
                ctx := context.WithValue(context.TODO(), "client", c)
                ctx = context.WithValue(ctx, "trData", trData)

                var msg interface{}
                //var f lib.FromAPDUFunc
                if msg, err = c.hub.TR.FromAPDU(ctx, &apdu); err != nil {
                    err = fmt.Errorf("FromAPDU register error, err:%s topic:%s", err.Error(), topic)
                    return
                } else if msg == nil {
                    return
                }
                if trData.Ignore {
                    return
                }
                buffer := bytebufferpool.Get()
                defer func() {
                    buffer.Reset()
                    bytebufferpool.Put(buffer)
                }()
                encoder := json.NewEncoder(buffer)
                if err = encoder.Encode(msg); err != nil {
                    return
                } else if err = c.Send(buffer.Bytes()); err != nil {
                    return
                }
            }()
        }
    }
}

func (c *nbClient) SubMQTT() {
    c.hub.Clients.Store(c.chargeStation.CoreID(), c)
    for {
        select {
        case <-c.close:
            return
        case m := <-c.mqttMsgCh:
            if len(m.Payload) == 0 {
                break
            }
            var apdu charger.APDU
            if err := proto.Unmarshal(m.Payload, &apdu); err != nil {
                break
            }
            func() {
                topic := m.Topic
                trData := &lib.TRData{
                    APDU:  &apdu,
                    Topic: topic,
                }
                ctx := context.WithValue(context.TODO(), "client", c)
                ctx = context.WithValue(ctx, "trData", trData)
                var msg interface{}

                var err error
                defer func() {
                    //如果没有错误就转发到设备上,否则写日志,回复到平台的错误日志有FromAPDU实现了
                    if err != nil {
                        c.log.Error(err.Error(), zap.String("sn", c.chargeStation.SN()))
                        if trData.Ignore == false && (int32(apdu.MessageId)>>7 == 0 || apdu.MessageId == charger.MessageID_ID_MessageError) {
                            if apdu.MessageId != charger.MessageID_ID_MessageError {
                                apdu.MessageId = charger.MessageID_ID_MessageError
                                apdu.Payload, _ = proto.Marshal(&charger.MessageError{
                                    Error:       charger.ErrorCode_EC_GenericError,
                                    Description: err.Error(),
                                })
                            }
                            apduEncoded, _ := proto.Marshal(&apdu)
                            pubMqttMsg := mqtt.MqttMessage{
                                Topic:    strings.Replace(trData.Topic, c.hub.Hostname, "coregw", 1),
                                Qos:      2,
                                Retained: false,
                                Payload:  apduEncoded,
                            }
                            c.hub.PubMqttMsg <- pubMqttMsg
                        }
                    }
                }()

                if msg, err = c.hub.TR.FromAPDU(ctx, &apdu); err != nil {
                    return
                } else if msg == nil {
                    return
                }

                if trData.Ignore {
                    return
                }
                // 优化bytes
                buffer := bytebufferpool.Get()
                defer func() {
                    buffer.Reset()
                    bytebufferpool.Put(buffer)
                }()
                encoder := json.NewEncoder(buffer)
                if err = encoder.Encode(msg); err != nil {
                    return
                } else if err = c.Send(buffer.Bytes()); err != nil {
                    return
                }
            }()
        }
    }
}

func (c *nbClient) Reply(ctx context.Context, payload interface{}) {
    resp, err := c.hub.ResponseFn(ctx, payload)
    if err != nil {
        return
    }
    //resp := protocol.NewCallResult(ctx, payload)
    //b, _ := json.Marshal(resp)
    _ = c.Send(resp)
    //if _client := ctx.Value("client"); _client != nil {
    //  client := _client.(*Client)
    //  client.send <- b
    //}
}

func (c *nbClient) ReplyError(ctx context.Context, err error, desc ...string) {
    b := c.hub.ResponseErrFn(ctx, err, desc...)
    if b != nil {
        _ = c.Send(b)
    }
}
func (c *nbClient) PublishReg(m mqtt.MqttMessage) {
    c.mqttRegCh <- m
}

func (c *nbClient) Publish(m mqtt.MqttMessage) {
    c.mqttMsgCh <- m
}

func (c *nbClient) KeepAlive() int64 {
    return c.keepalive
}

func (c *nbClient) Hub() *lib.Hub {
    return c.hub
}

func (c *nbClient) RemoteAddress() string {
    return c.remoteAddress
}

func (c *nbClient) ChargeStation() interfaces.ChargeStation {
    return c.chargeStation
}

func (c *nbClient) SetChargeStation(chargeStation interfaces.ChargeStation) {
    c.chargeStation = chargeStation
}

func (c *nbClient) SetClientOfflineFunc(clientOfflineFunc func(err error)) {
    c.clientOfflineNotifyFunc = clientOfflineFunc
}

func (c *nbClient) ClientOfflineFunc() func(err error) {
    return c.clientOfflineNotifyFunc
}

func (c *nbClient) Lock() {
    c.lock.Lock()
}

func (c *nbClient) Unlock() {
    c.lock.Unlock()
}

func (c *nbClient) Coregw() string {
    return c.coregw
}

func (c *nbClient) SetCoregw(coregw string) {
    c.coregw = coregw
}

func (c *nbClient) SetEncryptKey(encryptKey string) {
    c.encryptKey = []byte(encryptKey)
}

func (c *nbClient) IsClose() bool {
    return c.isClose
}

func (c *nbClient) EncryptKey() []byte {
    return c.encryptKey
}

func (c *nbClient) PingHandler(msg string) error {
    c.log.Debug("ping message received", zap.String("sn", c.chargeStation.SN()))
    _ = c.conn.SetReadDeadline(time.Now().Add(readWait))
    _ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
    err := c.conn.WriteMessage(websocket.PongMessage, []byte(msg))
    if err != nil {
        c.Close(err)
        return err
    }
    redisConn := redis.GetRedis()
    defer redisConn.Close()
    _, err = redisConn.Do("expire", keys.Equipment(strconv.FormatUint(c.chargeStation.CoreID(), 10)), 190)
    if err != nil {
        c.log.Error(err.Error(), zap.String("sn", c.chargeStation.SN()))
    }
    return nil
}

func (c *nbClient) CertificateSN() string {
    return c.certificateSN
}

func (c *nbClient) SetCertificateSN(sn string) {
    c.certificateSN = sn
}

func (c *nbClient) Conn() net.Conn {
    return c.conn
}

func (c *nbClient) SetKeepalive(keepalive int64) {
    c.keepalive = keepalive
}

func (c *nbClient) SetRemoteAddress(address string) {
    c.remoteAddress = address
}

func (c *nbClient) SetOrderInterval(interval int) {
    c.orderInterval = interval
}

func (c *nbClient) OrderInterval() int {
    return c.orderInterval
}

func (c *nbClient) BaseURL() string {
    return c.baseURL
}

func (c *nbClient) SetBaseURL(baseURL string) {
    c.baseURL = baseURL
}

func (c *nbClient) ReadPump() {

}

func (c *nbClient) WritePump() {

}

func wrapPingHandler() func(conn *websocket.Conn, s string) {
    return func(conn *websocket.Conn, s string) {
        c := conn.Session().(*nbClient)
        c.PingHandler(s)
    }
}

func wrapMessageHandler() func(conn *websocket.Conn, mt websocket.MessageType, msg []byte) {
    return func(conn *websocket.Conn, mt websocket.MessageType, msg []byte) {
        c := conn.Session().(*nbClient)
        trData := &lib.TRData{}
        ctx := context.WithValue(context.Background(), "client", c)
        ctx = context.WithValue(ctx, "trData", trData)

        var err error
        defer func() {
            //如果发生了错误,都回复给设备,否则发送到平台
            if err != nil {
                c.ReplyError(ctx, err)
            }
        }()
        var payload proto.Message
        defer func() {
            if r := recover(); r != nil {
                c.log.Sugar().Errorf("%v\n", r)
            }
        }()

        if payload, err = c.hub.TR.ToAPDU(ctx, msg); err != nil {
            return
        }

        if payload == nil {
            return
        }

        if trData.Ignore {
            return
        }

        if trData.APDU.Payload, err = proto.Marshal(payload); err != nil {
            err = fmt.Errorf("encode cmd req payload error, err:%s", err.Error())
            return
        }
        var toCoreMSG []byte
        if toCoreMSG, err = proto.Marshal(trData.APDU); err != nil {
            err = fmt.Errorf("encode cmd req apdu error, err:%s", err.Error())
            return
        }

        var sendTopic string
        var sendQos byte
        if trData.IsTelemetry {
            sendTopic = "coregw/" + c.hub.Hostname + "/telemetry/" + datasource.UUID(c.chargeStation.CoreID()).String()
        } else if !trData.Sync {
            sendTopic = "coregw/" + c.hub.Hostname + "/command/" + datasource.UUID(c.chargeStation.CoreID()).String()
        } else {
            sendTopic = c.Coregw() + "/sync/" + datasource.UUID(c.chargeStation.CoreID()).String()
        }
        sendQos = 2

        c.hub.PubMqttMsg <- mqtt.MqttMessage{
            Topic:    sendTopic,
            Qos:      sendQos,
            Retained: false,
            Payload:  toCoreMSG,
        }
    }
}

func wrapCloseHandler() func(conn *websocket.Conn, err error) {
    return func(conn *websocket.Conn, err error) {
        c := conn.Session().(*nbClient)
        c.Close(err)
    }
}
lesismal commented 2 years ago
func wrapCloseHandler() func(conn *websocket.Conn, err error) {
    return func(conn *websocket.Conn, err error) {
        log.Printf("onClose isNil: %v, error: %v", conn == nil, err)     
        c, ok := conn.Session().(*nbClient)
        log.Printf("conn.Session: %v, %v", c, ok)
        c.Close(err)
    }
}

nbio这里给你传递了err,把这个打印出来,session也断言判断下

Kotodian commented 2 years ago
image
func wrapCloseHandler() func(conn *websocket.Conn, err error) {
  return func(conn *websocket.Conn, err error) {
      log.Printf("onClose isNil: %v, error: %v", conn == nil, err)     
      c, ok := conn.Session().(*nbClient)
      log.Printf("conn.Session: %v, %v", c, ok)
      c.Close(err)
  }
}

nbio这里给你传递了err,把这个打印出来,session也断言判断下

lesismal commented 2 years ago

上一楼的日志里没有nil报错,也没法判断是你的应用层主动断开还是框架层导致的断开,提供你的异常断开的连接的日志、不要提供正常连接正常断开的日志。

可以在nbio源码里加些日志把close的调用栈打印出来: https://github.com/lesismal/nbio/blob/master/conn_unix.go#L445

Kotodian commented 2 years ago

上一楼的日志里没有nil报错,也没法判断是你的应用层主动断开还是框架层导致的断开,提供你的异常断开的连接的日志、不要提供正常连接正常断开的日志。

可以在nbio源码里加些日志把close的调用栈打印出来: https://github.com/lesismal/nbio/blob/master/conn_unix.go#L445

image

我平常常遇到的错误是这个 那个能请教下怎么样在close里打印调用栈呀。

image

这样写吗

Kotodian commented 2 years ago

上一楼的日志里没有nil报错,也没法判断是你的应用层主动断开还是框架层导致的断开,提供你的异常断开的连接的日志、不要提供正常连接正常断开的日志。

可以在nbio源码里加些日志把close的调用栈打印出来: https://github.com/lesismal/nbio/blob/master/conn_unix.go#L445

应用层只有在这里会调用Close了,业务逻辑上是不会调用Close了

Kotodian commented 2 years ago

上一楼的日志里没有nil报错,也没法判断是你的应用层主动断开还是框架层导致的断开,提供你的异常断开的连接的日志、不要提供正常连接正常断开的日志。

可以在nbio源码里加些日志把close的调用栈打印出来: https://github.com/lesismal/nbio/blob/master/conn_unix.go#L445

我只是把你的Printf改成Error了,因为只有这样才能打印出栈, 刚刚我截图的就是异常断开。。

lesismal commented 2 years ago

你看一下啊兄弟: 可以在nbio源码里加些日志把close的调用栈打印出来: https://github.com/lesismal/nbio/blob/master/conn_unix.go#L445

Kotodian commented 2 years ago

bca2fb4767e6fd86820c907cea6ba57 shi是这样吗

lesismal commented 2 years ago

bca2fb4767e6fd86820c907cea6ba57 shi是这样吗

建议贴代码或者日志的时候,不要贴图,方便大家copy定位问题。 根据这个图里的 upgrader.go:493 行: https://github.com/lesismal/nbio/blob/master/nbhttp/websocket/upgrader.go#L493 说明是你的TextMessage不是utf8,这不符合标准,这种二进制数据最好是改用 BinaryMessage 类型,或者你想忽略这个错误,可以设置下 CheckUtf8 不检查utf8:

s.server.CheckUtf8 = func(data []byte) bool { return true }

两种都行,再试下看看。

我考虑下这块utf8校验失败时是否带上invalid utf8的err进行close

Kotodian commented 2 years ago

bca2fb4767e6fd86820c907cea6ba57 shi是这样吗

建议贴代码或者日志的时候,不要贴图,方便大家copy定位问题。 根据这个图里的 upgrader.go:493 行: https://github.com/lesismal/nbio/blob/master/nbhttp/websocket/upgrader.go#L493 说明是你的TextMessage不是utf8,这不符合标准,这种二进制数据最好是改用 BinaryMessage 类型,或者你想忽略这个错误,可以设置下 CheckUtf8 不检查utf8:

s.server.CheckUtf8 = func(data []byte) bool { return true }

两种都行,再试下看看。

我考虑下这块utf8校验失败时是否带上invalid utf8的err进行close

好的 以后都会复制进来,就是我在想websocket关闭是不是要写一个关闭的消息,再做关闭

lesismal commented 2 years ago

这里在加这个响应Close Message、不关闭连接: https://github.com/lesismal/nbio/commit/6030c715aaa30095b97f6ee5f3e6f6ffca487ff6

晚些和udp一块发布

Kotodian commented 2 years ago

这里在加这个响应Close Message、不关闭连接: 6030c71

晚些和udp一块发布

感谢大佬

lesismal commented 2 years ago

感谢反馈!

lesismal commented 2 years ago

新增udp支持发布了1.3.0,invalid utf8 message一块发布了: https://github.com/lesismal/nbio/releases/tag/v1.3.0

Kotodian commented 2 years ago

我试过了把CheckUtf8设置成直接返回true,但是我发现那个连接只能处理ping消息,其他的消息都没进到我的OnMessage里,gorilla的包是都能进来都能处理,之前只是看到了连接进来了就没在意

lesismal commented 2 years ago

尽量在源码里加日志自己先看下吧: https://github.com/lesismal/nbio/blob/ad13cec506a856454f20058b4c5ac055bb4219ff/nbhttp/websocket/upgrader.go#L514

因为你不能提供完整的可以复现的代码,我这边也没法调试看你具体问题。类似的问题,都可以在源码里添加日志或者debug来尝试解决。

lesismal commented 2 years ago

我这边试了下普通的example,发送BinaryMessage时如果没设置CheckUtf8是不能正常通信的,设置了则是可以正常通信的

Kotodian commented 2 years ago

我用的是TextMessage, 我发现那个连接收到的在nbio里打印出来是乱码,但是用gorilla里能解析出来,所以很在意两者之间的处理上的差异,我先看下gorilla源码吧 2022/09/13 20:06:16.628 [DBG] data:[2,"62349164","Heartbeat",{}] 2022-09-13 20:06:16.826 DEBUG 创建连接成功 {"sn": "T172100020G"} 2022/09/13 20:06:18.132 [DBG]data:[2,"31685345","Heartbeat",{}] 2022-09-13 20:06:20.534 DEBUG 创建连接成功 {"sn": "T172100017G"} 2022/09/13 20:06:20.935 [DBG] data:[2,"31685645","Heartbeat",{}] 2022-09-13 20:06:23.222 DEBUG 创建连接成功 {"sn": "JX000000001"} 2022/09/13 20:06:23.477 [DBG] data:X����wI��c���T��y� U�$��$���mjMnz���R�8]�h

lesismal commented 2 years ago

你的OnMessage到底收到没收到数据,nbio里打印出来是乱码是在哪里打印的?

Kotodian commented 2 years ago

你的OnMessage到底收到没收到数据,nbio里打印出来是乱码是在哪里打印的?

没事了 没事了 有人给我代码里下了毒 原来数据是被加密过的

lesismal commented 2 years ago

你的OnMessage到底收到没收到数据,nbio里打印出来是乱码是在哪里打印的?

没事了 没事了 有人给我代码里下了毒 原来数据是被加密过的

那我就放心了,以为是我又遇到内存池的bug了呢,那是最烧脑的问题、想想都怕。。

Kotodian commented 2 years ago

你的OnMessage到底收到没收到数据,nbio里打印出来是乱码是在哪里打印的?

没事了 没事了 有人给我代码里下了毒 原来数据是被加密过的

那我就放心了,以为是我又遇到内存池的bug了呢,那是最烧脑的问题、想想都怕。。

哈哈 别害怕, 你是说mempool那里会有bug吗(想边问边学点源码)?

lesismal commented 2 years ago

ol那里会有bug吗(想边问

确切说不是mempool自己的bug,是使用mempool的地方怕有重复Free导致同一段buffer重新被拿出来在两个地方共同使用,然后导致脏内存。主要是TLS/HTTP/Websocket相关编解码、写的逻辑。 做了大量的随机数据的压力测试对比数据是否正确,目前应该是没bug的。 去年这玩意消耗了我很多脑细胞,导致今年撸代码效率极大降低需要缓很久。。。

Kotodian commented 2 years ago

ol那里会有bug吗(想边问

确切说不是mempool自己的bug,是使用mempool的地方怕有重复Free导致同一段buffer重新被拿出来在两个地方共同使用,然后导致脏内存。主要是TLS/HTTP/Websocket相关编解码、写的逻辑。 做了大量的随机数据的压力测试对比数据是否正确,目前应该是没bug的。 去年这玩意消耗了我很多脑细胞,导致今年撸代码效率极大降低需要缓很久。。。

确实呀 了解了 大佬辛苦啊