bmoscon / cryptofeed

Cryptocurrency Exchange Websocket Data Feed Handler

Orderbook updates missing between snapshot and websocket stream for KuCoin #1016

Open Telofy opened 7 months ago

Telofy commented 7 months ago

Describe the bug Sometimes there's a gap of missing orderbook updates between the snapshot that Cryptofeed pulls from KuCoin and the stream of incremental updates that it applies to it. This will usually go unnoticed (wrong but plausible orderbooks), but in some cases it triggers the cross_check (if cross_check=True).
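
In terms of KuCoin's sequence numbers: each incremental L2 message carries a `sequenceStart`/`sequenceEnd` range, and the REST snapshot carries a single sequence number. The gap condition can be sketched as a small check (illustrative only; the `has_gap` helper is hypothetical, the field names follow KuCoin's L2 payloads, and updates are assumed to arrive in order):

```python
from collections.abc import Iterable
from typing import Any


def has_gap(snapshot_seq: int, updates: Iterable[dict[str, Any]]) -> bool:
    """Return True if the buffered updates cannot bring the snapshot up to date.

    The earliest update that still applies on top of the snapshot must start
    at or before snapshot_seq + 1; if it starts later, some updates between
    the snapshot and the stream were never received.
    """
    relevant = [u for u in updates if u["sequenceEnd"] > snapshot_seq]
    if not relevant:
        return False  # nothing newer than the snapshot yet
    return relevant[0]["sequenceStart"] > snapshot_seq + 1
```

When this returns True, applying the buffered stream yields the "wrong but plausible" books described above.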

To Reproduce Steps to reproduce the behavior:

  1. Set cross_check=True on the feed.
  2. Subscribe to some orderbook feeds.
  3. If there's no error within the first 10 s or so, Ctrl+C and start over.

For me it usually triggered within three or four tries when subscribed to about five feeds.

Expected behavior No missing updates.

Here's my workaround:

from __future__ import annotations

import logging
from collections import defaultdict, deque
from typing import Any, DefaultDict, Deque

from cryptofeed.defines import L2_BOOK
from cryptofeed.exchanges.kucoin import KuCoin as BaseKuCoin
from cryptofeed.types import OrderBook

# Some imports omitted
logger = logging.getLogger(__name__)

class KuCoin(BaseKuCoin):
    recent_updates: DefaultDict[str, Deque[dict[str, Any]]]
    seq_no: DefaultDict[str, int]

    def __init__(self, *args: Any, **kwargs: Any):
        super().__init__(*args, **kwargs)
        # maxlen=100 was sometimes not enough
        self.recent_updates = defaultdict(lambda: deque(maxlen=1000))

    async def _snapshot(self, symbol: str):
        await super()._snapshot(symbol)
        assert self.recent_updates[symbol][0]["data"]["sequenceStart"] <= self.seq_no[symbol], (
            f"Sequence number {self.seq_no[symbol]} earlier than range "
            f"{self.recent_updates[symbol][0]['data']['sequenceStart']} to "
            f"{self.recent_updates[symbol][-1]['data']['sequenceStart']}"
        )
        relevant_updates = [
            update
            for update in self.recent_updates[symbol]
            if update["data"]["sequenceStart"] > self.seq_no[symbol]
        ]
        logger.debug(f"Reapplying {len(relevant_updates)} updates for {symbol}")
        for update in relevant_updates:
            await super()._process_l2_book(update, symbol, 0.0)

    async def _process_l2_book(self, msg: dict[str, Any], symbol: str, timestamp: float):
        assert self.recent_updates is not None
        self.recent_updates[symbol].append(msg)
        if self.recent_updates[symbol].maxlen == len(self.recent_updates[symbol]):
            # Wait until the queue is full before processing the first update because the first
            # processing will trigger the snapshot fetching, at which point we want to have a solid
            # backlog of updates to reapply.
            await super()._process_l2_book(msg, symbol, timestamp)

This implementation waits for 1000 updates to come in to maximize the chances of having all the necessary updates to bring the snapshot up to date. An alternative would be to always collect updates (any number) for 30 s or so. That would bound the startup time for illiquid markets.
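
The time-based alternative could look roughly like this (a sketch, not the author's code; the 30 s window and the use of the message's own `data.time` field, in milliseconds, are assumptions):

```python
from collections import defaultdict, deque
from typing import Any

BUFFER_SECONDS = 30.0  # assumed bound on the buffering window


class TimeBufferedBook:
    """Buffer incremental updates for a fixed time window before the first
    processing (which triggers the snapshot fetch) is allowed to begin."""

    def __init__(self) -> None:
        self.recent_updates: dict[str, deque[dict[str, Any]]] = defaultdict(deque)

    def ready(self, symbol: str, now: float) -> bool:
        updates = self.recent_updates[symbol]
        if not updates:
            return False
        # KuCoin timestamps are in milliseconds; wait until the oldest
        # buffered update is at least BUFFER_SECONDS old.
        oldest = updates[0]["data"]["time"] / 1000
        return now > oldest + BUFFER_SECONDS

    def on_update(self, symbol: str, msg: dict[str, Any], now: float) -> bool:
        """Append msg; return True once processing should begin."""
        self.recent_updates[symbol].append(msg)
        return self.ready(symbol, now)
```

Unlike the count-based version, this bounds startup latency the same way on liquid and illiquid markets.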

Log:

2024-03-06 21:04:34.832 | DEBUG    | Reapplying 477 updates for FET-USDT
2024-03-06 21:04:40.553 | DEBUG    | Reapplying 195 updates for ETH-USDT
2024-03-06 21:04:42.824 | DEBUG    | Reapplying 22 updates for INSP-USDT

Thank you!

bmoscon commented 6 months ago

@Telofy - I understand your code and what you're saying you did to fix it, but this behavior seems odd. Is it really the case that the orderbooks are so out of date when you initially subscribe that you need to wait this long for a valid book? I might take a look at the API again to make sure it hasn't been updated or otherwise changed since it was last reviewed.

Telofy commented 6 months ago

Thanks for looking into it! We've switched to waiting 10 s. That seems to generally be enough. On very low-volume markets it would take minutes to collect 1000 updates, which is quite unnecessary.

        # Presumably this replaces the deque-full check in _process_l2_book:
        if self.recent_updates[symbol] and (
            timestamp > self.recent_updates[symbol][0]["data"]["time"] / 1000 + 10
        ):

bmoscon commented 6 months ago

This seems to be more a bug with the exchange, or at least in their documentation, since I confirmed the code does what they specify: wait for a book message on the websocket, then request a snapshot, then apply the buffered messages on top of the snapshot. For it to sometimes be OK and sometimes not makes me want to drop support for either the exchange or this book type. I notice the exchange supports L2 snapshots only at 100 ms update intervals, depth 50. Better than nothing, but I assume people would balk at the reduced depth.
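
The documented procedure described here can be sketched as follows (a simplified illustration of the buffer-then-replay calibration, not cryptofeed's implementation; the message shapes and the bids-only book are illustrative):

```python
from typing import Any


def resync(snapshot: dict[str, Any], buffered: list[dict[str, Any]]) -> dict[str, Any]:
    """Apply buffered websocket updates on top of a REST snapshot.

    Per the documented procedure: buffer messages while the snapshot request
    is in flight, then discard any update entirely covered by the snapshot
    and replay the rest in order.
    """
    book = dict(snapshot["bids"])  # price -> size (bids only, for brevity)
    seq = snapshot["sequence"]
    for msg in buffered:
        if msg["sequenceEnd"] <= seq:
            continue  # already reflected in the snapshot
        for price, size in msg["bids"]:
            if size == 0:
                book.pop(price, None)  # size 0 deletes the level
            else:
                book[price] = size
        seq = msg["sequenceEnd"]
    return {"bids": book, "sequence": seq}
```

The bug in this issue corresponds to the case where the earliest buffered message starts strictly after the snapshot's sequence, which no amount of replaying can repair.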