Closed todayforever closed 2 months ago
请提供一下可以复现这个问题的代码。
🤖 Non-English text detected, translating...
Please provide code that can reproduce this problem.
解码函数--处理粘包
func (codec *PkgCodec) Decode(c gnet.Conn) ([][]byte, error) {
//每次直接读取所有的数据(由于有粘包的情况,只要是完成包都需要处理),并通过\x00
n := c.InboundBuffered()
buf, err := c.Peek(n)
if err != nil {
return nil, err
}
discardNum := 0
//如果是一个完整的包,分割后长度至少是2
pkgList := bytes.Split(buf, []byte(MsgSplit))
pkgLen := len(pkgList)
if pkgLen <= 1 {
return nil, errormsg.ErrIncompletePacket
}
for i := 0; i < pkgLen; i++ {
pkg := pkgList[i]
if i == len(pkgList)-1 {
if bytes.Equal(pkg, []byte("")) {
//包全部丢弃
discardNum = n
}
break
}
discardNum += len(pkg) + 1
}
// Discard the packet including the newline---丢弃已经读取的数据
_, _ = c.Discard(discardNum)
//fmt.Println("已经读取的数量:", discardNum)
return pkgList[:pkgLen-1], nil
}
处理消息函数
func (s *GNetDbNodeServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
defer func() {
if r := recover(); r != nil {
logger.Error("远程句柄端口---节点端收到消息recover:", r)
}
}()
dataList, err := unpack.NewPkgCodec().Decode(c)
if err != nil {
if !errors.Is(err, errormsg.ErrIncompletePacket) {
logger.Error(c.RemoteAddr().String(), " 节点端拆包失败")
_, _ = c.Write(unpack.NewPkgCodec().Encode([]byte(err.Error())))
}
return
}
for _, data := range dataList {
if bytes.Compare(data, ping) == 0 {
continue
}
addr := c.RemoteAddr().String()
logger.Info(addr, " 节点端收到消息:", string(data))
router.NewNodeRouter(data, c).Send()
}
return
}
// Send 路由转发函数
type NodeRouter struct {
Data []byte
Conn gnet.Conn
}
func (NodeRouter *NodeRouter) Send() {
data := strings.Split(string(NodeRouter.Data), " ")
dataLen := len(data)
param := data[0]
var err error
defer func() {
if err != nil {
//请求参数错误
_, _ = NodeRouter.Conn.Write(unpack.NewPkgCodec().Encode([]byte(err.Error())))
return
}
}()
db := &service.DbNodeVerifyServer{Data: data, Conn: NodeRouter.Conn}
if param == login {
if dataLen < 3 {
err = errormsg.Params
return
}
_ = db.Login()
} else if param == userIn || param == userOut || param == kickOut {
if dataLen < 3 {
err = errormsg.Params
return
}
if param == kickOut {
data[1] = data[1] + "," + data[2]
}
if param == userOut || param == kickOut {
_ = db.DeleteUser()
}
dbUser := &service.DbUserVerifyServer{Data: data}
_ = dbUser.UserSend()
} else if param == queryUserRequest {
if dataLen < 5 {
err = errormsg.Params
return
}
_ = db.QueryUser()
} else if param == vpnStatResult {
if dataLen < 2 {
err = errormsg.Params
return
}
db.VpnStatResult()
} else if param == vpnStatResult4Query {
if dataLen < 2 {
err = errormsg.Params
return
}
db.VpnStatResult4Query()
}
}
其中一个路由逻辑处理函数
type DbNodeVerifyServer struct {
Data []string
Conn gnet.Conn
}
func (db *DbNodeVerifyServer) QueryUser() error {
key := db.Data[1] + ":" + db.Data[2]
return GoPool().Submit(func() {
val := goRedis.GetRedisClient().Get(context.Background(), key).Val()
queryResult := "0"
if val != "" {
queryResult = "1"
}
msg := unpack.NewPkgCodec().Encode([]byte(queryUserResp + " " + db.Data[1] + " " + db.Data[2] + " " + queryResult + " " + db.Data[4]))
_, err := db.Conn.Write(msg)
if err != nil {
logger.Error(db.Data, "子连接校验发送消息失败:", err.Error())
return
}
if queryResult == "0" {
//反查结果不存在,直接返回
return
}
//todo 更新val的值 user:pwd:node1:node2
nodeVal := goRedis.GetRedisClient().Get(context.Background(), val).Val()
if nodeVal != "" {
nodeVal += ":" + db.Data[3]
err = goRedis.GetRedisClient().SetEx(context.Background(), val, nodeVal, goRedis.UserPwdEx*time.Second).Err()
if err != nil {
logger.Error(db.Data, "子连接校验成功后,保存子节点信息到redis里面失败:", err.Error())
}
}
})
}
GoPool().Submit 这里是从协程池里面获取一个空闲协程处理数据,虽然我开了一个协程但是我没有使用 AsyncWrite,我不需要接收回调消息
🤖 Non-English text detected, translating...
Decoding function--processing sticky packets func (codec *PkgCodec) Decode(c gnet.Conn) ([][]byte, error) { //Read all data directly each time (due to sticky packets, as long as the package is completed, it needs to be processed), and pass\x00 n := c.InboundBuffered() buf, err := c.Peek(n) if err != nil { return nil, err }
discardNum := 0 //If it is a complete package, the length after splitting is at least 2 pkgList := bytes.Split(buf, []byte(MsgSplit)) pkgLen := len(pkgList) if pkgLen <= 1 { return nil, errormsg.ErrIncompletePacket }
for i := 0; i < pkgLen; i++ { pkg := pkgList[i] if i == len(pkgList)-1 { if bytes.Equal(pkg, []byte("")) { //Discard all packets discardNum = n } break } discardNum += len(pkg) + 1 } // Discard the packet including the newline---Discard the data that has been read , = c.Discard(discardNum) //fmt.Println("Quantity read:", discardNum) return pkgList[:pkgLen-1], nil }
message processing function func (s *GNetDbNodeServer) OnTraffic(c gnet.Conn) (action gnet.Action) { defer func() { if r := recover(); r != nil { logger.Error("Remote handle port---the node received the message recover:", r) } }() dataList, err := unpack.NewPkgCodec().Decode(c) if err != nil { if !errors.Is(err, errormsg.ErrIncompletePacket) { logger.Error(c.RemoteAddr().String(), "Unpacking failed on the node side") , = c.Write(unpack.NewPkgCodec().Encode([]byte(err.Error()))) } return } for _, data := range dataList { if bytes.Compare(data, ping) == 0 { continue } addr := c.RemoteAddr().String() logger.Info(addr, "Node received message:", string(data)) router.NewNodeRouter(data, c).Send() } return }
// Send routing forwarding function type NodeRouter struct { Data[]byte Conn gnet.Conn } func (NodeRouter *NodeRouter) Send() { data := strings.Split(string(NodeRouter.Data), " ") dataLen := len(data) param := data[0] var err error defer func() { if err != nil { //Request parameter error , = NodeRouter.Conn.Write(unpack.NewPkgCodec().Encode([]byte(err.Error()))) return } }() db := &service.DbNodeVerifyServer{Data: data, Conn: NodeRouter.Conn} if param == login { if dataLen < 3 { err = errormsg.Params return } = db.Login() } else if param == userIn || param == userOut || param == kickOut { if dataLen < 3 { err = errormsg.Params return } if param == kickOut { data[1] = data[1] + "," + data[2] } if param == userOut || param == kickOut { = db.DeleteUser() } dbUser := &service.DbUserVerifyServer{Data: data} = dbUser.UserSend() } else if param == queryUserRequest { if dataLen < 5 { err = errormsg.Params return } = db.QueryUser() } else if param == vpnStatResult { if dataLen < 2 { err = errormsg.Params return } db.VpnStatResult() } else if param == vpnStatResult4Query { if dataLen < 2 { err = errormsg.Params return } db.VpnStatResult4Query() } }
One of the routing logic processing functions type DbNodeVerifyServer struct { Data[]string Conn gnet.Conn } func (db *DbNodeVerifyServer) QueryUser() error { key := db.Data[1] + ":" + db.Data[2]
return GoPool().Submit(func() { val := goRedis.GetRedisClient().Get(context.Background(), key).Val() queryResult := "0" if val != "" { queryResult = "1" } msg := unpack.NewPkgCodec().Encode([]byte(queryUserResp + " " + db.Data[1] + " " + db.Data[2] + " " + queryResult + " " + db.Data[4 ])) _, err := db.Conn.Write(msg) if err != nil { logger.Error(db.Data, "Sub-connection verification failed to send message:", err.Error()) return } if queryResult == "0" { //The reverse check result does not exist, return directly return } //todo updates the value of val user:pwd:node1:node2 nodeVal := goRedis.GetRedisClient().Get(context.Background(), val).Val() if nodeVal != "" { nodeVal += ":" + db.Data[3] err = goRedis.GetRedisClient().SetEx(context.Background(), val, nodeVal, goRedis.UserPwdEx*time.Second).Err() if err != nil { logger.Error(db.Data, "After the sub-connection verification was successful, saving the sub-node information to redis failed:", err.Error()) } } }) }
GoPool().Submit here is to get an idle coroutine to process data from the coroutine pool. Although I opened a coroutine, I did not use AsyncWrite and I did not need to receive callback messages.
你不能在 event loop 之外的 goroutine 调用 Write()
这一类非并发安全的方法,要用 AsyncWrite()
替换,这个方法主要是的作用的就是提供并发安全的写操作,回调你不需要直接设置 nil 就行了。文档里已经将所有的 Conn 方法都进行了标注,哪些是并发安全的,哪些不是,使用时要注意看:https://pkg.go.dev/github.com/panjf2000/gnet/v2#Writer
🤖 Non-English text detected, translating...
You cannot call non-concurrency safe methods such as
Write()
from goroutines outside the event loop. You must useAsyncWrite()
instead. The main function of this method is to provide concurrent and safe write operations and call you back. There is no need to set nil directly. All Conn methods have been marked in the document. Which ones are concurrency safe and which ones are not? Please pay attention when using them: https://pkg.go.dev/github.com/panjf2000/gnet/v2#Writer
Actions I've taken before I'm here
What happened?
压测的时候,8H16G的机器,并发量10000+的时候,
Major version of gnet
v2
Specific version of gnet
v2.5.7
Operating system
Linux
OS version
Linux 5.4.0-189-generic x86_64
Go version
1.23.0
Relevant log output
Code snippets (optional)
No response
How to Reproduce
瞬间的建立连接,跑一会,瞬间断开所有连接
Does this issue reproduce with the latest release?
It can reproduce with the latest release