davidmoten / rxjava-extras

Utilities for use with rxjava
Apache License 2.0

toCombinedList() operator #3

Closed thomasnield closed 8 years ago

thomasnield commented 9 years ago

Thought I'd offer this particular Transformer since I am using it a lot and maybe others might find value in it. David, if you want this CombinedListTransformer let me know and I can create a PR.

This SO post will provide detailed context on what it does and how it came about. http://stackoverflow.com/questions/32292509/rxjava-consolidating-multiple-infinite-observablelistt/32312586#32312586

Essentially, it takes an Observable<List<T>>, maps each T item to its own Observable<List<R>>, combines the latest List<R> emitted by each of those into a single List<R>, and emits the result as an Observable<List<R>>.
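To make the "latest list from each child" semantics concrete, here is a minimal plain-Java sketch with no RxJava involved. The class name `LatestListCombiner` and its method `onChildEmits` are hypothetical and exist only for this illustration. It assumes the same rule as combineLatest: nothing is produced until every child has emitted at least once, and after that each new child list replaces that child's previous list in the combined result.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of combining the latest list from each child (illustration only, not RxJava). */
public class LatestListCombiner<R> {

    // latest.get(i) is the most recent list from child i, or null if it has not emitted yet
    private final List<List<R>> latest = new ArrayList<>();

    // Number of children that have not emitted anything yet
    private int pending;

    public LatestListCombiner(int childCount) {
        for (int i = 0; i < childCount; i++) {
            latest.add(null);
        }
        pending = childCount;
    }

    /**
     * Records a new list from the child at the given index.
     * Returns the combined list, or null while some child has not emitted yet.
     */
    public List<R> onChildEmits(int index, List<R> items) {
        if (latest.get(index) == null) {
            pending--;
        }
        // Replace (do not append to) the previous list from this child
        latest.set(index, new ArrayList<>(items));
        if (pending > 0) {
            return null;
        }
        // Concatenate the latest list of every child, in child order
        List<R> combined = new ArrayList<>();
        for (List<R> childList : latest) {
            combined.addAll(childList);
        }
        return combined;
    }
}
```

With two children, if child 0 emits [a, b] and child 1 then emits [x], the combined list is [a, b, x]. If child 0 later emits [c], the combined list becomes [c, x]: the earlier [a, b] is replaced rather than accumulated.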

It was tedious enough to compose that a Transformer felt warranted.

    import java.util.ArrayList;
    import java.util.List;

    import rx.Observable;
    import rx.functions.Func1;

    public final class CombinedListTransformer<T,R> implements Observable.Transformer<List<T>,List<R>> {

        // Maps each T item to its own stream of List<R>
        private final Func1<T,Observable<List<R>>> listMapper;

        public CombinedListTransformer(Func1<T,Observable<List<R>>> listMapper) {
            this.listMapper = listMapper;
        }

        @Override
        @SuppressWarnings("unchecked") // combineLatest passes Object[]; each element is a List<R>
        public Observable<List<R>> call(Observable<List<T>> sourceList) {
            return sourceList.flatMap(sl ->
                Observable.from(sl)
                    .map(listMapper) // Observable<Observable<List<R>>>
                    .toList() // List<Observable<List<R>>>
                    .flatMap(consolidatedChildList -> Observable.combineLatest(consolidatedChildList, args -> {
                        // Concatenate the latest list from every child, in source order
                        List<R> list = new ArrayList<>();
                        for (Object obj : args) {
                            list.addAll((List<R>) obj);
                        }
                        return list;
                    }))
            );
        }
    }

davidmoten commented 9 years ago

Thanks @thomasnield for the offer. At the moment I don't really understand the use case, and I didn't really understand it on SO either. When does this crop up?

thomasnield commented 9 years ago

Maybe it's a niche need then, so no worries if it's not needed. It comes up when I have an Observable<List<T>> where each T item has an Observable<List<R>> property, and I want to consolidate all instances into a single combined reactive Observable<List<R>>.

davidmoten commented 9 years ago

Sounds like a straight application of flatMap using a scheduler (without a scheduler it just behaves like concatMap).

thomasnield commented 9 years ago

Does that mean there's a better way to compose this? And yeah, I think I can see the flatMap() behavior, but it's flattening and combining only the latest lists, not accumulating every list that gets emitted afterwards.

Oh wait... are you suggesting I don't use lists at all and make a finite Observable act as my collection? I'll need to think on that...