ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0

Implementing logical OR between multiple operations #571

Open JavierBejMen opened 2 years ago

JavierBejMen commented 2 years ago

Hi all! We are currently developing a stream processing program using RxCpp, but we are unsure how to approach the following situation correctly.

Let's say we have multiple operations, where each operation can succeed or fail. We want to lift a pipeline of observables that sequentially tries each operation while they fail, returning as soon as one operation succeeds.

The main question is:

One possible implementation we came up with is the following:

We define the OperationResult class, just a wrapper over the event being processed and a boolean indicating if the operation succeeded or failed:

template <class T> class OperationResult{
private:
    T m_event;
    bool m_success;

public:
    // Initializers follow the member declaration order (m_event, then m_success)
    OperationResult(bool succeeded, T event) : m_event{event}, m_success{succeeded}{}

    bool success() const{
        return this->m_success;
    }

    T event() const{
        return this->m_event;
    }
};

And the Operation class, which sets up the rxcpp pipeline. Each operation exposes two observables: one that emits the successfully processed items and one that emits the failed ones.

template <class T> class Operation
{
    using function_t = std::function<OperationResult<T>(OperationResult<T>)>;
    using observable_t = rxcpp::observable<OperationResult<T>>;

private:
    std::string m_name;
    function_t m_fn;
    bool m_inputConnected;

public:
    Operation(const std::string & name, function_t fn) : m_name{name}, m_fn{fn}, m_inputConnected{false}{}

    // This function sets up the rxcpp pipeline, implementing the bifurcation depending on OperationResult
    // in a way that no new observables/subscriptions are performed
    std::pair<observable_t, observable_t> connect(const observable_t & input){
        if (this->m_inputConnected){
            throw std::runtime_error("Error, operation " + this->m_name + " is already connected");
        }else{
            auto result = input.map(this->m_fn).publish();
            auto failure = result.filter([](OperationResult<T> result) { return !result.success(); });
            auto success = result.filter([](OperationResult<T> result) { return result.success(); });
            result.connect();

            this->m_inputConnected = true;
            // Note the order: first = failures, second = successes
            return std::pair(failure, success);
        }
    }
};

This allows for setting up a logical OR between multiple Operations like this:

// Operations
Operation<int> op0("is pair",
                    [](OperationResult<int> res)
                    {
                        cout << "pair got " << res.event() << endl;
                        return OperationResult<int>(res.event() % 2 == 0, res.event());
                    });

Operation<int> op1("is greater than 3",
                    [](OperationResult<int> res)
                    {
                        cout << "greater than 3 got " << res.event() << endl;
                        return OperationResult<int>(res.event() > 3, res.event());
                    });

Operation<int> op2("equals 1",
                    [](OperationResult<int> res)
                    {
                        cout << "equals 1 got " << res.event() << endl;
                        return OperationResult<int>(res.event() == 1, res.event());
                    });

// Input to the observable chain
auto input_sbj = subjects::subject<OperationResult<int>>();
auto input = input_sbj.get_observable();

// Subscriber to be called if any of the three operations succeeds
auto success_subscriber = make_subscriber<OperationResult<int>>([](OperationResult<int> res)
                                                                { cout << "[Success] got " << res.event() << endl; });

// Subscriber to be called if all operations failed
auto error_subscriber = make_subscriber<OperationResult<int>>([](OperationResult<int> res)
                                                                { cout << "[Error] got " << res.event() << endl; });

// Logical OR
auto outs = op0.connect(input);
outs.second.subscribe(success_subscriber);

outs = op1.connect(outs.first);
outs.second.subscribe(success_subscriber);

outs = op2.connect(outs.first);
outs.second.subscribe(success_subscriber);

outs.first.subscribe(error_subscriber);

// Emit input
auto s = input_sbj.get_subscriber();
s.on_next(OperationResult(false, 0)); // Success
s.on_next(OperationResult(false, 1)); // Success
s.on_next(OperationResult(false, 3)); // Error
s.on_next(OperationResult(false, 5)); // Success
s.on_completed();

Output:

pair got 0
[Success] got 0
pair got 1
greater than 3 got 1
equals 1 got 1
[Success] got 1
pair got 3
greater than 3 got 3
equals 1 got 3
[Error] got 3
pair got 5
greater than 3 got 5
[Success] got 5
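
For cross-checking the trace above, here is a minimal sketch in plain C++ with no RxCpp involved. It models the intended semantics: each event is tried against the checks in order, and processing stops at the first success. The names NamedCheck, trace_event and example_checks are hypothetical helpers introduced only for this illustration. The labels mirror the printed output, not the operation names.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper type (not part of RxCpp): a labelled predicate over int events.
using NamedCheck = std::pair<std::string, std::function<bool(int)>>;

// Tries each check in order for a single event and stops at the first success.
// Returns the log lines the chained pipeline is expected to print for that event.
std::vector<std::string> trace_event(const std::vector<NamedCheck>& checks, int event) {
    std::vector<std::string> log;
    for (const auto& [name, check] : checks) {
        log.push_back(name + " got " + std::to_string(event));
        if (check(event)) {
            log.push_back("[Success] got " + std::to_string(event));
            return log;  // short-circuit: later checks never see this event
        }
    }
    log.push_back("[Error] got " + std::to_string(event));  // every check failed
    return log;
}

// The three checks from the example, labelled as in the printed output.
std::vector<NamedCheck> example_checks() {
    return {
        {"pair", [](int v) { return v % 2 == 0; }},
        {"greater than 3", [](int v) { return v > 3; }},
        {"equals 1", [](int v) { return v == 1; }},
    };
}
```

Calling trace_event(example_checks(), v) for v = 0, 1, 3, 5 and concatenating the results should give the same lines as the output above.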

Any help or insight you can give me is much appreciated!

victimsnino commented 2 years ago

What about the most straightforward way: passing all the comparisons to a single lambda? Something like:

observable.filter([](int v){return v % 2 == 0 || v > 3 || v ==1;})
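
If the checks live in separate callables, the same idea can be kept generic. The following is only a sketch: any_of_preds is a hypothetical helper, not an RxCpp function, and it assumes C++17 for the fold expression. The fold over || evaluates left to right and short-circuits, so later predicates are not called once one succeeds.

```cpp
// Hypothetical helper (not part of RxCpp): combines several predicates into
// one predicate that is true as soon as any of them is true.
template <class... Preds>
auto any_of_preds(Preds... preds) {
    // The predicates are captured by copy. The fold over || short-circuits,
    // and with an empty pack it yields false.
    return [=](const auto& value) { return (preds(value) || ...); };
}
```

The returned callable could then presumably be handed to filter in place of the hand-written lambda, for example observable.filter(any_of_preds(is_even, greater_than_3, equals_1)), where the three predicate names are placeholders.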

Another way I can imagine is to use the amb operator, but you would need to build it for each incoming value and run it immediately. For example:

observable.flat_map([](int v)
{
    return rxcpp::observable<>::just(v)
        .filter([](int v){ return v % 2 == 0; })
        .amb(rxcpp::observable<>::just(v).filter([](int v){ return v > 3; }),
             rxcpp::observable<>::just(v).filter([](int v){ return v == 1; }));
});

But I am not sure it is really useful, because for every value you create a new observable, configure it, and run it immediately.

kirkshoop commented 2 years ago

> Let's say we have multiple operations, where each operation can succeed or fail. We want to lift a pipeline of observables that sequentially try each operation while they fail, returning as soon as one operation succeed.

That sounds like

Amb(op0.retry(), op1.retry(), ..)