vavr-io / vavr

vʌvr (formerly called Javaslang) is a non-commercial, non-profit object-functional library that runs with Java 8+. It aims to reduce the lines of code and increase code quality.
https://vavr.io

Support for concurrent recursive function memoization #2086

Open · smillies opened this issue 7 years ago

smillies commented 7 years ago

Vavr already supports non-concurrent function memoization. As I happen to have written a concurrent recursive function memoizer some time ago, I'm attaching it as a patch so you can decide whether something like this is worth including in Vavr. I have ported the code from ordinary Java (CompletableFuture) to (I hope idiomatic) use of Vavr constructs (Promise, Future): concurrent-recursive-function-memoization-patch.zip

Usage example with the good old Fibonacci sequence (it doesn't actually show concurrent calls, but should be enough to illustrate the usage):

import io.vavr.Function1;
import io.vavr.concurrent.Future;
import java.math.BigInteger;

import static io.vavr.concurrent.Future.successful;

public class Fibonacci {
    private static final Future<BigInteger> ONE = successful(BigInteger.ONE); // some shorthand
    private final Function1<Integer, Future<BigInteger>> fibMem;  // a thread-safe, concurrently memoized recursive function

    public Fibonacci() {
        fibMem = MemoizedConcurrently.of(this::fib);
        assert fibMem instanceof MemoizedConcurrently;
    }

    // fib delegates recursive calls to its memoizing version
    // in good functional style we use the monadic operations flatMap and map on the Future
    public Future<BigInteger> fib(int n) {
        if (n <= 2) return ONE;
        return fibMem.apply(n - 1).flatMap(x ->  // x = fib(n-1)
                fibMem.apply(n - 2).map(y ->     // y = fib(n-2)
                        x.add(y)));              // return x + y
    }

    public static void main(String[] args) {
        Fibonacci fibonacci = new Fibonacci(); // shared instance that supports concurrent calls to fib(int)
        Future<BigInteger> fibonacciNumber = fibonacci.fib(200_000); // Look Ma: linear runtime without StackOverflowError!
        System.out.println(fibonacciNumber.get()); // block here
    }
}
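
The core of the attached memoizer is a promise-per-key cache plus an asynchronous trampoline. Roughly, it looks like this (a sketch against Vavr 0.9.x, where Promise still exists; the attached patch is the authoritative version):

import io.vavr.Function1;
import io.vavr.concurrent.Future;
import io.vavr.concurrent.Promise;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class ConcurrentTrampoliningMemoizer<T, R> {

    private final ConcurrentMap<T, Promise<R>> memo = new ConcurrentHashMap<>();

    Function1<T, Future<R>> memoize(Function1<T, Future<R>> f) {
        return t -> {
            Promise<R> cached = memo.get(t);
            if (cached != null) {
                return cached.future(); // hit: result is computed or in flight
            }
            Promise<R> promise = Promise.make();
            Promise<R> winner = memo.putIfAbsent(t, promise);
            if (winner != null) {
                return winner.future(); // another thread beat us to it
            }
            // Trampolining: Future.of hands the recursive call to the executor,
            // so each recursion step starts on a fresh stack frame.
            Future.of(() -> f.apply(t))
                  .flatMap(Function1.identity())
                  .onComplete(promise::complete);
            return promise.future();
        };
    }
}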

My original blog post about this topic is here.

chb0github commented 7 years ago

If you would like to take credit for it (and we would like you to), then it should come in the form of a pull request, which is pretty trivial in GH.

danieldietrich commented 7 years ago

Mmmh, that is pretty cool. I have some questions.

1) I would love to see all additional types hidden, with static factory methods exposed instead:

interface Function0<R> {
    static <R> Function0<Future<R>> memoized(Supplier<? extends Future<? extends R>> f) { ... }
}

interface Function1<T1, R> {
    static <T1, R> Function1<T1, Future<R>> memoized(Function<? super T1, ? extends Future<? extends R>> f) { ... }
}

interface Function2<T1, T2, R> {
    static <T1, T2, R> Function2<T1, T2, Future<R>> memoized(BiFunction<? super T1, ? super T2, ? extends Future<? extends R>> f) { ... }
}

interface Function3<T1, T2, T3, R> {
    static <T1, T2, T3, R> Function3<T1, T2, T3, Future<R>> memoized(Function3<? super T1, ? super T2, ? super T3, ? extends Future<? extends R>> f) { ... }
}

...

That should still work, especially trampolining.
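
Call sites would then read like this (hypothetical usage of the factory sketched above):

Function1<Integer, Future<BigInteger>> fibMem = Function1.memoized(this::fib);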

@chb0github You are right, but in this case it is not trivial to create a PR. There is a code generator written in Scala that has to be modified (Generator.scala). Also, we need to do this for Function and CheckedFunction...

danieldietrich commented 7 years ago

One more thing: it is not clear where to put it. For 1.0.0 we plan to modularize Vavr (see this blog post). If we modify io.vavr.Function*, we have to pull the concurrent package into the upcoming vavr-core module, whereas I had planned to create a separate module for the concurrency stuff.

Another solution would be to add this functionality to Future itself, e.g.

interface Future<T> {
    // static factory methods for Function0..n
    static <T1, R> Function1<T1, Future<R>> memoized(Function<? super T1, ? extends Future<? extends R>> f) { ... }
    // ...
}

But I agree that it would be more intuitive to place these methods in Function...

Any opinions?

danieldietrich commented 7 years ago

I think Future is the right location. Also, we need variants that take an ExecutorService in order to control thread creation etc.
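
For example (the signature is illustrative only):

interface Future<T> {
    // hypothetical variant that additionally takes an ExecutorService
    static <T1, R> Function1<T1, Future<R>> memoized(ExecutorService executor, Function<? super T1, ? extends Future<? extends R>> f) { ... }
}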

This leads to (Function0..8 + CheckedFunction0..8) × 2 = 36 new static factory methods in Future (compared to 2 new static factory methods in every (Checked)FunctionX). 😱

danieldietrich commented 7 years ago

@smillies (Off-topic) I see that you already got your hands on Kotlin coroutines. I've read that they do not use native Threads. Do you think it is possible to achieve the same in Java while preserving the functionality of Future/Promise? (I mean backing Future with something other than a native Thread.)

smillies commented 7 years ago

Relaxing the type signatures, i.e. accepting a Function<? super T1, ? extends Future<? extends R>> and returning a Function1<T1, Future<R>>, seems a good idea, although that will require a cast in the short-circuiting clause of MemoizedConcurrently.

But I do not see what you gain by introducing all those factory methods. In what way is writing fibMem = Function1.memoized(this::fib) any prettier than fibMem = MemoizedConcurrently.of(this::fib)?

And you can keep a separate module for the concurrent stuff.

smillies commented 7 years ago

@danieldietrich (Off-topic) I haven't looked at Kotlin coroutines in-depth. However, as Kotlin coroutines require a new keyword (suspend) that makes the Kotlin compiler issue special bytecode (see here), I don't think you can pull this in easily without pulling in a compile-time dependency on Kotlin. As a library user, I wouldn't like that.

Apart from suspend, the rest seems to be Kotlin library code, not language constructs. The context switching seems to be done by the Kotlin CoroutineDispatcher. How concurrency and threads work is explained to some extent in the Coroutines Guide and the Informal Description document. These links may be outdated.

danieldietrich commented 7 years ago

@smillies Thanks!

I do not see what you gain by introducing all those factory methods

I think having only a few entry points (= the different words of the 'language' Vavr, so to speak) makes it easier to find functionality. But maybe you are right. I will think about it...

relaxing the type signatures to (...)

I think you are right, it can be simplified in this case; it is similar to this one.


(Off-topic)

Apart from suspend, the rest seems to be Kotlin library code, not language constructs (...)

Thank you for your thorough investigation. The idea of having 'lightweight' concurrency in Java without native threads is tempting, but I will not dig deeper for now...

smillies commented 7 years ago

Oh yes, and I believe you do not want a variant of MemoizedConcurrently.of() that accepts an ExecutorService. The async call in the memoizer is only there for trampolining; if it weren't for that, you might just as well call the underlying function directly. For trampolining, I think you'll be fine with Future's default ForkJoinPool. A sensible alternative would be to create a dedicated newSingleThreadExecutor for each instance of ConcurrentTrampoliningMemoizer, although that might become wasteful when there are very many memoized functions. I think the opportunity to pass in a trampoline might create more harm than good. Perhaps experience will show otherwise later, but I wouldn't do it now.

smillies commented 7 years ago

What you do want is a variant of MemoizedConcurrently.of() that accepts a cache implementation: that's why the constructor of ConcurrentTrampoliningMemoizer has a ConcurrentMap argument in the first place. The memo might take up a lot of memory for intermediate results that may no longer be needed. I'd like people to be able to pass in a real cache with some sort of expiry policy, as a replacement for the ConcurrentHashMap in the default implementation.
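
Shape-wise, something like this (a hypothetical overload, not a final signature):

interface MemoizedConcurrently {
    // hypothetical: the caller supplies a cache with its own expiry policy
    static <T, R> Function1<T, Future<R>> of(Function1<T, Future<R>> f, ConcurrentMap<T, Promise<R>> cache) { ... }
}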

danieldietrich commented 7 years ago

Thanks. Oh yeah, caching... Nice topic 🤔

smillies commented 7 years ago

Hi there, I've gone and created a pull request. Changes compared to the original proposal: MemoizedConcurrently has an additional method that accepts a cache implementation, and ConcurrentTrampoliningMemoizer is hidden (package private). Regards, Sebastian

smillies commented 7 years ago

I have closed the pull request, because the code used Promise, which has since been removed.

Without Promise, I currently see no good way to implement this functionality. It could be implemented in terms of CompletableFuture, as in the original version on my blog, but then there's no good way to go from a Function1 that produces an io.vavr.concurrent.Future to a Function that produces a CompletableFuture and back.

(And Future.fromCompletableFuture() cannot be used, because it blocks.)

danieldietrich commented 7 years ago

(And Future.fromCompletableFuture() cannot be used, because it blocks.)

This can be considered a bug. We need a version that completes the wrapping Future asynchronously. I will create an issue. Update: see #2115