jOOQ / jOOL

jOOλ - The Missing Parts in Java 8

jOOλ improves the JDK libraries in areas where the Expert Group's focus was elsewhere. It adds tuple support, function support, and a lot of additional functionality around sequential Streams. The JDK 8's main efforts (default methods, lambdas, and the Stream API) were focused around maintaining backwards compatibility and implementing a functional API for parallelism.
http://www.jooq.org/products
Apache License 2.0

New Extension of CompletableFuture or a new implementation of CompletionStage #283

Closed billoneil closed 7 years ago

billoneil commented 7 years ago

CompletableFuture's API seems a bit off. It would be nice to have an implementation that remembered its Executor and used it by default for all async operations instead of the common pool. Also, a shorter name might be nice.

Simplified possible implementation.

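import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.function.Function;

// Promise might not be a great name
// Simplified: the remaining CompletionStage and Future methods are omitted.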
public class Promise<T> implements Future<T>, CompletionStage<T> {
    private final CompletableFuture<T> future;
    private final Executor defaultExecutor;

    // This could be private and use static methods the same way CompletableFuture does.
    public Promise(CompletableFuture<T> future, Executor defaultExecutor) {
        this.future = future;
        this.defaultExecutor = defaultExecutor;
    }

    @Override
    public <U> Promise<U> thenApply(Function<? super T, ? extends U> fn) {
        return new Promise<>(future.thenApply(fn), defaultExecutor);
    }

    @Override
    public <U> Promise<U> thenApplyAsync(Function<? super T, ? extends U> fn) {
        // Use the remembered Executor instead of the common pool.
        return new Promise<>(future.thenApplyAsync(fn, defaultExecutor), defaultExecutor);
    }

    @Override
    public <U> Promise<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor) {
        // Run on the given Executor and remember it for downstream stages.
        return new Promise<>(future.thenApplyAsync(fn, executor), executor);
    }

    public T join() {
        return future.join();
    }

    public CompletableFuture<T> getCompletableFuture() {
        return future;
    }

}

This would allow us to use CompletableFuture under the hood and get any future improvements from it. It would gain the benefit of remembering the last Executor used.
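To make the motivation concrete, here is a minimal sketch of the behaviour being complained about. It uses only the plain JDK API; the class name DefaultExecutorDemo, the method downstreamThread, and the thread name "custom-thread" are made up for illustration. A stage started on a custom Executor does not pass that Executor on to a later thenApplyAsync call unless it is supplied again.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DefaultExecutorDemo {

    // Returns the name of the thread that ran the downstream thenApplyAsync stage.
    static String downstreamThread(ExecutorService executor, boolean passExecutor) {
        CompletableFuture<String> first =
            CompletableFuture.supplyAsync(() -> "ignored", executor);

        CompletableFuture<String> second = passExecutor
            // Executor repeated explicitly: the stage runs on our thread.
            ? first.thenApplyAsync(s -> Thread.currentThread().getName(), executor)
            // No Executor given: CompletableFuture falls back to its own default.
            : first.thenApplyAsync(s -> Thread.currentThread().getName());

        return second.join();
    }

    public static void main(String[] args) {
        ExecutorService executor =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "custom-thread"));
        try {
            System.out.println(downstreamThread(executor, false));
            System.out.println(downstreamThread(executor, true));
        } finally {
            executor.shutdown();
        }
    }
}
```

Only the second call reports the custom thread. The first reports whatever thread CompletableFuture's default async executor provides, which is the part a wrapper like the one above would change.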

lukaseder commented 7 years ago

Thank you very much for your suggestion.

For the record, one of the main reasons why there is no convenience API for CompletionStage / CompletableFuture in jOOλ yet is because I really don't like the API design of this new thing, and I'm quite convinced that a future Java will contain yet another attempt to tackle this problem domain.

But there are real world needs and for those cases, CompletionStage might just be good enough, so I'm happy to discuss.

I completely agree with you that any Executor that is passed should be somehow retained in the downstream call chain. It is really weird how CompletableFuture keeps defaulting to keeping the JDK's common ForkJoinPool busy without providing any easy SPI to override this behaviour. jOOQ, for instance, has an ExecutorProvider, which allows for providing the default Executor for CompletionStage at all times. I think this design worked really well for jOOQ, and we might be able to reproduce it in jOOλ.

While we're discussing this, there's also potential of copying jOOQ's ManagedBlocker convenience API. Every time you pass some logic to a jOOQ CompletionStage, it will be wrapped in a ManagedBlocker in order not to saturate the underlying Executor. Few people are aware of this weird API, but it definitely makes sense to support it, too (through a different issue):

    static <T> Supplier<T> blocking(Supplier<T> supplier, boolean threadLocal) {

        // [#5377] In ThreadLocal contexts (e.g. when using ThreadLocalTransactionProvider),
        //         no ManagedBlocker is needed as we're guaranteed by API contract to always
        //         remain on the same thread.

        return threadLocal ? supplier : new Supplier<T>() {
            volatile T asyncResult;

            @Override
            public T get() {
                try {
                    ForkJoinPool.managedBlock(new ManagedBlocker() {
                        @Override
                        public boolean block() {
                            asyncResult = supplier.get();
                            return true;
                        }

                        @Override
                        public boolean isReleasable() {
                            return asyncResult != null;
                        }
                    });
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }

                return asyncResult;
            }
        };
    }

One more question: Why do you let your Promise extend Future? (I'd prefer not to recycle this name, btw. It makes a false promise ;) which leads to discussions.)

billoneil commented 7 years ago

I wrote it in a few minutes without much thought :). I agree the name is bad and it probably doesn't need to extend Future. I just copy pasted from CompletableFuture as a starting point.

lukaseder commented 7 years ago

Oh, I see, thanks for the clarification.

billoneil commented 7 years ago

If you think this is something that would fit in jOOλ I would be happy to try and help implement / test.

billoneil commented 7 years ago

Since this is something I will probably want on my own I started teasing it out a bit. I'd be happy to move any code here if you decide this does fit in jOOλ and have any better design decisions.

I still need to write tests. https://github.com/StubbornJava/StubbornJava/pull/4

Everything should return a CompletionStage so the implementation is hidden. Since CompletionStage has a toCompletableFuture() method this seems to work well. I'm still bad at naming. If you want me to make a PR here for discussion I can do that. Just figured this might not be high up on your priority list right now.

lukaseder commented 7 years ago

Yes it definitely fits. Thanks very much for your offer!

How about calling the helper class Async and then offer static constructor methods? That would allow for hiding this "Promise" class and its implementation, similar to what Unchecked already does.
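A rough sketch of what such a helper could look like, assuming a static factory that hides the wrapper type. The names AsyncSketch and Stage are hypothetical and this is not the jOOλ API. To stay short, the wrapper exposes only thenApplyAsync rather than implementing all of CompletionStage.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch only: a static entry point that hides the wrapper class.
public final class AsyncSketch {

    private AsyncSketch() {}

    // Static constructor method: starts the supplier on the given Executor.
    public static <T> Stage<T> supplyAsync(Supplier<T> supplier, Executor executor) {
        return new Stage<>(CompletableFuture.supplyAsync(supplier, executor), executor);
    }

    // Wrapper that remembers the Executor for downstream async stages.
    public static final class Stage<T> {
        private final CompletableFuture<T> delegate;
        private final Executor executor;

        private Stage(CompletableFuture<T> delegate, Executor executor) {
            this.delegate = delegate;
            this.executor = executor;
        }

        // No Executor given: reuse the remembered one instead of the common pool.
        public <U> Stage<U> thenApplyAsync(Function<? super T, ? extends U> fn) {
            return new Stage<>(delegate.thenApplyAsync(fn, executor), executor);
        }

        // Explicit Executor: use it and remember it for later stages.
        public <U> Stage<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor other) {
            return new Stage<>(delegate.thenApplyAsync(fn, other), other);
        }

        public CompletableFuture<T> toCompletableFuture() {
            return delegate;
        }
    }
}
```

Callers would only ever write something like AsyncSketch.supplyAsync(supplier, executor).thenApplyAsync(fn), so the wrapper's constructor can stay private.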

Some further inspiration about what jOOQ already does here: https://github.com/jOOQ/jOOQ/blob/master/jOOQ/src/main/java/org/jooq/impl/ExecutorProviderCompletionStage.java

billoneil commented 7 years ago

Perfect, that looks similar to what I ended up with. I just need to switch to CompletionStage instead of CompletableFuture internally, as well as make the methods final.

Questions.

  1. Should it take an Executor or did you also want to expose the ExecutorProvider? Once the Async class is created you can have additional methods in the future for an ExecutorProvider if you want.
  2. Do you have closed source unit tests for ExecutorProviderCompletionStage that you would be able to open source for this new class? I'd prefer to reuse already functioning tests if possible. Might not be easy to reuse if they are all based on the ExecutorProvider though. If not no problem.

lukaseder commented 7 years ago

  1. Hmm, the problem with the ExecutorProvider is that it needs to be registered in a globally available "configuration" as in jOOQ. This isn't really possible with static methods. Perhaps we should simply retain the last available Executor instead?
  2. Yes there are closed source integration tests, but they're too tightly coupled to jOOQ and querying databases, so I doubt they'll be useful here.

billoneil commented 7 years ago

Sounds good. I'll start a PR later today and at least get the implementation going. Any suggestions for testing? I was thinking of making a custom Executor that has a single manually created thread and just checking that it is the executing thread for the default async methods.

lukaseder commented 7 years ago

Welcome to the wonderful world of CountDownLatch :) That's just one technique, there are others. Preferably nothing that depends on the randomness of some wall clock. There should also be an executor with more than one thread to test stuff being run in parallel.
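One possible shape for such a test, as a sketch rather than a recommendation: two tasks each count down a shared latch and then wait for it, so both can only get past the latch if they are running at the same time. The class and method names are made up, and the 5 second timeout is only a safety net so that a failing case does not hang forever.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchDemo {

    // Returns true only if both tasks were running on the executor at the same time.
    static boolean runsInParallel(Executor executor) {
        CountDownLatch bothStarted = new CountDownLatch(2);
        AtomicInteger sawOther = new AtomicInteger();

        Runnable task = () -> {
            bothStarted.countDown();
            try {
                // Passes only once the other task has started as well.
                if (bothStarted.await(5, TimeUnit.SECONDS)) {
                    sawOther.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        CompletableFuture<Void> a = CompletableFuture.runAsync(task, executor);
        CompletableFuture<Void> b = CompletableFuture.runAsync(task, executor);
        CompletableFuture.allOf(a, b).join();

        return sawOther.get() == 2;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(runsInParallel(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

With a single-threaded executor the first task times out before the second one starts, so the method returns false. That is the only place where the clock matters.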

billoneil commented 7 years ago

Yeah I used to use sleeps 😱 but found CountDownLatch for async tests not too long ago. I'm thinking of how to check that it is using the correct Executor. I'll try to figure it out then let you poke holes if it's not adequate.

lukaseder commented 7 years ago

A simple test to see if the correct Executor is used is to let the Executor increment some AtomicInteger once it gets a new submission. E.g.:

AtomicInteger i1 = new AtomicInteger();
AtomicInteger i2 = new AtomicInteger();
Executor e1 = wrapAnExecutorAndIncrement(i1, executor1);
Executor e2 = wrapAnExecutorAndIncrement(i2, executor2);
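wrapAnExecutorAndIncrement is not an existing method. A minimal sketch of what it could look like follows; the class name CountingExecutors is made up as well, and main uses Runnable::run as a same-thread Executor purely to keep the example short.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

public class CountingExecutors {

    // Wraps an Executor so that every submission increments the counter first.
    static Executor wrapAnExecutorAndIncrement(AtomicInteger counter, Executor delegate) {
        return command -> {
            counter.incrementAndGet();
            delegate.execute(command);
        };
    }

    public static void main(String[] args) {
        AtomicInteger i1 = new AtomicInteger();
        AtomicInteger i2 = new AtomicInteger();

        // Runnable::run executes each task directly on the calling thread.
        Executor e1 = wrapAnExecutorAndIncrement(i1, Runnable::run);
        Executor e2 = wrapAnExecutorAndIncrement(i2, Runnable::run);

        // Both stages are submitted to e1, and e2 is never used.
        CompletableFuture.supplyAsync(() -> 1, e1)
            .thenApplyAsync(x -> x + 1, e1)
            .join();

        System.out.println("e1 submissions: " + i1.get());
        System.out.println("e2 submissions: " + i2.get());
    }
}
```

A test can then assert on the two counters instead of inspecting thread names.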

billoneil commented 7 years ago

👍 Thanks

billoneil commented 7 years ago

@lukaseder let me know if this is on the right track https://github.com/jOOQ/jOOL/pull/284

billoneil commented 7 years ago

Keeping this open until I finish writing the tests and we decide on your two comments.

lukaseder commented 7 years ago

Yep

billoneil commented 7 years ago

Let me know if you think this is missing anything or if it needs some documentation. If not I think this can be closed.

I'll make a separate issue for the ManagedBlocker. I'll have to read up on that, as I have not come across it before.

lukaseder commented 7 years ago

Nope it's perfect. We can create new issues if something's wrong / missing :)