ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0

working with threads #424

Closed Zifkan closed 6 years ago

Zifkan commented 6 years ago

Hi there. Maybe I'm being an idiot, and I'm a newbie in C++. I'm still trying to use RxCpp with Unreal Engine and I'm trying to understand a few things. Obviously this library doesn't work like UniRx (the Rx extension for Unity). For example, I tried to use interval to emit a value every second. When I wrote

observable<>::interval(chrono::seconds(1)).subscribe( [&](int i) { GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Red, FString::Printf(TEXT("Interval value: %d"), i)); });

I get an endless freeze. As I understand it, in UniRx the 'interval' operator is implemented with coroutines, which emit a value and then return control to the main thread. In my example (UE4 + RxCpp) it works differently: it looks like interval just takes over the main thread and never gives it back.

I had the idea of creating a new thread to run 'interval' on, and that works.

observable<>::interval(chrono::seconds(1), synchronize_new_thread()) .subscribe( [&](int i) { GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Red, FString::Printf(TEXT("Interval value: %d"), i)); });

But I have a problem. When I start the editor and press the "Play" button, a new thread is created for interval. When play mode ends, my classes are destroyed but the interval thread stays alive and keeps running, so I need to destroy it somehow. Then I thought I could run 'interval' on its own thread and return the results to the main thread with subscribe_on(workthread) and observe_on(mainthread).

Maybe I'm mistaken and this is the wrong way to do it, and I still can't control these threads. I tried the example described at https://github.com/Reactive-Extensions/RxCpp/pull/154 and got the same result as with my first example. It froze for a few seconds: because it has take_until the code does finish, but it doesn't run in parallel.

So I'm trying to understand all of this and having no luck. How should I implement time-based operators? Maybe someone can tell me what I should read to understand this.

kirkshoop commented 6 years ago

Hi!

observable<>::interval(chrono::seconds(1)).subscribe( [&](int i) { GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Red, FString::Printf(TEXT("Interval value: %d"), i)); });

The above expression uses the default scheduler which protects the stack from overflow by taking ownership of the current thread and processing work through a thread-local queue until the queue is empty.

observable<>::interval(chrono::seconds(1), synchronize_new_thread()) .subscribe( [&](int i) { GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Red, FString::Printf(TEXT("Interval value: %d"), i)); });

The above expression creates a new thread and keeps it alive until subscription::unsubscribe() is called.

subscribe in both expressions returns the subscription that can be used to stop them. It is better to use the take operator to stop an expression when possible.

In your case, the run_loop peek and dispatch functions could be called from the main thread loop. Any expression that uses that run_loop would then be non-blocking but still run on the main loop, just like UniRx.