davidmoten / rxjava-extras

Utilities for use with rxjava
Apache License 2.0
ThrowbackZip Operator? #2

Closed thomasnield closed 9 years ago

thomasnield commented 9 years ago

Hi David, a while back you answered my SO question and created some very helpful toListWhile() operators for my particular problem. http://stackoverflow.com/questions/31523884/rxjava-group-emit-and-zip-sorted-chunks-with-a-common-property

Here's a fun little complication... what if a partitionId value only existed in one of the two Observable streams?

toListWhile() solves 90% of this particular problem. What would be cool is some kind of throwbackZip() operator that behaves like zip(), but first runs a given Func2<X,Y,Boolean> test on the two items about to be zipped. If that test fails, an Action3<X,Y,ZipThrowback> is called instead, where ZipThrowback is a class that lets you substitute a value for X or Y in the zip operation and have the displaced X or Y re-sent, so that it ultimately zips with the correct partner.

    Observable<Item> items1 = ...;
    Observable<Item> items2 = ...;

    // group consecutive items that share a partitionId
    Observable<List<Item>> itemLists1 = items1.compose(Transformers.toListWhile(
        (list, item) -> list.isEmpty() || list.get(0).partitionId == item.partitionId));

    Observable<List<Item>> itemLists2 = items2.compose(Transformers.toListWhile(
        (list, item) -> list.isEmpty() || list.get(0).partitionId == item.partitionId));

    // proposed operator: throwbackZip() and ZipThrowback do not exist yet
    Observables.throwbackZip(itemLists1, itemLists2, 
        (l1, l2) -> l1.get(0).partitionId == l2.get(0).partitionId,
        (l1, l2, zipThrowback) -> {
            if (l1.get(0).partitionId > l2.get(0).partitionId) { 
                zipThrowback.substituteLeft(Collections.emptyList());
                zipThrowback.resendLeft(l1);
            }
            else { 
                zipThrowback.substituteRight(Collections.emptyList());
                zipThrowback.resendRight(l2);
            }
        }, 
        //standard zip function goes here
        )
        //etc
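
To make the intended behaviour concrete, here is a minimal sketch of the same idea on plain in-memory lists rather than Observables. It is not an rxjava-extras or RxJava API: the class and method names (`ConditionalZipSketch`, `conditionalZip`) are hypothetical, the items are reduced to their integer keys, and `null` stands in for the substituted empty value. It assumes both inputs are sorted and distinct.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not part of rxjava-extras.
class ConditionalZipSketch {

    // Pairs two sorted, distinct key lists. When the keys at the front match,
    // they are zipped together. On a mismatch the smaller key is paired with
    // null (the substitute) and the larger key is held back ("thrown back")
    // to be compared with the next key from the other side.
    static List<List<Integer>> conditionalZip(List<Integer> left, List<Integer> right) {
        List<List<Integer>> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < left.size() || j < right.size()) {
            Integer l = i < left.size() ? left.get(i) : null;
            Integer r = j < right.size() ? right.get(j) : null;
            if (l != null && r != null && l.equals(r)) {
                out.add(Arrays.asList(l, r));      // keys match: normal zip
                i++;
                j++;
            } else if (r == null || (l != null && l < r)) {
                out.add(Arrays.asList(l, null));   // left has no partner, right is held back
                i++;
            } else {
                out.add(Arrays.asList(null, r));   // right has no partner, left is held back
                j++;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> left = Arrays.asList(1, 2, 4);
        List<Integer> right = Arrays.asList(1, 3, 4);
        // prints [[1, 1], [2, null], [null, 3], [4, 4]]
        System.out.println(conditionalZip(left, right));
    }
}
```

A real operator would have to do the same bookkeeping asynchronously and with backpressure, which is where most of the difficulty lies.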

What do you think? Is this a niche need or something that would be useful?

thomasnield commented 9 years ago

Yeah, I think this might work. I conceptually ran through more unpredictable (but still sorted) data scenarios, and it looks like this operator would self-correct if keys are missing on either side, even in a haphazard way.

thomasnield commented 9 years ago

I suppose a better name for this would be ConditionalZip with overloaded parameters, depending on how much control you want to take in the event of a mismatch.

davidmoten commented 9 years ago

Another interesting problem, I'll have a think!

thomasnield commented 9 years ago

Thanks, I can always create a PR, but I'll defer to you first if you'd like to tackle it. I'm not quite sure how I'd approach this yet, since combine operations are more complicated...

davidmoten commented 9 years ago

The first thing I notice is that, now that we have toListWhile, the problem simplifies to combining streams such as these:

1, 2, 3, 4, 5, 7, 9
1, 2, 3, 5, 6, 10

where the number really just represents the characteristic of the stream item that we compare with.

The important characteristic of the streams is that they are ordered and distinct, so what I would do is merge them in sorted order and buffer the merged stream into pairs to see what can come of that. There might be another state machine involved, but that is doable with Transformers.stateMachine.

You've come to the right place for an ordered merge, because this project has Transformers.orderedMerge. I haven't used it in production, but I think it has decent test coverage. Try it and see, I guess.
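
As a rough illustration of what an ordered merge does to the two example streams above, here is a sketch on plain lists. It is not the rxjava-extras implementation; `OrderedMergeSketch` and `orderedMerge` are hypothetical names, and both inputs are assumed to be already sorted by the comparator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration only; not the library's operator.
class OrderedMergeSketch {

    // Merges two already-sorted lists into one sorted list.
    // On a tie the item from the left list is taken first.
    static <T> List<T> orderedMerge(List<T> left, List<T> right, Comparator<? super T> cmp) {
        List<T> out = new ArrayList<>(left.size() + right.size());
        int i = 0;
        int j = 0;
        while (i < left.size() && j < right.size()) {
            if (cmp.compare(left.get(i), right.get(j)) <= 0) {
                out.add(left.get(i++));
            } else {
                out.add(right.get(j++));
            }
        }
        // one side is exhausted: append whatever remains on the other
        while (i < left.size()) {
            out.add(left.get(i++));
        }
        while (j < right.size()) {
            out.add(right.get(j++));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> a = Arrays.asList(1, 2, 3, 4, 5, 7, 9);
        List<Integer> b = Arrays.asList(1, 2, 3, 5, 6, 10);
        // prints [1, 1, 2, 2, 3, 3, 4, 5, 5, 6, 7, 9, 10]
        System.out.println(orderedMerge(a, b, Comparator.<Integer>naturalOrder()));
    }
}
```

Because each input is distinct, equal keys can only appear in the merged output as an adjacent pair, one from each side. That is what makes buffering the merged stream workable.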

thomasnield commented 9 years ago

Okay I'll do some experiments today and see what can be done...

thomasnield commented 9 years ago

Yes, you're right, orderedMergeWith() works. After that operation, you can group the merged lists with another toListWhile() to create a list of lists.

    itemLists1.compose(Transformers.orderedMergeWith(itemLists2, (g1, g2) -> 
            Integer.compare(g1.get(0).partitionId, g2.get(0).partitionId)))
        .compose(Transformers.toListWhile((groupedList, list) -> 
            groupedList.isEmpty() || groupedList.get(0).get(0).partitionId == list.get(0).partitionId));

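
For readers without RxJava to hand, the grouping step can be sketched on a plain list. This only mimics the "collect while a condition holds" behaviour described in this thread; `ListWhileSketch` and `listWhile` are hypothetical names, not the library's code, and plain integer keys stand in for the per-partition lists.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical illustration only; not the library's operator.
class ListWhileSketch {

    // Collects consecutive items into a list while condition(currentList, nextItem)
    // holds. When the condition fails for a non-empty list, that list is emitted
    // and a new list is started with nextItem.
    static <T> List<List<T>> listWhile(List<T> items, BiPredicate<List<T>, T> condition) {
        List<List<T>> out = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : items) {
            if (!current.isEmpty() && !condition.test(current, item)) {
                out.add(current);
                current = new ArrayList<>();
            }
            current.add(item);
        }
        if (!current.isEmpty()) {
            out.add(current); // emit the final, still-open group
        }
        return out;
    }

    public static void main(String[] args) {
        // keys as they come out of an ordered merge of the two example streams
        List<Integer> merged = Arrays.asList(1, 1, 2, 2, 3, 3, 4, 5, 5, 6, 7, 9, 10);
        // prints [[1, 1], [2, 2], [3, 3], [4], [5, 5], [6], [7], [9], [10]]
        System.out.println(listWhile(merged, (list, x) -> list.isEmpty() || list.get(0).equals(x)));
    }
}
```

Groups of size one are the keys that exist on only one side, which is exactly the case the original question was about.
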
thomasnield commented 9 years ago

Thanks for your help on this!

davidmoten commented 9 years ago

Yeah that extra use of toListWhile sounds just the ticket.