The way we were using `Promise.map(...)` in `BaseConsumer._fetch` can
unnecessarily restrict throughput when handlers are relatively slow,
since we need to process all partitions before we can start fetching
more partition data.

The intent of this change is to schedule requests to fetch additional
data as soon as we can, using an `async.queue` to throttle processing
parallelism in place of `Promise.map(...)`'s `concurrency` option.
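To illustrate the idea, here's a minimal dependency-free sketch (the names `makeQueue` and `demo` are hypothetical, and the real change uses `async.queue` from the `async` package rather than this hand-rolled version): a fixed-concurrency queue lets the fetch loop keep scheduling partition payloads while slow handlers run, instead of blocking until every partition's handler completes the way a single `Promise.map(...)` call does.

```javascript
// Hand-rolled stand-in for async.queue: runs at most `concurrency`
// workers at once, and resolves `close()` once everything has drained.
function makeQueue(worker, concurrency) {
  const tasks = [];
  let active = 0;
  let drainResolve;
  const drained = new Promise((resolve) => { drainResolve = resolve; });

  function next() {
    // Once closed, resolve the drain promise when nothing is in flight.
    if (active === 0 && tasks.length === 0 && queue.closed) drainResolve();
    while (active < concurrency && tasks.length > 0) {
      const task = tasks.shift();
      active += 1;
      Promise.resolve(worker(task)).finally(() => {
        active -= 1;
        next();
      });
    }
  }

  const queue = {
    closed: false,
    // The fetch loop can push new partition data here as soon as it
    // arrives, without waiting for earlier handlers to finish.
    push(task) { tasks.push(task); next(); },
    close() { queue.closed = true; next(); return drained; },
  };
  return queue;
}

// Hypothetical demo: three "partitions" with handlers of varying speed,
// processed with a parallelism of 2.
async function demo() {
  const done = [];
  const q = makeQueue(async (p) => {
    await new Promise((r) => setTimeout(r, p.delay));
    done.push(p.id);
  }, 2);
  q.push({ id: 'p0', delay: 30 });
  q.push({ id: 'p1', delay: 10 });
  q.push({ id: 'p2', delay: 10 });
  await q.close();
  return done;
}
```

The key difference from the previous code is that nothing awaits the whole batch: each slot frees up independently, so the slowest partition handler no longer gates the next fetch.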
This change isn't quite as graceful as the code that came before it (JS isn't my strong suit!), but I figured a proof of concept might be a useful starting point. In particular, I find all the `stopping`/`tryStop` noise a bit irritating and I'm sure there's a more graceful way to tackle it, but it's getting late. 😄 Happy to clean this up if you feel the change is worth the extra complexity -- and open to ideas for doing so.