xtaci / smux

A Stream Multiplexing Library for golang with least memory usage(TDMA)
MIT License
1.27k stars 189 forks source link

看了好多遍都没看懂,这个地方没有错误吗 #87

Open sgolang opened 2 years ago

sgolang commented 2 years ago

这个问题我想了很久也没想明白,恳请大侠指点一下。 因为我不相信这个程序这里会有错误。太抓狂了。

// session.go
// shaper shapes the sending sequence among streams
func (s *Session) shaperLoop() {
    var reqs shaperHeap
    var next writeRequest
    var chWrite chan writeRequest

    for {
        if len(reqs) > 0 {
            chWrite = s.writes
            next = heap.Pop(&reqs).(writeRequest) // 从队列里取一个数据准备发送
        } else {
            chWrite = nil
        }

        select {
        case <-s.die:
            return
        case r := <-s.shaper:  //有新的数据,就把刚刚从队列里取出的数据放回去,问题是放回去顺序不就乱了吗
            if chWrite != nil { // next is valid, reshape
                heap.Push(&reqs, next)
            }
            heap.Push(&reqs, r)
        case chWrite <- next:
        }
    }
}

我的理解 有prio = 1 的一种数据 a1 a2 a3 a4 a5,prio =2 的一种数据 b1,b2,b3,b4,b5 。只要保证 prio相同的数据的发送顺序即可。a、b两种数据可以优先发送a数据。上面的操作把数据取出来,再放回去不是打算了数据a这种数据的顺序了吗? 我写了个小程序测试了一下。程序和结果如下:

package main

import (
    "container/heap"
    "fmt"
    "math/rand"
    "time"
)

type writeRequest struct {
    prio int
    data string
}

type shaperHeap []writeRequest

func (h shaperHeap) Len() int            { return len(h) }
func (h shaperHeap) Less(i, j int) bool  { return h[i].prio < h[j].prio }
func (h shaperHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *shaperHeap) Push(x interface{}) { *h = append(*h, x.(writeRequest)) }

func (h *shaperHeap) Pop() interface{} {
    old := *h
    n := len(old)
    x := old[n-1]
    *h = old[0 : n-1]
    return x
}

type Session struct {
    die    chan struct{}
    shaper chan writeRequest // a shaper for writing
    writes chan writeRequest
}

func NewSession() *Session {
    s := &Session{}
    s.writes = make(chan writeRequest)
    s.shaper = make(chan writeRequest)
    s.die = make(chan struct{})
    return s
}

func (s *Session) Consume() {

    for {
        time.Sleep(300 * time.Millisecond)
        select {
        case <-s.die:
            return
        case wr := <-s.writes:
            fmt.Printf("[-]% d %s\n", wr.prio, wr.data)

        }
    }
}

func (s *Session) Write(wr writeRequest) {

    select {
    case s.shaper <- wr:
        fmt.Printf("[+] %d %s\n", wr.prio, wr.data)
    case <-s.die:
        return
    }

}

// shaper shapes the sending sequence among streams
func (s *Session) shaperLoop() {
    var reqs shaperHeap
    var next writeRequest
    var chWrite chan writeRequest

    for {
        if len(reqs) > 0 {
            chWrite = s.writes
            next = heap.Pop(&reqs).(writeRequest)
        } else {
            chWrite = nil
        }

        select {
        case <-s.die:
            return
        case r := <-s.shaper:
            if chWrite != nil { // next is valid, reshape
                heap.Push(&reqs, next)
            }
            heap.Push(&reqs, r)
        case chWrite <- next:
        }
    }
}

func main() {
    sess := NewSession()
    go sess.shaperLoop()

    go sess.Consume()

    rand.Seed(time.Now().UnixNano())

    go func() {

        a1 := writeRequest{1, "a1"}
        a2 := writeRequest{1, "a2"}
        a3 := writeRequest{1, "a3"}
        a4 := writeRequest{1, "a4"}
        a5 := writeRequest{1, "a5"}

        b1 := writeRequest{2, "b1"}
        b2 := writeRequest{2, "b2"}
        b3 := writeRequest{2, "b3"}
        b4 := writeRequest{2, "b4"}
        b5 := writeRequest{2, "b5"}

        sa := []writeRequest{a1, a2, a3, a4, a5}
        sb := []writeRequest{b1, b2, b3, b4, b5}
        _ = sa
        _ = sb
        for {
            x := rand.Intn(2)
            // fmt.Println(x)
            if len(sa) == 0 && len(sb) == 0 {
                break
            }
            if x == 0 {
                if len(sa) > 0 {
                    aa := sa[0]
                    sa = sa[1:]
                    sess.Write(aa)
                }
            } else {
                if len(sb) > 0 {
                    bb := sb[0]
                    sb = sb[1:]
                    sess.Write(bb)
                }

            }
        }

    }()

    time.Sleep(30 * time.Second)
}

