asaskevich / EventBus

[Go] Lightweight eventbus with async compatibility for Go
MIT License

Indexing error due to multiple removeHandler #55

Open Jjjpan opened 2 years ago

Jjjpan commented 2 years ago

At line 141, in func Publish:

// Publish executes callback defined for a topic. Any additional argument will be transferred to the callback.
func (bus *EventBus) Publish(topic string, args ...interface{}) {
    bus.lock.Lock() // will unlock if handler is not found or always after setUpPublish
    defer bus.lock.Unlock()
    if handlers, ok := bus.handlers[topic]; ok && 0 < len(handlers) {
        // Handlers slice may be changed by removeHandler and Unsubscribe during iteration,
        // so make a copy and iterate the copied slice.
        copyHandlers := make([]*eventHandler, len(handlers))
        copy(copyHandlers, handlers)
        for i, handler := range copyHandlers {
            if handler.flagOnce {
                bus.removeHandler(topic, i) // multiple operation causes indexing error
            }
            if !handler.async {
                bus.doPublish(handler, topic, args...)
            } else {
                bus.wg.Add(1)
                if handler.transactional {
                    bus.lock.Unlock()
                    handler.Lock()
                    bus.lock.Lock()
                }
                go bus.doPublishAsync(handler, topic, args...)
            }
        }
    }
}

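i and handler come from ranging over copyHandlers, while the remove operation acts on bus.handlers. After the first removal, i no longer matches the handler's position in bus.handlers, so later removals hit the wrong handler or an index that is out of range. 🤨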

Jjjpan commented 2 years ago

In a test case that mixes several Subscribe and SubscribeOnce calls, bus.removeHandler tries to remove a handler at an index that is out of range.

Add an out-of-range message to removeHandler in event_bus.go:

func (bus *EventBus) removeHandler(topic string, idx int) {
    if _, ok := bus.handlers[topic]; !ok {
        return
    }
    l := len(bus.handlers[topic])

    if !(0 <= idx && idx < l) {
        fmt.Println("remove out of range")
        return
    }

    copy(bus.handlers[topic][idx:], bus.handlers[topic][idx+1:])
    bus.handlers[topic][l-1] = nil // or the zero value of T
    bus.handlers[topic] = bus.handlers[topic][:l-1]
}

Test case in event_bus_test.go:

func TestManySubscribeAndManySubscribe(t *testing.T) {
    bus := New()
    event := "topic"
    flag := 0
    fn := func() { flag += 1 }
    bus.SubscribeOnce(event, fn)
    bus.SubscribeOnce(event, fn)
    bus.Subscribe(event, fn)
    bus.Subscribe(event, fn)
    bus.SubscribeOnce(event, fn)
    bus.SubscribeOnce(event, fn)
    bus.Publish(event)

    if flag != 6 {
        t.Fail()
    }
}

Test result:

=== RUN   TestManySubscribeAndManySubscribe
remove out of range
remove out of range
--- PASS: TestManySubscribeAndManySubscribe (0.00s)
PASS
ok      eventbus/v1     (cached)

Jjjpan commented 2 years ago

Would it be better to keep an offset that tracks how far the indices in bus.handlers have shifted relative to copyHandlers? 🙄

Maybe like this:


func (bus *EventBus) Publish(topic string, args ...interface{}) {
    bus.lock.Lock()
    defer bus.lock.Unlock()

    if topicHandlers, ok := bus.handlers[topic]; ok && len(topicHandlers) > 0 {
        copiedHandlers := make([]*eventHandler, len(topicHandlers))
        copy(copiedHandlers, topicHandlers)
        offset := 0
        for index, handler := range copiedHandlers {
            if handler.flagOnce {
                bus.removeHandler(topic, index+offset)
                offset--
            }
            if !handler.async {
                bus.doPublish(handler, topic, args...)
            } else {
                bus.wg.Add(1)
                if handler.transactional {
                    bus.lock.Unlock()
                    handler.lock.Lock()
                    bus.lock.Lock()
                }
                go bus.doPublishAsync(handler, topic, args...)
            }
        }
    }
}

varfrog commented 2 years ago

This project seems to not be maintained any more. I'm just looking around the issues here. Maybe there's a maintained fork...