ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0

Escaping the monad in RxCpp #514

Status: Closed (zpyatt closed this issue 4 years ago)

zpyatt commented 4 years ago

Hi, thanks for the great library.

I have a stream/observable of asio UDP packets. I need to collect X of them and return them from a function. The problem is that the C++ version of Rx doesn't seem to provide any of the methods to "escape the monad" described in the intro to Rx. What I'd like to do is something like:

std::future<std::vector<udp_packet>> get_data() {
  std::promise<std::vector<udp_packet>> p;
  auto f = p.get_future();
  auto v = std::make_shared<std::vector<udp_packet>>();
  packets().take(10).subscribe(
    [v](udp_packet u) mutable { v->emplace_back(u); },
    [p = std::move(p)](std::exception_ptr e_ptr) mutable { p.set_exception(e_ptr); },
    [p = std::move(p), v]() mutable { p.set_value(*v); }
  );
  return f;
}

This isn't exactly what I'm doing; the main point is that I want to move-capture the promise so I can set the value on completion (or the exception on error). To do that I have to mark the lambdas as mutable, but that won't compile. The error is something about the lambdas being const; I don't have access to the compiler right now. Also, "packets()" is a hot observable from a subject that calls on_next with the packets received from an asio receive call. I'm wondering if "tap" might work, but I don't really understand its documentation.

Any suggestions? Thanks.
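
Editor's sketch of the capture problem, using only the standard library. Two things are certain regardless of RxCpp: the snippet above moves the same promise `p` into two lambdas, so the second lambda would hold a moved-from promise, and a lambda that move-captures a std::promise is not copyable. Assuming the subscriber stores its callbacks in a copyable wrapper (an assumption about RxCpp, not something confirmed in this thread), sharing the promise through a std::shared_ptr sidesteps both issues and removes the need for `mutable`. `fake_subscribe` and `collect` are hypothetical names invented for illustration.

```cpp
#include <exception>
#include <functional>
#include <future>
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical stand-in for a subscribe() call; this is NOT RxCpp.
// It takes the callbacks as std::function (which requires copyable callables),
// emits the values 0..count-1, then signals completion.
inline void fake_subscribe(int count,
                           std::function<void(int)> on_next,
                           std::function<void(std::exception_ptr)> on_error,
                           std::function<void()> on_completed) {
    try {
        for (int i = 0; i < count; ++i) on_next(i);
    } catch (...) {
        on_error(std::current_exception());
        return;
    }
    on_completed();
}

inline std::future<std::vector<int>> collect(int count) {
    // A lambda that move-captures a std::promise is move-only, so it cannot be
    // stored in a copyable wrapper such as std::function.
    std::promise<int> probe;
    auto move_only = [q = std::move(probe)]() mutable { q.set_value(0); };
    static_assert(!std::is_copy_constructible<decltype(move_only)>::value,
                  "move-capturing a promise makes the lambda non-copyable");
    (void)move_only;  // only used for the compile-time check above

    // Sharing the promise makes every lambda copyable, lets both the error and
    // the completion callback reach the same promise, and needs no `mutable`.
    auto p = std::make_shared<std::promise<std::vector<int>>>();
    auto v = std::make_shared<std::vector<int>>();
    auto f = p->get_future();
    fake_subscribe(count,
        [v](int x) { v->push_back(x); },                      // on_next
        [p](std::exception_ptr e) { p->set_exception(e); },   // on_error
        [p, v]() { p->set_value(std::move(*v)); });           // on_completed
    return f;
}
```

Calling `collect(3).get()` in this sketch yields a vector holding 0, 1 and 2; with a real observable the same capture pattern would apply to the three subscribe callbacks.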

kirkshoop commented 4 years ago

Have a look at buffer and reduce.

packets().buffer(10).first()

Or in rare cases

packets().buffer(10).as_blocking().first()

zpyatt commented 4 years ago

thx.

Is there a way of getting the Ts out of the observable<T>, besides lambda captures? Even in Haskell I thought you could escape certain monads like List and State?

first() also returns an observable. Is there any way of converting this to a future?

nikobarli commented 4 years ago

I think you can just add helpers as needed, e.g.

template<typename T>
std::future<std::vector<T>> to_future(rxcpp::observable<T> const& obs) {
    auto coord = rxcpp::observe_on_new_thread();
    auto p = std::make_shared<std::promise<std::vector<T>>>();
    auto f = p->get_future();

    auto result = std::make_shared<std::vector<T>>();
    obs.subscribe_on(coord).subscribe(
        [result](auto&& x) { // on_next
            result->emplace_back(x);
        },
        [p](std::exception_ptr e) { // on_error
            p->set_exception(e);
        },
        [p, result]() { // on_completed
            p->set_value(*result);
        });
    return f;
}

Then you can use it as follows:

auto obs = rxcpp::observable<>::interval(std::chrono::seconds(1)).take(3);
auto f = to_future(obs.as_dynamic());
auto result = f.get(); // block here until observable completes
for (auto& v : result) {
    std::cout << v << std::endl;
}
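
Editor's note: `f.get()` blocks until the promise is fulfilled, so with a hot source that never completes (for example, fewer packets arrive than were requested) the caller would wait forever. A minimal standard-library sketch of bounding that wait with `std::future::wait_for` follows. `get_with_timeout` is a hypothetical helper, not part of RxCpp, and it does not cancel the underlying subscription on timeout.

```cpp
#include <chrono>
#include <future>
#include <stdexcept>
#include <utility>

// Hypothetical helper (not part of RxCpp): wait up to `timeout` for the future,
// then either return its value or throw std::runtime_error.
template <typename T, typename Rep, typename Period>
T get_with_timeout(std::future<T> f,
                   std::chrono::duration<Rep, Period> timeout) {
    if (f.wait_for(timeout) != std::future_status::ready) {
        throw std::runtime_error("timed out waiting for the result");
    }
    return f.get();  // rethrows any exception stored via set_exception
}
```

With the helper above, the blocking call could be written as `get_with_timeout(std::move(f), std::chrono::seconds(5))`, where the five-second limit is an arbitrary illustrative choice.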

zpyatt commented 4 years ago

That helps too.

Thanks guys.