ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0

'observe_on_new_thread' cannot behave as expected but RxJava can #524

Closed 0xffff00 closed 4 years ago

0xffff00 commented 4 years ago

Here is a simple test that simulates heavy blocking tasks. Four threads are used: one main thread holding the observable, and three observers treated as parallel tasks A, B, and C. The single observable emits 10 times, once every 200 ms. Each task sleeps 180 ms to simulate a heavy workload. So, as expected, all jobs of the three observers should finish within ~2000 ms. I have written similar test cases in both RxCpp and RxJava: observe_on(rxcpp::observe_on_new_thread()) vs observeOn(Schedulers.newThread()).

However, RxCpp took 3426 ms while RxJava took 2036 ms. Can anyone explain why? Did I misuse RxCpp's API?

RxCpp TestCase

Code

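#include <chrono>      // std::chrono clocks, durations and the ms / h literals
#include <cstdio>      // printf
#include <functional>  // std::hash
#include <thread>      // std::this_thread

#include <catch2/catch.hpp>
#include <rxcpp/rx.hpp>
#include <tests/test_util.hpp>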

using namespace std::chrono_literals;

int tid() { return std::hash<std::thread::id>{}(std::this_thread::get_id()) % 1000; }

std::chrono::system_clock::time_point t0;

template<char K>
void heavy_task(int x) {
    auto latency = std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::system_clock::now() - t0).count();
    std::this_thread::sleep_for(180ms);
    printf("--%03d-- [task %c] #%d: %ld ms elapsed\n", tid(), K, x, latency);
}

TEST_CASE("parallel tasks(200 ms x 10 loop) via RxCpp observe_on_new_thread", "[.learn]") {
    printf("--%03d-- [main]\n", tid());
    t0      = std::chrono::system_clock::now();
    auto obe= rxcpp::observable<>::interval(200ms)
            .take(10);
    obe.observe_on(rxcpp::observe_on_new_thread()).subscribe(heavy_task<'A'>);
    obe.observe_on(rxcpp::observe_on_new_thread()).subscribe(heavy_task<'B'>);
    obe.observe_on(rxcpp::observe_on_new_thread()).subscribe(heavy_task<'C'>);
    std::this_thread::sleep_for(1h); //  block the main thread
}

Output


--231-- [main]
--699-- [task A] #1: 0 ms elapsed
--699-- [task A] #2: 200 ms elapsed
--699-- [task A] #3: 400 ms elapsed
--699-- [task A] #4: 600 ms elapsed
--699-- [task A] #5: 800 ms elapsed
--699-- [task A] #6: 1000 ms elapsed
--699-- [task A] #7: 1200 ms elapsed
--699-- [task A] #8: 1400 ms elapsed
--699-- [task A] #9: 1600 ms elapsed
--699-- [task A] #10: 1800 ms elapsed
--908-- [task B] #1: 1800 ms elapsed
--530-- [task C] #1: 1801 ms elapsed
--530-- [task C] #2: 1981 ms elapsed
--908-- [task B] #2: 1981 ms elapsed
--530-- [task C] #3: 2162 ms elapsed
--908-- [task B] #3: 2162 ms elapsed
--908-- [task B] #4: 2342 ms elapsed
--530-- [task C] #4: 2342 ms elapsed
--908-- [task B] #5: 2523 ms elapsed
--530-- [task C] #5: 2523 ms elapsed
--530-- [task C] #6: 2704 ms elapsed
--908-- [task B] #6: 2703 ms elapsed
--908-- [task B] #7: 2884 ms elapsed
--530-- [task C] #7: 2884 ms elapsed
--908-- [task B] #8: 3065 ms elapsed
--530-- [task C] #8: 3065 ms elapsed
--908-- [task B] #9: 3245 ms elapsed
--530-- [task C] #9: 3245 ms elapsed
--908-- [task B] #10: 3426 ms elapsed
--530-- [task C] #10: 3426 ms elapsed

RxJava TestCase

Code

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

class HeavyTask implements Observer<Long> {
    private long t0;
    private String taskName;
    public HeavyTask(long t0, String taskName) {
        this.t0 = t0;
        this.taskName = taskName;
    }

    @Override
    public void onNext(@NonNull Long x) {
        long threadId = Thread.currentThread().getId();
        long dt = System.currentTimeMillis() - t0;
        try {
            Thread.sleep(180);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("--%d-- [task %s] #%d: %d ms elapsed\n",threadId,taskName, x, dt);
    }

    @Override    public void onSubscribe(@NonNull Disposable d) {    }

    @Override    public void onError(@NonNull Throwable e) {    }

    @Override    public void onComplete() {    }
}

public class LearnRx {

    @Test
    public void run1() throws Exception {
        var obe = Observable.interval(200, TimeUnit.MILLISECONDS).take(10);
        long t0 = System.currentTimeMillis();
        obe.observeOn(Schedulers.newThread()).subscribe(new HeavyTask(t0,"A"));
        obe.observeOn(Schedulers.newThread()).subscribe(new HeavyTask(t0,"B"));
        obe.observeOn(Schedulers.newThread()).subscribe(new HeavyTask(t0,"C"));

        Thread.sleep(3600000); // block the main thread
    }
}

Output

--25-- [task C] #0: 236 ms elapsed
--26-- [task B] #0: 236 ms elapsed
--27-- [task A] #0: 236 ms elapsed
--27-- [task A] #1: 435 ms elapsed
--25-- [task C] #1: 435 ms elapsed
--26-- [task B] #1: 435 ms elapsed
--25-- [task C] #2: 635 ms elapsed
--27-- [task A] #2: 635 ms elapsed
--26-- [task B] #2: 635 ms elapsed
--27-- [task A] #3: 834 ms elapsed
--25-- [task C] #3: 835 ms elapsed
--26-- [task B] #3: 835 ms elapsed
--26-- [task B] #4: 1035 ms elapsed
--25-- [task C] #4: 1035 ms elapsed
--27-- [task A] #4: 1035 ms elapsed
--25-- [task C] #5: 1235 ms elapsed
--26-- [task B] #5: 1235 ms elapsed
--27-- [task A] #5: 1236 ms elapsed
--27-- [task A] #6: 1434 ms elapsed
--26-- [task B] #6: 1436 ms elapsed
--25-- [task C] #6: 1436 ms elapsed
--27-- [task A] #7: 1634 ms elapsed
--25-- [task C] #7: 1635 ms elapsed
--26-- [task B] #7: 1635 ms elapsed
--26-- [task B] #8: 1835 ms elapsed
--27-- [task A] #8: 1835 ms elapsed
--25-- [task C] #8: 1835 ms elapsed
--26-- [task B] #9: 2036 ms elapsed
--25-- [task C] #9: 2036 ms elapsed
--27-- [task A] #9: 2036 ms elapsed

nikobarli commented 4 years ago

Hi,

By default, RxCpp uses the current thread to generate the interval. That's why the first 10 outputs are all from task A: the current thread is held inside the interval generator.

You can change the interval observable as follows so that the interval is generated on another thread instead of the current thread.

        auto obe= rxcpp::observable<>::interval(200ms, rxcpp::observe_on_new_thread())

kirkshoop commented 4 years ago

One of the differences between RxCpp and RxJava is the default scheduler. RxJava uses a thread pool as the default; RxCpp uses a trampoline. This means that the RxCpp version uses the main thread to emit all the intervals, while RxJava uses multiple threads from the pool to produce them. To make RxCpp use different threads, pass the same observe_on_new_thread() as a parameter to interval; this overrides the default.
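To make the trampoline-versus-thread distinction concrete, here is a minimal standard-library sketch that does not use RxCpp at all. The helper names emit_on_caller and emit_on_new_thread are hypothetical; they only model the two behaviours described above (a source that runs on the subscribing thread and blocks it, versus a source that runs on its own thread) and say nothing about how RxCpp implements its schedulers.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper: emits 0..count-1 on the *calling* thread, one value
// per period, and only returns after the last value. This models a source
// driven by a trampoline-style scheduler: the caller is blocked meanwhile.
void emit_on_caller(int count, std::chrono::milliseconds period,
                    const std::function<void(int)>& on_next) {
    assert(count >= 0);
    for (int i = 0; i < count; ++i) {
        on_next(i);
        std::this_thread::sleep_for(period);
    }
}

// Hypothetical helper: runs the same loop on a dedicated thread and returns
// immediately, so the caller is free to start further sources right away.
// The caller must join() the returned thread.
std::thread emit_on_new_thread(int count, std::chrono::milliseconds period,
                               std::function<void(int)> on_next) {
    return std::thread([=] { emit_on_caller(count, period, on_next); });
}
```

Calling emit_on_caller three times in a row runs the three sequences one after another on the caller's thread, which is the shape of the problem in the question. Starting three emit_on_new_thread sources and joining them afterwards lets the sequences overlap.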

There are other effects. The default in RxCpp also means that each subscribe finishes emitting all the intervals before the next subscribe begins (if you did the heavy tasks in a transform and then merged all the transforms into one subscribe, you would get different results). I seem to remember a surprising behavior in interval that emits relative to now() at the time of construction.

Combining these effects results in all of A being emitted first and then B & C being interleaved.
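As a rough check of this explanation against the log in the question, the following sketch predicts the "ms elapsed" value printed for each item. It is a back-of-the-envelope model built only from the two effects described in this thread, not from the RxCpp sources, and the function name predicted_latencies is hypothetical. It assumes that the first subscriber is paced by the interval (its work time is shorter than the period), and that later subscribers start when the first one finishes, find every tick already due, and are then paced only by the 180 ms of work per item.

```cpp
#include <cassert>
#include <vector>

// Predicted "ms elapsed" value for each item of one subscriber.
// Assumptions (from the thread, not verified against RxCpp internals):
//  - first subscriber: one item per period, blocking the subscribing
//    thread until its last item has been emitted (work_ms <= period_ms);
//  - later subscribers: start at that moment with all ticks already due,
//    so each item waits only for the previous item's work to finish.
std::vector<long> predicted_latencies(bool first_subscriber, int items,
                                      long period_ms, long work_ms) {
    assert(items > 0 && period_ms >= 0 && work_ms >= 0);
    // Time at which the first subscriber's last item is emitted.
    const long backlog_start = (items - 1) * period_ms;
    std::vector<long> out;
    for (int k = 0; k < items; ++k) {
        out.push_back(first_subscriber ? k * period_ms
                                       : backlog_start + k * work_ms);
    }
    return out;
}
```

With the figures from the question (10 items, 200 ms period, 180 ms of work), the model gives 1800 ms for the last item of A and 1800 + 9 × 180 = 3420 ms for the last item of B and C, against 1800 ms and 3426 ms in the posted output.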

nikobarli commented 4 years ago

> I seem to remember a surprising behavior in interval that emits relative to now() at the time of construction.
>
> Combining these effects results in all of A being emitted first and then B & C being interleaved.

Yes, I think it's the one reported here: https://github.com/ReactiveX/RxCpp/issues/413

0xffff00 commented 4 years ago

Thanks! @nikobarli @kirkshoop