结果是这样的:

[+] 1 a1
[+] 2 b1
[+] 2 b2
[+] 1 a2
[+] 1 a3
[+] 2 b3
[+] 2 b4
[+] 1 a4
[+] 1 a5
[+] 2 b5
[-] 1 a2
[-] 1 a5
[-] 1 a4
[-] 1 a3
[-] 1 a1
[-] 2 b2
[-] 2 b3
[-] 2 b5
[-] 2 b4
[-] 2 b1
xtaci commented 2 years ago

每个流都是独立的,所以流之间乱序没有关系,这反而是shaper的目的,如果是单一流,可能确实肯定存在一个prio变量overflow导致的乱序问题。

sgolang commented 2 years ago

又看了下代码在每个stream内部,使用的是numWritten作为writeRequest结构体中的prio,所以在stream内部怎么重新排列都可以保证发送的顺序。 这个问题可以关闭了。

感谢您的回复,您的代码让我学到了很多!

func (s *Stream) writeV2(b []byte) (n int, err error) {
...
            for len(bts) > 0 {
                sz := len(bts)
                if sz > s.frameSize {
                    sz = s.frameSize
                }
                frame.data = bts[:sz]
                bts = bts[sz:]
                n, err := s.sess.writeFrameInternal(frame, deadline, uint64(atomic.LoadUint32(&s.numWritten)))
                atomic.AddUint32(&s.numWritten, uint32(sz))
                sent += n
                if err != nil {
                    return sent, err
                }
            }
        }
xtaci commented 2 years ago

嗯,这个的prio溢出可能性是很高的,还是需要修复一下。

sgolang commented 2 years ago

这个overflow的问题怎么保证呢,每个stream 发送 1 << 32 字节后就不行了。

xtaci commented 2 years ago

因为stream.Write本身是阻塞函数,那么出现这个情况只可能是发送一个刚好在 1<<32附近的数据导致乱序,那么只需要做 无符号判断即可。

sgolang commented 2 years ago

我怎么觉得这里的判断有问题呢? ··· // shaper.go package smux

func _itimediff(later, earlier uint32) int32 { return (int32)(later - earlier) } ···

比如说某stream用来传输一个大文件。

在这个stream 里,存在三个writeRequest 在队列里,分别为: prio = (1 << 32)- 32768 *2 prio = (1 << 32)- 32768 prio = (1 << 32)+ 1024

等待发送,最后一个溢出了。这样判断还还能保证顺序吗?

使用发送的字节数作为prio 真的合理吗? 假如长期存在以下两个stream。 另一个streamB 存在很久了,且用来发送大文件,那么它的prio增长就很快。 一个streamA是新创建的,每次只发送很小的数据,那么这个stream 里的prio增长就比较慢。

在 shaperHeap 中存在很多这样的数据。 reshape的时候,是不是Stream B 一直得不到发送的机会?

xtaci commented 2 years ago

我怎么觉得这里的判断有问题呢? ··· // shaper.go package smux

func _itimediff(later, earlier uint32) int32 { return (int32)(later - earlier) } ···

比如说某stream用来传输一个大文件。

在这个stream 里,存在三个writeRequest 在队列里,分别为: prio = (1 << 32)- 32768 *2 prio = (1 << 32)- 32768 prio = (1 << 32)+ 1024

等待发送,最后一个溢出了。这样判断还还能保证顺序吗?

成立,你可以做 go test -v -run Shaper

使用发送的字节数作为prio 真的合理吗? 假如长期存在以下两个stream。 另一个streamB 存在很久了,且用来发送大文件,那么它的prio增长就很快。 一个streamA是新创建的,每次只发送很小的数据,那么这个stream 里的prio增长就比较慢。

在 shaperHeap 中存在很多这样的数据。 reshape的时候,是不是Stream B 一直得不到发送的机会?

数值32bit问题不大,很快就会回绕。

sgolang commented 2 years ago
package smux

import (
    "container/heap"
    "testing"
)

func TestShaper(t *testing.T) {

    w1 := writeRequest{prio: 10}
    w2 := writeRequest{prio: 2048}
    w3 := writeRequest{prio: (1 << 32) - 32768*2}
    w4 := writeRequest{prio: (1 << 32) - 32768}

    prioOverflow := uint32((1 << 32) - 32768)
    prioOverflow += 32768
    prioOverflow += 1024

    w5 := writeRequest{prio: prioOverflow}

    var reqs shaperHeap
    heap.Push(&reqs, w5)
    heap.Push(&reqs, w4)
    heap.Push(&reqs, w3)
    heap.Push(&reqs, w2)
    heap.Push(&reqs, w1)

    var lastPrio = reqs[0].prio
    for len(reqs) > 0 {
        w := heap.Pop(&reqs).(writeRequest)
        if int32(w.prio-lastPrio) < 0 {
            t.Fatal("incorrect shaper priority")
        }

        t.Log("prio:", w.prio)
        lastPrio = w.prio
    }
}

