ReactiveX / RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Apache License 2.0

Scheduler Synchronization? #3464

Closed: fougere-mike closed this issue 8 years ago

fougere-mike commented 9 years ago

Hi all,

For the last couple of days I've been struggling with what appears to be a concurrency issue. I've got two streams concat'd together with Observable.concat(). The first stream completes successfully, but the second stream is never subscribed; effectively stopping execution. Every once in a while (maybe 1/100 executions) I get a "good" execution which actually does complete and everything looks dandy.

I've tried making a test case which illustrates the issue, but I can't seem to reproduce it in stand-alone code. At this point I'm fairly stumped, and I'm hoping someone on here can point me in the right direction.

The test case I've been using to try to reproduce it is included below. It illustrates what I'm trying to accomplish. There are two operations concatenated together: cache and publish. The cache operation is itself a concatenation of the update and delete operations. All cache operations should occur on the cache scheduler, while the publish operation should occur on the publish scheduler. This is to prevent concurrent access to the cache's underlying data store.
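As a rough, stdlib-only analogue of the intended ordering (an analogy, not RxJava code), the sketch below chains three steps with CompletableFuture: update and delete run on one shared single-thread executor, so they can never overlap, and publish runs afterwards on a second executor. The class name SerializedCacheSketch and the thread names "cache" and "publish" are made up for illustration. One thing it highlights: serialization only holds across calls if the same executor is reused, whereas the test code below creates a new executor each time getCacheObservable is called.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SerializedCacheSketch {

    /** Runs update, then delete on the "cache" thread, then publish on the "publish" thread. */
    public static List<String> run() {
        // Each entry records which step ran and on which (named) thread.
        List<String> log = Collections.synchronizedList(new ArrayList<>());

        // One shared single-thread executor: every cache step runs on the same thread, one at a time.
        ExecutorService cacheExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "cache"));
        ExecutorService publishExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "publish"));
        try {
            CompletableFuture
                    .runAsync(() -> log.add("update on " + Thread.currentThread().getName()), cacheExecutor)
                    // Each *Async stage starts only after the previous stage has completed.
                    .thenRunAsync(() -> log.add("delete on " + Thread.currentThread().getName()), cacheExecutor)
                    .thenRunAsync(() -> log.add("publish on " + Thread.currentThread().getName()), publishExecutor)
                    .join(); // block until the whole chain has finished
            return new ArrayList<>(log);
        } finally {
            // Shut the executors down so their non-daemon threads don't keep the JVM alive.
            cacheExecutor.shutdown();
            publishExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running main prints [update on cache, delete on cache, publish on publish].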

In my actual code the 'cache' operation is subscribed to, and it executes the first of its two inner operations (update, but not delete). The publish operation is obviously also never executed since it depends on the cache operation completing.

Using the debugger I've determined that when the cache update operation calls onCompleted(), it ends up in this method of OperatorConcat with an empty queue:

void subscribeNext() {
    if (requested > 0) {
        Object o = queue.poll();
        if (nl.isCompleted(o)) {
            child.onCompleted();
        } else if (o != null) {
            Observable<? extends T> obs = nl.getValue(o);
            currentSubscriber = new ConcatInnerSubscriber<T>(this, child, arbiter);
            current.set(currentSubscriber);
            obs.unsafeSubscribe(currentSubscriber);
        }
    } else {
        // requested == 0, so we'll peek to see if we are completed, otherwise wait until another request
        Object o = queue.peek();
        if (nl.isCompleted(o)) {
            child.onCompleted();
        }
    }
}

I believe this is what causes the cache delete operation to not be subscribed (the empty queue). It appears that the same thread which executes the cache update operation is also used to call subscribeNext, so it doesn't have a chance to actually complete and update the queue with the next operation. I could be totally off base here, and as I mentioned before I can't seem to reproduce this with tests.
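For context, the usual way a concat-style operator copes with an inner source that completes synchronously, on the very thread that subscribed to it, is a work-in-progress counter (often called trampolining or a drain loop): a re-entrant call only records that more work is pending, and the thread already inside the loop picks it up, so the next source is not lost and the stack does not grow. The sketch below is a simplified, hypothetical illustration of that pattern; MiniConcat and its Source interface are invented for the example and are not RxJava's OperatorConcat, which additionally tracks downstream requests. It shows why same-thread completion is not, on its own, necessarily a bug.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

public class MiniConcat {

    /** Hypothetical source: starts its work and calls onComplete when done, possibly synchronously. */
    interface Source {
        void start(Runnable onComplete);
    }

    // Only touched inside the drain loop, which at most one thread runs at a time.
    private final Queue<Source> queue;
    // Counts pending drain requests; 0 means nobody is currently inside the loop.
    private final AtomicInteger wip = new AtomicInteger();
    private final Runnable onAllComplete;

    MiniConcat(List<Source> sources, Runnable onAllComplete) {
        this.queue = new ArrayDeque<>(sources);
        this.onAllComplete = onAllComplete;
    }

    /** Call once to start; every source also calls it back when it completes. */
    void drain() {
        // A re-entrant (or concurrent) call just bumps the counter and returns;
        // only the caller that moved wip from 0 to 1 runs the loop.
        if (wip.getAndIncrement() != 0) {
            return;
        }
        do {
            Source next = queue.poll();
            if (next == null) {
                onAllComplete.run(); // queue exhausted: every source has completed
                return;
            }
            // If 'next' completes synchronously, its drain() call only increments wip,
            // so the loop runs again here instead of recursing.
            next.start(this::drain);
        } while (wip.decrementAndGet() != 0);
    }
}
```

Because the counter turns recursion into iteration, even 100,000 sources that all complete synchronously run in order without a StackOverflowError.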

Does anyone have an idea what could be going wrong? It seems like it's probably my fault, but I can't figure out what I've done wrong.

Note: I've tried removing everything from the bodies of cache update and cache delete to no avail. It appears that even when all the operations do is call subscriber.onCompleted() the concat operator still gets stuck. But not in tests...

Here's the test case:

// Spock Unit Test
def testConcurrencyIssue() {
    given:
        def ts = new TestSubscriber<Void>()
        def test = WeirdnessTest.getTestObservable()

    when:
        println "\n\nTest:"
        test.subscribe(ts)
        ts.awaitTerminalEvent()
        println "Done: ${ts.getOnNextEvents()}"

    then:
        ts.assertCompleted()
}

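// Supporting Test Code
import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Func1;
import rx.internal.operators.OperatorMap;
import rx.internal.operators.OperatorMerge;
import rx.observers.Subscribers;
import rx.schedulers.Schedulers;
import rx.subscriptions.MultipleAssignmentSubscription;

public class WeirdnessTest {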
    public static final Observable<JsonResult> getTestObservable() {
        // In my code 'source' is an observable which requests data from the network.
        // For test purposes, this should suffice:
        Observable<JsonResult> source = Observable.just(new JsonResult("/server/information", "{ \"version\": 1.0 }"));

        // Note: `onNextDo` is a custom operator. The implementation is included below (could be the problem?).
        return source
                .lift(new onNextDo<>(new Func1<JsonResult, Observable<?>>() {
                    @Override
                    public Observable<?> call(JsonResult result) {
                        return getCacheAndPublishObservable(result.path, result.json);
                    }
                }));
    }

    public static final Observable<Void> getCacheAndPublishObservable(String path, String json) {
        // Combine the cache + publish observables.
        return Observable.concat(
                getCacheObservable(path, json),  // Completes successfully
                getPublishObservable(path, json) // Never subscribed to
        );
    }

    public static final Observable<Void> getCacheObservable(final String path, final String json) {
        final ThreadFactory   factory        = new ThreadFactoryBuilder().setNameFormat("TestThread-%d").build();
        final ExecutorService cacheExecutor  = Executors.newSingleThreadExecutor(factory);
        final Scheduler       cacheScheduler = Schedulers.from(cacheExecutor);

        // Create an observable which will update the data cache
        Observable<Void> update = Observable.create(new Observable.OnSubscribe<Void>() {
            @Override
            public void call(Subscriber<? super Void> subscriber) {
                // Note: The body here doesn't seem to matter.
                System.out.println("[Cache] Update: "+path+", "+json);
                subscriber.onCompleted();
            }
        }).subscribeOn(cacheScheduler);

        // Create an observable which will delete the data cache if needed
        Observable<Void> delete = Observable.create(new Observable.OnSubscribe<Void>() {
            @Override
            public void call(Subscriber<? super Void> subscriber) {
                // Note: The body here doesn't seem to matter.
                System.out.println("[Cache] Delete: "+path+", "+json);
                subscriber.onCompleted();
            }
        }).subscribeOn(cacheScheduler);

        return Observable.concat(update, delete);
    }

    public static final Observable<Void> getPublishObservable(final String path, final String json) {
        ExecutorService executor         = Executors.newSingleThreadExecutor();
        Scheduler       publishScheduler = Schedulers.from(executor);

        // Create an observable which dispatches the data
        Observable<Void> publish = Observable.create(new Observable.OnSubscribe<Void>() {
            @Override
            public void call(Subscriber<? super Void> subscriber) {
                // Note: The body here doesn't seem to matter.
                System.out.println("[Publish]: Dispatch "+path+", "+json);
                subscriber.onCompleted();
            }
        }).subscribeOn(publishScheduler);

        return publish;
    }

    public static final class JsonResult {
        public String path;
        public String json;

        public JsonResult(String path, String json) {
            this.path = path;
            this.json = json;
        }

        @Override
        public String toString() {
            return "JsonResult("+path+", "+json+")";
        }
    }

    /**
     * Injects a side effect Observable into the stream upon receiving an onNext event. The `effect`
     * Observable is executed before forwarding the original event.
     * <p/>
     * If the `effect` Observable sends an error, the error is forwarded to the parent Observable
     * without sending the value that triggered the side-effect.
     *
     * @param <T> The type emitted by onNext of the parent Observable.
     */
    public static final class onNextDo<T> implements Observable.Operator<T, T> {
        private final Func1<T, Observable<?>> mEffect;
        private final flatMap<T, T> mFlatMap;

        public onNextDo(Func1<T, Observable<?>> effect) {
            assert( effect != null );
            mEffect = effect;
            mFlatMap = new flatMap<>(new Func1<T, Observable<? extends T>>() {
                @Override
                public Observable<? extends T> call(T t) {
                    Observable<?> sideEffect = Observable.empty();
                    Observable<T> value = Observable.empty();
                    try {
                        sideEffect = mEffect.call(t);
                        value = sideEffect.lift(new then<>(Observable.just(t)));
                    }catch(Exception e){
                        e.printStackTrace();
                        return Observable.error(e);
                    }

                    return value;
                }
            });
        }

        @Override
        public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
            return mFlatMap.call(subscriber);
        }
    }

    /**
     * Implementation of {@link Observable#flatMap(Func1)} which conforms to the {@link
     * rx.Observable.Operator} interface. This operator can be useful when implementing other operators.
     *
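     * @param <R> The type emitted by the Observables returned from the function.
     * @param <T> The type emitted by the source Observable.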
     */
    public static final class flatMap<R, T> implements Observable.Operator<R, T> {
        private final Func1<? super T, ? extends Observable<? extends R>> mFunction;

        public flatMap(Func1<? super T, ? extends Observable<? extends R>> function) {
            mFunction = function;
        }

        @Override
        public Subscriber<? super T> call(Subscriber<? super R> subscriber) {
            OperatorMap<T, Observable<? extends R>> map = new OperatorMap<>(mFunction);
            OperatorMerge<R> merge = OperatorMerge.instance(false);

            return map.call(merge.call(subscriber));
        }
    }

    /**
     * Ignores all emissions from the source observable. Once the source completes, the provided
     * observable will be subscribed to.
     *
     * @param <R> The type of objects emitted by the provided observable.
     * @param <T> The type of objects emitted by the source observable.
     */
    public static final class then<R, T> implements Observable.Operator<R, T> {
        private final Observable<R> mNextObservable;

        public then(Observable<R> nextObservable) {
            assert( nextObservable != null );
            mNextObservable = nextObservable;
        }

        @Override
        public Subscriber<? super T> call(final Subscriber<? super R> subscriber) {
            final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
            subscriber.add(mas);

            Subscriber<T> parent = new Subscriber<T>() {
                @Override
                public void onNext(T t) {
                    // Ignored.
                }

                @Override
                public void onError(Throwable e) {
                    subscriber.onError(e);
                }

                @Override
                public void onCompleted() {
                    Subscriber<R> subscriberR = Subscribers.wrap(subscriber);
                    mas.set(subscriberR);
                    mNextObservable.subscribe(subscriberR);
                }
            };

            mas.set(parent);
            return parent;
        }
    }
}
akarnokd commented 9 years ago

Hi.

Whenever I see the use of Observable.create or Observable.lift, that is a sign of trouble. Why did you implement all those operators instead of using the standard RxJava methods?

Regardless, one possible source of the problems is the just operator: it doesn't support backpressure, so it can overflow the request tracking inside concat, which then throws an IllegalStateException from produced(). It is possible this exception gets swallowed somewhere, and thus the stream ends up hanging. Try using just(m).onBackpressureBuffer() everywhere and see if it still hangs.
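To make that failure mode concrete, here is a simplified, hypothetical request tracker in plain Java (RequestTracker is invented for this sketch; it is not RxJava's actual arbiter class). Downstream adds to the outstanding count with request(n), and each delivered item is subtracted with produced(n). A source that ignores backpressure delivers items nobody asked for, so the count would go negative and produced throws IllegalStateException instead. If that exception is caught and dropped somewhere, nothing downstream ever hears about it, which is one way a stream can appear to hang. Long.MAX_VALUE is treated as "unbounded", following RxJava's convention for request(Long.MAX_VALUE).

```java
public class RequestTracker {
    // Items downstream has asked for but not yet received; Long.MAX_VALUE means "unbounded".
    private long requested;

    /** Downstream asks for n more items. */
    public synchronized void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n > 0 required but it was " + n);
        }
        if (requested == Long.MAX_VALUE) {
            return; // already unbounded
        }
        long r = requested + n;
        requested = r < 0 ? Long.MAX_VALUE : r; // cap instead of overflowing
    }

    /** Upstream reports that it delivered n items. */
    public synchronized void produced(long n) {
        if (requested == Long.MAX_VALUE) {
            return; // unbounded: nothing to account for
        }
        long r = requested - n;
        if (r < 0) {
            // More items arrived than were requested: the source ignored backpressure.
            throw new IllegalStateException("more items arrived than were requested");
        }
        requested = r;
    }

    public synchronized long outstanding() {
        return requested;
    }
}
```

onBackpressureBuffer() avoids the exception by holding surplus items in a buffer until downstream requests them.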

fougere-mike commented 8 years ago

Thanks for the reply. I've tried adding onBackpressureBuffer() to all the .just calls, but it has no effect.

To answer your question: I'm using custom operators to achieve behaviour that I don't believe is available as a single operator. For example, then is equivalent to ignoreElementsThen(), which is not yet available (it is currently awaiting merge in #3443). onNextDo is for async side effects; it's doOnNext's async cousin. I realise I can achieve the same effect by chaining several existing Rx operators, but that leads to a lot of code clutter where I have to use the same 3 or 4 operators over and over again. It's much cleaner imo to use a custom operator.
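For readers more at home with java.util.concurrent, the contract described for onNextDo (run an asynchronous side effect for a value, then forward the original value, or forward the side effect's error instead) maps onto a single CompletableFuture when there is only one value. This stdlib-only sketch is an analogy, not an RxJava operator; OnNextDoSketch and its onNextDo method are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class OnNextDoSketch {

    /**
     * Runs effect for value and completes with value once the effect completes.
     * If the effect fails (or throws while being created), the returned future fails instead.
     */
    public static <T> CompletableFuture<T> onNextDo(
            T value, Function<? super T, ? extends CompletableFuture<?>> effect) {
        CompletableFuture<?> sideEffect;
        try {
            sideEffect = effect.apply(value);
        } catch (RuntimeException e) {
            CompletableFuture<T> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
        // Ignore whatever the side effect produced and forward the original value.
        // An exceptional completion propagates, so the value is not forwarded in that case.
        return sideEffect.thenApply(ignored -> value);
    }
}
```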

With that said, I'm clearly an rx newb so if there are better solutions I'm all ears!

akarnokd commented 8 years ago

I don't see anything wrong in your custom implementations, so my guess is that the problem may be in the code you haven't shown.

To rule out concat, use merge with maxConcurrency = 1, which subscribes to one source at a time.

fougere-mike commented 8 years ago

You are correct. The issue was in the unshown code (of course, haha). I had been using a combination of operators to implement a concatIfEmpty -- I just removed those lines and it stopped hanging.

A quick google search revealed the switchIfEmpty operator -- which solves the problem and doesn't hang.
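The property that matters here is that switchIfEmpty only subscribes to its fallback when the source completes without emitting anything. As a synchronous, stdlib-only analogy (Iterable stands in for Observable; SwitchIfEmptySketch is a made-up name, not RxJava code), the fallback supplier below is only called when the source turned out to be empty.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

public class SwitchIfEmptySketch {

    /** Forwards every item of source; only if there were none, obtains and forwards the fallback's items. */
    public static <T> void switchIfEmpty(Iterable<T> source,
                                         Supplier<? extends Iterable<? extends T>> fallback,
                                         Consumer<? super T> downstream) {
        boolean empty = true;
        for (T item : source) {
            empty = false;
            downstream.accept(item);
        }
        if (empty) {
            // The fallback is created lazily, so a non-empty source never touches it.
            for (T item : fallback.get()) {
                downstream.accept(item);
            }
        }
    }
}
```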

All appears well. I just need to spend a bit more time learning all the built-ins. Thanks for the help!