robertluo / waterfall

The Unlicense

Consumers with lag hang on consumer .close #28

Closed: skydread1 closed this issue 10 months ago

skydread1 commented 10 months ago

Problem

When we close a Kafka cluster whose consumer has lag (messages still waiting to be consumed), the `.close` call hangs.

The following test should pass but hangs instead:

(defexpect some-failing-test
  (let [consumer-clu (sut/kafka-cluster
                      {::sut/nodes "localhost:9092"
                       ::sut/shapes [(shape/topic (constantly "topic1")) (shape/edn) (shape/value-only)]
                       ::sut/consumer-config {:position :beginning}
                       ::sut/group-id "tester1"
                       ::sut/topics ["topic1"]})
        producer-clu (sut/kafka-cluster
                      {::sut/nodes "localhost:9092"
                       ::sut/shapes [(shape/topic (constantly "topic1")) (shape/edn) (shape/value-only)]})
        put-msg! (::sut/put! producer-clu)
        consume (::sut/consume consumer-clu)]
    (consume (fn [msg]
               (Thread/sleep 1000)
               (println msg)))
    (dotimes [_ 1000]
      (put-msg! "hello"))
    (.close producer-clu)
    (.close consumer-clu) ;; hangs here
    (expect "reached here")))

When the `[::close]` command is sent to the actor while it is polling, the putting-all function blocks:

(fn [events] ; function to handle events and resume
  (d/chain (ms/put-all! out-sink events) ;; out-sink has been closed
           #(when % (cmd-self [::resume duration]))))

Suggestion

This might be caused by `(actor [::close])` being dereferenced in `core/consumer`:

(ms/on-closed out-sink (fn [] @(actor [::close]))) 

We can keep the close command from blocking by removing the deref:

(ms/on-closed out-sink (fn [] (actor [::close]))) ;; note that the `@` has been removed

The test then passes, but messages that were being processed at close time may be handled after the cluster has been closed.

For example, if the `consume` call in the test above is changed to:

(consume (fn [msg]
           (println msg "1")
           (Thread/sleep 1000)))

then the test output can look like this:

clj꞉robertluo.waterfall-test꞉> 
; Running tests for the following namespaces:
;   robertluo.waterfall-test
;   robertluo.waterfall

hello 1
; 1 tests finished, all passing 👍, ns: 1, vars: 1
clj꞉robertluo.waterfall-test꞉> 
[::close]

We could call `(Thread/sleep poll-duration)` after sending `[::close]`, but that is not ideal:

(ms/on-closed out-sink (fn []
                         (actor [::close])
                         (Thread/sleep poll-duration)))

JackSho commented 10 months ago

Thanks to @skydread1 for spotting this issue. I think simply removing the `@` in `ms/on-closed` would introduce a new problem: the close operation changes from synchronous to asynchronous.

For example, when there are no new messages in the topic, the consumer blocks in `poll` for the poll duration (10 seconds by default). In that case `(.close consumer-clu)` returns immediately, but the consumer is not actually closed until the poll times out.

If the program exits before the consumer has closed, new consumers in the same consumer group (e.g. after a program restart) will not be able to consume messages immediately when they start.

I have another suggestion: wrap `(ms/put-all! out-sink events)` in `d/future` inside the putting-all function. That way the consumer closes normally and the `(.close consumer-clu)` call stays synchronous. The updated sample code is as follows:

(fn [events] ; function to handle events and resume
  (d/chain (d/future (ms/put-all! out-sink events)) ;; ms/put-all! within `d/future`
           #(when % (cmd-self [::resume duration]))))
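To see why off-loading the put matters, here is a toy model using only Clojure core and `java.util.concurrent`, with no Kafka or manifold involved. It is a sketch under assumptions: `toy-actor` is a hypothetical name, a `SynchronousQueue` stands in for `out-sink` under back-pressure (each `.put` blocks until another thread takes the element), and a core `future` stands in for `d/future`. Because the blocking puts run on their own thread, the actor loop still reaches `[::close]`.

```clojure
(ns offload-put-sketch
  (:import (java.util.concurrent LinkedBlockingQueue SynchronousQueue TimeUnit)))

(defn toy-actor
  "Hypothetical actor loop. out-sink is a SynchronousQueue: each .put blocks
  until another thread takes the element, mimicking a slow message handler."
  [^SynchronousQueue out-sink]
  (let [mailbox (LinkedBlockingQueue.)
        stopped (promise)]
    (future
      (loop []
        (let [[op arg] (.take mailbox)]
          (case op
            ;; Run the blocking puts on another thread (the d/future idea),
            ;; so this loop can go back to reading commands right away.
            ::events (do (future (doseq [e arg] (.put out-sink e)))
                         (recur))
            ;; Reached even while the puts above are still waiting for a taker.
            ::close  (deliver stopped true)))))
    {:send!   (fn [cmd] (.put mailbox cmd))
     :stopped stopped}))
```

In this model the events are still handed to `out-sink` after the actor has stopped, once something takes from it. That is the same trade-off skydread1 points out in the next comment: the close no longer hangs, but in-flight messages can be processed after close.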

skydread1 commented 10 months ago

Using the future while still blocking on the `(actor [::close])` put makes sure the consumer is closed on time.

However, the last consumed messages may be processed after the consumer has closed, so their offsets are never committed.

With my test example above, this means I can get:

clj꞉robertluo.waterfall-test꞉> 
; Running tests for the following namespaces:
;   robertluo.waterfall-test
;   robertluo.waterfall

hello 1
[::close]
; 1 tests finished, all passing 👍, ns: 1, vars: 1
clj꞉robertluo.waterfall-test꞉> 
hello 1

Note that the second message was processed after the consumer was closed and therefore was not committed.

flybot-nam-nguyenhoai commented 10 months ago

Can we use a promise, like this?

(defn- consumer-actor
  "..."
  [^Consumer consumer out-sink]
  (let [mailbox (ms/stream)
        ...
        closing-promise (promise) ; promise that gets delivered when consumer closes
        ...]
    (d/future
      (loop []
        (let [cmd @(ms/take! mailbox)]
          (if (= cmd [::close])
            (do (reset! closing? true) ; set closing flag
                (with-exception-handling "on closing consumer"
                  (.close consumer)) ; close consumer
                (deliver closing-promise true) ; attempts to dereference promise can now return
                ::stop) ; stop actor loop
            (do
              ...)))))
    {::actor cmd-self
     ::closing-promise closing-promise}))

(defn consumer
  [nodes group-id topics
   {:keys [poll-duration position]
    :as   conf
    :or   {poll-duration (Duration/ofSeconds 10)}}]
  (let [...
        {::keys [actor closing-promise]} (consumer-actor conr out-sink)]
    ...
    (ms/on-closed out-sink (fn []
                             (actor [::close]) ; put the close command into the mailbox without blocking on it
                             @closing-promise)) ; wait for consumer to close
    ...))
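The promise handshake proposed above can be sketched in plain Clojure without Kafka or manifold. This is a minimal sketch under assumptions: `toy-consumer-actor` and `close-and-wait!` are hypothetical names, a `java.util.concurrent.LinkedBlockingQueue` stands in for the manifold mailbox, a core `future` stands in for `d/future`, and `close-fn` stands in for `(.close consumer)`. Unlike the snippet above, it derefs the promise with a timeout so a stuck close cannot hang the caller forever; that timeout is an addition, not part of the proposal.

```clojure
(ns close-handshake-sketch
  (:import (java.util.concurrent LinkedBlockingQueue)))

(defn toy-consumer-actor
  "Hypothetical stand-in for consumer-actor. Reads commands from an unbounded
  mailbox on another thread; close-fn plays the role of (.close consumer)."
  [close-fn]
  (let [mailbox         (LinkedBlockingQueue.)
        closing-promise (promise)] ; delivered once close-fn has run
    (future
      (loop []
        (let [cmd (.take mailbox)] ; blocks until a command arrives
          (if (= cmd [::close])
            (do (close-fn)
                (deliver closing-promise true) ; release anyone waiting on close
                ::stop)
            (recur)))))                        ; other commands are ignored in this sketch
    {::actor           (fn [cmd] (.put mailbox cmd)) ; never blocks: the queue is unbounded
     ::closing-promise closing-promise}))

(defn close-and-wait!
  "Hypothetical on-closed callback: enqueue [::close] without blocking, then
  wait for the actor to confirm, giving up after timeout-ms."
  [{::keys [actor closing-promise]} timeout-ms]
  (actor [::close])
  (deref closing-promise timeout-ms ::timed-out))
```

Removing the `deref` from `close-and-wait!` would bring back the asynchronous close JackSho warned about: the function would return before `close-fn` has run.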