vavr-io / vavr

vʌvr (formerly called Javaslang) is a non-commercial, non-profit object-functional library that runs with Java 8+. It aims to reduce the lines of code and increase code quality.
https://vavr.io

ConsImpl and Lazy Accumulate During Stream Processing #2245

Closed Bill closed 6 years ago

Bill commented 6 years ago

The simplest stream processing expression, involving only iterate(), drop() and head() seems to result in an accumulation of Lazy and ConsImpl objects even though the client code doesn't seem to be holding onto head(s).

Looking at code like this (test project is here: https://github.com/Bill/vavr-stream-test):

    private static final int SIZE = (int)1E1; // twiddle this to make it arbitrarily large

    @Test
    public void simplestTest() {
        assertThat(SIZE+1, is(process(naturals(), SIZE))); // test passes but seems to accumulate objects
    }

    private static int process(final Traversable<Integer> traversable, final int size) {
        return traversable.drop(size).head();
    }

    private static Stream<Integer> naturals() {
        return Stream
                .iterate(1, i->i+1);
    }

If I set a breakpoint down in Stream.drop() it appears (from the IntelliJ memory debugger) that we accumulate a ConsImpl and a Lazy per step through the loop. I don't think they are eligible for GC until head() is called (after drop() returns.)

Alas my proof-fu is weak. Do you know if this is really a problem, or am I barking up the wrong tree?

The idea for this test arose from some production code that was definitely holding onto a lot of objects. In that code we weren't drop()ing—we were processing every element. The problem was that our app was holding on to all the intermediate objects until the end.

Update September 6, 2018: see my updates in the thread below. Figured out the Java issue here and found a cool (hack) fix in clojure.lang.Util.ret1(). Also figured out how to make the corresponding Scala nth(n,s) fn scale. Here's the scalable Java/vavr code:

    // Clojure clojure.lang.Util.ret1() trick to clear a local
    static <T> T ret1(final T arg, final T ignored) {
        return arg;
    }

    static <T> Option<T> nth(
            final int n,
            Stream<T> stream) {
        return drop(n,ret1(stream,stream=null)).toOption();
    }

I made the stream parameter to nth() not final and I used ret1() to clear that reference before calling drop().
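
A minimal sketch of why ret1(stream, stream=null) clears the local, in plain Java with no vavr dependency. Java evaluates method arguments left to right, so the first argument captures the reference before the assignment in the second argument nulls the variable. The class name Ret1Sketch and the demo() helper are hypothetical and exist only for illustration.

```java
final class Ret1Sketch {
    // Same shape as the ret1() shown above: returns the first argument, ignores the second.
    static <T> T ret1(final T arg, final T ignored) {
        return arg;
    }

    // Hypothetical helper: returns { value passed through ret1, value of the local afterwards }.
    static Object[] demo() {
        Object local = "head";
        // Argument 1 is evaluated first (reads "head"), then argument 2 runs (sets local to null).
        Object passed = ret1(local, local = null);
        return new Object[] { passed, local };
    }
}
```

After the call, the callee still receives the original reference, while the caller's local no longer points at it.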

This cool (hack) fix won't work in Scala. In Scala you have to use call by name for stream to avoid holding onto the head. That's arguably a cleaner fix.

nfekete commented 6 years ago

I'm not sure what behavior you expect in this case, but it's worth pointing out that vavr's Stream is not really analogous to Java's Stream: the former can be traversed multiple times, while the latter can only be used once. Vavr's Stream is evaluated lazily and its elements are cached afterwards, so it is similar to a lazily constructed linked list. If you want a vavr type similar to the JDK's Stream, you're looking for Iterator.
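
To illustrate the "lazily constructed linked list" idea, here is a rough plain-Java sketch of a memoizing cons cell. It is not vavr's actual Stream implementation; LazyCons, naturalsFrom and the CELLS_BUILT counter are hypothetical names used only to show that each tail is computed once and then reused.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical minimal lazy cons cell; a stand-in for the idea, not vavr's code. */
final class LazyCons<T> {
    // Counts how many cells naturalsFrom() has constructed.
    static final AtomicInteger CELLS_BUILT = new AtomicInteger();

    private final T head;
    private Supplier<LazyCons<T>> tailSupplier; // dropped once the tail has been computed
    private LazyCons<T> tail;                   // cached tail

    LazyCons(T head, Supplier<LazyCons<T>> tailSupplier) {
        this.head = head;
        this.tailSupplier = tailSupplier;
    }

    T head() {
        return head;
    }

    // Computes the tail on first access and returns the cached cell afterwards.
    LazyCons<T> tail() {
        if (tailSupplier != null) {
            tail = tailSupplier.get();
            tailSupplier = null;
        }
        return tail;
    }

    // Infinite sequence n, n+1, n+2, ... built one cell at a time.
    static LazyCons<Integer> naturalsFrom(int n) {
        CELLS_BUILT.incrementAndGet();
        return new LazyCons<>(n, () -> naturalsFrom(n + 1));
    }
}
```

Traversing the same cells twice builds them only once, which is also why anything that keeps the first cell reachable keeps every cached cell after it reachable.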

Bill commented 6 years ago

Thank you @nfekete! That was exactly my problem. Redefining my "stream" functions in terms of vavr Iterator let me process arbitrarily long sequences without memory growth. This runs fine with java -Xmx10m whereas using vavr Streams it blows up with 10x the memory:

    private static final int SIZE = (int)1E8;

    @Test
    public void simplestIteratorTest() {
        assertThat(SIZE+1, is(process(naturalsIterator(), SIZE)));
    }

    private static Iterator<Integer> naturalsIterator() {
        return Iterator.iterate(1, i-> i + 1);
    }

    private static int process(final Traversable<Integer> traversable, final int size) {
        return traversable.drop(size).head();
    }

Q.E.D.

That being said, I don't know what I'd want a vavr Stream for 🤔

nfekete commented 6 years ago

You'd want it to lazily calculate only once (just like a vavr Iterator) but then transparently store the results in a linked list that can be reused. Note that vavr's Iterator cannot be reused, just like normal java iterators or java Streams.

Bill commented 6 years ago

Inasmuch as vavr is a functional programming library, I expected vavr's Stream to be like a SICP stream or a Clojure sequence.

I am very surprised that a thing called Iterator, in any Java library, is filling the role of SICP-style stream especially given the dichotomy between iterators (stateful) and SICP-style streams (immutable).

Are vavr's name choices, Stream/Iterator, coming from Scala tradition? A little googling leads me to believe that this is indeed the case ✓ (diabolical)

nfekete commented 6 years ago

Yes, vavr is close to Scala.

Bill commented 6 years ago

I have learned so much. But I still don't understand why this holds onto all the Stream data for so long:

   final int SIZE = (int)1E8;
   assertThat(SIZE+1, is(Stream.iterate(1, i -> i + 1).drop(SIZE).head()));

Looking at the pertinent Stream methods:

    static <T> Stream<T> iterate(T seed, Function<? super T, ? extends T> f) {
        Objects.requireNonNull(f, "f is null");
        return Stream.ofAll(Iterator.iterate(seed, f));
    }

    static <T> Stream<T> ofAll(Iterable<? extends T> elements) {
        Objects.requireNonNull(elements, "elements is null");
        if (elements instanceof Stream) {
            return (Stream<T>) elements;
        } else {
            return StreamFactory.create(elements.iterator());
        }
    }

And looking at Stream.drop():

    @Override
    default Stream<T> drop(int n) {
        Stream<T> stream = this;
        while (n-- > 0 && !stream.isEmpty()) {
            stream = stream.tail();
        }
        return stream;
    }

It seems like the Stream will be lazily materialized by the tail() calls in the drop() loop ✓ But what I don't see is why the old head doesn't become garbage after each iteration completes.

nfekete commented 6 years ago

This is just first glance:

  1. you create a Stream instance with Stream.iterate(1, i -> i + 1)
  2. you invoke drop(SIZE) on the previously created instance - while drop is running, the instance is still kept alive because that's the instance drop() was invoked on.
  3. when step 2 finishes, the original stream can be thrown away, since a new Stream instance is returned to call head() on.

So that means while drop() is running, the original Stream instance is still alive with all the lazily evaluated values cached in the linked list.

I don't see how this could be "solved". It seems unsolvable at this moment in Java. Maybe if there was tail call elimination for tail recursive function calls, we could use a recursive implementation for drop(), and the GC would (maybe) notice those instances are not needed anymore. But without that, the original Stream instance is on the stack until drop() returns and cannot be reclaimed.
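
The difference between an instance-method drop and a static one can be sketched in plain Java as follows. This is an illustration under assumptions, not vavr code: Cell, dropInstance, dropStatic and naturalsFrom are hypothetical names, and whether the skipped cells are actually reclaimed depends on the JVM (interpreter, JIT, debug mode), so the sketch only shows which references each frame holds.

```java
import java.util.function.Supplier;

final class DropSketch {
    /** Hypothetical memoizing lazy cell, standing in for a node of a lazy stream. */
    static final class Cell {
        final int head;
        private Supplier<Cell> thunk; // dropped once the tail has been computed
        private Cell tail;            // cached tail

        Cell(int head, Supplier<Cell> thunk) {
            this.head = head;
            this.thunk = thunk;
        }

        Cell tail() {
            if (thunk != null) {
                tail = thunk.get();
                thunk = null;
            }
            return tail;
        }

        // Instance form: "this" is a local of this frame for the whole loop, so
        // (at least without JIT liveness analysis) every cached cell stays reachable.
        Cell dropInstance(int n) {
            Cell c = this;
            while (n-- > 0) {
                c = c.tail();
            }
            return c;
        }
    }

    static Cell naturalsFrom(int n) {
        return new Cell(n, () -> naturalsFrom(n + 1));
    }

    // Static form: the parameter itself is advanced, so this frame keeps no
    // reference to cells it has passed. The caller must not keep one either.
    static Cell dropStatic(int n, Cell c) {
        while (n-- > 0) {
            c = c.tail();
        }
        return c;
    }
}
```

Both forms return the same cell; they differ only in what remains referenced while the loop runs.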

Bill commented 6 years ago

And once again, I am surprised. Iterator does not appear to be immutable like a SICP-style stream. Instead it is mutable, more like a Java iterator:

    @Test
    public void iteratorImmutabilityTest() {
        final Iterator<Integer> sixPlus = naturalsIterator().drop(5);
        assertThat(sixPlus.drop(5).head(), is(11));
        assertThat(sixPlus.drop(5).head(), is(11)); // assertion fails!
    }

    java.lang.AssertionError:
    Expected: is <11>
         but: was <17>

Somehow each of those drop(5) calls mutated the object referenced by sixPlus. Digging in I see that vavr Iterator.next() actually (indirectly) calls java.util.Iterator.next(), mutating an actual Java iterator.
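
The same effect can be reproduced with a plain java.util.Iterator, without vavr. In this sketch the class IteratorMutationSketch and its helpers are hypothetical; dropThenHead() skips n elements by calling next() and then returns the following one, which is roughly what drop(n).head() amounts to on a stateful iterator.

```java
import java.util.Iterator;
import java.util.stream.Stream;

final class IteratorMutationSketch {
    // Infinite iterator over 1, 2, 3, ...
    static Iterator<Integer> naturals() {
        return Stream.iterate(1, i -> i + 1).iterator();
    }

    // Iterator positioned so that its next element is 6.
    static Iterator<Integer> sixPlus() {
        Iterator<Integer> it = naturals();
        for (int i = 0; i < 5; i++) {
            it.next();
        }
        return it;
    }

    // Skips n elements, then returns the next one. Every call advances the shared iterator.
    static int dropThenHead(Iterator<Integer> it, int n) {
        for (int i = 0; i < n; i++) {
            it.next();
        }
        return it.next();
    }
}
```

Calling dropThenHead(it, 5) twice on the iterator returned by sixPlus() yields 11 and then 17, because the first call has already consumed the elements 6 through 11.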

Is there no vavr thing that behaves like an immutable SICP-style stream/Clojure sequence?

nfekete commented 6 years ago

I don't know about SICP-style streams, but both Java iterators and streams are mutable. Java Streams are "consumed" once you invoke a terminal operation on them and cannot be reused. Vavr Iterators are still Java iterators, only on steroids 😄

danieldietrich commented 6 years ago

@Bill yes, Nándor is right, Vavr aligns closely to Scala. There, Iterator is also mutable.

Bill commented 6 years ago

Thanks @danieldietrich. As I've dug deeper into this issue I have come to believe that there is no good solution to the problem of scalable, lazy, immutable sequence processing for a Java library.

Vavr's Stream is actually very close to the SICP-style stream/Clojure sequence I seek. But there are deeper problems beyond the fact that, as @nfekete pointed out, drop() is an instance method on Stream.

You can rewrite drop() as a static method so it doesn't hold onto its head (this, in this case). That'll get you a little further, but then you'll run into the next set of problems—problems that can't be fixed without severe contortion of all your library code, and a lot of client code too…

In the Clojure community, those contortions fall under the heading "closed-over locals clearing". Failing to aggressively clear closed-over locals will prevent the garbage collector from seeing what is properly garbage, in run-of-the-mill stream processing scenarios.

It turns out that a pillar of functional programming, recursion, is pretty easy to do without most of the time, in Java—by replacing recursion with looping. It's a little annoying, but you can start with recursive code, and the transformation to looping code is fairly straightforward. And when you're done, the result is still pretty readable. You can bundle that ugly code up into some higher-order fns that will take you very far.
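
As a small illustration of that transformation, here is a fold written recursively and then as a loop wrapped in a higher-order function. This is a sketch in plain Java; LoopFold, foldRec and foldLoop are hypothetical names and are not taken from vavr or from the test project.

```java
import java.util.List;
import java.util.function.BiFunction;

final class LoopFold {
    // Recursive form: one stack frame per element, so stack depth grows with the input length.
    static <A, B> B foldRec(List<A> xs, int from, B acc, BiFunction<B, A, B> f) {
        return from == xs.size()
                ? acc
                : foldRec(xs, from + 1, f.apply(acc, xs.get(from)), f);
    }

    // Loop form: same result, constant stack depth regardless of input length.
    static <A, B> B foldLoop(Iterable<A> xs, B acc, BiFunction<B, A, B> f) {
        for (A x : xs) {
            acc = f.apply(acc, x);
        }
        return acc;
    }
}
```

Client code that calls foldLoop() never has to write the loop itself, which is the sense in which the looping code can be bundled up once and reused.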

What's surprising, to me at least, is that it is the garbage collection implications of laziness that present the more significant challenge. The Clojure compiler, for instance, goes to great lengths to clear closed-over locals. I've tried doing this by hand for a tiny Java test program and I haven't been able to get it right (yet). Even after I eventually figure it out, the difficulty of getting it right (for the toy case) does not bode well for its application more generally (in a Java library).

So meh. I suppose I'm late to the party. But I was really hoping we could have scalable, lazy, immutable, sequence processing in our Java library. It seems that you can have any two but not all three:

note: by "scalable" here I mean that neither stack nor heap growth is related to the length of the stream you're processing. Also, when I say "loop" I mean write your own loops or call higher-order fns implemented in terms of loops.

historic note: I think Rich Hickey and friends implemented aggressive closed-over local clearing before Clojure 1.0 which was released in 2009. As recently as 2013-2016 bugs were still being discovered and refinements were still being made to that functionality.

nfekete commented 6 years ago

@Bill Could you elaborate a little bit on the problem you're trying to solve as all the above is a little bit too generic for me to see what you're trying to do that it's not supported by our current libraries? I'm not sure what the correct form would be for such a discussion as it's not strictly related to the present issue, so maybe the gitter channel would be a better fit, but then, that might be cluttered by other discussions too. But it's up to you...

Bill commented 6 years ago

@nfekete This is idiomatic Clojure processing of lazy, long (in this case, infinite) sequences:

    Bills-MacBook-Air:java-garbage-test Bill$ clojure -J-Xmx10m
    Clojure 1.9.0
    user=> (defn naturals [] (iterate inc 1))
    #'user/naturals
    user=> (for [n [1e3 1e4 1e5 1e6 1e7 1e8]]
      (time (first (drop n (naturals)))))
    ("Elapsed time: 40.11968 msecs"
    "Elapsed time: 92.93135 msecs"
    "Elapsed time: 137.836554 msecs"
    "Elapsed time: 824.371466 msecs"
    "Elapsed time: 5806.061034 msecs"
    "Elapsed time: 36581.824392 msecs"
    1001 10001 100001 1000001 10000001 100000001)

We defined a function that returns the lazy, infinite, sequence of natural numbers. Then we timed dropping the first N of them and getting the N+1th one, for various values of N from N=one thousand to N=one hundred million. In a Java VM with 10MB of heap, it reaches the one-hundred-million-and-first natural number in around 37 seconds.

The important part is not necessarily that the sequence is infinite. It's that it is lazy and it is large, relative to the size of stack and the heap. It's important that we're able to process the sequence without overflowing either the stack or the heap.

Here is the smallest Maven/Java/JUnit project I was able to craft to attempt the same idea in Java:

https://github.com/Bill/java-garbage-test

Please see the README over there for what it is, how it behaves, how I tried to understand its behavior, and why I'm still puzzled.

update September 4, 2018: A kind person on Stack Overflow answered my question, "why does this Java method leak—and why does inlining it fix the leak?", and pointed out that adding the -Xcomp option to the JVM allows both the drop() and nth() methods to scale in my toy example.

Then another kind person weighed in, showing me the solution right out of the Clojure library code (clojure.lang.Util.ret1()) that obviates the -Xcomp JVM option.

My takeaway is that, while it's possible for a Java library to provide a scalable, lazy, immutable sequence facility, it would place an onerous burden on users of that library. See e.g. ClearingArgsJavaTest.nth():

    static int nth(final int n, /*final*/ LazyishSeq lazySeqArg) {
        LazyishSeq lazySeqLocal = lazySeqArg;
        lazySeqArg = null;
        return drop(n,ret1(lazySeqLocal, lazySeqLocal=null)).head();
    }

I wouldn't want to have to write all my sequence-handling methods that way.

nfekete commented 6 years ago

Maybe I understand what you want. How about this code? Is this good for your purpose?

    static interface VavrIterable<T> extends Iterable<T> {
        @Override
        io.vavr.collection.Iterator<T> iterator();
    }

    static VavrIterable<Integer> naturalsWithIterable() {
        return () -> Iterator.iterate(1, n -> n + 1);
    }

    @Test
    public void nthTest2() {
        assertThat(naturalsWithIterable().iterator().drop(N).head(), is(N + 1));
        assertThat(naturalsWithIterable().iterator().drop(N).head(), is(N + 1));
    }

If this does what you want, it might be worth giving a thought whether to include something like this into vavr. There's already an improved Iterator in vavr, but there's no improved Iterable, even if all that would do is to return an improved Iterator.

Bill commented 6 years ago

If I understand that code, drop(N) mutates the Iterator resulting from naturalsWithIterable().iterator().

It is perfectly possible to write lots of good software with mutable objects, and in particular (mutable) iterators. The existence proofs are numerous. But what I'm after, though, is a sequence abstraction that is immutable, like Clojure's sequence and like Abelson & Sussman's Scheme streams, and, incidentally, like Haskell lists.

Notice how the tail() method on the LazySeq class I defined is idempotent. Contrast this with the drop(N) method on Iterator.

In this toy example, perhaps the value of immutability is not apparent. In Clojure and Scheme and Haskell (immutable) sequences or streams or lists take the place of (mutable) iterators or enumerators (like you have in Java or C++ STL or Ruby). In my opinion, code is easier to write and easier to understand with the immutable abstractions than with the mutable ones.

Bill commented 6 years ago

Here's a Scala worksheet implementing something very similar to what I tried in plain old Java and Java with vavr. Notice I reimplemented some of the Stream methods as functions so as not to hang on to the head of the stream (this in that case):

    import scala.collection.immutable.Stream.cons

    def iterate[A](start: A, f: A => A): Stream[A] =
      cons(start, iterate(f(start), f))

    def inc(n:Int) = n + 1

    def naturals() =
      iterate(1, inc)

    def drop[T](n : Int, s : Stream[T]) : Stream[T] =
      if (n <= 0 || s.isEmpty) s
      else drop(n-1, s.tail)

    def nth[T](n : Int, s : Stream[T]) =
      drop(n,s).head

    def N = 1e1.toInt

    drop(N,naturals()).head
    nth(N, naturals())
By implementing iterate(), drop(), and nth() as functions, I see no reason why they would cause objects to be retained in memory as they iterated.

I set N large e.g. 1e7, and observed significant memory growth in the JVM. That, coupled with googling about Scala and finding no mention of clearing of closed-over locals, leads me to believe that Scala does not clear closed-over locals and therefore suffers the same memory growth I see with my Java code.

I poked around a little bit but it wasn't obvious how to get at the class file the Scala compiler generated. That would be interesting.

This is pretty surprising. My impression was that Scala came from the Haskell tradition. Scala has actual syntax for call-by-name (which is used in the library definition of cons()) ✓ Given how important laziness is for our ability to apply "equational reasoning" it just doesn't make sense that the garbage issue hasn't been addressed.

I must be missing something.

Update September 6, 2018: Stack Overflow abides https://stackoverflow.com/questions/52208959/function-forwarding-stream-parameter-to-another-function-retains-reference

Changing nth(n,s) to this, solves the scaling problem:

    // have to call by name (s) here, otherwise we hold on to head!
    def nth[T](n : Int, s : => Stream[T]) =
      drop(n,s).head

nfekete commented 6 years ago

Fun fact: in your java project example dropTest() runs fine when run normally, but fails with OutOfMemoryError: GC overhead limit exceeded when run in debug mode.

I'm not sure what causes the difference, probably there are some optimizations that are run in normal mode but are suppressed in debug mode.

That is plausible: in debug mode the JVM needs to keep otherwise unreferenced stack local variables alive, because a debugger might inspect them. The question is whether there is a contractual obligation for the JVM to treat those otherwise unused stack local variables as eligible for GC. I'm not familiar enough with the JVM to answer this question. If there's no such obligation, then any solution based on this behavior would be risky territory, as it would depend on unspecified JVM optimization.

Bill commented 6 years ago

Yeah that is interesting (that the heap interaction of dropTest() changes significantly when debugging.)

Note that the Clojure compiler has an option to disable locals-clearing. It's there to overcome the problem that "when using a debugger locals will appear as nulls, which makes debugging difficult". The Cursive Clojure plugin for IntelliJ has a button to enable/disable this feature. (The section Starting a Debug REPL has a good explanation).

I've spent some time looking at the JVM instructions in class files generated by the Clojure compiler. Have a look at this Clojure fn:

    (defn foo [x]
      (iterate inc x))

Here's the Java decompiled from the class file generated by the Clojure compiler:

    public final class core$foo extends AFunction {
        public static final Var const__0 = (Var)RT.var("clojure.core", "iterate");
        public static final Var const__1 = (Var)RT.var("clojure.core", "inc");

        public core$foo() {
        }

        public static Object invokeStatic(Object x) {
            IFn var10000 = (IFn)const__0.getRawRoot();
            Object var10001 = const__1.getRawRoot();
            Object var10002 = x;
            x = null;
            return var10000.invoke(var10001, var10002);
        }

        public Object invoke(Object var1) {
            Object var10000 = var1;
            var1 = null;
            return invokeStatic(var10000);
        }
    }

Notice the function foo is implemented via class core$foo that extends AFunction (abstract base-class) which defines invoke(…) methods for various arities. See how the method call is bounced to a static method call? See how before calling anything, method parameters are first nulled?

Look at the bytecodes generated for the core$foo.invoke() method:

  public invoke(Ljava/lang/Object;)Ljava/lang/Object;
    ALOAD 1
    ACONST_NULL
    ASTORE 1
   L0
    LINENUMBER 79 L0
    INVOKESTATIC jira_rate/core$foo.invokeStatic (Ljava/lang/Object;)Ljava/lang/Object;
    ARETURN
    MAXSTACK = 2
    MAXLOCALS = 2

invoke() is an instance method, so "this" is the local variable at index 0 and the first explicit parameter, var1, is at index 1. We ALOAD var1 onto the operand stack, then push null (ACONST_NULL) and store it (ASTORE 1) into the local at index 1 (var1). Then we invoke the static method. At that point the local var1 has been set to null and its original value is on the operand stack for core$foo.invokeStatic to use. Note, though, that "this" (local 0) is still held in core$foo.invoke()'s frame and is not null. It's an instance of core$foo, so I don't think it will have any impact on GC tracing.

For completeness, here's the bytecode for core$foo.invokeStatic():

  public static invokeStatic(Ljava/lang/Object;)Ljava/lang/Object;
   L0
    LINENUMBER 79 L0
    GETSTATIC jira_rate/core$foo.const__0 : Lclojure/lang/Var;
    INVOKEVIRTUAL clojure/lang/Var.getRawRoot ()Ljava/lang/Object;
   L1
    LINENUMBER 80 L1
    CHECKCAST clojure/lang/IFn
    GETSTATIC jira_rate/core$foo.const__1 : Lclojure/lang/Var;
    INVOKEVIRTUAL clojure/lang/Var.getRawRoot ()Ljava/lang/Object;
    ALOAD 0
    ACONST_NULL
    ASTORE 0
   L2
    LINENUMBER 80 L2
    INVOKEINTERFACE clojure/lang/IFn.invoke (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
   L3
    ARETURN
    LOCALVARIABLE x Ljava/lang/Object; L0 L3 0
    MAXSTACK = 4
    MAXLOCALS = 1

I am curious to know if the Scala compiler performs this sort of clearing of locals. As I said, I suspect it does not. Without this local clearing, either by a compiler or by hand-written Java code, I don't see how we can have scalable, lazy, immutable sequences.

It seems that the Scala community, and by extension, the vavr community, has primarily carried forth the Java/C++/Ruby tradition of using iterators/iterators/enumerators (mutable) for sequence processing, as opposed to the Scheme/Haskell/Clojure tradition of immutable streams/lists/sequences for sequence processing. As I've said, that can certainly work.

Seems a shame though, since so much modern work in functional programming assumes immutability. Haskell has been the lingua franca of functional programming for over twenty years. Look at this paper for instance:

http://www.cs.nott.ac.uk/~pszgmh/fold.pdf

How could those results be applied to Scala with iterators? At a minimum, the proofs would have to be adapted to mutability, perhaps by placing certain constraints on the iterators. On the other hand, even though it has a different syntax, and lacks Haskell's type system, Clojure benefits from Hutton's proof framework, I believe, since Clojure's reduce is directly analogous to the fold/foldr on which the framework is built.

Update: I heard from Graham Hutton, the author of that paper. He said:

I think you could write the programs from the paper in any language that supports higher-order functions, but the reasoning aspect of the paper likely requires a pure functional language, as the basic principles of equational reasoning are not valid in other settings.

Then I heard from Jeremy Gibbons (a.k.a. @jegi here on Github) who shared this:

Bruno Oliveira's and my paper "The Essence of the Iterator Pattern" was intended to capture the story for mutable datatypes. http://www.cs.ox.ac.uk/publications/publication1409-abstract.html

Those two papers, to me, encapsulate the whole problem/opportunity here. If you have the opportunity to build on immutable sequences, you eliminate a ton of complexity relative to (mutable) iterators.

Update September 6, 2018: if you look at my updates to this thread you'll see that, thanks to kind+smart people on Stack Overflow, not only do I have a Java solution (borrowed from clojure.lang.Util.ret1()), but I also have a general Scala solution (call by name).

So Scala/vavr Stream can work. If you're gonna pass around streams in Scala, you need to do so, by name, not by value. In Java/vavr land, the clojure.lang.Util.ret1() trick for clearing the local is a bit uglier—but it works.