Open aryszka opened 7 years ago
this is a bug
func (cb *Breaker) sendEvent(event BreakerEvent) {
for _, receiver := range cb.eventReceivers {
receiver <- event
}
for _, listener := range cb.listeners {
le := ListenerEvent{CB: cb, Event: event}
select {
case listener <- le: //step 1 :this channel have not space to receive
default: // but between step 1 and 2,another coroutine have receive the event from listener and listener is empty ,so <listener block !
//<-listener //step: 2 so you want to release a event and send to it again
//listener <- le
}
}
}
i think if step 1 fail ,step 2 is unnecessary ,ignore this le ,this is not import; and event channel in your test should better have a bigger capacity,10 may be good,this guarantee the channel have cache to receive more elemet in highly Concurrent;you can test again use c := make(chan circuit.ListenerEvent, 10);it will not block anymore(In theory, this still has a certain probability happen); But it would be best to annotation the default select operation ; this can fix the bug,
func (cb *Breaker) Subscribe() <-chan BreakerEvent {
eventReader := make(chan BreakerEvent)
output := make(chan BreakerEvent, 100)
go func() {
for v := range eventReader {
select {
case output <- v:
default:
<-output
output <- v
}
}
}()
cb.eventReceivers = append(cb.eventReceivers, eventReader)
return output
}
this also have this bug
When adding a listener (buffered) channel with AddListener(), and consuming the events, the breaker can block forever. The test code in the linked gist can reproduce the issue with a reasonable chance:
https://gist.github.com/aryszka/115fea73da80422a6d46ff058c8dcb0b