rosin-project / rxros

Reactive programming for ROS
BSD 3-Clause "New" or "Revised" License
48 stars 6 forks source link

Fix #31: RxROS operators should use subscribe_on rather than observe_on #32

Open henrik7264 opened 5 years ago

henrik7264 commented 5 years ago

The observe_on call has been changed with the subscribe_on to ensure that the operations are executed in a dedicated thread.

gavanderhoorn commented 5 years ago

@henrik7264: you should rebase your PR once #33 gets merged.

gavanderhoorn commented 5 years ago

@henrik7264: could you please rebase this to we can run CI on it?

nevermind, just noticed that observe_on_blocks_operators is actually a branch in this repository, so I've rebased it.

gavanderhoorn commented 5 years ago

Unassigned myself as I believe @wasowski would be in a better position to review this one.

gavanderhoorn commented 4 years ago

@wasowski: would you have time to take a quick look at this before ROSCon?

wasowski commented 4 years ago

I missed the original deadline, and I need a bit more time for this (need to read up...). I will really try this week ...

gavanderhoorn commented 4 years ago

Thanks for the review @wasowski.

gavanderhoorn commented 4 years ago

I'd like to release an updated version of rxros soon. @henrik7264: would you have time to take a look at @wasowski's comments?

henrik7264 commented 4 years ago

Yes, I shall answer it later tonight. It will however be a long story.

wasowski commented 4 years ago

We like long stories. I also looked further in the terrible book I am reading, and I found contradictory information on this issue. So let's look at your side of things. I think understanding this is key for controlling concurrency in rxros.

henrik7264 commented 4 years ago

OK, so here is my view of this subject. First of all, here are a couple of books I have used in the past:

None of the books are really great, but lack of good literature on reactive programming and especially in the area of threading, schedulers, the special operators subscribe_on and observe_on they have been my main source of inspiration. I tend to like the RxPY book slightly better than RxCPP book.

I have also not been able to find any good explanations on the internet - they repeat over and over again the same information about threading, schedulers, and how to use subscribe_on and observe_on. But the explanation is very thin - I afraid it is because the subject is rather complex.

gavanderhoorn commented 4 years ago

So, what's the way forward?

henrik7264 commented 4 years ago

So, a typically ROS program has the following form:

int main(int argc, char** argv) {
    ros::init(argc, argv, "example");
    //...
    ros::spin();
}

The ros::spin() function blocks the main thread until it is shutdown or terminated. The other purpose of the rxros::spin function is to dispatch messages from ROS topics to the appropriate RxROS observables. It is therefore important that this function is called right after initialization of variables, function definitions etc.

The following (uninteresting) program is therefore not desirable as the ros::spint() never will be reached. The interval observable will emit an infinite number of values in the main thread and the map and subscribe functions will likewise be executed in the main thread.

int main(int argc, char** argv) {
    ros::init(argc, argv, "example");

    rxcpp::observable<>::interval (std::chrono::milliseconds (1000))
    | map ([] (int i) {return i*2;})
    | map ([] (int j) {return j*3;})
    | subscribe(
        [](int i){std::cout << "OnNext: " << v << std::endl;},
        [](){std::cout << "OnCompleted" << std::endl;});

    ros::spin();
}

By default, a message stream and the chain of operators that is apply to it works on the same thread. It is actually bad to use the word thread. The right word is scheduler. However, a scheduler is typically executed in a dedicated thread.

The ObserveOn operator changes the scheduler of the operators and observers below it. This means that the ObserveOn can be called multiple times at various points during the chain of operators in order to change on which thread the operators should operate.

The SubscribeOn operator affects the operators and observers above and below it. In other words, the SubscribeOn operator specifies which scheduler the observable message stream will begin operating on, no matter at what point in the chain the SubscribeOn is called.

This means that the following (uninteresting) program

int main(int argc, char** argv) {
    ros::init(argc, argv, "example");

    rxcpp::observable<>::interval (std::chrono::milliseconds (1000))
    | observe_on(scheduler1)
    | map ([] (int i) {return i*2;})
    | observe_on(scheduler2)
    | map ([] (int j) {return j*3;})
    | subscribe(
        [](int i){std::cout << "OnNext: " << v << std::endl;},
        [](){std::cout << "OnCompleted" << std::endl;});

    ros::spin();
}

behaves as follows: The interval observable will emit values every second in the main thread. The first map function will be executed in the context of scheduller1. The second map function and the subscribe function will be executed in the context of scheduller2. The program is still not desirable as the ros::spin() function is not reached.

Now we make a final change to the program:

