amaembo / streamex

Enhancing Java Stream API
Apache License 2.0

Adding sliding() implementation #158

Open oleg-smith opened 7 years ago

oleg-smith commented 7 years ago

Is it possible to add sliding() to StreamEx? So far I have found it only in the tests.

amaembo commented 7 years ago

I'm reluctant to implement it as a built-in operation, because it's hard to implement an efficient parallel version. The test demonstrates how you can do it in your own project if you need it. If you want windows of two elements, use pairMap. Also, if your source is a List or an array, you can use StreamEx.ofSubLists() and get very efficient sliding with few allocations.

oleg-smith commented 6 years ago

OK, and could you explain why ofSubLists() cannot work over an ordered sequential stream? Why do I need to create a collection first to use ofSubLists() in that case?

amaembo commented 6 years ago

There's no such thing as a sequential stream in the Stream API type system: the parallel flag can be set and cleared at any point, so I cannot provide a sequential-only implementation without also providing a parallel one. It's possible to use "poor man's parallelism", but I don't like doing that (though I do it occasionally).

StreamEx.ofSubLists works completely fine for a parallel stream if the source list provides fast random access, because different threads can get different list elements by index.
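
To illustrate why index-based access parallelizes well, here is a minimal plain-JDK sketch of the same idea. It is not StreamEx's implementation; the class name IndexedSliding and the helper sliding are hypothetical names chosen for this example. Each window is computed from its start index alone, so any thread can produce any window independently.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class IndexedSliding {
    // Hypothetical helper, not a StreamEx API: every window is a subList view
    // addressed by its start index, so no window depends on a previous one.
    static <T> List<List<T>> sliding(List<T> source, int size) {
        if (size < 1) {
            throw new IllegalArgumentException("size must be positive");
        }
        // Number of full windows; zero when the list is shorter than size.
        int windows = Math.max(0, source.size() - size + 1);
        return IntStream.range(0, windows)
                .parallel() // safe: each index is processed independently
                .mapToObj(i -> source.subList(i, i + size))
                .collect(Collectors.toList()); // encounter order is preserved
    }

    public static void main(String[] args) {
        // prints [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
        System.out.println(sliding(Arrays.asList(1, 2, 3, 4, 5), 3));
    }
}
```

The windows are views of the source list, so this only makes sense when the list is already in memory and supports fast random access (for example an ArrayList).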

oleg-smith commented 6 years ago

OK, there is no such thing. Then how do I solve a problem where I need to split a stream into equally sized portions without loading all of its content into memory?

amaembo commented 6 years ago

Currently you can create utility methods in your project, as demonstrated in the tests. In a utility class, write:

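// Needs: java.util.Collections, java.util.List, java.util.function.Function, one.util.streamex.StreamEx
// Each stream element is a window under construction; head is the current one.
// Full window: emit it, then merge its last size-1 elements into the next singleton list.
// Incomplete window: merge it with the next singleton list without emitting anything.
private static <T> StreamEx<List<T>> sliding(StreamEx<List<T>> input, int size) {
    return input.headTail((head, tail) -> head.size() == size ? sliding(
            tail.mapFirst(next -> StreamEx.of(head.subList(1, size), next).toFlatList(l -> l)), size).prepend(head)
            : sliding(tail.mapFirst(next -> StreamEx.of(head, next).toFlatList(l -> l)), size));
}

// Wraps every element into a singleton list so that windows can be grown by merging.
public static <T> Function<StreamEx<T>, StreamEx<List<T>>> sliding(int size) {
    return s -> sliding(s.map(Collections::singletonList), size);
}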

Use:

StreamEx.of(1,2,3,4,5).chain(sliding(3)).forEach(System.out::println);

Output:

[1, 2, 3]
[2, 3, 4]
[3, 4, 5]

oleg-smith commented 6 years ago

Yeah, and for the non-overlapping case, do I just set some other parameters?

amaembo commented 6 years ago

For batches there's another sample. In a utility class:

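// Needs a static import of java.util.Arrays.asList, plus java.util.stream.Stream.
// cur accumulates the current batch. Once it holds size elements, emit it and
// start a new batch with head; otherwise append head to cur.
// The supplier emits the last (possibly shorter) batch when the input ends.
private static <T> StreamEx<List<T>> batches(StreamEx<T> input, int size, List<T> cur) {
    return input.headTail((head, tail) -> cur.size() >= size
                    ? batches(tail, size, asList(head)).prepend(cur)
                    : batches(tail, size, StreamEx.of(cur).append(head).toList()),
            () -> Stream.of(cur));
}

public static <T> Function<StreamEx<T>, StreamEx<List<T>>> batches(int size) {
    return s -> batches(s, size, Collections.emptyList());
}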

Use:

StreamEx.of(1,2,3,4,5,6,7,8).chain(batches(3)).forEach(System.out::println);

Output:

[1, 2, 3]
[4, 5, 6]
[7, 8]
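
For comparison, both cases can also be covered by a single helper with a step parameter, using only the JDK. The following is a rough sketch under stated assumptions, not code from StreamEx or its tests: the class Windows and the method windowed are hypothetical names, and the helper is backed by an iterator, so it buffers at most size elements but does not parallelize well (the trade-off discussed earlier in this thread).

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class Windows {
    // Hypothetical helper: windows of `size` elements, advancing by `step`.
    // step == 1 gives sliding windows, step == size gives non-overlapping batches.
    // A trailing short window is emitted only if it holds elements not emitted before,
    // so a source shorter than `size` yields one short window.
    static <T> Stream<List<T>> windowed(Stream<T> source, int size, int step) {
        if (size < 1 || step < 1 || step > size) {
            throw new IllegalArgumentException("require 1 <= step <= size");
        }
        Iterator<T> elements = source.iterator();
        Iterator<List<T>> windows = new Iterator<List<T>>() {
            private final List<T> buffer = new ArrayList<>(size);
            private int fresh = 0;    // buffered elements not yet part of any emitted window
            private List<T> pending;  // next window to hand out, or null

            private void fill() {
                while (buffer.size() < size && elements.hasNext()) {
                    buffer.add(elements.next());
                    fresh++;
                }
                if (fresh == 0) {
                    return; // source exhausted and nothing new to emit
                }
                pending = new ArrayList<>(buffer);
                fresh = 0;
                // Drop the oldest `step` elements to move the window forward.
                buffer.subList(0, Math.min(step, buffer.size())).clear();
            }

            @Override
            public boolean hasNext() {
                if (pending == null) {
                    fill();
                }
                return pending != null;
            }

            @Override
            public List<T> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                List<T> result = pending;
                pending = null;
                return result;
            }
        };
        return StreamSupport.stream(
                        Spliterators.spliteratorUnknownSize(windows, Spliterator.ORDERED), false)
                .onClose(source::close);
    }

    public static void main(String[] args) {
        // prints [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
        System.out.println(windowed(Stream.of(1, 2, 3, 4, 5), 3, 1).collect(Collectors.toList()));
        // prints [[1, 2, 3], [4, 5, 6], [7, 8]]
        System.out.println(windowed(Stream.of(1, 2, 3, 4, 5, 6, 7, 8), 3, 3).collect(Collectors.toList()));
    }
}
```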

oleg-smith commented 6 years ago

Thank you!

mkobit commented 5 months ago

In Java 22, JEP 461 introduces the Gatherer API (as a preview feature), which supports this type of aggregation and emission in java.util.stream.Stream.

There appear to be some built-ins already, similar to Collectors.
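
As a rough illustration of those built-ins, the sketch below uses Gatherers.windowSliding and Gatherers.windowFixed from java.util.stream. It assumes JDK 24 or later, where gatherers are a final feature; on JDK 22 and 23 the same code needs --enable-preview. The class name GathererWindows and its two helper methods are hypothetical names for this example.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

// Assumes JDK 24+ (or JDK 22/23 with --enable-preview).
final class GathererWindows {
    // Overlapping windows of `size` elements, each shifted by one element.
    static <T> List<List<T>> sliding(Stream<T> source, int size) {
        return source.gather(Gatherers.windowSliding(size)).toList();
    }

    // Non-overlapping batches of `size` elements; the last batch may be shorter.
    static <T> List<List<T>> batches(Stream<T> source, int size) {
        return source.gather(Gatherers.windowFixed(size)).toList();
    }

    public static void main(String[] args) {
        // prints [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
        System.out.println(sliding(Stream.of(1, 2, 3, 4, 5), 3));
        // prints [[1, 2, 3], [4, 5, 6], [7, 8]]
        System.out.println(batches(Stream.of(1, 2, 3, 4, 5, 6, 7, 8), 3));
    }
}
```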