jOOQ / jOOL

jOOλ - The Missing Parts in Java 8 jOOλ improves the JDK libraries in areas where the Expert Group's focus was elsewhere. It adds tuple support, function support, and a lot of additional functionality around sequential Streams. The JDK 8's main efforts (default methods, lambdas, and the Stream API) were focused around maintaining backwards compatibility and implementing a functional API for parallelism.
http://www.jooq.org/products
Apache License 2.0
2.08k stars 167 forks source link

chunked(long), chunked(Predicate<T>) #296

Closed billoneil closed 4 years ago

billoneil commented 7 years ago

I'm looking for a way to take a Seq<T> and transform it to a Seq<List<T>> for batch processing while keeping lazy evaluation. (I am streaming and processing large files from S3 then pushing them back to S3)

I found the following and have two concerns. http://stackoverflow.com/a/30662609

  1. Note that this solution unnecessarily stores the whole input stream to the intermediate Map (unlike, for example, Ben Manes solution) Is this accurate?
  2. I believe the groupBy would exhaust the entire stream. In my case the file may not fit in memory so this would not work if that is the case.

Would the above be worth creating a helper for? Or would it be worth making something similar to Guavas Iterables.partition

billoneil commented 7 years ago

Example

Seq.of(1,2,3,4,5,6).grouped(3);
// ((1,2,3), (4,5,6))

EDIT: I don't think grouped would be the right name here some other libraries just use this name.

I tried looking into the Window functions but I'm not sure if that would work.

lukaseder commented 7 years ago

Thanks for your suggestion

1) Yes that's accurate. The solutions I suggested on Stack Overflow are workarounds that work well for small data sets, but don't scale as they're not lazy 2) GroupBy is a terminal operation, indeed, so the stream is consumed 3) Grouped already exists but it might not be a good candidate here, as it also risks consuming the entire stream 4) Window functions currently aren't lazy (enough)

So, yes, perhaps a helper might be worth exploring. Now about naming :) It looks like it's the opposite of flatMap() if you neglect type formalisms. bufferMap() perhaps? I'm not so fan of recycling the term group as the existing group notion groups by value, not index.

Also, the new functionality should probably ship with two flavours. "grouping" by index, and by predicate (e.g. a predicate returning true at the last element of a "group")

ben-manes commented 7 years ago

How about splitBy(int) and splitBy(pred)? This might feel like a plural version of splitAt which does a singular split.

lukaseder commented 7 years ago

Nice thinking, although the return types are fundamentally different, so reusing the term might seem surprising. How do Haskell et al. call this operation?

ben-manes commented 7 years ago

Haskell calls it chunksOf (previously called splitEvery).

lukaseder commented 7 years ago

Hmm, interesting. Yes, that's the exact equivalent to what's suggested here. But divvy looks similar (and useful) as well, although I'm not convinced it's a very easy to remember name :)

billoneil commented 7 years ago

@lukaseder any thoughts on implementation? Would returning a new Seq that wraps the Iterator of the original Seq using logic similar to Guava's Iterables. partition be a good approach?.

I like the two different flavor approach but I think it would require some documentation because of the lazy aspect. The predicate could only define the end of the current chunk or beginning of a new chunk.

Example

Seq.of(1,1,2,2,1,2,3,4,4).chunksOf(n -> isEven(n));
// ((1,1,2), (2), (3,4), (4))

Some might read the above and expect two groups even and odd similar to grouping functionality.

Naming isn't my strong suit and as long as it makes some sense it's fine with me. What about something like chunkWhen, partitionWhen, splitWhen, chunkUntil ... Just tossing some ideas out there. Maybe this could reduce confusion between grouping?

billoneil commented 7 years ago

I like your suggestion of buffer. bufferMap, bufferedBy, buffered

lukaseder commented 7 years ago

No thoughts about implementation...

I actually don't like my suggestion of buffer :) It's too broad and ambiguous...

About the predicate version, there's a third possible implementation similar to those Haskell splitting (or Java String.split()) features:

Seq.of(1,1,2,2,1,2,3,4,4).chunksOf(n -> isEven(n));
// ((1, 1), (), (1), (3), ())

