amaembo / streamex

Enhancing Java Stream API
Apache License 2.0

flatMap -> groupRuns -> takeWhile performance issue #267

HoelzelJon closed 1 year ago

HoelzelJon commented 1 year ago

I've noticed some unexpected performance problems when chaining these three stream operations together.

Here is a minimal example (using StreamEx version 0.8.1). If I debug this code, I see that in the "bad" case, the aString -> aString + "!" lambda is run on each element of the input stream (including ones that should be filtered out by the takeWhile) before the groupRuns or takeWhile lambdas are ever called. As shown with the other examples, this only seems to happen when chaining together a flatMap, a groupRuns, and then a takeWhile.

import java.util.List;
import java.util.function.Function;
import java.util.stream.Stream;
import one.util.streamex.StreamEx;

public class StreamExTest {
    private static List<Stream<String>> inputStream() {
        return List.of(Stream.of(
                "foo1", "should be printed", "hello world",
                "foo2", "should be mapped and grouped", "but not printed",
                "foo3", "should not reach initial map"));
    }

    public static void main(String[] args) {
        // bad -- calls the map function on each stream entry before grouping
        StreamEx.of(inputStream())
                .flatMap(Function.identity())
                .map(aString -> aString + "!")
                .groupRuns((aFirst, aSecond) -> !aSecond.startsWith("foo"))
                .takeWhile(aStrings -> !aStrings.get(0).endsWith("2!"))
                .forEach(System.out::println);

        // works as expected (not using takeWhile)
        StreamEx.of(inputStream())
                .flatMap(Function.identity())
                .map(aString -> aString + "!")
                .groupRuns((aFirst, aSecond) -> !aSecond.startsWith("foo"))
                .forEach(System.out::println);

        // works as expected (not using groupRuns)
        StreamEx.of(inputStream())
                .flatMap(Function.identity())
                .map(aString -> aString + "!")
                .takeWhile(aString -> !aString.endsWith("2!"))
                .forEach(System.out::println);

        // works as expected (not starting with flatMap)
        StreamEx.of(inputStream().get(0))
                .map(aString -> aString + "!")
                .groupRuns((aFirst, aSecond) -> !aSecond.startsWith("foo"))
                .takeWhile(aStrings -> !aStrings.get(0).endsWith("2!"))
                .forEach(System.out::println);
    }
}

This behavior is quite undesirable if other intermediate stream operations are computationally intensive and/or produce objects that take up a lot of memory, especially if the takeWhile filters out a large number of elements.

HoelzelJon commented 1 year ago

My workaround for the time being is to do something like this, avoiding the flatMap:

Stream<String> myFlattenedStream = Stream.empty();
for (Stream<String> myStream : inputStream()) {
    myFlattenedStream = Stream.concat(myFlattenedStream, myStream);
}

StreamEx.of(myFlattenedStream)
    ...
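For reference, here is a minimal plain-JDK sketch of why concatenation may avoid the problem. It assumes the spliterator behind Stream.concat advances one element at a time. The class name ConcatLazinessSketch and the helpers concatAll and elementsSeenForOneAdvance are hypothetical names made up for this illustration, and a counter in peek stands in for an expensive intermediate operation.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class ConcatLazinessSketch {
    // Hypothetical helper: concatenates the streams one by one, like the loop in the workaround.
    static <T> Stream<T> concatAll(List<Stream<T>> streams) {
        Stream<T> result = Stream.empty();
        for (Stream<T> stream : streams) {
            result = Stream.concat(result, stream);
        }
        return result;
    }

    // Counts how many elements pass through peek when exactly one element is requested.
    static int elementsSeenForOneAdvance() {
        AtomicInteger seen = new AtomicInteger();
        List<Stream<String>> input = List.of(
                Stream.of("foo1", "a", "b"),
                Stream.of("foo2", "c"));
        Spliterator<String> spliterator = concatAll(input)
                .peek(s -> seen.incrementAndGet())
                .spliterator();
        spliterator.tryAdvance(s -> {}); // request a single element
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("elements seen: " + elementsSeenForOneAdvance());
    }
}
```

If the assumption holds, only the requested element reaches the peek stage, which is what a short-circuiting operation further down the pipeline needs.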

amaembo commented 1 year ago

Hello! Unfortunately, this is a known limitation of the current implementation. New stream operations that don't delegate to existing ones take the spliterator of the original stream. If a short-circuiting operation follows, the tryAdvance method of the new spliterator is used, which eventually delegates to the tryAdvance method of the original spliterator. But the spliterator returned after flatMap actually consumes the whole sub-stream (see the WrappingSpliterator implementation). You can reproduce the problem even without StreamEx:

        var spliterator = Stream.of(0).flatMap(x -> Stream.of(1, 2, 3))
                .peek(System.out::println)
                .spliterator();
        spliterator.tryAdvance(x -> {});

This code prints 1 2 3, even though we requested only one element from the spliterator. Unfortunately, there's no good way to fix this in the JDK, and I see no better way to implement custom stream operations than delegating to the original stream's spliterator. So you may sometimes observe performance glitches like this.
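
To make the difference measurable, the snippet above can be turned into a small comparison. This is only a sketch: the class name FlatMapBufferingSketch and the helper seenAfterOneAdvance are hypothetical names for this illustration. It counts how many elements pass a peek stage after a single tryAdvance call, once for a flatMap pipeline and once for a plain stream.

```java
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class FlatMapBufferingSketch {
    // Requests exactly one element and reports how many elements reached the peek stage.
    static int seenAfterOneAdvance(Stream<Integer> stream) {
        AtomicInteger seen = new AtomicInteger();
        Spliterator<Integer> spliterator = stream
                .peek(x -> seen.incrementAndGet())
                .spliterator();
        spliterator.tryAdvance(x -> {}); // ask for a single element only
        return seen.get();
    }

    public static void main(String[] args) {
        // Sub-stream produced by flatMap: the whole sub-stream is buffered on the first advance.
        int viaFlatMap = seenAfterOneAdvance(Stream.of(0).flatMap(x -> Stream.of(1, 2, 3)));
        // Same elements without flatMap: elements are pulled one at a time.
        int direct = seenAfterOneAdvance(Stream.of(1, 2, 3));
        System.out.println("via flatMap: " + viaFlatMap + ", direct: " + direct);
    }
}
```

Given the behavior described above, the flatMap pipeline should report the full sub-stream size, while the direct stream reports a single element.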

HoelzelJon commented 1 year ago

Huh, yeah that is unfortunate :/ Thank you for the detailed response @amaembo !