CustomCacheInformer uses an operator.informerProcessor to distribute events to all listeners (ResourceWatchers added by users). When the reflector submits a list/watch event, it goes through the On method in the Informer and then reaches informerProcessor.distribute, which writes the event to every listener so that each can process it in its own queue. To do this, informerProcessor.distribute calls informerProcessorListener.push on each listener, which pushes the event to a channel. That channel is supposed to be drained continuously by informerProcessorListener.pop. pop is started by informerProcessorListener.run (which also delivers the events popped off the channel to the listener's ResourceWatcher), and informerProcessor.run starts run for each listener. However, if distribute is called before pop is running, the channel in a listener can fill up and distribute can block. This in turn prevents informerProcessor.run from starting the listeners, because it needs to lock informerProcessor.listenersMux, and distribute already holds that lock (and is stuck).
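The lock-up described above can be reproduced in isolation. The following is a minimal sketch, not the actual informerProcessor code: the processor and listener types, the unbuffered channel, the sleep, and the timeout are all simplifications assumed for illustration. It only shows how a distribute that blocks on a channel send while holding the mutex keeps run from ever starting the consumers.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// listener is a hypothetical, simplified stand-in for informerProcessorListener.
type listener struct {
	events chan string
}

// processor is a hypothetical, simplified stand-in for informerProcessor.
type processor struct {
	mux       sync.Mutex
	listeners []*listener
}

// distribute pushes the event to every listener while holding the lock.
func (p *processor) distribute(ev string) {
	p.mux.Lock()
	defer p.mux.Unlock()
	for _, l := range p.listeners {
		l.events <- ev // blocks while nothing is draining the channel
	}
}

// run starts a consumer for each listener, but has to take the lock first.
func (p *processor) run(started chan<- struct{}) {
	p.mux.Lock()
	for _, l := range p.listeners {
		go func(l *listener) {
			for range l.events {
				// a real listener would hand the event to its ResourceWatcher
			}
		}(l)
	}
	p.mux.Unlock()
	close(started)
}

// runStartsInTime calls distribute before run and reports whether run
// manages to finish its startup within a short timeout.
func runStartsInTime() bool {
	p := &processor{listeners: []*listener{{events: make(chan string)}}}
	go p.distribute("add")
	time.Sleep(50 * time.Millisecond) // give distribute time to take the lock and block

	started := make(chan struct{})
	go p.run(started)
	select {
	case <-started:
		return true
	case <-time.After(200 * time.Millisecond):
		return false // run is stuck waiting for the lock held by distribute
	}
}

func main() {
	fmt.Println("run started in time:", runStartsInTime())
}
```

In this sketch the blocked goroutines are simply abandoned when the program exits; in a long-running operator they would stay stuck, which matches the silent lock-up described above.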
In a somewhat infrequent race condition, the CustomCacheInformer.controller may begin attempting to distribute events before CustomCacheInformer.processor has begun running. This happens because the startup of CustomCacheInformer is done in a goroutine, so there are no ordering guarantees. The race causes intermittent test failures, and in production it could cause an operator to stop functioning properly (it simply locks up and does nothing, without giving the user any information), though a restart would likely fix it.
To fix this, informerProcessor now has an optional field, startedCh. If it is non-nil, run sends an empty struct to it once it has completed its startup process (all listeners are started). This can be used to ensure that the controller run call is made only after the processor has started.
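As an illustration of the startedCh handshake, here is a minimal sketch under stated assumptions: the processor type, the handle callback, the buffered startedCh, and the deliverAfterStart helper are hypothetical and not taken from the actual code. The caller waits on startedCh before sending any events, which plays the role of starting the controller only after the processor is up.

```go
package main

import (
	"fmt"
	"sync"
)

// processor is a hypothetical, simplified stand-in for informerProcessor.
type processor struct {
	startedCh chan struct{} // optional; run signals on it when non-nil
	listeners []chan string
}

// run starts one consumer per listener, signals startedCh if it is set,
// and then blocks until stop is closed.
func (p *processor) run(stop <-chan struct{}, handle func(string)) {
	for _, l := range p.listeners {
		go func(events <-chan string) {
			for {
				select {
				case ev := <-events:
					handle(ev)
				case <-stop:
					return
				}
			}
		}(l)
	}
	if p.startedCh != nil {
		p.startedCh <- struct{}{}
	}
	<-stop
}

// deliverAfterStart waits for the started signal before sending n events,
// and returns how many of them were handled.
func deliverAfterStart(n int) int {
	p := &processor{
		startedCh: make(chan struct{}, 1), // buffered so run never blocks on the signal
		listeners: []chan string{make(chan string)},
	}
	var (
		mu       sync.Mutex
		received int
		wg       sync.WaitGroup
	)
	handle := func(_ string) {
		mu.Lock()
		received++
		mu.Unlock()
		wg.Done()
	}
	stop := make(chan struct{})
	defer close(stop)
	go p.run(stop, handle)

	<-p.startedCh // the "controller" only starts sending after this point
	wg.Add(n)
	for i := 0; i < n; i++ {
		p.listeners[0] <- "event"
	}
	wg.Wait()

	mu.Lock()
	defer mu.Unlock()
	return received
}

func main() {
	fmt.Println("events handled:", deliverAfterStart(5))
}
```

Buffering startedCh with capacity 1 is a design choice of this sketch, so that run cannot itself block if the receiver is slow to read the signal.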
As an additional safety measure, informerProcessor.distribute now also drops any event it receives before informerProcessor.started is true, so that it cannot get stuck holding listenersMux. It logs a warning to the default logger if this happens.
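The drop-before-started guard could look roughly like the sketch below. It is an assumption-laden illustration rather than the real implementation: the processor type, the plain bool guarded by the mutex, the markStarted and newProcessor helpers, the boolean return value, and the use of the standard log package are all hypothetical.

```go
package main

import (
	"fmt"
	"log"
	"sync"
)

// processor is a hypothetical, simplified stand-in for informerProcessor.
type processor struct {
	mux       sync.RWMutex
	started   bool
	listeners []chan string
}

// newProcessor creates a processor with n listeners, each with a small buffer
// so the demo can send without a running consumer.
func newProcessor(n int) *processor {
	p := &processor{}
	for i := 0; i < n; i++ {
		p.listeners = append(p.listeners, make(chan string, 1))
	}
	return p
}

// markStarted records that startup has finished (in the real code this would
// happen at the end of run, once all listeners are started).
func (p *processor) markStarted() {
	p.mux.Lock()
	defer p.mux.Unlock()
	p.started = true
}

// distribute drops the event with a warning if the processor has not started,
// so it never blocks on a listener channel while holding the lock.
// The boolean result exists only to make the behavior observable here.
func (p *processor) distribute(ev string) bool {
	p.mux.RLock()
	defer p.mux.RUnlock()
	if !p.started {
		log.Printf("warning: dropping event %q: processor has not started", ev)
		return false
	}
	for _, l := range p.listeners {
		l <- ev
	}
	return true
}

func main() {
	p := newProcessor(1)
	fmt.Println("delivered before start:", p.distribute("early"))
	p.markStarted()
	fmt.Println("delivered after start:", p.distribute("late"))
}
```

Dropping an early event trades completeness for liveness: the event is lost, but the lock is released immediately and run can still start the listeners.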