I.e. consider the values where the predicate yields true to be delimiters of sequences. If two consecutive delimiters are encountered, then there's an empty sequence in between them.

billoneil commented 7 years ago
public Seq<List<T>> chunksOf(int size) { ... }
public Seq<List<T>> chunksOf(Predicate<T> predicate) { ... }
public Seq<List<T>> split(Predicate<T> predicate) { ... }

Would split be confused with splitAt?

As far as implementation goes my thoughts lean towards returning a new Seq that wraps the Iterator of the original or possibly wraps the Seq itself.

lukaseder commented 7 years ago

Yeah, split() cannot be used, unfortunately. Also, I wonder if the result type should be a Seq<Seq<T>> maybe, that's a bit less convenience, but this should really be lazy in my opinion. Which, of course, makes the implementation a bit more tricky if it is truly lazy. We could, at first make the nested sequence non-lazy by collecting a chunk completely before returning it...

Hmm, how about chunked(Predicate) rather than chunksOf(Predicate)?

billoneil commented 7 years ago

@lukaseder here is a quick proof of concept. Let me know if you think this is a reasonable implementation. For batching by chunks I ran into an issue where the predicate needed to be stateful and under the hood hasNext() was being called multiple times. I couldn't think of a better solution than special casing out a counting method or assuming the predicates might be stateful and we only want to call test once per element.

public class ASeq {
    private static final Logger log = LoggerFactory.getLogger(ASeq.class);

    private static <T> Predicate<T> countingPredicate(long num) {
        long[] countArray = new long[1];
        countArray[0] = 0L;
        long iterations = num -1;
        return (T t) -> {
            if (iterations == countArray[0]) {
                countArray[0] = 0L;
                return true;
            }
            countArray[0] = countArray[0] + 1;
            return false;
        };
    }

    private enum PredicateCheckState { NOT_TESTED, FALSE, TRUE_UNCONSUMED, TRUE_CONSUMED }

    public static <T> Seq<Seq<T>> chunked(Seq<T> seq, Predicate<T> predicate) {
        Iterator<T> it = seq.iterator();

        class PredicateIterator implements Iterator<T> {
            private final Iterator<T> delegate;
            private final Predicate<T> predicate;
            private T[] next;
            private PredicateCheckState predicateCheckState;

            public PredicateIterator(Iterator<T> delegate, Predicate<T> predicate, T[] next) {
                super();
                this.delegate = delegate;
                this.predicate = predicate;
                this.next = next;
                this.predicateCheckState = PredicateCheckState.NOT_TESTED;
            }

            @Override
            public boolean hasNext() {
                // If next is null, set next = delegates next() if exists.
                if (null == next[0]) {
                    if (delegate.hasNext()) {
                        next[0] = delegate.next();
                    } else {
                        next[0] = null;
                    }
                }

                // This was added in case the predicate is stateful.
                // hasNext() can be called multiple times before calling next();
                if (PredicateCheckState.NOT_TESTED == predicateCheckState) {
                    boolean result = predicate.test(next[0]);
                    if (result) {
                        predicateCheckState = PredicateCheckState.TRUE_UNCONSUMED;
                    } else {
                        predicateCheckState = PredicateCheckState.FALSE;
                    }
                }

                // There are no more elements if next is still null
                // or the chunking predicate passes.
                if (null == next[0] || PredicateCheckState.TRUE_CONSUMED == predicateCheckState) {
                    return false;
                }

                return true;
            }

            @Override
            public T next() {
                T current = next[0];
                if (delegate.hasNext()) {
                    next[0] = delegate.next();
                } else {
                    next[0] = null;
                }
                if (PredicateCheckState.TRUE_UNCONSUMED == predicateCheckState) {
                    predicateCheckState = PredicateCheckState.TRUE_CONSUMED;
                } else {
                    predicateCheckState = PredicateCheckState.NOT_TESTED;
                }
                return current;
            }
        };

        class ChunkingIterator implements Iterator<Seq<T>> {
            private final Iterator<T> delegate;
            private final Predicate<T> predicate;
            private PredicateIterator currentIterator;
            private T[] mutableNext;

            public ChunkingIterator(Iterator<T> delegate, Predicate<T> predicate) {
                super();
                this.delegate = delegate;
                this.predicate = predicate;
                this.mutableNext = (T[]) new Object[1];
                this.currentIterator = new PredicateIterator(delegate, predicate, mutableNext);
            }

            @Override
            public boolean hasNext() {
                if (currentIterator.hasNext()) {
                    return true;
                }

                if (delegate.hasNext()) {
                    currentIterator = new PredicateIterator(delegate, predicate, mutableNext);
                    return true;
                }

                return false;
            }

            @Override
            public Seq<T> next() {
                return Seq.seq(currentIterator);
            }
        };
        return Seq.seq(new ChunkingIterator(it, predicate));
    }