int main(int argc, char** argv) {
    ros::init(argc, argv, "example");

    rxcpp::observable<>::interval (std::chrono::milliseconds (1000))
    | observe_on(scheduler1)
    | map ([] (int i) {return i*2;})
    | observe_on(scheduler2)
    | map ([] (int j) {return j*3;})
    | subscribe_on(scheduler3).subscribe(
        [](int i){std::cout << "OnNext: " << v << std::endl;},
        [](){std::cout << "OnCompleted" << std::endl;});

    ros::spin();
}

This program behaves as follows: Due to the subscribe_on call all observables and operations will execute in the context of scheduler3. However, the two observe_on calls will overrule this behavior so that the first map is executed in the context of scheduler1 and the second map is executed in the context of the scheduler2. If scheduler3 is executed in a thread different from the main thread then the ros::spin() function will execute in parallel with the interval observable.

To be continued.

henrik7264 commented 4 years ago

The interval observable can take a scheduler as argument as follows:

int main(int argc, char** argv) {
    ros::init(argc, argv, "example");

    rxcpp::observable<>::interval (std::chrono::milliseconds (1000), scheduler)
    | map ([] (int i) {return i*2;})
    | map ([] (int j) {return j*3;})
    | subscribe(
        [](int i){std::cout << "OnNext: " << v << std::endl;},
        [](){std::cout << "OnCompleted" << std::endl;});

    ros::spin();
}

In this case the interval observable will emit vales in the context of the scheduler. The map operators and subscribe function will likewise execute in the context of the scheduler and the ros::spin() function is called if the scheduler is executing in the a thread that is different from the main thread.

I have actually not tried this, but I shall try it out this weekend. The interface provided by the interval observable is a common way to construct observables. The interface for the interval observable looks something like this:

template<class Duration>
auto interval(Duration period)
...
template<class Duration, class Scheduler>
auto interval(Duration period, Scheduler scheduler)
...

This is actually the interface I would like to end up with in RxROS. We should have one interface that will execute in the context of the current scheduler (defined by observe_on or schedule_on) and an interface that specifies the scheduler to be used.

template<class T>
static auto from_topic(const std::string& topic)
...
template<class T, class Scheduler>
static auto from_topic(const std::string& topic, Scheduler scheduler)
...

But, it is unfortunately not that easy. To be continued.

henrik7264 commented 4 years ago

So, suppose we had an interface (our goal) like

template<class T>
static auto from_topic(const std::string& topic)
...
template<class T, class Scheduler>
static auto from_topic(const std::string& topic, Scheduler scheduler)
...

and an implementaion of from_topic (simplified):

template<class T>
static auto from_topic(const std::string& topic)
{
    auto observable = rxcpp::observable<>::create<T>(
        [=](rxcpp::subscriber<T> subscriber) {
            auto callback = [=](const T &val) {subscriber.on_next(val);};
            ros::Subscriber ros_subscriber(node::get_handle().subscribe<T>(topic, 10, callback));
            ros::waitForShutdown();});
    return observable;
}
...
template<class T, class Scheduler>
static auto from_topic(const std::string& topic, Scheduler& scheduler)
{
    auto observable = rxcpp::observable<>::create<T>(
        [=](rxcpp::subscriber<T> subscriber) {
            auto callback = [=](const T &val) {subscriber.on_next(val);};
            ros::Subscriber ros_subscriber(node::get_handle().subscribe<T>(topic, 10, callback));
            ros::waitForShutdown();});
    return observable.subscribe_on(scheduler);
}

The from_topic then works as follows. The following code is

    auto observable = rxcpp::observable<>::create<T>(
        [=](rxcpp::subscriber<T> subscriber) {
           // Here goes the code that should be executed when we subscribe to the topic
        });
    return observable;

standard way of creating an observable. So when we execute something like

    auto obs = rxros::observable::from_topoic<int> ("/some_topic")

we actually get a (standard) observable as a result. This means we can subscribe to it as follows:

    auto obs = rxros::observable::from_topoic<int> ("/some_topic")
    obs.subscribe (
        [](int i){std::cout << "OnNext: " << v << std::endl;},
        [](){std::cout << "OnCompleted" << std::endl;});

When we subscribe to it the "// Here goes the code that should be executed when we subscribe to the topic" is executed or more precisly the following code:

auto callback = [=](const T &val) {subscriber.on_next(val);};
ros::Subscriber ros_subscriber(node::get_handle().subscribe<T>(topic, 10, callback));
ros::waitForShutdown();});

