ccxt / ccxt

A JavaScript / TypeScript / Python / C# / PHP cryptocurrency trading API with support for more than 100 bitcoin/altcoin exchanges
https://docs.ccxt.com
MIT License

`watch_orders()`, incremental data structures, and lost updates #10754

Open pietrodn opened 2 years ago

pietrodn commented 2 years ago

Hello, I'm using the latest version of CCXT Pro on Python 3.9 and I have a question.

Consider this sequence of events, for example on a single symbol on the Binance exchange. We have the newUpdates flag set in the exchange config.

  1. Userland calls binance.watch_orders(), the coroutine blocks
  2. An order update arrives and is processed by Exchange.handle_order(). The order is open.
  3. The future is resolved, binance.watch_orders() unblocks and returns the update in userland. All is good.
  4. An order update arrives and is processed by Exchange.handle_order(). The order is now closed and no further updates will be sent from the exchange.
  5. Userland calls watch_orders() again. It blocks and never resolves because no new event will be streamed, and therefore the future will never be resolved.

This gives us a problem: how do we get all the order updates that happened since the latest watch_orders() call? watch_orders(), with or without the newUpdates option, won't give us what we need, as we cannot guarantee that no event arrives between the end of one watch_orders() invocation and the beginning of the next one.
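The race described above can be reproduced with a toy model. This is only a sketch: ToyOrderStream is a hypothetical stand-in that resolves one future per call, it is not CCXT's actual implementation, and the status strings and timings are made up for illustration.

```python
import asyncio


class ToyOrderStream:
    """Hypothetical future-based notifier; NOT the CCXT implementation."""

    def __init__(self):
        self._future = None

    def handle_order(self, update):
        # The update is delivered only if somebody is waiting right now.
        if self._future is not None and not self._future.done():
            self._future.set_result(update)

    async def watch_orders(self):
        self._future = asyncio.get_running_loop().create_future()
        try:
            return await self._future
        finally:
            self._future = None


async def run(gap):
    stream = ToyOrderStream()
    received = []

    async def listener():
        while True:
            received.append(await stream.watch_orders())
            if gap:
                # Extra context switch between two watch_orders() calls.
                await asyncio.sleep(gap)

    task = asyncio.create_task(listener())
    await asyncio.sleep(0)  # let the listener start waiting
    for status in ["open", "partially filled", "closed"]:
        stream.handle_order(status)
        await asyncio.sleep(0.01)
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return received


lossy = asyncio.run(run(gap=0.2))   # listener pauses between calls
lossless = asyncio.run(run(gap=0))  # listener re-awaits immediately
print("with gap:   ", lossy)
print("without gap:", lossless)
```

In this toy model the listener that pauses between calls only ever sees the first update, because the later ones arrive while nobody is waiting on a future.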

Should we use the throttling mode? How so? Thank you! 🙏🏼

kroitor commented 2 years ago

An order update arrives and is processed by Exchange.handle_order(). The order is now closed and no further updates will be sent from the exchange.

First and foremost, you should be calling watch_orders() over and over sequentially by design. If the order was closed and you were watching at the time – you will get the update. And in an async context it cannot be skipped. Therefore you might want to make sure that your await exchange.watch_orders() call is the only async-context-switch in the related watching loop, while the updates are consumed and handled in parallel loops in userland.

Do you actually have this problem with missing "closed" order updates?

pietrodn commented 2 years ago

We observed missing order updates in production that are explained by this issue. We overrode handle_order() to log all the raw Binance WebSocket events and all of them were received, so this is clearly a library/client issue. I managed to reproduce this deterministically by feeding these raw events to handle_order() and putting some asyncio.sleep() calls to simulate delays in the watch_orders() calls.

Unfortunately our application is a complex asyncio program and we cannot simply ensure that watch_orders() is always scheduled as we would like it to be.

pietrodn commented 2 years ago

What I would like to achieve is the equivalent of a queue-based system, where handle_order() is the producer and watch_orders() is the consumer.
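A minimal sketch of that queue-based idea, assuming a hypothetical wrapper class (QueuedOrderStream and next_order() are illustrative names, not part of CCXT): the producer side is synchronous and buffers every update, so a slow or late consumer does not lose anything.

```python
import asyncio


class QueuedOrderStream:
    """Hypothetical wrapper that buffers every update until it is consumed."""

    def __init__(self):
        self._queue = asyncio.Queue()  # unbounded buffer

    def handle_order(self, update):
        # Producer side: synchronous, never waits, never drops.
        self._queue.put_nowait(update)

    async def next_order(self):
        # Consumer side: waits only while the buffer is empty.
        return await self._queue.get()


async def demo():
    stream = QueuedOrderStream()
    # All three updates arrive while nobody is listening.
    for status in ["open", "partially filled", "closed"]:
        stream.handle_order(status)
    received = []
    for _ in range(3):
        received.append(await stream.next_order())
        await asyncio.sleep(0.01)  # a slow consumer is harmless here
    return received


queued = asyncio.run(demo())
print(queued)
```

The trade-off is memory: an unbounded buffer grows if the consumer never catches up.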

kroitor commented 2 years ago

@pietrodn can you please paste a minimal snippet that would be sufficient to reproduce it? If not, no worries, I will make one to debug this issue and will post it here.

pietrodn commented 2 years ago

Here is a snippet that successfully reproduces the problem: https://gist.github.com/pietrodn/460181c5a0709f659654e3883359f970

I inserted some asyncio.sleep() calls to make the issue explicit and deterministic.

kroitor commented 2 years ago

The problem is exactly one line: the await asyncio.sleep() call inside the watch_orders() loop.

That's exactly what I mentioned here: https://github.com/ccxt/ccxt/issues/10754#issuecomment-983935855

Why would you want a synthetic sleep and a context-switch there? That ruins the purpose of continuous processing/consuming of the updates. You can use asyncio's primitives (Queues, etc.) to sync the watching loop with the processing loop. Without that line I get the following output:

2021-12-05T23:30:05.843Z connecting to wss://fstream.binance.com/ws/K9VrCLbpB5mhZLBdPYzpTQwZFxSvGeFEp62jwuRff3HblCvZuR32GfZGmu2lI5nj with timeout 10000 ms
[2021-12-06 02:30:06,421.421] INFO: 🔵 Feeding order update message (filled=0)...
[2021-12-06 02:30:06,422.422] INFO:     end watch_orders()
[2021-12-06 02:30:06,422.422] INFO: 🔴 Processed order update: filled 0.0 of 31.5, open
[2021-12-06 02:30:06,422.422] INFO:     start watch_orders()
[2021-12-06 02:30:06,626.626] INFO: 🔵 Feeding order update message (filled=0.2)...
[2021-12-06 02:30:06,627.627] INFO:     end watch_orders()
[2021-12-06 02:30:06,627.627] INFO: 🔴 Processed order update: filled 0.2 of 31.5, open
[2021-12-06 02:30:06,627.627] INFO:     start watch_orders()
[2021-12-06 02:30:06,828.828] INFO: 🔵 Feeding order update message (filled=0.5)...
[2021-12-06 02:30:06,830.830] INFO:     end watch_orders()
[2021-12-06 02:30:06,830.830] INFO: 🔴 Processed order update: filled 0.5 of 31.5, open
[2021-12-06 02:30:06,830.830] INFO:     start watch_orders()
2021-12-05T23:30:06.928Z connected
2021-12-05T23:30:06.929Z ping loop
[2021-12-06 02:30:07,031.031] INFO: 🔵 Feeding order update message (filled=3.4)...
[2021-12-06 02:30:07,032.032] INFO:     end watch_orders()
[2021-12-06 02:30:07,032.032] INFO: 🔴 Processed order update: filled 3.4 of 31.5, open
[2021-12-06 02:30:07,032.032] INFO:     start watch_orders()
2021-12-05T23:30:07.193Z pong WSMessage(type=<WSMsgType.PONG: 10>, data=bytearray(b''), extra='')
[2021-12-06 02:30:07,233.233] INFO: 🔵 Feeding order update message (filled=15.6)...
[2021-12-06 02:30:07,233.233] INFO:     end watch_orders()
[2021-12-06 02:30:07,233.233] INFO: 🔴 Processed order update: filled 15.6 of 31.5, open
[2021-12-06 02:30:07,234.234] INFO:     start watch_orders()
[2021-12-06 02:30:07,439.439] INFO: 🔵 Feeding order update message (filled=31.5)...
[2021-12-06 02:30:07,440.440] INFO:     end watch_orders()
[2021-12-06 02:30:07,440.440] INFO: 🔴 Processed order update: filled 31.5 of 31.5, closed
[2021-12-06 02:30:07,440.440] INFO:     start watch_orders()

pietrodn commented 2 years ago

Why would you want a synthetic sleep and a context-switch there?

Obviously I don't have any such explicit sleep in production code. The problem I have is that my program is a complex asyncio application running many coroutines in the same event loop, and asyncio may decide to run a different coroutine after the end of a watch_orders() call, for example a watch_ticker() called in a different coroutine running concurrently. This would have the same effect as the asyncio.sleep() statement.

In general, there's no way (AFAIK) to tell the event loop that it should run the coroutine being awaited first, as there may be other awaitables in the event queue.

kroitor commented 2 years ago

The problem I have is that my program is a complex asyncio application running many coroutines in the same event loop, and asyncio may decide to run a different coroutine after the end of a watch_orders(),

That's true, asyncio decides that.

for example a watch_ticker() called in a different coroutine, running concurrently. This would have the same effect of the asyncio.sleep() statement.

Nope, the effect is not the same; it is actually very different, because adding the await sleep call allows asyncio to do an async context switch in between the calls to await watch. That's exactly the point I'm trying to make.

If you continuously call await watch without intermediate awaits, then all events will be captured successfully (including closed orders).

You can process those events in parallel coroutines independently as you like and you can also synchronize your execution and data flows by means of asyncio as you like, just don't insert another await into the while True: await watch loop. Launch the other coroutines as parallel tasks or parallel futures on the same asyncio loop – that will work fine. Nothing will break as long as there's only synchronous code in between the watch-loop iterations.

If you insert an extra await there – you're kinda making it miss some of the updates on purpose (technically you are not watching for them as they are coming in within that short period of time). That extra await could've and should've resided elsewhere in another coroutine that runs on the same asyncio loop, if you really need it.
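The structure recommended above can be sketched as follows. StubExchange and feed() are hypothetical stand-ins so that the example runs without an exchange connection; they do not reflect the real CCXT API. The point is only the shape: the watch loop contains a single await followed by synchronous code, and the slow processing lives in a separate task.

```python
import asyncio


class StubExchange:
    """Hypothetical stand-in for an exchange client; NOT the CCXT API."""

    def __init__(self):
        self._waiter = None

    def feed(self, update):
        # Deliver an update to whoever is currently waiting.
        if self._waiter is not None and not self._waiter.done():
            self._waiter.set_result([update])

    async def watch_orders(self):
        self._waiter = asyncio.get_running_loop().create_future()
        return await self._waiter


async def watch_loop(exchange, queue):
    while True:
        orders = await exchange.watch_orders()  # the only await in this loop
        queue.put_nowait(orders)                # synchronous hand-off


async def worker(queue, processed):
    while True:
        orders = await queue.get()
        await asyncio.sleep(0.05)  # slow processing stays out of the watch loop
        processed.extend(orders)


async def main():
    exchange, queue, processed = StubExchange(), asyncio.Queue(), []
    tasks = [
        asyncio.create_task(watch_loop(exchange, queue)),
        asyncio.create_task(worker(queue, processed)),
    ]
    await asyncio.sleep(0)  # let both tasks start
    for status in ["open", "partially filled", "closed"]:
        exchange.feed(status)
        await asyncio.sleep(0.01)
    await asyncio.sleep(0.3)  # give the worker time to drain the queue
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return processed


processed = asyncio.run(main())
print(processed)
```

Even though the worker is five times slower than the feed, the watch loop is back in watch_orders() before the next update arrives, so nothing is dropped in this sketch.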

kroitor commented 2 years ago

@pietrodn i mean, with this code it runs multiple parallel coroutines on the same asyncio loop and there's no context-switches in between the watch_orders-calls:

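import asyncio
import time

import ccxtpro

# _LOGGER and BINANCE_MESSAGES are defined in the gist linked above.

async def main() -> None: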
    # credentials = get_binance_credentials()  # XXX: replace this!
    client = ccxtpro.binanceusdm({
        "apiKey": "apiKey",
        "secret": "secret",
    })

    market = "GTC/USDT"
    orders = []

    _LOGGER.info(f"Expecting {len(BINANCE_MESSAGES)} order updates.")

    async def orders_listener() -> None:
        nonlocal orders
        try:
            while True:
                _LOGGER.info("    start watch_orders()")
                new_orders = await client.watch_orders(market)
                _LOGGER.info("    end watch_orders()")
                for o in new_orders:
                    _LOGGER.info(
                        f"🔴 Processed order update: filled {o['filled']} of {o['amount']}, {o['status']}"
                    )
                orders += new_orders

                # If handle_order() is called during this downtime, the next watch_orders()
                # does not pick it up.
                # await asyncio.sleep(0.3)
        except asyncio.CancelledError:
            return

    async with client:
        order_task = asyncio.create_task(orders_listener())

        # Wait for the listening loops to start
        await asyncio.sleep(2)

        wsclient = client.client(
            client.urls["api"]["ws"]["future"] + "/" + client.options["future"]["listenKey"]
        )
        for msg in BINANCE_MESSAGES:
            # Manually feed a synthetic WebSocket message to the client
            wrapped_msg = {
                "e": "ORDER_TRADE_UPDATE",
                "E": int(time.time() * 1000),
                "T": int(time.time() * 1000),
                "o": msg,
            }
            _LOGGER.info(f"🔵 Feeding order update message (filled={msg['z']})...")
            client.handle_order_update(wsclient, wrapped_msg)
            await asyncio.sleep(0.2)

        await asyncio.sleep(2)
        order_task.cancel()
        await order_task

    _LOGGER.info(f"Received {len(orders)} order updates.")

You can launch many coroutines on the same asyncio loop and you can make them talk to each other and wait for each other if necessary. With the unnecessary await commented that code works as designed and consumes all messages regardless of when asyncio decides to switch the context and regardless of which context it switches to.

pdivos commented 2 years ago

Thank you @kroitor this did solve my problem as well!

To reiterate, there is a huge difference between:

async def listener_task():
  while True:
    orders = await client.watch_orders()
    await asyncio.sleep(0)

and

queue = asyncio.Queue()
async def listener_task():
  while True:
    orders = await client.watch_orders()
    await queue.put(orders)

In the first case, asyncio.sleep() ultimately contains a yield statement that hands control back to the event loop, which, as @pietrodn and I found, can cause some messages to be skipped by the time control returns.

In the second case, await queue.put(orders) does not yield: the queue is unbounded, and put() only suspends when the queue is full. It therefore does not hand control back to the loop, and every hand-back happens inside watch_orders(), ensuring no updates are lost.
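The difference between the two awaits can be checked directly. The following is a small probe using only the standard library; the background task and the counter are illustrative and have nothing to do with CCXT. It counts how often another task gets to run while each await is in progress.

```python
import asyncio


async def probe():
    ticks = 0

    async def background():
        # Counts how many times the event loop lets this task run.
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0)

    task = asyncio.create_task(background())
    queue = asyncio.Queue()  # unbounded: put() never has to wait for space

    before = ticks
    await queue.put("order")   # completes without suspending
    ran_during_put = ticks - before

    before = ticks
    await asyncio.sleep(0)     # yields to the event loop once
    ran_during_sleep = ticks - before

    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    return ran_during_put, ran_during_sleep


ran_during_put, ran_during_sleep = asyncio.run(probe())
print("other task ran during queue.put():", ran_during_put)
print("other task ran during sleep(0):   ", ran_during_sleep)
```

Note that this only holds for a queue with free space: with a maxsize set and the queue full, await queue.put() would suspend until a consumer makes room. queue.put_nowait() makes the synchronous intent explicit.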

ShieldTrade commented 2 years ago

@pdivos while the above is true, in my opinion there is a bug in watch_orders with newUpdates: True, as you can see for yourself by running this code, adapted from the official example binance-watch-orders-being-placed.py:

import ccxtpro
from asyncio import get_event_loop, gather
import asyncio
from pprint import pprint

# from https://github.com/ccxt/ccxt/commit/79870591db8f5dbfc995fbe2102f3b237d590c0d

async def place_delayed_order(exchange, symbol, amount):
    try:
        await exchange.sleep(1000)  # wait a bit
        # order = await exchange.create_limit_buy_order(symbol, amount, price)
        asyncio.create_task(exchange.create_order(symbol, 'market', 'buy', amount, None, {'tdMode': 'isolated'}))
        asyncio.create_task(exchange.create_order(symbol, 'market', 'sell', amount, None, {'tdMode': 'isolated'}))
        asyncio.create_task(exchange.create_order(symbol, 'market', 'buy', amount, None, {'tdMode': 'isolated'}))
        asyncio.create_task(exchange.create_order(symbol, 'market', 'sell', amount, None, {'tdMode': 'isolated'}))
    except Exception as e:
        # break
        print(e)

async def watch_orders_loop(exchange, symbol):
    list_order_arrived = []
    while True:
        try:
            orders = await exchange.watch_orders(symbol)
            print(exchange.iso8601(exchange.milliseconds()), 'watch_orders_loop', len(orders), ' last orders cached')
            print('orders arrived',orders )
            for x in range(0, len(orders)):
                keys_i_want = ['id', 'type', 'status']
                order_my_dict = {your_key: orders[x][your_key] for your_key in keys_i_want}
                list_order_arrived.append(order_my_dict)
                print(' ***** list_order_arrived', list_order_arrived)
            print('---------------------------------------------------------------')
        except Exception as e:
            # break
            print(e)

async def main(loop):

    exchange = ccxtpro.okx({'verbose': False,
                                 'enableRateLimit': False,
                                 'apiKey': '269427xxxxxxxxxxx3883504aa1d',
                                 'secret': '1F4228EExxxxxxxxxxxxxD180828',
                                 'password': 'Pay0Schield',
                                 'newUpdates': True,
                                 'headers': {"x-simulated-trading": 1},
                                 'posSide': 'long',
                                 'options': {
                                     'tradesLimit': 1000,
                                     'OHLCVLimit': 1000,
                                     'ordersLimit': 1000,
                                 },
                                 })
    exchange.set_sandbox_mode(True)
    await exchange.load_markets()  # loads all the market info; it can be printed if needed
    await exchange.sleep(3000)
    print("okex_test loaded")

    symbol = 'BTC/USDT:USDT'
    amount = 1
    loops = [
        watch_orders_loop(exchange, symbol),
        # watch_balance_loop(exchange),
        place_delayed_order(exchange, symbol, amount)
    ]
    await gather(*loops)
    await exchange.close()

loop = get_event_loop()
loop.run_until_complete(main(loop))

The code does not have any extra await that hands control back to the loop. If you run it a few times, you will see that when 2 messages are received from the exchange within the same millisecond, watch_orders returns wrong output. I don't think the problem is tied to a particular exchange (I tested it on BitMEX and OKX), although it is easier to see on OKX, which returns 2 messages for a market order. I detailed the problem in #12405.

JakubMartinovicHusar commented 3 months ago

Hi has this issue been solved somehow?

pdivos commented 3 months ago

not sure


bryanchen463 commented 2 months ago

I'm running into this situation too.