    public static void main(String[] args) {
        Seq<Integer> initial = Seq.range(0, 10).cycle();
        Seq<Seq<Integer>> chunked = chunked(initial, countingPredicate(5L));
        Seq<List<Integer>> mapped = chunked.map(seq -> seq.toList());
        mapped.limit(5).forEach(System.out::println);

//        Predicate<Integer> isEven = n -> n %2 == 0;
//     // ((1, 1), (), (1), (3), ())
//        Seq<Integer> initial = Seq.of(2, 1,1,2,4,5,6,77,7,7,3,4);
//        Seq<Seq<Integer>> chunked = chunked(initial, isEven);
//        Seq<List<Integer>> mapped = chunked.map(seq -> seq.toList());
//        mapped.limit(5).forEach(System.out::println);
    }
}

This implementation should be truly lazy. If it looks good just let me know if you want classes broken out or kept as inner classes as well as where you would like the code to go and I can make unit tests and make a PR.

Example output

range 0 - 9 chunk size 5 
[0, 1, 2, 3, 4]
[5, 6, 7, 8, 9]
[0, 1, 2, 3, 4]
[5, 6, 7, 8, 9]
[0, 1, 2, 3, 4]

range 0 - 9 chunk size 10 
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

range 0 - 2 chunk size 10 
[0, 1, 2, 0, 1, 2, 0, 1, 2, 0]
[1, 2, 0, 1, 2, 0, 1, 2, 0, 1]
[2, 0, 1, 2, 0, 1, 2, 0, 1, 2]
[0, 1, 2, 0, 1, 2, 0, 1, 2, 0]
[1, 2, 0, 1, 2, 0, 1, 2, 0, 1]

isEven //Seq.of(2, 1,1,2,4,5,6,77,7,7,3,4);
[2]
[1, 1, 2]
[4]
[5, 6]
[77, 7, 7, 3, 4]
billoneil commented 7 years ago

There is an issue with this implementation if the first element passes the test. Working on a fix.

billoneil commented 7 years ago

I think it's pretty much working except for the option to use the predicate as the delimiter and consume the matched elements as mentioned https://github.com/jOOQ/jOOL/issues/296#issuecomment-287038869

stellingsimon commented 7 years ago

I intuitively get how chunked(long) chunks up the stream into n pieces. However, it took me a good while to figure out what chunked(Predicate<T>) would do. The fact that the predicate detects splitting elements is not expressed in the name chunked at all.

Are these two functionalities even related? There's no delimiting element in chunked(long), but it seems to be an important concept in chunked(Predicate<T>). This mismatch seems to cause the statefulness of countingPredicate(): chunked(long) splits between elements, whenever a function of the inputs changes from one element to the next.

As for chunked(Predicate<T>), I'd propose naming it something entirely different. Here are some wild ideas:

As for chunked(long), this looks like special case of a more general strategy: split between elements whenever some function (e.g. index % 500) changes its value. I opened #321 for this.

billoneil commented 7 years ago

@stellingsimon there is more discussion on the PR https://github.com/jOOQ/jOOL/pull/320. One of our proposals is to add an Enum param the defines the behavior when the predicate passes. It can include the delimiter in the current chunk, next chunk, exclude it, add to both chunks. Seems like #321 is a dupe with different naming.

