This is a quite dirty fix of the issue discovered in failing command_service_replay_test. The problem is that calling unsubscribe() on a subscription from rxcpp::subjects::synchronize<T, rxcpp::observe_on_one_worker> does not block until a currently running subscribed observer finishes. This lead to CommandServiceImpl destruction while there was still running an observer on statuses that was using CommandServiceImpl::cache_ member, which got invalidated.
There is a task to find a better way to handle this type of problems: IR-426.
Benefits
Fix.
Possible Drawbacks
Need to capture everything needed for a subscriber by value.
Usage Examples or Tests [optional]
command_service_replay_test
Alternate Designs [optional]
Wait till the task finishes. Calling unsubscribe() allows it if we have a dedicated thread for our subscriber, but not when we reuse one.
Description of the Change
This is a quite dirty fix of the issue discovered in failing
command_service_replay_test
. The problem is that callingunsubscribe()
on a subscription fromrxcpp::subjects::synchronize<T, rxcpp::observe_on_one_worker>
does not block until a currently running subscribed observer finishes. This lead toCommandServiceImpl
destruction while there was still running an observer on statuses that was usingCommandServiceImpl::cache_
member, which got invalidated.There is a task to find a better way to handle this type of problems: IR-426.
Benefits
Fix.
Possible Drawbacks
Need to capture everything needed for a subscriber by value.
Usage Examples or Tests [optional]
command_service_replay_test
Alternate Designs [optional]
Wait till the task finishes. Calling
unsubscribe()
allows it if we have a dedicated thread for our subscriber, but not when we reuse one.