Closed FZambia closed 7 months ago
Hello!
Checked it out in my playground, it fixes optimistic inflight subscription problem.
But it also covers another situation, when subscription is sent, the client did not receive response yet and connection goes to connecting state. In this case, variable isSubOptimisticallySent
could be lit bit misleading and it will also lead to situation when we will queue unsub frame, it does not affect anything right now, but maybe it can lead to some unexpected behaviour later if server will not just ignore such frames.
Also it may lead to some unexpected behaviour with such call chain, where client is Centrifuge instance
let test = client.newSubscription('test-channel'); test.subscribe(); client.disconnect(); client.connect(); test.unsubscribe();
It will lead to this error "Failed to execute 'send' on 'WebSocket': Still in CONNECTING state." (you can also see it on screenshot), because transport is not open yet.
Also it may lead to some unexpected behaviour with such call chain, where client is Centrifuge instance let test = client.newSubscription('test-channel'); test.subscribe(); client.disconnect(); client.connect(); test.unsubscribe();
Tried to reproduce this with test case:
test.each(transportCases)("%s: unsubscribe inflight", async (transport, endpoint) => {
const c = new Centrifuge([{
transport: transport as TransportName,
endpoint: endpoint,
}], {
websocket: WebSocket,
fetch: fetch,
eventsource: EventSource,
readableStream: ReadableStream,
emulationEndpoint: 'http://localhost:8000/emulation'
});
const sub = c.newSubscription('test');
let unsubcribeCalled: any;
const unsubscribedPromise = new Promise<UnsubscribedContext>((resolve, _) => {
unsubcribeCalled = resolve;
})
sub.on('unsubscribed', (ctx) => {
unsubcribeCalled(ctx);
})
sub.subscribe();
c.connect();
sub.unsubscribe();
expect(sub.state).toBe(SubscriptionState.Unsubscribed);
await unsubscribedPromise;
let disconnectCalled: any;
const disconnectedPromise = new Promise<DisconnectedContext>((resolve, _) => {
disconnectCalled = resolve;
})
c.on('disconnected', (ctx) => {
disconnectCalled(ctx);
})
c.disconnect();
await disconnectedPromise;
expect(c.state).toBe(State.Disconnected);
});
And it worked fine for me. In this case I am not sure how the error you've shown may be possible. Let's break down:
client.newSubscription('test-channel');
- no frames sent since we are disconnectedtest.subscribe();
- no frames sent since we are disconnectedclient.disconnect();
- not sure it's needed since it's noop in disconnected stateclient.connect();
- initializes connection, client goes to Connecting statetest.unsubscribe();
- I think it can't cause the error you've shown because while WebSocket transport in CONNECTING state we are not sending connect command/subscription commands and sub._inflight
should be false - thus no unsubscribe frame should be sent.So for now I do not see how to reproduce the error WebSocket': Still in CONNECTING state.
you've shown. Am I missing something?
I see! Looks like I missed that the client must be originally in Connected state to reproduce, that's why you called client.disconnect
. So I can reproduce it with this test case:
test.each(transportCases)("%s: unsubscribe inflight", async (transport, endpoint) => {
const c = new Centrifuge([{
transport: transport as TransportName,
endpoint: endpoint,
}], {
websocket: WebSocket,
fetch: fetch,
eventsource: EventSource,
readableStream: ReadableStream,
emulationEndpoint: 'http://localhost:8000/emulation'
});
c.connect();
await c.ready(5000);
const sub = c.newSubscription('test');
let unsubcribeCalled: any;
const unsubscribedPromise = new Promise<UnsubscribedContext>((resolve, _) => {
unsubcribeCalled = resolve;
})
let subcribeCalled: any;
const subscribedPromise = new Promise<SubscribedContext>((resolve, _) => {
subcribeCalled = resolve;
})
sub.on('subscribed', (ctx) => {
subcribeCalled(ctx);
})
sub.on('unsubscribed', (ctx) => {
unsubcribeCalled(ctx);
})
sub.subscribe();
c.disconnect();
c.connect();
sub.unsubscribe();
expect(sub.state).toBe(SubscriptionState.Unsubscribed);
await unsubscribedPromise;
sub.subscribe();
await subscribedPromise;
let disconnectCalled: any;
const disconnectedPromise = new Promise<DisconnectedContext>((resolve, _) => {
disconnectCalled = resolve;
})
c.on('disconnected', (ctx) => {
disconnectCalled(ctx);
})
c.disconnect();
await disconnectedPromise;
expect(c.state).toBe(State.Disconnected);
});
Is this correct assumption?
Yep, you are right connection must be connected before calls in my example, sorry that I did not mention that at first, here's short example:
const MIN_RETRY = 4000;
const MAX_RETRY = 4.611686018427388e21;
const client = new Centrifuge(centrifugeUrl, {
minReconnectDelay: MIN_RETRY,
maxReconnectDelay: MAX_RETRY,
token: centrifugoToken
});
client.once('connected', () => {
let test = client.newSubscription('test-channel33');
test.subscribe();
client.disconnect();
client.connect();
test.unsubscribe();
})
client.connect()
Yep, I see now that this alternative solution is worse than what you originally suggested. I'll come with another PR tomorrow - using your approach as base but slightly modified – to manage optimisticallySent
flag on per-subscription basis.
I have been thinking about per subscription basis, but will there be use for it?
All subs, that will be created after connection + optimistic subscribe frames has been sent, will be sent after connection open, if we unsub from them before connection is opened they will be just removed from queue.
Also it may make centrifuge
and subscription
entity more codependent between each other, because client will need to know that subs may be different.
In my approach all logic about optiomistic frames are bound inside centrifuge
entity. It just knows that optimistic frames has been sent and it stops early returning in _unsubscribe
calls. In it's order _unsubscribe
calls are controlled inside subscription and will be called only if subscrption frames has been sent before.
P.S. Just sharing my point of view to the problem, after hours spent in researches)
Thanks, it's definitely more complexity. But having per-subscription flags seems natural to me because not all subscriptions are optimistic in centrifuge-js
(this depends on configuration and current token state - here is a single branch of code in which optimistic subscription will be sent - it's the common path in general I believe, so most subs are optimistic, but other paths around can happen too). So when you call unsubscribe for some subscription – it's not necessarily sent as part of optimistic subscription batch. That's why in this case I think it's better to fallback to the current logic which seems to work correctly for non-optimistic case.
Does it make sense?
UPD. Opened PR with what I mean - #276
Thank you! Yeah, that's exact what I have been thinking about, when have been trying per sub approach)
But it looks like it's already controlled by sub itself, because it will never call this._centrifuge._unsubscribe
, if subscription is not in subscribed | subscribing
state, so _optimisticallySent
flag will be some kind of guard logic over it and not cover any additional case, or I am missing something and there's case where it works another way?
What’s also a little scary (from debugging side of view) is that the unsubscribe logic will be more spread between the client and the subscription entity, which will complicate future debugging.
I mean that calling optimistic subscribe will already change state of subscription and only such subscriptions will be calling _unsubscribe
in client before connection is fully established.
All other subscriptions will just not call sub/unsub before connection is established and after connection is established, there's no cases when subs need to know if they are optimistic or not. So it becomes logic inside subscription that is used only inside client.
it will never call this._centrifuge._unsubscribe, if subscription is not in subscribed | subscribing state
Subscription goes to subscribing
state right after .subscribe()
has been called. It may be done even before calling client.connect()
for the first time.
At any point after that .unsubscribe()
may be called. Let's assume that at this point client is in the process of connecting with other optimistic subscriptions. But this particular subscription in subscribing state may be not included into optimistic batch (if subscription uses token and have not received it for the first time, or if it uses dynamic per subscription data
) – in that case unnecessary unsubscribe command will be sent to server.
Also I think more logic may be added in the future which may result into unnecessary unsubscribe sent. I am not happy from tight coupling of Client and Subscription – but it's sth we already have and it's hard to change without loosing efficiency.
What do you think? Maybe I am still missing sth which ruins my concerns?
I suppose it's worth writing a test case to check this for sure.
Yeah, but even the subs with getData will send their frame, the only difference it will not be in batch with connection, so it looks like unsub frame is also required for such subs.
Here's short representer, which leads to such frames order in open WS connection, you need to turn off/on your internet connection
const MIN_RETRY = 4000;
const MAX_RETRY = 4.611686018427388e21;
const client = new Centrifuge(centrifugeUrl, {
minReconnectDelay: MIN_RETRY,
maxReconnectDelay: MAX_RETRY,
token: centrifugoToken
});
window.addEventListener("offline", (event) => {
console.log("centrifugo went to connecting state, but did not receive connect response yet, removing sub")
let getDataPromiseResolver;
const offlineSub = client.newSubscription('test-channel22', {
getData: () => {
return new Promise((resolve) => {
getDataPromiseResolver = resolve
})
}
});
offlineSub.subscribe();
// You need to delay connection response or add such custom event in the end of onOpen callback document.dispatchEvent(new CustomEvent('centrifugo-connect-sent'))
document.addEventListener('centrifugo-connect-sent', () => {
setTimeout(() => {
getDataPromiseResolver({data: '123'});
setTimeout(() => {
client.removeSubscription(offlineSub);
}, 1)
}, 2)
})
});
client.connect();
Just got another thought, maybe it's better to introduce some kind of flag, that will indicate that transport is open and rely only on it when sending sub/unsub frames?
Looks like it will close and issue and also allow not only subs with getData to send their frame during connection process, which can speed up subscription process lit bit for some cases.
But I did not deep dive server code yet and not sure if it's ready for it, but from first glance, based on screenshot in my prev comment, looks like yes.
upd: I can prepare the PR if this idea makes sense
Yeah, but even the subs with getData will send their frame
May be a bug - they should not be included to optimistic batch. At least in my understanding. Let me check why this happens and come back.
Just got another thought, maybe it's better to introduce some kind of flag, that will indicate that transport is open and rely only on it when sending sub/unsub frames?
I like the idea, may even simplify some things in current implementation. One note - it's not possible to use in emulation (transport.emulation() === true
) scenario since we need to wait for connect reply with a node identifier in it. Maybe sth else will araise. If you want you can try making this, I will come back to this soon also - checking current getData
behaviour as a first step.
Well, it's not included in optimistic batch, it will be sent after it as a separate frame and looks like it works, based on responses from the server
Well, it's not included in optimistic batch, it will be sent after it as a separate frame and looks like it works, based on responses from the server
Sorry, I've meant why it's sent before connect reply received. Also wondering, is it the same now for subscription with token case (when no Subscription token
provided but Subscription getToken
function is used - i.e. no initial token scenario), or only getData
behaves like this.
Looks like not really same, all optimistic subs will early return here
And all non optimistic will end up here during connection
But there is possible to reach same behaviour if we call subscribe for such subscriptions during connected state, but getToken fn will be resolved during connecting state later, for example during reconnect
const MIN_RETRY = 4000;
const MAX_RETRY = 4.611686018427388e21;
const client = new Centrifuge(centrifugeUrl, {
minReconnectDelay: MIN_RETRY,
maxReconnectDelay: MAX_RETRY,
token: centrifugoToken
});
let getTokenPromiseResolver;
let getTokenSub
client.once('connected', () => {
getTokenSub = client.newSubscription('test-channel22', {
getToken: () => {
return new Promise((resolve) => {
getTokenPromiseResolver = resolve
})
}
});
getTokenSub.subscribe();
})
window.addEventListener("offline", (event) => {
console.log("centrifugo went to connecting state, but did not receive connect response yet, removing sub")
// You need to delay connection response or add such custom event in the end of onOpen callback document.dispatchEvent(new CustomEvent('centrifugo-connect-sent'))
document.addEventListener('centrifugo-connect-sent', () => {
setTimeout(() => {
getTokenPromiseResolver(centrifugoToken);
setTimeout(() => {
client.removeSubscription(getTokenSub);
}, 1)
}, 2)
})
});
client.connect();
UPD: Also, it looks like if promise will be resolved and transport will not be opened at that time, it will lead to an error
But there is possible to reach same behaviour if we call subscribe for such subscriptions during connected state, but getToken fn will be resolved during connecting state later, for example during reconnect
Looks like it's also a buggy current behaviour – it fails with unsubscribe in the same manner as optimistic subs. Right?
UPD: Also, it looks like if promise will be resolved and transport will not be opened at that time, it will lead to an error
You mean token promise may be resolved and _sendSubscribe will be called (here) without connection check? Seems true. Need to check connection status also.
Looks like it's also a buggy current behaviour – it fails with unsubscribe in the same manner as optimistic subs. Right?
Yep, it will lead to same behaviour
Looks like transport state could resolve all this problems. Already made draft and it works perfectly with transport that has separated logic for opening and sending first batch of commands. But if it's emulated transport, looks like it may require some kind of queue for unsub commands, that is called during opening process, if we sent optimistic commands. And separation of subscribe and unsubscribe commands, which will lead to situation where we will need to make one more call of sending missing commands.
I have seen you made a change in your PR which will set optimistic false for all emulation transport, looks like in such case we will never have to call _unsubscribe before such transport is really opened and it will not be a problem.
If its not really required to support optimistic subs for emulation, it will be easier to prepare proposal.
P.S. Just deployed fork to production, already see significant drop of unsubscribe errors from 100-200 to 20-40 per minute, but clients are still updating
Looks like transport state could resolve all this problems. Already made draft and it works perfectly with transport that has separated logic for opening and sending first batch of commands.
Yep, sounds great
But if it's emulated transport, looks like it may require some kind of queue for unsub commands, that is called during opening process, if we sent optimistic commands.
I think we can skip optimistic subs for emulation transports for now, it's a bit unfortunate, but I think it will be right to implement it as a separate step later if needed.
Made new PR - #278 , left some notes inside of it, can you check it out plz?
Closing in favour of https://github.com/centrifugal/centrifuge-js/pull/278
Alternative solution for #274