xtaci / smux

A Stream Multiplexing Library for golang with least memory usage(TDMA)
MIT License

Data race in Stream.numWritten when two goroutines are writing #56

Closed: Eudi4H closed this issue 4 years ago

Eudi4H commented 4 years ago

I am using smux v2.0.11. This program shows a data race when two goroutines are writing to the same stream. I think it is in numWritten.

go.mod

```
module example.com/smux-race

require (
	github.com/klauspost/cpuid v1.2.1 // indirect
	github.com/klauspost/reedsolomon v1.9.3 // indirect
	github.com/templexxx/cpufeat v0.0.0-20180724012125-cef66df7f161 // indirect
	github.com/templexxx/xor v0.0.0-20181023030647-4e92f724b73b // indirect
	github.com/tjfoc/gmsm v1.0.1 // indirect
	github.com/xtaci/kcp-go v5.4.11+incompatible
	github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae // indirect
	github.com/xtaci/smux/v2 v2.0.11
	golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc // indirect
	golang.org/x/net v0.0.0-20191007182048-72f939374954 // indirect
)
```

demo.go

```
// Demonstrates a data race when two goroutines are writing to the same stream.
// go run -race demo.go

package main

import (
	"io"
	"io/ioutil"
	"net"
	"time"

	"github.com/xtaci/kcp-go"
	"github.com/xtaci/smux/v2"
)

type DummyAddr struct{}

func (addr DummyAddr) Network() string { return "dummy" }

func (addr DummyAddr) String() string { return "dummy" }

// ChanPacketConn simulates a net.PacketConn using two channels.
type ChanPacketConn struct {
	recv, send chan []byte
}

func (c *ChanPacketConn) ReadFrom(p []byte) (int, net.Addr, error) {
	buf := <-c.recv
	return copy(p, buf), DummyAddr{}, nil
}

func (c *ChanPacketConn) WriteTo(p []byte, addr net.Addr) (int, error) {
	buf := make([]byte, len(p))
	copy(buf, p)
	c.send <- buf
	return len(buf), nil
}

func (c *ChanPacketConn) Close() error                       { return nil }
func (c *ChanPacketConn) LocalAddr() net.Addr                { return DummyAddr{} }
func (c *ChanPacketConn) SetDeadline(t time.Time) error      { return nil }
func (c *ChanPacketConn) SetReadDeadline(t time.Time) error  { return nil }
func (c *ChanPacketConn) SetWriteDeadline(t time.Time) error { return nil }

func server(pconn net.PacketConn) error {
	ln, err := kcp.ServeConn(nil, 0, 0, pconn)
	if err != nil {
		return err
	}
	defer ln.Close()

	conn, err := ln.AcceptKCP()
	if err != nil {
		return err
	}
	defer conn.Close()

	sess, err := smux.Server(conn, smux.DefaultConfig())
	if err != nil {
		return err
	}
	defer sess.Close()

	stream, err := sess.AcceptStream()
	if err != nil {
		return err
	}
	defer stream.Close()

	_, err = io.Copy(ioutil.Discard, stream)
	return err
}

func client(pconn net.PacketConn) error {
	conn, err := kcp.NewConn2(DummyAddr{}, nil, 0, 0, pconn)
	if err != nil {
		return err
	}
	defer conn.Close()

	sess, err := smux.Client(conn, smux.DefaultConfig())
	if err != nil {
		return err
	}
	defer sess.Close()

	stream, err := sess.OpenStream()
	if err != nil {
		return err
	}
	defer stream.Close()

	go writeByte(stream, 'A')
	go writeByte(stream, 'B')

	var c chan struct{}
	<-c

	return nil
}

func writeByte(w io.Writer, b byte) error {
	for {
		_, err := w.Write([]byte{b})
		if err != nil {
			return err
		}
	}
}

func main() {
	c2s := make(chan []byte, 1024)
	s2c := make(chan []byte, 1024)
	go func() {
		err := server(&ChanPacketConn{c2s, s2c})
		if err != nil {
			panic(err)
		}
	}()
	err := client(&ChanPacketConn{s2c, c2s})
	if err != nil {
		panic(err)
	}
}
```

go run -race demo.go

```
$ go run -race demo.go
==================
WARNING: DATA RACE
Read at 0x00c000088a6c by goroutine 19:
  github.com/xtaci/smux/v2.(*Stream).Write()
      $GOPATH/pkg/mod/github.com/xtaci/smux/v2@v2.0.11/stream.go:183 +0x6bd
  main.writeByte()
      demo.go:104 +0x88

Previous write at 0x00c000088a6c by goroutine 17:
  github.com/xtaci/smux/v2.(*Stream).Write()
      $GOPATH/pkg/mod/github.com/xtaci/smux/v2@v2.0.11/stream.go:201 +0x3b2
  main.writeByte()
      demo.go:104 +0x88

Goroutine 19 (running) created at:
  main.client()
      demo.go:94 +0x27b
  main.main()
      demo.go:120 +0x116

Goroutine 17 (running) created at:
  main.client()
      demo.go:93 +0x248
  main.main()
      demo.go:120 +0x116
==================
```

xtaci commented 4 years ago

I was aware of this issue, and it can be easily fixed.

Let us consider another question: in which scenario is it reasonable for two concurrent writers to write to a single stream? And if two concurrent writers do exist, can they be split across two streams?

xtaci commented 4 years ago

Actually, concurrent writes on a single connection lead to nondeterministic frame interleaving. I may add a comment to the Write() function to document that.
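
To illustrate the interleaving concern, here is a minimal sketch that does not use smux at all. It assumes a hypothetical `lockedWriter` wrapper around a plain `io.Writer`: the mutex keeps each individual `Write` call intact, but the order in which the two writers' messages arrive still varies from run to run. Whether a lock at this level is enough for a real stream depends on how a single `Write` is split into frames, which this sketch does not model.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"sync"
)

// lockedWriter is a hypothetical wrapper (not part of smux) that serializes
// Write calls on an underlying io.Writer.
type lockedWriter struct {
	mu sync.Mutex
	w  io.Writer
}

// Write holds the mutex for the duration of one underlying Write call.
func (l *lockedWriter) Write(p []byte) (int, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.w.Write(p)
}

// serializedWrites has two goroutines each write a 4-byte message n times
// through one lockedWriter and returns everything that was written.
func serializedWrites(n int) []byte {
	var buf bytes.Buffer
	lw := &lockedWriter{w: &buf}
	var wg sync.WaitGroup
	for _, msg := range []string{"AAAA", "BBBB"} {
		wg.Add(1)
		go func(m []byte) {
			defer wg.Done()
			for i := 0; i < n; i++ {
				lw.Write(m)
			}
		}([]byte(msg))
	}
	wg.Wait()
	return buf.Bytes()
}

// wholeMessages reports whether out consists only of intact 4-byte messages.
func wholeMessages(out []byte) bool {
	if len(out)%4 != 0 {
		return false
	}
	for i := 0; i < len(out); i += 4 {
		chunk := string(out[i : i+4])
		if chunk != "AAAA" && chunk != "BBBB" {
			return false
		}
	}
	return true
}

func main() {
	out := serializedWrites(100)
	// The messages are never torn, but their relative order is not fixed.
	fmt.Println(len(out), wholeMessages(out))
}
```

The design point is that a lock only makes each call atomic with respect to other calls; it does not give the receiver any way to tell which writer produced which bytes, so message framing remains the application's job.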

For this particular issue, I just replaced the accesses with atomic operations to suppress the error (not to fix it).
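
As a rough illustration of what replacing a plain counter access with atomic operations looks like, the sketch below uses a hypothetical `counterStream` type. It is not smux's actual `Stream`, and the field type is an assumption; only the `sync/atomic` calls are real standard-library API.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counterStream is a hypothetical stand-in for a stream that tracks how many
// bytes have been written. It is not smux's Stream type.
type counterStream struct {
	numWritten uint32 // accessed only through sync/atomic
}

// Write records len(p) bytes with an atomic add, so concurrent callers do
// not race on the counter itself.
func (s *counterStream) Write(p []byte) (int, error) {
	atomic.AddUint32(&s.numWritten, uint32(len(p)))
	return len(p), nil
}

// Written returns the current counter value with an atomic load.
func (s *counterStream) Written() uint32 {
	return atomic.LoadUint32(&s.numWritten)
}

// writeConcurrently starts `writers` goroutines that each write one byte
// `perWriter` times, then returns the final counter value.
func writeConcurrently(writers, perWriter int) uint32 {
	var s counterStream
	var wg sync.WaitGroup
	for i := 0; i < writers; i++ {
		wg.Add(1)
		go func(b byte) {
			defer wg.Done()
			for j := 0; j < perWriter; j++ {
				s.Write([]byte{b})
			}
		}(byte('A' + i))
	}
	wg.Wait()
	return s.Written()
}

func main() {
	fmt.Println(writeConcurrently(2, 1000)) // prints 2000
}
```

Running a program like this with `go run -race` should report no race on the counter. As the comment above notes, that only silences the detector for the counter; it says nothing about the order in which the two writers' data reaches the peer.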

Eudi4H commented 4 years ago

> Let us consider another question: in which scenario is it reasonable for two concurrent writers to write to a single stream? And if two concurrent writers do exist, can they be split across two streams?

I agree, the situation is a little unnatural. In my case, it was in a test program (an echo server), where one goroutine was echoing input, and another goroutine was sending a periodic heartbeat message. I can easily redesign the program to avoid concurrent writes. I mention this only because the contract for net.Conn says "multiple goroutines may invoke methods on a Conn simultaneously."

Anyway thanks, 182ea54 makes the data race message disappear.