rsocket / rsocket-js

JavaScript implementation of RSocket
https://github.com/rsocket/rsocket-js
Apache License 2.0
594 stars 97 forks source link

requestChannel: error when subscribing again #204

Open dholzenburg opened 2 years ago

dholzenburg commented 2 years ago

When using requestChannel(), one can subscribe to the Flowable returned. One can unsubscribe as well. If one subscribes again to this flowable, it does not work.

Expected Behavior

After re-subscribing, one will receive values again with the given subscriber. No warning will occur on console.

Actual Behavior

A warning is logged on console (RSocketClient: re-entrant call to request n before initial channel established.). No new values are received by the subscriber.

Steps to Reproduce

const metadata = ...;
const client = new RSocketClient(...);
const processor = new FlowableProcessor(sub => { });
const inputFlowable = processor.map(i => { return { data: i, metadata: metadata } });
client.connect().then(
    socket => {
        var subscription;
        const subscriber = {
            onNext: value => console.log('Value from Responder', value.data),
            onSubscribe: sub => { subscription = sub; sub.request(0x7fffffff) },
        };

                // responder will multiply the given value by 2
        const channelFlowable = socket.requestChannel(inputFlowable);
        channelFlowable.subscribe(subscriber);

        setTimeout(() => processor.onNext(1), 1000);
        setTimeout(() => processor.onNext(2), 2000);
        setTimeout(() => { console.log("unsubscribe"); subscription.cancel(); }, 3000);
        setTimeout(() => { console.log("subscribe again"); channelFlowable.subscribe(subscriber); }, 4000);
        setTimeout(() => processor.onNext(3), 5000);

               //output on console
               // Value from Responder 2
               // Value from Responder 4
               // Unsubscribe
               // subscribe again
               // RSocketClient: re-entrant call to request n before initial channel established.
    }
);

Workaround

Instead of re-subscribing call requestChannel() again

Possible Solution

Looks like the variables 'initialized' and 'payloadsSubscribed' in the requestChannel()-implementation of RSocketMachine are not reset.

Your Environment

Happy new year!

viglucci commented 2 years ago

Hi @dholzenburg ,

If I am interpreting the protocol documentation correctly, I believe that this is working as per protocol spec. The Protocol spec for Request Channel states:

Upon receiving a CANCEL, the stream is terminated on the Responder.

Upon sending a CANCEL, the stream is terminated on the Requester.

Given that, if the Requester or a Responder of a RequestChannel stream send CANCEL, you must re-establish the stream using a subsequent call to requestChannel. You cannot simply re-subscribe to the stream returned form requestStream, as it has already been terminated.

I will also note that while the protocol has not changed, these APIs have changed recently with 1.0.0-alpha.1, and the flowable API is no longer available.