What happens here is that we create a callback function and setup a ROS subscriber that calls the callback function each time the topic is updated. The callback function is rather simple it just forwards the value to the subscriber's on_next call, i.e. we are emitting a value.

So lets now look at the following code:

int main(int argc, char** argv) {
    ros::init(argc, argv, "example");

    rxros::observable::from_topoic<int> ("/some_topic")
    | map ([] (int i) {return i*2;})
    | map ([] (int j) {return j*3;})
    | subscribe(
        [](int i){std::cout << "OnNext: " << v << std::endl;},
        [](){std::cout << "OnCompleted" << std::endl;});

    ros::spin();
}

This almost looks perfect, but it will not work because the following code

    rxros::observable::from_topoic<int> ("/some_topic")
    | map ([] (int i) {return i*2;})
    | map ([] (int j) {return j*3;})
    | subscribe(
        [](int i){std::cout << "OnNext: " << v << std::endl;},
        [](){std::cout << "OnCompleted" << std::endl;});

is blocking the main thread due to the ros::waitForShutdown() call. The consequence is that the ros::spin() is never called.

OK, then we just remove the ros::waitForShutdown() call! This will also not work as the following code

auto callback = [=](const T &val) {subscriber.on_next(val);};
ros::Subscriber ros_subscriber(node::get_handle().subscribe<T>(topic, 10, callback));
//ros::waitForShutdown();});

would simply terminate immediately as soon as we subscribe to it.

OK, then we just call observe_on and execute the code in another thread

    rxros::observable::from_topoic<int> ("/some_topic")
    | map ([] (int i) {return i*2;})
    | map ([] (int j) {return j*3;})
    | observe_on(scheduler).subscribe(
        [](int i){std::cout << "OnNext: " << v << std::endl;},
        [](){std::cout << "OnCompleted" << std::endl;});

This will also not work. The only part that will execute in the schedulers context is the handler part, i.e the following code

[](int i){std::cout << "OnNext: " << v << std::endl;},
[](){std::cout << "OnCompleted" << std::endl;});

The only way to solve the problem is use subscribe_on:

int main(int argc, char** argv) {
    ros::init(argc, argv, "example");

    rxros::observable::from_topoic<int> ("/some_topic")
    | map ([] (int i) {return i*2;})
    | map ([] (int j) {return j*3;})
    | subscribe_on(scheduler).subscribe(
        [](int i){std::cout << "OnNext: " << v << std::endl;},
        [](){std::cout << "OnCompleted" << std::endl;});

    ros::spin();
}

or to use the other interface of from_topic:

int main(int argc, char** argv) {
    ros::init(argc, argv, "example");

    rxros::observable::from_topoic<int> ("/some_topic", scheduler)
    | map ([] (int i) {return i*2;})
    | map ([] (int j) {return j*3;})
    | subscribe(
        [](int i){std::cout << "OnNext: " << v << std::endl;},
        [](){std::cout << "OnCompleted" << std::endl;});

    ros::spin();
}

In both these cases both the emitter and the handlers of the subscribe operator will be executed in the context of the scheduler.

To be continued...

henrik7264 commented 4 years ago

The current implementation of from_topic has the following interface (simplified)

template<class T>
static auto from_topic(const std::string& topic)

It has similarities with the standard/ideal interface discussed above

template<class T>
static auto from_topic(const std::string& topic)

template<class T, class Scheduler>
static auto from_topic(const std::string& topic, Scheduler scheduler)

but the implementation is different. The current version of from_topic is exactly equal to executing the ideal version of from_topic with rxcpp::synchronize_new_thread() as scheduler.

The advantages of this is that we make the choice for the programmer. It is simple and easy to use. The from_topic will always execute in the context of the rxcpp::synchronize_new_thread() scheduler. This scheduler always execute in a thread that is different from main thread and we are sure to reach the ros::spin function at the end of the main function.

The disadvantages is that we do not give the programmer a chance to choose the scheduler he may like to impose on the program and we deviate from providing a standard interface to an observable.

We have three choices here:

  1. Go with the current implementation knowing that it has limited functionality.
  2. Go with the standard/ideal interface knowing that it may be confusing to use with the ros::spin() function.
  3. Go with standard interface, but a hybrid implementation where the from_topic(const std::string& topic) behaves as the current implementation.

To be continued (we are almost finished - can you follow me) ...

henrik7264 commented 4 years ago

The last part is about operators. The publish_to_topic operator has the following interface (simplified):

template<typename T>
auto publish_to_topic(const std::string &topic);

and the following implementation (simplified):