We also raised some concerns with the result being a Seq<Seq<T>> of what to do if someone limits the inner Seq. For example say you have an infinite Seq and do the following.

Seq<Integer> initial = Seq.range(0, 10).cycle();
Seq<Seq<Integer>> chunked = chunked(initial, countingPredicate(10L));
chunked = chunked.map(s -> s.limit(1));
chunked.limit(10);
chunked.forEach(System.out::println);

You might expect it to only iterate 10 elements but if that were the case you would have the incorrect results. The other option would be to have the inner Seq's fully iterate to get the right cursor locations (91 elements). This makes sense but isn't quite what you would expect from limit(1) on the internal Seq.

Feel free to join the discussion on the PR.

lukaseder commented 7 years ago

I don't think "chunked" is a bad term for what we're doing here. We're creating a Seq of chunks of the original Seq. This is different from grouping in the way that chunks are delimited in the order of appearance from the original Seq, whereas groups may interleave:

// Seq(Tuple(1, Seq(1, 3, 5, 7)), Tuple(0, Seq(2, 4, 6, 8))
Seq.of(1, 2, 3, 4, 5, 6, 7, 8).grouped(i -> i % 2);

// Seq(Seq(1), Seq(3), Seq(5), Seq(7))
Seq.of(1, 2, 3, 4, 5, 6, 7, 8).chunked(i -> i % 2 == 2);

// Or, with the BEFORE / AFTER / BOTH delimiter behaviour flags:
// Seq(Seq(1, 2), Seq(3, 4), Seq(5, 6), Seq(7, 8))
Seq.of(1, 2, 3, 4, 5, 6, 7, 8).chunked(i -> i % 2 == 2, BEFORE);

In other words:

Other effects of this:

So, the two features are certainly different, and the term "chunk" seems reasonable to me.

stellingsimon commented 7 years ago

Given your explanation I agree that "chunk" is a reasonable term to use.

However, note that so far, we've only been talking about a chunked(Predicate<T>), which would be a great deal more clunky to use than the chunked(Function<? super T, K>) in the case that you just demonstrated. This is why I proposed #321 in the first place. I'd definitely favor the variant chunked(Function<? super T, K>) over chunked(Predicate<T>).

Some more thoughts: I don't like the Enum-"Flag", particularly not in combination with the variant chunked(Function<? super T, K>). What exactly does "BEFORE" refer to? With the variant chunked(Predicate<T>), it was somewhat natural: put the delimiter BEFORE where the predicate is satisfied, but in chunked(i -> i % 2, BEFORE) there seems to be no delimiting element, and there is no predicate...

lukaseder commented 7 years ago

@stellingsimon Excuse my typo. I was going to write chunked(i -> i % 2 == 0, BEFORE). Will fix my previous comment.

lukaseder commented 7 years ago

... given the fixing of the typo, I suspect that the Predicate version is no longer "more clunky" than the Function version? What would a chunked(Function) method even mean?

stellingsimon commented 7 years ago

I'll try to explain what I mean by "clunky":

Try to implement chunked(int) in terms of chunked(Predicate<T>, Enum). It involves a fairly sophisticated PeekingIterator that easy to get wrong. If there is no overload provided, the burden of implementing this iterator is on the client side. I suspect that that's why the issue requested an overload for this special-case in the first place. Nevertheless, I view chunked(int) just as a special case of the more general case chunked(Function), which is what I attempted to describe in #321.

Here's the implementation in terms of chunked(Function):

// Seq(Seq("A", "B", "C"), Seq("D", "E"))
Seq.of("A", "B", "C", "D", "E")
   .zipWithIndex()
   .chunked(i -> i.v2 / 3);

I am sensing that I have an operation in mind that isn't fully compatible with what you think chunked would do, so I'll leave it at this. :-) I'm sorry if I made this issue more confusing than it needed to be.

lukaseder commented 7 years ago

Yeah, your "chunked" is again the same thing we already have through "grouped", so since we already have "grouped", let's keep these things separate.

lukaseder commented 4 years ago

I currently don't have time to re-iterate this feature request