robertluo / waterfall

The Unlicense
16 stars 1 forks source link

Consumer can not consume messages after being recreated(close first, then create again) #23

Closed JackSho closed 1 year ago

JackSho commented 1 year ago

Reproduce steps

  1. create a kafka-cluster, start to consume.
    (def clu
      (kafka-cluster
       {::nodes "localhost:9092"
        ::shapes [(shape/topic (constantly "sentence")) (shape/edn) (shape/value-only)]
        ::consumer-config {:position :beginning}
        ::group-id "tester1"
        ::topics ["sentence"]
        ::source-xform (map identity)}))
    (def consume (::consume clu))
    (consume println)
  2. produce some message to topic "sentence", these messages are printed immediately.
  3. close kafka-cluster by call (.close clu), then recreate kafka-cluster with the above code and start to consume again.
  4. produce some message to topic "sentence" again, but these messages are not printed.

version

short sha: 3437ab7