taoensso / carmine

Redis client + message queue for Clojure
https://www.taoensso.com/carmine
Eclipse Public License 1.0

Connection loss/failure detection on MQ #305

Closed J0sueTM closed 4 weeks ago

J0sueTM commented 1 month ago

cc: @matheusfrancisco, @avelino

Hi Peter, hope you're doing well!

We (Moclojer) have been using your library for a couple of months, and we ran into a recurring issue with your pubsub module that eventually made us switch to Jedis.

This is what our pubsub (workers) implementation looked like before (most of it stripped, but the important parts kept):

(defn create-queue-handler-fn [handler components sentry]
  (fn [{:keys [message attempt]}]
    (try
      (logs/log :info "received a message"
                :ctx {:attempt attempt})
      (handler message components)
      {:status :success}
      (catch Throwable e
        (logs/log :error "failed to handle message"
                  :ctx {:ex-message (.getMessage e)})
        (sentry/send-event! sentry e)
        {:status :error
         :throwable e}))))

(let [pool (car/connection-pool {:test-on-borrow? true
                                 :test-on-return? true
                                 :test-while-idle? true})
      conn {:pool pool
            :spec {:uri uri}}
      ws (doall
          (for [{:keys [queue-name handler]} workers]
            (do
              (logs/log :info "starting redis queue"
                        :ctx {:queue-name queue-name})
              (mq/worker
               conn queue-name {:handler (create-queue-handler-fn
                                          handler components sentry)}))))]
  ...)

As the title implies, we had problems detecting and handling connection failures in our deployments. At first we thought the problem was with our Redis instance, but after a long battery of tests we ran out of options and swapped Carmine for another library, in order to test whether it was the cause of the problem.

It turns out the problem really was with our Redis instance, but not entirely. We run a single Redis instance (managed by DigitalOcean, if that matters), and for some (still unknown) reason it kept killing connected clients. We're not sure (as I said) what causes this, but since switching to Jedis we could finally reproduce and catch the error locally:

Exception in thread "async-thread-macro-1" redis.clients.jedis.exceptions.JedisConnectionException: Unexpected end of stream.
  at redis.clients.jedis.util.RedisInputStream.ensureFill(RedisInputStream.java:248)
  at redis.clients.jedis.util.RedisInputStream.readByte(RedisInputStream.java:47)
  at redis.clients.jedis.Protocol.process(Protocol.java:136)
  at redis.clients.jedis.Protocol.read(Protocol.java:222)
  at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:350)
  at redis.clients.jedis.Connection.getUnflushedObject(Connection.java:316)
  at redis.clients.jedis.JedisPubSubBase.process(JedisPubSubBase.java:115)
  at redis.clients.jedis.JedisPubSubBase.proceed(JedisPubSubBase.java:92)
  at redis.clients.jedis.UnifiedJedis.subscribe(UnifiedJedis.java:3603)
  at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
  at java.base/java.lang.reflect.Method.invoke(Method.java:580)
  at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:167)

You can replicate this by simply running a worker and then running client kill <client> in your redis-cli.

This doesn't seem to be a problem with Redis itself, but probably with our managed instance on DO. Anyway, we circumvented the issue by applying a retry mechanism that looks somewhat like this:

(defn foobar-subscribe [conn pubsub queues]
  (try
    (.subscribe conn pubsub queues)
    (catch JedisConnectionException _
      (foobar-subscribe conn pubsub queues))))

Take this example with a grain of salt, please 😄
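For anyone reading along, here's a slightly more defensive variant of the same idea. This is only a sketch: the function name and the attempt/backoff numbers are made up (not our production code), and it assumes conn/pubsub/queues are the same Jedis objects as above.

(import 'redis.clients.jedis.exceptions.JedisConnectionException)

(defn subscribe-with-retry
  "Sketch: like foobar-subscribe above, but with bounded retries and a linear backoff."
  [conn pubsub queues & {:keys [max-attempts backoff-ms]
                         :or   {max-attempts 10, backoff-ms 1000}}]
  (loop [attempt 1]
    (let [ok? (try
                (.subscribe conn pubsub queues) ; blocks while subscribed
                true
                (catch JedisConnectionException _ false))]
      (when (and (not ok?) (< attempt max-attempts))
        (Thread/sleep (* attempt backoff-ms)) ; wait a bit longer each attempt
        (recur (inc attempt))))))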

Lastly, what we really wanted with this report is to ask:

  1. Do you have something similar in your implementation?
  2. If yes, why was it so hard for us to find it?
  3. If not, why isn't it? Do you have anything in mind?
  4. Are we doing anything wrong? i.e. using your library incorrectly?

Anyhow, hope this report helps you in some way. Thanks for the library, have a great day!

ptaoussanis commented 1 month ago

@J0sueTM Hi Josué!

There are a few things that are a bit unclear to me.

The issue title and Jedis example seem to refer to the Redis Pub/Sub mechanism, but your Carmine code seems to refer to Carmine's message queue (which is unrelated to the Redis Pub/Sub mechanism).

Are the two somehow linked in your case? If so, how?

From what I do understand, it sounds like your Redis connection/s are being dropped - and that's somehow causing problems.

To clarify:

it for some (still unknown) reason, kept killing connected clients.

To confirm: do only long-held connections seem to be affected?

If so, have you confirmed that neither side of the connection has any sort of connection/read timeout? I.e. you'd need to rule out:

  1. Carmine's connection config
  2. Redis server config (one quick check is sketched after this list)
  3. DigitalOcean config or limitations
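For 2 specifically, one quick server-side thing to check is Redis's `timeout` setting, which closes idle client connections after that many seconds (0 = disabled). A minimal sketch, assuming Carmine's generated config-get command fn and a `conn` map like the one in your example (3 can only be checked via DigitalOcean's dashboard/docs):

(car/wcar conn
  (car/config-get "timeout")) ; reply is a [name value] pair, e.g. ["timeout" "0"]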

Re: your specific questions - I can better answer once I know whether we're talking about Carmine's message queue, or Pub/Sub.

But basically:

I'll add that it'd be helpful to know what you're actually ultimately trying to achieve since Carmine/Redis offers several different tools with different trade-offs - e.g. Carmine's MQ, Redis Pub/Sub, Redis streams, etc. As a first step, you may want to double check that you're reaching for the right (appropriate) mechanism/s for your needs.

J0sueTM commented 1 month ago

Thanks for clarifying what we missed on the report.

We were using Carmine's message queue and Jedis' pubsub for the same end goal: event-based messaging. I'm not sure about the implementation differences, since @matheusfrancisco was the one who talked to you on Slack and was reading Carmine's source code, but at the end of the day we were only sending messages from a service (publish message on Carmine, lpush on Jedis), which should be caught by a worker (Carmine mq worker or Jedis PubSub subscriber).

are your pub/sub connections being dropped?

Yes. We did dig a bit after sending you this issue, and found other people having similar problems with long-held Redis connections, plus some related docs:

https://www.digitalocean.com/community/cheatsheets/how-to-manage-redis-databases-and-keys
https://www.digitalocean.com/community/questions/idle-redis-connection-reconnects-every-five-minutes

It turns out DigitalOcean's managed Redis kills connections after a while, which is why, when testing locally, we could reproduce the exact same error we were getting in production. As per my first message, Jedis was the only one that failed and/or gave us any response, while Carmine stops receiving messages (obviously) but doesn't alert us or retry the connection.

My assumption is that maybe Carmine treats kills as normal? I don't know; I'll investigate a little further. However, if you want to replicate it, as I said before, just start a Carmine mq worker and then kill its client from redis-cli.

are your carmine message queue connections being dropped?

Yes, same problem as with pubsub. Both fail, but Carmine doesn't seem to reconnect and/or raise an error.

do only long-held connections seem to be affected?

Yes, though to be more specific: only subscribers/receivers are affected. Client connections that only do publishing (or lpush, in the case of Jedis pubsub) have never failed us.

Connection read/timeout possibilities

As I said before, the problem is most certainly with DO's managed Redis. We speculate that it kills idle connections (for some reason or another), so that's probably the cause.

Why are we using MQ on carmine and PubSub on Jedis

We are currently transitioning from mq to pubsub in order to accommodate multiple subscribers to a single message, so Jedis' pubsub just felt handy.


To be honest, I don't think the problem is with our end goal, since it wasn't in our logic or anything else, but please don't hesitate to tell us otherwise if you feel so. Carmine isn't the cause of the problem; we just couldn't manage to get it working in an unstable environment.

Cheers!

ptaoussanis commented 1 month ago

We were using carmine's message queue and Jedis' pubsub for the same end goal: event based messaging.

Okay, but as I mentioned - please note that these are completely unrelated.

There's at least three things we're discussing here:

  1. Redis's pub/sub mechanism
  2. Carmine's pub/sub API
  3. Carmine's message queue API

1 and 2 are related (Carmine's pub/sub API uses Redis's pub/sub mechanism). 3 has absolutely nothing to do with 1 or 2.

In particular, it's important to note that message queues and pub/sub mechanisms offer completely different semantics and guarantees. They're rarely sensibly interchangeable.

As per my first message, Jedis was the only one that failed and/or gave us any response, while Carmine stops receiving messages (obviously) but doesn't alert us or retry the connection.

Again, it's important to be clear here about what we're talking about:

If you're talking about Carmine's pub/sub disconnections causing problems - then that's expected. If you specifically want pub/sub semantics but you also want graceful disconnection handling - then this is something you'd unfortunately need to implement yourself. (It'd be nice for Carmine to offer this out-the-box, but it doesn't currently).

If you're talking about Carmine's message queue disconnections causing problems - then that's unexpected and something that should be debugged. I've made numerous suggestions on Slack about how to debug this, and what information I'd need to further advise. Until I have the requested info, it's difficult for me to suggest what might be going wrong.

But with appropriate conn+pool config, Carmine's message queue should be resistant to disconnections due to any cause (including DigitalOcean killing connections). If this isn't functioning as expected, it could be a config/usage issue on your side, or a bug on Carmine's side. I can't advise further without the info I've requested via Slack.

We are currently transitioning from mq to pubsub in order to accommodate multiple subscribers to a single message, so Jedis' pubsub just felt handy.

Do please note that pub/sub and message queues have completely different semantics. Pub/sub messages are entirely ephemeral, for one. If you can describe the specific properties you need, I can recommend an appropriate mechanism. It sounds like Redis streams might be a reasonable fit in your case if you want both multiple subscribers and better persistence/reliability than pub/sub offers, but it really depends on what precisely you need/want.
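For reference, the rough shape of the streams idea (not Carmine's MQ or pub/sub). This is only a sketch and assumes Carmine exposes the Redis stream commands (XADD/XRANGE) like its other generated command fns; the stream name and payload are illustrative, and `conn` is a conn map like the one in your example:

;; Producer: append an event to the stream
(car/wcar conn
  (car/xadd "my.events" "*" "payload" "hello"))

;; Any number of consumers can read the same events independently, e.g.:
(car/wcar conn
  (car/xrange "my.events" "-" "+")) ; full history, oldest to newest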

J0sueTM commented 1 month ago

Yes, I know those are unrelated. Whenever I talk about PubSub, I'm talking about our current implementation, which isn't the focus. The only reason I brought it up for discussion is to let you know what we did to circumvent our problem.

Specifically about Carmine's message queue (our previous implementation): we had problems with the expected behaviour on recurring disconnections (i.e. Redis killing clients and Carmine not reconnecting, just stalling). Our config is the same as the one in the first message:

(defn create-queue-handler-fn [handler components sentry]
  (fn [{:keys [message attempt]}]
    (try
      (logs/log :info "received a message"
                :ctx {:attempt attempt})
      (handler message components)
      {:status :success}
      (catch Throwable e
        (logs/log :error "failed to handle message"
                  :ctx {:ex-message (.getMessage e)})
        (sentry/send-event! sentry e)
        {:status :error
         :throwable e}))))

(let [pool (car/connection-pool {:test-on-borrow? true
                                 :test-on-return? true
                                 :test-while-idle? true})
      conn {:pool pool
            :spec {:uri uri}}
      ws (doall
          (for [{:keys [queue-name handler]} workers]
            (do
              (logs/log :info "starting redis queue"
                        :ctx {:queue-name queue-name})
              (mq/worker
               conn queue-name {:handler (create-queue-handler-fn
                                          handler components sentry)}))))]
  ...)

there's really nothing else to it. If you see any problem with it, please let us know.

I'm sorry for not being able to provide any additional helpful traces or logs. All we know is that when killing a connected mq worker, Carmine just stalls.

Anyway, @matheusfrancisco will answer you on Slack as soon as possible.

ptaoussanis commented 1 month ago

Yes, I know those are unrelated. Whenever I talk about PubSub, I'm talking about our current implementation, which isn't the focus.

I'm sorry, but the terminology actually matters here - which is why I keep asking that we please try to be clear on this.

Your issue is titled "Connection loss/failure detection on PubSubs" and your main example (a message queue handler) was described as a "pubsub implementation".

Expected behaviour is different between pub/sub and message queue.

Let me summarize my current understanding:

  1. You were using Carmine's message queue, you've never used Carmine's pub/sub.
  2. For some reason (possibly DigitalOcean's fault), message queue worker connections were being broken.
  3. Once a message queue worker connection is broken, that worker fails to recover.

Is that accurate?

there's really nothing else to it. If you see any problem with it, please let us know.

Your pool connection settings look okay, but you seem to be manually creating multiple workers here:

(doall
  (for [{:keys [queue-name handler]} workers]
    (do
      (logs/log :info "starting redis queue" :ctx {:queue-name queue-name})
        (mq/worker conn queue-name
          {:handler (create-queue-handler-fn handler components sentry)}))))

That's a little unusual; normally you'd create a single worker and specify the number of threads you want.

At the very least multiple workers will make it more difficult to get diagnostic info since your queue stats will be distributed across multiple workers.

This could also create other issues, I'm not sure off-hand.

All we know is that when killing a connected mq worker, Carmine just stalls.

Could you please clarify what you mean by "kill a connected mq worker" and "just stalls"?

As I've mentioned and in DMs to @matheusfrancisco - if it appears that your queue is no longer processing items, you need to confirm:

  1. Is your worker loop still running or not? This can be checked with the :monitor option.

  2. Is your handler still running for each enqueued item or not? This can be checked by derefing your worker. That'll return something like this:

  status: {:nqueued 0, :nlocked 0, :nbackoff 0, :ntotal 0}
  queue-size: {:last 0, :p90 0, :max 0, :n-entries "1.2m"}
  queueing-time: {:p90 "251 msecs", :max "342 msecs", :n-entries "7"}
  handling-time: {:p90 "2.4 secs", :max "2.5 secs", :n-entries "10"}
  counts: {:sleep/end-of-circle "1.2m", :handler/success "7", :handler/retry "3", :handler/backoff "3"}

Deref several times to see which numbers (if any) are changing. This output includes info specifically intended to help debug unexpected queue issues.
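For example, a minimal sketch of capturing two snapshots for comparison (`my-worker` here stands for whatever your mq/worker call returned; the pause and key path are placeholders - adjust the path to whatever your deref actually returns):

(require '[clojure.data :as data])

(let [before @my-worker
      _      (Thread/sleep 30000) ; arbitrary pause
      after  @my-worker]
  ;; Returns [only-in-before only-in-after in-both] for the queue counters
  (data/diff (get-in before [:stats :counts])
             (get-in after  [:stats :counts])))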

With that info in-hand, we'll both be in a better position to try understand what might be happening.

J0sueTM commented 1 month ago

I'm sorry, I see the misunderstanding now. The title is really misleading relative to the actual topic.

  1. You were using Carmine's message queue, you've never used Carmine's pub/sub.
  2. For some reason (possibly DigitalOcean's fault), message queue worker connections were being broken.
  3. Once a message queue worker connection is broken, that worker fails to recover.

Is that accurate?

Yes.

Your pool connection settings look okay, but you seem to be manually creating multiple workers here:

Hmm. Thanks for pointing that out.

Could you please clarify what you mean by "kill a connected mq worker" and "just stalls"?

The literal client connection is killed. It's the same effect as if, in the redis-cli, you ran client kill xxx, where xxx is the identifier for the running Carmine mq worker.

ptaoussanis commented 1 month ago

I'm sorry, I see the misunderstanding now. The title is really misleading relative to the actual topic.

No worries, I'm happy that we have clarity now 👍

Is that accurate? Yes.

Okay, then my prior advice still holds: check your worker monitor and deref stats.

Comparing @my-worker at time=x and time=y will show you what the worker's activities have been in the last y-x period.

Before doing that, I would recommend switching to a single worker with as many threads as you need:

:nthreads-worker  - Number of threads to monitor and maintain queue.
:nthreads-handler - Number of threads to handle queue messages with handler fn.

That'll make the @my-worker numbers more meaningful, and reduce the chances of multiple workers interfering with each other.
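Something like this, as a sketch (the option names are from the docstring above; the queue name, handler fn, and thread count are placeholders):

(def my-worker
  (mq/worker conn "my-queue"
    {:handler          my-handler-fn  ; placeholder: your handler fn
     :nthreads-worker  1              ; single loop maintaining the queue
     :nthreads-handler 4}))           ; several threads running the handler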

Could you please clarify what you mean by "kill a connected mq worker" and "just stalls"? The literal client connection is killed. It's the same effect as if, in the redis-cli, you ran client kill xxx, where xxx is the identifier for the running Carmine mq worker.

The terminology here matters, since if you "kill a worker" that generally means instructing the logical worker to stop. I.e. you expect it to no longer process any more messages.

That's different to your case:

  1. You're not killing the worker
  2. But the worker's connection is being killed

Again the distinction matters since:

J0sueTM commented 1 month ago

That's right. With the terminology out of the way, it remains on us to gather info about the problem using the suggestions you laid out. I'm currently busy, so I can only guarantee a response this weekend, if that's okay with you. Will get in touch soon!

J0sueTM commented 1 month ago

Hello @ptaoussanis.

Here's a proper report, as promised.

This is the basic clj source I used to reproduce the problem:

(ns core
  (:require
   [clojure.pprint :refer [pprint]]
   [taoensso.carmine :as car]
   [taoensso.carmine.message-queue :as mq]))

(defonce conn-opts
  {:pool (car/connection-pool {:test-on-borrow? true
                               :test-on-return? true
                               :test-while-idle? true})
   :spec {:uri "redis://localhost:6379"}})

(defn worker []
  (mq/worker conn-opts
             "my.test"
             {:handler (fn [{:keys [message]}]
                         (prn :message message)
                         {:status :success})
              :monitor (fn [msg]
                         (prn :monitor-msg msg))}))

(comment
  (def w (worker))

  (w :start)
  (w :stop)

  (pprint @w)

  (car/wcar conn-opts (mq/enqueue "my.test" {:hello true}))
  ;;
  )

As you can see, I'm using the MQ, and this implementation is faithful to the prior one I showed you in my first message on this issue. The following is specifically about Carmine's message queue feature. We may have misidentified nomenclature like pubsub/mq before, but at the end of the day we were using the mq feature and nothing else.

Logs

Each command below is accompanied by its resulting logs.

(w :start)

2024-05-29T17:23:51.023Z archlinux INFO [taoensso.carmine.message-queue:679] - [Carmine/mq] Queue worker starting {:qname "my.test"}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "22", :poll-reply ["sleep" "end-of-circle" "a" 10332], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "23", :poll-reply ["sleep" "end-of-circle" "a" 12053], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}

(pprint @w)

{:qname "my.test",
 :running? true,
 :nthreads {:worker 1, :handler 1},
 :conn-opts
 {:pool
  {:pool
   #object[org.apache.commons.pool2.impl.GenericKeyedObjectPool 0xebb30a "GenericKeyedObjectPool [maxTotal=-1, blockWhenExhausted=true, maxWaitDuration=PT-0.001S, lifo=true, fairness=false, testOnCreate=false, testOnBorrow=true, testOnReturn=true, testWhileIdle=true, durationBetweenEvictionRuns=PT30S, numTestsPerEvictionRun=-1, minEvictableIdleTimeDuration=PT1M, softMinEvictableIdleTimeDuration=PT-0.001S, evictionPolicy=org.apache.commons.pool2.impl.DefaultEvictionPolicy@7541e7aa, closeLock=java.lang.Object@358a0eca, closed=false, evictionLock=java.lang.Object@3481f6c8, evictor=org.apache.commons.pool2.impl.BaseGenericObjectPool$Evictor [scheduledFuture=java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@631e3480[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@7c014259[Wrapped task = org.apache.commons.pool2.impl.EvictionTimer$WeakRunner@68786c4]]], evictionIterator=org.apache.commons.pool2.impl.BaseGenericObjectPool$EvictionIterator@5812a99d, factoryClassLoader=java.lang.ref.WeakReference@2e772b4c, oname=org.apache.commons.pool2:type=GenericKeyedObjectPool,name=pool2, creationStackTrace=java.lang.Exception\n\tat org.apache.commons.pool2.impl.BaseGenericObjectPool.<init>(BaseGenericObjectPool.java:420)\n\tat org.apache.commons.pool2.impl.GenericKeyedObjectPool.<init>(GenericKeyedObjectPool.java:264)\n\tat org.apache.commons.pool2.impl.GenericKeyedObjectPool.<init>(GenericKeyedObjectPool.java:248)\n\tat taoensso.carmine.connections$fn__15599.invokeStatic(connections.clj:228)\n\tat taoensso.carmine.connections$fn__15599.invoke(connections.clj:205)\n\tat clojure.lang.AFn.applyToHelper(AFn.java:154)\n\tat clojure.lang.AFn.applyTo(AFn.java:144)\n\tat clojure.core$apply.invokeStatic(core.clj:667)\n\tat clojure.core$apply.invoke(core.clj:662)\n\tat taoensso.encore$cache$fn__11375$fn__11379.invoke(encore.cljc:3574)\n\tat clojure.lang.Delay.deref(Delay.java:42)\n\tat clojure.core$deref.invokeStatic(core.clj:2337)\n\tat clojure.core$deref.invoke(core.clj:2323)\n\tat taoensso.encore$cache$fn__11375.doInvoke(encore.cljc:3562)\n\tat clojure.lang.RestFn.invoke(RestFn.java:421)\n\tat taoensso.carmine$connection_pool.invokeStatic(carmine.clj:63)\n\tat taoensso.carmine$connection_pool.invoke(carmine.clj:37)\n\tat clojure.lang.AFn.applyToHelper(AFn.java:154)\n\tat clojure.lang.AFn.applyTo(AFn.java:144)\n\tat clojure.lang.Compiler$InvokeExpr.eval(Compiler.java:3714)\n\tat clojure.lang.Compiler$MapExpr.eval(Compiler.java:3066)\n\tat clojure.lang.Compiler$DefExpr.eval(Compiler.java:457)\n\tat clojure.lang.Compiler.eval(Compiler.java:7199)\n\tat clojure.lang.Compiler.eval(Compiler.java:7149)\n\tat clojure.core$eval.invokeStatic(core.clj:3216)\n\tat clojure.core$eval.invoke(core.clj:3212)\n\tat nrepl.middleware.interruptible_eval$evaluate$fn__1359$fn__1360.invoke(interruptible_eval.clj:87)\n\tat clojure.lang.AFn.applyToHelper(AFn.java:152)\n\tat clojure.lang.AFn.applyTo(AFn.java:144)\n\tat clojure.core$apply.invokeStatic(core.clj:667)\n\tat clojure.core$with_bindings_STAR_.invokeStatic(core.clj:1990)\n\tat clojure.core$with_bindings_STAR_.doInvoke(core.clj:1990)\n\tat clojure.lang.RestFn.invoke(RestFn.java:425)\n\tat nrepl.middleware.interruptible_eval$evaluate$fn__1359.invoke(interruptible_eval.clj:87)\n\tat clojure.main$repl$read_eval_print__9206$fn__9209.invoke(main.clj:437)\n\tat clojure.main$repl$read_eval_print__9206.invoke(main.clj:437)\n\tat clojure.main$repl$fn__9215.invoke(main.clj:458)\n\tat clojure.main$repl.invokeStatic(main.clj:458)\n\tat 
clojure.main$repl.doInvoke(main.clj:368)\n\tat clojure.lang.RestFn.invoke(RestFn.java:1523)\n\tat nrepl.middleware.interruptible_eval$evaluate.invokeStatic(interruptible_eval.clj:84)\n\tat nrepl.middleware.interruptible_eval$evaluate.invoke(interruptible_eval.clj:56)\n\tat nrepl.middleware.interruptible_eval$interruptible_eval$fn__1392$fn__1396.invoke(interruptible_eval.clj:152)\n\tat clojure.lang.AFn.run(AFn.java:22)\n\tat nrepl.middleware.session$session_exec$main_loop__1462$fn__1466.invoke(session.clj:218)\n\tat nrepl.middleware.session$session_exec$main_loop__1462.invoke(session.clj:217)\n\tat clojure.lang.AFn.run(AFn.java:22)\n\tat java.base/java.lang.Thread.run(Thread.java:1570)\n, borrowedCount=5, returnedCount=4, createdCount=2, destroyedCount=0, destroyedByEvictorCount=0, destroyedByBorrowValidationCount=0, activeTimes=StatsStore [[12, 4, 10856, 4]], size=100, index=4], idleTimes=StatsStore [[2, 3, 1, 0, 10853]], size=100, index=5], waitTimes=StatsStore [[10, 0, 2, 0, 1]], size=100, index=5], maxBorrowWaitDuration=PT0.010510864S, swallowedExceptionListener=null, maxIdlePerKey=16, minIdlePerKey=0, maxTotalPerKey=16, factory=taoensso.carmine.connections$make_connection_factory$reify__15594@147c29e0, fairness=false, poolMap={{:host \"localhost\", :port 6379, :uri \"redis://localhost:6379\"}=ObjectDeque [idleObjects=[Object: taoensso.carmine.connections.Connection@380dadbd, State: IDLE], createCount=2, allObjects={IdentityWrapper [instance=taoensso.carmine.connections.Connection@380dadbd]=Object: taoensso.carmine.connections.Connection@380dadbd, State: IDLE, IdentityWrapper [instance=taoensso.carmine.connections.Connection@fd67b817]=Object: taoensso.carmine.connections.Connection@fd67b817, State: ALLOCATED}, numInterested=0]}, poolKeyList=[{:host \"localhost\", :port 6379, :uri \"redis://localhost:6379\"}], keyLock=java.util.concurrent.locks.ReentrantReadWriteLock@75b1c79d[Write locks = 0, Read locks = 0], numTotal=2, evictionKeyIterator=java.util.ArrayList$Itr@7d64a7c, evictionKey={:host \"localhost\", :port 6379, :uri \"redis://localhost:6379\"}, abandonedConfig=null]"]},
  :spec {:uri "redis://localhost:6379"}},
 :opts
 {:handler #function[core/worker/fn--17953],
  :monitor #function[core/worker/fn--17956],
  :default-lock-ms 3600000,
  :eoq-backoff-ms #function[taoensso.encore/exp-backoff],
  :nthreads-worker 1,
  :nthreads-handler 1,
  :throttle-ms
  #function[taoensso.carmine.message-queue/default-throttle-ms-fn]},
 :stats
 {:queue-size
  {:min 0,
   :mean 0.0,
   :p75 0,
   :mad-sum 0.0,
   :p99 0,
   :n 2,
   :p25 0,
   :p90 0,
   :var 0.0,
   :max 0,
   :mad 0.0,
   :last 0,
   :p50 0,
   :sum 0,
   :p95 0,
   :var-sum 0.0},
  :queueing-time-ms nil,
  :handling-time-ns nil,
  :counts #:sleep{:end-of-circle 2}}}

(car/wcar conn-opts (mq/enqueue "my.test" {:hello true}))

:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "24", :poll-reply ["sleep" "end-of-circle" "a" 12875], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "0", :poll-reply ["handle" "55cae2f3-5619-4fe6-9d7e-a413035fc315" {:hello true} 1 3600000 1717003448304], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:message {:hello true}

From this point on, Carmine suddenly began spitting a lot of monitor logs:

:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "1", :poll-reply ["sleep" "end-of-circle" "b" 829], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "0", :poll-reply ["skip" "did-gc"], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "1", :poll-reply ["sleep" "end-of-circle" "a" 608], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "2", :poll-reply ["sleep" "end-of-circle" "b" 1952], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "3", :poll-reply ["sleep" "end-of-circle" "a" 3211], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "4", :poll-reply ["sleep" "end-of-circle" "b" 4370], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "5", :poll-reply ["sleep" "end-of-circle" "a" 11777], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "6", :poll-reply ["sleep" "end-of-circle" "b" 11314], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "7", :poll-reply ["sleep" "end-of-circle" "a" 14726], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "8", :poll-reply ["sleep" "end-of-circle" "b" 11405], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "9", :poll-reply ["sleep" "end-of-circle" "a" 10801], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "10", :poll-reply ["sleep" "end-of-circle" "b" 15647], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "11", :poll-reply ["sleep" "end-of-circle" "a" 9319], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "12", :poll-reply ["sleep" "end-of-circle" "b" 13628], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "13", :poll-reply ["sleep" "end-of-circle" "a" 15170], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "14", :poll-reply ["sleep" "end-of-circle" "b" 12225], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "15", :poll-reply ["sleep" "end-of-circle" "a" 15830], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "16", :poll-reply ["sleep" "end-of-circle" "b" 8253], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "17", :poll-reply ["sleep" "end-of-circle" "a" 14086], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "18", :poll-reply ["sleep" "end-of-circle" "b" 8533], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "19", :poll-reply ["sleep" "end-of-circle" "a" 11344], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "20", :poll-reply ["sleep" "end-of-circle" "b" 11007], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "21", :poll-reply ["sleep" "end-of-circle" "a" 12088], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "22", :poll-reply ["sleep" "end-of-circle" "b" 11597], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "23", :poll-reply ["sleep" "end-of-circle" "a" 10499], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "24", :poll-reply ["sleep" "end-of-circle" "b" 13308], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "25", :poll-reply ["sleep" "end-of-circle" "a" 15439], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "26", :poll-reply ["sleep" "end-of-circle" "b" 14685], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "27", :poll-reply ["sleep" "end-of-circle" "a" 13955], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "28", :poll-reply ["sleep" "end-of-circle" "b" 11995], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "29", :poll-reply ["sleep" "end-of-circle" "a" 14140], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "30", :poll-reply ["sleep" "end-of-circle" "b" 11651], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "31", :poll-reply ["sleep" "end-of-circle" "a" 9658], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "32", :poll-reply ["sleep" "end-of-circle" "b" 12129], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "33", :poll-reply ["sleep" "end-of-circle" "a" 14408], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "34", :poll-reply ["sleep" "end-of-circle" "b" 14281], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "35", :poll-reply ["sleep" "end-of-circle" "a" 10755], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "36", :poll-reply ["sleep" "end-of-circle" "b" 11209], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "37", :poll-reply ["sleep" "end-of-circle" "a" 10477], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "38", :poll-reply ["sleep" "end-of-circle" "b" 9964], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "39", :poll-reply ["sleep" "end-of-circle" "a" 11978], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "40", :poll-reply ["sleep" "end-of-circle" "b" 15146], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "41", :poll-reply ["sleep" "end-of-circle" "a" 10547], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "42", :poll-reply ["sleep" "end-of-circle" "b" 15051], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "43", :poll-reply ["sleep" "end-of-circle" "a" 12373], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "44", :poll-reply ["sleep" "end-of-circle" "b" 10339], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "45", :poll-reply ["sleep" "end-of-circle" "a" 9733], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "46", :poll-reply ["sleep" "end-of-circle" "b" 13241], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "47", :poll-reply ["sleep" "end-of-circle" "a" 14190], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "48", :poll-reply ["sleep" "end-of-circle" "b" 10198], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "49", :poll-reply ["sleep" "end-of-circle" "a" 12869], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "50", :poll-reply ["sleep" "end-of-circle" "b" 10322], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "51", :poll-reply ["sleep" "end-of-circle" "a" 13794], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "52", :poll-reply ["sleep" "end-of-circle" "b" 8456], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "53", :poll-reply ["sleep" "end-of-circle" "a" 11984], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "54", :poll-reply ["sleep" "end-of-circle" "b" 14637], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "55", :poll-reply ["sleep" "end-of-circle" "a" 10435], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "56", :poll-reply ["sleep" "end-of-circle" "b" 9611], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "57", :poll-reply ["sleep" "end-of-circle" "a" 9326], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "58", :poll-reply ["sleep" "end-of-circle" "b" 10597], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "59", :poll-reply ["sleep" "end-of-circle" "a" 12722], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "60", :poll-reply ["sleep" "end-of-circle" "b" 12850], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "61", :poll-reply ["sleep" "end-of-circle" "a" 10748], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "62", :poll-reply ["sleep" "end-of-circle" "b" 10597], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "63", :poll-reply ["sleep" "end-of-circle" "a" 8643], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "64", :poll-reply ["sleep" "end-of-circle" "b" 11792], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "65", :poll-reply ["sleep" "end-of-circle" "a" 12321], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "66", :poll-reply ["sleep" "end-of-circle" "b" 8669], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "67", :poll-reply ["sleep" "end-of-circle" "a" 10978], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "68", :poll-reply ["sleep" "end-of-circle" "b" 10395], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "69", :poll-reply ["sleep" "end-of-circle" "a" 13011], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "70", :poll-reply ["sleep" "end-of-circle" "b" 13231], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "71", :poll-reply ["sleep" "end-of-circle" "a" 11945], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "72", :poll-reply ["sleep" "end-of-circle" "b" 9370], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "73", :poll-reply ["sleep" "end-of-circle" "a" 8254], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "74", :poll-reply ["sleep" "end-of-circle" "b" 10256], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "75", :poll-reply ["sleep" "end-of-circle" "a" 10925], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "76", :poll-reply ["sleep" "end-of-circle" "b" 15450], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "77", :poll-reply ["sleep" "end-of-circle" "a" 9291], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "78", :poll-reply ["sleep" "end-of-circle" "b" 14224], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "79", :poll-reply ["sleep" "end-of-circle" "a" 9113], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "80", :poll-reply ["sleep" "end-of-circle" "b" 15855], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "81", :poll-reply ["sleep" "end-of-circle" "a" 15092], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "82", :poll-reply ["sleep" "end-of-circle" "b" 14181], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "83", :poll-reply ["sleep" "end-of-circle" "a" 10926], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "84", :poll-reply ["sleep" "end-of-circle" "b" 14289], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "85", :poll-reply ["sleep" "end-of-circle" "a" 9722], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "86", :poll-reply ["sleep" "end-of-circle" "b" 8273], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "87", :poll-reply ["sleep" "end-of-circle" "a" 10034], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "88", :poll-reply ["sleep" "end-of-circle" "b" 10836], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "89", :poll-reply ["sleep" "end-of-circle" "a" 11981], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "90", :poll-reply ["sleep" "end-of-circle" "b" 10071], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "91", :poll-reply ["sleep" "end-of-circle" "a" 11833], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "92", :poll-reply ["sleep" "end-of-circle" "b" 10063], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "93", :poll-reply ["sleep" "end-of-circle" "a" 14181], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "94", :poll-reply ["sleep" "end-of-circle" "b" 8055], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "95", :poll-reply ["sleep" "end-of-circle" "a" 10219], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "96", :poll-reply ["sleep" "end-of-circle" "b" 15710], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "97", :poll-reply ["sleep" "end-of-circle" "a" 8880], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "98", :poll-reply ["sleep" "end-of-circle" "b" 13590], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "99", :poll-reply ["sleep" "end-of-circle" "a" 14507], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "100", :poll-reply ["sleep" "end-of-circle" "b" 12290], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "101", :poll-reply ["sleep" "end-of-circle" "a" 8439], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "102", :poll-reply ["sleep" "end-of-circle" "b" 14731], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "103", :poll-reply ["sleep" "end-of-circle" "a" 8331], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "104", :poll-reply ["sleep" "end-of-circle" "b" 15940], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "105", :poll-reply ["sleep" "end-of-circle" "a" 12230], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "106", :poll-reply ["sleep" "end-of-circle" "b" 15214], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "107", :poll-reply ["sleep" "end-of-circle" "a" 12370], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "108", :poll-reply ["sleep" "end-of-circle" "b" 14885], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "109", :poll-reply ["sleep" "end-of-circle" "a" 14345], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "110", :poll-reply ["sleep" "end-of-circle" "b" 11973], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "111", :poll-reply ["sleep" "end-of-circle" "a" 8757], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "112", :poll-reply ["sleep" "end-of-circle" "b" 13841], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "113", :poll-reply ["sleep" "end-of-circle" "a" 10654], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}
:monitor-msg {:queue-size 0, :mid-circle-size 0, :ndry-runs "114", :poll-reply ["sleep" "end-of-circle" "b" 14519], :worker #taoensso.carmine.message-queue.CarmineMessageQueueWorker[qname=my.test, nthreads=1w+1h, running]}

Then, I simply stopped my running Redis container (docker-compose down), and the monitor logs stopped.

I ran

(pprint @w)

{:qname "my.test",
 :running? true,
 :nthreads {:worker 1, :handler 1},
 :conn-opts
 {:pool
  {:pool
   #object[org.apache.commons.pool2.impl.GenericKeyedObjectPool 0xebb30a "GenericKeyedObjectPool [maxTotal=-1, blockWhenExhausted=true, maxWaitDuration=PT-0.001S, lifo=true, fairness=false, testOnCreate=false, testOnBorrow=true, testOnReturn=true, testWhileIdle=true, durationBetweenEvictionRuns=PT30S, numTestsPerEvictionRun=-1, minEvictableIdleTimeDuration=PT1M, softMinEvictableIdleTimeDuration=PT-0.001S, evictionPolicy=org.apache.commons.pool2.impl.DefaultEvictionPolicy@7541e7aa, closeLock=java.lang.Object@358a0eca, closed=false, evictionLock=java.lang.Object@3481f6c8, evictor=org.apache.commons.pool2.impl.BaseGenericObjectPool$Evictor [scheduledFuture=java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@631e3480[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@7c014259[Wrapped task = org.apache.commons.pool2.impl.EvictionTimer$WeakRunner@68786c4]]], evictionIterator=org.apache.commons.pool2.impl.BaseGenericObjectPool$EvictionIterator@2a4ca23, factoryClassLoader=java.lang.ref.WeakReference@2e772b4c, oname=org.apache.commons.pool2:type=GenericKeyedObjectPool,name=pool2, creationStackTrace=java.lang.Exception\n\tat org.apache.commons.pool2.impl.BaseGenericObjectPool.<init>(BaseGenericObjectPool.java:420)\n\tat org.apache.commons.pool2.impl.GenericKeyedObjectPool.<init>(GenericKeyedObjectPool.java:264)\n\tat org.apache.commons.pool2.impl.GenericKeyedObjectPool.<init>(GenericKeyedObjectPool.java:248)\n\tat taoensso.carmine.connections$fn__15599.invokeStatic(connections.clj:228)\n\tat taoensso.carmine.connections$fn__15599.invoke(connections.clj:205)\n\tat clojure.lang.AFn.applyToHelper(AFn.java:154)\n\tat clojure.lang.AFn.applyTo(AFn.java:144)\n\tat clojure.core$apply.invokeStatic(core.clj:667)\n\tat clojure.core$apply.invoke(core.clj:662)\n\tat taoensso.encore$cache$fn__11375$fn__11379.invoke(encore.cljc:3574)\n\tat clojure.lang.Delay.deref(Delay.java:42)\n\tat clojure.core$deref.invokeStatic(core.clj:2337)\n\tat clojure.core$deref.invoke(core.clj:2323)\n\tat taoensso.encore$cache$fn__11375.doInvoke(encore.cljc:3562)\n\tat clojure.lang.RestFn.invoke(RestFn.java:421)\n\tat taoensso.carmine$connection_pool.invokeStatic(carmine.clj:63)\n\tat taoensso.carmine$connection_pool.invoke(carmine.clj:37)\n\tat clojure.lang.AFn.applyToHelper(AFn.java:154)\n\tat clojure.lang.AFn.applyTo(AFn.java:144)\n\tat clojure.lang.Compiler$InvokeExpr.eval(Compiler.java:3714)\n\tat clojure.lang.Compiler$MapExpr.eval(Compiler.java:3066)\n\tat clojure.lang.Compiler$DefExpr.eval(Compiler.java:457)\n\tat clojure.lang.Compiler.eval(Compiler.java:7199)\n\tat clojure.lang.Compiler.eval(Compiler.java:7149)\n\tat clojure.core$eval.invokeStatic(core.clj:3216)\n\tat clojure.core$eval.invoke(core.clj:3212)\n\tat nrepl.middleware.interruptible_eval$evaluate$fn__1359$fn__1360.invoke(interruptible_eval.clj:87)\n\tat clojure.lang.AFn.applyToHelper(AFn.java:152)\n\tat clojure.lang.AFn.applyTo(AFn.java:144)\n\tat clojure.core$apply.invokeStatic(core.clj:667)\n\tat clojure.core$with_bindings_STAR_.invokeStatic(core.clj:1990)\n\tat clojure.core$with_bindings_STAR_.doInvoke(core.clj:1990)\n\tat clojure.lang.RestFn.invoke(RestFn.java:425)\n\tat nrepl.middleware.interruptible_eval$evaluate$fn__1359.invoke(interruptible_eval.clj:87)\n\tat clojure.main$repl$read_eval_print__9206$fn__9209.invoke(main.clj:437)\n\tat clojure.main$repl$read_eval_print__9206.invoke(main.clj:437)\n\tat clojure.main$repl$fn__9215.invoke(main.clj:458)\n\tat clojure.main$repl.invokeStatic(main.clj:458)\n\tat 
clojure.main$repl.doInvoke(main.clj:368)\n\tat clojure.lang.RestFn.invoke(RestFn.java:1523)\n\tat nrepl.middleware.interruptible_eval$evaluate.invokeStatic(interruptible_eval.clj:84)\n\tat nrepl.middleware.interruptible_eval$evaluate.invoke(interruptible_eval.clj:56)\n\tat nrepl.middleware.interruptible_eval$interruptible_eval$fn__1392$fn__1396.invoke(interruptible_eval.clj:152)\n\tat clojure.lang.AFn.run(AFn.java:22)\n\tat nrepl.middleware.session$session_exec$main_loop__1462$fn__1466.invoke(session.clj:218)\n\tat nrepl.middleware.session$session_exec$main_loop__1462.invoke(session.clj:217)\n\tat clojure.lang.AFn.run(AFn.java:22)\n\tat java.base/java.lang.Thread.run(Thread.java:1570)\n, borrowedCount=240, returnedCount=240, createdCount=3, destroyedCount=3, destroyedByEvictorCount=0, destroyedByBorrowValidationCount=3, activeTimes=StatsStore [[3, 1, 3, 1, 3, 1, 3, 1, 3, 1, 3, 1, 3, 0, 1, 0, 3, 1, 2, 1, 2, 1, 3, 1, 3, 1, 3, 1, 3, 1, 3, 1, 3, 1, 3, 1, 3, 1, 1, 0]], size=100, index=40], idleTimes=StatsStore [[198, 1, 197, 1, 197, 1, 198, 1, 198, 2, 198, 1, 197, 1, 198, 0, 199, 1, 197, 1, 198, 1, 198, 1, 197, 1, 198, 1, 198, 1, 197, 1, 198, 1, 197, 1, 198, 2, 197, 1]], size=100, index=40], waitTimes=StatsStore [[1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1, 0, 1, 0, 0, 0]], size=100, index=40], maxBorrowWaitDuration=PT0.010510864S, swallowedExceptionListener=null, maxIdlePerKey=16, minIdlePerKey=0, maxTotalPerKey=16, factory=taoensso.carmine.connections$make_connection_factory$reify__15594@147c29e0, fairness=false, poolMap={}, poolKeyList=[], keyLock=java.util.concurrent.locks.ReentrantReadWriteLock@75b1c79d[Write locks = 0, Read locks = 0], numTotal=0, evictionKeyIterator=java.util.ArrayList$Itr@1214dc52, evictionKey={:host \"localhost\", :port 6379, :uri \"redis://localhost:6379\"}, abandonedConfig=null]"]},
  :spec {:uri "redis://localhost:6379"}},
 :opts
 {:handler #function[core/worker/fn--17953],
  :monitor #function[core/worker/fn--17956],
  :default-lock-ms 3600000,
  :eoq-backoff-ms #function[taoensso.encore/exp-backoff],
  :nthreads-worker 1,
  :nthreads-handler 1,
  :throttle-ms
  #function[taoensso.carmine.message-queue/default-throttle-ms-fn]},
 :stats
 {:queue-size
  {:min 0,
   :mean 0.0,
   :p75 0,
   :mad-sum 0.0,
   :p99 0,
   :n 120,
   :p25 0,
   :p90 0,
   :var 0.0,
   :max 0,
   :mad 0.0,
   :last 0,
   :p50 0,
   :sum 0,
   :p95 0,
   :var-sum 0.0},
  :queueing-time-ms
  {:min 13605,
   :mean 13605.0,
   :p75 13605,
   :mad-sum 0.0,
   :p99 13605,
   :n 1,
   :p25 13605,
   :p90 13605,
   :var 0.0,
   :max 13605,
   :mad 0.0,
   :last 13605,
   :p50 13605,
   :sum 13605,
   :p95 13605,
   :var-sum 0.0},
  :handling-time-ns
  {:min 875117,
   :mean 875117.0,
   :p75 875117,
   :mad-sum 0.0,
   :p99 875117,
   :n 1,
   :p25 875117,
   :p90 875117,
   :var 0.0,
   :max 875117,
   :mad 0.0,
   :last 875117,
   :p50 875117,
   :sum 875117,
   :p95 875117,
   :var-sum 0.0},
  :counts {:sleep/end-of-circle 118, :handler/success 1}}}

I restarted the stopped Redis container, and no logs ever came up again, even after running mq/enqueue again.

This is my docker-compose.yml:

services:
  redis:
    image: redis:latest
    ports:
      - "6379:6379"
    volumes:
      - ./redis:/redis

Nothing fancy.

That's all. Hope that helps you.

Anything you need, please ping me.

ptaoussanis commented 4 weeks ago

@J0sueTM Hi Josué - thank you for the info, that was very helpful 🙏

Seeing that your handler was still active but the poll counters weren't progressing showed that there was indeed a bug in the main loop that was preventing errors from being properly caught.

This should now be fixed with v3.4.1 👍

avelino commented 4 weeks ago

@ptaoussanis thank you for your attention and hotfix

your dedication to the clojure ecosystem is inspiring

ptaoussanis commented 4 weeks ago

@avelino You're very welcome, thanks to all of you for the report and for the info to help debug this 🙏 Apologies for all the time/effort wasted on this bug!