Are you opening these 2000+ kline streams in a very short amount of time (e.g. within a loop), without any kind of delay?
At the moment, with this SDK, each websocket stream is a new connection dedicated to that stream. If you're trying to open that many in a short amount of time it might be that some don't successfully connect - they'll just hang before an eventual timeout. Are these timeouts taking a while to show?
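If the streams are currently being opened in a tight loop, one thing to try first is spacing the calls out. A minimal sketch, assuming a hypothetical helper named `openInBatches` (not part of the SDK); the batch size and delay are placeholder values to tune, not recommendations:

```typescript
// Hypothetical helper, not part of the SDK: calls `open` for each item,
// pausing between batches so the connections are not all started at once.
const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

async function openInBatches<T>(
  items: T[],
  open: (item: T) => void,
  batchSize = 50, // assumed value, tune for your environment
  delayMs = 1000, // assumed value, tune for your environment
): Promise<void> {
  for (let i = 0; i < items.length; i += batchSize) {
    items.slice(i, i + batchSize).forEach(open);
    // Pause before the next batch, but not after the final one.
    if (i + batchSize < items.length) {
      await sleep(delayMs);
    }
  }
}
```

Here `open` would wrap whatever call starts a stream, for example a function that calls subscribeKlines for one symbol/interval pair.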
What you can do is build some tracking logic. This should track which connections you've asked to open (and when), and which of those have actually opened.
By doing this, you can set your own timeout to abandon a "stuck" connection attempt and start a new one. This is an approach I've taken in a service that does something similar, opening almost 3k individual streams in a short amount of time.
In terms of tracking this, each connection in my websocket clients has a unique primary-key style property you can use to identify it. I call this the "wsKey", and the SDK relies on it heavily to track per-connection state and functionality. When you call subscribeKlines to open a kline stream, the returned "WebSocket" object also includes the "wsKey" property. If you store this, you can set a timer to check after x seconds whether the connection has opened yet. If not, ask the SDK to cancel the connection and open a new one.
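Stripped of anything SDK-specific, the bookkeeping could look roughly like this. It is only a sketch: the `ConnectionTracker` class and its method names are hypothetical, and timestamps are passed in as plain millisecond numbers so the logic is easy to test.

```typescript
// Hypothetical tracker, not part of the SDK. Keys are wsKey strings.
class ConnectionTracker {
  private expected = new Map<string, number>(); // wsKey -> requested-at (ms)
  private open = new Set<string>();

  /** Record that a connection was requested (or re-requested after a retry). */
  markRequested(wsKey: string, now: number = Date.now()): void {
    this.expected.set(wsKey, now);
    this.open.delete(wsKey);
  }

  /** Record that a previously requested connection has opened. */
  markOpen(wsKey: string): void {
    if (this.expected.has(wsKey)) {
      this.open.add(wsKey);
    }
  }

  /** Keys requested at least `timeoutMs` ago that still have not opened. */
  stuckKeys(timeoutMs: number, now: number = Date.now()): string[] {
    const stuck: string[] = [];
    this.expected.forEach((requestedAt, wsKey) => {
      if (!this.open.has(wsKey) && now - requestedAt >= timeoutMs) {
        stuck.push(wsKey);
      }
    });
    return stuck;
  }

  /** True once every requested connection has opened. */
  allOpen(): boolean {
    return this.expected.size > 0 && this.open.size === this.expected.size;
  }
}
```

You would call `markRequested` with the wsKey you get back when subscribing, `markOpen` from the 'open' event handler, and periodically ask for `stuckKeys` to decide which connections to kill and retry.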
Pseudo example based on what I did in my own microservice:
private expectedConnections: Map<string, Date> = new Map();
private openConnections: Map<string, Date> = new Map();
private wsKeyContextStore: Record<
  string, // ws key
  { symbol: string; interval: KlineIntervals }
> = {};
....
this.wsClient.on('open', (event) => {
  // wsKey is read from the event payload (assumed shape: { wsKey, ... })
  const wsKey = event.wsKey;

  // track successful connections
  this.openConnections.set(wsKey, new Date());

  const totalExpected = this.expectedConnections.size;
  const totalConnected = this.openConnections.size;
  this.logger.log(`Total ${totalConnected}/${totalExpected} ws connections open | (${wsKey} connected)`);

  // the context may be missing if this connection was not opened by the loop below
  const context = this.wsKeyContextStore[wsKey];
  const isKlineTopic = wsKey.includes('kline');
  if (context && isKlineTopic) {
    this.queueKlineBackfill(context.symbol, context.interval);
  }

  if (totalConnected === totalExpected) {
    this.logger.log(`All WS connections are now open`);
    this.didFinishConnectingWS = true;
  } else {
    // more to go
  }
});
symbols.forEach((symbol) => {
  intervals.forEach((interval) => {
    const response = this.wsClient.subscribeKlines(symbol, interval, 'usdm');
    const wsKey = response.wsKey;
    if (wsKey) {
      // remember what this connection is for, and when it was requested
      this.wsKeyContextStore[wsKey] = { symbol, interval };
      this.expectedConnections.set(wsKey, new Date());
      this.scheduleWsDidOpenOrReconnectCheck(wsKey, 30);
    } else {
      // stringify, otherwise the object is logged as "[object Object]"
      this.logger.error('no wskey? ' + JSON.stringify({ symbol, interval, wsKey }));
    }
  });
});
...
public forceReconnect(wsKey: string) {
  // close the connection; passing true asks the SDK to reconnect it afterwards
  this.wsClient.close(wsKey, true);
}

/** Sets a timer to check if a WS is open before a timeout. If not, forces a reconnect */
private scheduleWsDidOpenOrReconnectCheck(wsKey: string, forceReconnectAfterSeconds: number) {
  setTimeout(() => {
    if (!this.openConnections.has(wsKey)) {
      this.logger.log(`WS ${wsKey} failed to open after ${forceReconnectAfterSeconds} seconds... forcing respawn and rechecking with delay...`);
      this.forceReconnect(wsKey);
      // recheck later, waiting 25% longer each time
      this.scheduleWsDidOpenOrReconnectCheck(wsKey, forceReconnectAfterSeconds * 1.25);
    }
    // no clearTimeout needed here: the timer has already fired
  }, forceReconnectAfterSeconds * 1000);
}
There's more to it, but hopefully this extract gives an idea of one way to approach this. Start the connections, check they're open before a countdown expires and, if not, ask the SDK to kill-and-retry that connection using the "wsKey" to identify it. The SDK will handle the heavy lifting from here.
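One detail worth noting in the extract: each recheck waits 1.25x longer than the previous one, and nothing stops that delay from growing. A small sketch of the schedule with an upper bound added; the function names and the 300 second cap are assumptions for illustration, not something the SDK or the extract above defines:

```typescript
// Hypothetical helpers: compute the delay before each open-or-reconnect check.
// The 1.25 factor matches the extract above; the cap is an assumed addition.
function nextCheckDelaySeconds(
  currentSeconds: number,
  factor = 1.25,
  maxSeconds = 300,
): number {
  return Math.min(currentSeconds * factor, maxSeconds);
}

function delaySchedule(
  initialSeconds: number,
  attempts: number,
  factor = 1.25,
  maxSeconds = 300,
): number[] {
  const delays: number[] = [];
  let current = Math.min(initialSeconds, maxSeconds);
  for (let i = 0; i < attempts; i++) {
    delays.push(current);
    current = nextCheckDelaySeconds(current, factor, maxSeconds);
  }
  return delays;
}

// delaySchedule(30, 3) returns [30, 37.5, 46.875]
```

Whether a cap makes sense depends on how long you are willing to wait before treating a stream as permanently failed.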
Closing this for now but feel free to reopen / open new issues as needed
I need to subscribe to 2000+ kline streams. When I do, I get errors like this: error: AggregateError at internalConnectMultiple (node:net:1116:18) at afterConnectMultiple (node:net:1683:7) { code: 'ETIMEDOUT',
What can I do about it?