altangent / ccxws

WebSocket client for 38 cryptocurrency exchanges
MIT License

Kucoin: wait for welcome message before sending subscription requests #207

Open WoutervDijk opened 4 years ago

WoutervDijk commented 4 years ago

Exchange Kucoin

Subscription type L3orderbook

Describe the bug For some reason, when I try to connect to the Kucoin L3 orderbook it often sends no updates at all. The bug is very inconsistent: sometimes it connects and streams updates, other times it doesn't.

To Reproduce Run the following code on an AWS instance close to Kucoin, such as ap-northeast-1

/**
 * Prototype for maintaining a Level 3 order book for Kucoin according
 * to the instructions defined here:
 * https://docs.kucoin.com/#full-matchengine-data-level-3
 *
 * This technique uses a Map to store orders. It has efficient updates
 * but will be slow for performing tip of book or snapshot operations.
 */
const ccxws = require("ccxws");
const KucoinOrderBook = require("ccxws/src/orderbooks/KucoinOrderBook");
const market = {id: "BTC-USDT", base: "BTC", quote: "USDT"};
const markets = [market, {id: "AMPL-USDT", base: "AMPL", quote: "USDT"}, {id: "SXP-USDT", base: "SXP", quote: "USDT"}];
for (let market of markets) {
    let updates = [];
    let ob;
    const client = new ccxws.Kucoin();
    client.subscribeLevel3Updates(market);
    client.on("l3snapshot", async (snapshot) => {
        console.log("length before", updates.length)
        await sleep(5000);
        if (updates.length === 0){
            console.log("Update length 0 wait longer");
            await sleep(5000);
        }
        console.log("updates after", updates.length);
        ob = new KucoinOrderBook(snapshot, updates);
        console.log("running ", market.id);
    });
    client.on("l3update", update => {
        // enqueue updates until snapshot arrives
        if (!ob) {
            updates.push(update);
            return;
        }
        // validate the sequence and exit if we are out of sync
        if (ob.sequenceId + 1 !== update.sequenceId) {
            console.log(`out of sync, expected ${ob.sequenceId + 1}, got ${update.sequenceId}`);
            //process.exit(1);
        }
        // apply update
        ob.update(update);
    });
}
function sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}

The exact error I get is as follows: (node:120179) UnhandledPromiseRejectionWarning: Error: Missing update, or Must queue updates prior to snapshot, which is the real problem.

The weirdest thing is that it does work on my home computer (Europe), where my latency is very high. On an AWS instance, however, it only connects sometimes. It might be because the snapshot call comes too soon after connecting, but I have no idea tbh. This happens especially when I run multiple coins at the same time (using a new ccxws object for each), and the SXP/USDT pair seems to fail most often. Could we maybe add a function that retries connecting to the socket if no updates start streaming?

EDIT: I am now thinking that it might just be Kucoin's servers that are a little broken; perhaps a function to reconnect the socket would suffice.

Second EDIT: This is the exact output of the error I meant:

length before 0
length before 21
updates after 181
running  SXP-USDT
Update length 0 wait longer
updates after 1416
(node:121384) UnhandledPromiseRejectionWarning: Error: Missing update
    at new KucoinOrderBook (node_modules/ccxws/src/orderbooks/KucoinOrderBook.js:85:15)
    at KucoinClient.<anonymous> (test.js:29:14)
(Use `node --trace-warnings ...` to show where the warning was created)
(node:121384) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). To terminate the node process on unhandled promise rejection, use the CLI flag `--unhandled-rejections=strict` (see https://nodejs.org/api/cli.html#cli_unhandled_rejections_mode). (rejection id: 1)
(node:121384) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
updates after 0
(node:121384) UnhandledPromiseRejectionWarning: Error: Must queue updates prior to snapshot
    at new KucoinOrderBook (node_modules/ccxws/src/orderbooks/KucoinOrderBook.js:59:13)
    at KucoinClient.<anonymous> (test.js:29:14)
(node:121384) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). To terminate the node process on unhandled promise rejection, use the CLI flag `--unhandled-rejections=strict` (see https://nodejs.org/api/cli.html#cli_unhandled_rejections_mode). (rejection id: 2)

bmancini55 commented 4 years ago

Thanks for submitting the issue. Those errors occur in the prototype l3 orderbook when it receives a snapshot but can't properly replay queued updates (either because there are no updates yet or the update stream started after the snapshot's sequence time period).

https://github.com/altangent/ccxws/blob/master/src/orderbooks/KucoinOrderBook.js#L59 and https://github.com/altangent/ccxws/blob/master/src/orderbooks/KucoinOrderBook.js#L85
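
To make the two failure modes concrete, here is a minimal sketch of the replay check, assuming the snapshot and every queued update carry a numeric `sequenceId`. The function name `replayQueuedUpdates` and the data shapes are hypothetical and this is not the actual `KucoinOrderBook` code; only the two error messages are taken from the output above.

```javascript
// Hypothetical sketch: decide which queued updates can be replayed on top of
// a REST snapshot, and fail in the same two ways the prototype does.
function replayQueuedUpdates(snapshot, queuedUpdates) {
  // Nothing was queued: the update stream had not started when the snapshot arrived.
  if (queuedUpdates.length === 0) {
    throw new Error("Must queue updates prior to snapshot");
  }
  // Updates at or below the snapshot's sequence are already reflected in it.
  const pending = queuedUpdates.filter(u => u.sequenceId > snapshot.sequenceId);
  // The remaining updates must continue the snapshot's sequence without a gap.
  let expected = snapshot.sequenceId + 1;
  for (const update of pending) {
    if (update.sequenceId !== expected) {
      throw new Error("Missing update");
    }
    expected += 1;
  }
  return pending;
}
```

With a snapshot at sequence 10, a queue that starts at 13 throws "Missing update" because updates 11 and 12 were never seen, which is what happens when the stream starts after the snapshot was taken.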

The issue is the result of a race condition in how the snapshot is obtained from REST. If the update stream starts before the REST response is obtained, everything is ok. But it is possible that the REST request for the snapshot is initiated prior to the update stream starting, which would result in the above errors.

https://github.com/altangent/ccxws/blob/master/src/exchanges/kucoin-client.js#L237

Coding wise, a hacky fix for this is adding a small delay to the sending of the REST request. The proper fix is sending the REST request only after we've successfully subscribed to the update stream.

In the interim, you can monkey patch a delay into _requestLevel3Snapshot. Something like:

const REST_DELAY_MS = 50
client._originalRequestLevel3Snapshot = client._requestLevel3Snapshot;
client._requestLevel3Snapshot = market => setTimeout(() => client._originalRequestLevel3Snapshot(market), REST_DELAY_MS);

Unfortunately there is no option to disable the REST request in Kucoin like there is in other clients. If there were, you could disable the automatic REST request and initiate it yourself after you've seen the first l3update event for a market. The gotcha is that after a disconnection you will need to re-fetch the snapshot once the socket is connected again.
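
A rough sketch of that idea, for illustration only. `SnapshotGate` and the `requestSnapshot` callback are hypothetical names, and the sketch assumes the `l3update` handler receives the market as a second argument; a plain `EventEmitter` stands in for the client so the example runs on its own.

```javascript
const { EventEmitter } = require("events");

// Hypothetical helper, not part of ccxws: requests a snapshot only after the
// first update for a market has been seen, and again after every reconnect.
class SnapshotGate {
  constructor(client, requestSnapshot) {
    this.requested = new Set(); // market ids whose snapshot was already requested
    this.requestSnapshot = requestSnapshot; // assumed user-supplied REST call
    client.on("l3update", (update, market) => {
      if (this.requested.has(market.id)) return;
      this.requested.add(market.id);
      this.requestSnapshot(market);
    });
    // A new connection invalidates earlier snapshots, so start over.
    client.on("connected", () => this.requested.clear());
  }
}

// Stand-in for a real client, used only to show the order of events.
const client = new EventEmitter();
const calls = [];
new SnapshotGate(client, market => calls.push(market.id));
client.emit("connected");
client.emit("l3update", { sequenceId: 1 }, { id: "BTC-USDT" }); // triggers a request
client.emit("l3update", { sequenceId: 2 }, { id: "BTC-USDT" }); // ignored
client.emit("connected"); // reconnect
client.emit("l3update", { sequenceId: 3 }, { id: "BTC-USDT" }); // triggers a new request
```

The caller would still need to queue updates until the snapshot response arrives, as the reproduction script does.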

WoutervDijk commented 4 years ago

Thank you for your fast response @bmancini55. But I am not sure this is the bug I had in mind (I think it is a different one). I outlined the problem incorrectly in my post, so let me explain it differently. In the current code (note that I edited it in the original post), when we receive the snapshot I wait 5 seconds before processing the orderbook, so I don't think the race condition applies. After those 5 seconds I check whether the updates length is 0 (meaning we didn't receive any updates) and wait another 5 seconds to see if updates start streaming. But even after that, no updates have arrived: the output shows updates after 0. So I am afraid that sometimes the websocket doesn't connect to Kucoin's API??

bmancini55 commented 4 years ago

Ah ok, I should have read the description more carefully. So you're not getting updates at all in certain circumstances? If so you can add a log message here to see whether the subscription is being ack'd or has an error: https://github.com/altangent/ccxws/blob/master/src/exchanges/kucoin-client.js#L282

The code should probably emit an error event if it receives an error message.
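
Something along these lines, as a sketch rather than the actual client code. The function name `processControlMessage` is hypothetical, and the `ack` and `error` type values and the `data` field are assumptions about the shape of Kucoin's socket messages.

```javascript
const { EventEmitter } = require("events");

// Hypothetical handler: surfaces subscription acks and errors as events
// instead of dropping them silently.
function processControlMessage(emitter, raw) {
  const msg = JSON.parse(raw);
  if (msg.type === "ack") {
    emitter.emit("ack", msg.id);
    return true;
  }
  if (msg.type === "error") {
    emitter.emit("error", new Error(`subscription failed: ${msg.data}`));
    return true;
  }
  return false; // not a control message, leave it to the regular handlers
}

const emitter = new EventEmitter();
const seen = [];
emitter.on("ack", id => seen.push(`ack ${id}`));
emitter.on("error", err => seen.push(err.message));
processControlMessage(emitter, JSON.stringify({ type: "ack", id: "1" }));
processControlMessage(emitter, JSON.stringify({ type: "error", id: "2", data: "topic is invalid" }));
```

Note that an `EventEmitter` throws if an `error` event is emitted with no listener attached, so callers would need to register one.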

WoutervDijk commented 4 years ago

Yes, that is exactly the case. No updates at all in some cases. I have added the log, and whenever I receive no updates I also receive no ack. No error is emitted either (at that location).

bmancini55 commented 4 years ago

Interesting, do you get a connected event?

WoutervDijk commented 4 years ago

On line 46 I added the following: this._wss.on("connected", () => console.log("connected"));. I can confirm that it does emit a connected event, but no ack is received.

WoutervDijk commented 4 years ago

Any update @bmancini55 ?

bmancini55 commented 4 years ago

Hi @WoutervDijk, I haven't had a chance to dig into it yet.

You want to log anything that comes down the socket at the beginning of the _processMessage method. There might be a message that is being missed.

bmancini55 commented 4 years ago

@WoutervDijk I wasn't able to reproduce locally but I made a few changes to the client to improve reliability of message sends. Let me know if you're still seeing the issue after updating: https://github.com/altangent/ccxws/releases/tag/v0.37.7

WoutervDijk commented 4 years ago

@bmancini55 thank you very much for your response. Unfortunately it did not solve the problem. I will have a look at it this weekend, log everything in _processMessage and see if I can find something. Which OS and Node version were you using when you tested it?

bmancini55 commented 4 years ago

Bummer. I'm running node v12.10 on macOS and Pop_OS. Couldn't get it to reproduce on either.

WoutervDijk commented 4 years ago

This might be a fault in my OS (Ubuntu) or Nodejs. I have downgraded to Node 12 (latest) and it is working for now. I'll close the issue.

WoutervDijk commented 4 years ago

@bmancini55 I think I found the error. If you enable tcp_fastopen (on your OS), then for some reason Kucoin won't always open a connection! Is there any way we can disable that with a particular call in the websocket library (when we open the connection)? Then people can keep tcp_fastopen enabled on their system without hitting this bug. I know it is possible in C++. Thanks

EDIT: My hypothesis seems incorrect. I will look at it tomorrow

WoutervDijk commented 4 years ago

In line 333 I added a logger for any other message. What is interesting is that even if I receive NO updates afterwards, I still receive a welcome message with an id. I also receive this when I DO get updates.

EDIT it might be best to check whether we receive an ACK within e.g. 10 secs and if not retry connecting?
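
A minimal sketch of such a check, assuming the caller knows when a subscription is sent and when its ack arrives. `AckTimer` is a hypothetical name, not something in ccxws, and the timer functions are injectable only to make the class easy to test.

```javascript
// Hypothetical helper: calls `onTimeout` (for example a reconnect) unless
// `ack()` is called within `timeoutMs` of `start()`.
class AckTimer {
  constructor(onTimeout, timeoutMs = 10000, timers) {
    this.onTimeout = onTimeout;
    this.timeoutMs = timeoutMs;
    this.timers = timers || {
      set: (fn, ms) => setTimeout(fn, ms),
      clear: handle => clearTimeout(handle),
    };
    this.handle = undefined;
  }

  // Call right after sending a subscription request.
  start() {
    this.stop();
    this.handle = this.timers.set(() => {
      this.handle = undefined;
      this.onTimeout();
    }, this.timeoutMs);
  }

  // Call when the ack for the subscription arrives.
  ack() {
    this.stop();
  }

  // Cancel a pending timeout, if any.
  stop() {
    if (this.handle !== undefined) {
      this.timers.clear(this.handle);
      this.handle = undefined;
    }
  }
}
```

A caller would create one with something like `new AckTimer(() => reconnect(), 10000)`, where `reconnect` is whatever the client uses to reopen the socket, then call `start()` after subscribing and `ack()` in the ack handler.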

WoutervDijk commented 4 years ago

I think this is the best solution. I have tried all kinds of tuning on two different OSes (Clear Linux and Ubuntu), but for some reason the Kucoin server won't send an ACK. It does connect though, as shown by the welcome message. It's so weird, might it be a bug on Kucoin's server side?

WoutervDijk commented 4 years ago

I am just thinking out loud now, but the Kucoin documentation says that the ID given must be unique: https://docs.kucoin.com/#subscribe. The code currently uses the current time in milliseconds, so if your server is REALLY fast and sends two requests within the same millisecond, the ID wouldn't be unique anymore, right? And if someone else uses the same ID, would that also result in undefined behaviour on Kucoin's servers?
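
For illustration, one way to avoid same-millisecond collisions is to append a per-process counter and a few random bytes to the timestamp. This is only a sketch: `nextRequestId` is a hypothetical helper, and it assumes the id field accepts an arbitrary string, which this thread does not confirm.

```javascript
const crypto = require("crypto");

// Hypothetical id generator: millisecond timestamp, per-process counter and a
// short random suffix. The counter separates requests sent within the same
// millisecond; the random part makes a clash with another process unlikely.
let requestCounter = 0;
function nextRequestId() {
  requestCounter = (requestCounter + 1) % Number.MAX_SAFE_INTEGER;
  const random = crypto.randomBytes(4).toString("hex");
  return `${Date.now()}-${requestCounter}-${random}`;
}
```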

WoutervDijk commented 4 years ago

@bmancini55 I think you want to rewrite watcher because it seems to NOT consider the fact that you might have never received a message in the first place. In which case it should restart the connection

WoutervDijk commented 4 years ago

@bmancini55 I have found it!!! Because I am so close to the Kucoin servers, they can't handle the subscription request and drop it. I think this happens because we ask for a subscription immediately after the socket connects, when the socket might not be fully ready. So I added a small delay to the _onConnected method (line 223 of basic-client.js). This is the code that works for me:

  _onConnected() {
    this.emit("connected");
    // delay the subscription requests so the socket is ready to accept them
    setTimeout(() => {
      for (let [marketSymbol, market] of this._tickerSubs) {
        this._sendSubTicker(marketSymbol, market);
      }
      for (let [marketSymbol, market] of this._candleSubs) {
        this._sendSubCandles(marketSymbol, market);
      }
      for (let [marketSymbol, market] of this._tradeSubs) {
        this._sendSubTrades(marketSymbol, market);
      }
      for (let [marketSymbol, market] of this._level2SnapshotSubs) {
        this._sendSubLevel2Snapshots(marketSymbol, market);
      }
      for (let [marketSymbol, market] of this._level2UpdateSubs) {
        this._sendSubLevel2Updates(marketSymbol, market);
      }
      for (let [marketSymbol, market] of this._level3UpdateSubs) {
        this._sendSubLevel3Updates(marketSymbol, market);
      }
      this._watcher.start();
    }, 1000);
  }

bmancini55 commented 3 years ago

Circling back on this. Awesome find, thanks for tracking down the issue!!

I found this nugget in the doco. I should be more mindful of reading the fine print :rofl:

Only when the welcome message is received will the connection be available

Architecturally, I'll add a new connection-related event, ready, which will fire after connected once any required setup messages have been exchanged. For exchanges that don't require this functionality, the ready event will fire immediately after the connected event. This will be useful in a broader context for sockets that need to authenticate or perform setup upon connection.
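
As a rough sketch of how that could behave (not the eventual implementation): `ReadyGate` and its method names are hypothetical, and the welcome message is assumed to look like `{ id, type: "welcome" }`.

```javascript
const { EventEmitter } = require("events");

// Hypothetical sketch of the proposed `ready` event.
class ReadyGate extends EventEmitter {
  constructor(requiresWelcome) {
    super();
    this.requiresWelcome = requiresWelcome;
  }

  // Called when the underlying socket opens.
  onConnected() {
    this.emit("connected");
    // Exchanges without a setup step are ready immediately.
    if (!this.requiresWelcome) this.emit("ready");
  }

  // Called for every raw message received from the socket.
  onMessage(raw) {
    const msg = JSON.parse(raw);
    if (this.requiresWelcome && msg.type === "welcome") this.emit("ready");
  }
}

const events = [];
const kucoinLike = new ReadyGate(true);
kucoinLike.on("connected", () => events.push("connected"));
kucoinLike.on("ready", () => events.push("ready: send subscriptions"));
kucoinLike.onConnected();
kucoinLike.onMessage(JSON.stringify({ id: "abc", type: "welcome" }));
```

Subscription requests would then be sent from the `ready` handler instead of from `connected`, which removes the need for a fixed delay.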

Until I can get a fix released, you can monkey patch a delay into your client with something like:

const client = new ccxws.Kucoin();
// keep a bound reference to the original handler, then call it after a delay
client._originalOnConnected = client._onConnected.bind(client);
client._onConnected = () => setTimeout(client._originalOnConnected, 500);

WoutervDijk commented 3 years ago

Awesome find! Sounds like a good fix. Like they always say RTFM 😄