smallnest / chanx

unbounded chan

data race in Len() and BufLen() #7

Closed changkun closed 3 years ago

changkun commented 3 years ago

Consider the following test:

func TestChanDataRace(t *testing.T) {
    ch := NewUnboundedChan(1)
    stop := make(chan bool)
    // Each goroutine keeps pushing to In and pulling from Out until stopped.
    for i := 0; i < 100; i++ { // may tweak the number of iterations
        go func() {
            for {
                select {
                case <-stop:
                    return
                default:
                    ch.In <- 42
                    <-ch.Out
                }
            }
        }()
    }

    // Meanwhile, query the length from the test goroutine.
    for i := 0; i < 10000; i++ { // may tweak the number of iterations
        ch.Len()
    }
    close(stop)
}

The above test results in the following data race:

$ go test -run=DataRace -race
==================
WARNING: DATA RACE
Read at 0x00c0001309a8 by goroutine 7:
  github.com/smallnest/chanx.(*RingBuffer).Len()
      /Users/changkun/dev/changkun.de/chanx/ringbuffer.go:110 +0x118
  github.com/smallnest/chanx.UnboundedChan.Len()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:18 +0xf4
  github.com/smallnest/chanx.TestChanDataRace()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan_test.go:191 +0xf0
  testing.tRunner()
      /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198

Previous write at 0x00c0001309a8 by goroutine 8:
  github.com/smallnest/chanx.(*RingBuffer).Read()
      /Users/changkun/dev/changkun.de/chanx/ringbuffer.go:42 +0x12c
  github.com/smallnest/chanx.(*RingBuffer).Pop()
      /Users/changkun/dev/changkun.de/chanx/ringbuffer.go:51 +0x6c
  github.com/smallnest/chanx.process()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:72 +0x460

Goroutine 7 (running) created at:
  testing.(*T).Run()
      /Users/changkun/goes/go/src/testing/testing.go:1289 +0x5b8
  testing.runTests.func1()
      /Users/changkun/goes/go/src/testing/testing.go:1581 +0xac
  testing.tRunner()
      /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
  testing.runTests()
      /Users/changkun/goes/go/src/testing/testing.go:1579 +0x780
  testing.(*M).Run()
      /Users/changkun/goes/go/src/testing/testing.go:1487 +0x928
  main.main()
      _testmain.go:55 +0x288

Goroutine 8 (running) created at:
  github.com/smallnest/chanx.NewUnboundedChanSize()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:40 +0x1b0
  github.com/smallnest/chanx.NewUnboundedChan()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:31 +0x3c
  github.com/smallnest/chanx.TestChanDataRace()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan_test.go:174 +0x28
  testing.tRunner()
      /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
==================
==================
WARNING: DATA RACE
Read at 0x00c0001309b0 by goroutine 7:
  github.com/smallnest/chanx.(*RingBuffer).Len()
      /Users/changkun/dev/changkun.de/chanx/ringbuffer.go:110 +0x134
  github.com/smallnest/chanx.UnboundedChan.Len()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:18 +0xf4
  github.com/smallnest/chanx.TestChanDataRace()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan_test.go:191 +0xf0
  testing.tRunner()
      /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198

Previous write at 0x00c0001309b0 by goroutine 8:
  github.com/smallnest/chanx.(*RingBuffer).Write()
      /Users/changkun/dev/changkun.de/chanx/ringbuffer.go:70 +0xec
  github.com/smallnest/chanx.process()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:69 +0x44c

Goroutine 7 (running) created at:
  testing.(*T).Run()
      /Users/changkun/goes/go/src/testing/testing.go:1289 +0x5b8
  testing.runTests.func1()
      /Users/changkun/goes/go/src/testing/testing.go:1581 +0xac
  testing.tRunner()
      /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
  testing.runTests()
      /Users/changkun/goes/go/src/testing/testing.go:1579 +0x780
  testing.(*M).Run()
      /Users/changkun/goes/go/src/testing/testing.go:1487 +0x928
  main.main()
      _testmain.go:55 +0x288

Goroutine 8 (running) created at:
  github.com/smallnest/chanx.NewUnboundedChanSize()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:40 +0x1b0
  github.com/smallnest/chanx.NewUnboundedChan()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:31 +0x3c
  github.com/smallnest/chanx.TestChanDataRace()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan_test.go:174 +0x28
  testing.tRunner()
      /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
==================
==================
WARNING: DATA RACE
Read at 0x00c0001309a0 by goroutine 7:
  github.com/smallnest/chanx.(*RingBuffer).Len()
      /Users/changkun/dev/changkun.de/chanx/ringbuffer.go:118 +0x1ac
  github.com/smallnest/chanx.UnboundedChan.Len()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:18 +0xf4
  github.com/smallnest/chanx.TestChanDataRace()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan_test.go:191 +0xf0
  testing.tRunner()
      /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198

Previous write at 0x00c0001309a0 by goroutine 8:
  github.com/smallnest/chanx.(*RingBuffer).grow()
      /Users/changkun/dev/changkun.de/chanx/ringbuffer.go:96 +0x3bc
  github.com/smallnest/chanx.(*RingBuffer).Write()
      /Users/changkun/dev/changkun.de/chanx/ringbuffer.go:77 +0x18c
  github.com/smallnest/chanx.process()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:69 +0x44c

Goroutine 7 (running) created at:
  testing.(*T).Run()
      /Users/changkun/goes/go/src/testing/testing.go:1289 +0x5b8
  testing.runTests.func1()
      /Users/changkun/goes/go/src/testing/testing.go:1581 +0xac
  testing.tRunner()
      /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
  testing.runTests()
      /Users/changkun/goes/go/src/testing/testing.go:1579 +0x780
  testing.(*M).Run()
      /Users/changkun/goes/go/src/testing/testing.go:1487 +0x928
  main.main()
      _testmain.go:55 +0x288

Goroutine 8 (running) created at:
  github.com/smallnest/chanx.NewUnboundedChanSize()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:40 +0x1b0
  github.com/smallnest/chanx.NewUnboundedChan()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan.go:31 +0x3c
  github.com/smallnest/chanx.TestChanDataRace()
      /Users/changkun/dev/changkun.de/chanx/unbounded_chan_test.go:174 +0x28
  testing.tRunner()
      /Users/changkun/goes/go/src/testing/testing.go:1242 +0x198
==================
--- FAIL: TestChanDataRace (0.01s)
    testing.go:1135: race detected during execution of test
FAIL
exit status 1
FAIL    github.com/smallnest/chanx      0.194s

I think providing a Len() method on an unbounded channel abstraction needs more careful handling (a mutex, or a lock-free pattern using an external atomic counter, similar to the runtime channel implementation). The internal buffer of the unbounded channel is maintained in a separate goroutine, so a read via Len() races with any write to that buffer.
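A minimal sketch of the atomic-counter idea, not the chanx implementation: countedQueue, Push, Pop, and hammer are hypothetical names introduced only for illustration. The buffer is mutated under a mutex, while Len reads a separately maintained counter through sync/atomic, so a concurrent Len never touches the buffer's fields.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// countedQueue is a hypothetical stand-in for the library's buffer:
// a slice guarded by a mutex, plus a counter that Len reads atomically.
type countedQueue struct {
	mu    sync.Mutex
	items []int
	n     int64 // accessed only through sync/atomic
}

// Push appends v and bumps the counter while holding the lock.
func (q *countedQueue) Push(v int) {
	q.mu.Lock()
	q.items = append(q.items, v)
	atomic.AddInt64(&q.n, 1)
	q.mu.Unlock()
}

// Pop removes the oldest item, if any, and decrements the counter.
func (q *countedQueue) Pop() (int, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return 0, false
	}
	v := q.items[0]
	q.items = q.items[1:]
	atomic.AddInt64(&q.n, -1)
	return v, true
}

// Len reads only the atomic counter, never the slice itself.
func (q *countedQueue) Len() int { return int(atomic.LoadInt64(&q.n)) }

// hammer runs workers goroutines that push, query Len, and pop concurrently.
func hammer(q *countedQueue, workers, iters int) {
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < iters; i++ {
				q.Push(i)
				_ = q.Len()
				q.Pop()
			}
		}()
	}
	wg.Wait()
}

func main() {
	q := &countedQueue{}
	hammer(q, 4, 1000)
	fmt.Println("len after balanced push/pop:", q.Len())
}
```

Running a sketch like this under `go run -race` should not report a race on Len, although the value it returns is still only a snapshot that may be stale by the time the caller uses it.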

smallnest commented 3 years ago

Thanks to @changkun for pointing out the problem.

smallnest commented 3 years ago

fixed

changkun commented 3 years ago

https://github.com/smallnest/chanx/blob/63d09862f6aa4d8da9bdf0abb4e2005a957b8f07/unbounded_chan.go#L67-L68

I think the fix may still be problematic. Suppose a BufLen call happens concurrently with line 67, and both happen before line 68. Assuming a global order, there are two possible cases:

  1. a call to BufLen -> buffer.Write -> ch.bufCount++: BufLen works as expected.
  2. buffer.Write -> a call to BufLen -> ch.bufCount++: BufLen returns a number smaller than the actual length, because buffer.Write has already changed the buffer.

From a memory order perspective, BufLen should observe the new length as soon as buffer.Write completes. The second case above is a failure case.
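The two orderings can be replayed deterministically in a single goroutine. This is only a toy model under stated assumptions: toyChan, write, bump, and observe are hypothetical names, and the real code runs these steps in a separate goroutine rather than sequentially.

```go
package main

import "fmt"

// toyChan is a hypothetical model, not the chanx type: a buffer plus a
// separately maintained counter that is updated in a second step.
type toyChan struct {
	buffer   []int
	bufCount int
}

// write models buffer.Write: the buffer changes first.
func (c *toyChan) write(v int) { c.buffer = append(c.buffer, v) }

// bump models ch.bufCount++: the counter catches up afterwards.
func (c *toyChan) bump() { c.bufCount++ }

// BufLen reports the counter, not the buffer's real length.
func (c *toyChan) BufLen() int { return c.bufCount }

// observe replays one ordering and returns what BufLen reported together
// with the buffer's true length at the moment of the call.
func observe(lenBetween bool) (reported, actual int) {
	c := &toyChan{}
	if !lenBetween {
		// Case 1: BufLen -> write -> bump.
		reported, actual = c.BufLen(), len(c.buffer)
		c.write(42)
		c.bump()
		return
	}
	// Case 2: write -> BufLen -> bump.
	c.write(42)
	reported, actual = c.BufLen(), len(c.buffer)
	c.bump()
	return
}

func main() {
	r, a := observe(false)
	fmt.Println("case 1: reported", r, "actual", a) // reported 0, actual 0
	r, a = observe(true)
	fmt.Println("case 2: reported", r, "actual", a) // reported 0, actual 1
}
```

In the second ordering the reported value lags the true length by one, which is the discrepancy described in the list above.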

smallnest commented 3 years ago

You are right. The original intention of adding the BufLen and Len methods was only to estimate the approximate number of elements in the chan, not to give an exact count. Besides the issue you describe, some elements may be put into In while others are read from Out, so Len is not accurate.

That means the real number of elements may be less or more than the value returned by Len and BufLen. I hope users don't make decisions based on these methods and use them only to estimate approximate memory usage.

Maybe we can have a method to implement an accurate Len and BufLen, but I think it won't be complex. I'd rather keep the implementation simple for readability and performance.

Anyway, I think I'd better add some comments to Len and BufLen to avoid misuse.

q191201771 commented 3 years ago

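I came here to learn; you two are great. I agree with smallnest's considerations of readability and performance, and with the approach of adding comments.

Even if the outer interface returned an absolutely precise value, it would still only be an instantaneous one. I can't think of a scenario where this would have a large impact. Personally, I feel it is fine as long as the error never produces a negative number (all writes currently happen in a single goroutine, so that should not be a problem).

Is the possible error currently only +1 or -1?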

changkun commented 3 years ago

I agree with the readability and performance concerns. This is precisely why sync.Map does not have a Len method: Len can never be accurate in a concurrent environment. See golang/go#20680.

Nevertheless, the semantics of Len come down to a rigorous definition of memory order. Without one, Len may invite misuse and cause nasty bugs from the user's perspective. Consider the following example:

ch.In <- 42
ch.In <- 42
ch.In <- 42

// Now the channel has three items.
// We create two loops that produce and consume elements from the channel.

go func() { for { ch.In <- 42 } }()
go func() { for { <-ch.Out } }()

// A third party may also loop, watch the status of the channel, and do some work.
// If the application is expected to terminate only when the channel is empty,
// then that expectation is violated, because ...

for ch.Len() > 0 {
    // ... do stuff ...
}

// The loop may terminate even though the channel is not empty.
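One way to avoid polling a length for termination is to signal completion explicitly. The sketch below uses a built-in buffered channel as a stand-in for the unbounded one; produce and drain are hypothetical helpers, and nothing here is taken from the chanx API.

```go
package main

import (
	"fmt"
	"sync"
)

// drain consumes until the channel is closed and returns how many items
// it received. Termination depends on close, not on a length snapshot.
func drain(out <-chan int) int {
	n := 0
	for range out {
		n++
	}
	return n
}

// produce starts workers goroutines that each send count items, then
// closes ch once every sender has finished.
func produce(ch chan<- int, workers, count int) {
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < count; i++ {
				ch <- 42
			}
		}()
	}
	go func() {
		wg.Wait()
		close(ch) // the only completion signal the consumer relies on
	}()
}

func main() {
	ch := make(chan int, 8)
	produce(ch, 3, 100)
	fmt.Println("received:", drain(ch)) // received: 300
}
```

Because the consumer stops only when the channel is closed, it cannot exit early on a momentarily stale or zero length.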

One may argue that the following two lines are fine, because they pop first and then decrease the counter:

https://github.com/smallnest/chanx/blob/63d09862f6aa4d8da9bdf0abb4e2005a957b8f07/unbounded_chan.go#L79-L80

But compiler optimizations may reorder these two lines and thereby introduce a bug. This is not the compiler's fault, as there is no memory order guarantee for this case, not even in the recently proposed updates.

Nonetheless, if the intent is an approximation of memory usage, the methods could be renamed accordingly:

-// Len returns len of In plus len of Out plus len of buffer.
-func (c UnboundedChan) Len() int
+// Cap returns the capacity of the channel.
+func (c *UnboundedChan) Cap() int

-// BufLen returns len of the buffer.
-func (c UnboundedChan) BufLen() int
+// EstimateLen returns an approximation of the length of
+// the channel where the error may be bounded by 1.
+func (c *UnboundedChan) EstimateLen() int

The name EstimateLen is not concise; a different name such as ApproxLen could be used instead.