SahilKang / cl-rdkafka

Common Lisp library for Kafka

Producer messages not seen by consumer #17

Closed: patrickmay closed this issue 5 years ago

patrickmay commented 5 years ago

I'm testing the latest release with these two functions:

(defun test-consume (topic-name)
  (let* ((string-serde (lambda (x)
                         (kf:bytes->object x 'string)))
         (conf (kf:conf
                "bootstrap.servers" "127.0.0.1:9092"
                "group.id" (write-to-string (get-universal-time))
                "enable.auto.commit" "false"
                "auto.offset.reset" "earliest"
                "offset.store.method" "broker"
                "enable.partition.eof" "false"))
         (consumer (make-instance 'kf:consumer
                                  :conf conf
                                  :key-serde string-serde
                                  :value-serde string-serde))
         (topics (list topic-name)))
    (kf:subscribe consumer topics)
    (format t "Subscribed:  ~S, ~S.~%"
            (gethash "bootstrap.servers" conf)
            (kf:subscription consumer))
    (let ((message (kf:poll consumer 10000)))
      (format t "Message received or poll expired.~%")
      (when message
        (format t "Received message:  (~S) ~S~%"
                (kf:key message) (kf:value message))
        (kf:commit consumer)))
    (format t "Unsubscribing.~%")
    (kf:unsubscribe consumer)))

and

(defun test-produce (topic-name key message)
  (let ((producer (make-instance 'kf:producer
                                 :conf (kf:conf
                                        "bootstrap.servers" "127.0.0.1:9092")
                                 :key-serde #'kf:object->bytes
                                 :value-serde #'kf:object->bytes)))
    (kf:produce producer topic-name message :key key)
    (kf:flush producer 2000)))

When I monitor the topic with kafka-console-consumer, I see the messages produced by test-produce. When I run test-consume and send messages with kafka-console-producer, it receives them. But when I send messages with test-produce, test-consume does not see them. Please tell me I'm making a foolish mistake.

SahilKang commented 5 years ago

:thinking: The code you posted worked fine for me:

(test-produce "test-topic-1" "key-1" "value-1")

(test-consume "test-topic-1")
;; the call to test-consume printed:
;; Subscribed:  "127.0.0.1:9092", #("test-topic-1").
;; Message received or poll expired.
;; Received message:  ("key-1") "value-1"
;; Unsubscribing.

If you're still not having any luck, can you share your cl-rdkafka and librdkafka versions? Here's what I'm running:

(cl-rdkafka/ll:rd-kafka-version-str) ; => "0.9.3"
(asdf:component-version (asdf:find-system 'cl-rdkafka)) ; => "0.2.2"

patrickmay commented 5 years ago

The issue appears to be related to Emacs and SLIME. When I run the consumer test in a standalone SBCL image, it works fine. Inside SLIME, something gets confused (possibly me).

Thanks for checking.