SahilKang / cl-rdkafka

Common Lisp library for Kafka

kf:seek signals rdkafka-error: Local: Erroneous state #67

Closed Uthar closed 1 year ago

Uthar commented 1 year ago

Maybe I don't know how to use it, but this fails for me:

(let* ((config '("bootstrap.servers" "localhost:1234" "group.id" "test-lisp-consumer" "enable.auto.commit" "false"))
       (consumer (make-instance 'kf:consumer
                                :conf config
                                :serde #'babel:octets-to-string)))
  (kf:assign consumer (list (cons "my-topic" 0)))
  (kf:seek consumer "my-topic" 0 10 2000)
  consumer)

Weirdly, doing the same thing as separate steps in the REPL works:

(setf consumer (make-instance 'kf:consumer
                               :conf config
                               :serde #'babel:octets-to-string))
(kf:assign consumer (list (cons "my-topic" 0)))
(kf:seek consumer "my-topic" 0 10 2000)
(kf:poll consumer 2000)

Uthar commented 1 year ago

I can put in (sleep 5) around the assign and seek calls, and then it works, but it feels hacky. Is there a way to explicitly sync?

Uthar commented 1 year ago

Also, why does (sleep 5) work, but (sleep 4) and (sleep 3) do not?

Uthar commented 1 year ago

Seems related to this: https://github.com/edenhill/librdkafka/wiki/Manually-setting-the-consumer-start-offset

Currently this API doesn't allow setting the offset in seek, right?

For me, this works:

--- src/high-level/consumer.lisp
+++ src/high-level/consumer.lisp
@@ -400,7 +400,7 @@
   (with-slots (rd-kafka-consumer) consumer
     (with-toppar-list
         toppar-list
-        (alloc-toppar-list partitions :topic #'car :partition #'cdr)
+        (alloc-toppar-list partitions :topic #'first :partition #'second :offset #'third)
       (let ((err (cl-rdkafka/ll:rd-kafka-assign rd-kafka-consumer toppar-list)))
         (unless (eq err 'cl-rdkafka/ll:rd-kafka-resp-err-no-error)
           (error (make-rdkafka-error err)))))))

Diff finished.  Wed Oct 26 13:31:12 2022
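
To illustrate what the patch above changes, here is a minimal sketch of the entry shape it expects: each partition entry becomes a proper list of (topic partition offset), read with first, second and third. The helper make-assignment and the variable *assignment* are hypothetical names used only for illustration; they are not part of cl-rdkafka. The kf:assign call is left as a comment because it needs the patched library and a reachable broker.

```lisp
;; Hypothetical helper (not part of cl-rdkafka): builds one entry in the
;; (topic partition offset) shape that the patched accessors read.
(defun make-assignment (topic partition offset)
  (list topic partition offset))

;; Partition 0 of "my-topic", starting at offset 10, as in the report above.
(defparameter *assignment*
  (list (make-assignment "my-topic" 0 10)))

;; The same accessors the patch passes to alloc-toppar-list.
(dolist (entry *assignment*)
  (format t "topic=~a partition=~a offset=~a~%"
          (first entry) (second entry) (third entry)))

;; Assumption: with the patch applied and a live consumer, the call would be
;; (kf:assign consumer *assignment*)
```

Note that the patch changes the entry shape: the (cons "my-topic" 0) form used earlier in this thread is a dotted pair, so second would not work on it, and callers would need to pass three-element lists instead.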

SahilKang commented 1 year ago

Thanks for pointing this out. I've released v1.2.0, which adds offset support to the kf:assign and kf:assignment methods.