ReactiveX / RxCpp

Reactive Extensions for C++
Apache License 2.0

Unsubscribe callback to support cancellation #517

Open LanderN opened 4 years ago

LanderN commented 4 years ago

Hello!

Is there currently a way to be notified of an "unsubscribe()" call? For my use case, I am connecting to a websocket and using rxcpp to stream incoming messages to subscribers. What I would like to achieve is disconnecting the websocket when there are no longer any subscribers. I would like this to happen whenever the last subscriber calls "unsubscribe()" (as opposed to waiting for the next message to arrive and checking if there are any more subscribers at that point).

Is this possible with RxCpp?

Pseudo-code to illustrate what I am trying to do:

    rxcpp::observable<>::create<message>([=](const rxcpp::subscriber<message>& s){
        websocket->onMessage([=](const message& message) {
            s.on_next(message);
        });

        // return unsubscribe callback?
        return [](){
            // close websocket if there are no longer any subscribers
            websocket->close();
        };
    })

kirkshoop commented 4 years ago

A subscriber is a subscription and an observer. Use s.add(<lambda>) to attach to the subscription.
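To make the idea concrete, here is a minimal standard-library sketch of what "attaching a lambda to the subscription" means. It is a toy model, not RxCpp's actual subscription type: the names `ToySubscription` and `teardown_demo` are hypothetical and exist only for illustration. The point is that callbacks registered with `add` run once, at the moment `unsubscribe()` is called.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Toy stand-in for a subscription (hypothetical, NOT an RxCpp class):
// it stores teardown callbacks and runs them on unsubscribe().
class ToySubscription {
public:
    // Register a teardown callback, similar in spirit to s.add(<lambda>).
    void add(std::function<void()> teardown) {
        teardowns_.push_back(std::move(teardown));
    }

    // Run every registered teardown exactly once; later calls are no-ops.
    void unsubscribe() {
        if (!subscribed_) return;
        subscribed_ = false;
        auto pending = std::move(teardowns_);
        teardowns_.clear();
        for (auto& t : pending) t();
    }

    bool is_subscribed() const { return subscribed_; }

private:
    bool subscribed_ = true;
    std::vector<std::function<void()>> teardowns_;
};

// Walks through the websocket scenario and returns a log of what happened.
std::vector<std::string> teardown_demo() {
    std::vector<std::string> log;
    ToySubscription s;
    s.add([&log] { log.push_back("close websocket"); });
    log.push_back("streaming");
    s.unsubscribe();  // teardown runs here, not when the next message arrives
    s.unsubscribe();  // second call does nothing
    assert(!s.is_subscribed());
    return log;
}
```

In this model the teardown is tied to the `unsubscribe()` call itself, which is the behavior the question asks for: the websocket can be closed immediately instead of at the next incoming message.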

LanderN commented 4 years ago

Thanks!

That was rather easy. Is this mentioned somewhere in the docs? I'm pretty new to Rx, but this seems to be a pretty basic thing to do and I couldn't figure it out from the docs (or I'm just missing some part of it somewhere).

LanderN commented 4 years ago

For future reference, this is what I ended up doing for supporting my use case with multiple subscriptions:

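    // Rx is assumed to be a namespace alias (namespace Rx = rxcpp;);
    // websocket and url are defined elsewhere.
    Rx::subjects::subject<message> subject;
    auto source = Rx::observable<>::create<message>([=](const rxcpp::subscriber<message>& s){
        websocket->open(url);
        websocket->onMessage([=](const message& m){
            s.on_next(m);
        });

        // capture websocket by value so the teardown lambda can use it
        s.add([=](){
            // close websocket if there are no longer any subscribers
            websocket->close();
        });
    });
    auto observable = source.multicast(subject).ref_count();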

[...]

    auto subscriber1 = observable.take(3).subscribe([](message m){
        // receives first three messages
    });
    auto subscriber2 = observable.take(2).subscribe([](message m){
        // receives first two messages
    });

This behaves exactly the way I was looking for: it closes the websocket when the last subscriber unsubscribes and reconnects when a new subscription comes in afterwards.
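The connect-on-first, disconnect-on-last behavior described above can be modeled with a plain counter. The following is a standard-library sketch under that assumption, not RxCpp's `ref_count()` implementation; `RefCountedSource` and `ref_count_demo` are hypothetical names used only for illustration.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Toy model of a shared, reference-counted source (hypothetical, not RxCpp):
// connect on the first subscriber, disconnect when the last one leaves.
class RefCountedSource {
public:
    RefCountedSource(std::function<void()> connect,
                     std::function<void()> disconnect)
        : connect_(std::move(connect)), disconnect_(std::move(disconnect)) {}

    void subscribe() {
        if (count_++ == 0) connect_();  // 0 -> 1: open the connection
    }

    void unsubscribe() {
        if (count_ == 0) return;            // nothing to release
        if (--count_ == 0) disconnect_();   // 1 -> 0: close the connection
    }

    int subscribers() const { return count_; }

private:
    std::function<void()> connect_;
    std::function<void()> disconnect_;
    int count_ = 0;
};

// Two overlapping subscribers, then a later third one.
std::vector<std::string> ref_count_demo() {
    std::vector<std::string> log;
    RefCountedSource source(
        [&log] { log.push_back("open"); },
        [&log] { log.push_back("close"); });

    source.subscribe();    // first subscriber: opens
    source.subscribe();    // second subscriber: shares the open connection
    source.unsubscribe();  // one subscriber left: stays open
    source.unsubscribe();  // last subscriber gone: closes
    source.subscribe();    // new subscriber later: opens again
    source.unsubscribe();  // closes again
    assert(source.subscribers() == 0);
    return log;
}
```

The returned log holds four entries (open, close, open, close), which mirrors the close-then-reconnect sequence reported for the websocket.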

The RxJS manual helped me figure this out.