taoensso / carmine

Redis client + message queue for Clojure
https://www.taoensso.com/carmine
Eclipse Public License 1.0

Handlers for connection closed due to remote close (idle time exceeded or remote reset) #207

Closed: aravindbaskaran closed this PR 3 years ago

aravindbaskaran commented 6 years ago

Indirectly fixes #15 without changing the conn-alive? interface (arguably more of a workaround).

Changes include:

  1. When a listener's connection fails, call the handler registered for "carmine:listener:fail" (if any) with a "carmine:error" message carrying the exception, then rethrow the exception as before. Consumers can use this to build handlers for connection failures.
  2. Add keep-alive for listeners: when the conn spec includes a ping-ms property, the listener sends PING every ping-ms milliseconds so the client connection does not hit an idle timeout.
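The keep-alive idea in item 2 can be sketched, independently of Carmine, as a scheduled task that runs a ping function every ping-ms milliseconds. This is a minimal sketch and not the PR's implementation: start-keep-alive! and its ping-fn argument are hypothetical names, and in practice ping-fn would send a PING over the listener's connection.

```clojure
(ns keep-alive-sketch
  (:import [java.util.concurrent Executors ScheduledExecutorService TimeUnit]))

(defn start-keep-alive!
  "Hypothetical helper: calls `ping-fn` every `ping-ms` milliseconds on a
  background thread. Returns a zero-arg fn that stops the pings."
  [ping-ms ping-fn]
  (let [^ScheduledExecutorService exec (Executors/newSingleThreadScheduledExecutor)]
    ;; Note: if ping-fn throws, the executor suppresses all later runs,
    ;; so a real keep-alive would catch and report errors inside ping-fn.
    (.scheduleAtFixedRate exec
                          ^Runnable (fn [] (ping-fn))
                          (long ping-ms) (long ping-ms) TimeUnit/MILLISECONDS)
    ;; Stopping shuts down the executor and its (non-daemon) thread.
    (fn stop! [] (.shutdownNow exec))))
```

A fixed-rate schedule keeps traffic on the socket at a steady interval, which is what lets a read timeout larger than ping-ms act as a fast failure detector.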

With these changes, consumers can handle listener errors as they occur. Auto-resubscribe is arguably not something the Carmine library itself should do; it is better left to consumers, who can ideally raise alerts for devops to pick up instead of the current silent failures.
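A consumer-side policy along those lines could retry a bounded number of times and then alert. A minimal sketch with linear backoff: resubscribe-with-retries! and its options are hypothetical, subscribe-fn would be something like the subscribe fn in the Testing section, and alert-fn stands in for whatever alerting system is in use.

```clojure
(ns resubscribe-sketch)

(defn resubscribe-with-retries!
  "Hypothetical consumer-side policy: calls `subscribe-fn` up to
  `max-attempts` times, sleeping `backoff-ms` * attempt between failures.
  Returns the result of the first successful call. If every attempt
  throws, calls `alert-fn` with the last exception and rethrows it."
  [{:keys [max-attempts backoff-ms subscribe-fn alert-fn]}]
  (loop [attempt 1]
    ;; Capture the outcome as data so that `recur` stays outside the `try`.
    (let [result (try
                   {:ok (subscribe-fn)}
                   (catch Exception e
                     {:error e}))]
      (if (contains? result :ok)
        (:ok result)
        (let [e (:error result)]
          (if (>= attempt max-attempts)
            ;; Out of attempts: make the failure loud instead of silent.
            (do (alert-fn e)
                (throw e))
            (do (Thread/sleep (long (* backoff-ms attempt)))
                (recur (inc attempt)))))))))
```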

Testing

Tested with Redis 4.0.6.

(require '[taoensso.carmine :as car])
(require '[taoensso.carmine.connections :as conns])

(def redis-conn-spec
  {:pool {:test-while-idle? true}
   :spec {:host "<<yourhost>>" :port 6379 :password "<<yourpassword>>"}})

;; Logs every incoming message with a timestamp, then passes its payload on.
(defn wrap-listener-fn [listener-fn]
  (fn [[t channel msg]]
    (println (System/currentTimeMillis) "channel matched - " t channel msg)
    (listener-fn msg)))

(defn err-handler [e]
  (println "Exception in listener" e)
  ;; TODO Handle rebinding based on the type of e, e.g. retry n attempts, then
  ;; raise a fatal exception for an alert system to pick up.
  ;; Could be a timeout, a remote disconnect or a remote reconnect.
  )

(defn subscribe
  "Subscribes to `channel` and returns a listener handle. Keep a reference to
  the return value in order to unsubscribe later."
  [channel listener-fn]
  (let [channel-name (name channel)
        ;; The spec can set a socket read-timeout-ms greater than ping-ms, so the
        ;; listener fails almost immediately once a socket wait exceeds it.
        listener (car/with-new-pubsub-listener (:spec redis-conn-spec)
                   {channel-name            (wrap-listener-fn listener-fn)
                    ;; Receives ["carmine:error" "carmine:listener:fail" e] on failure.
                    "carmine:listener:fail" (wrap-listener-fn err-handler)}
                   (car/subscribe channel-name))]
    listener))

(def a1 (subscribe :test1 println))
(def a2 (subscribe :test2 println))
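Instead of wrapping each handler separately, a single handler could route on the message type. A minimal sketch, assuming the message vectors seen in the output that follows: ["message" channel payload] for normal messages and ["carmine:error" "carmine:listener:fail" exception] for listener failures under this PR. make-handler, :on-message and :on-error are hypothetical names; the returned fn would be registered under both the channel name and "carmine:listener:fail".

```clojure
(ns dispatch-sketch)

(defn make-handler
  "Hypothetical alternative to wrap-listener-fn: returns a fn that takes the
  [type channel payload] vector a pub/sub handler receives and routes it
  by message type."
  [{:keys [on-message on-error]}]
  (fn [[msg-type channel payload]]
    (case msg-type
      "message"       (on-message channel payload)
      "carmine:error" (on-error channel payload)
      ;; "subscribe"/"unsubscribe" confirmations and anything else are ignored.
      nil)))
```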

Connection timeout after no messages on the channel for 10 minutes:

channel matched -  carmine:error carmine:listener:fail #error {
 :cause Operation timed out
 :via
 [{:type java.net.SocketException
   :message Operation timed out
   :at [java.net.SocketInputStream socketRead0 SocketInputStream.java -2]}]
 :trace
 [[java.net.SocketInputStream socketRead0 SocketInputStream.java -2]
  [java.net.SocketInputStream socketRead SocketInputStream.java 116]
  [java.net.SocketInputStream read SocketInputStream.java 170]
  [java.net.SocketInputStream read SocketInputStream.java 141]
  [java.io.BufferedInputStream fill BufferedInputStream.java 246]
  [java.io.BufferedInputStream read BufferedInputStream.java 265]
  [java.io.DataInputStream readByte DataInputStream.java 265]
  [taoensso.carmine.protocol$get_unparsed_reply invokeStatic protocol.clj 122]
  [taoensso.carmine.protocol$get_unparsed_reply invoke protocol.clj 106]
  [user$subscribe$fn__7321 invoke form-init501756084029819874.clj 5]
  [clojure.lang.AFn applyToHelper AFn.java 152]
  [clojure.lang.AFn applyTo AFn.java 144]
  [clojure.core$apply invokeStatic core.clj 657]
  [clojure.core$with_bindings_STAR_ invokeStatic core.clj 1965]
  [clojure.core$with_bindings_STAR_ doInvoke core.clj 1965]
  [clojure.lang.RestFn invoke RestFn.java 425]
  [clojure.lang.AFn applyToHelper AFn.java 156]
  [clojure.lang.RestFn applyTo RestFn.java 132]
  [clojure.core$apply invokeStatic core.clj 661]
  [clojure.core$bound_fn_STAR_$fn__5471 doInvoke core.clj 1995]
  [clojure.lang.RestFn invoke RestFn.java 397]
  [clojure.core$binding_conveyor_fn$fn__5476 invoke core.clj 2022]
  [clojure.lang.AFn call AFn.java 18]
  [java.util.concurrent.FutureTask run FutureTask.java 266]
  [java.util.concurrent.ThreadPoolExecutor runWorker ThreadPoolExecutor.java 1142]
  [java.util.concurrent.ThreadPoolExecutor$Worker run ThreadPoolExecutor.java 617]
  [java.lang.Thread run Thread.java 745]]}

Connection reset by a remote disconnect:

channel matched -  carmine:error carmine:listener:fail #error {
 :cause nil
 :via
 [{:type java.io.EOFException
   :message nil
   :at [java.io.DataInputStream readByte DataInputStream.java 267]}]
 :trace
 [[java.io.DataInputStream readByte DataInputStream.java 267]
  [taoensso.carmine.protocol$get_unparsed_reply invokeStatic protocol.clj 122]
  [taoensso.carmine.protocol$get_unparsed_reply invoke protocol.clj 106]
  [user$subscribe$fn__7213 invoke form-init6010505428647955718.clj 5]
  [clojure.lang.AFn applyToHelper AFn.java 152]
  [clojure.lang.AFn applyTo AFn.java 144]
  [clojure.core$apply invokeStatic core.clj 657]
  [clojure.core$with_bindings_STAR_ invokeStatic core.clj 1965]
  [clojure.core$with_bindings_STAR_ doInvoke core.clj 1965]
  [clojure.lang.RestFn invoke RestFn.java 425]
  [clojure.lang.AFn applyToHelper AFn.java 156]
  [clojure.lang.RestFn applyTo RestFn.java 132]
  [clojure.core$apply invokeStatic core.clj 661]
  [clojure.core$bound_fn_STAR_$fn__5471 doInvoke core.clj 1995]
  [clojure.lang.RestFn invoke RestFn.java 397]
  [clojure.core$binding_conveyor_fn$fn__5476 invoke core.clj 2022]
  [clojure.lang.AFn call AFn.java 18]
  [java.util.concurrent.FutureTask run FutureTask.java 266]
  [java.util.concurrent.ThreadPoolExecutor runWorker ThreadPoolExecutor.java 1142]
  [java.util.concurrent.ThreadPoolExecutor$Worker run ThreadPoolExecutor.java 617]
  [java.lang.Thread run Thread.java 745]]}
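The two traces end in different exception types, so an err-handler could branch on the class of e. A minimal sketch; classify-listener-error and its keywords are hypothetical. The first trace's java.net.SocketException ("Operation timed out") maps to :socket-error and the second's java.io.EOFException to :remote-disconnect. Assuming read-timeout-ms is applied as the socket's SO_TIMEOUT, a timeout from it would typically surface as java.net.SocketTimeoutException, which is not a SocketException subclass and so gets its own branch.

```clojure
(ns classify-sketch
  (:import [java.io EOFException]
           [java.net SocketException SocketTimeoutException]))

(defn classify-listener-error
  "Hypothetical helper mapping the exceptions seen in the traces to failure
  kinds a consumer could act on (retry, resubscribe, alert)."
  [^Throwable e]
  (cond
    ;; Remote side closed the stream (second trace).
    (instance? EOFException e)           :remote-disconnect
    ;; Read exceeded the socket's SO_TIMEOUT.
    (instance? SocketTimeoutException e) :read-timeout
    ;; Other socket failures, e.g. the OS-level timeout in the first trace.
    (instance? SocketException e)        :socket-error
    :else                                :unknown))
```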

ptaoussanis commented 3 years ago

@aravindbaskaran This is a nice PR Aravind, thank you! Will be including something similar in an upcoming Carmine release.

Cheers!