jOOQ / jOOL

jOOλ - The Missing Parts in Java 8. jOOλ improves the JDK libraries in areas where the Expert Group's focus was elsewhere. It adds tuple support, function support, and a lot of additional functionality around sequential Streams. The JDK 8's main efforts (default methods, lambdas, and the Stream API) were focused around maintaining backwards compatibility and implementing a functional API for parallelism.
http://www.jooq.org/products
Apache License 2.0

Added on-line version of `groupBy()` #321

Closed by stellingsimon 7 years ago

stellingsimon commented 7 years ago

This is a feature request for a variant of groupBy that doesn't materialize the entire Seq at once by exploiting pre-sortedness of the inputs.

Consider the following scenario that I experienced in the past. From a relational DB, we stream a large number of order-lines. Because we've got a DB index already, sorting them by order_id on the DB is cheap and we do it there. The order-lines need to be processed in groups of order_id, however. Using groupBy, we unnecessarily materialize the entire result set before starting to process the first group of order-lines. Exploiting the existing grouping, this could be avoided. An example:

OrderLine entities corresponding to:
order_id | order_line_id
--------------------------------
1        | A
1        | B
1        | C
2        | D
2        | E
2        | F
3        | G
3        | H

A new groupBySorted should return the following Seq<Tuple2<Long, Seq<OrderLine>>>:

[(1, [A, B, C]),
 (2, [D, E, F]),
 (3, [G, H])]

Or, alternatively, a new splitGroupsBy should return the following Seq<Seq<OrderLine>>:

[[A, B, C],
 [D, E, F],
 [G, H]]
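
For illustration, the proposed behavior can be sketched in plain Java without jOOL. This is only a rough sketch under assumptions: the class AdjacentGroups and the method groupAdjacent are hypothetical names, order lines are modeled as "<order_id>:<order_line_id>" strings, and java.util.Map.Entry stands in for Tuple2. The iterator buffers one run at a time and needs a single lookahead element to detect the end of a run.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical helper, not part of jOOL: groups adjacent elements that share a key. */
final class AdjacentGroups {

    /** Returns an iterator of (key, run) pairs; only one run is buffered at a time. */
    static <T, K> Iterator<Map.Entry<K, List<T>>> groupAdjacent(
            Iterator<T> source, Function<? super T, ? extends K> classifier) {
        return new Iterator<Map.Entry<K, List<T>>>() {
            private T lookahead;          // first element of the next run, if any
            private boolean hasLookahead; // distinguishes "nothing buffered" from a null element

            @Override
            public boolean hasNext() {
                return hasLookahead || source.hasNext();
            }

            @Override
            public Map.Entry<K, List<T>> next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                T first = hasLookahead ? lookahead : source.next();
                hasLookahead = false;
                K key = classifier.apply(first);
                List<T> run = new ArrayList<>();
                run.add(first);
                // Pull elements until the key changes; the first mismatch becomes the lookahead.
                while (source.hasNext()) {
                    T candidate = source.next();
                    if (Objects.equals(key, classifier.apply(candidate))) {
                        run.add(candidate);
                    } else {
                        lookahead = candidate;
                        hasLookahead = true;
                        break;
                    }
                }
                return new SimpleEntry<>(key, run);
            }
        };
    }

    public static void main(String[] args) {
        // Each string stands for an order line: "<order_id>:<order_line_id>".
        List<String> lines =
                Arrays.asList("1:A", "1:B", "1:C", "2:D", "2:E", "2:F", "3:G", "3:H");
        Iterator<Map.Entry<String, List<String>>> groups =
                groupAdjacent(lines.iterator(), line -> line.substring(0, line.indexOf(':')));
        while (groups.hasNext()) {
            System.out.println(groups.next());
        }
    }
}
```

Dropping the key from each pair would give the splitGroupsBy shape instead.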

This same operation can be used to implement chunked(long) as proposed in #320 without resorting to a stateful CountingPredicate:

Seq.seq(items)
   .zipWithIndex() // known to be sorted
   .groupBySorted(tuple -> tuple.v2 / 500)
   .map(tuple -> tuple.v2)
   .forEach( ... );

or

Seq.seq(items)
   .zipWithIndex() // known to be sorted
   .splitGroupsBy(tuple -> tuple.v2 / 500)
   .forEach( ... );
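
To make the index / 500 idea concrete, here is a small plain-Java sketch that does not use jOOL. The names IndexChunks and chunkByIndex are hypothetical, and the result is materialized into lists purely for brevity. It relies on index / chunkSize never decreasing, so a new chunk starts exactly when that key changes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration, not jOOL API: chunking as run-grouping on index / chunkSize. */
final class IndexChunks {

    static <T> List<List<T>> chunkByIndex(Iterable<T> items, long chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        List<T> current = null;
        long previousKey = -1; // no key seen yet; real keys are >= 0
        long index = 0;
        for (T item : items) {
            long key = index++ / chunkSize; // same classifier as tuple.v2 / 500 in the snippets
            if (key != previousKey) {       // key changed: the previous run is complete
                current = new ArrayList<>();
                chunks.add(current);
                previousKey = key;
            }
            current.add(item);
        }
        return chunks;
    }

    public static void main(String[] args) {
        System.out.println(chunkByIndex(Arrays.asList("A", "B", "C", "D", "E"), 2));
        // prints [[A, B], [C, D], [E]]
    }
}
```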

Do you think either of these would be a useful addition? Which version do you prefer?

Edge cases: Given

OrderLine entities corresponding to:
order_id | order_line_id
--------------------------------
1        | A
1        | B
1        | C
3        | G
3        | H
2        | D
2        | E
2        | F
1        | G

groupBySorted should produce

[(1, [A, B, C]),
 (3, [G, H]),
 (2, [D, E, F]),
 (1, [G])]

or respectively, splitGroupsBy should produce

[[A, B, C],
 [G, H],
 [D, E, F],
 [G]]
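
To contrast this edge case with hash-based grouping, the following plain-Java sketch may help. It uses the JDK's Collectors.groupingBy rather than jOOL; UnsortedInputDemo, runKeys, hashGroups and orderId are hypothetical names, and order lines are modeled as "<order_id>:<order_line_id>" strings. On unsorted input, run-based grouping reports order 1 twice, while hash-based grouping merges both occurrences into one group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical demo, not jOOL API: run-based vs. hash-based grouping on unsorted input. */
final class UnsortedInputDemo {

    static String orderId(String line) {
        return line.substring(0, line.indexOf(':'));
    }

    /** Keys of the adjacent runs in encounter order; a key may appear more than once. */
    static List<String> runKeys(List<String> lines) {
        List<String> keys = new ArrayList<>();
        String previous = null;
        for (String line : lines) {
            String key = orderId(line);
            if (!key.equals(previous)) { // key changed, so a new run starts here
                keys.add(key);
                previous = key;
            }
        }
        return keys;
    }

    /** Hash-based grouping: every key appears exactly once, whatever the input order. */
    static Map<String, List<String>> hashGroups(List<String> lines) {
        return lines.stream().collect(Collectors.groupingBy(
                UnsortedInputDemo::orderId, LinkedHashMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "1:A", "1:B", "1:C", "3:G", "3:H", "2:D", "2:E", "2:F", "1:G");
        System.out.println(runKeys(lines));
        // prints [1, 3, 2, 1]
        System.out.println(hashGroups(lines));
        // prints {1=[1:A, 1:B, 1:C, 1:G], 3=[3:G, 3:H], 2=[2:D, 2:E, 2:F]}
    }
}
```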

landawn commented 7 years ago

I think a better design would be something like StreamEx#collapse.

stellingsimon commented 7 years ago

StreamEx#collapse is something very different. In the order-line example, you'd lose all but the first order line of each order, which makes it completely unsuitable...

landawn commented 7 years ago

@stellingsimon take a look at the docs, or at groupRuns, and give it a try. It keeps all the records, not just the first of each order.

lukaseder commented 7 years ago

Thank you very much for your suggestion. I think we already cover this functionality through the various Seq.grouped() overloads, right? https://www.jooq.org/products/jOO%CE%BB/javadoc/latest/org/jooq/lambda/Seq.html#grouped-java.util.function.Function-

They transform a Seq<T> into a Seq<Tuple2<K, Seq<T>>>, where K is the type of the grouping key.

stellingsimon commented 7 years ago

TL;DR: After reading your latest comment in #296 I'm fine with marking this as a duplicate of #296 and sticking with the name chunk.

Elaborated: Thanks for getting back to me. I saw grouped before, and it does produce the output that I am looking for. However, my concern with grouped is that it will fully materialize (almost) all its inputs before the iterator for the first group reports that it has no more elements. As you said yourself in #296:

In other words:

  • In order to determine the size of an individual group, we have to run through the entire Seq
  • In order to determine the size of an individual chunk, we only have to encounter the next delimiter

Since the consumer (usually) will process the tuples in order, grouped will necessarily buffer all elements of the second through the last group [1]. This is problematic in large streams of sizable objects. It is also avoidable if we know the inputs to be pre-sorted, thus the idea to provide explicit support for this in Seq's API. In relation to #296, I think that

Seq.seq(items)
   .chunked(i -> i.getOrderId(), BEFORE)
   .forEach( ... );

is not self-explanatory at all due to the Enum argument. This issue was an attempt to provide an alternative solution that expresses the same operation using the client's vocabulary:

Seq.seq(items)
   .groupBySorted(i -> i.getOrderId())
   .forEach( ... );

[1]: grouped implementation: https://github.com/jOOQ/jOOL/blob/master/src/main/java/org/jooq/lambda/Seq.java#L9463

lukaseder commented 7 years ago

Let me stress what I said in #296 a bit differently:

In order to determine the size of an individual group, we have to run through the entire Seq

But this doesn't mean that we have to traverse the entire Seq to start consuming a group. Quite possibly, the current implementation is not optimal / lazy enough - I think there's currently no test that checks for the laziness of this operation.
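
A laziness check of that kind could count how many source elements are pulled before the first group is handed out. The sketch below is plain Java and deliberately does not call Seq.grouped(), so it says nothing about jOOL's actual behavior. LazinessProbe, pulledForFirstGroup, eagerGroups and lazyRuns are hypothetical names, and the two groupings (key = value / 10) are stand-ins for an eager and a lazy implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.Function;

/** Hypothetical laziness probe; none of these names exist in jOOL. */
final class LazinessProbe {

    /** Counts how many source elements a grouping pulls before handing out its first group. */
    static int pulledForFirstGroup(List<Integer> input,
            Function<Iterator<Integer>, Iterator<List<Integer>>> grouping) {
        int[] pulled = {0};
        Iterator<Integer> delegate = input.iterator();
        Iterator<Integer> counting = new Iterator<Integer>() {
            @Override public boolean hasNext() { return delegate.hasNext(); }
            @Override public Integer next() { pulled[0]++; return delegate.next(); }
        };
        grouping.apply(counting).next(); // request only the first group
        return pulled[0];
    }

    /** Eager stand-in: buckets everything by key (value / 10) before returning any group. */
    static Iterator<List<Integer>> eagerGroups(Iterator<Integer> source) {
        Map<Integer, List<Integer>> buckets = new LinkedHashMap<>();
        while (source.hasNext()) {
            Integer value = source.next();
            buckets.computeIfAbsent(value / 10, k -> new ArrayList<>()).add(value);
        }
        return buckets.values().iterator();
    }

    /** Lazy stand-in: reads one run of equal keys plus a single lookahead element. */
    static Iterator<List<Integer>> lazyRuns(Iterator<Integer> source) {
        return new Iterator<List<Integer>>() {
            // Assumes non-null elements, so null can mean "source exhausted".
            private Integer lookahead = source.hasNext() ? source.next() : null;

            @Override public boolean hasNext() { return lookahead != null; }

            @Override public List<Integer> next() {
                if (lookahead == null) {
                    throw new NoSuchElementException();
                }
                List<Integer> run = new ArrayList<>();
                int key = lookahead / 10;
                do {
                    run.add(lookahead);
                    lookahead = source.hasNext() ? source.next() : null;
                } while (lookahead != null && lookahead / 10 == key);
                return run;
            }
        };
    }

    public static void main(String[] args) {
        List<Integer> sorted = Arrays.asList(1, 2, 3, 11, 12, 21, 22, 23);
        System.out.println(pulledForFirstGroup(sorted, LazinessProbe::eagerGroups)); // prints 8
        System.out.println(pulledForFirstGroup(sorted, LazinessProbe::lazyRuns));    // prints 4
    }
}
```

The lazy stand-in pulls the three elements of the first run plus one lookahead; a real test would plug the operation under test into the same probe.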

I'm fine with re-discussing the naming of BEFORE in the relevant issue, although do note that neither is this name set in stone (or far beyond draft status), nor is it the fault of the designer if a user uses a static import ;) The fully qualified enum name could be something along the lines of IncludeDelimiterInChunks.BEFORE. Or whatever.

Also, I don't think a groupBySorted operation is meaningful. We should not add an operation that depends on a hopefully correct assumption by the developer, which in case it is incorrect, fails completely (or rather: reverts to the original name "chunked"). Besides, I would read your suggestion more like a SQL hint, indicating to the API that the grouping operation must be performed by sorting the Seq, not by using a hashmap, rather than a Stream state predisposition hint.

Having said so, I'm now convinced as well that this is a duplicate of either the existing grouped() operation (which might be optimised) or the newly proposed chunked() operation.