clj-commons / manifold

A compatibility layer for event-driven abstractions

Inconsistent results with deferred/onto #151

Open kingpong opened 6 years ago

kingpong commented 6 years ago

I'm trying to ensure that my chained callbacks are always run on a specific thread pool, so I'm using d/onto followed by d/chain to set up the callbacks. What I'm seeing, though, is that the callbacks sometimes run directly on the derefing thread. I can reproduce this behavior reliably (Clojure 1.8, Java 1.8, manifold 0.1.6).

(require '[manifold.deferred :as d])
(import '(java.util.concurrent ForkJoinPool))

(defn tid []
  (let [t (Thread/currentThread)]
    (format "Thread %d: %s" (.getId t) (.getName t))))

(defn chained-onto-threads
  "Tries n times to run f (which should produce a deferred) then chains the
   result into a callback function that should run on the ForkJoinPool.
   Returns a map containing the frequencies of threads that were actually used."
  [n f]
  (let [forkjoin-pool (ForkJoinPool/commonPool)
        seen-threads  (atom {})]
    (run! deref
          (for [_ (range n)]
            (-> (f)
                (d/onto forkjoin-pool)
                (d/chain
                  (fn [_]
                    ;; Expecting this to be ForkJoinPool, but isn't always.
                    (swap! seen-threads update (tid) #(inc (or % 0))))))))
    @seen-threads))

When I run chained-onto-threads with an f that produces a manifold future, sometimes the chained function runs on an nREPL thread, not a ForkJoinPool thread as expected:

user=> (require '[clojure.pprint :as pp])
nil
user=> (pp/pprint (chained-onto-threads 1000 #(d/future "any value")))
{"Thread 33: nREPL-worker-3" 7,
 "Thread 35: ForkJoinPool.commonPool-worker-0" 142,
 "Thread 37: ForkJoinPool.commonPool-worker-1" 123,
 "Thread 38: ForkJoinPool.commonPool-worker-2" 160,
 "Thread 39: ForkJoinPool.commonPool-worker-3" 144,
 "Thread 40: ForkJoinPool.commonPool-worker-4" 143,
 "Thread 43: ForkJoinPool.commonPool-worker-5" 163,
 "Thread 45: ForkJoinPool.commonPool-worker-6" 118}

With an f that produces a success-deferred, the chained function always runs on an nREPL thread, not on the executor that was specified:

user=>   (pp/pprint
  #_=>     (chained-onto-threads 1000 #(d/success-deferred "any value")))
{"Thread 15: nREPL-worker-1" 1000}

Can you shed any light on what's happening? Am I doing something wrong or is this a bug?

dm3 commented 6 years ago

I'd say it's an inconsistency. This stems from

https://github.com/ztellman/manifold/blob/193f5f48972c8e20dd0a9fc41a1311566a9f7bdd/src/manifold/deferred.clj#L378: when the deferred is already realized, the listener runs immediately on the thread that attaches it.

kingpong commented 6 years ago

Here's a more practical example of why this is a problem:

  (let [d  (-> 100
               (d/success-deferred)
               (d/onto (ForkJoinPool/commonPool)))
        t0 (System/currentTimeMillis)
        _  (d/chain d #(Thread/sleep %))  ;; want to run in parallel with ↓
        _  (d/chain d #(Thread/sleep %))  ;; want to run in parallel with ↑
        t1 (System/currentTimeMillis)]
    (println "Chaining took" (- t1 t0) "ms"))

; ==> Chaining took 209 ms

I would have expected the calls to d/chain to be async, and the sleeps to run in parallel on the ForkJoinPool. Instead, they run sequentially on the calling thread and block each other.
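A minimal sketch of one workaround, assuming manifold is on the classpath: rather than relying on d/onto, each callback returns a deferred created by d/future-with, and d/chain waits on that deferred. The work therefore runs on the given executor even when d is already realized, and the calls to chain-on return immediately. The helper name chain-on is hypothetical, and a two-thread fixed pool stands in for ForkJoinPool/commonPool so that the two sleeps can overlap even on a machine with few cores.

```clojure
(require '[manifold.deferred :as d])
(import '(java.util.concurrent ExecutorService Executors))

;; Two threads, so both sleeps can run at the same time.
(def ^ExecutorService sleep-pool (Executors/newFixedThreadPool 2))

(defn chain-on
  "Hypothetical helper: chains f onto deferred d, but always runs f on
  executor. d/chain waits on the deferred returned by d/future-with."
  [d executor f]
  (d/chain d (fn [x] (d/future-with executor (f x)))))

(defn timed-parallel-sleeps
  "Re-runs the example above using chain-on; returns timings in ms."
  []
  (let [d  (d/success-deferred 100)
        t0 (System/currentTimeMillis)
        a  (chain-on d sleep-pool #(Thread/sleep %))
        b  (chain-on d sleep-pool #(Thread/sleep %))
        t1 (System/currentTimeMillis)]
    @a @b ; wait for both sleeps to finish
    {:chaining-ms (- t1 t0)
     :total-ms    (- (System/currentTimeMillis) t0)}))
```

Calling (timed-parallel-sleeps) should report a chaining time close to zero and a total closer to 100 ms than to 200 ms; call (.shutdown sleep-pool) when done. The cost is one executor hop per callback, which is exactly the overhead discussed later in this thread.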

It's also a problem if the code in the chained callbacks requires thread-local context that is provided by the executor, for example MDC.

kingpong commented 6 years ago

@dm3 since I've explicitly set the executor with (d/onto ...), doesn't the anonymous function you mention run on the executor (line 384)? https://github.com/ztellman/manifold/blob/eabe63e42af4ee9454751ad272fb418701e90f53/src/manifold/deferred.clj#L376-L385

ztellman commented 6 years ago

As @dm3 said, if you chain on a deferred which is already realized, it executes immediately and does not register any sort of callback. Since you are simply using (d/future "a value") without sleeping or anything, at least some of the time the future has already completed before it is passed into d/chain.

kingpong commented 6 years ago

@ztellman Is this the desired behavior, though? In my second example, code that I would expect not to block at all actually takes 200ms. Imagine that this is happening in a UI thread, and I want to ensure that the callbacks, which may block, are run in a separate thread to avoid hanging the UI. The initial deferred came from a library function, and it may (or may not) be realized by the time it gets to me.

ztellman commented 6 years ago

It's a fair question. This code is effectively using deferreds as a way to convey blocking work onto another thread, which I would accomplish with d/future-with, but I can see why you might expect that would be done for you. Let me give it some thought.

kingpong commented 6 years ago

Thanks for considering. The actual problem that is biting me is that the code in my callbacks requires thread-local context that is automatically provided by the executor I specified, and the thread-local context isn't always there. I think the unexpected blocking case is more compelling to most users of the library, though.

ztellman commented 6 years ago

Wrapping the chain call in future-with will fix your immediate issue, fwiw. I’ll update once I’ve mulled this over a bit. On Fri, Feb 23, 2018 at 2:40 PM Philip Garrett notifications@github.com wrote:

Thanks for considering. The actual problem that is biting me is that the code in my callbacks requires thread-local context that is automatically provided by the executor I specified, and the thread-local context isn't always there. I think the unexpected blocking case is more compelling to most users of the library, though.

mping commented 4 years ago

@ztellman any update on this?

ferdinand-beyer commented 1 year ago

I've stumbled over this as well, trying to use an executor to set up a thread-local context.

On my machine, this snippet consistently yields the REPL thread, so the executor passed to onto is essentially ignored:

(-> (d/future 1)
    (d/onto (executor/fixed-thread-executor 2))
    (d/chain (fn [_] (.getName (Thread/currentThread)))))

This means that the onto examples in the documentation are mostly wrong (probably depending on the computer).

I guess this is not really an issue if all one cares about is fast execution, but this means that one cannot use custom executors for other purposes.

@KingMob: if you decide this is not a bug, maybe we should document it, stating that there is no guarantee? The current docstring, "Returns a deferred whose callbacks will be run on executor.", is definitely misleading.

arnaudgeiser commented 1 year ago

Yes, if the deferred is already realized, it will continue on the same thread and the specified Executor (onto) won't be used. Here is a bit of context on Aleph regarding this behaviour (I'm not saying the documentation cannot be improved).

https://github.com/clj-commons/aleph/pull/427#issuecomment-1214345044

arnaudgeiser commented 1 year ago
(-> (d/future 1)
    (d/onto (e/fixed-thread-executor 2))
    (d/chain (fn [_] (prn (.getName (Thread/currentThread))))))

=> "nREPL-session-8ba7d6cf-90f9-428b-9505-bba48850ff05"
(or sometimes if unlucky) => "manifold-execute-3"
(-> (d/future (Thread/sleep 100) 1)
    (d/onto (e/fixed-thread-executor 2))
    (d/chain (fn [_] (prn (.getName (Thread/currentThread))))))

=> "manifold-pool-37-1"

ferdinand-beyer commented 1 year ago

Yep, tried the same with Thread/sleep.

I'm not saying this is necessarily wrong behaviour, but certainly surprising and misleading. I lost two days trying to use custom executors to trace my async program. :)

KingMob commented 1 year ago

@ferdinand-beyer

At a bare minimum, we should improve the docs, since someone asks this question at least twice a year.

Would you be willing to contribute a PR improving them? For both the docstrings and the markdown files? Can you document the current behavior of chain, add a caveat to onto, and mention using future-with if you want to guarantee everything will run on a custom executor? (Unfortunately, we don't have access to aleph.io, so we can't update it, which is why we've removed all references to it.)

I'm not in love with the current state of things, but it's risky to try and make it more consistent.

Let's ignore the discussion in clj-commons/aleph#427 about trying to improve performance by detecting when you wish to schedule on the same executor, and consider only whether we can safely alter the chain fns to always schedule the callback on an executor. The immediate issues I see are:

  1. May need to use bound-fn again to avoid callback breakage when relying on dynamic vars, though to be fair, people should be binding their own fns in that case
  2. Using bound-fn, while correct, would be slower, sometimes by a lot, as Arnaud has found.
  3. Dispatching to an executor adds overhead, which will slow things down

I think we can skip using bound-fn ourselves, but we need to measure the executor overhead before we can implement this. Anyone with a lot of chained fns may see their CPU usage go up. Hopefully, if we set the executor early on in the chain, and rely on a nil executor for later chained fns to mean "continue on this thread", it should be ok.

Thoughts? Anyone have an Aleph server in a staging env they can volunteer as a guinea pig?

ferdinand-beyer commented 1 year ago

Would you be willing to contribute a PR improving them? For both the docstrings and the markdown files?

Sure, happy to give this a try!

Thoughts? Anyone have an Aleph server in a staging env they can volunteer as a guinea pig?

Unfortunately I cannot volunteer as a guinea pig right now. In general, I think it is a good idea to not use bound-fn but require users to use it when they need it. Pay for what you need.

arnaudgeiser commented 1 year ago

Let's ignore the discussion in https://github.com/clj-commons/aleph/pull/427 about trying to improve performance by detecting when you wish to schedule on the same executor, and consider only whether we can safely alter the chain fns to always schedule the callback on an executor. The immediate issues I see are:

I don't think it's a good idea... It means that every deferred will be rescheduled on the Executor when using Aleph [1], right? It might hurt the performance of some workloads.

[1] : https://github.com/exoscale/aleph/blob/master/src/aleph/http/server.clj#L595-L598

KingMob commented 1 year ago

@arnaudgeiser I'm not clear how your linked code sample relates.

What I'm contemplating is this:

We alter the code in chain-/chain'- to stop invoking the callback immediately in (let [x'' (f x')] if the incoming deferred parameter is already realized. Instead, we make it use the same logic as the deferred implementations: if the executor is nil, run on the same thread, and if not, dispatch to the executor. Everywhere else in manifold already uses this logic; chain is the only major exception. (And catch/finally, which use chain).
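The dispatch rule described above (a nil executor means "run on the current thread", anything else means "submit to the executor") can be sketched in isolation with plain java.util.concurrent. run-callback is a hypothetical name, not part of manifold, and this is not how chain-/chain'- are actually structured; it only illustrates the rule for a callback on an already-realized deferred.

```clojure
(import '(java.util.concurrent Executor))

(defn run-callback
  "Hypothetical sketch of the proposed rule for an already-realized deferred:
  with a nil executor, call f inline on the current thread; otherwise hand
  f to the executor instead of running it here."
  [^Executor executor f]
  (if (nil? executor)
    (f)
    (.execute executor ^Runnable f)))
```

Under this rule, (onto d nil) becomes the explicit opt-in for "keep going on this thread", which is the mitigation mentioned in the next paragraph.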

In terms of performance, I don't think it will be as bad as we fear. For starters, it won't change the perf of any chained fn where the previous link in the chain is usually unrealized at call time, because those will already be scheduled on an executor anyway (or same thread if executor is nil). And for slower fns in a chain, the executor overhead is probably not that bad. This will mostly show up as an issue for tiny fns, but Aleph doesn't chain too many of those.

We can also mitigate any performance impact by using (onto d nil) or (future-with nil..., which will force the subsequent deferreds to run on the same thread. It's work, but it turns something hidden into something explicit. We can also combine tiny fns if needed.

FWIW, I took a stab at converting chain to see how it performs, and at least with a crude time lein test, there was no noticeable difference on my machine.


Regardless, I'm not convinced it'll be worth all the work in the end, as compared to just documenting the existing behavior.

One issue is, chained fns run on unwrapped values, so I had to change unwrap to also record the last seen executor. But it's not clear that's the right thing to do, since the chained fns aren't technically deferred listeners. It could also choose to use the current thread local executor value. I'm not sure the complication is worth it.

arnaudgeiser commented 1 year ago

@arnaudgeiser I'm not clear how your linked code sample relates.

Sorry for the delay. It's not all fresh in my mind, but as soon as you create your executor with manifold.executor/thread-factory, you end up with an executor on your thread-local [1]. Then, every deferred created with manifold.deferred/deferred will have this executor set [2].

Today, if the deferred is realized, you continue on the current thread regardless of the presence of that executor on the thread local. This means you can blindly write (-> d (d/chain ...) (d/chain ...) (d/chain ...)) for synchronous functions with almost no performance impact. Tomorrow, every d/chain would start rescheduling that synchronous function onto the executor. That's not something I would really like.

[1] : https://github.com/clj-commons/manifold/blob/master/src/manifold/executor.clj#L69 [2] : https://github.com/clj-commons/manifold/blob/master/src/manifold/executor.clj#L69

arnaudgeiser commented 1 year ago

On a side note, I was having a look at how @mpenet's auspex library implemented the manifold support for qbits.auspex/then when an executor is provided.

It seems he simply ignores the parameter and calls manifold.deferred/chain [1]. We cannot suggest that he use d/onto, since it doesn't guarantee that the deferred will be scheduled on the provided executor. So I'm wondering: what is the recommended way to schedule a realized deferred on a specific executor?

future-with was mentioned, but it's not doing what some would expect there as everything is now running on the provided executor.

(-> (d/future 1)
    (d/chain' (fn [_] (prn (.getName (Thread/currentThread))))) ;; manifold-pool-1-1
    (->> (d/future-with (manifold.executor/fixed-thread-executor 2)))
    (d/chain' (fn [_] (prn (.getName (Thread/currentThread)))))) ;; manifold-pool-1-1

How should the following (non-working) code be expressed so that the second chain runs on the provided executor?

(-> (d/future 1)
    (d/chain' (fn [_] (prn (.getName (Thread/currentThread))))) ;; current-thread
    (d/onto (manifold.executor/fixed-thread-executor 2))
    (d/chain' (fn [_] (prn (.getName (Thread/currentThread)))))) ;; manifold-pool-1-1

[1] : https://github.com/mpenet/auspex/blob/master/src/qbits/auspex/manifold.clj#L55-L56

ztellman commented 1 year ago

Hi, just to weigh in on this, the behavior in Manifold is shaped by two considerations:

I don't think the current behavior properly balances these two considerations, for the reasons already demonstrated in this thread. Luckily, I think the behavior here can be pretty easily addressed by some minor changes to the success-error-unrealized macro (https://github.com/clj-commons/manifold/blob/master/src/manifold/deferred.clj#L62), such that if the deferred in question has an affinity to a thread pool and the macro isn't being executed within that thread pool, then you simply call the unrealized clause rather than executing the callback inline. I should note that it's been quite some time since I last messed around in this codebase, so it's entirely possible there's some issue I'm overlooking, but the point of the macro was to have there be a single place that deals with these sorts of "immediate execution" semantics, so I'm optimistic.

arnaudgeiser commented 1 year ago

Hello Zach (and welcome back)! Thanks for the insights, those are precious.

What you propose roughly matches what Matthew initially had in mind when we discussed this on Aleph [1]:

  1. Select an executor:
     a. If an executor is specified on the deferred, choose that.
     b. Otherwise, if an executor is set on the executor threadlocal, choose that.
  2. If the deferred is not realized, schedule the callback on the executor.
  3. If the deferred IS realized:
     a. Execute immediately on the current thread if it belongs to the selected executor.
     b. Otherwise, schedule the callback to run on the executor.
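The three steps above can be sketched as a pure decision function. Everything here is hypothetical: the map keys, the callback-strategy name, and especially on-executor?, a caller-supplied predicate standing in for the thread-membership check that the rest of this discussion is trying to pin down. The case where no executor is found at all is not covered by the steps; the sketch assumes it falls back to the current thread.

```clojure
(defn callback-strategy
  "Hypothetical sketch of the three steps above. Returns [:inline ex] or
  [:schedule ex], where ex is the selected executor (possibly nil)."
  [{:keys [deferred-executor thread-executor realized? on-executor?]}]
  (let [ex (or deferred-executor thread-executor)] ; step 1: a, then b
    (cond
      (nil? ex)         [:inline nil]     ; assumption: no executor, stay on this thread
      (not realized?)   [:schedule ex]    ; step 2: callback runs on ex once realized
      (on-executor? ex) [:inline ex]      ; step 3a: already on the selected executor
      :else             [:schedule ex]))) ; step 3b: hop onto the selected executor
```

For example, a realized deferred tied to :pool-a, inspected from a thread that is not in :pool-a, yields [:schedule :pool-a]. All of the difficulty is hidden inside on-executor?.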

Considering that selected executor === thread pool, what Java API exists to determine whether a java.lang.Thread belongs to a java.util.concurrent.ExecutorService? java.lang.Thread/getThreadGroup [2] exists, but how does it correlate to an ExecutorService?

If this can be easily addressed on the success-error-unrealized macro with minor modifications, I would definitely give it a shot.

[1] : https://github.com/clj-commons/aleph/pull/427#issuecomment-1214345044 [2] : https://docs.oracle.com/en/java/javase/17/docs/api/index.html#getThreadGroup()

KingMob commented 1 year ago

Zach, thanks for chiming in! You're always welcome here.

I think the proposed change of checking the executor and continuing to run on the current thread if it's already in that executor's pool is good, but in the past we've always held off out of caution and uncertainty about whether the benefits are worth it. Has anything changed?

To recap what it will entail:

The first major change would be how with-executor behaves. Currently, it (inefficiently) forces each fn in a chain to be submitted to the executor instead of running them all in one thread. This is slower, but maybe someone's code relies on it, even if unintentionally. @alexander-yakushev alluded to cases where this is undesirable, but the conversation went elsewhere before we got his example.

The next major change is that callbacks on already-realized deferreds would switch from running on the current thread (no matter where it is) to only running on the current thread if it's in the same pool (or nil). This has always been unexpected behavior, and someone complains about it at least twice a year.

Generally, I would be most happy if people's first guess as to which pools things are running on is correct. I would love to say "If you specify a pool, it will always run on that pool. If you're already in that pool, it will continue on the current thread. If you need to submit a job to the pool you're already on, use future/future-with/$SOME_ESCAPE_HATCH"


@arnaudgeiser FYI, @alexander-yakushev suggested the following for determining the executor in https://github.com/clj-commons/aleph/pull/427#issuecomment-440341496:

I suppose, if you compare Thread.currentThread().getThreadGroup() to the executor's ThreadGroup from its ThreadFactory, you can avoid redundant rescheduling in Manifold itself, and thus keep with-executor as is.

I didn't see the APIs required to do this with Netty pools, but it should work just fine for determining if a thread is in one of our pools.
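A minimal sketch of the ThreadGroup comparison, assuming a pool whose ThreadFactory you control. The names grouped-thread-factory, inside-group?, pool-group and grouped-pool are hypothetical. The JDK's default thread factory generally places new threads in the creating thread's group, so this check only distinguishes pools built with a dedicated group, as here; it tells you nothing about arbitrary executors.

```clojure
(import '(java.util.concurrent Callable ExecutorService Executors ThreadFactory))

(defn grouped-thread-factory
  "Hypothetical ThreadFactory that creates every thread inside group."
  ^ThreadFactory [^ThreadGroup group]
  (let [counter (atom 0)]
    (reify ThreadFactory
      (newThread [_ r]
        (Thread. group ^Runnable r (str (.getName group) "-" (swap! counter inc)))))))

(defn inside-group?
  "True if the calling thread belongs to group."
  [^ThreadGroup group]
  (identical? group (.getThreadGroup (Thread/currentThread))))

(def pool-group (ThreadGroup. "my-pool"))

(def ^ExecutorService grouped-pool
  (Executors/newFixedThreadPool 2 (grouped-thread-factory pool-group)))
```

Calling (inside-group? pool-group) from the REPL thread returns false, while the same call submitted to grouped-pool returns true. This only works because the factory was written to use pool-group.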

arnaudgeiser commented 1 year ago

I suppose, if you compare Thread.currentThread().getThreadGroup() to the executor's ThreadGroup from its ThreadFactory, you can avoid redundant rescheduling in Manifold itself, and thus keep with-executor as is.

This is where I'm a bit confused. For starters, the threads we are creating don't have any ThreadGroup attached to them [1]. So I guess it will require modifications, even on our own pools. This is the same for all Executors created directly from the JDK [2]. Manifold users might not want to use the Dirigiste Executors but plain JDK Executors.

Regarding:

to the executor's ThreadGroup from its ThreadFactory

You cannot get the ThreadFactory from an ExecutorService, but from a ThreadPoolExecutor [3]. Dirigiste executor doesn't give you access to its internal ThreadFactory [4]. Even so... having access to the ThreadFactory is not giving us access to the ThreadGroup the threads are created with.

Pretty sure I'm missing the whole thing there. I understand what we are trying to achieve but not how. If you can shed some light, that would be great.

I was initially thinking about this slight change to success-error-unrealized:

(if (inside-executor? (.executor d))
   ~success-clause
   ~unrealized-clause)

EDIT:

We alter the code in chain-/chain'- to stop invoking the callback immediately in (let [x'' (f x')] if the incoming deferred parameter is already realized.

I tend to concur: this is where the decision to call f synchronously is taken. It seems that success-error-unrealized is only used in one particular arity of chain'-. If manifold's thread factories did not set that thread-local executor, this would be simpler...

[1] : https://github.com/clj-commons/manifold/blob/master/src/manifold/executor.clj#L71 [2] : https://github.com/openjdk/jdk/blob/master/src/java.base/share/classes/java/util/concurrent/Executors.java#L687-L689 [3] : https://github.com/openjdk/jdk/blob/master/src/java.base/share/classes/java/util/concurrent/ThreadPoolExecutor.java#L1519 [4] : https://github.com/clj-commons/dirigiste/blob/master/src/io/aleph/dirigiste/Executor.java#L121

KingMob commented 1 year ago

Maybe I need to take a closer look, but I think the general point is, with our own pools, we can alter Dirigiste/Manifold/Aleph if needed to achieve this. To the extent that we initialize Netty, maybe we can set this up there, too.

But for any random Java executor in the wild, there’s no guarantees. Hopefully we won’t encounter many of those, though I know some people have replaced Dirigiste with their own pools.


ztellman commented 1 year ago

I stand corrected on success-error-unrealized being the thing we'd need to change. As already stated elsewhere, we'd need to add an additional check to the various arities of chain- and chain'-. This means the change will be a bit more verbose (sorry), but otherwise pretty straightforward.

Having taken a longer look at everything, here's what I believe needs to change:

Is there anything I'm overlooking?

KingMob commented 1 year ago

Zach, IIUC, you're suggesting that the second thread local contain the executor the thread is actually running in, and that it never change (unlike the first thread local, which the instrumented-executor's :onto? param can set and d/onto changes). Thus, the first thread local will represent what executor we want new jobs/threads/deferreds to run on, and the second what it's actually on.

Let's call it something like current-executor.

We'll wrap non-Manifold/dirigiste executors so their .execute fns set current-executor.
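A minimal sketch of that wrapping, using only java.util.concurrent: the wrapper records itself in a thread local for the duration of each task and restores the previous value afterwards. current-executor follows the name floated above; wrap-executor and on-executor? are hypothetical, and a real change would also need to cover Manifold's and Dirigiste's own pools.

```clojure
(import '(java.util.concurrent Executor))

(def ^ThreadLocal current-executor
  "Hypothetical thread local: the executor whose task this thread is running."
  (ThreadLocal.))

(defn wrap-executor
  "Returns an Executor that delegates to ex but sets current-executor to
  itself while each submitted task runs."
  ^Executor [^Executor ex]
  (reify Executor
    (execute [this r]
      (.execute ex
                (fn []
                  (let [prev (.get current-executor)]
                    (.set current-executor this)
                    (try
                      (.run ^Runnable r)
                      (finally
                        ;; restore, so nested or inline use stays correct
                        (.set current-executor prev)))))))))

(defn on-executor?
  "True if the calling thread is currently running a task for ex."
  [ex]
  (identical? ex (.get current-executor)))
```

A check like on-executor? is the kind of predicate the three-step proposal earlier in the thread needs, and it does not depend on ThreadGroups or on access to a pool's ThreadFactory.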

Then, we need to check the code to find out where it'll have to change. A search for calls to .execute turns up these likely candidates:

I think the future-with, wait-for, and go-off-with macros should always execute a new job, even on the same executor, so they should be left alone.

If we change the above locations, I don't think we also have to fool with the chain* fns, since they bottom out in calls to on-realized, whose bodies will all be handled by one of the above.

Is this a good summary?

arnaudgeiser commented 1 year ago

I think we are overlooking that people using Aleph/Manifold/Dirigiste might also use other kinds of asynchronous clients that rely on JDK Executors. The behavior of Deferreds will then depend on which world the threads were created in, which will raise even more questions like this one.

I'm scared about the following question:

This kind of behavior is something you can already experience today with Aleph. If you ever decide to replace the Dirigiste Executor (the thread-factory, to be precise) [1] with a JDK Executor, you will lose the onto [2] and thus the automatic rescheduling on that Executor (if the deferred is not already realized). Your code will behave completely differently, and you might end up blocking threads from your various application Executors (Netty EventLoopGroup, tiny FixedThreadPool) that you don't want to block...

And then... you realize that some of Aleph's defaults are actually preventing you from shooting yourself in the foot.

(require '[manifold.deferred :as d]
         '[manifold.executor :as ex])

(def manifold-executor (ex/fixed-thread-executor 4 {:onto? true}))

(.submit manifold-executor
         #(-> (d/future (Thread/sleep 100))
              (d/chain' (fn [_] (prn (.getName (Thread/currentThread)))))))

;; => "manifold-pool-13-2"

(def executor (java.util.concurrent.Executors/newFixedThreadPool 4))

(.submit executor
         #(-> (d/future (Thread/sleep 100))
              (d/chain' (fn [_] (prn (.getName (Thread/currentThread)))))))

;; => "manifold-execute-9"

Do we really want more of this?

Anyway, I just wanted to share my concerns. Implementation-wise, though, what you propose will probably work the way we expect.


I would propose exploring another solution that avoids this split between the Manifold and JDK worlds: we do not change anything, but we introduce a manifold.deferred/chain-with function.

(defn chain-with
  ([x executor f]
   ...)
  ([x executor f g]
   ...)
  ([x executor f g & fs]
   ...))

And then, we "just" force the application of f on that executor, like this:

(-> (d/future 1)
    (d/chain' (fn [_] (prn (.getName (Thread/currentThread))))) ;; current-thread
    (d/chain-with' (manifold.executor/fixed-thread-executor 2) (fn [_] (prn (.getName (Thread/currentThread)))))) ;; manifold-pool-1-1
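For comparison, something like chain-with can plausibly be built on the existing API: each function is wrapped so that it returns a d/future-with deferred, which d/chain then waits on, so every step runs on executor whether or not x is already realized. This is a hypothetical sketch of the non-primed variant only, not an existing manifold function, and it pays one executor hop per step.

```clojure
(require '[manifold.deferred :as d])

(defn chain-with
  "Hypothetical sketch: like d/chain, but f and each of fs always run on
  executor, even when x is already realized."
  [x executor f & fs]
  (reduce (fn [acc g]
            ;; d/chain waits on the deferred returned by d/future-with
            (d/chain acc (fn [v] (d/future-with executor (g v)))))
          x
          (cons f fs)))
```

With a single-thread pool, (chain-with (d/success-deferred 1) pool inc f) should run both inc and f on that pool's only thread, even though the input deferred is already realized.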

With that, chain stays :

chain-with comes with :

And onto is still doing the same :

I would be happier with this proposal. Thoughts?

[1] : https://github.com/clj-commons/aleph/blob/master/src/aleph/http/server.clj#L612 [2] : https://github.com/clj-commons/manifold/blob/master/src/manifold/executor.clj#L69

KingMob commented 1 year ago

FYI, bit busy at the moment, I won't be able to get back to this until the following week.