taoensso / carmine

Redis client + message queue for Clojure
https://www.taoensso.com/carmine
Eclipse Public License 1.0
1.15k stars 130 forks source link

Pub/Sub posts to wrong handler fn #307

Closed RokLenarcic closed 1 month ago

RokLenarcic commented 1 month ago

I've tried to run the code from the wiki:

(def listener
  (car/with-new-pubsub-listener {}
                                {"foobar" (fn f1 [msg] (println "Channel match: " msg))
                                 "foo*"   (fn f2 [msg] (println "Pattern match: " msg))}
                                (car/subscribe "foobar" "foobaz")
                                (car/psubscribe "foo*")))
=> #'user/listener
Channel match:  [subscribe foobar 1]
Pattern match:  [psubscribe foo* 3]

Then I post a message:

(car/wcar {}
  (car/publish "foobar" "Heelloo"))
Channel match:  [message foobar Heelloo]
Channel match:  [pmessage foo* foobar Heelloo]
=> 2

Note that f1 is called twice and f2 is not called at all. This is in direct contradiction of the text in wiki:

Which will trigger:

(f1 '("message" "foobar" "Hello to foobar!"))
;; AND ALSO
(f2 '("pmessage" "foo*" "foobar" "Hello to foobar!"))

It would make sense for f2 to trigger here.

ptaoussanis commented 1 month ago

@RokLenarcic Hi Rok, thanks for pinging about this and for the clear example - that's really helpful 👍

This does seem to be a bug, apologies for the trouble! I'll try get a fix and new tests out in the next couple days - will ping back here when it's ready.

RokLenarcic commented 1 month ago

Ok so to clarify. What happens, after your fix, if I have multiple psubscribe handlers that match? Does only 1 get the message or do multiple get it?

ptaoussanis commented 1 month ago

What happens, after your fix, if I have multiple psubscribe handlers that match? Does only 1 get the message or do multiple get it?

I'm sorry, I'm not 100% clear on what you're asking exactly. Could you maybe give an example?

Currently each p/message will be given to at most one matching handler.

The below won't trigger f3 since there's no specific subscription for "foob*":

(let [f1_ (atom [])
      f2_ (atom [])
      f3_ (atom [])
      listener
      (car/with-new-pubsub-listener {}
        {"foobar" (fn f1 [msg] (swap! f1_ conj (into [:f1 msg])))
         "foo*"   (fn f2 [msg] (swap! f2_ conj (into [:f2 msg])))
         "foob*"  (fn f3 [msg] (swap! f3_ conj (into [:f3 msg])))}

        (car/subscribe  "foobar" "foobaz")
        (car/psubscribe "foo*"))]

  (wcar* (car/publish "foobar" "Message to `foobar` channel"))
  (car/close-listener listener)

  [@f1_
   @f2_
   @f3_])

But the below will trigger f3 because there is a separate subscription for "foob*":

(let [f1_ (atom [])
          f2_ (atom [])
          f3_ (atom [])
          listener
          (car/with-new-pubsub-listener {}
            {"foobar" (fn f1 [msg] (swap! f1_ conj (into [:f1 msg])))
             "foo*"   (fn f2 [msg] (swap! f2_ conj (into [:f2 msg])))
             "foob*"  (fn f3 [msg] (swap! f3_ conj (into [:f3 msg])))}

            (car/subscribe  "foobar" "foobaz")
            (car/psubscribe "foo*")
            (car/psubscribe "foob*"))]

      (wcar* (car/publish "foobar" "Message to `foobar` channel"))
      (car/close-listener listener)

      [@f1_
       @f2_
       @f3_])

The current implementation is very simple:

For one p/message from Redis we'll produce one {:keys [kind pattern channel msg]} and then dispatch to 0 or 1 handlers based on the p/message pattern or channel.

If you want different behaviour, could you please explain your use case a little?

RokLenarcic commented 1 month ago

Having 1 message trigger just one handler is fine, but then:

ptaoussanis commented 1 month ago

Carmine doesn't actually implement any kind of pattern matching itself, that's left up to Redis.

For pmessages, the message payload from Redis will include the relevant pattern - and Carmine will just select that exact pattern out of a hashmap.

Redis will deliver one message payload for each matching subscription. And Carmine will execute one matching handler for each message payload.

If you subscribed to pattern "foo*" then the "foo*" handler will be called. If you subscribed to pattern "foob*" then the "foob*" handler will be called. If you subscribed to both on the same connection, then both will be called ("foo*" handler for "foo*" subscription, and "foob*" handler for "foob*" subscription).

This might be easier to understand by experimenting with the test code:

(let [f1_ (atom [])
        f2_ (atom [])
        f3_ (atom [])
        listener
        (car/with-new-pubsub-listener conn-opts
          {"channel1" (fn f1 [msg] (swap! f1_ conj (into [:f1 msg])))
           "channel*" (fn f2 [msg] (swap! f2_ conj (into [:f2 msg])))
           "chan*"    (fn f3 [msg] (swap! f3_ conj (into [:f3 msg])))}
          (car/subscribe  "channel1" "channel2")
          (car/psubscribe "channel*" "chan*" "other*"))]

    (wcar* (car/publish "channel1" "Message to `channel1`"))
    (Thread/sleep 1000)
    (car/close-listener listener)

    [(is (= @f1_ [[:f1 [ "subscribe" "channel1" 1]] [:f1 [ "message"            "channel1" "Message to `channel1`"]]]))
     (is (= @f2_ [[:f2 ["psubscribe" "channel*" 3]] [:f2 ["pmessage" "channel*" "channel1" "Message to `channel1`"]]]))
     (is (= @f3_ [[:f3 ["psubscribe" "chan*"    4]] [:f3 ["pmessage" "chan*"    "channel1" "Message to `channel1`"]]]))])

I'll try add some more info about this in the docstring.