clj-commons / aleph

Asynchronous streaming communication for Clojure - web server, web client, and raw TCP/UDP
http://aleph.io
MIT License
Understanding of `max-queue-size` of connection-pool #575

Closed FiV0 closed 2 years ago

FiV0 commented 3 years ago

When running

(require '[aleph.http :as aleph]
         '[manifold.deferred :as d])

(dotimes [_ 10000]
  (d/on-realized (aleph/get "http://localhost:8080" {:throw-exceptions false})
                 (fn [r] (println "Status: " (:status r)))
                 (fn [e] (println "Error type: " (type e)))))

I get a number of java.util.concurrent.RejectedExecutionException errors. They are not raised in my handlers but at the manifold.deferred level.

My understanding from looking at the docs of connection-pool and further flow/instrumented-pool was that one could do at least 65536 requests (even per domain) before this should happen.

Is this assumption wrong? Is pending requests not to be equated with pending acquires?

KingMob commented 2 years ago

@FiV0 There's definitely something going on, but I haven't figured out what yet. I'll take a closer look later when I get a chance. What's weird is the two major Manifold executor pools are utilization-executors, which default to a queue length of 0, and are supposed to be unbounded.

I haven't examined that part of the code quite as closely since becoming maintainer, and I don't know Aleph super-well. PRs welcome!

FiV0 commented 2 years ago

@KingMob Ok. I might take a look when I find the time.

arnaudgeiser commented 2 years ago

Hello @FiV0,

In your case, the java.util.concurrent.RejectedExecutionException is not raised by the flow/instrumented-pool. As you mentioned, the default value is 65536, so you are not running out-of-capacity here. [1]

Any java.util.concurrent.Executor can raise this Exception. [2] In your case, it's raised by the (undocumented) response-executor, a manifold.executor/utilization-executor which defaults to 256 threads with a queue-length of 0. [3]
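To illustrate the point that any executor can reject work, here is a minimal sketch using only JDK classes. It is not Aleph's or Dirigiste's implementation: a plain ThreadPoolExecutor with one thread and a SynchronousQueue (which has no capacity) stands in for a saturated executor with a queue length of 0. The names `pool`, `release` and `second-task-result` are made up for this example.

```clojure
(import '(java.util.concurrent ThreadPoolExecutor SynchronousQueue TimeUnit
                               CountDownLatch RejectedExecutionException))

;; Assumption: a JDK pool as a stand-in, not the Dirigiste executor Aleph uses.
;; One worker thread, and a SynchronousQueue that cannot hold pending tasks.
(def ^ThreadPoolExecutor pool
  (ThreadPoolExecutor. 1 1 0 TimeUnit/SECONDS (SynchronousQueue.)))

(def ^CountDownLatch release (CountDownLatch. 1))

;; The first task occupies the only worker until the latch is released.
(.execute pool (fn [] (.await release)))

;; The second task finds no free thread and no queue slot, so it is rejected.
(def second-task-result
  (try
    (.execute pool (fn [] nil))
    :accepted
    (catch RejectedExecutionException _ :rejected)))

;; Let the first task finish and shut the pool down.
(.countDown release)
(.shutdown pool)

(println second-task-result) ; prints :rejected
```

The same mechanism applies at a larger scale: once every thread is busy and there is nowhere to park a task, the next submission fails.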

Fortunately, there is an escape hatch: you can pass your own Executor as an option on any HTTP request.

Here is a simple example where it fails:

(require '[aleph.http :as aleph]
         '[aleph.flow :as flow]   ; provides utilization-executor and fixed-thread-executor
         '[aleph.netty :as netty]
         '[manifold.deferred :as d])

(let [executor (flow/utilization-executor 0.9 1
                                          {:thread-factory (netty/enumerating-thread-factory "response-executor" false)})]
  (dotimes [_ 1]
    (d/on-realized (aleph/get "http://localhost:8080" {:throw-exceptions false
                                                       :response-executor executor})
                   (fn [r] (println "Status: " (:status r)))
                   (fn [e] (println "Error type: " (type e))))))

[response-executor-1] ERROR manifold.deferred - error in deferred handler
java.util.concurrent.RejectedExecutionException
    at io.aleph.dirigiste.Executor.execute(Executor.java:342)
    at manifold.deferred.Deferred$fn__9677.invoke(deferred.clj:398)
    at manifold.deferred.Deferred.success(deferred.clj:398)
    at manifold.deferred$success_BANG_.invokeStatic(deferred.clj:243)
    at manifold.deferred$success_BANG_.invoke(deferred.clj:240)
    at manifold.deferred$eval9831$chain_SINGLEQUOTE____9852.invoke(deferred.clj:754)
    at manifold.deferred$eval9831$subscribe__9832$fn__9837.invoke(deferred.clj:715)
    at manifold.deferred.Listener.onSuccess(deferred.clj:219)
    at manifold.deferred.Deferred$fn__9677$fn__9678.invoke(deferred.clj:398)
    at clojure.lang.AFn.run(AFn.java:22)
    at io.aleph.dirigiste.Executor$3.run(Executor.java:318)
    at io.aleph.dirigiste.Executor$Worker$1.run(Executor.java:62)
    at manifold.executor$thread_factory$reify__9047$f__9048.invoke(executor.clj:47)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.base/java.lang.Thread.run(Thread.java:829)

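From there, you have two options: increase the number of threads, or use an executor with a longer queue. The queue-length option is not available for utilization-executor, whose queue length is always 0, so the second option means switching to a fixed-thread-executor.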

;; a utilization-executor with more threads
(flow/utilization-executor 0.9 1000
                           {:thread-factory (netty/enumerating-thread-factory "response-executor" false)})

;; a fixed-thread-executor (which defaults to an almost infinite (Integer/MAX_VALUE) queue-length)
(flow/fixed-thread-executor 1
                            {:thread-factory (netty/enumerating-thread-factory "response-executor" false)})

;; a fixed-thread-executor with a queue-length of 10000
(flow/fixed-thread-executor 1
                            {:queue-length 10000
                             :thread-factory (netty/enumerating-thread-factory "response-executor" false)})
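
As a rough illustration of what a queue length buys you, the sketch below again uses a plain JDK ThreadPoolExecutor as a stand-in (an assumption for illustration, not how aleph.flow builds its executors). With the single worker blocked, a queue of capacity 100 absorbs 100 pending tasks and only the 101st is rejected. The names `queued-pool`, `gate`, `try-execute` and `burst` are hypothetical.

```clojure
(import '(java.util.concurrent ThreadPoolExecutor LinkedBlockingQueue TimeUnit
                               CountDownLatch RejectedExecutionException))

;; Assumption: a JDK pool as a stand-in for an executor with a bounded queue.
(def ^ThreadPoolExecutor queued-pool
  (ThreadPoolExecutor. 1 1 0 TimeUnit/SECONDS (LinkedBlockingQueue. 100)))

(def ^CountDownLatch gate (CountDownLatch. 1))

(defn try-execute
  "Submits task to ex and reports whether it was accepted or rejected."
  [^ThreadPoolExecutor ex ^Runnable task]
  (try
    (.execute ex task)
    :accepted
    (catch RejectedExecutionException _ :rejected)))

;; Block the only worker so that every later task has to wait in the queue.
(try-execute queued-pool #(.await gate))

;; Submit 101 more tasks: 100 fit in the queue, the last one does not.
(def burst
  (frequencies (repeatedly 101 #(try-execute queued-pool (fn [] nil)))))

;; Unblock the worker, which then drains the queue, and shut down.
(.countDown gate)
(.shutdown queued-pool)

(println burst) ; prints {:accepted 100, :rejected 1}
```

A queue only delays rejection under sustained overload, so the queue length should be sized to the largest burst you expect rather than treated as unlimited.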

I will make a PR to document the response-executor.

[1]: https://github.com/clj-commons/dirigiste/blob/master/src/io/aleph/dirigiste/Pool.java#L193
[2]: https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/RejectedExecutionException.html
[3]: https://github.com/clj-commons/aleph/blob/master/src/aleph/http.clj#L92

KingMob commented 2 years ago

Thanks for diving into this @arnaudgeiser

KingMob commented 2 years ago

Fixed by #587