The problem is that when the out-sink got closed, it might have been in the middle of (ms/put-all! out-sink events) meaning some of the polled events were not added to out-sink and therefore not processed.
(fn [events] ; function to handle events and resume
(d/chain (d/future (ms/put-all! out-sink events)) ;; not all events put on time
#(when % (cmd-self [::resume duration]))))
This means that even though part of the events were processed, none of them were actually committed to kafka.
In other words, this way of closing the consumer assumed that the consumer logic is idempotent which might not always be the case.
Suggestion
Find a way to be sure all events were put into out-sink and therefore committed before closing the consumer for good.
Problem
When
out-sink
is closed via:the consumer is closed right away.
The problem is that when the
out-sink
got closed, it might have been in the middle of(ms/put-all! out-sink events)
meaning some of the polled events were not added to out-sink and therefore not processed.This means that even though part of the
events
were processed, none of them were actually committed to kafka.In other words, this way of closing the consumer assumed that the consumer logic is idempotent which might not always be the case.
Suggestion
Find a way to be sure all events were put into
out-sink
and therefore committed before closing the consumer for good.