ReactiveX / RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Apache License 2.0

2.x: different behavior of throttleLast() (compared to 1.x) #5516

Closed wychi closed 7 years ago

wychi commented 7 years ago

Hello there,

I am not sure whether this is a bug, but I get different results from throttleLast() in 1.x and 2.x.

Here are my test results. As you can see, the difference is in how onNext(5) is handled: 5 is emitted in 1.x but lost in 2.x.

rxjava 1.x throttleLast()
emit onNext 0
emit onNext 0 end
progress 0
emit onNext 1
emit onNext 1 end
progress 1
emit onNext 2
emit onNext 2 end
progress 2
emit onNext 3
emit onNext 3 end
progress 3
emit onNext 4
emit onNext 4 end
progress 4
emit onNext 5
emit onNext 5 end
emit onComplete 
emit onComplete - end 
wait before terminate
progress 5
doOnComplete
emit onNext 6
emit onNext 6 end
emit onNext 7
emit onNext 7 end
emit onNext 8
emit onNext 8 end
emit onNext 9
emit onNext 9 end
emit onNext 10
emit onNext 10 end
emit onNext 11
emit onNext 11 end
emit onNext 12
emit onNext 12 end
emit onNext 13
emit onNext 13 end
emit onNext 14
emit onNext 14 end

rxjava 2.x throttleLast()
emit onNext 0
emit onNext 0 end
progress 0
emit onNext 1
emit onNext 1 end
progress 1
emit onNext 2
emit onNext 2 end
progress 2
emit onNext 3
emit onNext 3 end
progress 3
emit onNext 4
emit onNext 4 end
progress 4
emit onNext 5
emit onNext 5 end
emit onComplete 
emit onComplete - end 
doOnComplete
wait before terminate
emit onNext 6
emit onNext 6 end
emit onNext 7
emit onNext 7 end
emit onNext 8
emit onNext 8 end
emit onNext 9
emit onNext 9 end
emit onNext 10
emit onNext 10 end
emit onNext 11
emit onNext 11 end
emit onNext 12
emit onNext 12 end
emit onNext 13
emit onNext 13 end
emit onNext 14
emit onNext 14 end

Here is my test code, starting with the Gradle dependencies:

    compile "io.reactivex.rxjava2:rxjava:2.1.1"
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    compile 'io.reactivex:rxjava:1.3.0'

package allstar.wychi.allstardemo;

import org.junit.Test;

import java.util.concurrent.TimeUnit;

import io.reactivex.functions.Action;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;

import static org.junit.Assert.assertEquals;

public class ExampleUnitTest {

    @Test
    public void test1x() {
        final rx.subjects.PublishSubject<Integer> subject = rx.subjects.PublishSubject.create();

        System.out.println("rxjava 1.x throttleLast()");
        subject
                .throttleLast(60, TimeUnit.MILLISECONDS)
                .observeOn(rx.schedulers.Schedulers.io())
                .doOnNext(new rx.functions.Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.println("progress " + integer);
                        try {
                            Thread.sleep(16);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                })
                .doOnError(new rx.functions.Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        System.out.println("doOnError " + throwable.getMessage());
                    }
                })
                .doOnCompleted(new rx.functions.Action0() {
                    @Override
                    public void call() {
                        System.out.println("doOnComplete");
                    }
                })
                .subscribe();

        new Thread(new Runnable() {
            @Override
            public void run() {
                for(int i=0; i<100; i++) {

                    System.out.println("emit onNext " + i);
                    subject.onNext(i);
                    System.out.println("emit onNext " + i + " end");
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            }
        }).start();

        try {
            Thread.sleep(520);
            System.out.println("emit onComplete ");
            subject.onCompleted();
            System.out.println("emit onComplete - end ");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            System.out.println("wait before terminate");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void test2x() {
        System.out.println("rxjava 2.x throttleLast()");

        final PublishSubject<Integer> subject = PublishSubject.create();

        subject
                .throttleLast(60, TimeUnit.MILLISECONDS)
                .observeOn(Schedulers.io())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println("progress " + integer);
                        try {
                            Thread.sleep(16);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                })
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        System.out.println("doOnError " + throwable.getMessage());
                    }
                })
                .doOnComplete(new Action() {
                    @Override
                    public void run() throws Exception {
                        System.out.println("doOnComplete");
                    }
                })
                .subscribe();

        new Thread(new Runnable() {
            @Override
            public void run() {
                for(int i=0; i<100; i++) {

                    System.out.println("emit onNext " + i);
                    subject.onNext(i);
                    System.out.println("emit onNext " + i + " end");
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

            }
        }).start();

        try {
            Thread.sleep(520);
            System.out.println("emit onComplete ");
            subject.onComplete();
            System.out.println("emit onComplete - end ");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        try {
            System.out.println("wait before terminate");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

I just want to know which behavior is expected. Thanks.

akarnokd commented 7 years ago

throttleLast is practically the same as sample, which in 2.x by default doesn't emit the last throttled item when the upstream completes. There is, however, a sample overload that lets you configure that.
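For readers who want the 1.x-style result in 2.x, here is a minimal sketch of the overload mentioned above, assuming RxJava 2.1 or later, where `Observable.sample(long, TimeUnit, Scheduler, boolean emitLast)` is available. The class name `SampleEmitLastSketch` and the helper `run` are made up for illustration, and a `TestScheduler` replaces real sleeps so the timing is deterministic.

```java
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class SampleEmitLastSketch {

    // Hypothetical helper: feeds two items, completing before the second
    // sampling tick, and returns whatever reached the downstream observer.
    static List<Integer> run(boolean emitLast) {
        TestScheduler scheduler = new TestScheduler(); // virtual time
        PublishSubject<Integer> subject = PublishSubject.create();

        TestObserver<Integer> observer = subject
                .sample(60, TimeUnit.MILLISECONDS, scheduler, emitLast)
                .test();

        subject.onNext(1);
        scheduler.advanceTimeBy(60, TimeUnit.MILLISECONDS); // first tick emits 1

        subject.onNext(2);    // still pending, no tick has fired for it yet
        subject.onComplete(); // upstream completes before the next tick

        return observer.values();
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [1]    (2.x default: pending item dropped)
        System.out.println(run(true));  // [1, 2] (pending item flushed on completion)
    }
}
```

In the test code from the original report, this would mean replacing `throttleLast(60, TimeUnit.MILLISECONDS)` with `sample(60, TimeUnit.MILLISECONDS, true)`.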

wychi commented 7 years ago

@akarnokd thanks for your answer.

According to that statement, the behavior in 2.x is expected. Is my understanding correct?

But I am still wondering why I get a different result in my test code. Is something wrong in the test code, or is the implementation actually different in 1.x?

Thanks

akarnokd commented 7 years ago

Yes, this is the expected behavior.

"But I am still wondering why I get a different result in my test code."

I've already told you. 2.x doesn't emit the last throttled item when the upstream completes (the underlying sample operator was fixed in #4955), and 1.x does emit the very last item (#3757).
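To see why item 5 in particular goes missing, the timeline of the test code can be replayed in a simplified plain-Java model: items arrive every 100 ms, the sampler ticks every 60 ms, and the upstream completes at 520 ms. This is only a sketch of the semantics described above, not RxJava's implementation; `ThrottleLastModel` and `simulate` are hypothetical names, and real thread sleeps are not this exact.

```java
import java.util.ArrayList;
import java.util.List;

public class ThrottleLastModel {

    // Replays the report's timeline on a 1 ms virtual clock.
    // emitLastOnComplete = true models 1.x, false models the 2.x default.
    static List<Integer> simulate(boolean emitLastOnComplete) {
        List<Integer> emitted = new ArrayList<>();
        Integer pending = null; // latest item not yet sampled

        for (int t = 0; t <= 520; t++) {
            if (t % 100 == 0) {
                pending = t / 100; // upstream onNext(0..5) at 0, 100, ..., 500 ms
            }
            if (t > 0 && t % 60 == 0 && pending != null) {
                emitted.add(pending); // sampling tick emits the latest item
                pending = null;
            }
        }

        // Upstream completes at 520 ms; item 5 arrived at 500 ms, after the
        // last tick (480 ms) and before the next one (540 ms).
        if (emitLastOnComplete && pending != null) {
            emitted.add(pending);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println("1.x-like:    " + simulate(true));  // [0, 1, 2, 3, 4, 5]
        System.out.println("2.x default: " + simulate(false)); // [0, 1, 2, 3, 4]
    }
}
```

Both runs agree up to item 4. They differ only in whether the item still pending at completion is flushed, which matches the two logs in the report.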

wychi commented 7 years ago

Got it. Thanks a lot. Now I know the history behind this.