aol / cyclops

An advanced, but easy to use, platform for writing functional applications in Java 8.
Apache License 2.0

Why does Future only support a sync fromTry? #977

Open baha2046 opened 5 years ago

baha2046 commented 5 years ago

I am trying to convert some code from Vavr to cyclops, and I found that Vavr's Future fromTry only supports async, whereas in cyclops fromTry only supports the sync method. I wrote my own version, but I wonder why there is no async version. Something like:

    public static <T,X extends Throwable> Future<T> futureFromTry(final Executor ex,final Supplier<Try<T,X>> s)
    {
        CompletableFuture<T> cf = new CompletableFuture<>();
        ex.execute(()->s.get().fold(cf::complete, cf::completeExceptionally));
        return Future.of( cf);
    }
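
One thing to watch with a hand-rolled helper of this shape: if `s.get()` itself throws rather than returning a failed Try, the task dies on the executor thread and the CompletableFuture is never completed. Below is a minimal JDK-only sketch of guarding against that. It uses a plain `Supplier<T>` as a stand-in for the Try, and the names `AsyncFromSupplier` and `fromSupplier` are hypothetical, not part of cyclops.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical helper: JDK-only stand-in for the futureFromTry idea above.
final class AsyncFromSupplier {

    // Runs the supplier on the executor and completes the future with its
    // value, or completes it exceptionally if the supplier throws.
    public static <T> CompletableFuture<T> fromSupplier(Executor ex, Supplier<T> s) {
        CompletableFuture<T> cf = new CompletableFuture<>();
        ex.execute(() -> {
            try {
                cf.complete(s.get());
            } catch (Throwable t) {
                // Without this catch the future would stay incomplete forever.
                cf.completeExceptionally(t);
            }
        });
        return cf;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println(fromSupplier(pool, () -> "loaded").join());
            CompletableFuture<String> failed =
                    fromSupplier(pool, () -> { throw new IllegalStateException("boom"); });
            System.out.println(failed.handle((v, t) -> "failed: " + t.getMessage()).join());
        } finally {
            pool.shutdown();
        }
    }
}
```

In the cyclops version the same guard would wrap the `s.get().fold(...)` call.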

johnmcclean commented 5 years ago

Hi @baha2046 ,

It really depends on the use case. AFAIK in Vavr Try is an eager data structure, so the data is already in place at the point in time at which it is converted to a Future. If we write some code using Java's CompletableFuture that works with data that is already present, e.g.

CompletableFuture.supplyAsync(()->"hello",Executors.newFixedThreadPool(1));

There is a good chance that any future combinators will be executed on the current thread (not on the thread pool). For example, using thenAccept to print out the current thread:

System.out.println("Thread " + Thread.currentThread().getId());

CompletableFuture.supplyAsync(()->"hello",Executors.newFixedThreadPool(1))
                 .thenAccept(i->{
            System.out.println("Thread " + Thread.currentThread().getId());
}).join();

Running it prints output showing that the combinators are executed on the main thread and not somewhere in the thread pool.

Thread 1
Thread 1

With this behaviour I don't think it makes sense to create a Future from a Try that is already completed, as all additional processing will occur on the current thread (it's really only worth the context switch if we are teeing up an expensive or blocking operation like IO).
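
The thread-affinity point can be checked with the JDK alone. The sketch below (class and method names are illustrative only) attaches a plain combinator and an `*Async` combinator with an explicit executor to an already-completed CompletableFuture and reports which thread ran each one. The plain `thenApply` runs on the calling thread because the value is already present; `thenApplyAsync` with an executor is the way to force a hop onto the pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative names; only java.util.concurrent is used.
final class CombinatorThreads {

    // Name of the thread that runs a plain combinator attached to a future
    // that is already complete: the calling thread.
    public static String syncCombinatorThread() {
        return CompletableFuture.completedFuture("hello")
                .thenApply(v -> Thread.currentThread().getName())
                .join();
    }

    // Name of the thread that runs an *Async combinator when an executor is
    // passed explicitly: a thread from that executor.
    public static String asyncCombinatorThread() {
        ExecutorService pool =
                Executors.newFixedThreadPool(1, r -> new Thread(r, "pool-worker"));
        try {
            return CompletableFuture.completedFuture("hello")
                    .thenApplyAsync(v -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller:            " + Thread.currentThread().getName());
        System.out.println("thenApply on:      " + syncCombinatorThread());
        System.out.println("thenApplyAsync on: " + asyncCombinatorThread());
    }
}
```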

If the goal is to do some expensive additional processing that returns a Try, we can make use of Future::of to create the asynchronous Future and mergeMap to flatten the Try into the Future, e.g.

Future<Integer> f = Future.of(()->Try.withCatch(this::loadData),ex)
                          .mergeMap(i->i);

Or to continue processing with an existing Try:

Try<File,Throwable> myTry;
Future<Integer> f = Future.of(()->myTry.map(this::loadData),ex)
                          .mergeMap(i->i);

In cyclops Try can operate as an eager, lazy or reactive data structure. If your Try is being populated (completed) asynchronously, you can convert it into a Future via Future::fromPublisher.

E.g. running this code (on the forthcoming 10.1.0 release; there is a bug in 10.0.x that makes the subscription to completable types blocking):

System.out.println("Thread " + Thread.currentThread().getId());

LazyEither.CompletableEither<String, String> completable = LazyEither.<String>either();
Try<String,Throwable> async = Try.fromEither(completable);

Future<String> future = Future.fromPublisher(completable)
                                        .peek(System.out::println)
                                        .peek(i->System.out.println("Thread " + Thread.currentThread().getId()));

new Thread(()->{
            completable.complete("hello world");
 }).start();

future.get();

Results in

Thread 1
hello world
Thread 11
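
The same effect can be reproduced with the JDK alone, using CompletableFuture as a stand-in for the cyclops types (the class and method names here are hypothetical). A dependent stage that is registered while the source is still incomplete is, in the OpenJDK implementation, run by the thread that later completes the source, so the processing happens off the calling thread without any explicit executor.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical names; CompletableFuture stands in for the completable Either/Try.
final class CompleteFromOtherThread {

    // Returns the name of the thread that ran the dependent stage.
    public static String dependentThreadName() {
        CompletableFuture<String> source = new CompletableFuture<>();

        // Register the dependent first, while the source has no value yet.
        CompletableFuture<String> ranOn =
                source.thenApply(v -> Thread.currentThread().getName());

        // Complete the source from a separate, named thread.
        Thread completer = new Thread(() -> source.complete("hello world"), "completer");
        completer.start();

        // Block until the dependent stage has run.
        return ranOn.join();
    }

    public static void main(String[] args) {
        System.out.println("caller:       " + Thread.currentThread().getName());
        System.out.println("dependent on: " + dependentThreadName());
    }
}
```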

baha2046 commented 5 years ago

Hi John, thanks for your reply. I have learned a lot from your blog. I didn't know that in cyclops a Future<Try> can be converted to a Future with mergeMap; that is what I want to do :) I am coding something like:

    private Eval<Future<Image>> fximage = Eval.later(this::loadLazy);

    private Future<Image> loadLazy() {
        return this.localPath.isDefined() ?
                Future.fromTry(loadImageFromFile(this.localPath.get(), this.widthLimit, this.heightLimit)) :
                Future.of(() -> loadImageFromURL(this.thumbURL.get(), this.widthLimit, this.heightLimit), GUICommon.getExecutorServices()).mergeMap(i -> i);
    }

    public void useImage(Consumer<Image> imageConsumer) {
        if (imageConsumer != null) {
            this.fximage.get().forEach(imageConsumer);
        }
    }

Also, I saw there is a class called IO in cyclops, but I cannot find any documentation about how to use it...

johnmcclean commented 5 years ago

Ah cool! It is possible to cleanly mergeMap the Future into Eval as well (getting the laziness of Eval and the asynchronous execution of Future, without the generics :) )

E.g.

System.out.println("Thread is " + Thread.currentThread().getId());
Eval<Future<String>> isAsync = Eval.later(()->Future.of(()->"hello from thread " + Thread.currentThread().getId(), Executors.newFixedThreadPool(1)));

Eval<String> mergeAsync = isAsync.mergeMap(i->i);
System.out.println(mergeAsync.get());

Will print out (when triggered)

Thread is 1
hello from thread 11

In your example that could look something like this

 private Eval<Image> fximage = Eval.later(this::loadLazy)
                                   .mergeMap(i->i);

By the way, mergeMap is for reactive-streams Publishers - so it will work with Mono & Flux from Project Reactor and Flowable from RxJava2 (flatMap in those libraries will accept Cyclops types).

If you wanted to mix Cyclops and Vavr: concatMap in cyclops will accept most Vavr types (as it accepts an Iterable), and flatMap in Vavr generally accepts an Iterable (and all, or nearly all, Cyclops types implement both Iterable and Publisher).

I have a Dysfunctional Programming in Java blog planned for IO which should help explain the rationale and how to use it. It's a relatively new datatype in Cyclops, so I am sure there are lots of enhancements we can make to it as well.