clj-commons / manifold

A compatibility layer for event-driven abstractions

(fixed-thread-executor 200) only has 8 threads? #137

Closed by lkrubner 7 years ago

lkrubner commented 7 years ago

I was given a simple assignment, which I figured I could knock out in a few hours, but I made some mistakes, and some of the mistakes were interesting.

I have some code that pulls a million records from MySQL and then fires them at an API to get some additional information. I have this code:

(def executor (me/fixed-thread-executor 200))

(defn enqueue
  [message]
  (slingshot/try+
   (->> (ms/->source [message])
        (ms/onto executor)
        (ms/map api/query))
   (catch Object o
     (println " message queue is not happy about the message we were given" o))))

In api/query I do an HTTP POST and catch exceptions; when one is caught, the thread sleeps and then tries again.

I stupidly ran this app before I had started the app for the API. So I got:

java.net.SocketTimeoutException when we tried to make the post-to-api. We will try again after an hour.
java.net.SocketTimeoutException when we tried to make the post-to-api. We will try again after an hour.
java.net.SocketTimeoutException when we tried to make the post-to-api. We will try again after an hour.
java.net.SocketTimeoutException when we tried to make the post-to-api. We will try again after an hour.
java.net.SocketTimeoutException when we tried to make the post-to-api. We will try again after an hour.
java.net.SocketTimeoutException when we tried to make the post-to-api. We will try again after an hour.
java.net.SocketTimeoutException when we tried to make the post-to-api. We will try again after an hour.
java.net.SocketTimeoutException when we tried to make the post-to-api. We will try again after an hour.

Just these 8 lines. But I thought I had 200 threads:

      (def executor (me/fixed-thread-executor 200))

It makes sense that I got a SocketTimeout, because I had not started the API app. But I should have gotten 200 of the same error, yes? Why would I only get 8 errors?

dm3 commented 7 years ago

@lkrubner The idea behind streams is that they consume elements one by one. The executor merely specifies where you want your one-by-one processing to happen. If you want to parallelize the processing, consider using something like fork in #131.

ztellman commented 7 years ago

You could also try simply mapping over the stream using (d/future-with executor ...), which will enqueue all the work onto the executor, and the resulting stream of deferred values can be realized with realize-each, or something similar.
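
A minimal sketch of the suggestion above, under some assumptions: `fake-query` is a hypothetical stand-in for `api/query` (which is not shown in this thread), `query-all` is an illustrative name, and the `ms/buffer` step with its size of 100 is an addition that is not part of the suggestion. The buffer is there on the assumption that, without it, the map step would only create the next deferred once the realize step has taken the previous one.

```clojure
(require '[manifold.deferred :as d]
         '[manifold.executor :as me]
         '[manifold.stream :as ms])

(def executor (me/fixed-thread-executor 200))

;; Hypothetical stand-in for api/query: sleeps briefly, then returns a value.
(defn fake-query
  [message]
  (Thread/sleep 50)
  (* 2 message))

(defn query-all
  "Runs fake-query for every message on the executor and returns the
  results as a lazy seq, in the original message order."
  [messages]
  (->> (ms/->source messages)
       ;; Each message becomes a deferred whose body runs on the executor.
       (ms/map (fn [message]
                 (d/future-with executor (fake-query message))))
       ;; Assumption: the buffer lets the map step run ahead and create
       ;; further deferreds while earlier ones are still being realized.
       (ms/buffer 100)
       ;; Turn the stream of deferreds back into a stream of plain values.
       (ms/realize-each)
       ;; Lazily drains the stream; realizing the seq blocks on results.
       (ms/stream->seq)))
```

Calling `(doall (query-all (range 20)))` forces all results. The amount of work in flight at once is bounded by the buffer size as well as by the executor's thread count, so both numbers likely need tuning for a real API.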

ztellman commented 7 years ago

And @dm3, I apologize for not responding to #131 earlier, I'll take a closer look.

lkrubner commented 7 years ago

Thank you. (d/future-with) is a great idea.