gookit / event

📢 Lightweight event manager and dispatcher implements by Go. Go实现的轻量级的事件管理、调度程序库, 支持设置监听器的优先级, 支持使用通配符来进行一组事件的监听
https://pgk.go.dev/github.com/gookit/event
MIT License
501 stars 59 forks source link

异步多线程消费 #61

Closed Alice52 closed 5 months ago

Alice52 commented 5 months ago

System (please complete the following information):

Describe the bug

  1. 设置 o.ConsumerNum = 10 多个消费者,与预期不一致(感觉还是只有一个routine在执行)

To Reproduce

// preapre: 此时设置 ConsumerNum = 10, 每个任务耗时1s, 触发100个任务
// expected: 10s左右执行完所有任务
// actual: 执行了 100s左右
func TestChanEvent2(t *testing.T) {
    var em = event.NewManager("default", func(o *event.Options) {
        o.ConsumerNum = 10
    })
    defer em.CloseWait()

    var listener event.ListenerFunc = func(e event.Event) error {
        time.Sleep(1 * time.Second)
        return nil
    }

    em.On("app.evt1", listener, event.Normal)

    for i := 0; i < 100; i++ {
        em.FireAsync(event.New("app.evt1", event.M{"arg0": "val2"}))
    }

    fmt.Println("publish event finished!")
}

Screenshots

image

Additional context

  1. 是有什么特殊设置嘛(打开姿势不对? @RelicOfTesla @inhere 可以帮忙看一下嘛,谢谢~
  2. go 可以多个消费者并发读取 chan啊

    // 这个例子就是 10s 左右完成所有任务
    func TestConcurrentRead(t *testing.T) {
        var wg sync.WaitGroup
        defer wg.Wait()
        ch := make(chan int, 500)
    
        // 1. 启动多个 goroutine 读取数据
        routine(&wg, 10, ch)
    
        // 2. 向 chan 内写数据
        for i := 0; i < 100; i++ {
            ch <- i
        }
    
        // 3. 关闭 channel
        close(ch)
    }
    
    func routine(wg *sync.WaitGroup, count int, ch chan int) {
        for i := 0; i < count; i++ {
            wg.Add(1)
            go func(id int) {
                defer wg.Done()
                for {
                    data, ok := <-ch
                    if !ok {
                        fmt.Printf("Goroutine %d: Channel closed\n", id)
                        return
                    }
    
                    time.Sleep(1 * time.Second)
                    fmt.Printf("Goroutine %d: Received %d\n", id, data)
                }
            }(i)
        }
    }
inhere commented 5 months ago

应该是加锁导致的. 下个版本 我默认设置为 false

你设置下

    o.EnableLock = false
    o.ConsumerNum = 10