template<typename T>
auto publish_to_topic(const std::string &topic) {
    return [=](auto&& source) {
        ros::Publisher publisher(rxros::node::get_handle().advertise<T>(topic, 10));
        source.observe_on(rxcpp::synchronize_new_thread()).subscribe(
            [=](const T& msg) {publisher.publish(msg);});
        return source;};}

A standard opeator is constructed as follows:

auto myOperator(/* some arguments */) {
    return [=](auto&& source) {
        // Here goes the code that modifies the emitted values of the observable source.
        return source;};}

An operator is thus a higher order function or closure. The returned lambda expression/function takes as argument a source observable and return as result a new observable. Just for the record: The myOperator is a kind of identity function. It takes as input a source observable and returns it without any changes. Observe that this is how the publish_to_topic works.

Lets now look at what the publish_to_topic does with the source observable:

ros::Publisher publisher(rxros::node::get_handle().advertise<T>(topic, 10));
source.observe_on(rxcpp::synchronize_new_thread()).subscribe(
    [=](const T& msg) {publisher.publish(msg);});

It first creates a ros::Publisher object and then we subscribe to the source/input observable. This means that each time a event is emmited by the source observable it is published on the created ros::Publisher object. Observe that the source is observed on the scheduler rxcpp::synchronize_new_thread()). This means that handler part, i.e. publisher.publish(msg) is executed in a thread different from the main thread.

Lets now look at a the following piece of code:

int main(int argc, char** argv) {
    ros::init(argc, argv, "example");

    rxros::observable::from_topoic<int> ("/some_topic")
    | map ([] (int i) {return i*2;})
    | map ([] (int j) {return j*3;})
    | publish_to_topic<int>("/another_topic")

    ros::spin();
}

Will this code execute as expected?

wasowski commented 4 years ago

Guys, give me a few days to read this :)

henrik7264 commented 4 years ago

Well, the answer depends on how from_topic and publish_to_topic is implemented:

  1. If neither from_topic nor publish_to_topic use the subscribe_on then the execution will block and we will never reach the ros::spin function.

  2. If either from_topic or publish_to_topic use the subscribe_on the execution will take place in the context of the specified scheduler which normally is executing in a thread different from the main thread. This means that the ros::spin function is reached and the code works as expected. The observe_on operator can be used anywhere as long as there is one subscribe_on call.

  3. If both from_topic and publish_to_topic use the subscribe_on. This situation is actually a bit strange and not really desirable as it becomes unclear which scheduler is executing what part of the code. There should in general only be one call of the subscribe_on. This is at least a goal we should try to reach. The code will work as ros::spin() is reached, but we may have used an unnecessary number of threads and the code is not easy to understand as it is unclear in which context/thread the code actually is executing.

The standard way of creating operators like publish_to_topic is:

template<typename T>
auto publish_to_topic(const std::string &topic);

template<typename T, typename Scheduler>
auto publish_to_topic(const std::string &topic, Scheduler& scheduler);

Observe that this interface is almost identical to the standard interface we defined for from_topic. In normal situations publish_to_topic(const std::string &topic) would execute in the context of the current scheduler and publish_to_topic(const std::string &topic, Scheduler& scheduler) would execute in the context of the specified scheduler (argument). I think this is the preferred solution for the publish_to_topic operator.

So, this is my understanding of how the observe_on and subscribe_on operators work. Does it give any meaning? The reason for this long discussion of the two operators is actually not caused by the operators themselves, but rather by the ros::spin() function that need to be called in order for the from_topic and publish_to_topic to work as expected.

I will add one more comment to this discussion. Look at the following example:

int main(int argc, char** argv) {
    ros::init(argc, argv, "example");

    rxros::observable::from_topic<int> ("/some_topic")
    | map ([] (int i) {return i*2;})
    | map ([] (int j) {return j*3;})
    | publish_to_topic<int>("/another_topic")
    | spin();
}

In this case the spin function has become an RxROS operator. I actually don't know if this is possible and what the implications of this is, but it is a taught that has been spinning around in my head for some time :-)

henrik7264 commented 4 years ago

Well, the last idea is unfortunately nor realizable for some obvious reason. Look at this code:

int main(int argc, char** argv) {
    ros::init(argc, argv, "example");

    rxros::observable::from_topic<int> ("/some_topic")
    | map ([] (int i) {return i*2;})
    | map ([] (int j) {return j*3;})
    | publish_to_topic<int>("/another_topic")
    | spin();

    rxros::observable::from_topic<int> ("/some_other_topic")
    | map ([] (int i) {return i*2;})
    | map ([] (int j) {return j*3;})
    | publish_to_topic<int>("/yet_another_topic")
    | spin();
}

This will not work at all.