davidmoten / rxjava-extras

Utilities for use with rxjava
Apache License 2.0

No terminal event from Transformers.orderedMergeWith(Observable<T>, Comparator) if Comparator throws exception #30

Closed: moderakh closed this issue 6 years ago

moderakh commented 6 years ago

If the Comparator passed to Transformers.orderedMergeWith throws an exception, the resulting observable emits no further items and no terminal event (neither onError nor onCompleted). As a result, the following code blocks forever.

How to reproduce:

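        // Needs java.util.Comparator, java.util.concurrent.atomic.AtomicInteger,
        // rx.Observable (RxJava 1.x) and Transformers from rxjava-extras.
        try {
            System.out.println("started");
            Observable<Integer> o1 = Observable.range(0, 10000);
            Observable<Integer> o2 = Observable.range(0, 10000);

            Comparator<Integer> comparator = new Comparator<Integer>() {
                // Counts comparisons so that the failure happens mid-stream.
                private final AtomicInteger calls = new AtomicInteger(0);

                @Override
                public int compare(Integer a, Integer b) {
                    if (calls.getAndIncrement() > 1000) {
                        throw new RuntimeException("failed");
                    }
                    return Integer.compare(a, b);
                }
            };
            Observable<Integer> res = o1.compose(Transformers.orderedMergeWith(o2, comparator));

            // With the bug, this blocking call never returns: no onError, no onCompleted.
            System.out.println("finished " + res.toList().toBlocking().single().size());
        } catch (Exception e) {
            e.printStackTrace();
        }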
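
For reference, here is a minimal plain-Java sketch of the behaviour I would expect. It is not the rxjava-extras implementation and does not use RxJava at all; the class OrderedMergeSketch, the Sink interface and the demo helper are hypothetical names made up for illustration. The idea is only that an exception thrown by the comparator gets caught inside the merge loop and forwarded as a terminal error, so that the consumer always sees exactly one terminal event.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

final class OrderedMergeSketch {

    /** Hypothetical downstream callback, loosely modelled on an RxJava 1.x Observer. */
    public interface Sink<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onCompleted();
    }

    /**
     * Merges two individually sorted sources into one sorted stream.
     * Exactly one terminal event is delivered: onError if the comparator
     * throws, onCompleted otherwise.
     */
    public static <T> void orderedMerge(Iterator<T> left, Iterator<T> right,
                                        Comparator<? super T> comparator, Sink<T> sink) {
        boolean hasA = left.hasNext();
        boolean hasB = right.hasNext();
        T a = hasA ? left.next() : null;
        T b = hasB ? right.next() : null;

        while (hasA || hasB) {
            boolean takeLeft;
            if (hasA && hasB) {
                try {
                    takeLeft = comparator.compare(a, b) <= 0;
                } catch (RuntimeException e) {
                    // Forward the failure instead of silently stopping.
                    sink.onError(e);
                    return;
                }
            } else {
                // Only one source still has items; drain it.
                takeLeft = hasA;
            }
            if (takeLeft) {
                sink.onNext(a);
                hasA = left.hasNext();
                a = hasA ? left.next() : null;
            } else {
                sink.onNext(b);
                hasB = right.hasNext();
                b = hasB ? right.next() : null;
            }
        }
        sink.onCompleted();
    }

    /** Merges [1,3,5] with [2,4,6]; the comparator throws once it has been called failAfter times. */
    public static List<String> demo(final int failAfter) {
        final List<String> events = new ArrayList<>();
        final int[] calls = {0};
        Comparator<Integer> comparator = (x, y) -> {
            if (calls[0]++ >= failAfter) {
                throw new RuntimeException("failed");
            }
            return Integer.compare(x, y);
        };
        Sink<Integer> sink = new Sink<Integer>() {
            @Override public void onNext(Integer item) { events.add("next " + item); }
            @Override public void onError(Throwable error) { events.add("error " + error.getMessage()); }
            @Override public void onCompleted() { events.add("completed"); }
        };
        orderedMerge(Arrays.asList(1, 3, 5).iterator(),
                     Arrays.asList(2, 4, 6).iterator(), comparator, sink);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo(2));                 // comparator fails on its third call
        System.out.println(demo(Integer.MAX_VALUE)); // comparator never fails
    }
}
```

With a failing comparator the event list ends in an error entry, and with a well-behaved comparator it ends in a completion entry. In the RxJava case that would mean toBlocking().single() rethrows the comparator's exception instead of hanging.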

@davidmoten can you please take a look?

davidmoten commented 6 years ago

Thanks for the report @moderakh. Fixed in #31. I'll release to Maven Central shortly.

FYI @akarnokd (as original author of OrderedMerge).

davidmoten commented 6 years ago

Version 0.8.0.16, which includes the fix, has been released to Maven Central.

moderakh commented 6 years ago

Thank you @davidmoten for quickly providing the fix. We will upgrade to 0.8.0.16 for our next release.