go test -run Shaper -v === RUN TestShaper shaper_test.go:43: prio: 4294901760 shaper_test.go:43: prio: 4294934528 shaper_test.go:43: prio: 10 shaper_test.go:43: prio: 1024 shapertest.go:43: prio: 2048 --- PASS: TestShaper (0.00s) PASS ok /Users/jason/code/smux 1.744s

您的意思是只要保证溢出数据附近的数据顺序正确就可以是吧。 感谢您的回复,我是个业余程序员,读代码也是技能,恕我愚钝,耽误您时间。 之前看第一版的学到了很多知识,比如分包组包、流复用,也用在了自己写的工具上,这个版本总算也快搞明白了。 感谢无私共享和指教!

sgolang commented 2 years ago

有两个问题请您解答一下:

  1. stream.Write 是阻塞的,func (s *Session) writeFrameInternal()也是阻塞的.就是说某一个stream 在这个 shaperHeap []writeRequest 队列里,最多只有一个cmdPSH数据,(可以同时有cmdNOP、cmdUPD)我理解的没错吧?
  2. 这个结构体里writeRequest.result 为什么要用带缓存的channel呢?
// internal writeFrame version to support deadline used in keepalive
func (s *Session) writeFrameInternal(f Frame, deadline <-chan time.Time, prio uint32) (int, error) {
    req := writeRequest{
        prio:   prio,
        frame:  f,
        result: make(chan writeResult, 1), //为什么要用带缓存的channel呢?这个地方直接make(chan writeResult)会影响速度吗?
    }
xtaci commented 2 years ago

有两个问题请您解答一下:

  1. stream.Write 是阻塞的,func (s *Session) writeFrameInternal()也是阻塞的.就是说某一个stream 在这个 shaperHeap []writeRequest 队列里,最多只有一个cmdPSH数据,(可以同时有cmdNOP、cmdUPD)我理解的没错吧? 队列里可以有多个,阻塞是函数不保证立即返回。

  2. 这个结构体里writeRequest.result 为什么要用带缓存的channel呢?

// internal writeFrame version to support deadline used in keepalive
func (s *Session) writeFrameInternal(f Frame, deadline <-chan time.Time, prio uint32) (int, error) {
  req := writeRequest{
      prio:   prio,
      frame:  f,
      result: make(chan writeResult, 1), //为什么要用带缓存的channel呢?这个地方直接make(chan writeResult)会影响速度吗?
  }

结果就一个,避免阻塞啊。

sgolang commented 2 years ago

即使把所有的cmdPSH 数据的writeResult 的prio 设置为1,也能保证数据发送顺序

stream.Write 是阻塞的,Session.writeFrameInternal()也是阻塞的.就是说某一个stream 在这个 shaperHeap []writeRequest 队列里,最多只有一个cmdPSH数据,(可以同时有cmdNOP、cmdUPD)我理解的没错吧? 队列里可以有多个,阻塞是函数不保证立即返回。

队列里可以有多个,阻塞是函数不保证立即返回。

我做了个测试,某一个stream 里(sid相同)的数据,没有并发调用 Write(不应该并发调用吧?) 函数的情况下,实际上同一时刻只有一个cmdPSH数据 在shaperHeap队列里。即使把所有的cmdPSH 数据的writeResult 的prio 设置为1,也能保证数据发送顺序,不会出错。

您说的队列里可以有多个

  1. 在多个stream里队列里肯定有多个
  2. 同一个stream里,可以有多个的情况是,还有 cmdNOP、cmdUPD等数据。
sgolang commented 2 years ago

把shaper.go中比较函数改成随机的,删除shaper_test.go,也能测试通过。

//shaper.go
package smux

import "math/rand"

func _itimediff(later, earlier uint32) int32 {
    x := rand.Intn(2)
    return int32(x)

    // return (int32)(later - earlier)
}

··· jason@MacBook-Air ~/g/s/g/x/smux (master)> go test -v === RUN TestAllocGet --- PASS: TestAllocGet (0.00s) === RUN TestAllocPut --- PASS: TestAllocPut (0.00s) === RUN TestAllocPutThenGet --- PASS: TestAllocPutThenGet (0.00s) === RUN TestConfig mux_test.go:23: keep-alive interval must be positive mux_test.go:32: keep-alive timeout must be larger than keep-alive interval mux_test.go:40: max frame size must be positive mux_test.go:48: max frame size must not be larger than 65535 mux_test.go:56: max receive buffer must be positive mux_test.go:64: max stream buffer must be positive mux_test.go:73: max stream buffer must not be larger than max receive buffer --- PASS: TestConfig (0.00s) === RUN TestEcho --- PASS: TestEcho (0.01s) === RUN TestWriteTo --- PASS: TestWriteTo (0.03s) === RUN TestWriteToV2 --- PASS: TestWriteToV2 (0.02s) === RUN TestGetDieCh --- PASS: TestGetDieCh (0.00s) === RUN TestSpeed session_test.go:315: 127.0.0.1:56096 127.0.0.1:56095 session_test.go:336: time for 16MB rtt 59.936929ms --- PASS: TestSpeed (0.06s) === RUN TestParallel session_test.go:374: created 501 streams --- PASS: TestParallel (1.49s) === RUN TestParallelV2 session_test.go:408: created 605 streams --- PASS: TestParallelV2 (1.48s) === RUN TestCloseThenOpen --- PASS: TestCloseThenOpen (0.00s) === RUN TestSessionDoubleClose --- PASS: TestSessionDoubleClose (0.00s) === RUN TestStreamDoubleClose --- PASS: TestStreamDoubleClose (0.00s) === RUN TestConcurrentClose --- PASS: TestConcurrentClose (0.00s) === RUN TestTinyReadBuffer --- PASS: TestTinyReadBuffer (0.01s) === RUN TestIsClose --- PASS: TestIsClose (0.00s) === RUN TestKeepAliveTimeout --- PASS: TestKeepAliveTimeout (3.00s) === RUN TestKeepAliveBlockWriteTimeout --- PASS: TestKeepAliveBlockWriteTimeout (3.00s) === RUN TestServerEcho --- PASS: TestServerEcho (0.02s) === RUN TestSendWithoutRecv --- PASS: TestSendWithoutRecv (0.00s) === RUN TestWriteAfterClose --- PASS: TestWriteAfterClose (0.00s) === RUN TestReadStreamAfterSessionClose session_test.go:703: EOF --- PASS: TestReadStreamAfterSessionClose (0.00s) === RUN TestWriteStreamAfterConnectionClose --- PASS: TestWriteStreamAfterConnectionClose (0.00s) === RUN TestNumStreamAfterClose --- PASS: TestNumStreamAfterClose (0.00s) === RUN TestRandomFrame --- PASS: TestRandomFrame (0.01s) === RUN TestWriteFrameInternal --- PASS: TestWriteFrameInternal (1.01s) === RUN TestReadDeadline --- PASS: TestReadDeadline (0.00s) === RUN TestWriteDeadline --- PASS: TestWriteDeadline (0.00s) PASS ok github.com/xtaci/smux 13.257s ···

xtaci commented 2 years ago

都是正数,当然能过哈

xtaci commented 2 years ago

无符号数转换为带符号数,比如uint32 1<<32 - 1 ,cast成 int32,就会负了。

sgolang commented 2 years ago

都是正数,当然能过哈

//shaper.go
package smux

import "math/rand"

func _itimediff(later, earlier uint32) int32 {
    x := rand.Intn(2)
    return int32(x) // 这里返回0 或者1 

    // return (int32)(later - earlier)
}
//shaper.go 这里比较的是大于0
func (h shaperHeap) Less(i, j int) bool  { return _itimediff(h[j].prio, h[i].prio) > 0 }

因为 你的测试函数,TestWriteToV2,这里有比较发送和接受的数据是否一致,能测试通过说明数据顺序是正确的。原因就是某一个stream 在这个 shaperHeap []writeRequest 队列里,最多只有一个cmdPSH数据,

func TestWriteToV2(t *testing.T) {
......

    if bytes.Compare(sndbuf, rcvbuf.Bytes()) != 0 {
        t.Fatal("mismatched echo bytes")
    }
}

我的意思是说把s.numWritten 作为 writeFrameInternal函数的prio参数,并没有发挥作用啊。

func (s *Stream) writeV2(b []byte) (n int, err error) {
...
            for len(bts) > 0 {
                sz := len(bts)
                if sz > s.frameSize {
                    sz = s.frameSize
                }
                frame.data = bts[:sz]
                bts = bts[sz:]
                n, err := s.sess.writeFrameInternal(frame, deadline, uint64(atomic.LoadUint32(&s.numWritten)))
                atomic.AddUint32(&s.numWritten, uint32(sz))
                sent += n
                if err != nil {
                    return sent, err
                }
            }
        }