SahilKang / cl-rdkafka

Common Lisp library for Kafka

If poll-loop exits with an error, lparallel replaces the thread but the Kafka consumer stops working #71

Open svetlyak40wt opened 1 week ago

svetlyak40wt commented 1 week ago

I've added some tracing to the poll-loop function:

(defun poll-loop ()
  (log:info "Entering Kafka poll loop")
  (unwind-protect
       (cffi:with-foreign-object (pollfd '(:struct pollfd))
         (setf (cffi:foreign-slot-value pollfd '(:struct pollfd) 'fd) +read-fd+
               (cffi:foreign-slot-value pollfd '(:struct pollfd) 'events) pollin)
         (loop
           for rd-kafka-queue = (read-rd-kafka-queue-from-fd pollfd)
           do (with-simple-restart (continue "Ignore error and continue polling")
                (bt:with-lock-held (+address->queue-lock+)
                  (process-events rd-kafka-queue)))))
    (log:info "Exiting Kafka poll loop")))
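The with-simple-restart form in this loop establishes a continue restart, but a restart is only a recovery point. It has no effect unless a handler (or a person at the debugger) invokes it. The sketch below is plain Common Lisp, independent of cl-rdkafka. It pairs a continue restart like the one in poll-loop with a handler-bind that invokes that restart automatically. The names process-all and process-all-ignoring-errors are hypothetical, and the :bad item stands in for a failing process-events call.

```lisp
;; Hypothetical sketch, not cl-rdkafka code.
;; PROCESS-ALL establishes a CONTINUE restart around each item,
;; like the WITH-SIMPLE-RESTART inside POLL-LOOP.
(defun process-all (items)
  "Process ITEMS, returning the list of items that succeeded.
An item equal to :BAD signals an ERROR."
  (let ((results '()))
    (dolist (item items (nreverse results))
      (with-simple-restart (continue "Ignore error and continue")
        (when (eq item :bad)
          (error "cannot process ~S" item))
        (push item results)))))

;; Without a handler, an error in PROCESS-ALL still reaches the debugger.
;; This wrapper installs a handler that picks the CONTINUE restart for
;; every ERROR, so the failing item is skipped and the loop keeps going.
(defun process-all-ignoring-errors (items)
  (handler-bind ((error (lambda (c)
                          (format *error-output* "~&skipping: ~A~%" c)
                          ;; CL:CONTINUE transfers control to the most recent
                          ;; applicable CONTINUE restart, or returns NIL
                          ;; (declining to handle) if there is none.
                          (continue c))))
    (process-all items)))
```

For example, (process-all-ignoring-errors '(1 :bad 2)) logs one "skipping" line and returns (1 2), whereas calling process-all on the same list directly would land in the debugger.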

And if no broker is available, an error is thrown from the function and the poll loop dies:

MONITOR> (start)
(#<BT2:THREAD "Kafka Producer" {10060E7D93}>
 #<BT2:THREAD "Kafka Consumer" {10060B78F3}>)
 <INFO> [21:47:14] kf kernel.lisp (top level form poll-loop) -
  Entering Kafka poll loop
 <INFO> [21:47:19] kf kernel.lisp (top level form poll-loop cleanup-fun-3) -
  Exiting Kafka poll loop
WARNING: lparallel: Replacing lost or dead worker.

And attempts to restart the consumer don't start a new poll loop (no "Entering Kafka poll loop" message is logged):

MONITOR> (ql:where-is-system :cl-rdkafka)
#P"/home/art/cl-rdkafka/"
MONITOR> (start)
(#<BT2:THREAD "Kafka Producer" {1003B55D93}>
 #<BT2:THREAD "Kafka Consumer" {1003B51B13}>)

Perhaps the poll loop should be made more error-proof and run forever?
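One possible mitigation is a small supervisor that re-enters the loop whenever it exits with an error, instead of letting the worker thread die. This sketch assumes that simply re-entering the loop after a failure is acceptable. It is plain Common Lisp with no cl-rdkafka, cffi or lparallel dependency. The helper supervise is hypothetical, and the function passed to it stands in for poll-loop.

```lisp
;; Hypothetical sketch, not cl-rdkafka code.
(defun supervise (loop-fn &key (max-restarts nil))
  "Call LOOP-FN. If it exits by signalling an ERROR, log the error and call
it again. A normal return from LOOP-FN ends supervision. MAX-RESTARTS caps
the number of re-entries; NIL (the default) means restart without limit.
Returns the number of restarts performed."
  (let ((restarts 0))
    (loop
      (handler-case (progn
                      (funcall loop-fn)
                      ;; LOOP-FN returned normally: stop supervising.
                      (return restarts))
        (error (e)
          (format *error-output* "~&poll loop exited with: ~A~%" e)
          ;; Give up once the optional limit is reached.
          (when (and max-restarts (>= restarts max-restarts))
            (return restarts))
          (incf restarts))))))
```

With the default :max-restarts nil this approximates the "run forever" behaviour. A real supervisor would probably also sleep between restarts, so that a broker that stays unreachable does not turn into a busy loop. That is left out here.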

SahilKang commented 4 days ago

Thanks for #73, which adds a continue restart.

I'll keep this issue open though because: