When Stop() or WireUp() is called, the current msg might not
be delivered before case <-m.stopCh is evaluated. In that case
we store the message in m.sortedIns[X].msg for possible delivery
on the next iteration, after a reset has occurred.
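
A minimal sketch of the hold-over pattern described above, not the actual multiplexer code. The `input` type, its `pending` flag, and the `deliver` function are hypothetical names used only for illustration; the real code keeps the message in m.sortedIns[X].msg.

```go
package main

// input is a hypothetical stand-in for one element of m.sortedIns.
type input struct {
	msg     string // message held over from an interrupted delivery
	pending bool   // true while msg is still waiting to be delivered
}

// deliver tries to hand msg to out, but gives up if stopCh fires first.
// It returns true when the message was delivered. When the stop signal
// wins, the message is parked on the input so that the next iteration
// (after a reset) can retry it instead of losing it.
func deliver(in *input, msg string, out chan<- string, stopCh <-chan struct{}) bool {
	select {
	case out <- msg:
		// Delivered: clear any held-over state.
		in.msg, in.pending = "", false
		return true
	case <-stopCh:
		// Stop won the race: keep the message for possible redelivery.
		in.msg, in.pending = msg, true
		return false
	}
}
```

After a reset, the caller would check `in.pending` and pass `in.msg` back into `deliver` with the new stop channel before fetching anything new from that input.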
Is it possible that an unconfirmed message could keep kafka-pixy (KP) from
releasing a partition during a rebalance? I think this situation also existed
with the previous version of this code, so it should be okay. It's also
possible that KP will not shut down the multiplexer until all outstanding
messages are acknowledged, in which case this might not be a problem at all.