robertluo / waterfall

After calling `close`, if a new message arrives, the `close` call blocks forever #21

Closed JackSho closed 1 year ago

JackSho commented 1 year ago

Steps to reproduce

  1. Start a consumer and produce some messages to the `sentence` topic. The messages are printed, as expected.
    (def clu
      (kafka-cluster
       {::nodes "localhost:9092"
        ::shapes [(shape/edn) (shape/value-only)]
        ::consumer-config {:position :beginning}
        ::group-id "tester1"
        ::topics ["sentence"]
        ::source-xform (map identity)}))
    (def consume (::consume clu))
    ;; Start consuming; each received message is passed to println.
    (consume println)
  2. Close the consumer, then immediately produce another message to the same topic in the same way.
    (.close clu)
  3. The `close` call blocks forever.