crypto-crawler / crypto-crawler-rs

A rock-solid cryptocurrency crawler library.
Apache License 2.0

After 3 days websocket disconnected (Failed to read, error: IO error: Connection reset by peer (os error 104)) and did not automatically reconnect after #59

Open panicfarm opened 1 year ago

panicfarm commented 1 year ago

I left the basic README websocket example running for a few days, and then a Connection reset by peer error (ECONNRESET, os error 104) occurred and the websocket disconnected. After that, pings kept failing, but there seems to be no auto-reconnect mechanism. I reproduced this several times. I think it should reconnect on ERROR crypto_ws_client::common::connect_async] Failed to read, error: IO error: Connection reset by peer (os error 104) and also on failed ping sends. The log around the moment of the disconnect (2023-01-13T12:44:15Z) follows:

[2023-01-13T12:44:09Z INFO  qk] {"exchange":"kucoin","market_type":"spot","msg_type":"trade","received_at":1673613849587,"json":"{\"type\":\"message\",\"topic\":\"/market/match:SOL-USDT\",\"subject\":\"trade.l3match\",\"data\":{\"makerOrderId\":\"63c152190f2fb20001bd63ba\",\"price\":\"16.569\",\"sequence\":\"1184114405824513\",\"side\":\"buy\",\"size\":\"5.4773\",\"symbol\":\"SOL-USDT\",\"takerOrderId\":\"63c1521917f8760001e12e2b\",\"time\":\"1673613849515000000\",\"tradeId\":\"1184114405824513\",\"type\":\"match\"}}"}
[2023-01-13T12:44:10Z INFO  qk] {"exchange":"kucoin","market_type":"spot","msg_type":"trade","received_at":1673613850830,"json":"{\"type\":\"message\",\"topic\":\"/market/match:SOL-USDT\",\"subject\":\"trade.l3match\",\"data\":{\"makerOrderId\":\"63c15219d686c500014277ae\",\"price\":\"16.57\",\"sequence\":\"1184114486564865\",\"side\":\"buy\",\"size\":\"6.5612\",\"symbol\":\"SOL-USDT\",\"takerOrderId\":\"63c1521af109f00001e76310\",\"time\":\"1673613850760000000\",\"tradeId\":\"1184114486564865\",\"type\":\"match\"}}"}
[2023-01-13T12:44:12Z DEBUG crypto_ws_client::common::ws_client_internal] Instant { tv_sec: 80631244, tv_nsec: 862016328 } sending ping {"type":"ping", "id": "crypto-ws-client"}
[2023-01-13T12:44:12Z DEBUG crypto_ws_client::common::ws_client_internal] Received {"id":"crypto-ws-client","type":"pong"} from kucoin, reset num_unanswered_ping to 0
[2023-01-13T12:44:15Z INFO  qk] {"exchange":"kucoin","market_type":"spot","msg_type":"trade","received_at":1673613855208,"json":"{\"type\":\"message\",\"topic\":\"/market/match:SOL-USDT\",\"subject\":\"trade.l3match\",\"data\":{\"makerOrderId\":\"63c1521efe6ad80001d821ae\",\"price\":\"16.569\",\"sequence\":\"1184114833381377\",\"side\":\"buy\",\"size\":\"2.1725\",\"symbol\":\"SOL-USDT\",\"takerOrderId\":\"63c1521fbdf44300018a2cdc\",\"time\":\"1673613855040000000\",\"tradeId\":\"1184114833381377\",\"type\":\"match\"}}"}
[2023-01-13T12:44:15Z INFO  qk] {"exchange":"kucoin","market_type":"spot","msg_type":"trade","received_at":1673613855209,"json":"{\"type\":\"message\",\"topic\":\"/market/match:SOL-USDT\",\"subject\":\"trade.l3match\",\"data\":{\"makerOrderId\":\"63c1521efe6ad80001d821ae\",\"price\":\"16.569\",\"sequence\":\"1184114833643521\",\"side\":\"buy\",\"size\":\"0.6579\",\"symbol\":\"SOL-USDT\",\"takerOrderId\":\"63c1521f6a048f000144ef4c\",\"time\":\"1673613855040000000\",\"tradeId\":\"1184114833643521\",\"type\":\"match\"}}"}
[2023-01-13T12:44:15Z ERROR crypto_ws_client::common::connect_async] Failed to read, error: IO error: Connection reset by peer (os error 104)
[2023-01-13T12:44:15Z DEBUG tokio_tungstenite] websocket start_send error: IO error: Broken pipe (os error 32)
[2023-01-13T12:44:15Z DEBUG crypto_ws_client::common::ws_client_internal] Instant { tv_sec: 80631247, tv_nsec: 621612513 } sending ping {"type":"ping", "id": "crypto-ws-client"}
[2023-01-13T12:44:15Z ERROR crypto_ws_client::common::ws_client_internal] Error sending ping channel closed
[2023-01-13T12:44:43Z DEBUG crypto_ws_client::common::ws_client_internal] Instant { tv_sec: 80631275, tv_nsec: 862016328 } sending ping {"type":"ping", "id": "crypto-ws-client"}
[2023-01-13T12:44:43Z ERROR crypto_ws_client::common::ws_client_internal] Error sending ping channel closed
[2023-01-13T12:44:46Z DEBUG crypto_ws_client::common::ws_client_internal] Instant { tv_sec: 80631278, tv_nsec: 621612513 } sending ping {"type":"ping", "id": "crypto-ws-client"}
[2023-01-13T12:44:46Z ERROR crypto_ws_client::common::ws_client_internal] Error sending ping channel closed
[2023-01-13T12:45:14Z DEBUG crypto_ws_client::common::ws_client_internal] Instant { tv_sec: 80631306, tv_nsec: 862016328 } sending ping {"type":"ping", "id": "crypto-ws-client"}
[2023-01-13T12:45:14Z ERROR crypto_ws_client::common::ws_client_internal] Error sending ping channel closed
[2023-01-13T12:45:17Z DEBUG crypto_ws_client::common::ws_client_internal] Instant { tv_sec: 80631309, tv_nsec: 621612513 } sending ping {"type":"ping", "id": "crypto-ws-client"}
[2023-01-13T12:45:17Z ERROR crypto_ws_client::common::ws_client_internal] Error sending ping channel closed
[2023-01-13T12:45:45Z DEBUG crypto_ws_client::common::ws_client_internal] Instant { tv_sec: 80631337, tv_nsec: 862016328 } sending ping {"type":"ping", "id": "crypto-ws-client"}
[2023-01-13T12:45:45Z ERROR crypto_ws_client::common::ws_client_internal] Error sending ping channel closed
[2023-01-13T12:45:48Z DEBUG crypto_ws_client::common::ws_client_internal] Instant { tv_sec: 80631340, tv_nsec: 621612513 } sending ping {"type":"ping", "id": "crypto-ws-client"}
[2023-01-13T12:45:48Z ERROR crypto_ws_client::common::ws_client_internal] Error sending ping channel closed
soulmachine commented 1 year ago

Indeed, this library can auto ping, but it lacks auto-reconnect. When the connection is broken, for now I suggest exiting the whole process and starting it again, for example at https://github.com/crypto-crawler/crypto-crawler-rs/blob/main/crypto-ws-client/src/common/ws_client_internal.rs#L197.

I'll think about how to implement re-connection.

panicfarm commented 1 year ago

I think it's imperative for any solid crypto data recording or trading code to run continuously and reliably. Therefore a fast auto-reconnect solution (with the smallest possible gap in the recorded data) is necessary.

soulmachine commented 1 year ago

Agreed, in theory it is possible to support auto-reconnect as long as we remember all subscribed symbols.
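To make "remember all subscribed symbols" concrete, here is a minimal sketch of a registry that records subscribe commands so they can be replayed on a new connection. The SubscriptionRegistry type and its methods are hypothetical names for illustration, not part of crypto-ws-client.

```rust
use std::collections::BTreeSet;

/// Hypothetical helper: remembers every subscribe command that was sent,
/// so the same commands can be replayed on a fresh connection.
#[derive(Default)]
pub struct SubscriptionRegistry {
    commands: BTreeSet<String>,
}

impl SubscriptionRegistry {
    /// Record a subscribe command (duplicates are ignored).
    pub fn subscribe(&mut self, command: &str) {
        self.commands.insert(command.to_string());
    }

    /// Forget a command once the caller unsubscribes.
    pub fn unsubscribe(&mut self, command: &str) {
        self.commands.remove(command);
    }

    /// Commands to resend after a reconnect, in a stable (sorted) order.
    pub fn replay(&self) -> Vec<String> {
        self.commands.iter().cloned().collect()
    }
}
```

After a reconnect, the client would iterate over replay() and send each command again on the new connection. Where such a registry should live in the library is still an open question in this thread.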

panicfarm commented 1 year ago

If you could outline how the reconnect should occur, as well as where you think the best place is to store the subscribed symbols, I could probably submit a PR in a bit.

The original read error occurred at https://github.com/crypto-crawler/crypto-crawler-rs/blob/fb7b102190b7bb4eebafffb8ecddd184672d051f/crypto-ws-client/src/common/connect_async.rs#L104. A subsequent, independent error then occurred (because the socket was already disconnected) in tokio-tungstenite at https://github.com/snapview/tokio-tungstenite/blob/87d2f7eb09a538c0a0ee77bd92e032e362118f72/src/lib.rs#L337, which is part of its implementation of the futures_util::sink::Sink trait. The docs of start_send() say "In most cases, if the sink encounters an error, the sink will permanently be unable to receive items.", and thus we got repeated errors on every subsequent ping send. This tells me that after the error at https://github.com/crypto-crawler/crypto-crawler-rs/blob/fb7b102190b7bb4eebafffb8ecddd184672d051f/crypto-ws-client/src/common/connect_async.rs#L104 it broke out of the loop at https://github.com/crypto-crawler/crypto-crawler-rs/blob/fb7b102190b7bb4eebafffb8ecddd184672d051f/crypto-ws-client/src/common/connect_async.rs#L76 and thus exited connect_async() (https://github.com/crypto-crawler/crypto-crawler-rs/blob/fb7b102190b7bb4eebafffb8ecddd184672d051f/crypto-ws-client/src/common/connect_async.rs#L51) without an error, because it seems the error was never propagated there.
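The repeated "Error sending ping channel closed" lines are what you would expect once the receiving side of a channel is gone: every later send fails in the same way. The sketch below uses std::sync::mpsc as a stand-in (an assumption for illustration; the library itself runs on tokio), and failed_pings is a hypothetical helper.

```rust
use std::sync::mpsc;

/// Try to send `attempts` pings into a channel and count how many fail.
/// When `receiver_alive` is false the receiving end is dropped first,
/// which mimics the writer side going away after the disconnect.
pub fn failed_pings(attempts: usize, receiver_alive: bool) -> usize {
    let (tx, rx) = mpsc::channel::<String>();
    // Keep the receiver alive for the whole function only if requested.
    let _keep = if receiver_alive {
        Some(rx)
    } else {
        drop(rx);
        None
    };
    (0..attempts)
        .filter(|_| tx.send("ping".to_string()).is_err())
        .count()
}
```

Once the receiver is dropped, no later send can succeed, so retrying pings on the old channel cannot recover the connection; a new connection is needed.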

Perhaps we could catch the error on exit from connect_async_internal() (it appears that it actually returns an Ok() variant after breaking out of the receive loop), or at https://github.com/crypto-crawler/crypto-crawler-rs/blob/fb7b102190b7bb4eebafffb8ecddd184672d051f/crypto-ws-client/src/common/connect_async.rs#L53 where the result is assigned to _, and initiate a reconnect there?
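A minimal sketch of the idea, assuming the receive loop can be modelled as an iterator of read results: instead of breaking out and returning Ok(()), the loop propagates the read error so the caller can react to it. The receive_loop function is hypothetical and only mirrors the shape of the problem, not the actual code in connect_async.rs.

```rust
use std::io;

/// Hypothetical receive loop: forwards each frame to `on_msg` and
/// returns the first read error instead of silently breaking out.
pub fn receive_loop<I, F>(frames: I, mut on_msg: F) -> io::Result<()>
where
    I: IntoIterator<Item = io::Result<String>>,
    F: FnMut(String),
{
    for frame in frames {
        // `?` hands the read error (e.g. ECONNRESET) to the caller.
        on_msg(frame?);
    }
    // The stream ended without a read error.
    Ok(())
}
```

With this shape, the caller can no longer discard the outcome by assigning it to _; it has to decide what to do with an Err, which is the natural place to start a reconnect.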

PS. You can simulate KuCoin closing the websocket connection (sending RST, which shows up as ECONNRESET 104) by running tcpkill host ws-api-spot.kucoin.com and port 443 while the process is running.

foonsun commented 1 year ago

Yes, an auto-reconnect solution is better than a pm2 restart.

panicfarm commented 1 year ago

@soulmachine should I work on a PR or will you add this functionality?

soulmachine commented 1 year ago

@panicfarm I would appreciate it if you could create a PR. I think you need to modify lines 197 and 221 in ws_client_internal.rs.

Currently these two lines just crash, for simplicity. You'll need to modify them to support auto-reconnect.

panicfarm commented 1 year ago

@soulmachine When it disconnects due to ECONNRESET 104 (presumably when an exchange restarts its server), the error occurs on line 104 in connect_async.rs and it returns from connect_async here without an error and without a message to be handled in the while loop here. Therefore it never gets to either line 197 or line 221 in ws_client_internal.rs. I think it should return an Error from connect_async here and handle the reconnect on matching that Error variant. Do you agree?

somefact commented 1 year ago

But without modifying lines 197 and 221 in ws_client_internal.rs it will not reconnect when the websocket is Closed (a very common case, especially on Binance perpetuals) or when it receives some kind of Reconnect message.

panicfarm commented 1 year ago

@somefact Yes, there should be the same reconnection logic in all these places, although I have only encountered three disconnects on line 104 in connect_async.rs after running it for about 10 days in total with KuCoin.

soulmachine commented 1 year ago

@panicfarm You've made a very good point. I think you can start implementing the re-connection logic inside connect_async_internal().

Feel free to send a PR.

panicfarm commented 1 year ago

@soulmachine I have looked at the code in detail and would appreciate any feedback before I make changes.

When the exchange reboots its server, the disconnect happens here. However, by this time you have already returned from connect_async_internal, because the handle returned by tokio::task::spawn is dropped; the program has therefore returned from ws_client.send(&commands).await and you are now in https://github.com/crypto-crawler/crypto-crawler-rs/blob/fb7b102190b7bb4eebafffb8ecddd184672d051f/crypto-crawler/src/crawlers/kucoin.rs#L24.

  1. I could hold on to the send task handle

    let send_task_handle = tokio::task::spawn(async move {
        loop {
            tokio::select! {
                command = command_rx.recv() => {
                    match command {
                        Some(command) => {
                            match command {
    ...

    and pass it into run(). Then, when the error occurs, run() would return an error, and the entire send(); run(); block in each exchange would (unfortunately) have to be wrapped in a reconnect loop, something like

    loop {
        // tx is cloned because each iteration builds a new client
        let ws_client = KuCoinSpotWSClient::new(tx.clone(), None).await;
        // proposed change: send() returns the send task handle, or an error
        match ws_client.send(&commands).await {
            Err(_) => continue, // reconnect
            Ok(send_task_handle) => {
                // proposed change: run() takes the handle and returns a Result
                if ws_client.run(send_task_handle).await.is_err() {
                    continue; // reconnect
                }
                ws_client.close().await;
                break;
            }
        }
    }

  2. Alternatively, I could somehow cram the reconnect logic into https://github.com/crypto-crawler/crypto-crawler-rs/blob/fb7b102190b7bb4eebafffb8ecddd184672d051f/crypto-ws-client/src/common/connect_async.rs#L59, but I am not sure that's a good idea: by that point you are already in the run() function, while the task spawned in send() is also running separately.
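Option 1 hinges on keeping the task handle instead of dropping it. The sketch below shows that pattern with std::thread rather than tokio, purely so that it runs without external crates (an assumption for illustration; with tokio the handle would be awaited instead of joined). spawn_worker and needs_reconnect are hypothetical names.

```rust
use std::thread::{self, JoinHandle};

/// Spawn a worker standing in for the send/receive task and hand the
/// JoinHandle back to the caller instead of dropping it.
pub fn spawn_worker(fail: bool) -> JoinHandle<Result<(), String>> {
    thread::spawn(move || {
        if fail {
            Err("connection reset by peer".to_string())
        } else {
            Ok(())
        }
    })
}

/// Wait for the worker and report whether a reconnect is needed.
pub fn needs_reconnect(handle: JoinHandle<Result<(), String>>) -> bool {
    match handle.join() {
        Ok(Ok(())) => false, // worker finished cleanly
        Ok(Err(_)) => true,  // worker reported a connection error
        Err(_) => true,      // worker panicked
    }
}
```

If the handle is dropped, the worker keeps running detached and its result is lost, which matches the behaviour described above where the disconnect goes unnoticed by the caller.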

soulmachine commented 1 year ago

@panicfarm The first way looks ugly because it makes run() return something unusual, which looks scary to users. run() should always return void.

The second way looks better, handling the re-connect in connect_async_internal() so that it is invisible to users.

panicfarm commented 1 year ago

@soulmachine I don't think I can completely confine it inside connect_async_internal(ws_stream: WebSocketStream, ...), because after the disconnect the ws_stream that was passed into the function is no longer connected. At a minimum I would have to do it in connect_async().

But what about the other cases of disconnect mentioned in this issue? They happen inside run(). It would be nice to unify them all.

soulmachine commented 1 year ago

@panicfarm Yes, let's encapsulate the re-connection logic inside connect_async() instead of connect_async_internal(). For line 197, we can check msg inside connect_async_internal(): if msg is a Message::Close, then we should re-connect. As for line 221, let's ignore it for now.
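A rough sketch of what that encapsulation could look like, assuming the connection can be modelled as a closure that yields a fresh stream of messages. Msg is a stand-in for tungstenite's Message, and run_with_reconnect is a hypothetical name, not an existing function in crypto-ws-client.

```rust
use std::io;

/// Stand-in for a websocket message (the real type would be tungstenite's `Message`).
pub enum Msg {
    Text(String),
    Close,
}

/// Hypothetical supervisor: `connect` opens a fresh stream of messages.
/// Whenever the stream yields Close, hits a read error, or simply ends,
/// connect again, up to `max_connects` attempts in total.
/// Returns all text payloads received across connections.
pub fn run_with_reconnect<C, S>(mut connect: C, max_connects: usize) -> Vec<String>
where
    C: FnMut() -> io::Result<S>,
    S: Iterator<Item = io::Result<Msg>>,
{
    let mut received = Vec::new();
    for _attempt in 0..max_connects {
        let stream = match connect() {
            Ok(s) => s,
            Err(_) => continue, // could not connect; try again
        };
        for msg in stream {
            match msg {
                Ok(Msg::Text(t)) => received.push(t),
                // Close frame or read error: leave this stream and reconnect.
                Ok(Msg::Close) | Err(_) => break,
            }
        }
    }
    received
}
```

A real implementation would also resend the stored subscriptions after each successful connect and wait between attempts; both are left out here to keep the control flow visible.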

hittori-hanzo commented 1 year ago

Hey fellas, did this fix ever get done?