panjf2000 / gnet

🚀 gnet is a high-performance, lightweight, non-blocking, event-driven networking framework written in pure Go.
https://gnet.host
Apache License 2.0

[Bug]: conn.Next not safe #648

Closed zhongweikang closed 2 weeks ago

zhongweikang commented 2 weeks ago

Actions I've taken before I'm here

What happened?

reopened: https://github.com/panjf2000/gnet/issues/647

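@panjf2000 I saw that the previous issue was closed. Sorry, I'm filing a new one.

I read the documentation beforehand and I know the gnet source code well, but I still think there is a problem.

The documentation says: the []byte buf returned by Next() is not allowed to be passed to a new goroutine

In the problem described in https://github.com/panjf2000/gnet/issues/647, the byte slice returned by Next() was not passed to a new goroutine. The thread-unsafety comes from the internal implementation of Next itself.

The returned slice is already unsafe the moment Next() returns.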

Major version of gnet

v2

Specific version of gnet

v2

Operating system

Linux, macOS

OS version

Linux 3.10.0

Go version

go1.17.12

Relevant log output

Code snippets (optional)

No response

How to Reproduce

Reproduces intermittently when multiple event loops run concurrently.

Does this issue reproduce with the latest release?

It can reproduce with the latest release

zhongweikang commented 2 weeks ago

    // Problem code
    defer c.inboundBuffer.Discard(n) //nolint:errcheck
    if len(head) >= n {
        return head[:n], err
    }

    // Possible fix
    defer c.inboundBuffer.Discard(n) //nolint:errcheck
    if len(head) >= n {
        c.loop.cache.Reset()
        c.loop.cache.Write(head[:n])
        return c.loop.cache.Bytes(), err
    }

zhongweikang commented 2 weeks ago

    defer c.inboundBuffer.Discard(n) // this line can recycle head into sync.Pool
    if len(head) >= n {
        return head[:n], err // this line returns head directly
    }

A byte slice that has been recycled into sync.Pool may be reused by another goroutine.
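
To make the hazard above concrete, here is a minimal sketch. It is not gnet code: `bufPool` is a hypothetical, deterministic stand-in for sync.Pool (the real sync.Pool gives no guarantee about which object Get returns), and `aliasDemo` is an illustrative name. It only shows that a slice taken from a buffer keeps pointing at the same backing array after that buffer has been recycled.

```go
package main

import "fmt"

// bufPool is a hypothetical stand-in for sync.Pool, used here only so that
// the reuse is deterministic: Get hands back the most recently Put buffer.
type bufPool struct{ free [][]byte }

func (p *bufPool) Get() []byte {
	if n := len(p.free); n > 0 {
		b := p.free[n-1]
		p.free = p.free[:n-1]
		return b
	}
	return make([]byte, 8)
}

func (p *bufPool) Put(b []byte) { p.free = append(p.free, b) }

// aliasDemo returns what the first holder of the slice observes after the
// buffer has been recycled and handed to a second user.
func aliasDemo() string {
	pool := &bufPool{}

	buf := pool.Get()
	copy(buf, "request1")
	view := buf[:8] // shares buf's backing array, like head[:n]

	pool.Put(buf) // buffer is recycled while view is still in use

	other := pool.Get() // a second user receives the same backing array
	copy(other, "REQUEST2")

	return string(view)
}

func main() {
	fmt.Println(aliasDemo())
}
```

With a real sync.Pool and several event loops the second user would be another goroutine, so the overwrite would also be a data race rather than a deterministic replacement.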

panjf2000 commented 2 weeks ago

I took another look, and this is indeed possible, but only when head holds all of the buffered data and len(head) == n, because inboundBuffer.Discard puts the internal buffer back into sync.Pool only once all of the data has been consumed. That should be the only case. Have you actually run into this in practice?
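
The condition described above can be sketched with a toy model. Everything here is hypothetical (`inbound`, its `recycled` flag, and `nextIsSafe` are illustrative names, not gnet's types); the only assumption carried over from the comment is that Discard releases the backing buffer once every buffered byte has been consumed.

```go
package main

import "fmt"

// inbound is a hypothetical, simplified inbound buffer.
type inbound struct {
	data     []byte
	r        int  // read offset
	recycled bool // true once the backing buffer would go back to the pool
}

// Peek returns the next n unread bytes without consuming them.
// It assumes n does not exceed the number of unread bytes.
func (b *inbound) Peek(n int) []byte { return b.data[b.r : b.r+n] }

// Discard consumes n bytes and, as assumed from the comment above, releases
// the backing buffer only when nothing is left to read.
func (b *inbound) Discard(n int) {
	b.r += n
	if b.r == len(b.data) {
		b.recycled = true
	}
}

// nextIsSafe reports whether a slice obtained by Peek(n) followed by
// Discard(n) still refers to a buffer that has not been released.
func nextIsSafe(total, n int) bool {
	b := &inbound{data: make([]byte, total)}
	_ = b.Peek(n)
	b.Discard(n)
	return !b.recycled
}

func main() {
	fmt.Println(nextIsSafe(10, 4))  // part of the data remains buffered
	fmt.Println(nextIsSafe(10, 10)) // everything was consumed
}
```

In this model only the second call, where n equals the total buffered length, leaves the caller holding a slice into a released buffer.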

panjf2000 commented 2 weeks ago

This problem should only have appeared after inboundBuffer was switched from ring.Buffer to elastic.RingBuffer. With the former it should be fine.

zhongweikang commented 2 weeks ago

> I took another look, and this is indeed possible, but only when head holds all of the buffered data and len(head) == n, because inboundBuffer.Discard puts the internal buffer back into sync.Pool only once all of the data has been consumed. That should be the only case. Have you actually run into this in practice?

I ran into this problem just recently and eventually traced it to this spot. It is indeed hard to reproduce.

panjf2000 commented 2 weeks ago

I think this change should fix the problem:

diff --git a/connection_unix.go b/connection_unix.go
index cfce3a92..82c1d89b 100644
--- a/connection_unix.go
+++ b/connection_unix.go
@@ -326,6 +326,11 @@ func (c *conn) Next(n int) (buf []byte, err error) {
     head, tail := c.inboundBuffer.Peek(n)
     defer c.inboundBuffer.Discard(n) //nolint:errcheck
     if len(head) >= n {
+        if inBufferLen == n && len(tail) == 0 {
+            c.loop.cache.Reset()
+            c.loop.cache.Write(head)
+            return c.loop.cache.Bytes(), err
+        }
         return head[:n], err
     }
     c.loop.cache.Reset()

panjf2000 commented 2 weeks ago

A simpler approach:

diff --git a/connection_unix.go b/connection_unix.go
index cfce3a92..3ccac4ab 100644
--- a/connection_unix.go
+++ b/connection_unix.go
@@ -325,11 +325,11 @@ func (c *conn) Next(n int) (buf []byte, err error) {
     }
     head, tail := c.inboundBuffer.Peek(n)
     defer c.inboundBuffer.Discard(n) //nolint:errcheck
-    if len(head) >= n {
-        return head[:n], err
-    }
     c.loop.cache.Reset()
     c.loop.cache.Write(head)
+    if len(head) >= n {
+        return c.loop.cache.Bytes(), err
+    }
     c.loop.cache.Write(tail)
     if inBufferLen >= n {
         return c.loop.cache.Bytes(), err

However, this approach copies the data on every call, which costs a little performance.

zhongweikang commented 2 weeks ago

> However, this approach copies the data on every call, which costs a little performance.

As I understand it, this cost is unavoidable. It only copies the data and does not allocate new memory, so it seems acceptable.
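
The point about copying without allocating can be sketched as follows, assuming the per-loop cache behaves like a standard `bytes.Buffer`. `copyInto`, `capStable`, and `copyIsIndependent` are hypothetical helper names used only for this illustration.

```go
package main

import (
	"bytes"
	"fmt"
)

// copyInto copies src into the reusable cache and returns the cache's view
// of the copied bytes, mirroring the Reset/Write/Bytes sequence in the diff.
func copyInto(cache *bytes.Buffer, src []byte) []byte {
	cache.Reset() // drops the contents but keeps the underlying storage
	cache.Write(src)
	return cache.Bytes()
}

// capStable reports whether a second, smaller copy reused the storage that
// the first copy grew, i.e. no new backing array was needed.
func capStable() bool {
	var cache bytes.Buffer
	copyInto(&cache, make([]byte, 1024))
	before := cache.Cap()
	copyInto(&cache, make([]byte, 512))
	return cache.Cap() == before
}

// copyIsIndependent reports whether the returned bytes survive the source
// buffer being overwritten afterwards.
func copyIsIndependent() bool {
	var cache bytes.Buffer
	src := []byte("payload")
	out := copyInto(&cache, src)
	copy(src, "XXXXXXX") // simulate the source buffer being reused
	return string(out) == "payload"
}

func main() {
	fmt.Println(capStable(), copyIsIndependent())
}
```

The returned slice is still only valid until the next Reset or Write on the same cache, which matches the documented rule that the result of Next() must not outlive the current event-handler call.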

panjf2000 commented 2 weeks ago

Would you be able to open a PR? @zhongweikang

zhongweikang commented 2 weeks ago

> Would you be able to open a PR? @zhongweikang

Sure.

zhongweikang commented 2 weeks ago

> A simpler approach: [...]

That doesn't seem to work: the caller asks for n bytes, but this returns the whole of head.

diff --git a/connection_unix.go b/connection_unix.go
index cfce3a9..1bf11e9 100644
--- a/connection_unix.go
+++ b/connection_unix.go
@@ -325,10 +325,11 @@ func (c *conn) Next(n int) (buf []byte, err error) {
     }
     head, tail := c.inboundBuffer.Peek(n)
     defer c.inboundBuffer.Discard(n) //nolint:errcheck
+    c.loop.cache.Reset()
     if len(head) >= n {
-        return head[:n], err
+        c.loop.cache.Write(head[:n])
+        return c.loop.cache.Bytes(), err
     }
-    c.loop.cache.Reset()
     c.loop.cache.Write(head)
     c.loop.cache.Write(tail)
     if inBufferLen >= n {

Would this work?

panjf2000 commented 2 weeks ago

Indeed, I overlooked that earlier. Let's go with your last revision.

panjf2000 commented 2 weeks ago

Come to think of it, the tail handling further down seems to have the same problem, so let's fix that at the same time.

zhongweikang commented 2 weeks ago

> A simpler approach: [...]

I actually tested it, and your code is fine after all. I misunderstood it just now.

    head, tail := c.inboundBuffer.Peek(n) // Peek returns at most n bytes
    defer c.inboundBuffer.Discard(n)
    c.loop.cache.Reset()
    if len(head) >= n {
        c.loop.cache.Write(head) // len(head) never exceeds n, so head is equivalent to head[:n] here
        return c.loop.cache.Bytes(), err
    }

The tail handling below is fine for the same reason, so I'll write it your way.
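
The observation that Peek(n) never returns more than n bytes can be illustrated with a toy ring buffer. This is a sketch under stated assumptions, not gnet's elastic.RingBuffer: the `ring` type, its fields, and `peekStrings` are hypothetical and only model a Peek that splits its result into head and tail when the data wraps around.

```go
package main

import "fmt"

// ring is a hypothetical fixed-size ring buffer used only for illustration.
type ring struct {
	buf  []byte
	r    int // read position
	size int // number of buffered bytes
}

// Peek returns up to n buffered bytes as two slices without consuming them.
// tail is non-empty only when the requested bytes wrap past the end of buf,
// and len(head)+len(tail) never exceeds n.
func (rb *ring) Peek(n int) (head, tail []byte) {
	if n > rb.size {
		n = rb.size
	}
	if n <= 0 {
		return nil, nil
	}
	if rb.r+n <= len(rb.buf) {
		return rb.buf[rb.r : rb.r+n], nil
	}
	first := len(rb.buf) - rb.r
	return rb.buf[rb.r:], rb.buf[:n-first]
}

// peekStrings peeks n bytes from a ring whose 8 buffered bytes "ABCDEFGH"
// wrap around: "ABCD" sits at the end of buf and "EFGH" at the start.
func peekStrings(n int) (string, string) {
	rb := &ring{buf: []byte("EFGH..ABCD"), r: 6, size: 8}
	head, tail := rb.Peek(n)
	return string(head), string(tail)
}

func main() {
	fmt.Println(peekStrings(3))   // fits in head alone
	fmt.Println(peekStrings(6))   // wraps: head plus tail
	fmt.Println(peekStrings(100)) // capped at the buffered size
}
```

Because head is already capped at n in such a design, writing head and writing head[:n] into the cache copy the same bytes whenever len(head) >= n.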

panjf2000 commented 2 weeks ago

OK, you had me confused there for a moment too. Looking at it this way, it should be fine.